%% ===================================================================== %% I/O streams/pipes in Erlang. %% %% Copyright (C) 2002 Richard Carlsson %% %% This library is free software; you can redistribute it and/or modify %% it under the terms of the GNU Lesser General Public License as %% published by the Free Software Foundation; either version 2 of the %% License, or (at your option) any later version. %% %% This library is distributed in the hope that it will be useful, but %% WITHOUT ANY WARRANTY; without even the implied warranty of %% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU %% Lesser General Public License for more details. %% %% You should have received a copy of the GNU Lesser General Public %% License along with this library; if not, write to the Free Software %% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 %% USA %% %% Author contact: richardc@csd.uu.se %% %% --------------------------------------------------------------------- -module(iostream). -export([open/0, open/1, open_link/0, open_link/1, string/1, string/2, close/1]). -compile(export_all). %% The default timeout is 'infinity', i.e., someone has to be %% responsible for making the I/O stream process terminate when it is %% not needed anymore. Typically, the reader does this. -define(DEF_TIMEOUT, 'infinity'). %% Creates a new I/O stream. Uses default timeout. open() -> open(?DEF_TIMEOUT). %% Uses given timeout. T is milliseconds or 'infinity'. open(T) -> {ok, spawn(fun () -> loop({buffer, buf__new()}, T) end)}. %% Like open/0, but also links stream process to creator process. open_link() -> open_link(?DEF_TIMEOUT). %% Like open/1, but also links stream process to creator process. open_link(T) -> {ok, spawn_link(fun () -> loop({buffer, buf__new()}, T) end)}. %% Creates a closed stream containing Chars. Uses default timeout. string(Chars) -> string(Chars, ?DEF_TIMEOUT). %% Like string/1, but uses the given timeout. string(Chars, T) -> spawn(fun () -> loop({buffer, buf__close(buf__new(Chars))}, T) end). %% Closes the stream. The stream process stays alive until the timeout. close(Pid) -> Pid ! close, ok. %% Terminates the stream process. kill(Pid) -> Pid ! exit, ok. %% ------------------------------------------------------------------ %% This is the main server loop for the I/O stream process. %% %% The server has two states: buffering and transferring. In the %% buffering state, written characters are simply queued up. If a read %% request can be satisfied, the server returns to the buffering state %% afterwards. The transfer state is entered if a read request needs %% more input. Any writes will be passed directly to the reader, until %% it is satisfied. Meanwhile, any additional read requests are queued. %% When the last read request is satisfied, the process returns to the %% buffering state. %% %% If there is no activity for the given 'timeout limit' period, the I/O %% process terminates. By default the timeout is 'infinity'. loop(State, Timeout) -> receive {io_request, From, ReplyAs, Request} when pid(From) -> loop(io_request(Request, From, ReplyAs, State), Timeout); close -> loop(close_request(State), Timeout); exit -> exit(normal); _Other -> loop(State, Timeout) after Timeout -> exit(timed_out) end. close_request({buffer, Buf}) -> {buffer, buf__close(Buf)}; close_request({transfer, Reqs, Buf}) -> Buf1 = buf__close(Buf), {R, Reqs1} = buf__get(Reqs), transfer(R, eof, Buf1, Reqs1), {buffer, Buf1}. io_request(Req, From, ReplyAs, State) -> case handle_request(Req, From, ReplyAs, State) of {ok, Reply, State1} -> io_reply(From, ReplyAs, Reply), State1; {later, State1} -> State1; {error, _Reason, State1} -> State1 end. io_reply(From, ReplyAs, Reply) -> From ! {io_reply, ReplyAs, Reply}. handle_request({put_chars, Chars}, _From, _ReplyAs, {buffer, Buf} = State) -> case buf__is_open(Buf) of true -> Buf1 = buf__write(Chars, Buf), {ok, ok, {buffer, Buf1}}; false -> {ok, {error, closed}, State} end; handle_request({put_chars, Chars}, _From, _ReplyAs, {transfer, Reqs, Buf} = State) -> case buf__is_open(Buf) of true -> {R, Reqs1} = buf__get(Reqs), transfer(R, Chars, Buf, Reqs1); false -> {ok, {error, closed}, State} end; handle_request({put_chars, Mod, Func, Args}, From, ReplyAs, State) -> handle_request({put_chars, catch apply(Mod, Func, Args)}, From, ReplyAs, State); handle_request({get_until, _Prompt, M, F, As}, From, ReplyAs, {buffer, Buf}) -> get_until(From, ReplyAs, M, F, As, [], Buf); handle_request({get_until, _Prompt, M, F, As}, From, ReplyAs, {transfer, Reqs, Buf}) -> {later, {transfer, buf__put({From, ReplyAs, M, F, As, []}, Reqs), Buf}}; handle_request({requests, Reqs}, From, ReplyAs, State) -> io_requests(Reqs, From, ReplyAs, {ok, ok, State}); handle_request(R, _From, _ReplyAs, State) -> {ok, {error, {request, R}}, State}. %% Process a list of output requests as long as the status is 'ok'. io_requests([R | Rs], From, ReplyAs, {ok, _, State}) -> io_requests(Rs, From, ReplyAs, io_request(R, From, ReplyAs, State)); io_requests(_, _, _, Final) -> Final. %% Beginning a 'get_until' request received in "buffer" state. get_until(From, ReplyAs, Mod, Func, Args, Cont, Buf) -> io:fwrite("Get until. ~p\n", [Buf]), {Chars, Buf1} = buf__read(Buf), case catch apply(Mod, Func, [Cont, Chars | Args]) of {done, Result, Chars1} -> io:fwrite("Done: ~p, ~p.\n", [Result, Chars1]), {ok, Result, {buffer, buf__push(Chars1, Buf1)}}; {more, Cont1} -> io:fwrite("More: ~p.\n", [Cont1]), case buf__is_open(Buf1) of true -> {later, {transfer, buf__new([{From, ReplyAs, Mod, Func, Args, Cont1}]), Buf1}}; false -> get_until(From, ReplyAs, Mod, Func, Args, Cont1, Buf1) end; _Other -> {error, {error, {Mod, Func, Args}}, {buffer, buf__push(Chars, Buf1)}} end. %% Resuming an active 'get_until' request when new chars (or EOF) have %% arrived. Note that the buffer is empty here. If there are chars over %% when we have processed all queued read requests, these are pushed %% back onto the buffer before returning. transfer(R, [], Buf, Reqs) -> %% Don't send empty strings to the consumer function; %% instead return directly and wait for more characters. {ok, ok, {transfer, buf__push([R], Reqs), Buf}}; transfer({From, ReplyAs, Mod, Func, Args, Cont}, Chars, Buf, Reqs) -> case catch apply(Mod, Func, [Cont, Chars | Args]) of {done, Result, Chars1} -> io_reply(From, ReplyAs, Result), %% reply to read-request case buf__get(Reqs) of {empty, _} -> {ok, ok, {buffer, buf__push(Chars1, Buf)}}; {R, Reqs1} -> transfer(R, Chars1, Buf, Reqs1) end; {more, Cont1} -> {ok, ok, {transfer, buf__push([{From, ReplyAs, Mod, Func, Args, Cont1}], Reqs), Buf}}; _Other -> {error, {error, {Mod, Func, Args}}, {buffer, buf__push(Chars, Buf)}} end. %% An efficient functional buffer with 1-level constant time push-back. buf__new() -> {[], [], [], true}. buf__new(Ps) -> {[], [], Ps, true}. buf__close({In, Out, Ps, _}) -> {In, Out, Ps, false}. buf__is_open({_, _, _, Open}) -> Open. buf__put(X, {In, Out, Ps, true}) -> {[X | In], Out, Ps, true}. buf__write(List, {In, Out, Ps, true}) -> {flatrev(List, In), Out, Ps, true}. buf__get({In, Out, [X | Ps], Open}) -> {X, {In, Out, Ps, Open}}; buf__get({In, [X | Out], [], Open}) -> {X, {In, Out, [], Open}}; buf__get({[], [], [], true} = Buf) -> {empty, Buf}; buf__get({[], [], [], false} = Buf) -> {eof, Buf}; buf__get({In, [], [], Open}) -> [X | Out] = lists:reverse(In), {X, {[], Out, [], Open}}. buf__read({[], [], [], false} = Buf) -> {eof, Buf}; buf__read({[], [], [], true} = Buf) -> {[], Buf}; buf__read({[], [], Ps, Open}) -> {Ps, {[], [], [], Open}}; buf__read({In, Out, Ps, Open}) -> {Ps ++ lists:reverse(In, Out), {[], [], [], Open}}. buf__push(Ps, {In, Out, [], Open}) -> {In, Out, Ps, Open}; buf__push(Ps, {In, Out, Ps1, Open}) -> {In, Out, Ps ++ Ps1, Open}. flatrev(Xs, Tail) -> flatrev(Xs, [], Tail). flatrev([L | Xs], Ls, Tail) when list(L) -> flatrev(L, [Xs | Ls], Tail); flatrev([X | Xs], Ls, Tail) -> flatrev(Xs, Ls, [X | Tail]); flatrev([], [Xs | Ls], Tail) -> flatrev(Xs, Ls, Tail); flatrev([], [], Tail) -> Tail.