[erlang-questions] gen_leader
wde
wde@REDACTED
Thu Jun 25 03:25:28 CEST 2009
hello all,
few questions about gen_leader module :
---
the 'test_cb' module provided with the gen_leader module specify a {noreply,Dict} for handle_info/2 and handle_cast/2 whereas gen_leader use handle_common_reply to handle callback response for handle_info/2 and handle_cast/2 api.
Why handle_info and handle_cast do not accept classical return values (gen_server like : {noreply,NewState} | {noreply,NewState,Timeout} | {noreply,NewState,hibernate} ) ?
---
Buffered calls never deleted ?
The following function handle_msg/4 in gen_leader :
handle_msg({Ref, {leader,reply,Reply}} = Msg, Server, Role,#election{buffered = Buffered} = E) ->
{value, {_,From}} = keysearch(Ref,1,Buffered),
NewServer = reply(From, {leader,reply,Reply}, Server, Role, E#election{buffered = keydelete(Ref,1,Buffered)}),
loop(NewServer, Role, E, Msg);
It seems that we loop without deleting the old reference. I modify the function like that :
handle_msg({Ref, {leader,reply,Reply}} = Msg, Server, Role,#election{buffered = Buffered} = E) ->
{value, {_,From}} = keysearch(Ref,1,Buffered),
E1 = E#election{buffered = keydelete(Ref,1,Buffered)},
NewServer = reply(From, {leader,reply,Reply}, Server, Role, E1),
% keep the new opaque election in the loop
loop(NewServer, Role, E1, Msg);
---
Incarnation value is stored on file system in a file which is unique for a node (the file name is the node name).
A simple modification could use the callback module name (which is application specific) to build the incarnation file. By using this, we could have a gen_leader cluster "per callback_module" on the same node :
incarnation/1 must be replaced by incarnation/2
incarnation(Node,Mod) ->
Fname = atom_to_list(Mod) ++ atom_to_list(Node),
case file:read_file_info(Fname) of
{error,Reason} ->
ok = file:write_file(Fname,term_to_binary(1)),
0;
{ok,_} ->
{ok,Bin} = file:read_file(Fname),
Incarn = binary_to_term(Bin),
ok = file:write_file(Fname,term_to_binary(Incarn+1)),
Incarn
end.
---
Idea of implementation to add dynamically node in the gen_leader cluster. I used a "synchronization process" to synchronize the new node with the current leader and others candidates/workers.
-> Start function :
-> instead of specify the candidates and workers list, I just provide one argument which define the type of
the node in the gen_cluster : candidate | worker
-> Initialization process : init_it function
-> instead of calling safe_loop/4 I enter in a "synchronization state" by calling sync_with_leader(Starter,Parent,Name,Mod,Type,State,Debug) where Type is the new argument to define the node type (candidate or worker)
sync_with_leader(Starter,Parent,Name,Mod,Type,State,Debug) ->
LockId = {list_to_atom("gen_leader_" ++ atom_to_list(Name)),self()},
global:sync(),
try
true = global:set_lock(LockId),
[
begin
io:format("Sending syncing request to ~p with name ~p\n",[Node,Name]),
{Name,Node} ! {'$gen_sync_request',node(),Type,self()}
end|| Node <- nodes()
],
receive
% parent finished ?
{'EXIT', _, Reason} ->
.io:format("Received exit request during synching : ~p\n",[Reason]),
exit(Reason);
{'$gen_sync_response',_From,Candidates,Workers} ->
.io:format("Received synching from leader : candidates(~p) - workers(~p)\n",[Candidates,Workers]),
Election = initialize_election(LockId,Name,Type,Candidates,Workers),
case Type of
{candidate,_} ->
NewE = startStage1(Election#election{incarn = incarnation(node(),Mod)}),
proc_lib:init_ack(Starter, {ok, self()}),
.io:format("After synching, ~p starts as candidate (~p)\n",[node(),NewE]),
%safe_loop(#server{parent = Parent,mod = Mod,state = State,debug = Debug}, candidate, NewE,{init});
hasBecomeLeader(NewE,#server{parent = Parent,mod = Mod,state = State,debug = Debug},{init});
worker ->
proc_lib:init_ack(Starter, {ok, self()}),
.io:format("After synching, ~p starts as worker (~p)\n",[node(),Election]),
safe_loop(#server{parent = Parent,mod = Mod,state = State,debug = Debug}, waiting_worker, Election,{init})
end;
_Msg ->
.io:format("Sync loop received unknown message : ~p\n",[_Msg]),
sync_with_leader(Starter,Parent,Name,Mod,Type,State,Debug)
after ?SYNC_DELAY ->
.io:format("Synching timeout : ~p (~p)\n",[node(),Type]),
Election = initialize_election(LockId,Name,Type,[],[]),
case Type of
{candidate,_} ->
NewE = startStage1(Election#election{incarn = incarnation(node(),Mod)}),
proc_lib:init_ack(Starter, {ok, self()}),
.io:format("After synching, ~p starts as candidate (~p)\n",[node(),NewE]),
%safe_loop(#server{parent = Parent,mod = Mod,state = State,debug = Debug}, candidate, NewE,{init});
hasBecomeLeader(NewE,#server{parent = Parent,mod = Mod,state = State,debug = Debug},{init});
worker ->
proc_lib:init_ack(Starter, {ok, self()}),
.io:format("After synching, ~p starts as worker (~p)\n",[node(),Election]),
safe_loop(#server{parent = Parent,mod = Mod,state = State,debug = Debug}, waiting_worker, Election,{init})
end
end
after
global:del_lock(LockId)
end.
initialize_election(LockId,Name,candidate,Candidates,Workers) ->
.io:format("Initialize election for a candidate\n"),
#election{
lockid = LockId,
locked = true,
name = Name,
candidate_nodes =
case lists:member(node(),Candidates) of
true -> Candidates;
false -> Candidates ++ [node()]
end,
worker_nodes = Workers,
nextel = 0
};
initialize_election(LockId,Name,worker,Candidates,Workers) ->
.io:format("Initialize election for a worker\n"),
#election{
lockid = LockId,
locked = true,
name = Name,
candidate_nodes = Candidates,
worker_nodes =
case lists:member(node(),Workers) of
true -> Workers;
false -> Workers ++ [node()]
end,
nextel = 0
}.
-> safe_loop remove the synchronization lock :
% when locking is activated
safe_loop(Server, Role,#election{name = Name, locked = Locked, lockid = LockId} = E, PrevMsg) when Locked == true ->
.io:format("~p unlock election process\n",[node()]),
global:del_lock(LockId),
safe_loop(Server, Role, E#election{locked = false}, PrevMsg);
-> same thing for loop :
loop(Server, Role,#election{name = Name, locked = Locked, lockid = LockId} = E, PrevMsg) when Locked == true ->
.io:format("~p unlock election process\n",[node()]),
global:del_lock(LockId),
loop(Server, Role, E#election{locked = false}, PrevMsg);
-> new handlers for the new synchronization messages '$gen_sync_request' '$gen_sync_response' :
% Ack to synchronize
handle_msg({'$gen_sync_request',Node,Type,From} = Msg, #server{mod = Mod, state = State} = Server, Role, E) ->
if
Role == elected ->
.io:format("Leader received synchronization request from ~p\n",[From]),
From ! {'$gen_sync_response',self(),E#election.candidate_nodes, E#election.worker_nodes};
true ->
ok
end,
case Type of
candidate ->
NewCandidates = [Node | E#election.candidate_nodes -- [Node]],
NewDown = [Node | E#election.down -- [Node]],
E1 = E#election{down = NewDown, candidate_nodes = NewCandidates};
worker ->
NewWorkers = [Node | E#election.worker_nodes -- [Node]],
E1 = E#election{ work_down = [Node | (E#election.work_down -- [Node])], worker_nodes = NewWorkers}
end,
loop(Server, Role, E1,Msg);
handle_msg({'$gen_sync_response',_From,_Candidates,_Workers} = Msg,Server,Role,E) ->
.io:format("Old message gen_sync_response discarded\n"),
loop(Server,Role,E,Msg);
And that's all...
I can't provide you at this time with the full code (not cleaned, external dependencies, and i introduced a weight parameter for each node (in order to select the best candidate) which is not yet tested) ...
thank you
wde
More information about the erlang-questions
mailing list