I/O streams

Richard Carlsson <>
Mon Aug 26 17:44:09 CEST 2002


Here is a hack that I put together since it surprised me that
there does not seem to exist a generic stream/pipe implementation
in Erlang. (This is probably because it's not quite as useful in
Erlang as elsewhere, but it is good to have when you otherwise
would be forced to write temporary files.)

If you find this useful, you might want to help clean it up so that
it could become a standard component in the Erlang libraries, or
at least a decent User Contribution at www.erlang.org.

It is supposed to work as a normal Erlang I/O stream, handle multiple
readers/writers, and have an inactivity timeout limit for
self-termination. Read-requests block until they can be satisfied,
either by more characters being written, or by EOF.

I don't claim to be an I/O expert, so please don't flame me too hard.
(Especially if this is already implemented by someone else.)

	/Richard



Richard Carlsson ()   (This space intentionally left blank.)
E-mail: 	WWW: http://user.it.uu.se/~richardc/
-------------- next part --------------
%% Hack-ish implementation of I/O streams in Erlang.
%% Copyright (C) 2002 Richard Carlsson

%% Module header for the I/O stream process. The functions below spawn a
%% process that keeps a character buffer and answers `io_request'
%% messages (see loop/2).
-module(iostream).

%% Intended public API: creating streams and closing them.
-export([open/0, open/1, open_link/0, open_link/1, string/1, string/2,
	 close/1]).

%% NOTE(review): export_all exports every function in this module,
%% including kill/1 and the buf__* helpers, so the explicit export list
%% above does not actually restrict anything. kill/1 is reachable from
%% other modules only because of this attribute.
-compile(export_all).

%% Default inactivity timeout used by open/0, open_link/0 and string/1.
%% The value ends up as the `after' expression of the receive in loop/2,
%% i.e. it is interpreted as milliseconds.
-define(DEF_TIMEOUT, 60000).

%% Start an unlinked stream process with the default inactivity timeout
%% (?DEF_TIMEOUT). Returns whatever open/1 returns (the new pid).
open() ->
    DefaultTimeout = ?DEF_TIMEOUT,
    open(DefaultTimeout).

%% Start an unlinked stream process. The new process begins in the
%% `buffer' state with a fresh, open, empty buffer and runs loop/2 with
%% T as its inactivity timeout. Returns the pid of the new process.
open(T) ->
    Serve = fun () ->
                    InitialState = {buffer, buf__new()},
                    loop(InitialState, T)
            end,
    spawn(Serve).

%% Start a stream process linked to the caller, using the default
%% inactivity timeout (?DEF_TIMEOUT). Returns whatever open_link/1
%% returns (the new pid).
open_link() ->
    DefaultTimeout = ?DEF_TIMEOUT,
    open_link(DefaultTimeout).

%% Start a stream process linked to the caller. Apart from the link this
%% is the same as open/1: the process starts in the `buffer' state with
%% an open, empty buffer and runs loop/2 with inactivity timeout T.
%% Note that loop/2 exits with reason `timed_out' on inactivity, which
%% is a non-normal exit as far as the link is concerned.
open_link(T) ->
    Serve = fun () ->
                    InitialState = {buffer, buf__new()},
                    loop(InitialState, T)
            end,
    spawn_link(Serve).

%% Start an unlinked, already-closed stream process preloaded with
%% Chars, using the default inactivity timeout (?DEF_TIMEOUT).
string(Chars) ->
    DefaultTimeout = ?DEF_TIMEOUT,
    string(Chars, DefaultTimeout).

%% Start an unlinked stream process whose buffer is preloaded with Chars
%% and closed right away, so readers get Chars followed by end-of-file
%% and writers get `{error, closed}'. T is the inactivity timeout passed
%% to loop/2. Chars is stored as given (it is not flattened here).
string(Chars, T) ->
    Serve = fun () ->
                    Preloaded = buf__new(Chars),
                    loop({buffer, buf__close(Preloaded)}, T)
            end,
    spawn(Serve).

%% Ask the stream process Pid to close its buffer by sending it the atom
%% `close'. Asynchronous: returns `ok' without waiting for any answer.
close(Pid) ->
    erlang:send(Pid, close),
    ok.

