[erlang-questions] TCP client
Stavros Filargyropoulos
stafil@REDACTED
Thu Apr 20 22:34:45 CEST 2017
Hey,
Just starting out on erlang, and trying to implement a message broker using
erlang. I have a gen_fsm process for each client that handles the command
and data receiving and I was wondering what is the best way to for the main
broker process to send data to the client.
Do I just send a message to the client process and handle the sending
inside `handle_info`? How would I handle race conditions in this case? E.g.
When the client process terminates just after the main process sent the
message.
My code looks something like this at the moment:
-module(pq_test).
-export([init/1,handle_info/3,handle_event/3]).
-export([start/0]).
-export([recv_cmd/2,recv_data/2]).
-record(state, {socket, buffer, current_command}).
start() ->
spawn_link(fun start_link/0).
start_link() ->
{ok, ListenSocket} = gen_tcp:listen(11880, [{active, false}, binary]),
accept(ListenSocket).
accept(ListenSocket) ->
{ok, Socket} = gen_tcp:accept(ListenSocket),
{ok, Pid} = gen_fsm:start_link({local, ?MODULE}, ?MODULE, Socket, []),
erlang:display(Pid),
gen_tcp:controlling_process(Socket, Pid),
accept(ListenSocket).
init(Socket)->
inet:setopts(Socket, [{active, true}]),
{ok, recv_cmd,
#state{socket = Socket, buffer = <<"">>, current_command = null}}.
cmd_parse(publish, [Topic, Key, Length], #state{socket = Socket})->
{Len, _} = string:to_integer(Length),
erlang:display(Len),
io:format("Publish -~s- -~s- -~B-~n", [Topic, Key, Len]),
{next_state, recv_data,
#state{socket = Socket, buffer = <<"">>, current_command = {publish,
Len}}};
cmd_parse(_, _, #state{socket = Socket}) ->
io:format("Unknown command~n"),
{next_state, recv_cmd,
#state{socket = Socket, buffer = <<"">>, current_command = null}}.
cmd_process(CommandLine, #state{socket = Socket} = State) ->
io:format("1~n"),
case string:tokens(binary_to_list(CommandLine), [$\s, $\r, $\n]) of
[Command | Args] ->
cmd_parse(list_to_atom(Command), Args, State);
_ -> {next_state, recv_cmd,
#state{socket = Socket, buffer = <<"">>, current_command = null}}
end.
cmd_more(CommandLine, #state{socket = Socket}) ->
{next_state, recv_cmd,
#state{socket=Socket, buffer=CommandLine, current_command = null}}.
recv_cmd({data, Data}, #state{buffer = Buffer} = State) ->
CommandLine = <<Buffer/binary, Data/binary>>,
Last = binary:last(CommandLine),
case Last of
$\n -> cmd_process(CommandLine, State);
_ -> cmd_more(CommandLine, State)
end.
recv_data({data, Data},
#state{socket = Socket,
buffer = Buffer,
current_command = {publish, Len}}) when Len > byte_size(Data) ->
{next_state, recv_data,
#state{socket = Socket, buffer = <<Buffer/binary, Data/binary>>,
current_command = {publish, Len - byte_size(Data)}}};
recv_data({data, Data}, #state{socket = Socket, buffer = Buffer}) ->
io:format("Got all data: ~s~n", [<<Buffer/binary, Data/binary>>]),
{next_state, recv_cmd,
#state{socket = Socket, buffer = <<"">>, current_command = null}}.
handle_info({tcp, _, Message}, StateName, State) ->
?MODULE:StateName({data, Message}, State);
handle_info(Event, StateName, State) ->
io:format("handle info~n"),
erlang:display(Event),
{next_state, StateName, State}.
handle_event(_, _, _) ->
io:format("handle event").
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://erlang.org/pipermail/erlang-questions/attachments/20170420/45f1c81d/attachment.htm>
More information about the erlang-questions
mailing list