[erlang-questions] Considering a Generic Transaction System in Erlang

Michael Truog mjtruog@REDACTED
Tue Oct 20 00:09:55 CEST 2015

On 10/08/2015 03:07 PM, Jörgen Brandt wrote:
> Hello,
> is there an Erlang library for transactional message
> passing, using patterns in communication and error handling
> to improve fault tolerance?
Yes, there is CloudI (http://cloudi.org).  If you only need Erlang support, there is a repository at https://github.com/CloudI/cloudI_core/, but the main repository (https://github.com/CloudI/CloudI) covers every supported programming language:  C++/C, Erlang, Java, JavaScript, Perl, PHP, Python, and Ruby.  Comments below:
> This (or a similar) question may have been asked before and,
> of course, there is plenty of research on fault tolerance
> and failure transparency. Nevertheless, in my work with
> scientific workflows it seems that certain patterns in error
> handling exist. In this mail I'm trying to motivate a way to
> externalize common error handling in a standardized service
> (a transaction server) but I'm unsure whether such a thing
> already exists, whether I'm missing an important feature,
> and whether it's a good idea anyway.
> Large distributed systems are composed of many services.
> They process many tasks concurrently and need fault
> tolerance to yield correct results and maintain
> availability. Erlang seemed a good choice because it
> provides facilities to automatically improve availability,
> e.g., by using supervisors. In addition, it encourages a
> programming style that separates processing logic from
> error handling. In general, each service has its own
> requirements, implying that a general approach to error
> handling (beyond restarting) is infeasible. However, if an
> application exhibits recurring patterns in the way error
> handling corresponds to the messages passed between
> services, we can abstract these patterns to reuse them.
> Fault tolerance is important because it directly translates
> to scalability.
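Not really.  Fault tolerance and scalability are separate concerns.  The approach you describe further down in your email relies on registered processes, and since a registered name refers to a single process, that is an easy way to limit scalability while still having fault-tolerance.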
> Consider an application (with transient software faults),
> processing user queries. The application reports errors back
> to the user as they appear. If a user query is a long-
> running job (hours, days), the number of subtasks created
> from this job (thousands), the number of services to process
> one subtask, and the number of machines involved are large,
> then the occurrence of an error is near to certain. Quietly
> restarting the application and rerunning the query may
> reduce the failure probability but even if the application
> succeeds, the number of retries and, thus, the time elapsed
> to success may be prohibitive. What is needed is a system
> that does not restart the whole application but only the
> service that failed reissuing only the unfinished requests
> that this service received before failing. Consequently, the
> finer the granularity at which errors are handled, the less
> work has to be redone when errors occur, allowing a system
> to host longer-running jobs, containing more subtasks,
> involving more services for each subtask, and running on
> more machines in feasible time.
> Scientific workflows are a good example for a large
> distributed application exhibiting patterns in communication
> and error handling.
> A scientific workflow system consumes an input query in the
> form of an expression in the workflow language. On
> evaluation of this expression it identifies subtasks that
> can be executed in any order. E.g., a variant calling
> workflow from bioinformatics unfolds into several hundred
> to a thousand subtasks each of which is handed down in the
> form of requests through a number of services: Upon
> identification of the subtask in (i) the query interpreter,
> a request is sent to (ii) a cache service. This service
> keeps track of all previously run subtasks and returns the
> cached result if available. If not, a request is sent to
> (iii) a scheduling service. This service determines the
> machine to run the subtask on. The scheduler tries both to
> adequately distribute the workload among workers (load
> balancing) and to minimize data transfers among nodes (data
> locality). Having decided where to run the subtask, a
> request is sent to (iv) the corresponding worker which
> executes the subtask and returns the result up the chain of
> services. Every subtask goes through this life cycle.
> Apart from the interplay of the aforementioned services we
> want the workflow system to behave in a particular way when
> one of these services dies:
> - Each workflow is evaluated inside its own interpreter
>    process. A workflow potentially runs for a long time and
>    at some point we might want to kill the interpreter
>    process. When this happens, the system has to identify all
>    open requests originating from that interpreter and cancel
>    them.
> - When an important service (say the scheduler) dies, a
>    supervisor will restart it, this way securing the
>    availability of the application. Upon a fresh start, none
>    of the messages this service has received will be there
>    anymore. Instead of having to notify the client of this
>    important service (in this case the cache) to give it the
>    chance to repair the damage, we want all the messages,
>    that have been sent to the important service (scheduler)
>    and have not been acknowledged, to be resent to the freshly
>    started service (scheduler).
> - When a worker dies, from a hardware fault, we cannot
>    expect a supervisor to restart it (on the same machine).
>    In this case we want to notify the scheduler not to expect
>    a reply to its request anymore. Also, we want to reissue
>    the original request to the scheduler to give it the
>    chance to choose a different machine to run the subtask
>    on.
> - When a request is canceled at a high level (say at the
>    cache level because the interpreter died), all subsequent
>    requests (to the scheduler and in the worker)
>    corresponding to the original request should have been
>    canceled before the high level service (cache) is
>    notified, thereby relieving it of the duty to cancel them
>    itself.
In CloudI you can create an Erlang service by implementing the cloudi_service behaviour, which sends and receives service requests. For integration with other Erlang source code, a service can receive normal Erlang message sends, and normal Erlang processes can send to CloudI services with the cloudi module's context data.  A CloudI service request is a task or subtask as you have described, and there is no need to distinguish between workers and services (you can just configure a service to use any number of processes). CloudI uses service names as the destination for service requests, and any number of service processes (or threads) may subscribe to the same name.  This approach is necessary for high-availability while providing fault-tolerance.  It means all the service source code is executed concurrently without attempting to maintain consistent state outside a single thread of execution (it is an AP-type system when considering the CAP theorem).  The service requests are randomly assigned to a thread of execution that has subscribed to the matching service name, which is what gives the service its fault-tolerance guarantees.
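To make the subscription idea concrete, here is a minimal sketch in plain Erlang. It is not CloudI source code or the CloudI API: the module name, the function names, and the {error, no_subscriber} return value are all made up for illustration. It only shows several processes subscribing to one name and a request being assigned to one of them at random.

```erlang
-module(name_lookup_sketch).
-export([new/0, subscribe/3, pick/2]).

%% Hypothetical registry: a map from service name to the list of
%% subscribed pids. Any number of pids may share one name.
new() ->
    #{}.

%% Add Pid to the subscribers of Name.
subscribe(Name, Pid, Registry) ->
    Registry#{Name => [Pid | maps:get(Name, Registry, [])]}.

%% Pick one subscriber of Name uniformly at random.
%% The error tuple is an assumption of this sketch.
pick(Name, Registry) ->
    case maps:get(Name, Registry, []) of
        [] ->
            {error, no_subscriber};
        Pids ->
            {ok, lists:nth(rand:uniform(length(Pids)), Pids)}
    end.
```

Because no subscriber is special, losing one process only removes one entry from the list, and the remaining subscribers keep handling requests for that name.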
> Since there is no shared memory in Erlang, the state of a
> process is defined only by the messages received (and its
> init parameters which are assumed constant). To reestablish
> the state of a process after failure we propose three
> different ways to send messages to a process and their
> corresponding informal error handling semantics:
> tsend( Dest, Req, replay ) -> TransId
> when Dest    :: atom(),
>       Req     :: term(),
>       TransId :: reference().
> Upon calling tsend/3, a transaction server creates a record
> of the request to be sent and relays it to the destination
> (must be a registered process). At the same time it creates
> a monitor on both the request's source and destination. When
> the source dies, it will send an abort message to the
> destination. When the destination dies, initially, nothing
> happens. When the supervisor restarts the destination, the
> transaction server replays all unfinished requests to the
> destination.
> tsend( Dest, Req, replay, Precond ) -> TransId
> when Dest      :: atom(),
>       Req       :: term(),
>       Precond   :: reference(),
>       TransId   :: reference().
> The error handling for tsend/4 with replay works just the
> same as tsend/3. Additionally, when the request with the id
> Precond is canceled, this request is also canceled.
> tsend( Dest, Req, reschedule, Precond ) -> TransId
> when Dest    :: atom() | pid(),
>       Req     :: term(),
>       Precond :: reference(),
>       TransId :: reference().
> Upon calling tsend/4, with reschedule, as before, a
> transaction server creates a record of the request and
> monitors both source and destination. When the destination
> dies, instead of waiting for a supervisor to restart it, the
> original request identified with Precond is first canceled
> at the source and then replayed to the source. Since we do
> not rely on the destination to be a permanent process, we
> can also identify it per Pid while we had to require a
> registered service under replay error handling.
> commit( TransId, Reply ) -> ok
> when TransId :: reference(),
>       Reply   :: term().
> When a service is done working on a request, it sends a
> commit which relays the reply to the transaction source and
> removes the record for this request from the transaction
> server.
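For reference, this is how I read the bookkeeping you describe, as a minimal sketch in plain Erlang. It is only my interpretation of the proposal, not an existing library: the module and function names (track, commit, replay, abort_source) are hypothetical, the source is passed explicitly instead of being taken from the caller, and monitoring and the actual message relaying are left out.

```erlang
-module(tsend_sketch).
-export([new/0, track/4, commit/2, replay/2, abort_source/2]).

%% Pending requests: a map from TransId to {Source, Dest, Req}.
new() ->
    #{}.

%% Record a request. The caller is expected to relay Req to Dest.
track(Source, Dest, Req, Pending) ->
    TransId = make_ref(),
    {TransId, Pending#{TransId => {Source, Dest, Req}}}.

%% A commit removes the record and names the source that
%% should receive the reply.
commit(TransId, Pending) ->
    case maps:take(TransId, Pending) of
        {{Source, _Dest, _Req}, Rest} ->
            {ok, Source, Rest};
        error ->
            {error, unknown_transaction}
    end.

%% After Dest was restarted: the unfinished requests to send again.
replay(Dest, Pending) ->
    [{TransId, Req}
     || {TransId, {_Source, D, Req}} <- maps:to_list(Pending),
        D =:= Dest].

%% After Source died: drop its requests and return the
%% {Dest, TransId} pairs that need an abort message.
abort_source(Source, Pending) ->
    Aborted = [{Dest, TransId}
               || {TransId, {S, Dest, _Req}} <- maps:to_list(Pending),
                  S =:= Source],
    {Aborted, maps:without([T || {_Dest, T} <- Aborted], Pending)}.
```

Note that nothing in this state ever expires, which is why the missing timeout matters.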
In CloudI the situation is simpler.  The service request is sent through as many services as necessary, and data is committed upon receiving the reply to the service request.  The service request handling callback function provides the TransId, which gives the transaction uniqueness and a loose time-based ordering.  If a transaction (CloudI service request) receives a response, then it was handled successfully by all services involved within the timeout value provided for the service request (the timeout is important and missing from your description).  If a failure at some service in the service request path means no response arrives within the timeout period, the transaction can be retried (this would be considered replaying a transaction).  cloudi_service_queue is a service which handles persisting transactions to disk and provides retries as a service, and there is also a cloudi_queue data structure which can provide retries in Erlang services.
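The role of the timeout can be illustrated with a minimal sketch in plain Erlang. This is not how cloudi_service_queue or cloudi_queue are implemented; the module name, the message shapes ({request, ...} and {reply, ...}) and the use of make_ref() as a stand-in transaction id are assumptions made only for this example.

```erlang
-module(retry_sketch).
-export([call_with_retry/4]).

%% Send Req to the pid Dest and wait up to Timeout milliseconds for
%% a reply, for at most Attempts attempts in total.
call_with_retry(_Dest, _Req, _Timeout, 0) ->
    {error, timeout};
call_with_retry(Dest, Req, Timeout, Attempts) when Attempts > 0 ->
    %% A fresh id per attempt, so a late reply to an earlier
    %% attempt never matches (it stays unread in the mailbox).
    TransId = make_ref(),
    Dest ! {request, self(), TransId, Req},
    receive
        {reply, TransId, Response} ->
            {ok, Response}
    after Timeout ->
        %% No response within the timeout: replay the request.
        call_with_retry(Dest, Req, Timeout, Attempts - 1)
    end.
```

The sender never needs to learn why the response is missing. An expired timeout is the only failure signal, which is what keeps the error handling small.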
> A service participating in transaction handling has to
> provide the following two callbacks:
> handle_recv( TransId::reference(), Req::_, State::_ ) ->
>    {noreply, NewState::_}.
> handle_abort( TransId::reference(), State::_ ) ->
>    {noreply, NewState::_}.
In CloudI, the "abort" described above corresponds to receiving a null response (a binary of <<>> for both ResponseInfo and Response, where ResponseInfo is meta-data related to the Response data (e.g., HTTP headers)).  The response to a CloudI service request is the result of a function call (either send_sync or recv_async from the CloudI API).  Erlang services can use send_async_active to receive responses as Erlang messages, which can be more efficient for services that handle high throughput.
> While the so-defined transaction protocol is capable of
> satisfying the requirements introduced for the workflow
> system example the question is, is it general enough to be
> applicable also in other applications?
> This conduct has its limitations.
> The introduced transaction protocol may be suited to deal
> with transient software faults (Heisenbugs) but its
> effectiveness to mitigate hardware faults or deterministic
> software faults (Bohrbugs) is limited. In addition, with the
> introduction of the transaction server we created a single
> point of failure.
CloudI shares the service name information between connected Erlang nodes (using distributed Erlang), so service requests can use remote nodes when a local service isn't found.  This is based on the service's configured destination refresh method (it determines how a service selects destinations for service requests).
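The local-first behaviour can be sketched as follows in plain Erlang. This is not the CloudI implementation, and the module name, function names and error value are hypothetical. It assumes the candidate pids for a service name are already known and only shows the preference for a subscriber on the local node, with remote nodes as the fallback.

```erlang
-module(closest_sketch).
-export([pick_closest/1]).

%% Prefer a subscriber running on the local node. Fall back to
%% any (remote) subscriber when no local one exists.
pick_closest([]) ->
    {error, no_destination};
pick_closest(Pids) ->
    case [P || P <- Pids, node(P) =:= node()] of
        [] ->
            {ok, random_member(Pids)};
        Local ->
            {ok, random_member(Local)}
    end.

%% Uniform random choice from a non-empty list.
random_member(List) ->
    lists:nth(rand:uniform(length(List)), List).
```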
> In conclusion, the restarting of a service by a supervisor is
> sufficient to secure the availability of a service in the
> presence of software faults but large scale distributed
> systems require a more fine-grained approach to error
> handling. To identify patterns in message passing and error
> handling gives us the opportunity to reduce error handling
> code and, thereby, avoid the introduction of bugs into error
> handling. The proposed transaction protocol may be suitable
> to achieve this goal.
> I had hoped to get some feedback on the concept, in order to
> have an idea whether I am on the right track. If a similar
> library is already around and I just couldn't find it, if I
> am missing an obvious feature, a pattern that is important
> but just doesn't appear in the context of scientific
> workflows, it would be helpful to know about it. Thanks in
> advance.
Feel free to ask questions if something wasn't answered above or if you need more detail on anything.

Best Regards,

> Cheers
> Jörgen
> _______________________________________________
> erlang-questions mailing list
> erlang-questions@REDACTED
> http://erlang.org/mailman/listinfo/erlang-questions
