[erlang-questions] Considering a Generic Transaction System in Erlang

Carlos González Florido carlosj.gf@REDACTED
Mon Oct 19 14:46:09 CEST 2015


Hi Jörgen.

You are describing a complex behavior, and I'm probably not fully
understanding it, but some of your ideas seem very similar to what we are
building for NetComposer.
We don't yet have a full release, but we have already released many of its
pieces:

- NkCLUSTER (https://github.com/Nekso/nkcluster) is a framework for
creating clusters of Erlang nodes of any size, and distributing and
managing jobs into them, with a pattern that seems similar to your
proposal, different to other frameworks in many aspects, and specially
sending "intelligence" to the workers.
- NkSERVICE (https://github.com/Nekso/nkservice) is framework for managing
distributed services based on plugins.
- NkDOMAIN (https://github.com/Nekso/nkdomain) is an Erlang framework to
load an manage complex distributed, multi-tenant configurations in a
cluster.
- NkSIP (https://github.com/kalta/nksip) is a SIP application server that
is currenly under heavy refactorization (it will be integrated as a plugin
of NetComposer), but offers a way to process complex works similar to your
idea.

I hope it helps.
Regards,
Carlos


On Fri, Oct 9, 2015 at 12:07 AM, Jörgen Brandt <brandjoe@REDACTED>
wrote:
>
> Hello,
>
> is there an Erlang library for transactional message
> passing, using patterns in communication and error handling
> to improve fault tolerance?
>
> This (or a similar) question may have been asked before and,
> of course, there is plenty of research on fault tolerance
> and failure transparency. Nevertheless, in my work with
> scientific workflows it seems that certain patterns in error
> handling exist. In this mail I'm trying to motivate a way to
> externalize common error handling in a standardized service
> (a transaction server) but I'm unsure whether such a thing
> already exists, whether I'm missing an important feature,
> and whether it's a good idea anyway.
>
> Large distributed systems are composed of many services.
> They process many tasks concurrently and need fault
> tolerance to yield correct results and maintain
> availability. Erlang seemed a good choice because it
> provides facilities to automatically improve availability,
> e.g., by using supervisers. In addition, it encourages a
> programming style that separates processing logic from
> error handling. In general, each service has its own
> requirements, implying that a general approach to error
> handling (beyond restarting) is infeasible. However, if an
> application exhibits recurring patterns in the way error
> handling corresponds to the messages passed between
> services, we can abstract these patterns to reuse them.
>
>
> Fault tolerance is important because it directly translates
> to scalability.
>
> Consider an application (with transient software faults),
> processing user queries. The application reports errors back
> to the user as they appear. If a user query is a long-
> running job (hours, days), the number of subtasks created
> from this job (thousands), the number of services to process
> one subtask, and the number of machines involved are large,
> then the occurrence of an error is near to certain. Quietly
> restarting the application and rerunning the query may
> reduce the failure probability but even if the application
> succeeds, the number of retries and, thus, the time elapsed
> to success may be prohibitive. What is needed is a system
> that does not restart the whole application but only the
> service that failed reissuing only the unfinished requests
> that this service received before failing. Consequently, the
> finer the granularity at which errors are handled, the less
> work has to be redone when errors occur, allowing a system
> to host longer-running jobs, containing more subtasks,
> involving more services for each subtask, and running on
> more machines in feasible time.
>
>
> Scientific workflows are a good example for a large
> distributed application exhibiting patterns in communication
> and error handling.
>
> A scientific workflow system consumes an input query in the
> form of an expression in the workflow language. On
> evaluation of this expression it identifies subtasks that
> can be executed in any order. E.g., a variant calling
> workflow from bioinformatics unfolds into several hundred
> to a thousand subtasks each of which is handed down in the
> form of requests through a number of services: Upon
> identification of the subtask in (i) the query interpreter,
> a request is sent to (ii) a cache service. This service
> keeps track of all previously run subtasks and returns the
> cached result if available. If not, a request is sent to
> (iii) a scheduling service. This service determines the
> machine, to run the subtask. The scheduler tries both, to
> adequately distribute the work load among workers (load
> balancing) and to minimize data transfers among nodes (data
> locality). Having decided where to run the subtask, a
> request is sent to (iv) the corresponding worker which
> executes the subtask and returns the result up the chain of
> services. Every subtask goes through this life cycle.
>
> Apart from the interplay of the aforementioned services we
> want the workflow system to behave in a particular way when
> one of these services dies:
>
> - Each workflow is evaluated inside its own interpreter
>   process. A workflow potentially runs for a long time and
>   at some point we might want to kill the interpreter
>   process. When this happens, the system has to identify all
>   open requests originating from that interpreter and cancel
>   them.
>
> - When an important service (say the scheduler) dies, a
>   supervisor will restart it, this way securing the
>   availability of the application. Upon a fresh start, none
>   of the messages this service has received will be there
>   anymore. Instead of having to notify the client of this
>   important service (in this case the cache) to give it the
>   chance to repair the damage, we want all the messages,
>   that have been sent to the important service (scheduler)
>   and have not been quited, to be resent to the freshly
>   started service (scheduler).
>
> - When a worker dies, from a hardware fault, we cannot
>   expect a supervisor to restart it (on the same machine).
>   In this case we want to notify the scheduler not to expect
>   a reply to his request anymore. Also we want to reissue
>   the original request to the scheduler to give it the
>   chance to choose a different machine to run the subtask
>   on.
>
> - When a request is canceled at a high level (say at the
>   cache level because the interpreter died) All subsequent
>   requests (to the scheduler and in the worker)
>   corresponding to the original request should have been
>   canceled before the high level service (cache) is
>   notified, thereby relieving him of the duty to cancel them
>   himself.
>
>
> Since there is no shared memory in Erlang, the state of a
> process is defined only by the messages received (and its
> init parameters which are assumed constant). To reestablish
> the state of a process after failure we propose three
> different ways to send messages to a process and their
> corresponding informal error handling semantics:
>
> tsend( Dest, Req, replay ) -> TransId
> when Dest    :: atom(),
>      Req     :: term(),
>      TransId :: reference().
>
> Upon calling tsend/3, a transaction server creates a record
> of the request to be sent and relays it to the destination
> (must be a registered process). At the same time it creates
> a monitor on both the request's source and destination. When
> the source dies, it will send an abort message to the
> destination. When the destination dies, initially, nothing
> happens. When the supervisor restarts the destination, the
> transaction server replays all unfinished requests to the
> destination.
>
> tsend( Dest, Req, replay, Precond ) -> TransId
> when Dest      :: atom(),
>      Req       :: term(),
>      Precond   :: reference(),
>      TransId   :: reference().
>
> The error handling for tsend/4 with replay works just the
> same as tsend/3. Additionally, when the request with the id
> Precond is canceled, this request is also canceled.
>
> tsend( Dest, Req, reschedule, Precond ) -> TransId
> when Dest    :: atom() | pid(),
>      Req     :: term(),
>      Precond :: reference(),
>      TransId :: reference().
>
> Upon calling tsend/4, with reschedule, as before, a
> transaction server creates a record of the request and
> monitors both source and destination. When the destination
> dies, instead of waiting for a supervisor to restart it, the
> original request identified with Precond is first canceled
> at the source and then replayed to the source. Since we do
> not rely on the destination to be a permanent process, we
> can also identify it per Pid while we had to require a
> registered service under replay error handling.
>
> commit( TransId, Reply ) -> ok
> when TransId :: reference(),
>      Reply   :: term().
>
> When a service is done working on a request, it sends a
> commit which relays the reply to the transaction source and
> removes the record for this request from the transaction
> server.
>
> A service participating in transaction handling has to
> provide the following two callbacks:
>
> handle_recv( TransId::reference(), Req::_, State::_ ) ->
>   {noreply, NewState::_}.
>
> handle_abort( TransId::reference(), State::_ ) ->
>   {noreply, NewState::_}.
>
> While the so-defined transaction protocol is capable of
> satisfying the requirements introduced for the workflow
> system example the question is, is it general enough to be
> applicable also in other applications?
>
>
> This conduct has its limitations.
>
> The introduced transaction protocol may be suited to deal
> with transient software faults (Heisenbugs) but its
> effectiveness to mitigate hardware faults or deterministic
> software faults (Bohrbugs) is limited. In addition, with the
> introduction of the transaction server we created a single
> point of failure.
>
>
> Concludingly, the restarting of a service by a supervisor is
> sufficient to secure the availability of a service in the
> presence of software faults but large scale distributed
> systems require a more fine-grained approach to error
> handling. To identify patterns in message passing and error
> handling gives us the opportunity to reduce error handling
> code and, thereby, avoid the introduction of bugs into error
> handling. The proposed transaction protocol may be suitable
> to achieve this goal.
>
>
> I had hoped to get some feedback on the concept, in order to
> have an idea whether I am on the right track. If a similar
> library is already around and I just couldn't find it, if I
> am missing an obvious feature, a pattern that is important
> but just doesn't appear in the context of scientific
> workflows, it would be helpful to know about it. Thanks in
> advance.
>
> Cheers
> Jörgen
>
> _______________________________________________
> erlang-questions mailing list
> erlang-questions@REDACTED
> http://erlang.org/mailman/listinfo/erlang-questions
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://erlang.org/pipermail/erlang-questions/attachments/20151019/1e7acd3b/attachment.htm>


More information about the erlang-questions mailing list