[erlang-questions] eep: New gen_stream module

Jay Nelson jay@REDACTED
Mon Dec 10 16:47:57 CET 2007

As someone else pointed out, I left out gen_stream.hrl in the  
tarball.  The web page has been updated.

I excised the main eep text to address Per's questions below.

On Dec 10, 2007, at 4:21 AM, Per Gustafsson wrote:

> some things which are unclear how it would work.

>>      Behaviour callbacks:
> These are not really the behaviour callbacks, but rather the  
> interface to the gen_stream module,

Quite right, my bad.  Will incorporate a separate section in the eep  
to explain.  Inline below is the description.

> I was a little bit confused by this at first, but the code seems to  
> indicate that the actual callbacks for a gen_stream behaviour is:

Declaring a gen_stream behaviour means that each process launched by  
the gen_stream has to implement the following functions which are  
used to extract successive chunks of the stream:

> init/3,

Creates an initial state for the process.  My example opens a dets  
table and passes it back to the gen_stream server as its instance state.

> terminate/1,

Called when the chunking process goes down.  This can happen normally  
when gen_stream receives its first end_of_stream result (it sends a  
stop message to all behaviour processes) or when a failure condition  
occurs or an external event causes failure.

> stream_length/0,
> stream_length/1,

Before init/3 is called, the gen_stream determines the stream_length  
so that it can report progress and handle length requests.  The  
argument to the second signature is arbitrary module arguments that  
are passed in to init as well.  They come from the Options proplist  
of the gen_stream:start_link call.  The source option includes  
{behaviour, Module, ModuleArgs}.  The behaviour is free to interpret  
them as it wishes.

stream_length should return an integer for a fixed-sized stream, or  
an atom typically for another value such as infinite, unknown, or  
some other meaningful token for the domain.  It is possible to use  
any term, such as {fewer_than, 20000}.  A call to stream_length will  
return the integer, atom or term.  A call to pct_complete returns an  
integer from 0 to 100 in the case of an integer stream_length, the  
actual atom or term of stream_length in other cases.

> extract_block/3,

Given a normal request (there are enough elements to fill the chunk)  
for a full 'next_chunk', this function hands back the desired chunk.

> extract_split_block/4,

In the case of a circular data stream, this function is called when a  
chunk will span the end of the stream and the beginning of the stream.

> extract_final_block/3,

In the case of a fixed-length data stream, this function is called  
when the rest of the stream is less than one chunk size.

> inc_progress/2

Allows the behaviour to define what constitutes a chunk_size when  
computing pct_complete.  The chunk just extracted and returned to  
gen_stream by one of the extract_xxx_block calls, is handed to  
inc_progress along with the quantity already seen.  The function  
should do the math to increment the quantity seen.

Introducing this allowed the possibility of using a non-binary  
stream.  My example is a dets table that is accessed in sorted order  
by visiting its numeric keys from 1 to 20.  The return value was  
{Key, Object} where object could be anything.  I just count the  
number of keys returned in each block of results.

> I guess that the eep also needs to define what these functions  
> should do to make it possible to define gen_stream behaviours

I will write this up and add formal arguments with better descriptions.

>>          start, start_link as in gen_server
>>          init(Args, Options) -> Same as gen_server plus list of  
>> Options:
>>              {stream, {file, path_to_file()} |
>>                       {binary, binary()} |
>>                       {behaviour, atom(), ExtraArgs}}
> I think it would be nice to add a fourth lightweight option:
> {generator, fun(() -> {binary(), fun()} | end_of_stream)}
> That is a fun which returns a binary and a new fun which will  
> produce the next chunk or an end_of_stream marker, but this might  
> not fit with the OTP framework

I was worried about funs and hot code loading.  This is very easy to  
do without resorting to funs:

Bin = Binary:GeneratorFun(Args),
{ok, P1} = gen_stream:start_link([{binary, Bin}]).

>>              {chunk_size, integer()} returned sub-binary size,   
>> default is ~8K
> It would be nice to have a chunk terminator such as newline rather  
> than an explicit size or would this be implemented using a  
> gen_stream behaviour?

I knew this would be the first question / request.  It gets into a  
big side debate that I wanted to avoid:  just newlines?  How about  
arbitrary constant?  How about a regexp?

Right now I would like to focus on the concept of a serial stream  
that cannot fit in memory, doesn't exist physically or is a good  
abstraction for a data item you have your hands on.  I think that is  
generally useful, but feedback from the community and experience with  
some example code would provide concrete analysis as to whether it is  
in practice as useful as I expect.

For now, the simple solution is to do the line breaking yourself  
since it is a serial stream (not all the details are complete below,  
but to give you an idea of the complexity):

next_line(GS, PrevChunk) ->
      case GetLine(PrevChunk) of
          none ->
              Chunk = gen_stream:next_chunk(GS),
              {NewPrev, Line} = GetLine(PrevChunk,  
          {NewPrev, Line} = DesiredResult->

I don't want to put an arbitrarily long computation inside the  
gen_stream as a blocking call after a chunk has been retrieved.  It  
is better for the application writer to make decisions about serial  
blocking.  Therefore I need a way to push it down into the gen_stream  
chunking processes.  I have a solution that allows the stream to be  
consumed in "chunks" or in "segments" (a segment is a further  
subdivision of a chunk using application logic to extract a  
conceptual chunk rather than a size determined chunk), but I don't  
want to confuse the proposal or the performance by introducing  
segments in the initial implementation.

The biggest thing about the design pattern is that the performance  
characteristics should be tunable declaratively by the addition or  
removal of processes, buffers or both and that change needs to apply  
to segments as well as chunks.  Right now there is a segmenting  
solution possible on the single process (application writer's) side  
of the gen_stream.

>>              {chunks_per_proc, integer()} num of internal chunks,   
>> default is 1
>>              {circular, false | true} whether stream repeats,  
>> default  is false
>> 	    {num_processes, integer()} num_processes used, default 1
> It is not clear to me what this means. Is this the number of  
> processes which will communicate with the server or the number of  
> processes that the server will spawn?

The gen_stream is a single gen_server process and is accessed from  
your application as such, in a single process or concurrent manner as  
you choose, but with all requests serialized in the gen_stream's  
message queue.  (Think of it as a center point in a fan-in / fan-out  

The chunk_size parameter defines the blocking factor when returning  
data and when obtaining it from the stream source.  The num_processes  
determines how many worker processes are spawned by the gen_stream to  
concurrently load up buffers in anticipation of next_chunk requests.   
The chunks_per_proc is how many buffers (or chunks) each process  
should attempt to maintain.

These parameters allow you to control how much data to read ahead and  
how many concurrent processes are contending for a single resource (a  
raw binary can be accessed in parallel easily,  while 10 separate  
file handles may have contention other than the file position  
information).  Some applications need large blocking factors because  
of slow sources, others needs lots of buffers because the request  
rate is high.

>>          next_chunk(Server::pid()) -> binary() | end_of_stream

This can actually return any term in the case of a behaviour, but  
binary in the case of file or raw binary source.

>>          pct_complete(Server::pid()) -> integer() | atom()
>>          stream_size(Server::pid()) -> integer() | atom()
> can these return any atom or only specific ones e.g. 'infinite' or  
> 'error'

They both return the stored stream_size datum when the stream_size is  
not an integer.  A behaviour can choose to use its own significant  
term by returning it to the stream_length call, but raw binary and  
file stream can only return integer() or is_circular.


More information about the erlang-questions mailing list