%% Ask the stream process Pid to terminate by sending it the atom
%% `halt' (loop/2 then exits with reason `normal'). Asynchronous:
%% returns `ok' without waiting for the process to go away.
kill(Pid) ->
    erlang:send(Pid, halt),
    ok.

%% Main receive loop of the stream process.
%%
%% State is either `{buffer, Buf}' (no reader is waiting) or
%% `{transfer, Reqs, Buf}' (read requests are queued in Reqs).
%% Timeout is the inactivity limit: if no message at all arrives within
%% Timeout milliseconds the process exits with reason `timed_out'.
%%
%% Messages handled:
%%   {io_request, From, ReplyAs, Request} - served by io_request/4
%%   close                                - close the buffer (close_request/1)
%%   halt                                 - exit with reason `normal'
%% Anything else is discarded so that the mailbox cannot fill up with
%% unmatched messages.
loop(State, Timeout) ->
    receive
        %% is_pid/1 replaces the obsolete guard test pid/1.
        {io_request, From, ReplyAs, Request} when is_pid(From) ->
            loop(io_request(Request, From, ReplyAs, State), Timeout);
        close ->
            loop(close_request(State), Timeout);
        halt ->
            exit(normal);
        _Other ->
            loop(State, Timeout)
    after Timeout ->
            exit(timed_out)
    end.

%% Handle a `close' message: mark the character buffer as closed and
%% deliver end-of-file to every reader that is still waiting.
%%
%% In the `buffer' state nobody is waiting, so only the open flag
%% changes. In the `transfer' state each queued read request is resumed
%% with `eof'. Previously only the first queued request was resumed
%% explicitly and the rest of the queue was thrown away when the state
%% switched to `buffer', leaving those readers blocked forever.
%%
%% Returns the new state, which is always `{buffer, ClosedBuf}'.
close_request({buffer, Buf}) ->
    {buffer, buf__close(Buf)};
close_request({transfer, Reqs, Buf}) ->
    Buf1 = buf__close(Buf),
    eof_to_pending(buf__get(Reqs), Buf1),
    {buffer, Buf1}.

%% Resume the queued read requests one at a time with `eof', in queue
%% order, until buf__get/1 reports the queue as empty.
eof_to_pending({empty, _}, _Buf) ->
    ok;
eof_to_pending({R, Reqs}, Buf) ->
    %% An empty request queue is handed to transfer/4 so that it deals
    %% with this reader only. The state it returns is not needed: the
    %% stream goes back to the `buffer' state regardless.
    transfer(R, eof, Buf, buf__new()),
    eof_to_pending(buf__get(Reqs), Buf).

%% Serve one I/O request and return the new server state.
%%
%% handle_request/4 reports one of:
%%   {ok, Reply, State1}     - Reply is sent to From right away
%%   {later, State1}         - the request was queued; the reply is sent
%%                             when it can be satisfied
%%   {error, Reason, State1} - the request failed
%%
%% In the error case Reason is now sent back as the reply. Before, no
%% reply was sent at all, so the client waiting for its `io_reply' would
%% block indefinitely.
io_request(Req, From, ReplyAs, State) ->
    case handle_request(Req, From, ReplyAs, State) of
        {ok, Reply, State1} ->
            io_reply(From, ReplyAs, Reply),
            State1;
        {later, State1} ->
            State1;
        {error, Reason, State1} ->
            io_reply(From, ReplyAs, Reason),
            State1
    end.

