architectural patterns for unifying state from multiple upstream processes
jdmeta@REDACTED
jdmeta@REDACTED
Wed Apr 29 03:11:31 CEST 2020
in order to take advantage of the concurrency intrinsic within a top-level
task, it is decomposed into a graph of inter-dependent processes.
there exists scenarios where, in order for a given process in this graph to do
its work, it requires the state contained in messages originating from
multiple other processes in said graph.
my question is: what are the optimal architectural patterns for dealing with
this scenario?
below is a trivial example of this scenario where a top-level service is
decomposed into xService and yService which run concurrently.
however, while yService performs some sub-task concurrently, it ultimately
needs the output of xService to finish this sub-task (in this contrived case,
to know which downstream service to send a message to where said message is
some union of the work of xService and yService).
the approach I have taken is to have the top-level task (called, strangely
enough, topLevel) put a common Ref in messages it sends to both xService and
yService.
this Ref is passed downstream by intermediate services and yService (the
unifying service) uses the Ref as a key into a map which coalesces state from
multiple messages.
after all state associated with a given Ref has arrived, yService sends a
message to the designated downstream process - either z1Service or z2Service
in this example.
note that in this contrived example only one set of linked messages is sent
downstream from topLevel.
I have another version where topLevel fabricates N sets of linked messages and
sends them to xService and yService in random order - the map approach works
in that case as well.
question: is there a better way to do this?
note: I am most interested in feedback on improving the macro-architectural
pattern - feedback on lower level coding details is not the primary focus of
this question (but of course I'm happy to hear alternative ideas - for
example, the two flavors of storeOrSend() grate against my soul).
thanx for firing your neurons!
danger: contrived example code follows...
-module( test ).
-export( [ start/0, topLevel/0 ] ).
-export( [ x/1, y/1, z1/0, z2/0 ] ).
start() ->
register( z1Service, spawn( test, z1, [ ] ) ),
register( z2Service, spawn( test, z2, [ ] ) ),
register( yService, spawn( test, y, [ maps:new() ] ) ),
register( xService, spawn( test, x, [ [ z1Service, z2Service ] ] ) ).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
topLevel() ->
Ref = make_ref(), % a common reference is used to link multiple messages
so that they can be coalesced downstream
xService ! { app, Ref, a },
yService ! { app, Ref, function, b }.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
x( Services ) ->
receive
{ app, Ref, a } -> % in a real-world app the 3rd element would
designate which more involved logic to apply
Select = rand:uniform( length( Services ) ),
yService ! { app, Ref, service, lists:nth( Select, Services ) };
Other ->
io:format( "x: invalid message: ~w~n", [ Other ] )
end,
x( Services ).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
y( Map ) ->
receive
{ app, Ref, Type, Value } ->
NewMap = storeOrSend( Map, Ref, Type, Value ), % save
intermediate message state in a map keyed by Ref
y( NewMap ); % when all
messages have arrived send to next downstream process
Other ->
io:format( "y: invalid message: ~w~n", [ Other ] ),
y( Map )
end.
storeOrSend( Map, Ref, function, F ) ->
case maps:find( Ref, Map ) of
{ ok, { service, S } } ->
S ! { app, c, F },
NewMap = maps:remove( Ref, Map );
{ ok, { function, _ } } ->
io:format( "storeOrSend: back-to-back function messages on same
Ref~n", [ ] ),
NewMap = Map;
error ->
NewMap = maps:put( Ref, { function, F }, Map )
end,
NewMap;
storeOrSend( Map, Ref, service, S ) ->
case maps:find( Ref, Map ) of
{ ok, { function, F } } ->
S ! { app, c, F },
NewMap = maps:remove( Ref, Map );
{ ok, { service, _ } } ->
io:format( "storeOrSend: back-to-back service messages on same
Ref~n", [ ] ),
NewMap = Map;
error ->
NewMap = maps:put( Ref, { service, S }, Map )
end,
NewMap.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% placeholder downstream processes
z1() ->
receive
Msg -> io:format( "z1 received: ~w~n", [ Msg ] )
end,
z1().
z2() ->
receive
Msg -> io:format( "z2 received: ~w~n", [ Msg ] )
end,
z2().
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://erlang.org/pipermail/erlang-questions/attachments/20200428/d007f98e/attachment-0001.htm>
More information about the erlang-questions
mailing list