<div dir="ltr">Hi Jörgen.<br><br>You are describing a complex behavior, and I'm probably not fully understanding it, but some of your ideas seem very similar to what we are building for NetComposer.<br>We don't yet have a full release, but we have already released many of its pieces:<br><br>- NkCLUSTER (<a href="https://github.com/Nekso/nkcluster">https://github.com/Nekso/nkcluster</a>) is a framework for creating clusters of Erlang nodes of any size, and distributing and managing jobs into them, with a pattern that seems similar to your proposal, different to other frameworks in many aspects, and specially sending "intelligence" to the workers.<br>- NkSERVICE (<a href="https://github.com/Nekso/nkservice">https://github.com/Nekso/nkservice</a>) is framework for managing distributed services based on plugins.<br>- NkDOMAIN (<a href="https://github.com/Nekso/nkdomain">https://github.com/Nekso/nkdomain</a>) is an Erlang framework to load an manage complex distributed, multi-tenant configurations in a cluster.<br>- NkSIP (<a href="https://github.com/kalta/nksip">https://github.com/kalta/nksip</a>) is a SIP application server that is currenly under heavy refactorization (it will be integrated as a plugin of NetComposer), but offers a way to process complex works similar to your idea.<br><br>I hope it helps.<br>Regards,<br>Carlos<br><br><br>On Fri, Oct 9, 2015 at 12:07 AM, Jörgen Brandt <<a href="mailto:brandjoe@hu-berlin.de">brandjoe@hu-berlin.de</a>> wrote:<br>><br>> Hello,<br>><br>> is there an Erlang library for transactional message<br>> passing, using patterns in communication and error handling<br>> to improve fault tolerance?<br>><br>> This (or a similar) question may have been asked before and,<br>> of course, there is plenty of research on fault tolerance<br>> and failure transparency. Nevertheless, in my work with<br>> scientific workflows it seems that certain patterns in error<br>> handling exist. In this mail I'm trying to motivate a way to<br>> externalize common error handling in a standardized service<br>> (a transaction server) but I'm unsure whether such a thing<br>> already exists, whether I'm missing an important feature,<br>> and whether it's a good idea anyway.<br>><br>> Large distributed systems are composed of many services.<br>> They process many tasks concurrently and need fault<br>> tolerance to yield correct results and maintain<br>> availability. Erlang seemed a good choice because it<br>> provides facilities to automatically improve availability,<br>> e.g., by using supervisers. In addition, it encourages a<br>> programming style that separates processing logic from<br>> error handling. In general, each service has its own<br>> requirements, implying that a general approach to error<br>> handling (beyond restarting) is infeasible. However, if an<br>> application exhibits recurring patterns in the way error<br>> handling corresponds to the messages passed between<br>> services, we can abstract these patterns to reuse them.<br>><br>><br>> Fault tolerance is important because it directly translates<br>> to scalability.<br>><br>> Consider an application (with transient software faults),<br>> processing user queries. The application reports errors back<br>> to the user as they appear. If a user query is a long-<br>> running job (hours, days), the number of subtasks created<br>> from this job (thousands), the number of services to process<br>> one subtask, and the number of machines involved are large,<br>> then the occurrence of an error is near to certain. Quietly<br>> restarting the application and rerunning the query may<br>> reduce the failure probability but even if the application<br>> succeeds, the number of retries and, thus, the time elapsed<br>> to success may be prohibitive. What is needed is a system<br>> that does not restart the whole application but only the<br>> service that failed reissuing only the unfinished requests<br>> that this service received before failing. Consequently, the<br>> finer the granularity at which errors are handled, the less<br>> work has to be redone when errors occur, allowing a system<br>> to host longer-running jobs, containing more subtasks,<br>> involving more services for each subtask, and running on<br>> more machines in feasible time.<br>><br>><br>> Scientific workflows are a good example for a large<br>> distributed application exhibiting patterns in communication<br>> and error handling.<br>><br>> A scientific workflow system consumes an input query in the<br>> form of an expression in the workflow language. On<br>> evaluation of this expression it identifies subtasks that<br>> can be executed in any order. E.g., a variant calling<br>> workflow from bioinformatics unfolds into several hundred<br>> to a thousand subtasks each of which is handed down in the<br>> form of requests through a number of services: Upon<br>> identification of the subtask in (i) the query interpreter,<br>> a request is sent to (ii) a cache service. This service<br>> keeps track of all previously run subtasks and returns the<br>> cached result if available. If not, a request is sent to<br>> (iii) a scheduling service. This service determines the<br>> machine, to run the subtask. The scheduler tries both, to<br>> adequately distribute the work load among workers (load<br>> balancing) and to minimize data transfers among nodes (data<br>> locality). Having decided where to run the subtask, a<br>> request is sent to (iv) the corresponding worker which<br>> executes the subtask and returns the result up the chain of<br>> services. Every subtask goes through this life cycle.<br>><br>> Apart from the interplay of the aforementioned services we<br>> want the workflow system to behave in a particular way when<br>> one of these services dies:<br>><br>> - Each workflow is evaluated inside its own interpreter<br>>   process. A workflow potentially runs for a long time and<br>>   at some point we might want to kill the interpreter<br>>   process. When this happens, the system has to identify all<br>>   open requests originating from that interpreter and cancel<br>>   them.<br>><br>> - When an important service (say the scheduler) dies, a<br>>   supervisor will restart it, this way securing the<br>>   availability of the application. Upon a fresh start, none<br>>   of the messages this service has received will be there<br>>   anymore. Instead of having to notify the client of this<br>>   important service (in this case the cache) to give it the<br>>   chance to repair the damage, we want all the messages,<br>>   that have been sent to the important service (scheduler)<br>>   and have not been quited, to be resent to the freshly<br>>   started service (scheduler).<br>><br>> - When a worker dies, from a hardware fault, we cannot<br>>   expect a supervisor to restart it (on the same machine).<br>>   In this case we want to notify the scheduler not to expect<br>>   a reply to his request anymore. Also we want to reissue<br>>   the original request to the scheduler to give it the<br>>   chance to choose a different machine to run the subtask<br>>   on.<br>><br>> - When a request is canceled at a high level (say at the<br>>   cache level because the interpreter died) All subsequent<br>>   requests (to the scheduler and in the worker)<br>>   corresponding to the original request should have been<br>>   canceled before the high level service (cache) is<br>>   notified, thereby relieving him of the duty to cancel them<br>>   himself.<br>><br>><br>> Since there is no shared memory in Erlang, the state of a<br>> process is defined only by the messages received (and its<br>> init parameters which are assumed constant). To reestablish<br>> the state of a process after failure we propose three<br>> different ways to send messages to a process and their<br>> corresponding informal error handling semantics:<br>><br>> tsend( Dest, Req, replay ) -> TransId<br>> when Dest    :: atom(),<br>>      Req     :: term(),<br>>      TransId :: reference().<br>><br>> Upon calling tsend/3, a transaction server creates a record<br>> of the request to be sent and relays it to the destination<br>> (must be a registered process). At the same time it creates<br>> a monitor on both the request's source and destination. When<br>> the source dies, it will send an abort message to the<br>> destination. When the destination dies, initially, nothing<br>> happens. When the supervisor restarts the destination, the<br>> transaction server replays all unfinished requests to the<br>> destination.<br>><br>> tsend( Dest, Req, replay, Precond ) -> TransId<br>> when Dest      :: atom(),<br>>      Req       :: term(),<br>>      Precond   :: reference(),<br>>      TransId   :: reference().<br>><br>> The error handling for tsend/4 with replay works just the<br>> same as tsend/3. Additionally, when the request with the id<br>> Precond is canceled, this request is also canceled.<br>><br>> tsend( Dest, Req, reschedule, Precond ) -> TransId<br>> when Dest    :: atom() | pid(),<br>>      Req     :: term(),<br>>      Precond :: reference(),<br>>      TransId :: reference().<br>><br>> Upon calling tsend/4, with reschedule, as before, a<br>> transaction server creates a record of the request and<br>> monitors both source and destination. When the destination<br>> dies, instead of waiting for a supervisor to restart it, the<br>> original request identified with Precond is first canceled<br>> at the source and then replayed to the source. Since we do<br>> not rely on the destination to be a permanent process, we<br>> can also identify it per Pid while we had to require a<br>> registered service under replay error handling.<br>><br>> commit( TransId, Reply ) -> ok<br>> when TransId :: reference(),<br>>      Reply   :: term().<br>><br>> When a service is done working on a request, it sends a<br>> commit which relays the reply to the transaction source and<br>> removes the record for this request from the transaction<br>> server.<br>><br>> A service participating in transaction handling has to<br>> provide the following two callbacks:<br>><br>> handle_recv( TransId::reference(), Req::_, State::_ ) -><br>>   {noreply, NewState::_}.<br>><br>> handle_abort( TransId::reference(), State::_ ) -><br>>   {noreply, NewState::_}.<br>><br>> While the so-defined transaction protocol is capable of<br>> satisfying the requirements introduced for the workflow<br>> system example the question is, is it general enough to be<br>> applicable also in other applications?<br>><br>><br>> This conduct has its limitations.<br>><br>> The introduced transaction protocol may be suited to deal<br>> with transient software faults (Heisenbugs) but its<br>> effectiveness to mitigate hardware faults or deterministic<br>> software faults (Bohrbugs) is limited. In addition, with the<br>> introduction of the transaction server we created a single<br>> point of failure.<br>><br>><br>> Concludingly, the restarting of a service by a supervisor is<br>> sufficient to secure the availability of a service in the<br>> presence of software faults but large scale distributed<br>> systems require a more fine-grained approach to error<br>> handling. To identify patterns in message passing and error<br>> handling gives us the opportunity to reduce error handling<br>> code and, thereby, avoid the introduction of bugs into error<br>> handling. The proposed transaction protocol may be suitable<br>> to achieve this goal.<br>><br>><br>> I had hoped to get some feedback on the concept, in order to<br>> have an idea whether I am on the right track. If a similar<br>> library is already around and I just couldn't find it, if I<br>> am missing an obvious feature, a pattern that is important<br>> but just doesn't appear in the context of scientific<br>> workflows, it would be helpful to know about it. Thanks in<br>> advance.<br>><br>> Cheers<br>> Jörgen<br>><br>> _______________________________________________<br>> erlang-questions mailing list<br>> <a href="mailto:erlang-questions@erlang.org">erlang-questions@erlang.org</a><br>> <a href="http://erlang.org/mailman/listinfo/erlang-questions">http://erlang.org/mailman/listinfo/erlang-questions</a></div>