[erlang-questions] Handling routing_key for topic routing in gen_server handle_info

Dmitry Belyaev be.dmitry@REDACTED
Wed Aug 21 07:24:17 CEST 2019

If we remove all the unrelated implementation details, the picture looks like this:

handle_info({incoming, Id, SomeDetails}, State) ->
   {noreply, State};
handle_info({processed, Result}, State) ->
   Id = ???, % how to get the value
   finalise(Id, Result),
   {noreply, State}.

I can suggest the following approaches to do that assuming you cannot force
the processor to accept and pass your value back.
1. If you can switch to synchronous processing (which may be desirable to
prevent overloading of the processor), you can store the Id in the process
state:
   handle_info({incoming, Id, SomeDetails},
               #{current_id := undefined} = State) ->
      {noreply, State#{current_id => Id}};
   handle_info({processed, Result}, #{current_id := Id} = State) ->
      finalise(Id, Result),
      {noreply, State#{current_id => undefined}}.

2. If the processing must be asynchronous and there may be multiple
in-flight requests, you can:
  a) make start_processing accept the Id and spawn a new process which
receives the processed message (the TCP response)
    handle_info({incoming, Id, SomeDetails}, State) ->
      spawn_link(fun() ->
            %% start the processing here so that this process gets the reply
            start_processing(SomeDetails),
            receive
                {processed, Result} ->
                    finalise(Id, Result)
            end
        end),
      {noreply, State}.

  b) make a mapping from sockets to Ids and keep it in the state
    handle_info({incoming, Id, SomeDetails}, State) ->
      Socket = start_processing(SomeDetails),
      {noreply, State#{Socket => Id}};
    handle_info({processed, Socket, Result}, State) ->
      #{Socket := Id} = State,
      finalise(Id, Result),
      {noreply, maps:remove(Socket, State)}.
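
To make the socket-to-Id mapping concrete, here is a minimal, self-contained
sketch of a gen_server that keeps the in-flight Ids in its state. Everything
that is not in the snippets above is an assumption made for illustration: the
module name id_tracker, the pending/1 helper, the bodies of start_processing/1
and finalise/2, and the use of a reference where the real code would have a
socket. With a real gen_tcp socket in active mode the reply arrives as
{tcp, Socket, Data}, and it is delivered to the process that owns the socket,
so the connect would have to happen inside the gen_server.

```erlang
-module(id_tracker).
-behaviour(gen_server).

-export([start_link/0, pending/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

start_link() ->
    gen_server:start_link(?MODULE, [], []).

%% Hypothetical helper: returns the map of in-flight handles to Ids.
pending(Pid) ->
    gen_server:call(Pid, pending).

%% The state is simply a map from processing handle to Id.
init([]) ->
    {ok, #{}}.

handle_call(pending, _From, State) ->
    {reply, State, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

%% Remember which Id belongs to the processing we have just started.
handle_info({incoming, Id, SomeDetails}, State) ->
    Handle = start_processing(SomeDetails),
    {noreply, State#{Handle => Id}};
%% Look the Id up again when the result comes back, then forget it.
handle_info({processed, Handle, Result}, State) ->
    case maps:take(Handle, State) of
        {Id, Rest} ->
            finalise(Id, Result),
            {noreply, Rest};
        error ->
            %% Result for an unknown handle: ignore it.
            {noreply, State}
    end;
handle_info(_Other, State) ->
    {noreply, State}.

%% Assumption: stand-in for the external processor. It answers at once
%% by sending the result back to the calling process.
start_processing(Details) ->
    Handle = make_ref(),
    self() ! {processed, Handle, {scored, Details}},
    Handle.

%% Assumption: stand-in for republishing with the original routing key.
finalise(Id, Result) ->
    io:format("~p -> ~p~n", [Id, Result]).
```

Sending {incoming, Key, Payload} to the server started with
id_tracker:start_link() should print the key next to the stubbed result, and
id_tracker:pending(Pid) should be empty again once the result has been handled.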

Kind regards,
Dmitry Belyaev

On Fri, Aug 16, 2019 at 11:51 PM Sébastien BRICE <otb@REDACTED> wrote:

> Hello there,
> I am a bit new to the Erlang environment.
> I am writing an email-testing application that filters incoming email
> with randomly generated routing_keys on a topic exchange to make
> emails enter my system.
> Once they are delivered (and processed) on a queue, I want to label
> them again with the previously generated random routing_key to route them to
> another exchange to make them ready for the final consume.
> This 2nd producing step is causing me real trouble.
> I am getting data back from a tcp socket (processed by a third-tier
> program: spamassassin) with handle_info pattern matching
> I rely on a gen_server to consume messages first through the regular
> amqp_client/include/amqp_client.hrl Library
> I use handle_info in my gen_server behaviour and then pattern match on
> the parameters.
> Detecting delivered AMQP message is done through function heads
> (records) in handle_info callback
> ***My gen_server****
> handle_info({#'basic.deliver'{routing_key=Key, consumer_tag=Tag},
> Content}, State) ->
>      #amqp_msg{props = Properties, payload = Payload} = Content,
>      #'P_basic'{message_id = MessageId, headers = Headers} = Properties,
>      send_to_spamassassin:calcule_score(Payload),
>      {noreply, State};
> handle_info(Msg, State) ->
>      case Msg of
>          {_,_,Data} ->
>             scored_email:main(Data);
>          {_,_} ->
>      end,
>      {noreply, State}.
> ***send_to_spamassassin function ***
>      calcule_score(Message) ->
>      case gen_tcp:connect("localhost", 783, [{mode, binary}]) of
>          {ok, Sock} ->
>              gen_tcp:send(Sock, Message2);
>          {error,_} ->
>              io:fwrite("Connection error! Quitting...~n")
>      end.
> TCP socket is nice to talk with spamassassin, it returns me a 3-tuple
> with binary string data like that:
> {tcp,#Port<0.55>,<<"SPAMD/1.1 0 EX_OK\r\nContent-length: 564\r\nSpam:
> True ; 7.9 / 5.0\r\n\r\nReceived: from localhost by
> XXXX.ikexpress.com\n\twith SpamAssassin (version 3.4.2);\n\tThu, 15 Aug
> 2019 21:44:12 +0200\nX-Spam-Checker-Version: SpamAssassin 3.4.2
> (2018-09-13) on\n\tXXXXX.ikexpress.com\nX-Spam-Flag: YES\nX-Spam-Level:
> *******\nX-Spam-Status: Yes, score=7.9 required=5.0
> autolearn=no\n\tautolearn_force=no version=3.4.2\nMIME-Version:
> 1.0\nContent-Type: multipart/mixed;
> boundary=\"----------=_5D55B60C.D2FC2670\"\n\n">>}
> The loop in the second handle_info match OK the answer from the
> listening gen_tcp server, but I have to do the packaging to send it to a
> topic Exchange (topic_scored_email exchange)
> ***scored_email***
> main(Argv) ->
>      {ok, Connection} =
> amqp_connection:start(#amqp_params_network{virtual_host = <<"/">>}),
>      {ok, Channel} = amqp_connection:open_channel(Connection),
>      amqp_channel:call(Channel, #'exchange.declare'{exchange =
> <<"topic_scored_email">>,type = <<"topic">>}),
>      {RoutingKey, Message} = case Argv of
>                              end,
>      amqp_channel:cast(Channel,#'basic.publish'{exchange =
> <<"topic_scored_email">>,routing_key = RoutingKey},#amqp_msg{payload =
> Message}),
> First issue is the type of the data (binary string) but I guess it can be
> worked around using BIF binary_to_tuple or something like that.
> What I struggle to understand is how I could pass the right RoutingKey,
> since Erlang is functional, there is no side effect or assignment.
> That change in data format (AMQP --> raw tcp --> then AMQP again) seems
> impossible (to me) to achieve with OTP abstractions.
> However I would like to reassemble every processed message with the
> right routing key matched 5 lines above.
> How could I modify my code to do that? I come from imperative languages
> and have reached my limit here…
> Yours
> PS I know it is more a rabbitmq issue and I might be more successful to
> post on stackoverflow or rabbitmq google groups but I feel
> #Erlang-questions could come handy on that topic
