Re: How to know when a multi-spawned process set finishes its work?

Giovanni Giorgi jj@REDACTED
Tue Apr 13 21:14:07 CEST 2021


Hi all,

  my idea was to be able to monitor the execution while it runs, but I will 
explore the gen_server + synchronous call approach in the future.
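
Just to write down the idea for later, here is a minimal sketch of what I 
mean (module name, state shape and API are invented for the example, none of 
this is in er_zauker yet): the daemon becomes a gen_server, the client blocks 
in a single gen_server:call/3, and the server defers the reply until the last 
worker's 'DOWN' message arrives. Worker spawning and monitor bookkeeping are 
left out.

-module(indexer_srv).
-behaviour(gen_server).

-export([start_link/0, wait_done/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% The client blocks here until every worker is done.
wait_done() ->
    gen_server:call(?MODULE, wait_done, infinity).

init([]) ->
    %% running = live workers, files = files processed so far,
    %% waiters = callers parked until all the work is finished.
    {ok, #{running => 0, files => 0, waiters => []}}.

handle_call(wait_done, _From, State = #{running := 0, files := Files}) ->
    {reply, {done, Files}, State};
handle_call(wait_done, From, State = #{waiters := Waiters}) ->
    %% Do not reply yet: park the caller and answer from handle_info/2
    %% when the last worker's 'DOWN' message arrives.
    {noreply, State#{waiters := [From | Waiters]}}.

handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info({'DOWN', _Ref, process, _Pid, _Reason},
            State = #{running := R, files := F, waiters := Waiters}) ->
    NewState = State#{running := R - 1, files := F + 1},
    case R - 1 of
        0 ->
            [gen_server:reply(W, {done, F + 1}) || W <- Waiters],
            {noreply, NewState#{waiters := []}};
        _ ->
            {noreply, NewState}
    end;
handle_info(_Other, State) ->
    {noreply, State}.

With something like this the client side would shrink to a single 
indexer_srv:wait_done() call.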

I was able to fix the bug following Maria's suggestion (thank you, Maria!).

The failing processes were dying due to a Redis timeout, probably because I 
used a Redis MULTI/EXEC transaction, which can lead to race conditions on 
the Redis side.

I implemented a small in-memory map (MonitorRefMap) to track failing 
processes and respawn them... The idea is only to track the timeout errors, 
so I changed the server to match the "good" and "timeout" 'DOWN' cases like this:

...

{'DOWN', Reference, process, _Pid, normal} ->
    indexerDaemon(RunningWorker-1, FilesProcessed+1,
                  maps:remove(Reference, MonitorRefMap));

{'DOWN', Reference, process, Pid, {timeout, Detail}} ->
    %% Hmm, must we assume there are still files to be processed?
    #{ Reference := FailedFile } = MonitorRefMap,
    io:format("!! Timeout Error on ~p ~n Detail: ~p~n",
              [FailedFile, {'DOWN', Reference, process, Pid, {timeout, Detail}}]),
    %% We assume a timeout error and push the file back:
    %% drop the old Reference, respawn the worker and monitor it again.
    UpdatedRefMap = maps:remove(Reference, MonitorRefMap),
    NewPid = spawn(er_zauker_util, load_file_if_needed, [FailedFile]),
    MonitorRef = erlang:monitor(process, NewPid),
    NewRecoveryRefMap = UpdatedRefMap#{ MonitorRef => FailedFile },
    indexerDaemon(RunningWorker, FilesProcessed, NewRecoveryRefMap);

I do not know whether there is a smarter way of doing it.
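
One small variant I am considering (an untested sketch of the same respawn 
step) is erlang:spawn_monitor/3, which creates the worker process and the 
monitor in one call, so there is no window between spawn and monitor:

    %% Same respawn step as above, but spawn + monitor happen atomically.
    {_NewPid, MonitorRef} = spawn_monitor(er_zauker_util, load_file_if_needed, [FailedFile]),
    NewRecoveryRefMap = UpdatedRefMap#{ MonitorRef => FailedFile },
    indexerDaemon(RunningWorker, FilesProcessed, NewRecoveryRefMap);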

Thank you for your hints!!

...

On 2021-04-13 17:29, dieter@REDACTED wrote:

> Hi Giovanni,
> 
> I had a quick look at the code, and I think the messages do not match.
> 
> You wait for {worker, 0} in the waitAllWorkerDone function, but I do not
> think that this message is generated anywhere.
> In that function you send {self(), report} to the daemon, which will
> respond with
> 
> {worker, RunningWorker, files_processed, FilesProcessed}.
> 
> RunningWorker can eventually become 0 when all workers exit, so the tuple
> does start with {worker, 0, but the other two elements prevent it from
> matching the expected pattern.
> Could this be the issue?
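> 
> For example (just from reading the code, I have not run it), a clause like
> 
> {worker, 0, files_processed, _TotalFilesDone} ->
>     io:format("All workers done~n~n");
> 
> should match the daemon's reply once RunningWorker reaches 0.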
> 
> Anyway, thanks for sharing your code, it is always interesting to see 
> how somebody else is tackling a problem!
> 
> Taking a more high-level view, are you really interested in intermediate
> results?
> If I wanted only the end result, I think I would use OTP with a 
> gen_server and a synchronous call.
> 
> kind regards,
> dieter
> 
> On Mon., Apr. 12, 2021 at 18:36, Giovanni Giorgi <jj@REDACTED> wrote:
> 
>> Hi all,
>> a newbie question here.
>> I have written a small Erlang server following the application behaviour,
>> here:
>> https://github.com/daitangio/er_zauker/blob/erlang-24-migration/src/er_zauker_app.erl
>> To make a long story short, my server scans a set of directories and
>> indexes the files, using Redis as the backend database.
>> It works well when I run it on a small set of files.
>> But when I run it on a very large set of files, it seems to "finish"
>> before indexing all the files; when it starts, the client waits until
>> every file is processed and the server can send it a report about the
>> status:
>> 
>> er_zauker_indexer ! {self(), directory, "."},
>> er_zauker_app:wait_worker_done().
>> 
>> The relevant part seems correct (see below), but I think I have made a
>> stupid mistake somewhere and I cannot see where it is.
>> Where can I find an example for this use case?
>> 
>> wait_worker_done() ->
>>     waitAllWorkerDone(1, erlang:monotonic_time(second)).
>> 
>> waitAllWorkerDone(RunningWorker, StartTimestamp) when RunningWorker > 0 ->
>>     er_zauker_indexer ! {self(), report},
>>     receive
>>         {worker, 0} ->
>>             io:format("All workers done~n~n");
>>         {worker, RunningGuys, files_processed, TotalFilesDone} ->
>>             if
>>                 RunningGuys /= RunningWorker ->
>>                     SecondsRunning = erlang:monotonic_time(second) - StartTimestamp,
>>                     FilesSec = TotalFilesDone / SecondsRunning,
>>                     io:format("[~p]s Workers[~p]  Files processed:~p Files/sec: ~p ~n",
>>                               [SecondsRunning, RunningGuys, TotalFilesDone, FilesSec]),
>>                     timer:sleep(200);
>>                 true ->
>>                     %% Okey so nothing changed so far...sleep a bit
>>                     timer:sleep(100)
>>             end,
>>             %% Master sleep value
>>             timer:sleep(990),
>>             waitAllWorkerDone(RunningGuys, StartTimestamp)
>>     after 5000 ->
>>         io:format("~n-----------------------------~n"),
>>         io:format(" Mmmm no info in the last 5 sec... when was running:~p Workers~n",
>>                   [RunningWorker]),
>>         io:format(" ?System is stuck? "),
>>         io:format("------------------------------~n"),
>>         waitAllWorkerDone(RunningWorker, StartTimestamp)
>>     end;
>> waitAllWorkerDone(0, _) ->
>>     io:format("All worker Finished").
>> 
>> --
>> Giovanni Giorgi via webmail

-- 
Giovanni Giorgi via webmail

