[erlang-questions] eep: New gen_stream module

Jay Nelson <>
Tue Dec 11 07:07:47 CET 2007


Per wrote:

 > I think it would be nice to add a fourth lightweight option:

 > {generator, fun(() -> {binary(), fun()} | end_of_stream)}

 > That is a fun which returns a binary and a new fun which will
 > produce the next chunk or an end_of_stream marker, but this
 > might not fit with the OTP framework

To which I replied:

 > I was worried about funs and hot code loading.  This is very
 > easy to do without resorting to funs:

 > Bin = Binary:GeneratorFun(Args),
 > {ok, P1} = gen_stream:start_link([{binary, Bin}]).

My reply was not equivalent to what Per was requesting,
especially in the case where the fun is an infinite stream.

I chose a partitioning of the problem domain, rather than a
continuation-based approach as Per requested, because the latter is a
sequential approach.  By partitioning the stream, concurrent processes
can buffer their assigned chunks ahead of time.  With a
continuation-based approach, the future funs are not known until the
earlier chunks are fully extracted.

Another problem with doing a quick fun approach, is that the stream
could not be made circular even if it was of fixed size, because there
would be no technique for getting the stream length or indicating
progress / pct_complete.  Not really a problem, but then the
continuation approach has some additional restrictions that the other
sources don't have.

In my (lame, but functional) behaviour example code, I have a module
called odd_nums.erl which returns an infinite stream of odd numbers.
A given subsequence of the stream can be computed without any state
except the starting point and the chunk size.  There is another module
called sorted_dets.erl which requires the open table reference as
state, but again can address to a particular conceptual position and
extract a chunk without the previous chunks having been extracted
yet.  I do not have an example which updates the state, although the
gen_server as written supports this approach, partly because I
couldn't think of one that would be useful.

A function continuation could be a state that is updated when a chunk
is returned, but it is very difficult to coordinate properly in the
face of concurrency.  The worker processes that are spawned by the
gen_stream are addressed in round-robin fashion so that successive
messages handled by the gen_stream are given to different workers so
that each worker will have time to replenish its delivered buffer.
Changing the parameter for the number of processes should not affect
the resulting stream, so the continutation would need to return the
function that gets its next chunk as current_chunk + N where N is the
number of spawned processes.  It would be difficult to find useful
problems where this would be possible and I believe all of those can
be transformed into a full-blown behaviour without much effort --
providing the stream_length and pct_complete options, plus circularity
if it is a fixed-length functional stream.

jay




More information about the erlang-questions mailing list