Generic I/O and binary I/O

Robert Virding rv@REDACTED
Thu Jan 3 17:05:34 CET 2002


I have now written a first draft of a generic i/o module, gen_io.  It
provides a generic synchronous interface to the i/o protocol and uses
callbacks for the device specific stuff.  There is "some"
documentation in the beginning of the file.

I have also reworked my binary i/o module, bin_io, to use gen_io.  It
is reasonable small example of using gen_io.

There is also a first draft of tcp_io which provides an i/o interface
to a socket.  It is not complete, specifically it does not correctly
handle when the port closes for the generic i/o interface.  I'll fix
it.  N.B. this is probably not the most efficient way of doing
formatted i/o to a socket but it does allow writing using generic i/o
functions.

I am going to convert file_io_server, the main file interface, to use
gen_io to see if gen_io is sufficient.  If this works then I reckon
that gen_io will do for synchronous i/o.  Asynchronous i/o, like for
the tty interface, is another matter and I think that it might be
difficult to generalise it.  I will try however.

Comment?

Robert

-------------- next part --------------
%%% File    : gen_io.erl
%%% Author  : Robert Virding <rv@REDACTED>
%%% Purpose : 
%%% Created : 21 Nov 2001 by Robert Virding <rv@REDACTED>

%% Mod:init([Argument]) ->
%%      {ok,State}
%%      {stop,Reason}
%%
%% Mod:handle_request(Request, State) ->
%%      {reply,From,Reply,State}
%%      {noreply,State}
%%      {stop,Reason,From,Reply,State}
%%      {stop,Reason,State}
%%  Request is the full unparsed request.
%%
%% Mod:terminate(Reason, State) -> ok
%%
%% Mod:read_chars(State) ->
%%      {ok,[Char],State}
%%      {error,Reason,State}
%%
%% Mod:push_chars([Char], State) ->
%%      {ok,State}
%%      {error,Reason,State}
%%  After we have read all the characters we need we push the rest
%%  back into the state.  N.B. the char list may be empty.
%%
%% Mod:write_chars([Char], State) ->
%%      {ok,State}
%%      {error,Reason,State}

-module(gen_io).
-author('rv@REDACTED').

-compile(export_all).
-export([start/3,start_link/3]).
-export([request/2]).

%% start(Mod, [Arg], [Option])
%% start_link(Mod, [Arg], [Option])
%%
%%  where
%%   Mod is the callback module
%%   [Arg] init arguments
%%   [Option] 

start(Mod, Args, Opts) ->
    do_start(spawn, Mod, Args, Opts).

start_link(Mod, Args, Opts) ->
    do_start(spawn_link, Mod, Args, Opts).

do_start(Spawn, Mod, Args, Opts) ->
    Self = self(),
    Ref = make_ref(),
    Pid = erlang:Spawn(fun () -> init(Self, Ref, Mod, Args, Opts) end),
    %% Monitor process and wait for reply or termination.
    Mref = erlang:monitor(process, Pid),
    receive
	{Ref,ok} ->
	    erlang:demonitor(Mref),
	    receive
		{'DOWN',Mref,_,_,Reason} ->
		    {error,Reason}
	    after 0 ->
		    {ok,Pid}
	    end;
	{'DOWN',Mref,_,_,Reason} ->
	    {error,Reason}
    end.

%% init(StarterPid, StartRef, IoModule, [Arg], [Opt])

init(Start, Ref, IoM, Args, Opts) ->
    case catch IoM:init(Args) of
	{ok,State} ->
	    Start ! {Ref,ok},
	    loop(IoM, State);
	{stop,Reason} ->
	    exit(Reason);
	{'EXIT',Reason} ->
	    exit(Reason);
	Else ->
	    Error = {bad_return_value,Else},
	    exit(Error)
    end.

%% loop(IoModule, State) -> void.
%%  Main io loop.  This never returns anything, just terminates.