%% Send the answer to an I/O request: the message
%% `{io_reply, ReplyAs, Reply}' is delivered to From.
%% Returns the message that was sent.
io_reply(From, ReplyAs, Reply) ->
    Answer = {io_reply, ReplyAs, Reply},
    erlang:send(From, Answer).

%% Interpret a single I/O request against the current state.
%%
%% Returns one of
%%   {ok, Reply, State}     - reply with Reply now
%%   {later, State}         - the request has been queued
%%   {error, Reason, State} - the request failed
%%
%% Requests understood:
%%   {put_chars, Chars}               - write a (possibly deep) list
%%   {put_chars, Mod, Func, Args}     - write the result of the call
%%   {get_until, Prompt, M, F, As}    - read using a continuation function
%%   {requests, Reqs}                 - a list of requests
%% Anything else is answered with `{error, {request, Request}}'.
%%
%% Changes compared with the previous version:
%%   * Chars must be a list; other terms are answered with the generic
%%     request error instead of crashing the server inside flatrev/3.
%%   * When readers are waiting, Chars is flattened before it is handed
%%     to the reader function, matching what readers see when the same
%%     data goes through the buffer.
%%   * A failing Mod:Func(Args) is answered with an error instead of
%%     treating the `{'EXIT', _}' term as characters.
handle_request({put_chars, Chars}, _From, _ReplyAs,
               {buffer, Buf} = State) when is_list(Chars) ->
    case buf__is_open(Buf) of
        true ->
            Buf1 = buf__write(Chars, Buf),
            {ok, ok, {buffer, Buf1}};
        false ->
            {ok, {error, closed}, State}
    end;
handle_request({put_chars, Chars}, _From, _ReplyAs,
               {transfer, Reqs, Buf} = State) when is_list(Chars) ->
    case buf__is_open(Buf) of
        true ->
            %% Hand the new characters straight to the oldest reader.
            {R, Reqs1} = buf__get(Reqs),
            transfer(R, lists:flatten(Chars), Buf, Reqs1);
        false ->
            {ok, {error, closed}, State}
    end;
handle_request({put_chars, Mod, Func, Args}, From, ReplyAs, State) ->
    case catch apply(Mod, Func, Args) of
        {'EXIT', _} ->
            {ok, {error, {Mod, Func, Args}}, State};
        Chars ->
            handle_request({put_chars, Chars}, From, ReplyAs, State)
    end;
handle_request({get_until, _Prompt, M, F, As}, From, ReplyAs,
               {buffer, Buf}) ->
    get_until(From, ReplyAs, M, F, As, [], Buf);
handle_request({get_until, _Prompt, M, F, As}, From, ReplyAs,
               {transfer, Reqs, Buf}) ->
    %% Other readers are already waiting: join the end of the queue.
    {later, {transfer, buf__put({From, ReplyAs, M, F, As, []}, Reqs), Buf}};
handle_request({requests, Reqs}, From, ReplyAs, State) ->
    io_requests(Reqs, From, ReplyAs, {ok, ok, State});
handle_request(R, _From, _ReplyAs, State) ->
    {ok, {error, {request, R}}, State}.

%% Process a list of requests as long as the status is 'ok'.
%%
%% The last argument is the result of the previous request in the same
%% `{ok, Reply, State}' / `{later, State}' / `{error, Reason, State}'
%% form that handle_request/4 returns. Processing stops at the first
%% result that is not `{ok, _, _}', and that result (or the result of
%% the last request) is returned so that the caller sends one reply for
%% the whole list.
%%
%% Each element is evaluated with handle_request/4. Using io_request/4
%% here, as before, returned a bare state instead of a result tuple
%% (and sent a reply per element), so any non-empty list ended in a
%% `case_clause' crash in the caller.
io_requests([R | Rs], From, ReplyAs, {ok, _, State}) ->
    io_requests(Rs, From, ReplyAs, handle_request(R, From, ReplyAs, State));
io_requests(_, _, _, Final) ->
    Final.

%% Start serving a 'get_until' request that arrived in the "buffer"
%% state.
%%
%% Everything currently buffered is read out and passed, together with
%% the continuation Cont, to Mod:Func. Depending on what it returns:
%%   {done, Result, Leftover} - reply Result; Leftover goes back into
%%                              the buffer
%%   {more, NextCont}         - see get_until_more/7
%%   anything else            - error; the characters that were read are
%%                              put back into the buffer
get_until(From, ReplyAs, Mod, Func, Args, Cont, Buf) ->
    {Available, Drained} = buf__read(Buf),
    case catch apply(Mod, Func, [Cont, Available | Args]) of
        {done, Result, Leftover} ->
            {ok, Result, {buffer, buf__push(Leftover, Drained)}};
        {more, NextCont} ->
            get_until_more(From, ReplyAs, Mod, Func, Args, NextCont, Drained);
        _Failed ->
            Restored = buf__push(Available, Drained),
            {error, {error, {Mod, Func, Args}}, {buffer, Restored}}
    end.

%% The reader function wants more input than the buffer held. If the
%% buffer is still open the request is parked as the only entry of a new
%% request queue and the stream enters the "transfer" state; if it is
%% closed the read is retried at once on the drained buffer.
get_until_more(From, ReplyAs, Mod, Func, Args, Cont, Buf) ->
    case buf__is_open(Buf) of
        true ->
            Waiting = buf__new([{From, ReplyAs, Mod, Func, Args, Cont}]),
            {later, {transfer, Waiting, Buf}};
        false ->
            get_until(From, ReplyAs, Mod, Func, Args, Cont, Buf)
    end.

%% Resuming an active 'get_until' request when new chars (or EOF) have
%% arrived. (Note that the buffer is empty here.)
%%
%% R is the request being resumed, `{From, ReplyAs, Mod, Func, Args,
%% Cont}'; Chars is the new input (a list, or the atom `eof'); Reqs is
%% the queue of the remaining waiting requests.
%%
%% Mod:Func is called with the continuation and the new input:
%%   {done, Result, Leftover} - Result is sent to the reader and the
%%                              next waiting reader, if any, is resumed
%%                              with Leftover
%%   {more, NextCont}         - the request goes back to the front of
%%                              the queue with its new continuation
%%   anything else            - the reader is sent an error reply and the
%%                              next waiting reader is resumed with the
%%                              same, unconsumed, Chars
%%
%% Previously the failure case sent no reply to the failing reader and
%% discarded all other waiting readers by falling back to the "buffer"
%% state, so every one of them stayed blocked.
%%
%% Always returns `{ok, ok, State}', the `ok' being the reply for the
%% request that supplied Chars.
transfer(R, [], Buf, Reqs) ->
    %% Don't send empty strings to the consumer function;
    %% instead return directly and wait for more characters.
    {ok, ok, {transfer, buf__push([R], Reqs), Buf}};
