[erlang-questions] [ANN] Asynchronous PostgreSQL driver

Tim Watson <>
Wed Nov 2 04:32:54 CET 2011


Good grief, sorry about the formatting. I didn't realise I'd preserved it
when I copied and pasted that in. :/

On 2 November 2011 03:21, Tim Watson <> wrote:

> I've found the gen_servers that use cast heavily can easily get
> overwhelmed without back pressure. Also you might consider stoping use of
> the proplists module - lists:keyfind/3 and the other lists module functions
> are much faster (implemented in C IIRC) and the API is just as easy to use.
>
> Now honestly, you're really missing a trick with this change. Does the
> client (pid) get a message as soon as data comes off the wire and a row has
> been successfully decoded? Because that is *far* more useful that just
> happening not to block the client pid while you go off and fetch everything
> in one go. If you either allow the client to pass a callback or simply send
> a message for each DataRow, you will have provided a boon for fast web
> applications.
>
> Then I can stream the query results directly to my client as they come off
> the wire without much further ado, which for a surprising number of CRUD
> applications is very commonplace.
>
> I already have some code that does this:
>
> handle_http(Req) ->
>     Req:stream(head, [{"Content-Type", "text/xml"}]),
>
>     W = xml_writer:new(fun(D) -> Req:stream(D) end),
>     Writer = xml_writer:start_element("obj-graph", W),
>
>   #assoc{ writer=Done } =
>       repository:exec(ogql:parse(Req),
>           fun build_tree/2, #assoc{ writer=Writer }),
>
>     xml_writer:close(Done),
>
>     Req:stream(close).
>
>
> build_tree({_, Obj={Type, Key, Name, Vsn, _, _, Lvl, _, Ctx, Node}},
>              #assoc{ depth=Depth, writer=Writer }) ->
>     case Lvl of
>         X when X == Depth ->
>             #assoc{ depth=X, writer=write_object(Obj, Writer, fun xml_writer:write_sibling/2) };
>         Y when Y > Depth orelse Y == 1 ->
>             #assoc{ depth=Y, writer=write_object(Obj, Writer, fun xml_writer:write_child/2) };
>         Lt when Lt < Depth ->
>             Ready = lists:foldl(
>                 fun(_, Acc) ->
>                     xml_writer:end_element(Acc)
>                 end, Writer, lists:seq(0, Depth - Lt)),
>             #assoc{ depth=Lt, writer=write_object(Obj, Ready, fun xml_writer:write_child/2) }
>     end.
>
> And some code tucked away in a gen_server that does exactly what I want
> with the build_tree/2 callback:
>
> handle_call({get, Query, RowCallback, InitVal}, _From, Connection) ->
>     case pgsql:equery(Connection, Query) of
>         {ok, Columns, Rows} ->
>             DataSet = lists:foldl(fun(Row, Acc) -> RowCallback({Columns, Row}, Acc) end, InitVal, Rows),
>             {reply, DataSet, Connection};
>         Other ->
>             io:format("Bad Result: ~p~n", [Other]),
>             {reply, error, Connection}
>     end;
>
>
> But it would be absolutely super-awesome-better if I could just go:
>
> pgsql:equery(Connection, Query, RowCallback)
>
> Naturally you sometimes want to use a callback that collects up (and maybe
> transforms) rows, and other times you just want to *do something* with the
> row, as in my example code above (where I serialise it to the client
> immediately).
>
> This code currently has to wait for the entire result set to become
> available before it can start returning the data, but it does a good job of
> returning around 15k rows back to the client in a structured tree-like
> hierarchy in around 1.5 ~ 2.0 seconds on average. Not having to queue up
> the intermediate data would improve on this I'm sure, as well as reducing
> memory consumption for larger datasets, which typically hold more like 100k
> rows. As you can see from the build_tree function, the query provides
> enough information for the serialisation to remain completely ignorant of
> the structure of the data and it simply relies of the "Level" column to
> figure out whether to move up or down in the XML document.
>
> Also, I would be wary of pooling postgres connections, as there is a
> considerable impact on the resource usage for the database server when you
> do this. There are pooling solutions out there (such as pgbouncer and
> pgpool), which will act as a broker to which you can connect and do all the
> multiplexing for you. IMO it's unlikely you'll improve on what they're
> doing by caching connections in the client.
>
> Cheers,
>
> Tim
>
> On 1 November 2011 06:18, Anton Lebedevich <> wrote:
>
>> Hello,
>>
>> There is asynchronous fork of Will Glozer's epgsql
>>
>> https://github.com/mabrek/epgsql branch name 'async'
>>
>> Asynchronous API is implemented by pgsql_sock.erl, see pgsql.erl for
>> examples.
>> It passes all tests from original driver except 3 timeout tests.
>> Backward compatibility is preserved by module pgsql.
>> Differences from original driver:
>>  + internal queue of client requests, so you don't need to wait
>> response to send next request
>>  + responses are delivered as regular erlang messages
>>  + single process to hold driver state and receive socket data
>>  + slight code reorganization
>>
>> It's an early release and it has many things to do. Any suggestions on
>> API and implementation are welcome.
>>
>> Regards,
>> Anton Lebedevich.
>> _______________________________________________
>> erlang-questions mailing list
>> 
>> http://erlang.org/mailman/listinfo/erlang-questions
>>
>
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://erlang.org/pipermail/erlang-questions/attachments/20111102/024cf7ce/attachment.html>


More information about the erlang-questions mailing list