loop(IoM, St0) ->
    receive
	{io_request,From,ReplyAs,Req} when pid(From) ->
	    %% Handle general io requests.
	    case io_request(Req, IoM, St0) of
		{ok,Rep,St1} ->
		    io_reply(From, ReplyAs, Rep),
		    loop(IoM, St1);
		{error,Rep,St1} ->
		    io_reply(From, ReplyAs, Rep),
		    loop(IoM, St1);
		{stop,Reason,Rep,St1} ->
		    terminate(Reason, From, {io_reply,ReplyAs,Rep}, IoM, St1)
	    end;
	Request ->
	    case catch IoM:handle_request(Request, St0) of
		{reply,From,Rep,St1} ->
		    From ! Rep,
		    loop(IoM, St1);
		{noreply,St1} ->
		    loop(IoM, St1);
		{stop,Reason,From,Rep,St1} ->
		    terminate(Reason, From, Rep, IoM, St1);
		{stop,Reason,St1} ->
		    terminate(Reason, IoM, St1)
	    end
    end.

io_reply(From, ReplyAs, Reply) ->
    From ! {io_reply, ReplyAs, Reply}.

terminate(Reason, From, Reply, IoM, St) ->
    From ! Reply,
    terminate(Reason, IoM, St).

terminate(Reason, IoM, St) ->
    catch IoM:terminate(Reason, St),
    exit(Reason).

%% io_request(Request, IoModule, State) ->
%%      {ok,Reply,State} | {error,Reply,State} | {stop,Reason,Reply,State}.
%%
%%  Handle general io requests.

io_request({put_chars,Chars}, IoM, St0) ->
    case catch IoM:write_chars(Chars, St0) of
	{ok,St1} -> {ok,ok,St1};
	{error,Reason,St1} -> {stop,normal,{error,Reason},St1};
	Other -> {stop,Other,{error,Other},St0}
    end;
io_request({put_chars,Mod,Func,Args}, IoM, St) ->
    case catch apply(Mod, Func, Args) of
	{'EXIT',Reason} -> {error,{error,Func},St};
	Cs -> io_request({put_chars,Cs}, IoM, St)
    end;
io_request({get_until,Prompt,Mod,Func,ExtraArgs}, IoM, St) ->
    get_until(Mod, Func, ExtraArgs, IoM, St);
io_request({requests,Reqs}, IoM, St) when list(Reqs) ->
    io_request_loop(Reqs, IoM, {ok,ok,St});
io_request(Unknown, IoM, St) ->
    Reason = {error,Unknown},
    {error,{error,Reason},St}.

%% io_request_loop([Request], IoModule, Result) -> Result.
%%  Process list of requests as long as results are ok.

io_request_loop([], IoM, Res) -> Res;
io_request_loop([Req|Reqs], IoM, {ok,Rep,St}) ->
    io_request_loop(Reqs, IoM, io_request(Req, IoM, St));
io_request_loop([Req|Reqs], IoM, Res) -> Res.

%% get_until(Module, Func, [ExtraArg], IoModule, State) ->
%%      {ok,Reply,State} | {error,Reply,State} | {stop,Reason,Reply,State}.
%%  Apply the get_until loop scanning the binary until the scan
%%  function has enough.  Buffer any remaining bytes until the next
%%  call.

get_until(Mod, Func, ExtraArgs, IoM, St) ->
    get_until_loop(Mod, Func, ExtraArgs, IoM, St, {more,[]}).

get_until_loop(M, F, As, IoM, St0, {more,Cont}) ->
    case catch IoM:read_chars(St0) of
	{ok,Cs,St1} ->
	    get_until_loop(M, F, As, IoM, St1,
			   catch apply(M, F, [Cont,Cs|As]));
	{error,Reason,St1} ->
	    {stop,Reason,{error,Reason},St1};
	Other ->
	    {stop,Other,{error,Other},St0}
    end;
get_until_loop(M, F, As, IoM, St0, {done,Res,Buf}) ->
    case catch IoM:push_chars(Buf, St0) of
	{ok,St1} -> {ok,Res,St1};
	{error,Reason,St1} ->
	    {stop,Reason,{error,Reason},St1};
	Other ->
	    {stop,Other,{error,Other},St0}
    end;
get_until_loop(M, F, As, IoM, St, Other) ->
    {error,{error,F},St}.

%% request(IoServer, Request) -> {ok,Reply} | {error,Reason}.
%%  Send a standard io request to to an io server and wait for the reply.

