architectural patterns for unifying state from multiple upstream processes

jdmeta@REDACTED jdmeta@REDACTED
Wed Apr 29 03:11:31 CEST 2020


in order to take advantage of the concurrency intrinsic within a top-level
task, it is decomposed into a graph of inter-dependent processes.

there exists scenarios where, in order for a given process in this graph to do
its work, it requires the state contained in messages originating from
multiple other processes in said graph.

my question is: what are the optimal architectural patterns for dealing with
this scenario?

 

below is a trivial example of this scenario where a top-level service is
decomposed into xService and yService which run concurrently.

however, while yService performs some sub-task concurrently, it ultimately
needs the output of xService to finish this sub-task (in this contrived case,
to know which downstream service to send a message to where said message is
some union of the work of xService and yService).

 

the approach I have taken is to have the top-level task (called, strangely
enough, topLevel) put a common Ref in messages it sends to both xService and
yService.

this Ref is passed downstream by intermediate services and yService (the
unifying service) uses the Ref as a key into a map which coalesces state from
multiple messages.

after all state associated with a given Ref has arrived, yService sends a
message to the designated downstream process - either z1Service or z2Service
in this example.

note that in this contrived example only one set of linked messages is sent
downstream from topLevel.

I have another version where topLevel fabricates N sets of linked messages and
sends them to xService and yService in random order - the map approach works
in that case as well.

 

question: is there a better way to do this?

note: I am most interested in feedback on improving the macro-architectural
pattern - feedback on lower level coding details is not the primary focus of
this question (but of course I'm happy to hear alternative ideas - for
example, the two flavors of storeOrSend() grate against my soul).

 

thanx for firing your neurons!

 

danger: contrived example code follows...

 

-module( test ).

-export( [ start/0, topLevel/0 ] ).

-export( [ x/1, y/1, z1/0, z2/0 ] ).

 

start() ->

    register( z1Service, spawn( test, z1, [ ] ) ),

    register( z2Service, spawn( test, z2, [ ] ) ),

    register( yService,  spawn( test, y,  [ maps:new() ] ) ),

    register( xService,  spawn( test, x,  [ [ z1Service, z2Service ] ] ) ).

 

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

 

topLevel() ->

    Ref = make_ref(),   % a common reference is used to link multiple messages
so that they can be coalesced downstream

    xService ! { app, Ref, a },

    yService ! { app, Ref, function, b }.

 

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

 

x( Services ) ->

    receive

        { app, Ref, a } ->   % in a real-world app the 3rd element would
designate which more involved logic to apply

            Select = rand:uniform( length( Services ) ),

            yService ! { app, Ref, service, lists:nth( Select, Services ) };

        Other ->

            io:format( "x: invalid message: ~w~n", [ Other ] )

    end,

    x( Services ).

 

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

 

y( Map ) ->

    receive

        { app, Ref, Type, Value } ->

            NewMap = storeOrSend( Map, Ref, Type, Value ),   % save
intermediate message state in a map keyed by Ref

            y( NewMap );                                     % when all
messages have arrived send to next downstream process

        Other ->

            io:format( "y: invalid message: ~w~n", [ Other ] ),

            y( Map )

    end.

 

storeOrSend( Map, Ref, function, F ) ->

    case maps:find( Ref, Map ) of

        { ok, { service, S } } ->

            S ! { app, c, F },

            NewMap = maps:remove( Ref, Map );

        { ok, { function, _ } } ->

            io:format( "storeOrSend: back-to-back function messages on same
Ref~n", [ ] ),

            NewMap = Map;

        error ->

            NewMap = maps:put( Ref, { function, F }, Map )

    end,

    NewMap;

 

storeOrSend( Map, Ref, service, S ) ->

    case maps:find( Ref, Map ) of

        { ok, { function, F } } ->

            S ! { app, c, F },

            NewMap = maps:remove( Ref, Map );

        { ok, { service, _ } } ->

            io:format( "storeOrSend: back-to-back service messages on same
Ref~n", [ ] ),

            NewMap = Map;

        error ->

            NewMap = maps:put( Ref, { service, S }, Map )

    end,

    NewMap.

 

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

% placeholder downstream processes

 

z1() ->

    receive

        Msg -> io:format( "z1 received: ~w~n", [ Msg ] )

    end,

    z1().

 

z2() ->

    receive

        Msg -> io:format( "z2 received: ~w~n", [ Msg ] )

    end,

    z2().

 

 

 

 

-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://erlang.org/pipermail/erlang-questions/attachments/20200428/d007f98e/attachment-0001.htm>


More information about the erlang-questions mailing list