Multicast

Martin J. Logan mlogan@REDACTED
Fri Sep 3 16:50:32 CEST 2004


Disregard the last email - In my eagerness to try this last example I
plugged in the wrong IP for the multicast. I was using the localhost IP
addr which of course explains the round robin in the last case. Thanks
for the help - it works perfectly.


On Thu, 2004-09-02 at 20:58, Reto Kramer wrote:
> [ups, sent this from wrong account initially]
> 
> Martin,
> 
> Here's a complete testcase that you can run locally. It's a dumbed down 
> version of a larget app.
> 
> Compile this file and then run it in two terminals with:
> 
> erl -sname foo -s discover_server run
> erl -sname bar -s discover_server run
> 
> Each node (foo and bar) will send numbered heartbeats to the multicast 
> address and each one will pick up it's own packets as well as packets 
> send by the other node.
> 
> (foo@REDACTED)1> Running for 5 minutes.
> received packet 1 from node foo@REDACTED
> received packet 2 from node foo@REDACTED
> received packet 1 from node bar@REDACTED
> received packet 3 from node foo@REDACTED
> received packet 2 from node bar@REDACTED
> received packet 4 from node foo@REDACTED
> received packet 3 from node bar@REDACTED
> received packet 5 from node foo@REDACTED
> 
> (bar@REDACTED)1> Running for 5 minutes.
> received packet 2 from node foo@REDACTED
> received packet 1 from node bar@REDACTED
> received packet 3 from node foo@REDACTED
> received packet 2 from node bar@REDACTED
> received packet 4 from node foo@REDACTED
> received packet 3 from node bar@REDACTED
> 
> This is running on one OSX box.  E.g. foo picks up it's own heartbeat 
> 2, and bar picks up that same packet (foo's heartbeat 2) as well. So 
> here they are not eating each others packets, but each process sees the 
> same packets.
> 
> Run it to see what you'll get on your setup.
> 
> cheers,
> - Reto
> 
> %%%-------------------------------------------------------------------
> %%% File    : discover_server.erl
> %%% Author  : reto <kramer@REDACTED>
> %%% Description : Basic multicast based heart-beating
> %%%-------------------------------------------------------------------
> -module(discover_server).
> 
> -behaviour(gen_server).
> %%--------------------------------------------------------------------
> %% Include files
> %%--------------------------------------------------------------------
> 
> %%--------------------------------------------------------------------
> %% External exports
> -export([start_link/0]).
> 
> -export([run/0]).
> 
> -export([trigger_heartbeat/0]).
> 
> %% gen_server callbacks
> -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
> 	 terminate/2, code_change/3]).
> 
> -record(state, {socket,     % multicast udp socket
> 		nodes,      % set of nodes (atoms)
> 		send_timer, % timer used to send heartbeats
> 		counter
> 	       }).
> 
> -define(HB_PERIOD_MS, 1000).  % (if not in .app) default hb interval
> -define(ADDR, {239,0,42,1}).  % (if not in .app) default addr
> -define(PORT, 10042).         % (if not in .app) default port
> 
> -define(SERVER, ?MODULE).
> 
> %%====================================================================
> %% External functions
> %%====================================================================
> %%--------------------------------------------------------------------
> %% Function: start_link/0
> %% Description: Starts the server
> %%--------------------------------------------------------------------
> start_link() ->
>      gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
> 
> %%====================================================================
> %% Server functions
> %%====================================================================
> 
> %%--------------------------------------------------------------------
> %% Function: init/1
> %% Description: Initiates the server
> %% Returns: {ok, State}          |
> %%          {ok, State, Timeout} |
> %%          ignore               |
> %%          {stop, Reason}
> %%--------------------------------------------------------------------
> init([]) ->
>      process_flag(trap_exit, true),
>      {ok, Socket} = gen_udp:open(get_port(), [binary,
> 					     {reuseaddr,true},
> 					     {ip,get_addr()},
> 					     {multicast_loop,true},
> 					     {add_membership,{get_addr(),
> 							      {0,0,0,0}}}]),
>      % net_kernel:monitor_nodes(true),
>      {ok, TRef} = timer:apply_interval(get_period(),
> 				      ?MODULE, trigger_heartbeat, []),
>      {ok, #state{socket=Socket,
> 		nodes=sets:new(),
> 		send_timer=TRef,
> 		counter=1}}.
> 
> %%--------------------------------------------------------------------
> %% Function: handle_call/3
> %% Description: Handling call messages
> %% Returns: {reply, Reply, State}          |
> %%          {reply, Reply, State, Timeout} |
> %%          {noreply, State}               |
> %%          {noreply, State, Timeout}      |
> %%          {stop, Reason, Reply, State}   | (terminate/2 is called)
> %%          {stop, Reason, State}            (terminate/2 is called)
> %%--------------------------------------------------------------------
> handle_call(Request, From, State) ->
>      Reply = ok,
>      {reply, Reply, State}.
> 
> %%--------------------------------------------------------------------
> %% Function: handle_cast/2
> %% Description: Handling cast messages
> %% Returns: {noreply, State}          |
> %%          {noreply, State, Timeout} |
> %%          {stop, Reason, State}            (terminate/2 is called)
> %%--------------------------------------------------------------------
> handle_cast({send_heartbeat}, State) ->
>      send_heartbeat(State),
>      NewState = State#state{counter = State#state.counter + 1},
>      {noreply, NewState};
> handle_cast(Msg, State) ->
>      {noreply, State}.
> 
> %%--------------------------------------------------------------------
> %% Function: handle_info/2
> %% Description: Handling all non call/cast messages
> %% Returns: {noreply, State}          |
> %%          {noreply, State, Timeout} |
> %%          {stop, Reason, State}            (terminate/2 is called)
> %%--------------------------------------------------------------------
> handle_info({udp, Socket, IP, InPortNo, Packet}, State) ->
>      {node_name, Node,
>       counter, Counter} = binary_to_term(Packet),
>      io:format("received packet ~p from node ~p~n",[Counter, Node]),
>      {noreply, State};
> handle_info(Info, State) ->
>      {noreply, State}.
> 
> %%--------------------------------------------------------------------
> %% Function: terminate/2
> %% Description: Shutdown the server
> %% Returns: any (ignored by gen_server)
> %%--------------------------------------------------------------------
> terminate(Reason, State) ->
>      ok.
> 
> %%--------------------------------------------------------------------
> %% Func: code_change/3
> %% Purpose: Convert process state when code is changed
> %% Returns: {ok, NewState}
> %%--------------------------------------------------------------------
> code_change(OldVsn, State, Extra) ->
>      {ok, State}.
> 
> %%--------------------------------------------------------------------
> %%% Internal functions
> %%--------------------------------------------------------------------
> trigger_heartbeat() ->
>      gen_server:cast(?SERVER, {send_heartbeat}).
> 
> send_heartbeat(State) ->
>      Packet = {node_name, node(),
> 	      counter, State#state.counter},
>      ok = gen_udp:send(State#state.socket,
> 		      get_addr(),
> 		      get_port(),
> 		      term_to_binary(Packet)).
> 
> get_addr() ->
>      case application:get_application() of
> 	undefined ->
> 	    ?ADDR;
> 	{ok, App} ->
> 	    case application:get_env(App, addr) of
> 		undefined ->
> 		    ?ADDR;
> 		{ok, Val} ->
> 		    Val
> 	    end
>      end.
> 
> get_port() ->
>      case application:get_application() of
> 	undefined ->
> 	    ?PORT;
> 	{ok, App} ->
> 	    case application:get_env(App, port) of
> 		undefined ->
> 		    ?PORT;
> 		{ok, Val} ->
> 		    Val
> 	    end
>      end.
> 
> get_period() ->
>      case application:get_application() of
> 	undefined ->
> 	    ?HB_PERIOD_MS;
> 	{ok, App} ->
> 	    case application:get_env(App, period) of
> 		undefined ->
> 		    ?HB_PERIOD_MS;
> 		{ok, Val} ->
> 		    Val
> 	    end
>      end.
> 
> 
> %%--------------------------------------------------------------------
> %%% debug functions
> %%--------------------------------------------------------------------
> run() ->
>      {ok, Pid} = ?MODULE:start_link(),
>      io:format("Running for 5 minutes.~n",[]),
>      timer:sleep(5*60*1000),
>      io:format("done.~n",[]).
> 
> #
> 
> On Sep 2, 2004, at 12:39 PM, Reto Kramer wrote:
> 
> > The following works on the OSX and Linux for me.  I had trouble 
> > opening the multicast socket on WinXP, but no patience to figure out 
> > the cause.
> >
> > cheers,
> > - Reto
> >
> > Open a multicast port with:
> >     Res = gen_udp:open(get_port(), [binary,
> > 				    	{reuseaddr,true},
> > 				    {ip,get_addr()},
> > 				    {multicast_loop,true},
> > 				    {add_membership,{get_addr(),
> > 						     {0,0,0,0}}}]),
> >
> > where
> >   get_addr() ->
> >       {ok, App} = application:get_application(),
> >       case application:get_env(App, addr) of
> >   	   {ok, Val} ->
> > 	      Val;
> > 	   undefined ->
> > 	      ?ADDR
> >       end.
> >   get_port() -> ...
> >
> > and the env key of the .app file is
> >    {env,          [{addr, {239,0,0,1}}, % the multicast ip addr
> > 			     {port, 10042},       % the port we multicast at
> > 			     {period, 1000}]},    % heartbeat interval in ms
> >
> > Send packets with
> >     Packet = {node_name, node()},
> >     ok = gen_udp:send(State#state.socket,
> > 		      get_addr(),
> > 		      get_port(),
> > 		      term_to_binary(Packet)),
> >
> > and receive packets like this:
> >   receive
> > 	{udp, Socket, IP, InPortNo, Packet} ->
> > 	    {node_name, Node} = binary_to_term(Packet),
> >
> > #
> >
> > On Sep 2, 2004, at 12:03 PM, Martin J. Logan wrote:
> >
> >> Is it possible to have multiple nodes on a single machine listening 
> >> for
> >> the same multicast packets at the same time? How can this be
> >> accomplished within erts?
> >>
> >> Cheers,
> >> Martin
> >>
> >




More information about the erlang-questions mailing list