<div dir="ltr"><div><div>Hey,<br><br></div>I'm just starting out with Erlang and am trying to implement a message broker in it. I have a gen_fsm process for each client that handles receiving commands and data, and I was wondering what the best way is for the main broker process to send data to a client.<br><br>Do I just send a message to the client process and handle the actual socket send inside `handle_info`? How would I handle race conditions in that case, e.g. when the client process terminates just after the main process has sent the message?<br></div><div><br></div><div>My code looks something like this at the moment:<br><br>-module(pq_test).<br>-behaviour(gen_fsm).<br><br>-export([init/1,handle_info/3,handle_event/3]).<br>-export([start/0]).<br>-export([recv_cmd/2,recv_data/2]).<br><br>-record(state, {socket, buffer, current_command}).<br><br>start() -><br>&nbsp;&nbsp;&nbsp;&nbsp;spawn_link(fun start_link/0).<br><br>start_link() -><br>&nbsp;&nbsp;&nbsp;&nbsp;{ok, ListenSocket} = gen_tcp:listen(11880, [{active, false}, binary]),<br>&nbsp;&nbsp;&nbsp;&nbsp;accept(ListenSocket).<br><br>accept(ListenSocket) -><br>&nbsp;&nbsp;&nbsp;&nbsp;{ok, Socket} = gen_tcp:accept(ListenSocket),<br>&nbsp;&nbsp;&nbsp;&nbsp;%% Started without a registered name: there is one FSM per client, and a<br>&nbsp;&nbsp;&nbsp;&nbsp;%% local name can be held by only one process, so {local, ?MODULE} would<br>&nbsp;&nbsp;&nbsp;&nbsp;%% fail with already_started on the second connection.<br>&nbsp;&nbsp;&nbsp;&nbsp;{ok, Pid} = gen_fsm:start_link(?MODULE, Socket, []),<br>&nbsp;&nbsp;&nbsp;&nbsp;erlang:display(Pid),<br>&nbsp;&nbsp;&nbsp;&nbsp;gen_tcp:controlling_process(Socket, Pid),<br>&nbsp;&nbsp;&nbsp;&nbsp;accept(ListenSocket).<br><br>init(Socket) -><br>&nbsp;&nbsp;&nbsp;&nbsp;inet:setopts(Socket, [{active, true}]),<br>&nbsp;&nbsp;&nbsp;&nbsp;{ok, recv_cmd,<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;#state{socket = Socket, buffer = <<"">>, current_command = null}}.<br><br>cmd_parse(publish, [Topic, Key, Length], #state{socket = Socket}) -><br>&nbsp;&nbsp;&nbsp;&nbsp;{Len, _} = string:to_integer(Length),<br>&nbsp;&nbsp;&nbsp;&nbsp;erlang:display(Len),<br>&nbsp;&nbsp;&nbsp;&nbsp;io:format("Publish -~s- -~s- -~B-~n", [Topic, Key, Len]),<br>&nbsp;&nbsp;&nbsp;&nbsp;{next_state, recv_data,<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;#state{socket = Socket, buffer = <<"">>, current_command = {publish, Len}}};<br>cmd_parse(_, _, #state{socket = Socket}) -><br>&nbsp;&nbsp;&nbsp;&nbsp;io:format("Unknown command~n"),<br>&nbsp;&nbsp;&nbsp;&nbsp;{next_state, recv_cmd,<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;#state{socket = Socket, buffer = <<"">>, current_command = 
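null}}.<br><br>cmd_process(CommandLine, #state{socket = Socket} = State) -><br>&nbsp;&nbsp;&nbsp;&nbsp;io:format("1~n"),<br>&nbsp;&nbsp;&nbsp;&nbsp;case string:tokens(binary_to_list(CommandLine), [$\s, $\r, $\n]) of<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;[Command | Args] -><br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;cmd_parse(list_to_atom(Command), Args, State);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;_ -><br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;{next_state, recv_cmd,<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;#state{socket = Socket, buffer = <<"">>, current_command = null}}<br>&nbsp;&nbsp;&nbsp;&nbsp;end.<br><br>cmd_more(CommandLine, #state{socket = Socket}) -><br>&nbsp;&nbsp;&nbsp;&nbsp;{next_state, recv_cmd,<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;#state{socket = Socket, buffer = CommandLine, current_command = null}}.<br><br>recv_cmd({data, Data}, #state{buffer = Buffer} = State) -><br>&nbsp;&nbsp;&nbsp;&nbsp;CommandLine = <<Buffer/binary, Data/binary>>,<br>&nbsp;&nbsp;&nbsp;&nbsp;Last = binary:last(CommandLine),<br>&nbsp;&nbsp;&nbsp;&nbsp;case Last of<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;$\n -> cmd_process(CommandLine, State);<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;_ -> cmd_more(CommandLine, State)<br>&nbsp;&nbsp;&nbsp;&nbsp;end.<br><br>recv_data({data, Data},<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;#state{socket = Socket,<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;buffer = Buffer,<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;current_command = {publish, Len}}) when Len > byte_size(Data) -><br>&nbsp;&nbsp;&nbsp;&nbsp;{next_state, recv_data,<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;#state{socket = Socket, buffer = <<Buffer/binary, Data/binary>>,<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;current_command = {publish, Len - byte_size(Data)}}};<br>recv_data({data, Data}, #state{socket = Socket, buffer = Buffer}) -><br>&nbsp;&nbsp;&nbsp;&nbsp;io:format("Got all data: ~s~n", [<<Buffer/binary, Data/binary>>]),<br>&nbsp;&nbsp;&nbsp;&nbsp;{next_state, recv_cmd,<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;#state{socket = Socket, buffer = <<"">>, current_command = null}}.<br><br>handle_info({tcp, _, Message}, StateName, State) -><br>&nbsp;&nbsp;&nbsp;&nbsp;?MODULE:StateName({data, Message}, State);<br>handle_info(Event, StateName, State) -><br>&nbsp;&nbsp;&nbsp;&nbsp;io:format("handle info~n"),<br>&nbsp;&nbsp;&nbsp;&nbsp;erlang:display(Event),<br>&nbsp;&nbsp;&nbsp;&nbsp;{next_state, StateName, State}.<br><br>%% gen_fsm expects a state tuple back from this callback, not the `ok`<br>%% returned by io:format/1, so stay in the current state explicitly.<br>handle_event(_Event, StateName, State) -><br>&nbsp;&nbsp;&nbsp;&nbsp;io:format("handle event~n"),<br>&nbsp;&nbsp;&nbsp;&nbsp;{next_state, StateName, State}.<br><br></div></div>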