[erlang-questions] Considering a Generic Transaction System in Erlang

Jörgen Brandt <>
Fri Oct 23 15:02:10 CEST 2015


Hey Torben,

thanks for your reply.

On 19.10.2015 10:51, Torben Hoffmann wrote:
> Hi Jörgen,
> 
> At the risk of showing my inability to understand your problem, I would challenge
> the need for the transaction server altogether.
> 
> As you say, the messages that the processing server has yet to process will be lost
> if the server dies, so re-sending is required.
> 
> I would simply deal with this in the client.
> When you send a request, you monitor the server; if it dies, you re-send when the
> service is up again.
> The server monitors clients; if the client dies, the server stops the pending and
> ongoing jobs for that client.

You are perfectly right. It would be advantageous to have a
decentralized model for this. The reason I proposed a centralized
architecture (with a dedicated transaction server) is the following:

You send a request to a server. The server dies. Because of a monitor
you find out you have to resend the message when the server is up again.
So far so good.

How do you find out that the server is up again? The supervisor needs a
millisecond or so to restart the server, and you need to make sure not
to resend the message into the void.
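
One client-side answer might look like the following rough sketch (not
taken from any existing library): the client polls the local registry
with whereis/1 until the name is registered again, and only then
monitors the server and sends. The module name retry_client, the helper
wait_for/2, the retry and polling limits, and the {request, ...} and
{reply, ...} message shapes are all assumptions made up for illustration.

```erlang
-module(retry_client).
-export([call_with_retry/3, wait_for/2]).

%% Hypothetical helper: send Req to the registered process Name and wait
%% for {reply, MRef, Result}. If the server dies first, wait until the
%% name is registered again and resend. Message shapes are assumptions.
call_with_retry(_Name, _Req, 0) ->
    {error, retries_exhausted};
call_with_retry(Name, Req, Retries) when Retries > 0 ->
    case wait_for(Name, 50) of
        {ok, Pid} ->
            %% Monitoring a pid that is already dead yields an immediate
            %% 'DOWN' message, so there is no window for a lost send.
            MRef = erlang:monitor(process, Pid),
            Pid ! {request, self(), MRef, Req},
            receive
                {reply, MRef, Result} ->
                    erlang:demonitor(MRef, [flush]),
                    {ok, Result};
                {'DOWN', MRef, process, Pid, _Reason} ->
                    call_with_retry(Name, Req, Retries - 1)
            end;
        timeout ->
            {error, server_unavailable}
    end.

%% Poll the local registry until Name is registered or Attempts run out.
wait_for(_Name, 0) ->
    timeout;
wait_for(Name, Attempts) when Attempts > 0 ->
    case whereis(Name) of
        undefined ->
            timer:sleep(10),
            wait_for(Name, Attempts - 1);
        Pid ->
            {ok, Pid}
    end.
```

Such a scheme would only give at-least-once delivery: if the server
crashes after doing the work but before replying, the request is
processed twice, so requests would have to be idempotent.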

How did you address this issue in your GOL implementation?

Cheers
Jörgen

> 
> I have used this approach in my Game of Life implementation - it seems to work.
> 
> There might be room for a little library for some of the book keeping involved in
> this, but given that there can be so many variations on this very simple pattern I
> fear that it will be hard to create a generic library for this.
> 
> Cheers,
> Torben
> 
> Jörgen Brandt writes:
> 
>> Hello,
>>
>> is there an Erlang library for transactional message
>> passing, using patterns in communication and error handling
>> to improve fault tolerance?
>>
>> This (or a similar) question may have been asked before and,
>> of course, there is plenty of research on fault tolerance
>> and failure transparency. Nevertheless, in my work with
>> scientific workflows it seems that certain patterns in error
>> handling exist. In this mail I'm trying to motivate a way to
>> externalize common error handling in a standardized service
>> (a transaction server) but I'm unsure whether such a thing
>> already exists, whether I'm missing an important feature,
>> and whether it's a good idea anyway.
>>
>> Large distributed systems are composed of many services.
>> They process many tasks concurrently and need fault
>> tolerance to yield correct results and maintain
>> availability. Erlang seemed a good choice because it
>> provides facilities to automatically improve availability,
>> e.g., by using supervisors. In addition, it encourages a
>> programming style that separates processing logic from
>> error handling. In general, each service has its own
>> requirements, implying that a general approach to error
>> handling (beyond restarting) is infeasible. However, if an
>> application exhibits recurring patterns in the way error
>> handling corresponds to the messages passed between
>> services, we can abstract these patterns to reuse them.
>>
>>
>> Fault tolerance is important because it directly translates
>> to scalability.
>>
>> Consider an application (with transient software faults),
>> processing user queries. The application reports errors back
>> to the user as they appear. If a user query is a
>> long-running job (hours, days) and the number of subtasks
>> created from this job (thousands), the number of services to
>> process one subtask, and the number of machines involved are
>> large, then the occurrence of an error is nearly certain. Quietly
>> restarting the application and rerunning the query may
>> reduce the failure probability but even if the application
>> succeeds, the number of retries and, thus, the time elapsed
>> to success may be prohibitive. What is needed is a system
>> that does not restart the whole application but only the
>> service that failed, reissuing only the unfinished requests
>> that this service received before failing. Consequently, the
>> finer the granularity at which errors are handled, the less
>> work has to be redone when errors occur, allowing a system
>> to host longer-running jobs, containing more subtasks,
>> involving more services for each subtask, and running on
>> more machines in feasible time.
>>
>>
>> Scientific workflows are a good example of a large
>> distributed application exhibiting patterns in communication
>> and error handling.
>>
>> A scientific workflow system consumes an input query in the
>> form of an expression in the workflow language. On
>> evaluation of this expression it identifies subtasks that
>> can be executed in any order. E.g., a variant calling
>> workflow from bioinformatics unfolds into several hundred
>> to a thousand subtasks each of which is handed down in the
>> form of requests through a number of services: Upon
>> identification of the subtask in (i) the query interpreter,
>> a request is sent to (ii) a cache service. This service
>> keeps track of all previously run subtasks and returns the
>> cached result if available. If not, a request is sent to
>> (iii) a scheduling service. This service determines the
>> machine to run the subtask on. The scheduler tries both to
>> adequately distribute the work load among workers (load
>> balancing) and to minimize data transfers among nodes (data
>> locality). Having decided where to run the subtask, a
>> request is sent to (iv) the corresponding worker which
>> executes the subtask and returns the result up the chain of
>> services. Every subtask goes through this life cycle.
>>
>> Apart from the interplay of the aforementioned services we
>> want the workflow system to behave in a particular way when
>> one of these services dies:
>>
>> - Each workflow is evaluated inside its own interpreter
>>   process. A workflow potentially runs for a long time and
>>   at some point we might want to kill the interpreter
>>   process. When this happens, the system has to identify all
>>   open requests originating from that interpreter and cancel
>>   them.
>>
>> - When an important service (say the scheduler) dies, a
>>   supervisor will restart it, this way securing the
>>   availability of the application. Upon a fresh start, none
>>   of the messages this service has received will be there
>>   anymore. Instead of having to notify the client of this
>>   important service (in this case the cache) to give it the
>>   chance to repair the damage, we want all the messages,
>>   that have been sent to the important service (scheduler)
>>   and have not been acknowledged, to be resent to the freshly
>>   started service (scheduler).
>>
>> - When a worker dies, from a hardware fault, we cannot
>>   expect a supervisor to restart it (on the same machine).
>>   In this case we want to notify the scheduler not to expect
>>   a reply to its request anymore. Also we want to reissue
>>   the original request to the scheduler to give it the
>>   chance to choose a different machine to run the subtask
>>   on.
>>
>> - When a request is canceled at a high level (say at the
>>   cache level because the interpreter died), all subsequent
>>   requests (to the scheduler and to the worker)
>>   corresponding to the original request should have been
>>   canceled before the high level service (cache) is
>>   notified, thereby relieving it of the duty to cancel them
>>   itself.
>>
>>
>> Since there is no shared memory in Erlang, the state of a
>> process is defined only by the messages received (and its
>> init parameters which are assumed constant). To reestablish
>> the state of a process after failure we propose three
>> different ways to send messages to a process and their
>> corresponding informal error handling semantics:
>>
>> tsend( Dest, Req, replay ) -> TransId
>> when Dest    :: atom(),
>>      Req     :: term(),
>>      TransId :: reference().
>>
>> Upon calling tsend/3, a transaction server creates a record
>> of the request to be sent and relays it to the destination
>> (must be a registered process). At the same time it creates
>> a monitor on both the request's source and destination. When
>> the source dies, it will send an abort message to the
>> destination. When the destination dies, initially, nothing
>> happens. When the supervisor restarts the destination, the
>> transaction server replays all unfinished requests to the
>> destination.
>>
>> tsend( Dest, Req, replay, Precond ) -> TransId
>> when Dest      :: atom(),
>>      Req       :: term(),
>>      Precond   :: reference(),
>>      TransId   :: reference().
>>
>> The error handling for tsend/4 with replay works just the
>> same as tsend/3. Additionally, when the request with the id
>> Precond is canceled, this request is also canceled.
>>
>> tsend( Dest, Req, reschedule, Precond ) -> TransId
>> when Dest    :: atom() | pid(),
>>      Req     :: term(),
>>      Precond :: reference(),
>>      TransId :: reference().
>>
>> Upon calling tsend/4, with reschedule, as before, a
>> transaction server creates a record of the request and
>> monitors both source and destination. When the destination
>> dies, instead of waiting for a supervisor to restart it, the
>> original request identified with Precond is first canceled
>> at the source and then replayed to the source. Since we do
>> not rely on the destination to be a permanent process, we
>> can also identify it by Pid, while we had to require a
>> registered service under replay error handling.
>>
>> commit( TransId, Reply ) -> ok
>> when TransId :: reference(),
>>      Reply   :: term().
>>
>> When a service is done working on a request, it sends a
>> commit which relays the reply to the transaction source and
>> removes the record for this request from the transaction
>> server.
>>
>> A service participating in transaction handling has to
>> provide the following two callbacks:
>>
>> handle_recv( TransId::reference(), Req::_, State::_ ) ->
>>   {noreply, NewState::_}.
>>
>> handle_abort( TransId::reference(), State::_ ) ->
>>   {noreply, NewState::_}.
>>
>> While the transaction protocol defined above is capable of
>> satisfying the requirements introduced for the workflow
>> system example, the question is whether it is general enough
>> to be applicable in other applications as well.
>>
>>
>> This approach has its limitations.
>>
>> The introduced transaction protocol may be suited to deal
>> with transient software faults (Heisenbugs) but its
>> effectiveness in mitigating hardware faults or deterministic
>> software faults (Bohrbugs) is limited. In addition, with the
>> introduction of the transaction server we created a single
>> point of failure.
>>
>>
>> In conclusion, the restarting of a service by a supervisor is
>> sufficient to secure the availability of a service in the
>> presence of software faults but large scale distributed
>> systems require a more fine-grained approach to error
>> handling. Identifying patterns in message passing and error
>> handling gives us the opportunity to reduce error handling
>> code and, thereby, avoid the introduction of bugs into error
>> handling. The proposed transaction protocol may be suitable
>> to achieve this goal.
>>
>>
>> I had hoped to get some feedback on the concept, in order to
>> have an idea whether I am on the right track. If a similar
>> library is already around and I just couldn't find it, if I
>> am missing an obvious feature, a pattern that is important
>> but just doesn't appear in the context of scientific
>> workflows, it would be helpful to know about it. Thanks in
>> advance.
>>
>> Cheers
>> Jörgen
>>
>> _______________________________________________
>> erlang-questions mailing list
>> 
>> http://erlang.org/mailman/listinfo/erlang-questions
> 
> --
> Torben Hoffmann
> Architect, basho.com
> M: +45 25 14 05 38
> 

