Multicast

Reto Kramer kramer@REDACTED
Fri Sep 3 03:58:48 CEST 2004


[ups, sent this from wrong account initially]

Martin,

Here's a complete testcase that you can run locally. It's a dumbed down 
version of a larger app.

Compile this file and then run it in two terminals with:

erl -sname foo -s discover_server run
erl -sname bar -s discover_server run

Each node (foo and bar) will send numbered heartbeats to the multicast 
address and each one will pick up its own packets as well as packets 
sent by the other node.

(foo@REDACTED)1> Running for 5 minutes.
received packet 1 from node foo@REDACTED
received packet 2 from node foo@REDACTED
received packet 1 from node bar@REDACTED
received packet 3 from node foo@REDACTED
received packet 2 from node bar@REDACTED
received packet 4 from node foo@REDACTED
received packet 3 from node bar@REDACTED
received packet 5 from node foo@REDACTED

(bar@REDACTED)1> Running for 5 minutes.
received packet 2 from node foo@REDACTED
received packet 1 from node bar@REDACTED
received packet 3 from node foo@REDACTED
received packet 2 from node bar@REDACTED
received packet 4 from node foo@REDACTED
received packet 3 from node bar@REDACTED

This is running on one OSX box.  E.g. foo picks up its own heartbeat 
2, and bar picks up that same packet (foo's heartbeat 2) as well. So 
here they are not eating each other's packets, but each process sees the 
same packets.

Run it to see what you'll get on your setup.

cheers,
- Reto

%%%-------------------------------------------------------------------
%%% File    : discover_server.erl
%%% Author  : reto <kramer@REDACTED>
%%% Description : Basic multicast based heart-beating
%%%-------------------------------------------------------------------
-module(discover_server).

-behaviour(gen_server).
%%--------------------------------------------------------------------
%% Include files
%%--------------------------------------------------------------------

%%--------------------------------------------------------------------
%% External exports
-export([start_link/0]).

-export([run/0]).

-export([trigger_heartbeat/0]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
	 terminate/2, code_change/3]).

%% Server state carried through every gen_server callback.
-record(state, {socket,     % multicast udp socket (opened in init/1)
		nodes,      % set of nodes (atoms); created empty in init/1 and
		            % not read or updated by the code in this module
		send_timer, % timer used to send heartbeats (timer:apply_interval ref)
		counter     % number put into the next heartbeat; starts at 1 and
		            % is incremented after each heartbeat is sent
	       }).

-define(HB_PERIOD_MS, 1000).  % (if not in .app) default hb interval
-define(ADDR, {239,0,42,1}).  % (if not in .app) default addr
-define(PORT, 10042).         % (if not in .app) default port

-define(SERVER, ?MODULE).

%%====================================================================
%% External functions
%%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link/0
%% Description: Starts the heartbeat server linked to the calling
%%              process and registers it locally as ?SERVER.
%% Returns: the result of gen_server:start_link/4
%%--------------------------------------------------------------------
start_link() ->
     ServerName = {local, ?SERVER},
     gen_server:start_link(ServerName, ?MODULE, [], []).

%%====================================================================
%% Server functions
%%====================================================================

%%--------------------------------------------------------------------
%% Function: init/1
%% Description: Opens the multicast UDP socket, starts the periodic
%%              heartbeat timer and builds the initial server state.
%% Returns: {ok, State}
%%          (fails with badmatch if the socket cannot be opened or
%%          the timer cannot be created)
%%--------------------------------------------------------------------
init([]) ->
     process_flag(trap_exit, true),
     Port = get_port(),
     Addr = get_addr(),
     %% Bind to the group address itself and join the group using
     %% {0,0,0,0} as the interface address. multicast_loop makes the
     %% node receive its own heartbeats as well.
     SocketOpts = [binary,
                   {reuseaddr, true},
                   {ip, Addr},
                   {multicast_loop, true},
                   {add_membership, {Addr, {0,0,0,0}}}],
     {ok, Socket} = gen_udp:open(Port, SocketOpts),
     % net_kernel:monitor_nodes(true),
     %% Every period the timer calls trigger_heartbeat/0, which casts
     %% {send_heartbeat} back to this server.
     Period = get_period(),
     {ok, TRef} = timer:apply_interval(Period, ?MODULE,
                                       trigger_heartbeat, []),
     InitialState = #state{socket = Socket,
                           nodes = sets:new(),
                           send_timer = TRef,
                           counter = 1},
     {ok, InitialState}.

%%--------------------------------------------------------------------
%% Function: handle_call/3
%% Description: No synchronous requests are defined for this server;
%%              every call is answered with ok and the state is left
%%              untouched.
%% Returns: {reply, ok, State}
%%--------------------------------------------------------------------
handle_call(_Request, _From, State) ->
     {reply, ok, State}.

