[erlang-questions] Erlang - supervised gen_server to drain off a queue

Garrett Smith g@REDACTED
Sat Dec 29 17:44:29 CET 2012


Hello!

On Sat, Dec 29, 2012 at 1:35 AM, hyperboreean
<hyperboreean@REDACTED> wrote:
> Hi guys,
>
> I'm just trying to bring Erlang into my company - it's a hard road, but
> we might eventually use it for at least some of our projects. I want to
> get started by changing a small piece of code that reads an AMQP queue
> and appends the contents of each message to a file. To be more specific,
> there are multiple producers for that queue (which produce error
> messages, if any) and there's this consumer which gets the error
> messages from the queue and updates a file with them. Now, this piece of
> software has always been problematic for us, for one reason or another -
> it's not reliable and when it fails it might bring the AMQP broker to
> its knees because of the number of messages that are not consumed.

While it doesn't help you get Erlang into your org, the "next obvious
step" might be to place your current queue reader under an OS process
supervisor such as runit, systemd, or launchd. For this to work,
you'll need to make sure that the program crashes (process death)
whenever it gets into a flaky state. Depending on the program, that
may be practically impossible.

You could alternatively write a small program that monitors the AMQP
queue size and restarts the reader process when the queue reaches a
certain size -- assuming there's a correlation there.

But at that point you're writing new code, so Erlang is back in play.

> I'd like to change this code with some Erlang code - maybe with a
> supervision tree so that if/when the server fails, it gets automatically
> restarted.

You should always run Erlang apps under a supervision tree. If indeed
you're using OTP apps, this is automatic.
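
As a rough sketch of what that tree could look like for a single queue
reader: the module name queue_reader_sup, the worker module
queue_reader (assumed to export start_link/0), and the restart limits
are all assumptions made for illustration. The map-based child spec
needs OTP 18 or later; older releases use the equivalent tuple form.

```erlang
-module(queue_reader_sup).
-behaviour(supervisor).

-export([start_link/0]).
-export([init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
    %% one_for_one: restart only the child that crashed.
    %% Give up (and crash the supervisor) after more than 5 restarts
    %% within 10 seconds. These limits are illustrative.
    SupFlags = #{strategy => one_for_one, intensity => 5, period => 10},
    %% queue_reader is a hypothetical worker module.
    Reader = #{id => queue_reader,
               start => {queue_reader, start_link, []},
               restart => permanent,
               shutdown => 5000,
               type => worker,
               modules => [queue_reader]},
    {ok, {SupFlags, [Reader]}}.
```

With restart => permanent, the reader is restarted from a known state
every time it dies, whatever the reason.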

However, the benefit of Erlang process supervision may be different
from what you're thinking. Supervision is the secret that lets you
skip error handling in the default case. That leads to hugely
simplified code that has a much better chance of working correctly.

You can easily write software that "keeps running" by using defensive
measures. E.g. a top level try/catch block will keep your program
running forever. But God forbid you ever hit that block -- your
program will be in some unknown state without any chance of recovery!

Unless you're absolutely certain about how an error should be handled
(e.g. unexpected input from a user -> return an error message), the
best approach is to let a process crash catastrophically (process
death) and let someone else restart it to a known state.
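
To make the contrast concrete, here is a small sketch of the two
styles applied to the "append a message to a file" step. The module
and function names are made up for illustration; file:write_file/3 is
the standard library call.

```erlang
-module(append_example).
-export([append_defensive/2, append_let_it_crash/2]).

%% Defensive style: any failure is swallowed, so the caller keeps
%% running without knowing the line was never written.
append_defensive(Path, Line) ->
    try
        ok = file:write_file(Path, [Line, $\n], [append])
    catch
        _:_ -> ok
    end.

%% Let-it-crash style: anything other than ok fails the match with a
%% badmatch error, the process dies, and its supervisor restarts it.
append_let_it_crash(Path, Line) ->
    ok = file:write_file(Path, [Line, $\n], [append]).
```

The second version is shorter, and a write failure shows up in the
crash report instead of being silently lost.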

Erlang is like getting paid to eat a free lunch -- you skip the error
handling *and* have more reliable software!

> I was thinking of trying a gen_server, but then I figured out
> that there's no client of the server, the code just has to constantly
> pull data out of that queue.

You want a gen_server that starts itself. There's a trick... in the
gen_server init/1 callback, return a 0 for the timeout value:

    -behavior(gen_server).
    ...

    init(Args) ->
        {ok, init_state(Args), 0}.

Because the timeout is 0, gen_server calls handle_info/2 with a
timeout message as soon as init/1 returns, unless another message is
already waiting in the mailbox:

    handle_info(timeout, State) ->
        pull_messages(State),
        {noreply, State, 0}.

This turns a "server" into a self-starting process.
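
Filled out into a complete module, the trick might look roughly like
the sketch below. The module name queue_reader, the map used as state,
and the body of pull_messages/1 are placeholders; a real version would
fetch from AMQP and append to the file there.

```erlang
-module(queue_reader).
-behaviour(gen_server).

-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

init([]) ->
    %% Timeout 0: handle_info(timeout, ...) runs right after init/1.
    {ok, #{pulled => 0}, 0}.

%% Every callback returns the 0 timeout again; otherwise the loop
%% would stop after the first unrelated message.
handle_call(_Request, _From, State) ->
    {reply, {error, unsupported}, State, 0}.

handle_cast(_Msg, State) ->
    {noreply, State, 0}.

handle_info(timeout, State) ->
    {noreply, pull_messages(State), 0};
handle_info(_Other, State) ->
    {noreply, State, 0}.

%% Hypothetical placeholder: the sleep stands in for a blocking fetch
%% from the queue, so the loop does not spin the CPU.
pull_messages(#{pulled := N} = State) ->
    timer:sleep(100),
    State#{pulled := N + 1}.
```

Note that a timeout of 0 combined with a non-blocking fetch is a busy
loop, so the fetch should either block or the timeout should be
larger than 0.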

I personally dislike these semantics, so I use e2 [1] "tasks". In e2,
the equivalent looks like this:

    -behavior(e2_task).
    ...

    init(Args) ->
        {ok, init_state(Args)}.

    handle_task(State) ->
        pull_messages(State),
        {repeat, State}.

It uses the same underlying gen_server behavior, so you're
accomplishing *exactly* the same thing (standard OTP) without the odd
timeout semantics. Even if you don't mind the timeout trick, consider
others in your organization who might want to understand the code at
some point!

If you get your Erlang-based queue reader going, you'll have a
fantastic platform for adding more features... it's just a matter of
plugging new tasks and services into your supervision tree.

E.g. you might want a separate monitoring task that routinely checks
the AMQP queue size and takes some action if it gets too high (send an
email, pull messages from the queue and discard, delete the queue,
etc.) That's just another "task" that repeats:

    init(Args) ->
        {ok, init_state(Args), {?INITIAL_DELAY, ?REPEAT_INTERVAL}}.

    handle_task(State) ->
        check_queue_size(State),
        {repeat, State}.

This task will repeat every REPEAT_INTERVAL milliseconds.
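
For comparison, a similar periodic check can be sketched with a plain
gen_server and erlang:send_after/3 instead of e2. Everything named
here is an assumption: the module name, the delay and threshold
values, and above all queue_size/0, which is a stub standing in for
whatever call your AMQP client provides.

```erlang
-module(queue_monitor).
-behaviour(gen_server).

-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

-define(INITIAL_DELAY, 1000).    % ms before the first check
-define(REPEAT_INTERVAL, 5000).  % ms between checks
-define(MAX_QUEUE_SIZE, 10000).  % illustrative threshold

start_link() ->
    gen_server:start_link(?MODULE, [], []).

init([]) ->
    erlang:send_after(?INITIAL_DELAY, self(), check),
    {ok, #{}}.

handle_call(_Request, _From, State) ->
    {reply, {error, unsupported}, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info(check, State) ->
    check_queue_size(State),
    %% Schedule the next check only after this one has finished.
    erlang:send_after(?REPEAT_INTERVAL, self(), check),
    {noreply, State};
handle_info(_Other, State) ->
    {noreply, State}.

check_queue_size(_State) ->
    case queue_size() of
        Size when Size > ?MAX_QUEUE_SIZE ->
            error_logger:warning_msg("queue size ~p exceeds ~p~n",
                                     [Size, ?MAX_QUEUE_SIZE]);
        _ ->
            ok
    end.

%% Hypothetical stub: replace with a real query against the broker.
queue_size() ->
    0.
```

Rescheduling from inside handle_info/2 means a slow check delays the
next one rather than letting checks pile up.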

Good luck with your project. I think you'll see some success: a)
Erlang is perfect for what you're trying to do and b) you're tackling
a small enough problem that your chances of getting something useful
are quite high!

Garrett

[1] http://e2project.org