transfer({From, ReplyAs, Mod, Func, Args, Cont}, Chars, Buf, Reqs) ->
    case catch apply(Mod, Func, [Cont, Chars | Args]) of
        {done, Result, Chars1} ->
            io_reply(From, ReplyAs, Result),  %% reply to read-request
            transfer_next(Chars1, Buf, Reqs);
        {more, Cont1} ->
            {ok, ok,
             {transfer,
              buf__push([{From, ReplyAs, Mod, Func, Args, Cont1}], Reqs),
              Buf}};
        _Other ->
            %% Tell the reader that its function failed, then carry on.
            io_reply(From, ReplyAs, {error, {Mod, Func, Args}}),
            transfer_next(Chars, Buf, Reqs)
    end.

%% Pass Chars on to the next waiting reader; when nobody is waiting any
%% more, store Chars in the buffer and return to the "buffer" state.
transfer_next(Chars, Buf, Reqs) ->
    case buf__get(Reqs) of
        {empty, _} ->
            {ok, ok, {buffer, buf__push(Chars, Buf)}};
        {R, Reqs1} ->
            transfer(R, Chars, Buf, Reqs1)
    end.


%% An efficient functional buffer with 1-level constant time push-back.

%% Create an empty, open buffer. A buffer is the 4-tuple
%% `{In, Out, Ps, Open}': In collects newly added items (newest first),
%% Out holds items in removal order, Ps is the push-back list that is
%% consumed before anything else, and Open is the open/closed flag.
buf__new() ->
    Empty = [],
    {Empty, Empty, Empty, true}.

%% Create an open buffer whose initial contents are Ps. The contents are
%% stored in the push-back slot, i.e. they are the first thing that
%% buf__get/1 or buf__read/1 will hand out.
buf__new(Initial) ->
    Nothing = [],
    {Nothing, Nothing, Initial, true}.

%% Return the buffer with its open flag cleared. The contents are left
%% untouched; closing an already closed buffer changes nothing.
buf__close({Incoming, Outgoing, Pushback, _WasOpen}) ->
    {Incoming, Outgoing, Pushback, false}.

%% Return the open flag of the buffer (the fourth tuple element):
%% `true' for buffers made by buf__new/0,1, `false' after buf__close/1.
buf__is_open({_In, _Out, _Pushback, IsOpen}) ->
    IsOpen.

