Generic I/O and binary I/O
Robert Virding
rv@REDACTED
Thu Jan 3 17:05:34 CET 2002
I have now written a first draft of a generic i/o module, gen_io. It
provides a generic synchronous interface to the i/o protocol and uses
callbacks for the device specific stuff. There is "some"
documentation in the beginning of the file.
I have also reworked my binary i/o module, bin_io, to use gen_io. It
is reasonable small example of using gen_io.
There is also a first draft of tcp_io which provides an i/o interface
to a socket. It is not complete, specifically it does not correctly
handle when the port closes for the generic i/o interface. I'll fix
it. N.B. this is probably not the most efficient way of doing
formatted i/o to a socket but it does allow writing using generic i/o
functions.
I am going to convert file_io_server, the main file interface, to use
gen_io to see if gen_io is sufficient. If this works then I reckon
that gen_io will do for synchronous i/o. Asynchronous i/o, like for
the tty interface, is another matter and I think that it might be
difficult to generalise it. I will try however.
Comment?
Robert
-------------- next part --------------
%%% File : gen_io.erl
%%% Author : Robert Virding <rv@REDACTED>
%%% Purpose :
%%% Created : 21 Nov 2001 by Robert Virding <rv@REDACTED>
%% Mod:init([Argument]) ->
%% {ok,State}
%% {stop,Reason}
%%
%% Mod:handle_request(Request, State) ->
%% {reply,From,Reply,State}
%% {noreply,State}
%% {stop,Reason,From,Reply,State}
%% {stop,Reason,State}
%% Request is the full unparsed request.
%%
%% Mod:terminate(Reason, State) -> ok
%%
%% Mod:read_chars(State) ->
%% {ok,[Char],State}
%% {error,Reason,State}
%%
%% Mod:push_chars([Char], State) ->
%% {ok,State}
%% {error,Reason,State}
%% After we have read all the characters we need we push the rest
%% back into the state. N.B. the char list may be empty.
%%
%% Mod:write_chars([Char], State) ->
%% {ok,State}
%% {error,Reason,State}
-module(gen_io).
-author('rv@REDACTED').
-compile(export_all).
-export([start/3,start_link/3]).
-export([request/2]).
%% start(Mod, [Arg], [Option])
%% start_link(Mod, [Arg], [Option])
%%
%% where
%% Mod is the callback module
%% [Arg] init arguments
%% [Option]
start(Mod, Args, Opts) ->
do_start(spawn, Mod, Args, Opts).
start_link(Mod, Args, Opts) ->
do_start(spawn_link, Mod, Args, Opts).
do_start(Spawn, Mod, Args, Opts) ->
Self = self(),
Ref = make_ref(),
Pid = erlang:Spawn(fun () -> init(Self, Ref, Mod, Args, Opts) end),
%% Monitor process and wait for reply or termination.
Mref = erlang:monitor(process, Pid),
receive
{Ref,ok} ->
erlang:demonitor(Mref),
receive
{'DOWN',Mref,_,_,Reason} ->
{error,Reason}
after 0 ->
{ok,Pid}
end;
{'DOWN',Mref,_,_,Reason} ->
{error,Reason}
end.
%% init(StarterPid, StartRef, IoModule, [Arg], [Opt])
init(Start, Ref, IoM, Args, Opts) ->
case catch IoM:init(Args) of
{ok,State} ->
Start ! {Ref,ok},
loop(IoM, State);
{stop,Reason} ->
exit(Reason);
{'EXIT',Reason} ->
exit(Reason);
Else ->
Error = {bad_return_value,Else},
exit(Error)
end.
%% loop(IoModule, State) -> void.
%% Main io loop. This never returns anything, just terminates.
loop(IoM, St0) ->
receive
{io_request,From,ReplyAs,Req} when pid(From) ->
%% Handle general io requests.
case io_request(Req, IoM, St0) of
{ok,Rep,St1} ->
io_reply(From, ReplyAs, Rep),
loop(IoM, St1);
{error,Rep,St1} ->
io_reply(From, ReplyAs, Rep),
loop(IoM, St1);
{stop,Reason,Rep,St1} ->
terminate(Reason, From, {io_reply,ReplyAs,Rep}, IoM, St1)
end;
Request ->
case catch IoM:handle_request(Request, St0) of
{reply,From,Rep,St1} ->
From ! Rep,
loop(IoM, St1);
{noreply,St1} ->
loop(IoM, St1);
{stop,Reason,From,Rep,St1} ->
terminate(Reason, From, Rep, IoM, St1);
{stop,Reason,St1} ->
terminate(Reason, IoM, St1)
end
end.
io_reply(From, ReplyAs, Reply) ->
From ! {io_reply, ReplyAs, Reply}.
terminate(Reason, From, Reply, IoM, St) ->
From ! Reply,
terminate(Reason, IoM, St).
terminate(Reason, IoM, St) ->
catch IoM:terminate(Reason, St),
exit(Reason).
%% io_request(Request, IoModule, State) ->
%% {ok,Reply,State} | {error,Reply,State} | {stop,Reason,Reply,State}.
%%
%% Handle general io requests.
io_request({put_chars,Chars}, IoM, St0) ->
case catch IoM:write_chars(Chars, St0) of
{ok,St1} -> {ok,ok,St1};
{error,Reason,St1} -> {stop,normal,{error,Reason},St1};
Other -> {stop,Other,{error,Other},St0}
end;
io_request({put_chars,Mod,Func,Args}, IoM, St) ->
case catch apply(Mod, Func, Args) of
{'EXIT',Reason} -> {error,{error,Func},St};
Cs -> io_request({put_chars,Cs}, IoM, St)
end;
io_request({get_until,Prompt,Mod,Func,ExtraArgs}, IoM, St) ->
get_until(Mod, Func, ExtraArgs, IoM, St);
io_request({requests,Reqs}, IoM, St) when list(Reqs) ->
io_request_loop(Reqs, IoM, {ok,ok,St});
io_request(Unknown, IoM, St) ->
Reason = {error,Unknown},
{error,{error,Reason},St}.
%% io_request_loop([Request], IoModule, Result) -> Result.
%% Process list of requests as long as results are ok.
io_request_loop([], IoM, Res) -> Res;
io_request_loop([Req|Reqs], IoM, {ok,Rep,St}) ->
io_request_loop(Reqs, IoM, io_request(Req, IoM, St));
io_request_loop([Req|Reqs], IoM, Res) -> Res.
%% get_until(Module, Func, [ExtraArg], IoModule, State) ->
%% {ok,Reply,State} | {error,Reply,State} | {stop,Reason,Reply,State}.
%% Apply the get_until loop scanning the binary until the scan
%% function has enough. Buffer any remaining bytes until the next
%% call.
get_until(Mod, Func, ExtraArgs, IoM, St) ->
get_until_loop(Mod, Func, ExtraArgs, IoM, St, {more,[]}).
get_until_loop(M, F, As, IoM, St0, {more,Cont}) ->
case catch IoM:read_chars(St0) of
{ok,Cs,St1} ->
get_until_loop(M, F, As, IoM, St1,
catch apply(M, F, [Cont,Cs|As]));
{error,Reason,St1} ->
{stop,Reason,{error,Reason},St1};
Other ->
{stop,Other,{error,Other},St0}
end;
get_until_loop(M, F, As, IoM, St0, {done,Res,Buf}) ->
case catch IoM:push_chars(Buf, St0) of
{ok,St1} -> {ok,Res,St1};
{error,Reason,St1} ->
{stop,Reason,{error,Reason},St1};
Other ->
{stop,Other,{error,Other},St0}
end;
get_until_loop(M, F, As, IoM, St, Other) ->
{error,{error,F},St}.
%% request(IoServer, Request) -> {ok,Reply} | {error,Reason}.
%% Send a standard io request to to an io server and wait for the reply.
request(Pid, Request) when pid(Pid) ->
Mref = erlang:monitor(process,Pid),
Pid ! {io_request,self(),Pid,Request},
wait_io_mon_reply(Pid,Mref);
request(Name, Request) when atom(Name) ->
case whereis(Name) of
undefined ->
{error, arguments};
Pid ->
request(Pid, Request)
end.
wait_io_mon_reply(From, Mref) ->
receive
{io_reply,From,Reply} ->
erlang:demonitor(Mref),
receive
{'DOWN', Mref, _, _, _} -> true
after 0 -> true
end,
Reply;
{'EXIT', From, _What} ->
receive
{'DOWN', Mref, _, _, _} -> true
after 0 -> true
end,
{error,terminated};
{'DOWN', Mref, _, _, _} ->
receive
{'EXIT', From, _What} -> true
after 0 -> true
end,
{error,terminated}
end.
-------------- next part --------------
%% File : bin_io.erl
%% Author : Robert Virding
%% Purpose : Open a binary for standard i/o requests.
-module(bin_io).
%% The main user interface.
-export([open_read/1,open_create/0,open_append/1,close/1]).
%% The gen_io callbacks.
-export([init/1,terminate/2,handle_request/2,
read_chars/1,write_chars/2,push_chars/2]).
-record(bin_io, {mode,bin,buf}).
-define(READ_SIZE, 256). %Bytes per chunk read
%% The main interface.
open_read(Bin) ->
gen_io:start_link(?MODULE, [read,Bin], []).
open_create() ->
gen_io:start_link(?MODULE, [create], []).
open_append(Bin) ->
gen_io:start_link(?MODULE, [append,Bin], []).
close(Io) ->
Io ! {bin_request,self(),Io,close},
receive
{bin_reply,Io,Rep} -> Rep
end.
%% init([Arg]) -> {ok,State}.
init([read,Bin]) when binary(Bin) ->
{ok,#bin_io{mode=read,bin=Bin,buf=[]}};
init([create]) ->
{ok,#bin_io{mode=write,bin= <<>>,buf=[]}};
init([append,Bin]) when binary(Bin) ->
{ok,#bin_io{mode=write,bin=Bin,buf=[]}}.
%% terminate(Reason, State) -> ok.
terminate(R, St) -> ok.
%% handle_request(Request, State) ->
%% {reply,From,Reply,State} |
%% {noreply,State} |
%% {stop,Reason,From,Reply,State} |
%% {stop,Reason,State}.
%%
%% Handle bin_io specific requests.
handle_request({bin_request,From,ReplyAs,close}, #bin_io{mode=Mode}=St)
when pid(From) ->
Rep = case Mode of
read -> ok;
write -> {ok,list_to_binary([St#bin_io.bin,St#bin_io.buf])}
end,
{stop,normal,From,{bin_reply,ReplyAs,Rep},St#bin_io{buf=[]}};
handle_request({bin_request,From,ReplyAs,Other}, #bin_io{}=St)
when pid(From) ->
Rep = {error,{request,Other}},
{reply,From,{bin_reply,ReplyAs,Rep},St};
handle_request(Unknown, State) ->
%% Just ignore unknown requests.
{noreply,State}.
%% write_chars(Chars, State) ->
%% {ok,State} | {error,Reason,State}.
write_chars(Cs, #bin_io{mode=write,buf=Buf}=St) when binary(Cs) ->
{ok,St#bin_io{buf=[Buf|Cs]}};
write_chars(Cs, #bin_io{mode=write,buf=Buf}=St) ->
case catch list_to_binary(Cs) of
{'EXIT',Reason} -> {error,Reason,St};
MoreBin -> {ok,St#bin_io{buf=[Buf|MoreBin]}}
end;
write_chars(Cs, #bin_io{mode=read}=St) ->
{error,badmode,St}.
%% read_chars(State) ->
%% {ok,Chars,State} | {error,Reason,State}.
read_chars(#bin_io{mode=read,buf=[],bin=Bin}=St) ->
if Bin == <<>> -> {ok,eof,St};
size(Bin) < ?READ_SIZE ->
{ok,binary_to_list(Bin),St#bin_io{bin= <<>>}};
true ->
{B1,B2} = split_binary(Bin, ?READ_SIZE),
{ok,binary_to_list(B1),St#bin_io{bin=B2}}
end;
read_chars(#bin_io{mode=read,buf=Buf}=St) ->
{ok,Buf,St#bin_io{buf=[]}};
read_chars(#bin_io{mode=write}=St) ->
{error,badmode,St}.
%% push_chars([Char], State) ->
%% {ok,State} | {error,Reason,State}.
push_chars(Cs, #bin_io{buf=[]}=St) ->
{ok,St#bin_io{buf=Cs}}.
-------------- next part --------------
%% File : tcp_io.erl
%% Author : Robert Virding
%% Purpose : Open a tcp connection for standard i/o requests.
-module(tcp_io).
%% The main user interface.
-export([open/2,open/3,close/1]).
%% The gen_io callbacks.
-export([init/1,terminate/2,handle_request/2,
read_chars/1,write_chars/2,push_chars/2]).
-record(tcp_io, {server, %Server
port, %Port
sock=none, %Socket
ibuf=none}).
open(Server, Port) ->
open(Server, Port, []).
open(Server, Port, SockOpts) ->
gen_io:start_link(?MODULE, [Server,Port,SockOpts], []).
close(Io) ->
Io ! {tcp_request,self(),Io,close},
receive
{tcp_reply,Io,Rep} -> Rep
end.
%% init([Arg]) -> {ok,State}.
%% Try to open the socket with the given arguments.
init([Server,Port,SockOpts]) ->
case gen_tcp:connect(Server, Port, [list,{active,false}|SockOpts]) of
{ok,S} ->
{ok,#tcp_io{server=Server,port=Port,sock=S,ibuf=[]}};
{error,Reason} -> {stop,Reason}
end.
%% terminate(Reason, State) -> ok.
%% This just closes the socket.
terminate(Reason, #tcp_io{sock=S}) ->
gen_tcp:close(S).
%% handle_request(Request, State) ->
%% {reply,From,Reply,State} |
%% {stop,Reason,From,Reply,State} |
%% {noreply,State}.
handle_request({tcp_request,From,ReplyAs,close}, #tcp_io{}=St)
when pid(From) ->
{stop,normal,From,{tcp_reply,ReplyAs,ok},St};
handle_request({tcp_request,From,ReplyAs,Other}, #tcp_io{}=St)
when pid(From) ->
Rep = {error,{request,Other}},
{reply,From,{tcp_reply,ReplyAs,Rep},St};
handle_request(Unknown, St) ->
%% Just ignore unknown requests.
{noreply,St}.
%% write_chars(Chars, State) ->
%% {ok,State} | {error,Reason,State}.
write_chars(Cs, #tcp_io{sock=S}=St) ->
case gen_tcp:send(S, Cs) of
ok -> {ok,St};
{error,Reason} -> {error,Reason,St}
end.
%% read_chars(State) ->
%% {ok,Chars,State} | {error,Reason,State}.
read_chars(#tcp_io{sock=S,ibuf=[]}=St) ->
case gen_tcp:recv(S, 0) of
{ok,Cs} -> {ok,Cs,St};
{error,closed} -> {ok,eof,St};
{error,Reason} -> {error,Reason,St}
end;
read_chars(#tcp_io{ibuf=Cs}=St) ->
{ok,Cs,St#tcp_io{ibuf=[]}}.
%% push_chars([Char], State) ->
%% {ok,State} | {error,Reason,State}.
push_chars(Cs, #tcp_io{ibuf=[]}=St) ->
{ok,St#tcp_io{ibuf=Cs}}.
More information about the erlang-questions
mailing list