[erlang-questions] messages manipulatio

sasa sasa555@REDACTED
Wed Feb 27 16:34:46 CET 2013


>From what I can see, this is more a pooling, but it has similarities to
what I described.

On Wed, Feb 27, 2013 at 12:45 AM, Jayson Barley <jayson.barley@REDACTED>wrote:

> I did something similar for a gated worker pool. It allowed me to slow the
> incoming data to only as fast as my workers are able to process it.
>
> From the worker process:
> handle_cast(get_data, State = #state{data=Data}) ->
>     gp_server:child_data(Data),
>     {noreply, State#state{data=undefined}};
> handle_cast(run, State = #state{worker_fun=Fun, data_provider={Module,
> Function, Args}, data=undefined})     Input = apply(Module, Function, Args),
>     Data = apply(Fun, Input),
>     gp_server:child_ready(self()),
>     {noreply, State#state{data=Data}}.
>
>
>
> From the server process:
> handle_call(get_data, _From, State = #state{ready_children=[Child | Rest],
> working_children=Working}) ->
>     Reply = gen_server:call(Child, get_data),
>     {reply, Reply, State#state{ready_children=Rest,
> working_children=[Child | Working]}}.
>
> handle_cast({ready, Child}, State = #state{ready_children=Children,
> working_children=Working}) ->
>     Rest = lists:delete(Child, Working),
>     {noreply, State#state{ready_children=[Child | Children],
> working_children=Rest}}.
>
>
>
> It worked well for how I was using it.
>
>
> On Sat, Feb 16, 2013 at 8:01 AM, sasa <sasa555@REDACTED> wrote:
>
>> Having thought more, I realized the original approach can be simplified.
>> This would be the modified version:
>>
>> 1. The facade keeps track whether the worker is available or it is
>> working.
>> 2. Upon receiving a message, the facade joins it with the messages
>> already in the queue. Then, if the worker is available, it immediately
>> sends data to it, and clears its own queue.
>> 3. When the worker receives the notification, it performs the work, then
>> it notifies the facade that it is available.
>> 4. When the facade receives the information that the worker is available,
>> if there is new data available, it will immediately send it to the worker
>> as explained in 2.
>>
>> Thank you for your question, it had made me realized there is place for
>> simplification.
>>
>> On Sat, Feb 16, 2013 at 5:58 AM, Erik Søe Sørensen <eriksoe@REDACTED>wrote:
>>
>>> I must confess that (at least at this late hour) your Step 2 confuses me.
>>> Why not just 1+3+4, or indeed *just* step 2?
>>>
>>> Regards,
>>> /Erik
>>> Den 15/02/2013 16.47 skrev "sasa" <sasa555@REDACTED>:
>>>
>>>>  Hello,
>>>>
>>>> A while ago I encountered the following situation:
>>>>
>>>> I had the gen_server base process P which would receive messages, and
>>>> handle them by sending some data over the network. The messages were coming
>>>> faster than they were being sent. I established the reason for this was the
>>>> "randomness" of my network conditions. I also established that sending more
>>>> messages at once was almost as fast as sending one message, i.e. the
>>>> network push time wasn't highly dependent on the message size.
>>>>
>>>> To tackle this in a generic way, I devised an approach which has served
>>>> me well in multiple places. I was repeatedly googling whether some similar
>>>> solution exists, but I couldn't find it. Now, I'm not sure if I have
>>>> reinvented a wheel, or the approach is not optimal, so I'm asking if you
>>>> are aware of similar approaches, and are there any faults in this one?
>>>>
>>>> The approach I took is following:
>>>>
>>>> I split the server in two processes: the "facade" and the worker. The
>>>> facade acceptes requests from clients, and stores them internally. While
>>>> the worker is doing its work, new messages are stored in the facade. When
>>>> the worker is available, it will take all accumulated messages from the
>>>> facade and process them.
>>>>
>>>> These are the steps:
>>>> 1. The facade receives messages, stores data in its list, and notifies
>>>> the worker (without sending actual data), that some work is ready.
>>>> 2. Upon receiving the notification, the worker first flushes its
>>>> message queue by doing repeated receive ... after 0 as long as there are
>>>> messages in the queue.
>>>> 3. Then the worker pulls all messages from the facade. This is a
>>>> gen_server:call to the facade which will return all messages, and at the
>>>> same time remove them from its state.
>>>> 4. Finally, the worker processes all messages.
>>>>
>>>>
>>>> I found this approach useful because the delay on the worker adapts to
>>>> the incoming message rate.
>>>> If the worker can handle messages at the incoming rate, everything
>>>> works without delay.
>>>> If messages can't be handled at the incoming rate, the worker's delay
>>>> will increase to accomodate the higher load. In other words, the worker
>>>> will try to compensate the load by bulk processing messages. Obviously,
>>>> this is useful only when process_time(N messages) < N * process_time(1
>>>> message).
>>>>
>>>> Another benefit I found is that I can vary the implementation of the
>>>> facade i.e. I can store messages using different algorithms. In the first
>>>> implementation, I stored messages in a list. In one variation, I used hash
>>>> which allowed me to eliminate duplicate messages. Another variant was
>>>> truncation of the list, which allowed me to discard old messages if the
>>>> queue was getting too large.
>>>>
>>>> As I said, this has served me well in the production for more than a
>>>> year, and I have finally found the time to make a generic library out of
>>>> it. Before putting it public, I'd like to check if there are similar
>>>> solutions, or alternative approaches?
>>>>
>>>> Thanks, and best regards,
>>>> Sasa
>>>>
>>>> _______________________________________________
>>>> erlang-questions mailing list
>>>> erlang-questions@REDACTED
>>>> http://erlang.org/mailman/listinfo/erlang-questions
>>>>
>>>>
>>
>> _______________________________________________
>> erlang-questions mailing list
>> erlang-questions@REDACTED
>> http://erlang.org/mailman/listinfo/erlang-questions
>>
>>
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://erlang.org/pipermail/erlang-questions/attachments/20130227/8c0a8148/attachment.htm>


More information about the erlang-questions mailing list