[erlang-questions] large scale deployments and netsplits

Joel Reymont joelr1@REDACTED
Mon Sep 14 20:05:22 CEST 2009

On Sep 14, 2009, at 6:28 PM, Ulf Wiger wrote:

> If you have network connections that are not responding within
> at least 45 seconds, it seems as if you have some rather
> severe overload on your links.

I'm attaching the source code to my "flash bot", one that uses
socket connections to the server. The bot restarts whenever
something goes wrong which happens often. For example, the bot
may not connect to the server within 5 seconds or the other
side may drop the connection, or the subscription ack may not
come on time, etc.

I'm starting 50 EC2 instances, 10k bots each (bot code below).
I'm running a single Erlang VM per small instance.

When a bot node comes up, it starts fetching a server list
from Amazon S3 every 10 seconds and updating a local cache.
Bots grab the list from the local cache.

I also redirect local error_logger on every instance to the
"mothership", the node that I'm launching the simulation from
and that I'm staring at.

Whenever a bot starts up, it will fetch the server list from
the local cache, look up 2 globally registered processes
(semaphore, sim_histo) and connect to the server.

Once connected, bots subscribe to a topic, wait for an ack
and then go into a wait mode. The "semaphore" process will
exit when a given number of bots have "bumped" it and bots
will then start waiting for a chunk of data.

There's also a global "statistics" process, hidden in the stats
module, that bots send async requests to periodically.

10 thousand bots per node and EC2 instance will repeat the
above process but the nodes seem to lose connectivity
before bots subscribe successfully. Notice that I have
a random delay before each reconnect to lessen the burden
on the application server.

All of the above doesn't seem like it should burden the
network links in theory. That's not what I'm seeing
in practice, though.

I'm thinking of complex schemes where 500k bots don't
monitor a single process, etc. but I wonder if there's
something simple that I'm missing.

	Thanks, Joel

P.S. The transition bit is there to centralize error handling,
since I want to restart from scratch any time something goes wrong.



-export([start/1, status/1]).

-export([not_connected/2, connected/2,
          packet/2, subscribed/2, network_split/2]).

-export([init/1, handle_event/3, handle_sync_event/4,
          handle_info/3, terminate/3, code_change/4]).

%% Value of the `fails' counter at which reconnect/1 stops the bot with
%% reason max_fails.
-define(MAX_FAILS, 60).
%% Upper bound passed to random:uniform/1 for the initial timeout in init/1
%% and for each increment added in reconnect/1.
-define(MIN_DELAY, 2000).
%% Timeout handed to gen_tcp:connect/4 and to the timers armed through
%% initially/2 and gen_fsm:send_event_after/2 (both APIs take milliseconds).
-define(TIMEOUT, 5000).

-record(state, {

%% Start one bot as a linked gen_fsm running this module. Expected is handed
%% to init/1 wrapped in a one-element list. Returns whatever
%% gen_fsm:start_link/3 returns.
start(Expected) ->
     InitArgs = [Expected],
     FsmOptions = [],
     gen_fsm:start_link(?MODULE, InitArgs, FsmOptions).

%% Synchronously ask the bot identified by Ref for its status. The request is
%% answered by handle_sync_event/4, whatever state the bot is in.
status(Ref) ->
     Request = status,
     gen_fsm:sync_send_all_state_event(Ref, Request).

%% gen_fsm init callback.
%%
%% The one-element form looks up the globally registered `semaphore' and
%% `sim_histo' processes and re-enters init/1 with a three-element list.
%% The three-element form (taken when both lookups returned pids) seeds the
%% random generator, builds the initial #state{}, monitors the semaphore,
%% schedules the first connect through reconnect/1 and starts the FSM in
%% state not_connected.
%%
%% The process dictionary key `state' is updated along the way as a progress
%% marker (pre_init, completed_init).
%%
%% NOTE(review): this listing is truncated -- the #state{} literal below is
%% never closed and the final fallback clause has no body. Restore both from
%% the original module before compiling.
init([Expected]) ->
     put(state, pre_init),
     Sem = global:whereis_name(semaphore),
     Histo = global:whereis_name(sim_histo),
     init([Expected, Sem, Histo]);

