[erlang-questions] Make sure that there is only one consumer connected to the RabbitMQ queue

Tim Watson watson.timothy@REDACTED
Wed Mar 13 17:10:12 CET 2013


Hi Max,

Thinking about this a bit more... You're actually going to struggle with a queueing based solution to this, because queues (by their very nature) enforce a decoupling between clients. Not only that, but even if RabbitMQ did provide a `consumer_count_for(Queue)` API, it would be incredibly prone to race conditions. What if, immediately after it returns count == 1, that consumer's network connection breaks, but the caller (having seen a count of 1) doesn't bother to connect? And worse, what if the API returns count == 0 but, before the new consumer tries to connect, someone else manages to 'beat him to it' and connects first!? That is why rabbit doesn't provide this API - it is error prone.

For your use case, this maybe doesn't matter if you're making the clients (nodes) cooperate with each other. Given that fact - i.e., if you're already making P2 connect to P1 and 'tell him to stop consuming messages' - then I'd suggest that the easiest way to handle this is thus:

Start P2. On start-up, P2 is 'informed' about the other node, which in this case is [P1]. Once P2 has started, it connects to P1 and instructs it to stop working. P1 should nack/reject any messages it has received but not processed - unless it is using prefetch-count=1, in which case it will not have received any more until sending an ACK - and then disconnect from the queue. Whilst P1 is disconnected, messages will build up in the queue.

At this point, P1 needs to reply to P2 (via rpc:call/4 or something) that it is disconnected and shutting down. Now P2 (once connected to the broker) simply calls basic.consume to begin receiving messages. No message loss will occur.
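
To make the handover a bit more concrete, here is a rough sketch of how it might look with the erlang-client. It is only an illustration under some assumptions of mine: P1 runs a registered gen_server called 'queue_consumer' (a made-up name) whose handle_call(stop_consuming, ...) clause calls stop_consuming/3 and replies ok, and P1 keeps track of the delivery tags it has received but not yet processed.

```erlang
-module(handover_sketch).
-export([take_over/3, stop_consuming/3]).

-include_lib("amqp_client/include/amqp_client.hrl").

%% Runs on P2. Asks P1 to stop, waits for its confirmation and only then
%% subscribes to the shared queue. 'queue_consumer' is a hypothetical
%% registered gen_server on P1.
take_over(P1Node, Channel, Queue) ->
    case rpc:call(P1Node, gen_server, call, [queue_consumer, stop_consuming]) of
        ok ->
            #'basic.consume_ok'{consumer_tag = Tag} =
                amqp_channel:subscribe(Channel,
                                       #'basic.consume'{queue = Queue},
                                       self()),
            {ok, Tag};
        {badrpc, Reason} ->
            %% P1 died or could not be reached before replying
            {error, {p1_unreachable, Reason}};
        Other ->
            {error, {unexpected_reply, Other}}
    end.

%% Runs on P1, from inside its consumer process. Cancels the consumer first
%% (so nothing is redelivered to P1) and then hands the received but
%% unprocessed messages back to the queue.
stop_consuming(Channel, ConsumerTag, UnprocessedTags) ->
    #'basic.cancel_ok'{} =
        amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = ConsumerTag}),
    lists:foreach(
      fun(DeliveryTag) ->
              ok = amqp_channel:call(Channel,
                                     #'basic.reject'{delivery_tag = DeliveryTag,
                                                     requeue = true})
      end, UnprocessedTags),
    ok.
```

The {badrpc, Reason} branch is where you would have to decide what to do if P1 disappears mid-handover; the sketch only reports it.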

Now you still have to deal with the case where P1 dies accidentally before replying to P2, or P2 dies after shutting down P1 but before connecting to the queue. Oh the joys of distribution! :)

> I have multiple nodes that might connect to the certain queue at my RabbitMQ server. I need to make sure that only the first of them will connect and all others will fail. Queue should not be deleted when currently connected node goes down.

If you don't like the solution above, one way to possibly make this happen is to have each consumer bind to an exchange using an exclusive queue and a temporary auto-delete exchange. I'll describe how to potentially do this, and hopefully you'll be put off by the complexity! :D

In this scenario, both the queue and the exchange are temporary. The exchange will be deleted once no more queues are using it, and the queue will be deleted once the exclusive consumer (connection) is finished with it. We will rely on these semantics to determine whether or not there are any active consumers. You would now bind the real exchange (which the senders are publishing to) to the temporary exchange - we will name it 'active' - using an exchange-2-exchange binding, which is a RabbitMQ extension to the AMQP protocol. In this topology, all messages sent to the main exchange - the ingress point for published messages - will be routed directly from the first exchange to the second one.
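
A sketch of how that topology might be declared with the erlang-client follows. The fanout exchange type and the empty routing keys are assumptions on my part (if your ingress exchange is direct or topic you will need real routing keys on the bindings), and the function name is made up for the example.

```erlang
-module(active_topology_sketch).
-export([become_active/2]).

-include_lib("amqp_client/include/amqp_client.hrl").

%% Declares the temporary pieces and wires them to the ingress exchange.
%% Returns the server-generated name of the exclusive queue.
become_active(Channel, IngressExchange) ->
    %% Exclusive, server-named queue: goes away with this connection
    #'queue.declare_ok'{queue = Queue} =
        amqp_channel:call(Channel, #'queue.declare'{exclusive = true}),
    %% Temporary exchange: removed once nothing is bound to it any more
    #'exchange.declare_ok'{} =
        amqp_channel:call(Channel,
                          #'exchange.declare'{exchange = <<"active">>,
                                              type = <<"fanout">>,
                                              auto_delete = true}),
    #'queue.bind_ok'{} =
        amqp_channel:call(Channel,
                          #'queue.bind'{queue = Queue,
                                        exchange = <<"active">>}),
    %% Exchange-to-exchange binding (RabbitMQ extension): ingress -> active
    #'exchange.bind_ok'{} =
        amqp_channel:call(Channel,
                          #'exchange.bind'{source = IngressExchange,
                                           destination = <<"active">>}),
    {ok, Queue}.
```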

From the perspective of 'other consumers', in order to determine whether or not there is an active consumer already, you can check for the existence of the 'active' exchange. You can do that by issuing a passive exchange.declare, viz the spec:

-- https://www.rabbitmq.com/amqp-0-9-1-reference.html#exchange.declare.passive
"bit passive

If set, the server will reply with Declare-Ok if the exchange already exists with the same name, and raise an error if not. The client can use this to check whether an exchange exists without modifying the server state. When set, all other method fields except name and no-wait are ignored. A declare with both passive and no-wait has no effect. Arguments are compared for semantic equivalence.

* If set, and the exchange does not already exist, the server MUST raise a channel exception with reply code 404 (not found).
* If not set and the exchange exists, the server MUST check that the existing exchange has the same values for type, durable, and arguments fields. 

The server MUST respond with Declare-Ok if the requested exchange matches these fields, and MUST raise a channel exception if not."

So if you issue exchange.declare with `passive=true`, one of two things happens. If you get a channel error with reply code 404 (which kills the channel), then the 'active' exchange does not exist and nobody is currently consuming. If instead you get Declare-Ok, you will know that someone else is consuming from the primary ingress exchange, because there is an exchange called 'active' which exists, and therefore another client has a queue bound to it and is dealing with the incoming messages.
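
Going by the spec text quoted above (Declare-Ok if the exchange exists, a 404 channel exception if it does not), the check might be sketched like this with the erlang-client. It opens a throwaway channel because a failed passive declare kills the channel. The shape of the exit reason I match on is what I would expect the client to produce when the server closes a channel; please verify it against the client version you are using.

```erlang
-module(active_check_sketch).
-export([active_consumer_exists/1]).

-include_lib("amqp_client/include/amqp_client.hrl").

%% Returns true if the 'active' exchange exists, false if the broker
%% answers with 404 (not found). Any other failure is left to crash.
active_consumer_exists(Connection) ->
    {ok, Channel} = amqp_connection:open_channel(Connection),
    try amqp_channel:call(Channel,
                          #'exchange.declare'{exchange = <<"active">>,
                                              passive = true}) of
        #'exchange.declare_ok'{} ->
            amqp_channel:close(Channel),
            true
    catch
        %% The server closed the channel, so there is nothing left to close
        exit:{{shutdown, {server_initiated_close, 404, _Text}}, _} ->
            false
    end.
```

Bear in mind that the answer can be stale by the time you act on it, which is the same kind of race I described at the top.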

Now how do you 'fail over' between the two consumers, in a cooperative fashion and without losing messages? It's not all that easy. Because you're doing something 'esoteric' here, when shutting down the active consumer we cannot simply issue connection.close to the broker. His queue is exclusive, so if it disappears all its unprocessed messages will be lost. What you need is a mechanism to ensure no messages are lost, so you will need another queue into which all messages will get routed - yes that's right, yet another queue! This queue is for tracking completed jobs. It is a permanent (durable) queue, into which the messages are delivered and it must not be exclusive nor auto-delete. There must be a second binding directly from the ingress exchange (NOT the 'active' exchange) to this tracking queue, so all inbound messages always arrive in it.
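
Declaring the tracking queue could look something like the sketch below. The queue name <<"tracking">> is just a placeholder of mine, and the empty routing key again assumes a fanout ingress exchange.

```erlang
-module(tracking_queue_sketch).
-export([declare_tracking_queue/2]).

-include_lib("amqp_client/include/amqp_client.hrl").

%% Declares the permanent tracking queue and binds it straight to the
%% ingress exchange (not to 'active'), so it sees every inbound message.
declare_tracking_queue(Channel, IngressExchange) ->
    #'queue.declare_ok'{} =
        amqp_channel:call(Channel,
                          #'queue.declare'{queue = <<"tracking">>,
                                           durable = true,
                                           exclusive = false,
                                           auto_delete = false}),
    #'queue.bind_ok'{} =
        amqp_channel:call(Channel,
                          #'queue.bind'{queue = <<"tracking">>,
                                        exchange = IngressExchange}),
    ok.
```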

When the active consumer has taken a message from his personal (exclusive) queue and finished working with it, he must also dequeue it from this 'tracking queue' as well. The simplest method for doing this is probably to use synchronous basic.get, but if you're concerned about throughput you can use basic.consume (in another channel) and set `prefetch=1` using basic.qos, then only send the ACK for a message once it is finished being handled. The tracking task can be a gen_server, which has some code like this (below) to receive a 'tracking note' for the message and to send the ACK (and thereby remove the message from the tracking queue) in response to the worker - which is using a separate (exchange +) queue - indicating completion:

handle_info({#'basic.deliver'{delivery_tag = DeliveryTag},
             #amqp_msg{props = #'P_basic'{correlation_id = <<Id:64>>}}},
            State) ->
    %% store the correlation_id against the delivery tag and wait to be
    %% told that we can ACK (store_correlation_id/3 is a hypothetical
    %% helper, like the lookup/remove helpers used below)
    {noreply, store_correlation_id(Id, DeliveryTag, State)};
... etc

handle_call({finished, TaskKey}, _From, State) ->
    %% the worker has finished with this task, so ACK its tracking note
    DeliveryTag = lookup_correlation_id(TaskKey, State),
    amqp_channel:call(State#state.channel, #'basic.ack'{delivery_tag = DeliveryTag}),
    {reply, ok, remove_correlation_id(TaskKey, State)}; ...
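
The lookup_correlation_id/2 and remove_correlation_id/2 helpers used above are not library functions. One possible way to write them, together with a store_correlation_id/3 for the 'store the correlation_id' step, is sketched here using a map in the state record. The record layout is my assumption, and maps need a reasonably recent OTP release; a dict would do just as well.

```erlang
-module(tracking_state_sketch).
-export([new/1, store_correlation_id/3,
         lookup_correlation_id/2, remove_correlation_id/2]).

%% continuations maps a task key (the correlation id) to the delivery tag
%% of its note in the tracking queue.
-record(state, {channel, continuations = #{}}).

new(Channel) ->
    #state{channel = Channel}.

%% Remember which delivery tag to ACK once TaskKey is finished.
store_correlation_id(TaskKey, DeliveryTag,
                     State = #state{continuations = Conts}) ->
    State#state{continuations = maps:put(TaskKey, DeliveryTag, Conts)}.

%% Crashes with {badkey, TaskKey} if the task was never recorded.
lookup_correlation_id(TaskKey, #state{continuations = Conts}) ->
    maps:get(TaskKey, Conts).

remove_correlation_id(TaskKey, State = #state{continuations = Conts}) ->
    State#state{continuations = maps:remove(TaskKey, Conts)}.
```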


Now when the worker (P1) is ready to stop, it can simply disconnect. Here is what will happen then...

1. The exclusive queue will be deleted when the connection dies - all unprocessed messages therein will be lost
2. The exchange named 'active' is auto-delete, so once its only bound queue is deleted, it will also be removed 
3. All the unprocessed messages still in the shared 'tracking queue' will remain there
4. Any message delivered from the 'tracking queue' to the (now dead) consumer that has *not* been ack'ed, will be re-queued.


Phew. Now for the takeover, it's relatively simple(er). Whilst the 'active' exchange exists, the additional consumers should not connect. If the 'active' exchange does not exist, then they can become the active consumer by creating the exclusive queue, creating the 'active' exchange and binding it to the exclusive queue *first* and then binding the active exchange to the ingress exchange. Once all that is in place, the 'takeover' is almost complete, but of course there will be messages now in the 'tracking queue' which never made it to the 'active' exchange because it didn't exist when they arrived. So now the new consumer must start dequeueing messages from both the exclusive queue (arriving in it via the 'active' exchange) *AND* the tracking queue. Whilst the tracking queue has messages in it that are not in the 'active' queue, we process those first. Once we have caught up, then we swap back and process the messages from the active pipe and ACK them on the tracking queue second. 
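
The 'catch up' step is the fiddly part, so here is one way to think about it, sketched as a pure function. It leans on two assumptions that the design above does not guarantee by itself: every message carries a unique correlation id, and both queues see messages in the same order (requeued messages can upset that). Under those assumptions, everything in the tracking queue ahead of the first id that shows up on the exclusive queue is backlog that never reached the 'active' exchange.

```erlang
-module(catch_up_sketch).
-export([split_backlog/2]).

%% TrackingIds:   correlation ids read from the tracking queue, oldest first.
%% FirstActiveId: correlation id of the first message that arrived on the
%%                exclusive queue after the takeover.
%% Returns {Backlog, Overlap}. Backlog has to be processed from the tracking
%% queue; Overlap will also turn up on the exclusive queue and only needs an
%% ACK on the tracking queue once handled there.
split_backlog(TrackingIds, FirstActiveId) ->
    lists:splitwith(fun(Id) -> Id =/= FirstActiveId end, TrackingIds).
```

For example, split_backlog([1,2,3,4,5], 4) gives {[1,2,3],[4,5]}: process 1 to 3 from the tracking queue first, then swap over to the exclusive queue starting at 4.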

Of course it will be very easy to end up with a race between the two in your single consumer. Just describing that design made me feel rather unhappy, so I'd suggest that it's *really* worth trying to find another way to frame the problem if you can.

Hope that all makes sense, sort of...

Cheers,

Tim

On 13 Mar 2013, at 14:23, Tim Watson wrote:

> Hi Max,
> 
> On 13 Mar 2013, at 14:07, Max Bourinov wrote:
>> I have multiple nodes that might connect to the certain queue at my RabbitMQ server. I need to make sure that only the first of them will connect and all others will fail. Queue should not be deleted when currently connected node goes down.
>> 
>> I think that exclusive = true and auto-delete = false will do the work. Am I right?
>> 
> 
> From http://www.rabbitmq.com/amqp-0-9-1-reference.html#queue.declare.exclusive:
> 
> "bit exclusive
> 
> Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes. Passive declaration of an exclusive queue by other connections are not allowed."
> 
> An exclusive queue will be deleted when the connection closes, whereas the auto-delete flag is for ensuring the queue is deleted once all consumers have finished using it. Also, exclusive queues will exist only _for_ the current connection - i.e., no other connection will know about them - so they're not the right abstraction for limiting access to a shared resource. In fact, I'm not sure if AMQP provides a mechanism to do what you want.
> 
> Is there another way to approach this requirement, or is this specific behaviour a vital part of it? There are probably ways you could layer this on top of AMQP if you're using the erlang-client, but I suspect you'll have more luck trying to select a different routing topology rather than enforcing connectivity rules that the specification doesn't support.
> 
> Cheers,
> 
> Tim Watson
> Staff Engineer
> RabbitMQ / VMWare