%%--------------------------------------------------------------------
%% Function: handle_cast/2
%% Description: On {send_heartbeat} multicasts one heartbeat carrying
%%              the current counter, then advances the counter by one.
%%              Any other cast is ignored.
%% Returns: {noreply, State}
%%--------------------------------------------------------------------
handle_cast({send_heartbeat}, State) ->
     send_heartbeat(State),
     NextCounter = State#state.counter + 1,
     {noreply, State#state{counter = NextCounter}};
handle_cast(_Msg, State) ->
     {noreply, State}.

%%--------------------------------------------------------------------
%% Function: handle_info/2
%% Description: Handles datagrams delivered by the multicast socket.
%%              A well-formed heartbeat is printed; anything else
%%              arriving on the socket is reported and dropped instead
%%              of crashing the server. All other messages are ignored.
%% Returns: {noreply, State}
%%--------------------------------------------------------------------
handle_info({udp, _Socket, IP, InPortNo, Packet}, State) ->
     case decode_heartbeat(Packet) of
         {ok, Node, Counter} ->
             io:format("received packet ~p from node ~p~n",[Counter, Node]);
         error ->
             %% Anybody can send to the multicast group, so a foreign or
             %% corrupt datagram must not take the server down.
             io:format("ignoring malformed packet from ~p:~p~n",
                       [IP, InPortNo])
     end,
     {noreply, State};
handle_info(_Info, State) ->
     {noreply, State}.

%% Decodes a heartbeat datagram.
%% Returns {ok, Node, Counter} for a {node_name, Node, counter, Counter}
%% term with an atom node name and an integer counter, otherwise error.
%%
%% NOTE(review): binary_to_term/1 on data from the network can create
%% new atoms and other unwanted terms. The [safe] option is deliberately
%% not used here, because it would reject heartbeats from nodes whose
%% name atom does not exist locally yet. Consider a wire format that
%% does not use the external term format if untrusted hosts can reach
%% the multicast group.
decode_heartbeat(Packet) ->
     try binary_to_term(Packet) of
         {node_name, Node, counter, Counter}
           when is_atom(Node), is_integer(Counter) ->
             {ok, Node, Counter};
         _Other ->
             error
     catch
         error:badarg ->
             error
     end.

%%--------------------------------------------------------------------
%% Function: terminate/2
%% Description: Called when the server shuts down. Performs no
%%              explicit cleanup.
%% Returns: ok (ignored by gen_server)
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
     ok.

%%--------------------------------------------------------------------
%% Function: code_change/3
%% Description: Code upgrade hook; the state is passed through
%%              unchanged.
%% Returns: {ok, State}
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
     {ok, State}.

%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
%% Asks the locally registered server to multicast one heartbeat.
%% Exported because the interval timer started in init/1 invokes it
%% through ?MODULE.
trigger_heartbeat() ->
     Request = {send_heartbeat},
     gen_server:cast(?SERVER, Request).

%% Multicasts one heartbeat {node_name, Node, counter, Counter}, encoded
%% with term_to_binary/1, to the configured group address and port.
%% Fails with badmatch if gen_udp:send/4 does not return ok.
send_heartbeat(State) ->
     Heartbeat = {node_name, node(), counter, State#state.counter},
     Payload = term_to_binary(Heartbeat),
     ok = gen_udp:send(State#state.socket, get_addr(), get_port(), Payload).

%% Multicast group address: the `addr' key of the application
%% environment, or ?ADDR when it is not configured.
get_addr() ->
     env_or_default(addr, ?ADDR).

%% UDP port used for multicasting: the `port' key of the application
%% environment, or ?PORT when it is not configured.
get_port() ->
     env_or_default(port, ?PORT).

%% Heartbeat interval in milliseconds: the `period' key of the
%% application environment, or ?HB_PERIOD_MS when it is not configured.
get_period() ->
     env_or_default(period, ?HB_PERIOD_MS).

%% Looks up Key in the environment of the application the calling
%% process belongs to. Returns Default when the process is not part of
%% any application (e.g. when the module is started stand-alone via
%% run/0) or when the key is not set.
env_or_default(Key, Default) ->
     case application:get_application() of
         {ok, App} ->
             application:get_env(App, Key, Default);
         undefined ->
             Default
     end.


%%--------------------------------------------------------------------
%%% debug functions
%%--------------------------------------------------------------------
%% Debug entry point (e.g. `erl -sname foo -s discover_server run'):
%% starts the server, blocks the caller for five minutes while
%% heartbeats are sent and printed, then prints "done.".
run() ->
     {ok, _Pid} = ?MODULE:start_link(),
     io:format("Running for 5 minutes.~n"),
     timer:sleep(timer:minutes(5)),
     io:format("done.~n").

#

On Sep 2, 2004, at 12:39 PM, Reto Kramer wrote:

> The following works on OSX and Linux for me.  I had trouble 
> opening the multicast socket on WinXP, but had no patience to figure out 
> the cause.
>
> cheers,
> - Reto
>
> Open a multicast port with:
>     Res = gen_udp:open(get_port(), [binary,
> 				    	{reuseaddr,true},
> 				    {ip,get_addr()},
> 				    {multicast_loop,true},
> 				    {add_membership,{get_addr(),
> 						     {0,0,0,0}}}]),
>
> where
>   get_addr() ->
>       {ok, App} = application:get_application(),
>       case application:get_env(App, addr) of
>   	   {ok, Val} ->
> 	      Val;
> 	   undefined ->
> 	      ?ADDR
>       end.
>   get_port() -> ...
>
> and the env key of the .app file is
>    {env,          [{addr, {239,0,0,1}}, % the multicast ip addr
> 			     {port, 10042},       % the port we multicast at
> 			     {period, 1000}]},    % heartbeat interval in ms
>
> Send packets with
>     Packet = {node_name, node()},
>     ok = gen_udp:send(State#state.socket,
> 		      get_addr(),
> 		      get_port(),
> 		      term_to_binary(Packet)),
>
> and receive packets like this:
>   receive
> 	{udp, Socket, IP, InPortNo, Packet} ->
> 	    {node_name, Node} = binary_to_term(Packet),
>
> #
>
> On Sep 2, 2004, at 12:03 PM, Martin J. Logan wrote:
>
>> Is it possible to have multiple nodes on a single machine listening 
>> for
>> the same multicast packets at the same time? How can this be
>> accomplished within erts?
>>
>> Cheers,
>> Martin
>>
>




More information about the erlang-questions mailing list