init([Expected, Sem, Histo])
   when is_pid(Sem),
        is_pid(Histo) ->
     {A1, A2, A3} = now(),
     random:seed(A1, A2, A3),
     %% fails starts at -1 because reconnect/1 (called below) increments it,
     %% leaving it at 0 once init has finished.
     State = #state{
       sim = node(Histo),
       histo = Histo,
       expected = Expected,
       semaphore = Sem,
       fails = -1,
       timeout = random:uniform(?MIN_DELAY)
     catch erlang:monitor(process, Sem),
     %% Only the state element of reconnect/1's result is used here.
     {_, _, State1} = reconnect(State),
     stats:add(flash_total_started, 1),
     stats:sum(flash_started, 1),
     put(state, completed_init),
     {ok, not_connected, State1};

%% Fallback for when at least one of the global lookups did not return a pid.
%% NOTE(review): the body of this clause is missing from the listing.
init([Expected, _, _]) ->

%% State callback for the network_split state.
%%
%% On `rejoin' the bot pings the node stored in State#state.sim. If the ping
%% answers pong it looks the global `semaphore' and `sim_histo' names up
%% again; when both resolve to pids it counts a rejoin, stores the new pids
%% and goes through transition/N with {error, netsplit}. In every other case
%% it re-arms a `rejoin' event after ?TIMEOUT via initially/2 and stays in
%% network_split. Any other event is ignored.
%%
%% NOTE(review): this listing is truncated -- the transition call on the
%% success branch is cut off mid-argument-list and the `end' keywords of
%% both case expressions are missing.
network_split(rejoin, State) ->
     put(state, netsplit),
     case net_adm:ping(State#state.sim) of
         pong ->
             Sem = global:whereis_name(semaphore),
             Histo = global:whereis_name(sim_histo),
             put(state, rejoined),
             case {Sem, Histo} of
                 {P1, P2} when is_pid(P1), is_pid(P2) ->
                     stats:add(flash_total_rejoins, 1),
                     State1 = State#state{semaphore = P1, histo = P2},
                     transition({error, netsplit}, network_split,  
                 _ ->
                     F = initially(rejoin, ?TIMEOUT),
                     transition(ok, network_split, F, State)
         _ ->
             F = initially(rejoin, ?TIMEOUT),
             transition(ok, network_split, F, State)

network_split(_, State) ->
     {next_state, network_split, State}.

%% State callback for the not_connected state.
%%
%% `connect': fetches the server list from sim_cache (an exit from that call
%% is mapped to {error, server_list_timeout}). On success the fun F takes the
%% first entry of the list and schedules a {connect, Host, ...} event 50 ms
%% later, remembering the timer reference in the state.
%%
%% {connect, Host, Port}: opens the TCP connection with a ?TIMEOUT connect
%% timeout. On success F stores the socket, schedules a `packet2' event 50 ms
%% later and the FSM moves to the packet state.
%%
%% NOTE(review): this listing is truncated -- the body of the `Other ->'
%% branch and the `end' of the case are missing, both funs lack their `end',
%% the send_event_after call in the first fun is cut off, and the opening
%% `[' of the gen_tcp option list is missing.
not_connected(connect, State) ->
     put(state, connecting),
     put(state, fetching_server_list),
     Result = case (catch sim_cache:server_list()) of
                  {'EXIT', _} ->
                      {error, server_list_timeout};
                  Other ->
     put(state, have_server_list),
     F = fun({ok, [{Host, _, Port, _}|_]}, S) ->
                 Ref = gen_fsm:send_event_after(50, {connect, Host,  
                 S#state{timer = Ref}
     transition(Result, not_connected, not_connected, F, State);

not_connected({connect, Host, Port}, State) ->
     Result = gen_tcp:connect(Host, Port,
                               {packet, 0},
                               {active, true},
                               {reuseaddr, true}
                              ], ?TIMEOUT),
     %% NOTE(review): these counters are bumped before Result is inspected,
     %% so failed connection attempts appear to be counted as well.
     stats:add(flash_total_connected, 1),
     stats:sum(flash_connected, 1),
     F = fun({ok, Sock}, S) ->
                 Ref = gen_fsm:send_event_after(50, packet2),
                 S#state{socket = Sock, timer = Ref}
     transition(Result, not_connected, packet, F, State).

%% State callback for the packet state.
%%
%% packet2 - send four 0xFF bytes, switch the socket to {packet, 2} framing
%%           and, if the send returned ok, arm a `timeout' event after
%%           ?TIMEOUT. Note that the socket option is changed whether or
%%           not the send succeeded.
%% timeout - no ack arrived in time: go through the error path of
%%           transition/N with {error, packet_ack_timeout}.
%% ack     - move to connected and schedule a `subscribe' event.
packet(packet2, State) ->
     SendResult = send(<<255, 255, 255, 255>>, State),
     inet:setopts(State#state.socket, [{packet, 2}]),
     ArmAckTimer = initially(timeout, ?TIMEOUT),
     transition(SendResult, packet, ArmAckTimer, State);

packet(timeout, State) ->
     Failure = {error, packet_ack_timeout},
     transition(Failure, packet, connected, State);

packet(ack, State) ->
     ScheduleSubscribe = initially(subscribe),
     transition(ok, packet, connected, ScheduleSubscribe, State).

%% State callback for the connected state.
%%
%% subscribe - encode and send a JSON subscribe request for "events", count
%%             the request when the send returned ok, then arm a `timeout'
%%             event after ?TIMEOUT.
%% timeout   - the subscribe ack did not arrive: go through transition/N
%%             with {error, subscribe_timeout}.
%% ack       - cancel the pending timer, count the ack and move on to the
%%             subscribed state.
%%
%% NOTE(review): this listing is truncated -- the JSON term in the first
%% clause is not closed, the `if ... end' around the counters has lost its
%% `if', the body of its `true' branch and its `end', and the transition
%% calls in the timeout and ack clauses are cut off mid-argument-list.
connected(subscribe, State) ->
     JSON = mochijson2:encode({struct, [{action, <<"subscribe">>},
                                        {data, <<"events">>}
     Result = send(JSON, State),
         Result == ok ->
             stats:add(flash_total_sub_req, 1),
             stats:sum(flash_sub_req, 1);
         true ->
     %% arm the ack timer; connected(timeout, ...) fires if no ack arrives
     transition(Result, connected, initially(timeout, ?TIMEOUT), State);

connected(timeout, State) ->
     %% subscribe ack timed out
     transition({error, subscribe_timeout}, connected, subscribed,  

connected(ack, State) ->
     %% subscribe confirmation received
     catch gen_fsm:cancel_timer(State#state.timer),
     stats:add(flash_total_sub_ack, 1),
     stats:sum(flash_sub_ack, 1),
     transition(ok, connected, subscribed, State#state{timer =  

%% State callback for the subscribed state.
%%
%% ready    - arm a `timeout' event after ?TIMEOUT and stay in subscribed.
%%            handle_info/3 delivers this event when the monitored
%%            semaphore process goes down with reason normal.
%% timeout  - stop the bot with reason normal.
%% Expected - an event equal to State#state.expected: report the measured
%%            latency to sim_histo, send a JSON unsubscribe request and stop
%%            with reason normal. The result of that send is ignored.
%%
%% NOTE(review): this listing is truncated -- the fun in the `ready' clause
%% is missing its closing `end,'.
subscribed(ready, State) ->
     F = fun(ok, S) ->
                 Ref = gen_fsm:send_event_after(?TIMEOUT, timeout),
                 S#state{timer = Ref}
     transition(ok, subscribed, subscribed, F, State);

subscribed(timeout, State) ->
     {stop, normal, State};

subscribed(Expected, State)
   when Expected == State#state.expected ->
     sim_histo:success(State#state.histo, State#state.latency),
     JSON = mochijson2:encode({struct, [{action, <<"unsubscribe">>},
                                        {data, <<"events">>}]}),
     send(JSON, State),
     {stop, normal, State}.

%% gen_fsm all-state event callback. No all-state events are defined for the
%% bot, so any event stops it with a reason naming the event and the state
%% it arrived in.
handle_event(Event, Where, State) ->
     Reason = {unknown_event, Event, Where},
     {stop, Reason, State}.

%% gen_fsm synchronous all-state event callback.
%%
%% status - reply with the `transition' field of the state and stay in the
%%          current state.
%% Any other request stops the bot with a reason describing the request.
handle_sync_event(status, _From, Where, State) ->
     LastTransition = State#state.transition,
     {reply, LastTransition, Where, State};

handle_sync_event(Event, From, Where, State) ->
     Reason = {unknown_sync_event, Event, From, Where},
     {stop, Reason, State}.

%% gen_fsm handle_info callback: dispatches raw messages to the state
%% machine. Clause order matters.
%%
%% {'DOWN', ..., noconnection} for the semaphore - connection to the
%%     semaphore's node was lost: count a split and start the rejoin
%%     procedure in network_split/2.
%% {'DOWN', ..., normal} for the semaphore - deliver a `ready' event to the
%%     current state function.
%% {'DOWN', ..., shutdown} - ignored, the FSM stays where it is.
%% {tcp, _, <<"ACK">>} - deliver an `ack' event to the current state.
%% {tcp, _, Bin} - Bin is an external-term-format {Timestamp, JsonBinary}
%%     pair; the latency (microseconds, from timer:now_diff/2) is stored in
%%     the state and the decoded JSON is delivered as the event.
%% tcp_closed / tcp_error / error tuples - counted and routed through
%%     transition/3.
%% Anything else stops the bot with an unknown_info reason.
handle_info({'DOWN', _, process, Pid, noconnection}, _, State)
   when State#state.semaphore == Pid ->
     stats:add(flash_total_splits, 1),
     network_split(rejoin, State);

handle_info({'DOWN', _, process, Pid, normal}, Where, State)
   when State#state.semaphore == Pid ->
     ?MODULE:Where(ready, State);

handle_info({'DOWN', _, process, _, shutdown}, Where, State) ->
     {next_state, Where, State};

handle_info({tcp, _Sock, <<"ACK">>}, Where, State) ->
     ?MODULE:Where(ack, State);

handle_info({tcp, _Sock, Bin}, Where, State) ->
     %% Take the receive timestamp first so decoding time is not counted
     %% as latency.
     Now = now(),
     %% The payload comes straight off the socket, so decode it in `safe'
     %% mode: this refuses terms that would create new atoms or funs. The
     %% expected {Timestamp, Binary} shape contains neither.
     {Then, Bin1} = binary_to_term(Bin, [safe]),
     JSON = mochijson2:decode(Bin1),
     %% grab the timestamp
     Delta = timer:now_diff(Now, Then),
     State1 = State#state{latency = Delta},
     ?MODULE:Where(JSON, State1);

handle_info(Error, Where, State = #state{})
   when element(1, Error) == tcp_closed;
        element(1, Error) == tcp_error;
        element(1, Error) == error ->
     stats:add(flash_total_tcp_errors, 1),
     stats:sum(flash_tcp_errors, 1),
     transition(Error, Where, State);

handle_info(Info, Where, State) ->
     {stop, {unknown_info, Info, Where}, State}.

%% gen_fsm terminate callback: closes the bot's socket (any failure is
%% swallowed by `catch') and bumps the "stopped" counters.
%% NOTE(review): this listing is truncated -- the body ends on a comma, so
%% the final expression of the function is missing.
terminate(_Reason, _Where, State) ->
     catch gen_tcp:close(State#state.socket),
     stats:add(flash_total_stopped, 1),
     stats:sum(flash_stopped, 1),

%% gen_fsm code_change callback: no state conversion is needed, so the
%% current state name and state data are returned unchanged.
code_change(_OldVsn, StateName, StateData, _Extra) ->
     {ok, StateName, StateData}.

%% Write Bin to the bot's socket and return the result of gen_tcp:send/2.
send(Bin, #state{socket = Socket}) ->
     gen_tcp:send(Socket, Bin).

%% Tear down the current connection attempt and schedule a fresh one.
%%
%% When the failure counter has reached ?MAX_FAILS the bot is stopped with
%% reason max_fails. Otherwise the socket is closed and the pending timer
%% cancelled (failures of either are swallowed), a `connect' event is
%% scheduled, the failure counter is incremented, the stored timeout grows
%% by a random amount of up to ?MIN_DELAY and the FSM goes back to
%% not_connected.
%%
%% NOTE(review): the accumulated timeout is only stored in the state; the
%% `connect' event itself is always scheduled after a fixed 50 ms.
reconnect(#state{fails = ?MAX_FAILS} = State) ->
     {stop, max_fails, State};

reconnect(State) ->
     catch gen_tcp:close(State#state.socket),
     catch gen_fsm:cancel_timer(State#state.timer),
     Timer = gen_fsm:send_event_after(50, connect),
     Fails = State#state.fails + 1,
     Delay = State#state.timeout + random:uniform(?MIN_DELAY),
     stats:add(flash_total_fails, 1),
     {next_state, not_connected,
      State#state{timer = Timer, fails = Fails, timeout = Delay}}.

%% NOTE(review): this listing is truncated -- only a catch-all pattern and an
%% `after 0' clause survive; the `receive' keyword, both clause bodies and
%% the closing `end.' are missing. Presumably this drains the process
%% mailbox without blocking; confirm against the original module.
flush() ->
         _ ->
     after 0 ->

%% transition/3: shorthand for a transition whose previous and next state
%% are the same state name. Delegates to transition/4.
transition(Result, StateName, #state{} = Data)
   when is_atom(StateName) ->
     transition(Result, StateName, StateName, Data).

%% transition/4 has two shapes, told apart by the type of the third
%% argument:
%%
%% (Result, StateName, Fun, State)  - stay in StateName and use Fun as the
%%                                    success hook.
%% (Result, Current, Next, State)   - move from Current to Next with a
%%                                    success hook that leaves the state
%%                                    untouched.
%%
%% Both delegate to transition/5.
transition(Result, StateName, OnSuccess, #state{} = Data)
   when is_atom(StateName),
        is_function(OnSuccess) ->
     transition(Result, StateName, StateName, OnSuccess, Data);

transition(Result, Current, Next, #state{} = Data)
   when is_atom(Current),
        is_atom(Next) ->
     KeepState = fun(_, S) -> S end,
     transition(Result, Current, Next, KeepState, Data).

%% transition/5: central success/error dispatch for state changes.
%%
%% Clause 1 - the result is `ok' or a tuple whose first element is `ok':
%%            record {Prev, Next, Result} in the state and in the process
%%            dictionary, then move to Next with F(Result, State) as the
%%            new state data.
%% Clause 2 - a non-ok result while the previous state is `subscribed':
%%            re-dispatch with the previous state replaced by `undefined',
%%            which lands in clause 3. The bound Sem variable is not used.
%% Clause 3 - any other non-ok result: count the error and record the
%%            transition. F is ignored.
%%
%% NOTE(review): this listing is truncated -- the head of clause 2 is
%% hard-wrapped across two lines, and clause 3 ends on a comma so its final
%% expression is missing. Presumably it hands State1 to reconnect/1, given
%% the author's stated intent to restart from scratch on any error; confirm
%% against the original module.
transition(Good, Prev, Next, F, State = #state{})
   when is_atom(Next), is_function(F), Good == ok;
        is_atom(Next), is_function(F), element(1, Good) == ok ->
     State1 = State#state{transition = {Prev, Next, Good}},
     put(state, {transition, Prev, Next, Good}),
     {next_state, Next, F(Good, State1)};

transition(Error, subscribed, Next, F, State = #state{semaphore =  
Sem}) ->
     transition(Error, undefined, Next, F, State);

transition(Error, Prev, Next, _, State = #state{}) ->
     %% io:format("~p: ~p -> ~p = ~p~n", [self(), Prev, Next, Error]),
     stats:add(flash_errors, 1),
     State1 = State#state{transition = {Prev, Next, Error}},
     put(state, {transition, Prev, Next, Error}),

%% initially/1: same as initially/2 with a delay of 50 (the value is passed
%% on to gen_fsm:send_event_after/2 there).
initially(Event) ->
     DefaultDelay = 50,
     initially(Event, DefaultDelay).

%% initially/2: build a success hook for transition/N. The returned fun
%% accepts only the result `ok'; it cancels the timer currently stored in the
%% state (failures swallowed), schedules Event to be sent to this FSM after
%% Timeout milliseconds and stores the new timer reference in the state.
%% NOTE(review): this listing is truncated -- the closing `end.' of the fun
%% is missing.
initially(Event, Timeout) ->
     fun(ok, State = #state{}) ->
             catch gen_fsm:cancel_timer(State#state.timer),
             Ref = gen_fsm:send_event_after(Timeout, Event),
             State#state{timer = Ref}

fastest mac firefox!

More information about the erlang-questions mailing list