[erlang-questions] gen_leader

wde <>
Thu Jun 25 03:25:28 CEST 2009


hello all,



A few questions about the gen_leader module:



---

The 'test_cb' module provided with the gen_leader module specifies a {noreply,Dict} return value for handle_info/2 and handle_cast/2, whereas gen_leader uses handle_common_reply to handle the callback responses of handle_info/2 and handle_cast/2.

Why do handle_info and handle_cast not accept the classical gen_server-like return values ( {noreply,NewState} | {noreply,NewState,Timeout} | {noreply,NewState,hibernate} )?


---

Are buffered calls never deleted?

Consider the following clause of handle_msg/4 in gen_leader:

 %% Clause of handle_msg/4 for a leader reply tagged with Ref.
 %% The caller stored under Ref in the buffered list is looked up and answered
 %% through reply/5.
 %% NOTE(review): the election record with Ref removed from `buffered` is only
 %% passed to reply/5; loop/4 below receives the original E, so the removal
 %% does not reach the next loop iteration.
 handle_msg({Ref, {leader,reply,Reply}} = Msg, Server, Role,#election{buffered = Buffered} = E) ->
    %% Crashes with badmatch when Ref is not present in Buffered.
    {value, {_,From}} = keysearch(Ref,1,Buffered),
    NewServer = reply(From, {leader,reply,Reply}, Server, Role, E#election{buffered = keydelete(Ref,1,Buffered)}),
    loop(NewServer, Role, E, Msg);


It seems that we loop with the original election record E, so the old reference is never actually deleted from the buffer. I modified the function as follows:


%% Clause of handle_msg/4 for a leader reply tagged with Ref: answer the
%% buffered caller and continue looping with the buffer entry removed.
handle_msg({Ref, {leader,reply,_} = LeaderReply} = Msg, Server, Role,#election{buffered = Buffered} = E) ->
    %% Crashes with badmatch when Ref is not present in Buffered.
    {value, {_,From}} = keysearch(Ref,1,Buffered),
    Remaining = keydelete(Ref,1,Buffered),
    PrunedE = E#election{buffered = Remaining},
    NewServer = reply(From, LeaderReply, Server, Role, PrunedE),
    %% The pruned election record is the one carried into the next iteration.
    loop(NewServer, Role, PrunedE, Msg);


---

The incarnation value is stored on the file system in a file that is unique per node (the file name is the node name).

A simple modification would be to use the callback module name (which is application specific) to build the incarnation file name. This way, we could have one gen_leader cluster "per callback_module" on the same node:


incarnation/1 must be replaced by incarnation/2:

%% Return the current incarnation number for (Node, Mod) and persist the next
%% one.
%%
%% The counter lives in a file named after the callback module followed by the
%% node name (no separator), relative to the current working directory.
%%   - If the file cannot be read, 1 is written and 0 is returned.
%%   - Otherwise the stored value is returned and value + 1 is written back.
%% Crashes with badmatch if the file cannot be written.
incarnation(Node,Mod) ->
    Fname = atom_to_list(Mod) ++ atom_to_list(Node),
    %% Read the file directly instead of stat-then-read: one file access, and
    %% no window in which the file can disappear between the two calls.
    case file:read_file(Fname) of
        {ok,Bin} ->
            Incarn = binary_to_term(Bin),
            ok = file:write_file(Fname,term_to_binary(Incarn+1)),
            Incarn;
        {error,_Reason} ->
            ok = file:write_file(Fname,term_to_binary(1)),
            0
    end.

---


Here is an implementation idea for dynamically adding a node to the gen_leader cluster. I use a "synchronization process" to synchronize the new node with the current leader and the other candidates/workers.



-> Start function :
	-> instead of specifying the candidate and worker lists, I just provide one argument which defines the type of
		the node in the gen_leader cluster : candidate | worker

-> Initialization process : init_it function
	-> instead of calling safe_loop/4, I enter a "synchronization state" by calling sync_with_leader(Starter,Parent,Name,Mod,Type,State,Debug), where Type is the new argument that defines the node type (candidate or worker)


%% Synchronize a starting node with an already running cluster before it
%% enters the normal gen_leader loop.
%%
%% Steps:
%%   1. Take the global lock {gen_leader_<Name>, self()}.
%%   2. Send a '$gen_sync_request' to the process registered as Name on every
%%      node returned by nodes().
%%   3. Wait up to ?SYNC_DELAY for a '$gen_sync_response' carrying the current
%%      candidate and worker lists. On timeout, start with empty lists.
%%
%% Type is `worker`, `candidate`, or `{candidate, Extra}`.
%% NOTE(review): Extra is presumably the per-node weight - confirm.
%%
%% The server loops entered from start_after_sync/10 are not expected to
%% return, so the `after` clause releases the lock only when this function is
%% left through an exception or exit. On the normal path the election record
%% is created with locked = true and carries LockId with it.
sync_with_leader(Starter,Parent,Name,Mod,Type,State,Debug) ->
	LockId = {list_to_atom("gen_leader_" ++ atom_to_list(Name)),self()},
	global:sync(),
	try
		true = global:set_lock(LockId),
		lists:foreach(
			fun(Node) ->
				io:format("Sending syncing request to ~p with name ~p\n",[Node,Name]),
				{Name,Node} ! {'$gen_sync_request',node(),Type,self()}
			end,
			nodes()),
		receive
			% parent finished ?
			{'EXIT', _, Reason} ->
				io:format("Received exit request during synching : ~p\n",[Reason]),
				exit(Reason);
			{'$gen_sync_response',_From,Candidates,Workers} ->
				io:format("Received synching from leader : candidates(~p) - workers(~p)\n",[Candidates,Workers]),
				start_after_sync(Starter,Parent,Name,Mod,Type,State,Debug,LockId,Candidates,Workers);
			Unknown ->
				%% Any other message is dropped and the whole synchronization
				%% (lock, broadcast, wait) is restarted.
				io:format("Sync loop received unknown message : ~p\n",[Unknown]),
				sync_with_leader(Starter,Parent,Name,Mod,Type,State,Debug)
		after ?SYNC_DELAY ->
			io:format("Synching timeout : ~p (~p)\n",[node(),Type]),
			start_after_sync(Starter,Parent,Name,Mod,Type,State,Debug,LockId,[],[])
		end
	after
		global:del_lock(LockId)
	end.

%% Build the election record from the synchronized node lists, acknowledge the
%% starter and enter the server loop matching the node type.
start_after_sync(Starter,Parent,Name,Mod,Type,State,Debug,LockId,Candidates,Workers) ->
	NodeType = sync_node_type(Type),
	Election = initialize_election(LockId,Name,NodeType,Candidates,Workers),
	Server = #server{parent = Parent,mod = Mod,state = State,debug = Debug},
	case NodeType of
		candidate ->
			NewE = startStage1(Election#election{incarn = incarnation(node(),Mod)}),
			proc_lib:init_ack(Starter, {ok, self()}),
			io:format("After synching, ~p starts as candidate (~p)\n",[node(),NewE]),
			hasBecomeLeader(NewE,Server,{init});
		worker ->
			proc_lib:init_ack(Starter, {ok, self()}),
			io:format("After synching, ~p starts as worker (~p)\n",[node(),Election]),
			safe_loop(Server, waiting_worker, Election,{init})
	end.

%% Reduce the start argument to the plain atom initialize_election/5 expects.
sync_node_type({candidate,_Extra}) -> candidate;
sync_node_type(candidate) -> candidate;
sync_node_type(worker) -> worker.
	


%% Build the initial #election{} record for a node that has just synchronized.
%%
%% LockId     - global lock id held by the node; stored with locked = true.
%% Name       - registered name of the gen_leader instance.
%% Type       - `candidate`, `{candidate, Extra}` or `worker`.
%% Candidates - candidate nodes known so far.
%% Workers    - worker nodes known so far.
%%
%% The local node is appended to the list matching its type when it is not
%% already a member; the other list is stored unchanged.
initialize_election(LockId,Name,{candidate,_Extra},Candidates,Workers) ->
	%% sync_with_leader/7 matches on this tuple form, so accept it here too.
	initialize_election(LockId,Name,candidate,Candidates,Workers);

initialize_election(LockId,Name,candidate,Candidates,Workers) ->
	io:format("Initialize election for a candidate\n"),
	#election{
		lockid = LockId,
		locked = true,
		name = Name,
		candidate_nodes = ensure_listed(node(),Candidates),
		worker_nodes = Workers,
		nextel = 0
	};

initialize_election(LockId,Name,worker,Candidates,Workers) ->
	io:format("Initialize election for a worker\n"),
	#election{
		lockid = LockId,
		locked = true,
		name = Name,
		candidate_nodes = Candidates,
		worker_nodes = ensure_listed(node(),Workers),
		nextel = 0
	}.

%% Return Nodes with Node appended at the end unless it is already a member.
ensure_listed(Node,Nodes) ->
	case lists:member(Node,Nodes) of
		true -> Nodes;
		false -> Nodes ++ [Node]
	end.




-> safe_loop releases the synchronization lock :

%% Clause of safe_loop/4 taken while the election record is still marked as
%% locked: release the global lock identified by lockid, clear the flag and
%% re-enter safe_loop/4 with the updated record.
safe_loop(Server, Role, #election{locked = true, lockid = LockId} = E, PrevMsg) ->
	io:format("~p unlock election process\n",[node()]),
	global:del_lock(LockId),
	safe_loop(Server, Role, E#election{locked = false}, PrevMsg);


-> the same is done for loop :

%% Clause of loop/4 taken while the election record is still marked as locked:
%% release the global lock identified by lockid, clear the flag and re-enter
%% loop/4 with the updated record.
loop(Server, Role, #election{locked = true, lockid = LockId} = E, PrevMsg) ->
	io:format("~p unlock election process\n",[node()]),
	global:del_lock(LockId),
	loop(Server, Role, E#election{locked = false}, PrevMsg);



-> new handlers for the new synchronization messages '$gen_sync_request' and '$gen_sync_response' :

%% Clause of handle_msg/4 for a '$gen_sync_request' sent by a node that is
%% joining the cluster.
%%
%% Node - name of the joining node.
%% Type - `candidate`, `{candidate, Extra}` or `worker`.
%% From - pid of the joining process.
%%
%% Only the elected leader answers, sending the candidate and worker lists as
%% they were before this request. Every receiver then records Node at the head
%% of the node list for its type and of the corresponding "down" list.
handle_msg({'$gen_sync_request',Node,Type,From} = Msg, Server, Role, E) ->
	case Role of
		elected ->
			io:format("Leader received synchronization request from ~p\n",[From]),
			From ! {'$gen_sync_response',self(),E#election.candidate_nodes, E#election.worker_nodes};
		_ ->
			ok
	end,
	%% The requester may pass its type as {candidate, Extra}; treat it like
	%% the plain atom. Any other value still fails in the case below.
	NodeType =
		case Type of
			{candidate,_Extra} -> candidate;
			_ -> Type
		end,
	E1 =
		case NodeType of
			candidate ->
				NewCandidates = [Node | E#election.candidate_nodes -- [Node]],
				NewDown = [Node | E#election.down -- [Node]],
				E#election{down = NewDown, candidate_nodes = NewCandidates};
			worker ->
				NewWorkers = [Node | E#election.worker_nodes -- [Node]],
				NewWorkDown = [Node | (E#election.work_down -- [Node])],
				E#election{work_down = NewWorkDown, worker_nodes = NewWorkers}
		end,
	loop(Server, Role, E1,Msg);


%% Clause of handle_msg/4 for a '$gen_sync_response' that reaches the main
%% loop: the message is logged, dropped, and the loop continues with the
%% election record unchanged.
handle_msg({'$gen_sync_response',_From,_Candidates,_Workers} = Msg,Server,Role,E) ->
	io:format("Old message gen_sync_response discarded\n"),
	loop(Server,Role,E,Msg);



And that's all...


I can't provide you with the full code at this time (it is not cleaned up, it has external dependencies, and I introduced a weight parameter for each node, in order to select the best candidate, which is not yet tested) ...





thank you


wde














 




More information about the erlang-questions mailing list