[erlang-questions] messages manipulatio

Jayson Barley <>
Wed Feb 27 00:45:40 CET 2013


I did something similar for a gated worker pool. It allowed me to slow the
incoming data to only as fast as my workers are able to process it.

>From the worker process:
handle_cast(get_data, State = #state{data=Data}) ->
    gp_server:child_data(Data),
    {noreply, State#state{data=undefined}};
handle_cast(run, State = #state{worker_fun=Fun, data_provider={Module,
Function, Args}, data=undefined})     Input = apply(Module, Function, Args),
    Data = apply(Fun, Input),
    gp_server:child_ready(self()),
    {noreply, State#state{data=Data}}.



>From the server process:
handle_call(get_data, _From, State = #state{ready_children=[Child | Rest],
working_children=Working}) ->
    Reply = gen_server:call(Child, get_data),
    {reply, Reply, State#state{ready_children=Rest, working_children=[Child
| Working]}}.

handle_cast({ready, Child}, State = #state{ready_children=Children,
working_children=Working}) ->
    Rest = lists:delete(Child, Working),
    {noreply, State#state{ready_children=[Child | Children],
working_children=Rest}}.



It worked well for how I was using it.


On Sat, Feb 16, 2013 at 8:01 AM, sasa <> wrote:

> Having thought more, I realized the original approach can be simplified.
> This would be the modified version:
>
> 1. The facade keeps track whether the worker is available or it is working.
> 2. Upon receiving a message, the facade joins it with the messages already
> in the queue. Then, if the worker is available, it immediately sends data
> to it, and clears its own queue.
> 3. When the worker receives the notification, it performs the work, then
> it notifies the facade that it is available.
> 4. When the facade receives the information that the worker is available,
> if there is new data available, it will immediately send it to the worker
> as explained in 2.
>
> Thank you for your question, it had made me realized there is place for
> simplification.
>
> On Sat, Feb 16, 2013 at 5:58 AM, Erik Søe Sørensen <>wrote:
>
>> I must confess that (at least at this late hour) your Step 2 confuses me.
>> Why not just 1+3+4, or indeed *just* step 2?
>>
>> Regards,
>> /Erik
>> Den 15/02/2013 16.47 skrev "sasa" <>:
>>
>>>  Hello,
>>>
>>> A while ago I encountered the following situation:
>>>
>>> I had the gen_server base process P which would receive messages, and
>>> handle them by sending some data over the network. The messages were coming
>>> faster than they were being sent. I established the reason for this was the
>>> "randomness" of my network conditions. I also established that sending more
>>> messages at once was almost as fast as sending one message, i.e. the
>>> network push time wasn't highly dependent on the message size.
>>>
>>> To tackle this in a generic way, I devised an approach which has served
>>> me well in multiple places. I was repeatedly googling whether some similar
>>> solution exists, but I couldn't find it. Now, I'm not sure if I have
>>> reinvented a wheel, or the approach is not optimal, so I'm asking if you
>>> are aware of similar approaches, and are there any faults in this one?
>>>
>>> The approach I took is following:
>>>
>>> I split the server in two processes: the "facade" and the worker. The
>>> facade acceptes requests from clients, and stores them internally. While
>>> the worker is doing its work, new messages are stored in the facade. When
>>> the worker is available, it will take all accumulated messages from the
>>> facade and process them.
>>>
>>> These are the steps:
>>> 1. The facade receives messages, stores data in its list, and notifies
>>> the worker (without sending actual data), that some work is ready.
>>> 2. Upon receiving the notification, the worker first flushes its message
>>> queue by doing repeated receive ... after 0 as long as there are messages
>>> in the queue.
>>> 3. Then the worker pulls all messages from the facade. This is a
>>> gen_server:call to the facade which will return all messages, and at the
>>> same time remove them from its state.
>>> 4. Finally, the worker processes all messages.
>>>
>>>
>>> I found this approach useful because the delay on the worker adapts to
>>> the incoming message rate.
>>> If the worker can handle messages at the incoming rate, everything works
>>> without delay.
>>> If messages can't be handled at the incoming rate, the worker's delay
>>> will increase to accomodate the higher load. In other words, the worker
>>> will try to compensate the load by bulk processing messages. Obviously,
>>> this is useful only when process_time(N messages) < N * process_time(1
>>> message).
>>>
>>> Another benefit I found is that I can vary the implementation of the
>>> facade i.e. I can store messages using different algorithms. In the first
>>> implementation, I stored messages in a list. In one variation, I used hash
>>> which allowed me to eliminate duplicate messages. Another variant was
>>> truncation of the list, which allowed me to discard old messages if the
>>> queue was getting too large.
>>>
>>> As I said, this has served me well in the production for more than a
>>> year, and I have finally found the time to make a generic library out of
>>> it. Before putting it public, I'd like to check if there are similar
>>> solutions, or alternative approaches?
>>>
>>> Thanks, and best regards,
>>> Sasa
>>>
>>> _______________________________________________
>>> erlang-questions mailing list
>>> 
>>> http://erlang.org/mailman/listinfo/erlang-questions
>>>
>>>
>
> _______________________________________________
> erlang-questions mailing list
> 
> http://erlang.org/mailman/listinfo/erlang-questions
>
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://erlang.org/pipermail/erlang-questions/attachments/20130226/41b86d65/attachment.html>


More information about the erlang-questions mailing list