%% Append the single item X to the end of an open buffer. Only open
%% buffers are accepted; a closed buffer matches no clause and the call
%% fails with `function_clause'.
buf__put(Item, {Newest, Oldest, Pushback, true}) ->
    Newest1 = [Item | Newest],
    {Newest1, Oldest, Pushback, true}.

%% Append all elements of the (possibly deep) list List to the end of an
%% open buffer. flatrev/2 flattens List and prepends its elements in
%% reverse order to In, which keeps In in newest-first order. Only open
%% buffers are accepted; a closed buffer matches no clause.
buf__write(List, {Newest, Oldest, Pushback, true}) ->
    Newest1 = flatrev(List, Newest),
    {Newest1, Oldest, Pushback, true}.

%% Remove one item from the front of the buffer.
%%
%% Returns `{Item, Buf1}'. Items are taken from the push-back list
%% first, then from Out, and only when both are exhausted is In reversed
%% into Out. If the buffer holds nothing at all, the first element of
%% the result is `eof' for a closed buffer and `empty' for an open one,
%% and the buffer is returned unchanged.
buf__get({[], [], [], false} = Buf) ->
    {eof, Buf};
buf__get({[], [], [], true} = Buf) ->
    {empty, Buf};
buf__get({Newest, [Next | Oldest], [], Open}) ->
    {Next, {Newest, Oldest, [], Open}};
buf__get({Newest, Oldest, [Next | Pushback], Open}) ->
    {Next, {Newest, Oldest, Pushback, Open}};
buf__get({Newest, [], [], Open}) ->
    %% Only In has items left: turn it into removal order.
    [Next | Oldest] = lists:reverse(Newest),
    {Next, {[], Oldest, [], Open}}.

%% Read the entire contents of the buffer at once.
%%
%% Returns `{Items, EmptiedBuf}' where Items lists everything in the
%% order buf__get/1 would have produced it: the push-back list first,
%% then Out, then In reversed. EmptiedBuf keeps the open flag. For a
%% buffer that holds nothing, Items is `eof' if the buffer is closed and
%% `[]' if it is open.
%%
%% The general clause used to compute `Ps ++ lists:reverse(In, Out)',
%% which places the newer In items in front of the older Out items
%% whenever both are non-empty; it now concatenates in queue order.
buf__read({[], [], [], false} = Buf) ->
    {eof, Buf};
buf__read({[], [], [], true} = Buf) ->
    {[], Buf};
buf__read({[], [], Ps, Open}) ->
    %% Only pushed-back data: return it as is, without copying.
    {Ps, {[], [], [], Open}};
buf__read({In, Out, Ps, Open}) ->
    {Ps ++ Out ++ lists:reverse(In), {[], [], [], Open}}.

%% Push Ps back onto the front of the buffer so that it is handed out
%% before anything else. When nothing is pushed back already, Ps simply
%% becomes the push-back slot (constant time, no copying, and Ps is
%% stored exactly as given); otherwise Ps is prepended to the existing
%% push-back list.
buf__push(Front, {In, Out, Pending, Open}) ->
    Pushback = case Pending of
                   [] -> Front;
                   _ -> Front ++ Pending
               end,
    {In, Out, Pushback, Open}.

%% Flatten the (possibly deep) list Xs and prepend its elements, in
%% reverse order, to Tail. Entry point for flatrev/3, started with an
%% empty stack of postponed list tails.
flatrev(Xs, Tail) ->
    Postponed = [],
    flatrev(Xs, Postponed, Tail).

%% Worker for flatrev/2: walk the deep list in its first argument from
%% left to right and cons every non-list element onto Tail, which yields
%% the flattened elements in reverse order in front of the original
%% Tail.
%%
%% Ls is a stack of list tails still to be visited: when a nested list
%% is entered, the rest of the current list is pushed onto Ls and popped
%% again once the nested list has been exhausted. The result is Tail
%% when both the current list and the stack are empty.
%%
%% The guard uses is_list/1 in place of the obsolete guard test list/1.
flatrev([L | Xs], Ls, Tail) when is_list(L) ->
    flatrev(L, [Xs | Ls], Tail);
flatrev([X | Xs], Ls, Tail) ->
    flatrev(Xs, Ls, [X | Tail]);
flatrev([], [Xs | Ls], Tail) ->
    flatrev(Xs, Ls, Tail);
flatrev([], [], Tail) ->
    Tail.


More information about the erlang-questions mailing list