[erlang-questions] eep: New gen_stream module
Jay Nelson
jay@REDACTED
Mon Dec 10 16:47:57 CET 2007
As someone else pointed out, I left out gen_stream.hrl in the
tarball. The web page has been updated.
I excised the main eep text to address Per's questions below.
On Dec 10, 2007, at 4:21 AM, Per Gustafsson wrote:
> some things which are unclear how it would work.
>> Behaviour callbacks:
>
> These are not really the behaviour callbacks, but rather the
> interface to the gen_stream module,
Quite right, my bad. Will incorporate a separate section in the eep
to explain. Inline below is the description.
> I was a little bit confused by this at first, but the code seems to
> indicate that the actual callbacks for a gen_stream behaviour is:
Declaring a gen_stream behaviour means that the module run by each
process launched by the gen_stream has to implement the following
functions, which are used to extract successive chunks of the stream:
>
> init/3,
Creates an initial state for the process. My example opens a dets
table and passes it back to the gen_stream server as its instance state.
> terminate/1,
Called when the chunking process goes down. This can happen normally,
when gen_stream receives its first end_of_stream result (it sends a
stop message to all behaviour processes), or abnormally, when a
failure condition occurs or an external event causes one.
> stream_length/0,
> stream_length/1,
Before init/3 is called, the gen_stream determines the stream_length
so that it can report progress and handle length requests. The
argument to the second form is the arbitrary module arguments, which
are also passed to init/3. They come from the Options proplist of
the gen_stream:start_link call, where the source option includes
{behaviour, Module, ModuleArgs}. The behaviour is free to interpret
them as it wishes.
stream_length should return an integer for a fixed-size stream, or
typically an atom for another value such as infinite, unknown, or
some other token meaningful for the domain. It is possible to use
any term, such as {fewer_than, 20000}. A call to stream_length will
return the integer, atom or term. A call to pct_complete returns an
integer from 0 to 100 when stream_length is an integer, or the
actual atom or term of stream_length in other cases.
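As a rough sketch of how this looks from the calling side (GS is a
running gen_stream; the return conventions are as just described):

Size = gen_stream:stream_size(GS),   %% integer(), an atom, or a term like {fewer_than, 20000}
Pct  = gen_stream:pct_complete(GS).  %% 0..100 when Size is an integer, otherwise the same term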
> extract_block/3,
Given a normal request for a full 'next_chunk' (i.e., there are
enough elements remaining to fill the chunk), this function hands
back the desired chunk.
> extract_split_block/4,
In the case of a circular data stream, this function is called when a
chunk will span the end and the beginning of the stream.
> extract_final_block/3,
In the case of a fixed-length data stream, this function is called
when the rest of the stream is less than one chunk size.
> inc_progress/2
Allows the behaviour to define what constitutes a chunk_size when
computing pct_complete. The chunk just extracted and returned to
gen_stream by one of the extract_xxx_block calls is handed to
inc_progress along with the quantity already seen. The function
should do the math to increment the quantity seen.
Introducing this allowed the possibility of using a non-binary
stream. My example is a dets table that is accessed in sorted order
by visiting its numeric keys from 1 to 20. The return value is
{Key, Object} where Object can be anything. I just count the
number of keys returned in each block of results.
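To make the dets example concrete, its inc_progress could look
roughly like the sketch below (the argument order is my guess; each
block is the list of {Key, Object} tuples extracted for one chunk):

inc_progress(SeenSoFar, Block) when is_list(Block) ->
    %% Progress is measured in keys seen, not bytes.
    SeenSoFar + length(Block).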
>
> I guess that the eep also needs to define what these functions
> should do to make it possible to define gen_stream behaviours
I will write this up and add formal arguments with better descriptions.
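In the meantime, a bare skeleton of a behaviour module exporting the
callbacks listed above might look like this (the argument names and
stub bodies are placeholders only, not the eep's formal definitions):

-module(my_stream).
-behaviour(gen_stream).
-export([init/3, terminate/1, stream_length/0, stream_length/1,
         extract_block/3, extract_split_block/4, extract_final_block/3,
         inc_progress/2]).

%% Placeholder bodies: they only show the required callback set.
init(_Opts, _ChunkSize, _ModuleArgs)              -> no_state.
terminate(_State)                                 -> ok.
stream_length()                                   -> unknown.
stream_length(_ModuleArgs)                        -> unknown.
extract_block(_State, _Pos, _ChunkSize)           -> <<>>.
extract_split_block(_State, _Pos, _Size1, _Size2) -> <<>>.
extract_final_block(_State, _Pos, _ChunkSize)     -> <<>>.
inc_progress(SeenSoFar, _Chunk)                   -> SeenSoFar.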
>
>
>> start, start_link as in gen_server
>> init(Args, Options) -> Same as gen_server plus list of
>> Options:
>> {stream, {file, path_to_file()} |
>> {binary, binary()} |
>> {behaviour, atom(), ExtraArgs}}
>
> I think it would be nice to add a fourth lightweight option:
>
> {generator, fun(() -> {binary(), fun()} | end_of_stream)}
>
> That is a fun which returns a binary and a new fun which will
> produce the next chunk or an end_of_stream marker, but this might
> not fit with the OTP framework
I was worried about funs and hot code loading. This is very easy to
do without resorting to funs:
%% Call your generator once to produce the whole binary, then hand the
%% result to gen_stream (GeneratorMod/GeneratorFun stand for your own code):
Bin = GeneratorMod:GeneratorFun(Args),
{ok, P1} = gen_stream:start_link([{binary, Bin}]).
>
>> {chunk_size, integer()} returned sub-binary size,
>> default is ~8K
>
> It would be nice to have a chunk terminator such as newline rather
> than an explicit size or would this be implemented using a
> gen_stream behaviour?
I knew this would be the first question / request. It gets into a
big side debate that I wanted to avoid: just newlines? How about an
arbitrary constant? How about a regexp?
Right now I would like to focus on the concept of a serial stream
that cannot fit in memory, doesn't exist physically, or is a good
abstraction for a data item you already have in hand. I think that is
generally useful, but feedback from the community and experience with
some example code would provide concrete evidence as to whether it is
as useful in practice as I expect.
For now, the simple solution is to do the line breaking yourself
since it is a serial stream (not all the details are complete below,
but to give you an idea of the complexity):
next_line(GS, PrevChunk) ->
    %% get_line/1 and get_line/2 are assumed helpers that split a buffer
    %% at the first newline, returning {RemainingBuffer, Line} or none.
    case get_line(PrevChunk) of
        none ->
            %% No complete line buffered; pull the next chunk and retry.
            Chunk = gen_stream:next_chunk(GS),
            get_line(PrevChunk, Chunk);
        {_NewPrev, _Line} = DesiredResult ->
            DesiredResult
    end.
I don't want to put an arbitrarily long computation inside the
gen_stream as a blocking call after a chunk has been retrieved. It
is better for the application writer to make decisions about serial
blocking. Therefore I need a way to push it down into the gen_stream
chunking processes. I have a solution that allows the stream to be
consumed in "chunks" or in "segments" (a segment is a further
subdivision of a chunk using application logic to extract a
conceptual chunk rather than a size-determined chunk), but I don't
want to confuse the proposal or the performance by introducing
segments in the initial implementation.
The biggest thing about the design pattern is that the performance
characteristics should be tunable declaratively by the addition or
removal of processes, buffers, or both, and that change needs to apply
to segments as well as chunks. Right now there is a segmenting
solution possible on the single process (application writer's) side
of the gen_stream.
>
>> {chunks_per_proc, integer()} num of internal chunks,
>> default is 1
>> {circular, false | true} whether stream repeats,
>> default is false
>> {num_processes, integer()} num_processes used, default 1
>
> It is not clear to me what this means. Is this the number of
> processes which will communicate with the server or the number of
> processes that the server will spawn?
The gen_stream is a single gen_server process and is accessed from
your application as such, in a single process or concurrent manner as
you choose, but with all requests serialized in the gen_stream's
message queue. (Think of it as a center point in a fan-in / fan-out
configuration.)
The chunk_size parameter defines the blocking factor when returning
data and when obtaining it from the stream source. The num_processes
determines how many worker processes are spawned by the gen_stream to
concurrently load up buffers in anticipation of next_chunk requests.
The chunks_per_proc is how many buffers (or chunks) each process
should attempt to maintain.
These parameters allow you to control how much data to read ahead and
how many concurrent processes are contending for a single resource (a
raw binary can be accessed in parallel easily, while 10 separate
file handles may have contention other than the file position
information). Some applications need large blocking factors because
of slow sources, others need lots of buffers because the request
rate is high.
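For instance, a hypothetical start_link call combining these knobs
(the option names are taken from the eep excerpt above; the file name
and values are only illustrative):

{ok, GS} = gen_stream:start_link(
             [{stream, {file, "big_input.log"}},
              {chunk_size, 8192},       %% bytes handed back per next_chunk
              {num_processes, 4},       %% worker processes reading ahead
              {chunks_per_proc, 2}]).   %% buffers each worker tries to keep full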
>
>> next_chunk(Server::pid()) -> binary() | end_of_stream
This can actually return any term in the case of a behaviour, but a
binary in the case of a file or raw binary source.
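So a straightforward consumer loop looks roughly like this
(handle_chunk/1 is a hypothetical application function):

consume(GS) ->
    case gen_stream:next_chunk(GS) of
        end_of_stream ->
            done;
        Chunk ->
            handle_chunk(Chunk),
            consume(GS)
    end.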
>
>
>> pct_complete(Server::pid()) -> integer() | atom()
>> stream_size(Server::pid()) -> integer() | atom()
> can these return any atom or only specific ones e.g. 'infinite' or
> 'error'
They both return the stored stream_size datum when the stream_size is
not an integer. A behaviour can choose to use its own significant
term by returning it from the stream_length call, but raw binary and
file streams can only return integer() or is_circular.
jay