request(Pid, Request) when pid(Pid) ->
    Mref = erlang:monitor(process,Pid),
    Pid ! {io_request,self(),Pid,Request},
    wait_io_mon_reply(Pid,Mref);
request(Name, Request) when atom(Name) ->
    case whereis(Name) of
	undefined ->
	    {error, arguments};
	Pid ->
	    request(Pid, Request)
    end.

wait_io_mon_reply(From, Mref) ->
    receive
	{io_reply,From,Reply} ->
	    erlang:demonitor(Mref),
	    receive 
		{'DOWN', Mref, _, _, _} -> true
	    after 0 -> true
	    end,
	    Reply;
	{'EXIT', From, _What} ->
	    receive
		{'DOWN', Mref, _, _, _} -> true
	    after 0 -> true
	    end,
	    {error,terminated};
	{'DOWN', Mref, _, _, _} ->
	    receive
		{'EXIT', From, _What} -> true
	    after 0 -> true
	    end,
	    {error,terminated}
    end.
-------------- next part --------------
%% File    : bin_io.erl
%% Author  : Robert Virding
%% Purpose : Open a binary for standard i/o requests.

-module(bin_io).

%% The main user interface.
-export([open_read/1,open_create/0,open_append/1,close/1]).

%% The gen_io callbacks.
-export([init/1,terminate/2,handle_request/2,
	 read_chars/1,write_chars/2,push_chars/2]).

-record(bin_io, {mode,bin,buf}).

-define(READ_SIZE, 256).			%Bytes per chunk read

%% The main interface.

open_read(Bin) ->
    gen_io:start_link(?MODULE, [read,Bin], []).

open_create() ->
    gen_io:start_link(?MODULE, [create], []).

open_append(Bin) ->
    gen_io:start_link(?MODULE, [append,Bin], []).

close(Io) ->
    Io ! {bin_request,self(),Io,close},
    receive
	{bin_reply,Io,Rep} -> Rep
    end.

%% init([Arg]) -> {ok,State}.

