[erlang-questions] [ANN] Asynchronous PostgreSQL driver

Tim Watson watson.timothy@REDACTED
Wed Nov 2 04:21:35 CET 2011


I've found that gen_servers which use cast heavily can easily get overwhelmed
without back pressure. Also, you might consider dropping the proplists
module: lists:keyfind/3 and the other lists module functions are much
faster (implemented in C IIRC) and the API is just as easy to use.
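
To illustrate the swap, here is a minimal sketch. It assumes the options are
plain {Key, Value} tuples; bare atoms such as verbose, which proplists treats
as {verbose, true}, are not handled. The module name opts and the helper
get/3 are made up for this example and are not part of the driver.

```erlang
-module(opts).
-export([get/3]).

%% Hypothetical helper: look up Key in a list of {Key, Value} tuples and
%% return Default when the key is absent. Behaves like
%% proplists:get_value/3 for lists that contain only 2-tuples.
get(Key, Opts, Default) ->
    case lists:keyfind(Key, 1, Opts) of
        {_, Value} -> Value;    % first tuple whose 1st element equals Key
        false      -> Default   % lists:keyfind/3 returns false on a miss
    end.
```

So opts:get(port, Opts, 5432) stands in for proplists:get_value(port, Opts,
5432) wherever the list is known to hold only tuples.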

Now honestly, you're really missing a trick with this change. Does the
client (pid) get a message as soon as data comes off the wire and a row has
been successfully decoded? Because that is *far* more useful than just
happening not to block the client pid while you go off and fetch everything
in one go. If you either allow the client to pass a callback or simply send
a message for each DataRow, you will have provided a boon for fast web
applications.

Then I can stream the query results directly to my client as they come off
the wire without much further ado, which is a very common need in a
surprising number of CRUD applications.
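
As a rough sketch of the message-per-row idea: the message shapes
{data_row, Ref, Row}, {complete, Ref} and {error, Ref, Reason} below are
assumptions of mine, not what the async branch actually sends, and the
spawned process in demo/0 is only a stand-in for the driver. The module name
row_stream is likewise made up.

```erlang
-module(row_stream).
-export([consume/3, demo/0]).

%% Fold Fun over rows as they arrive, one message per row.
%% Assumed (hypothetical) protocol: {data_row, Ref, Row} for each decoded
%% row, then {complete, Ref}, or {error, Ref, Reason} on failure.
consume(Ref, Fun, Acc) ->
    receive
        {data_row, Ref, Row} -> consume(Ref, Fun, Fun(Row, Acc));
        {complete, Ref}      -> {ok, Acc};
        {error, Ref, Reason} -> {error, Reason}
    after 5000 ->
        {error, timeout}
    end.

%% Stand-in for the driver: a process that sends three rows and then
%% signals completion. The client sums the first column as rows arrive.
demo() ->
    Client = self(),
    Ref = make_ref(),
    Rows = [{1, <<"a">>}, {2, <<"b">>}, {3, <<"c">>}],
    spawn(fun() ->
        lists:foreach(fun(Row) -> Client ! {data_row, Ref, Row} end, Rows),
        Client ! {complete, Ref}
    end),
    consume(Ref, fun({Id, _Name}, Sum) -> Sum + Id end, 0).
```

The client never holds more than one row at a time, so the callback can
write each row straight to the HTTP response instead of building a list.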

I already have some code that does this:

handle_http(Req) ->
    Req:stream(head, [{"Content-Type", "text/xml"}]),

    W = xml_writer:new(fun(D) -> Req:stream(D) end),
    Writer = xml_writer:start_element("obj-graph", W),

    #assoc{ writer=Done } =
        repository:exec(ogql:parse(Req),
            fun build_tree/2, #assoc{ writer=Writer }),

    xml_writer:close(Done),

    Req:stream(close).


build_tree({_, Obj={Type, Key, Name, Vsn, _, _, Lvl, _, Ctx, Node}},
             #assoc{ depth=Depth, writer=Writer }) ->
    case Lvl of
        X when X == Depth ->
            #assoc{ depth=X,
                    writer=write_object(Obj, Writer,
                                        fun xml_writer:write_sibling/2) };
        Y when Y > Depth orelse Y == 1 ->
            #assoc{ depth=Y,
                    writer=write_object(Obj, Writer,
                                        fun xml_writer:write_child/2) };
        Lt when Lt < Depth ->
            Ready = lists:foldl(
                fun(_, Acc) ->
                    xml_writer:end_element(Acc)
                end, Writer, lists:seq(0, Depth - Lt)),
            #assoc{ depth=Lt,
                    writer=write_object(Obj, Ready,
                                        fun xml_writer:write_child/2) }
    end.

And some code tucked away in a gen_server that does exactly what I want
with the build_tree/2 callback:

handle_call({get, Query, RowCallback, InitVal}, _From, Connection) ->
    case pgsql:equery(Connection, Query) of
        {ok, Columns, Rows} ->
            DataSet = lists:foldl(
                fun(Row, Acc) -> RowCallback({Columns, Row}, Acc) end,
                InitVal, Rows),
            {reply, DataSet, Connection};
        Other ->
            io:format("Bad Result: ~p~n", [Other]),
            {reply, error, Connection}
    end;


But it would be absolutely super-awesome-better if I could just go:

pgsql:equery(Connection, Query, RowCallback)

Naturally you sometimes want to use a callback that collects up (and maybe
transforms) rows, and other times you just want to *do something* with the
row, as in my example code above (where I serialise it to the client
immediately).
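
To make the two styles concrete, here is a small sketch. Both callbacks take
({Columns, Row}, Acc) like the RowCallback above. The Fold fun in demo/0 is
only a stand-in for the three-argument equery I'm wishing for, which does not
exist as far as I know; the column names are plain atoms purely for
illustration, and the module name row_callbacks is made up.

```erlang
-module(row_callbacks).
-export([collect/2, emit/2, demo/0]).

%% Collecting style: transform each row into a {Column, Value} list and
%% accumulate it (in reverse order, as usual with a fold).
collect({Columns, Row}, Acc) ->
    [lists:zip(Columns, tuple_to_list(Row)) | Acc].

%% "Do something" style: the accumulator is an output fun. Each row is
%% handed to it immediately and nothing is retained.
emit({_Columns, Row}, Sink) ->
    Sink(Row),
    Sink.

demo() ->
    Columns = [id, name],
    Rows = [{1, <<"a">>}, {2, <<"b">>}],
    %% Stand-in for a row-at-a-time query API (hypothetical).
    Fold = fun(Callback, Init) ->
        lists:foldl(fun(Row, Acc) -> Callback({Columns, Row}, Acc) end,
                    Init, Rows)
    end,
    Collected = lists:reverse(Fold(fun collect/2, [])),
    Self = self(),
    _ = Fold(fun emit/2, fun(Row) -> Self ! {row, Row} end),
    %% Drain the rows that emit/2 pushed out, in arrival order.
    Sent = [receive {row, R} -> R after 0 -> none end || _ <- Rows],
    {Collected, Sent}.
```

The same driver entry point serves both cases; only the callback and the
initial accumulator change.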

This code currently has to wait for the entire result set to become
available before it can start returning the data, but it does a good job of
returning around 15k rows to the client in a structured tree-like
hierarchy in around 1.5 to 2.0 seconds on average. Not having to queue up
the intermediate data would improve on this I'm sure, as well as reducing
memory consumption for larger datasets, which typically hold more like 100k
rows. As you can see from the build_tree function, the query provides
enough information for the serialisation to remain completely ignorant of
the structure of the data: it simply relies on the "Level" column to
figure out whether to move up or down in the XML document.

Also, I would be wary of pooling postgres connections, as there is a
considerable impact on the resource usage for the database server when you
do this. There are pooling solutions out there (such as pgbouncer and
pgpool), which will act as a broker to which you can connect and do all the
multiplexing for you. IMO it's unlikely you'll improve on what they're
doing by caching connections in the client.

Cheers,

Tim

On 1 November 2011 06:18, Anton Lebedevich <mabrek@REDACTED> wrote:

> Hello,
>
> There is an asynchronous fork of Will Glozer's epgsql
>
> https://github.com/mabrek/epgsql branch name 'async'
>
> Asynchronous API is implemented by pgsql_sock.erl, see pgsql.erl for
> examples.
> It passes all tests from original driver except 3 timeout tests.
> Backward compatibility is preserved by module pgsql.
> Differences from original driver:
>  + internal queue of client requests, so you don't need to wait for a
> response before sending the next request
>  + responses are delivered as regular erlang messages
>  + single process to hold driver state and receive socket data
>  + slight code reorganization
>
> It's an early release and it has many things to do. Any suggestions on
> API and implementation are welcome.
>
> Regards,
> Anton Lebedevich.