[erlang-questions] Re: Need parallel calls to gen_server:call, similar but different to gen_server:multi_call

Vlad Dumitrescu vladdu55@REDACTED
Thu Oct 1 11:02:10 CEST 2009


On Thu, Oct 1, 2009 at 10:09, Joe Armstrong <erlang@REDACTED> wrote:
> We should also probably have some abstractions for concurrent pipes.
>
>    pipe([F1, F2, F3, ..., Fn])
>
> in which spawn(F1)  starts a process which sends messages to a process
> started by
> spawn(F2) ... which are consumed by a process that is started by spawn(F3)
>
> The trick would be to handle the queue sizes between the steps in the pipeline
> and deal with out of band data and exceptions.
>
> A naive implementation of pipe is easy but to avoid buffer overruns
> would be inefficient.
> (consider the case where F1 produces messages extremely quickly, but
> F2 is very slow :-)

I wrote a (half-naive and probably over-engineered) implementation of
pipes some years ago. It does something similar to the backpressure
mechanism Joe mentioned, by providing buffering and automatically
suspending producers when the consumer's queue is full.
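
To make the idea concrete, here is a minimal sketch of backpressure
between pipeline stages. It is not the erl-pipes code: the module name
bp_pipe, the function run/2 and the item/ack/eof message shapes are all
made up for illustration. It also assumes the simplest possible buffer:
a sender may have only one unacknowledged item per consumer, whereas a
real implementation would allow a configurable queue size.

```erlang
-module(bp_pipe).
-export([run/2]).

%% Hypothetical sketch, not the erl-pipes API.
%% Runs Items through the stage functions Funs, one process per stage,
%% and returns the results in order. The calling process acts as the
%% final consumer.
run(Funs, Items) ->
    Collector = self(),
    %% Build the chain back to front so each stage knows its consumer.
    First = lists:foldr(
              fun(F, Next) -> spawn_link(fun() -> stage(F, Next) end) end,
              Collector, Funs),
    %% A separate feeder process plays the role of the producer.
    spawn_link(fun() ->
                       lists:foreach(fun(X) -> send(First, X) end, Items),
                       First ! eof
               end),
    collect([]).

%% A stage acks as soon as it dequeues an item, so its producer can
%% queue exactly one more item while this one is being processed.
stage(F, Next) ->
    receive
        {item, From, X} ->
            From ! {ack, self()},
            send(Next, F(X)),
            stage(F, Next);
        eof ->
            Next ! eof
    end.

%% Send one item, then suspend until the consumer acknowledges it.
%% This blocking receive is what throttles a fast producer.
send(To, X) ->
    To ! {item, self(), X},
    receive {ack, To} -> ok end.

%% Final consumer: ack each item and accumulate results until eof.
collect(Acc) ->
    receive
        {item, From, X} ->
            From ! {ack, self()},
            collect([X | Acc]);
        eof ->
            lists:reverse(Acc)
    end.
```

With this sketch, bp_pipe:run([fun(X) -> X * 2 end, fun(X) -> X + 1 end],
[1, 2, 3]) returns [3,5,7]. Because a sender waits for the ack before
sending again, a slow stage never has more than one pending item in its
mailbox, no matter how fast the stage before it is. The stages are
linked to the caller, so a crash in any stage takes the whole pipeline
down; out-of-band data is not handled at all.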

The code is at http://code.google.com/p/erl-pipes/.

The problem with the implementation is that each pipe is implemented as
two processes: one of them handles the flow control, but all messages
pass through it, so there is a lot of unnecessary message copying. If
flow control existed at the VM level, the throughput would be much higher.

<plug class="shameless">
Code example with the current API, counting the number of lines in a
file that contain "=" and the number of lines that don't (cat, lines,
etc. are predefined pipe segments, just like in the Unix toolkit).
This example might or might not work (but I think it does).

    P1 = pipes:pipe(cat, [FileName]),
    P2 = pipes:pipe(lines),
    P4 = pipes:pipe(grep, ["="]),
    P3 = pipes:pipe(count),
    P5 = pipes:pipe(count),

    pipes:connect([P1, P2, P4, P3]),
    pipes:connect({P4, nomatch}, P5),

    {WithEquals, WithoutEquals} = {pipes:get_result(P3), pipes:get_result(P5)}.

A DSL for building pipes is called for, as the above is far too
inconvenient. Maybe something similar to this Unix-ish syntax:

    {WithEquals, WithoutEquals} = pipes:build(
      " cat $FileName |
        lines |
        g: grep \"=\" |
        count > $1
      ",
      " :g/nomatch |
        count > $2
      "
    ).
</plug>

best regards,
Vlad