init([read,Bin]) when binary(Bin) ->
    {ok,#bin_io{mode=read,bin=Bin,buf=[]}};
init([create]) ->
    {ok,#bin_io{mode=write,bin= <<>>,buf=[]}};
init([append,Bin]) when binary(Bin) ->
    {ok,#bin_io{mode=write,bin=Bin,buf=[]}}.

%% terminate(Reason, State) -> ok.

terminate(R, St) -> ok.

%% handle_request(Request, State) ->
%%      {reply,From,Reply,State} |
%%      {noreply,State} |
%%      {stop,Reason,From,Reply,State} |
%%      {stop,Reason,State}.
%%
%%  Handle bin_io specific requests.

handle_request({bin_request,From,ReplyAs,close}, #bin_io{mode=Mode}=St)
  when pid(From) ->
    Rep = case Mode of
	      read -> ok;
	      write -> {ok,list_to_binary([St#bin_io.bin,St#bin_io.buf])}
	  end,
    {stop,normal,From,{bin_reply,ReplyAs,Rep},St#bin_io{buf=[]}};
handle_request({bin_request,From,ReplyAs,Other}, #bin_io{}=St)
  when pid(From) ->
    Rep = {error,{request,Other}},
    {reply,From,{bin_reply,ReplyAs,Rep},St};
handle_request(Unknown, State) ->
    %% Just ignore unknown requests.
    {noreply,State}.

%% write_chars(Chars, State) ->
%%      {ok,State} | {error,Reason,State}.

write_chars(Cs, #bin_io{mode=write,buf=Buf}=St) when binary(Cs) ->
    {ok,St#bin_io{buf=[Buf|Cs]}};
write_chars(Cs, #bin_io{mode=write,buf=Buf}=St) ->
    case catch list_to_binary(Cs) of
	{'EXIT',Reason} -> {error,Reason,St};
	MoreBin -> {ok,St#bin_io{buf=[Buf|MoreBin]}}
    end;
write_chars(Cs, #bin_io{mode=read}=St) ->
    {error,badmode,St}.

%% read_chars(State) ->
%%      {ok,Chars,State} | {error,Reason,State}.

read_chars(#bin_io{mode=read,buf=[],bin=Bin}=St) ->
    if Bin == <<>> -> {ok,eof,St};
       size(Bin) < ?READ_SIZE ->
	    {ok,binary_to_list(Bin),St#bin_io{bin= <<>>}};
       true ->
	    {B1,B2} = split_binary(Bin, ?READ_SIZE),
	    {ok,binary_to_list(B1),St#bin_io{bin=B2}}
    end;
read_chars(#bin_io{mode=read,buf=Buf}=St) ->
    {ok,Buf,St#bin_io{buf=[]}};
read_chars(#bin_io{mode=write}=St) ->
    {error,badmode,St}.	

%% push_chars([Char], State) ->
%%      {ok,State} | {error,Reason,State}.

push_chars(Cs, #bin_io{buf=[]}=St) ->
    {ok,St#bin_io{buf=Cs}}.
-------------- next part --------------
%% File    : tcp_io.erl
%% Author  : Robert Virding
%% Purpose : Open a tcp connection for standard i/o requests.

-module(tcp_io).

%% The main user interface.
-export([open/2,open/3,close/1]).

%% The gen_io callbacks.
-export([init/1,terminate/2,handle_request/2,
	 read_chars/1,write_chars/2,push_chars/2]).

-record(tcp_io, {server,			%Server
		 port,				%Port
		 sock=none,			%Socket
		 ibuf=none}).

open(Server, Port) ->
    open(Server, Port, []).

open(Server, Port, SockOpts) ->
    gen_io:start_link(?MODULE, [Server,Port,SockOpts], []).

close(Io) ->
    Io ! {tcp_request,self(),Io,close},
    receive
	{tcp_reply,Io,Rep} -> Rep
    end.

%% init([Arg]) -> {ok,State}.
%%  Try to open the socket with the given arguments.

init([Server,Port,SockOpts]) ->
    case gen_tcp:connect(Server, Port, [list,{active,false}|SockOpts]) of
	{ok,S} ->
	    {ok,#tcp_io{server=Server,port=Port,sock=S,ibuf=[]}};
	{error,Reason} -> {stop,Reason}
    end.

%% terminate(Reason, State) -> ok.
%%  This just closes the socket.

terminate(Reason, #tcp_io{sock=S}) ->
    gen_tcp:close(S).

%% handle_request(Request, State) ->
%%      {reply,From,Reply,State} |
%%      {stop,Reason,From,Reply,State} |
%%      {noreply,State}.

handle_request({tcp_request,From,ReplyAs,close}, #tcp_io{}=St)
  when pid(From) ->
    {stop,normal,From,{tcp_reply,ReplyAs,ok},St};
handle_request({tcp_request,From,ReplyAs,Other}, #tcp_io{}=St)
  when pid(From) ->
    Rep = {error,{request,Other}},
    {reply,From,{tcp_reply,ReplyAs,Rep},St};
handle_request(Unknown, St) ->
    %% Just ignore unknown requests.
    {noreply,St}.

%% write_chars(Chars, State) ->
%%      {ok,State} | {error,Reason,State}.

write_chars(Cs, #tcp_io{sock=S}=St) ->
    case gen_tcp:send(S, Cs) of
	ok -> {ok,St};
	{error,Reason} -> {error,Reason,St}
    end.

%% read_chars(State) ->
%%      {ok,Chars,State} | {error,Reason,State}.

read_chars(#tcp_io{sock=S,ibuf=[]}=St) ->
    case gen_tcp:recv(S, 0) of
	{ok,Cs} -> {ok,Cs,St};
	{error,closed} -> {ok,eof,St};
	{error,Reason} -> {error,Reason,St}
    end;
read_chars(#tcp_io{ibuf=Cs}=St) ->
    {ok,Cs,St#tcp_io{ibuf=[]}}.

%% push_chars([Char], State) ->
%%      {ok,State} | {error,Reason,State}.

push_chars(Cs, #tcp_io{ibuf=[]}=St) ->
    {ok,St#tcp_io{ibuf=Cs}}.


More information about the erlang-questions mailing list