Multicast
Martin J. Logan
mlogan@REDACTED
Fri Sep 3 16:50:32 CEST 2004
Disregard the last email - In my eagerness to try this last example I
plugged in the wrong IP for the multicast. I was using the localhost IP
addr which of course explains the round robin in the last case. Thanks
for the help - it works perfectly.
On Thu, 2004-09-02 at 20:58, Reto Kramer wrote:
> [oops, sent this from the wrong account initially]
>
> Martin,
>
> Here's a complete testcase that you can run locally. It's a dumbed down
> version of a larger app.
>
> Compile this file and then run it in two terminals with:
>
> erl -sname foo -s discover_server run
> erl -sname bar -s discover_server run
>
> Each node (foo and bar) will send numbered heartbeats to the multicast
> address and each one will pick up its own packets as well as packets
> sent by the other node.
>
> (foo@REDACTED)1> Running for 5 minutes.
> received packet 1 from node foo@REDACTED
> received packet 2 from node foo@REDACTED
> received packet 1 from node bar@REDACTED
> received packet 3 from node foo@REDACTED
> received packet 2 from node bar@REDACTED
> received packet 4 from node foo@REDACTED
> received packet 3 from node bar@REDACTED
> received packet 5 from node foo@REDACTED
>
> (bar@REDACTED)1> Running for 5 minutes.
> received packet 2 from node foo@REDACTED
> received packet 1 from node bar@REDACTED
> received packet 3 from node foo@REDACTED
> received packet 2 from node bar@REDACTED
> received packet 4 from node foo@REDACTED
> received packet 3 from node bar@REDACTED
>
> This is running on one OSX box. E.g. foo picks up its own heartbeat
> 2, and bar picks up that same packet (foo's heartbeat 2) as well. So
> here they are not eating each other's packets, but each process sees the
> same packets.
>
> Run it to see what you'll get on your setup.
>
> cheers,
> - Reto
>
> %%%-------------------------------------------------------------------
> %%% File : discover_server.erl
> %%% Author : reto <kramer@REDACTED>
> %%% Description : Basic multicast based heart-beating
> %%%-------------------------------------------------------------------
> -module(discover_server).
>
> -behaviour(gen_server).
> %%--------------------------------------------------------------------
> %% Include files
> %%--------------------------------------------------------------------
>
> %%--------------------------------------------------------------------
> %% External exports
> -export([start_link/0]).
>
> -export([run/0]).
>
> -export([trigger_heartbeat/0]).
>
> %% gen_server callbacks
> -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
> terminate/2, code_change/3]).
>
> -record(state, {socket, % multicast UDP socket opened in init/1
> nodes, % set of nodes (atoms); created empty in init/1
> send_timer, % timer reference returned by timer:apply_interval/4
> counter % sequence number stamped on the next outgoing heartbeat
> }).
>
> -define(HB_PERIOD_MS, 1000). % default heartbeat interval in ms (used when the app env has no 'period')
> -define(ADDR, {239,0,42,1}). % default multicast group address (used when the app env has no 'addr')
> -define(PORT, 10042). % default UDP port (used when the app env has no 'port')
>
> -define(SERVER, ?MODULE). % name the server is registered under locally
>
> %%====================================================================
> %% External functions
> %%====================================================================
> %%--------------------------------------------------------------------
> %% Function: start_link/0
> %% Description: Starts the heartbeat server as a linked gen_server and
> %%              registers it locally under ?SERVER.
> %% Returns: whatever gen_server:start_link/4 returns
> %%--------------------------------------------------------------------
> start_link() ->
>     ServerName = {local, ?SERVER},
>     gen_server:start_link(ServerName, ?MODULE, [], []).
>
> %%====================================================================
> %% Server functions
> %%====================================================================
>
> %%--------------------------------------------------------------------
> %% Function: init/1
> %% Description: gen_server callback. Traps exits, opens the multicast
> %%              UDP socket (bound to the group address, loopback of
> %%              our own packets enabled) and starts the interval
> %%              timer that triggers heartbeats.
> %% Returns: {ok, State}; fails with badmatch if the socket or the
> %%          timer cannot be set up.
> %%--------------------------------------------------------------------
> init([]) ->
>     process_flag(trap_exit, true),
>     Group = get_addr(),
>     SocketOpts = [binary,
>                   {reuseaddr, true},
>                   {ip, Group},
>                   {multicast_loop, true},
>                   {add_membership, {Group, {0,0,0,0}}}],
>     {ok, Socket} = gen_udp:open(get_port(), SocketOpts),
>     % net_kernel:monitor_nodes(true),
>     {ok, TRef} = timer:apply_interval(get_period(),
>                                       ?MODULE, trigger_heartbeat, []),
>     InitialState = #state{socket = Socket,
>                           nodes = sets:new(),
>                           send_timer = TRef,
>                           counter = 1},
>     {ok, InitialState}.
>
> %%--------------------------------------------------------------------
> %% Function: handle_call/3
> %% Description: gen_server callback for synchronous requests. This
> %%              server defines no calls, so every request is answered
> %%              with 'ok' and the state is left untouched.
> %% Returns: {reply, ok, State}
> %%--------------------------------------------------------------------
> handle_call(_Request, _From, State) ->
>     {reply, ok, State}.
>
> %%--------------------------------------------------------------------
> %% Function: handle_cast/2
> %% Description: gen_server callback for asynchronous requests.
> %%              {send_heartbeat} multicasts one heartbeat carrying the
> %%              current counter and then advances the counter by one.
> %%              Any other cast is ignored.
> %% Returns: {noreply, State}
> %%--------------------------------------------------------------------
> handle_cast({send_heartbeat}, State) ->
>     send_heartbeat(State),
>     Next = State#state.counter + 1,
>     {noreply, State#state{counter = Next}};
> handle_cast(_Msg, State) ->
>     {noreply, State}.
>
> %%--------------------------------------------------------------------
> %% Function: handle_info/2
> %% Description: gen_server callback for raw messages. A {udp, ...}
> %%              message delivered by the multicast socket is decoded;
> %%              a well-formed heartbeat is reported on stdout, a
> %%              malformed datagram is reported and dropped. Every
> %%              other message is ignored.
> %% Returns: {noreply, State}
> %%--------------------------------------------------------------------
> handle_info({udp, _Socket, IP, InPortNo, Packet}, State) ->
>     case decode_heartbeat(Packet) of
>         {ok, Node, Counter} ->
>             io:format("received packet ~p from node ~p~n",[Counter, Node]);
>         error ->
>             % Anyone on the network can send to the group, so a bad
>             % datagram must not take the server down.
>             io:format("ignoring malformed packet from ~p:~p~n",[IP, InPortNo])
>     end,
>     {noreply, State};
> handle_info(_Info, State) ->
>     {noreply, State}.
>
> %% Decode one heartbeat datagram.
> %% Returns {ok, Node, Counter} for a term of the exact shape produced
> %% by send_heartbeat/1, and 'error' for anything else (not an external
> %% term at all, or a term of a different shape).
> %%
> %% NOTE(security): the packet comes from the network and is decoded
> %% with binary_to_term/1, which can create new atoms. The [safe]
> %% option is not used here because the sender's node name travels as
> %% an atom that usually does not exist locally yet, so [safe] would
> %% reject legitimate heartbeats. Hardening this properly means sending
> %% the node name as a binary instead -- a wire-format change.
> decode_heartbeat(Packet) ->
>     try binary_to_term(Packet) of
>         {node_name, Node, counter, Counter}
>           when is_atom(Node), is_integer(Counter) ->
>             {ok, Node, Counter};
>         _Other ->
>             error
>     catch
>         error:badarg ->
>             error
>     end.
>
> %%--------------------------------------------------------------------
> %% Function: terminate/2
> %% Description: gen_server callback run on shutdown. Stops the
> %%              heartbeat timer and closes the multicast socket
> %%              explicitly instead of relying on process-exit cleanup.
> %% Returns: ok (ignored by gen_server)
> %%--------------------------------------------------------------------
> terminate(_Reason, #state{socket = Socket, send_timer = TRef}) ->
>     % The result is deliberately ignored: if the timer is already gone
>     % there is nothing left to cancel, and shutdown should proceed.
>     _ = timer:cancel(TRef),
>     gen_udp:close(Socket),
>     ok.
>
> %%--------------------------------------------------------------------
> %% Func: code_change/3
> %% Purpose: gen_server callback for hot code upgrade. The state layout
> %%          is carried over as it is; no conversion is performed.
> %% Returns: {ok, State}
> %%--------------------------------------------------------------------
> code_change(_OldVsn, State, _Extra) ->
>     {ok, State}.
>
> %%--------------------------------------------------------------------
> %%% Internal functions
> %%--------------------------------------------------------------------
> %% Asks the registered server to send one heartbeat. Exported because
> %% the interval timer started in init/1 invokes it by module/function
> %% name. The request is a cast, so this returns without waiting.
> trigger_heartbeat() ->
>     Request = {send_heartbeat},
>     gen_server:cast(?SERVER, Request).
>
> %% Multicasts one heartbeat: this node's name plus the current counter,
> %% encoded in the external term format, sent to the configured group
> %% address and port. Fails with badmatch if the send does not return ok.
> send_heartbeat(#state{socket = Socket, counter = Counter}) ->
>     Heartbeat = {node_name, node(), counter, Counter},
>     Payload = term_to_binary(Heartbeat),
>     ok = gen_udp:send(Socket, get_addr(), get_port(), Payload).
>
> %% Configuration accessors. Each one reads a key from the environment
> %% of the application the calling process belongs to, and falls back
> %% to the compiled-in default when the process is not part of any
> %% application or the key is not set.
>
> %% Multicast group address (env key 'addr', default ?ADDR).
> get_addr() ->
>     env_or_default(addr, ?ADDR).
>
> %% UDP port (env key 'port', default ?PORT).
> get_port() ->
>     env_or_default(port, ?PORT).
>
> %% Heartbeat interval in milliseconds (env key 'period',
> %% default ?HB_PERIOD_MS).
> get_period() ->
>     env_or_default(period, ?HB_PERIOD_MS).
>
> %% Looks up Key in the current application's environment and returns
> %% its value, or Default when there is no current application or the
> %% key is absent. Shared by the three accessors above so the lookup
> %% logic exists in exactly one place.
> env_or_default(Key, Default) ->
>     case application:get_application() of
>         undefined ->
>             Default;
>         {ok, App} ->
>             application:get_env(App, Key, Default)
>     end.
>
>
> %%--------------------------------------------------------------------
> %%% debug functions
> %%--------------------------------------------------------------------
> %% Manual test entry point (e.g. erl -sname foo -s discover_server run):
> %% starts the server linked to the caller, keeps the caller alive for
> %% five minutes while heartbeats are exchanged, then prints "done.".
> run() ->
>     {ok, _Pid} = ?MODULE:start_link(),
>     io:format("Running for 5 minutes.~n",[]),
>     FiveMinutesMs = 5*60*1000,
>     timer:sleep(FiveMinutesMs),
>     io:format("done.~n",[]).
>
> #
>
> On Sep 2, 2004, at 12:39 PM, Reto Kramer wrote:
>
> > The following works on OSX and Linux for me. I had trouble
> > opening the multicast socket on WinXP, but had no patience to figure out
> > the cause.
> >
> > cheers,
> > - Reto
> >
> > Open a multicast port with:
> > Res = gen_udp:open(get_port(), [binary,
> > {reuseaddr,true},
> > {ip,get_addr()},
> > {multicast_loop,true},
> > {add_membership,{get_addr(),
> > {0,0,0,0}}}]),
> >
> > where
> > get_addr() ->
> > {ok, App} = application:get_application(), % badmatch if the caller is not part of an application
> > case application:get_env(App, addr) of
> > {ok, Val} ->
> > Val; % value configured under the 'addr' env key
> > undefined ->
> > ?ADDR % key not set: fall back to the compiled-in default
> > end.
> > get_port() -> ...
> >
> > and the env key of the .app file is
> > {env, [{addr, {239,0,0,1}}, % the multicast ip addr
> > {port, 10042}, % the port we multicast at
> > {period, 1000}]}, % heartbeat interval in ms
> >
> > Send packets with
> > Packet = {node_name, node()},
> > ok = gen_udp:send(State#state.socket,
> > get_addr(),
> > get_port(),
> > term_to_binary(Packet)),
> >
> > and receive packets like this:
> > receive
> > {udp, Socket, IP, InPortNo, Packet} ->
> > {node_name, Node} = binary_to_term(Packet),
> >
> > #
> >
> > On Sep 2, 2004, at 12:03 PM, Martin J. Logan wrote:
> >
> >> Is it possible to have multiple nodes on a single machine listening
> >> for
> >> the same multicast packets at the same time? How can this be
> >> accomplished within erts?
> >>
> >> Cheers,
> >> Martin
> >>
> >
More information about the erlang-questions
mailing list