diff -r af848798cc65 include/rabbit.hrl --- a/include/rabbit.hrl Sat Jun 16 13:12:20 2007 +0100 +++ b/include/rabbit.hrl Wed Aug 01 00:23:22 2007 +0100 @@ -29,7 +29,7 @@ -record(realm, {name, exchanges, queues}). -record(user_realm, {username, realm, ticket_pattern}). --record(connection, {user, timeout_sec, heartbeat_sender_pid, frame_max, vhost}). +-record(connection, {user, timeout_sec, heartbeat_sender_pid, frame_max, vhost, reader_pid, writer_pid}). -record(content, {class_id, properties, %% either 'none', or a decoded record/tuple diff -r af848798cc65 src/amqp_client.erl --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/amqp_client.erl Wed Aug 01 00:23:22 2007 +0100 @@ -0,0 +1,458 @@ +-module(amqp_client, [Mod]). + +-include("rabbit.hrl"). +-include("rabbit_framing.hrl"). + +-behaviour(gen_server). + +-export([start/3, start/2, stop/1]). +-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). +-export([open_channel/3]). +-export([access_request/2]). +-export([queue_declare/4]). +-export([exchange_declare/9]). +-export([queue_bind/6]). +-export([basic_publish/7]). +-export([basic_consume/5]). +-export([basic_get/5]). + +-export([protocol_header/0]). + +-export([basic_properties/0]). + +-define(Reader, reader). +-define(Writer, writer). +-define(Username, username). +-define(Password, password). +-define(VHostPath, vhostpath). +-define(Peer, peer). +-define(ChannelNumber,channel_number). +-define(ClientTimeout, 20000). + +-record(amq_channel, {number, pid}). + +%--------------------------------------------------------------------------- +% Start / Stop functions +%--------------------------------------------------------------------------- + +start(User, Password) -> + gen_server:start_link({local, ?MODULE }, ?MODULE:new(Mod), + [list_to_binary(User), list_to_binary(Password), <<"localhost">>], []). + + +start(User, Password, Host) -> + gen_server:start_link({local, ?MODULE }, ?MODULE:new(Mod), + [list_to_binary(User), list_to_binary(Password), Host], []). + +stop(Channel) -> + gen_server:call({local, Channel }, stop), + ok. + +%--------------------------------------------------------------------------- +% API Methods +%--------------------------------------------------------------------------- + +open_channel(Connection, ChannelNumber, OutOfBand) -> + case Mod:is_direct() of + true -> + gen_server:call(Connection, {open_channel, ChannelNumber, OutOfBand}); + _ -> + {User, Pass, Host, ReaderPid, WriterPid} = gen_server:call(Connection, channel0), + {ok, Channel} = gen_server:start_link(?MODULE:new(Mod), [User, Pass, Host, ReaderPid, WriterPid], []), + gen_server:call(Channel, {open_channel, ChannelNumber, binary(OutOfBand)}), + Channel + end. + +access_request(Channel, Realm) -> + Access = #'access.request'{ realm = binary(Realm), + exclusive = false, + passive = true, + active = true, + write = true, + read = true}, + gen_server:call( Channel, {access_request, Access}). + +queue_declare(Channel, Host, Ticket, Q) -> + QueueDeclare = #'queue.declare'{ticket = Ticket, + queue = binary(Q), + passive = false, + durable = false, + exclusive = false, + auto_delete = false, + nowait = false, + arguments = []}, + gen_server:call( ?MODULE, {Channel, QueueDeclare, <<>>}). + +exchange_declare(Channel, Host, Ticket, Exchange, Type, Passive, Durable, AutoDelete, Map) -> + ExchangeDeclare = #'exchange.declare'{ticket = Ticket, exchange = binary(Exchange), + type = binary(Type), passive = Passive, + durable = Durable, auto_delete = AutoDelete, + internal = false, nowait = false, arguments = Map}, + gen_server:call( ?MODULE, {Channel, ExchangeDeclare, <<>>}). + +queue_bind(Channel, Host, Ticket, Q, Exchange, RoutingKey) -> + QueueBind = #'queue.bind'{ticket = Ticket, queue = binary(Q), + exchange = binary(Exchange), routing_key = binary(RoutingKey), + nowait = false, arguments = <<>>}, + gen_server:call( ?MODULE , {Channel, QueueBind, <<>>}). + +basic_get(Channel, Host, Ticket, Q, NoAck) -> + BasicGet = #'basic.get'{ticket = Ticket, queue = binary(Q), no_ack = NoAck}, + gen_server:call(?MODULE, {Channel, BasicGet, <<>>}). + +basicAck(Channel, DeliveryTag, Multiple) -> ok. +exchangeDelete(Channel, Ticket, Exchange) -> ok. +queueDelete(Channel, Ticket, Q) -> ok. +basicCancel(Channel, ConsumerTag) -> ok. + +txSelect(Channel) -> ok. +txCommit(Channel) -> ok. +txRollback(Channel) -> ok. + +basic_publish(Channel, Host, Ticket, Exchange, RoutingKey, Properties, Payload) -> + BasicPublish = #'basic.publish'{ticket = Ticket, exchange = binary(Exchange), + routing_key = binary(RoutingKey), + mandatory = false, immediate = false}, + Content = #content{class_id = 60, %% TODO HARDCODED VALUE + properties = Properties, %% either 'none', or a decoded record/tuple + properties_bin = 'none', %% either 'none', or an encoded properties binary + %% Note: at most one of properties and properties_bin can be 'none' at once. + payload_fragments_rev = [Payload] %% list of binaries, in reverse order (!) + }, + gen_server:cast(?MODULE, {Channel, BasicPublish, Content}). + +basic_consume(Channel, Host, Ticket, Q, Consumer) -> + BasicConsume = #'basic.consume'{ticket = Ticket, queue = binary(Q), + consumer_tag = <<"">>, + no_local = false, no_ack = false, exclusive = false, nowait = false}, + gen_server:call(?MODULE, {Channel, BasicConsume, Consumer}, ?ClientTimeout). + +basic_properties() -> + #'P_basic'{content_type = <<"application/octet-stream">>, delivery_mode = 1, priority = 0}. + +protocol_header() -> + <<"AMQP", 1, 1, ?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR>>. + +%--------------------------------------------------------------------------- +% Internal plumbing +%--------------------------------------------------------------------------- + +binary(L) when is_list(L) -> + list_to_binary(L); + +binary(B) when is_binary(B) -> + B. + +channel_server_name(Channel, Host) -> + list_to_atom(lists:flatten(["ch",integer_to_list(Channel),"@",Host])). + +network_server_name(Channel,Host) -> + list_to_atom(lists:flatten(["amqp_client@",Host])). + +%% TODO These look a bit java-ish +getVHostPath(Table) -> + [{_,VHostPath}] = ets:lookup(Table, ?VHostPath), + VHostPath. + +getUsername(Table) -> + [{_,User}] = ets:lookup(Table, ?Username), + User. + +getPassword(Table) -> + [{_,Password}] = ets:lookup(Table, ?Password), + Password. + +get_reader(Table) -> + [{_,Reader}] = ets:lookup(Table, ?Reader), + Reader. +get_writer(Table) -> + [{_,Writer}] = ets:lookup(Table, ?Writer), + Writer. + +get_peer(Table) -> + case ets:lookup(Table, ?Peer) of + [{_,Peer}] -> + Peer; + _ -> + exit(peer_not_intialized) + end. + +get_channel_number(Table) -> + case ets:lookup(Table, ?ChannelNumber) of + [{_,ChannelNumber}] -> + ChannelNumber; + _ -> + exit(channel_not_intialized) + end. + +start_ok(Table) -> + case Mod:is_direct() of + true -> + #'connection.start_ok'{ + client_properties = [ + {<<"product">>,<<"Erlang-AMQC">>}, + {<<"version">>,<<"0.1">>}, + {<<"platform">>,<<"Erlang">>} + ], + mechanism = <<"AMQPLAIN">>, + response = [ + {<<"LOGIN">>, getUsername(Table) }, + {<<"PASSWORD">>, getPassword(Table) } + ], + locale = <<"en_US">>}; + _ -> + #'connection.start_ok'{ + client_properties = [ + {<<"product">>, longstr, <<"Erlang-AMQC">>}, + {<<"version">>, longstr, <<"0.1">>}, + {<<"platform">>,longstr, <<"Erlang">>} + ], + mechanism = <<"AMQPLAIN">>, + response = [ + {<<"LOGIN">>, longstr, getUsername(Table) }, + {<<"PASSWORD">>, longstr, getPassword(Table) } + ], + locale = <<"en_US">>} + end. + + +%% DEPRECATED +getChannel(Channel, Table) -> + case ets:lookup(Table, Channel) of + [{_,Chan}] -> + Chan; + _ -> + exit( {channel_not_intialized, Channel}) + end. + +%--------------------------------------------------------------------------- +% AMQP message handling +%--------------------------------------------------------------------------- + +%% connection.close_ok +amqp_handle(Peer, {Channel, #'connection.close_ok'{} }, Table ) -> + ok; + +%% Anything else +amqp_handle(Peer, Message, Table ) -> + exit({amqp_client, unhandled_message , Message}). + +%--------------------------------------------------------------------------- +% AMQP message sending +%--------------------------------------------------------------------------- + +%% Sends AMQP messages +amqp_send(Method, Channel, Table) -> + Chan = getChannel(Channel, Table), + Chan#amq_channel.pid ! { self(), Method }. + +amqp_send(Method, Content, Channel, Table) -> + Chan = getChannel(Channel, Table), + Chan#amq_channel.pid ! { method, Method, Content }. + +amqp_send(Method, Content, Channel, ConsumerPid, Table) -> + Chan = getChannel(Channel, Table), + Chan#amq_channel.pid ! { method, Method, Content, ConsumerPid }. + +%% NEW AMQP SEND METHODS +amqp_s_init(Method, Table) -> + io:format("Sending method ~p to peer~n",[Method]), + Peer = get_peer(Table), + Peer ! { self(), Method }. + +amqp_s(Method, Table) -> + io:format("Sending method ~p to peer~n",[Method]), + Peer = get_peer(Table), + Message = case Mod:is_direct() of + true -> + { method, Method}; + _ -> + { self(), Method} + end, + Peer ! Message. + +amqp_s(Method, Content, Table) -> + io:format("Sending method ~p, content ~p to peer~n",[Method, Content]), + Peer = get_peer(Table), + Message = case Mod:is_direct() of + true -> + { method, Method, Content }; + _ -> + { self(), Method, Content } + end, + Peer ! Message. + +%--------------------------------------------------------------------------- +% AMQP blocking reads +%--------------------------------------------------------------------------- + +%% This is an experimental blocking read in order to implement the bottom half of an RPC +amqp_receive(Channel) when is_integer(Channel) -> + receive + {send_command, Content } -> + Content; + {Sender, Channel, Method } -> + Method; + {deliver, DeliveryStyle, ConsumerTagOrMessageCount, AckRequired, QName, SenderPid, Message} -> + {DeliveryStyle, ConsumerTagOrMessageCount, AckRequired, QName, SenderPid, Message}; + Other -> + io:format("amqp_receive(Channel) - This shouldn't be received:--> ~p~n",[Other]), + exit(unknown_protocol_sequence) + end; + +%% This is an experimental blocking read in order to implement the bottom half of an RPC +amqp_receive(Method) -> + receive + {frame, Sender, Channel, {method, Method, Content} } -> + {Sender, rabbit_framing:decode_method_fields(Method, Content)}; + {Sender, M} -> + {Sender,M}; + {Sender, ChannelNumber, M} -> + M; + {Sender, Method} -> + {Sender,Method}; + {send_command, { Method, Content } } -> + Content; + Other -> + io:format("amqp_receive(Method) - This shouldn't be received:--> ~p~n",[Other]), + exit(unknown_protocol_sequence) + end. + +%% This is an experimental blocking read in order to implement the bottom half of an RPC +amqp_receive(Channel, Method) -> + receive + {send_command, { Channel, Method } } -> + Method; + {Sender, { Channel, {Method} } } -> + Method; + Other -> + io:format("amqp_receive/2 - This shouldn't be received:--> ~p~n",[Other]), + exit(unknown_protocol_sequence) + end. + +%--------------------------------------------------------------------------- +% gen_server callbacks +%--------------------------------------------------------------------------- + +init([User, Pass, VHostPath]) -> + { ReaderPid, WriterPid } = Mod:handshake(self(), VHostPath ), + Table = ets:new(?MODULE,[]), + ets:insert(Table, {?Username, User} ), + ets:insert(Table, {?Password, Pass} ), + ets:insert(Table, {?VHostPath, VHostPath} ), + ets:insert(Table, {?Reader, ReaderPid} ), + ets:insert(Table, {?Writer, WriterPid} ), + { Peer, #'connection.start'{version_major = MajorVersion, + version_minor = MinorVersion, + server_properties = Properties, + mechanisms = Mechansims, + locales = Locales }} = amqp_receive('connection.start'), + WriterPid ! {self(), start_ok(Table) }, + {Peer, #'connection.tune'{channel_max = ChannelMax, + frame_max = FrameMax, + heartbeat = Heartbeat} } = amqp_receive('connection.tune'), + TuneOk = #'connection.tune_ok'{channel_max = ChannelMax, frame_max = FrameMax, heartbeat = Heartbeat}, + WriterPid ! {self(), TuneOk }, + %% This is something where I don't understand the protocol, + %% What happens if the following command reaches the server before the tune ok? + %% Or doesn't get sent at all? + ConnectionOpen = #'connection.open'{virtual_host = <<"localhost">>, + capabilities = <<"">>, + insist = false }, + WriterPid ! {self(), ConnectionOpen}, + case amqp_receive('connection.open_ok') of + {Peer, {0, #'connection.open_ok'{known_hosts = KnownHosts}}} -> + ok; + {Peer, #'connection.open_ok'{known_hosts = KnownHosts}} -> + ok; + _ -> + exit(unknown_protocol_sequence) + end, + {ok, Table}; + +%% This gets called to open a new network client channel process +init( [User, Pass, VHostPath, ReaderPid, WriterPid] ) -> + Table = ets:new(?MODULE,[]), + ets:insert(Table, {?Username, User} ), + ets:insert(Table, {?Password, Pass} ), + ets:insert(Table, {?VHostPath, VHostPath} ), + ets:insert(Table, {?Reader, ReaderPid} ), + ets:insert(Table, {?Writer, WriterPid} ), + {ok,Table}. + +%% Return the reader and writer pids for the network client +handle_call( channel0, From, Table) -> + {reply, { getUsername(Table), + getPassword(Table), + getVHostPath(Table), + get_reader(Table), + get_writer(Table) }, Table}; + +%% Create and open a channel +handle_call( {open_channel, ChannelNumber, OutOfBand}, From, Table) -> + Connection = case Mod:is_direct() of + true -> + #connection{user = #user{username = getUsername(Table)} , + vhost = getVHostPath(Table), + reader_pid = self(), + writer_pid = self()}; + _ -> + #connection{user = #user{username = getUsername(Table)} , + vhost = getVHostPath(Table), + reader_pid = get_reader(Table), + writer_pid = get_writer(Table)} + end, + {Con, Peer} = Mod:open_channel({ChannelNumber, OutOfBand}, Connection), + io:format("Bound AMQP client channel ~p to pid ~p, peer pid is ~p~n", [ChannelNumber, self(), Peer]), + ets:insert(Table, {?Peer, Peer}), + amqp_s_init(#'channel.open'{out_of_band = OutOfBand}, Table), + OpenOk = amqp_receive('channel.open_ok'), + {reply, self(), Table}; + +%% Access a realm +handle_call({ access_request, Method}, From , Table) -> + amqp_s(Method, Table), + %% TODO Think about a case block here for better exception handling + %% Also, this case block arises from the difference messages are being passed + %% in the direct and networked cases, this can handled a lot better + case amqp_receive('access.request_ok') of + % network case + {Peer, #'access.request_ok'{ticket = NextTicket}} -> + ok; + % direct case + #'access.request_ok'{ticket = NextTicket} -> + ok + end, + {reply, NextTicket, Table}; + +%% TODO look at how to apply this to access_request +%% This is effectively RPC over AMQP +handle_call({ Channel, Method, Content }, From , Table) -> + case is_pid(Content) of + true -> + %% This gets called when you want to register a subscriber's Pid. + amqp_send(Method, <<>>, Channel, Content, Table); + _ -> + amqp_send(Method, Content, Channel, Table) + end, + %% Do a blocking receive of the result + {reply, amqp_receive(Channel), Table}; + + +handle_call(_Request, _From, State) -> + {reply, ignored, State}. + +%% Non RPC calls +handle_cast( { Channel, Method, Content }, Table) -> + amqp_send(Method, Content, Channel, Table), + {noreply, Table}. + +handle_info( {Peer, Message}, Table) -> + amqp_handle(Peer, Message, Table), + {noreply, Table}. + +terminate(_Reason, State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + State. \ No newline at end of file diff -r af848798cc65 src/amqp_client_test.erl --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/amqp_client_test.erl Wed Aug 01 00:23:22 2007 +0100 @@ -0,0 +1,81 @@ +-module(amqp_client_test). + +-include("rabbit.hrl"). +-include("rabbit_framing.hrl"). + +-export([direct_client_test/0, network_client_test/0]). + +network_client_test() -> + ChannelNumber = 1, + Host = "localhost", + User = "admin", + Password = "changeit", + Realm = "/data", + Q = "a.b.c", + X = "x", + BindKey = "a.b.c.*", + RoutingKey = "a.b.c.d", + Payload = <<"foobar">>, + + %% Start a connection to the server + AMQPClient = amqp_client:new(amqp_network_driver), + {ok, Connection} = AMQPClient:start(User, Password, Host), + + %% Set up the exchange + Channel = AMQPClient:open_channel(Connection, ChannelNumber, ""), + io:format("Opening client channel process: ~p~n",[Channel]), + Ticket = AMQPClient:access_request(Channel, Realm), + io:format("Received ticket: ~p~n",[Ticket]). + +direct_client_test() -> + ChannelNumber = 1, + User = "admin", + Password = "changeit", + Realm = "/data", + Q = "a.b.c", + X = "x", + BindKey = "a.b.c.*", + RoutingKey = "a.b.c.d", + Payload = <<"foobar">>, + + %% Start a connection to the server + AMQPClient = amqp_client:new(amqp_direct_driver), + {ok, Connection} = AMQPClient:start(User, Password), + + %% Set up the exchange + Channel = AMQPClient:open_channel(Connection, ChannelNumber, ""), + io:format("Opening client channel process: ~p~n",[Channel]), + Ticket = AMQPClient:access_request(Channel, Realm), + io:format("Received ticket: ~p~n",[Ticket]). + + %#'queue.declare_ok'{queue = Q1, + % message_count = MessageCount, + % consumer_count = ConsumerCount} + % = AMQPClient:queue_declare(Channel, Ticket,Q), + + %#'exchange.declare_ok'{} = AMQPClient:exchange_declare(Channel, Ticket, X, "topic", false, false, true, <<>>), + %#'queue.bind_ok'{} = AMQPClient:queue_bind(Channel, Ticket, Q, X, BindKey), + + %% Publish some data + %AMQPClient:basic_publish(Channel, Ticket, X, RoutingKey, AMQPClient:basic_properties(), Payload ), + + %% Get that data + %{DeliveryStyle, ConsumerTagOrMessageCount, AckRequired, QName, SenderPid, Message} + % = AMQPClient:basic_get(Channel, Ticket, Q, true), + %io:format("Basic get: ~p~n",[ (Message#basic_message.content)#content.payload_fragments_rev ]), + + %% Publish some more data and consume it + %AMQPClient:basic_publish(Channel, Ticket, X, RoutingKey, AMQPClient:basic_properties(), Payload ), + + %{ok, Consumer} = gen_event:start_link({local, consumer}), + %gen_event:add_handler(consumer, amqp_consumer , [] ), + + %#'basic.consume_ok'{consumer_tag = ConsumerTag} = AMQPClient:basic_consume(Channel, Ticket, Q, Consumer), + %io:format("Consumer Tag: ~p~n",[ConsumerTag]), + + %receive + %after 5000 -> + % gen_event:stop(consumer) + %end. + + \ No newline at end of file diff -r af848798cc65 src/amqp_consumer.erl --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/amqp_consumer.erl Wed Aug 01 00:23:22 2007 +0100 @@ -0,0 +1,20 @@ +-module(amqp_consumer). +-behaviour(gen_event). + +-export([init/1, handle_info/2, terminate/2]). + +%--------------------------------------------------------------------------- +% gen_event callbacks +%--------------------------------------------------------------------------- + +init(Args) -> + {ok, []}. + +handle_info({deliver, DeliveryStyle, ConsumerTagOrMessageCount, AckRequired, QName, SenderPid, Message}, State) -> + io:format("---------------------------~n"), + io:format("AMQP Consumer, rec'd: ~p~n", [ amqp_util:message_payload(Message) ] ), + io:format("---------------------------~n"), + {ok, State}. + +terminate(Args, State) -> + ok. \ No newline at end of file diff -r af848798cc65 src/amqp_direct_driver.erl --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/amqp_direct_driver.erl Wed Aug 01 00:23:22 2007 +0100 @@ -0,0 +1,19 @@ +-module(amqp_direct_driver). + +-include("rabbit.hrl"). +-include("rabbit_framing.hrl"). + +-export([handshake/2, open_channel/2, is_direct/0]). + +is_direct() -> + true. + +handshake(ChannelPid, Host) -> + AMQPClient = amqp_client:new(?MODULE), + rabbit_reader:handle_input(handshake, AMQPClient:protocol_header(), ChannelPid ). + +open_channel(Method, Connection) -> + rabbit_reader:handle_input(open_channel, Method, Connection ). + + + diff -r af848798cc65 src/amqp_network_driver.erl --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/amqp_network_driver.erl Wed Aug 01 00:23:22 2007 +0100 @@ -0,0 +1,139 @@ +-module(amqp_network_driver). + +-include("rabbit.hrl"). +-include("rabbit_framing.hrl"). + +-export([handshake/2, open_channel/2, is_direct/0]). +-export([start_reader/2, start_writer/2, init/2]). + +%--------------------------------------------------------------------------- +% API Methods +%--------------------------------------------------------------------------- +is_direct() -> + false. + +handshake(Channel0, Host) -> + {ReaderPid, WriterPid} = init(Channel0, Host), + {ReaderPid, WriterPid }. + +open_channel({ChannelNumber, OutOfBand}, + #connection{user = #user{username = Username} , + vhost = VHostPath, + reader_pid = ReaderPid, + writer_pid = WriterPid}) -> + ReaderPid ! {self(), ChannelNumber}, + io:format("enqueuing myself ~p with pid ~p for chan ~p~n",[self(),WriterPid, ChannelNumber]), + WriterPid ! {self(), ChannelNumber}, + %% TODO Don't it is necessary to return the pids that the called passed in themselves + %% Just trying to be consistent with the other driver + {ReaderPid, WriterPid}. + + +%--------------------------------------------------------------------------- +% Internal plumbing +%--------------------------------------------------------------------------- + +init(Channel0, Host) -> + AMQPClient = amqp_client:new(?MODULE), + case gen_tcp:connect(Host, 5672, [binary, {packet, 0},{active,false}]) of + {ok, Sock} -> + ok = gen_tcp:send(Sock, AMQPClient:protocol_header()), + ReaderPid = spawn_link(?MODULE, start_reader, [Sock, Channel0]), + WriterPid = spawn_link(?MODULE, start_writer, [Sock, Channel0]), + io:format("Started the network driver~n"), + {ReaderPid,WriterPid}; + {error, Reason} -> + io:format("Could not start the network driver: ~p~n",[Reason]), + exit(Reason) + end. + +start_reader(Sock, Channel0) -> + put({channel, 0},{chpid, Channel0}), + reader_loop(Sock, 7). + +start_writer(Sock, Channel0) -> + put({chpid, Channel0}, {channel, 0}), + writer_loop(Sock). + +reader_loop(Sock, Length) -> + case gen_tcp:recv(Sock, Length, -1) of + {ok, <>} -> + case gen_tcp:recv(Sock, PayloadSize + 1, -1) of + {ok, <>} -> + maybe_send_heartbeat(Sock), + handle_frame(Type, Channel, Payload), + reader_loop(Sock, 7); + R -> exit(R) + end; + R -> + io:format("Got ~p~n", [R]), + exit(R) + end, + gen_tcp:close(Sock). + +writer_loop(Sock) -> + receive + {Sender, Channel} when is_integer(Channel) -> + put({chpid, Sender},{channel, Channel}), + writer_loop(Sock); + {Sender, MethodRecord} when is_pid(Sender) -> + Channel = resolve_channel(Sender), + io:format("About to send ~p to channel ~p ~n",[MethodRecord, Channel]), + rabbit_writer:internal_send_command(Sock, Channel, MethodRecord), + writer_loop(Sock); + {Sender, MethodRecord, Content} when is_pid(Sender) -> + Channel = resolve_channel(Sender), + io:format("About to send ~p with content ~p to channel ~p ~n",[MethodRecord, Content, Channel]), + %% TODO where does this FrameMax value come from + FrameMax = 110000, + rabbit_writer:internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax), + writer_loop(Sock); + Other -> + io:format("Deal with this old chap ~p~n",[Other]), + writer_loop(Sock) + end. + +maybe_send_heartbeat(Sock) -> + io:format("Sending heartbeat~n"), + %% TODO Implement this properly + ok = gen_tcp:send(Sock, [<>,0,0,0,0,0,0,<>]). + + +handle_frame(Type, Channel, Payload) -> + case rabbit_reader:analyze_frame(Type, Payload) of + heartbeat when Channel /= 0 -> + rabbit_misc:die(frame_error); + heartbeat -> + heartbeat; + trace when Channel /= 0 -> + rabbit_misc:die(frame_error); + trace -> + trace; + AnalyzedFrame -> + ChPid = resolve_receiver(Channel), + ChPid ! {frame, self(), Channel, AnalyzedFrame} + end. + +resolve_receiver(Channel) -> + case get({channel, Channel}) of + {chpid, ChPid} -> + ChPid; + undefined -> + receive + {Sender,Channel} -> + put({channel, Channel},{chpid, Sender}), + Sender + after 1000 -> + io:format("Could not resolve receiver from channel ~p~n",[Channel]), + exit(unknown_channel) + end + end. + +resolve_channel(Sender) -> + case get({chpid, Sender}) of + {channel, Channel} -> + Channel; + undefined -> + io:format("Could not resolve channel from sender ~p~n",[Sender]), + exit(unknown_sender) + end. \ No newline at end of file diff -r af848798cc65 src/amqp_util.erl --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/amqp_util.erl Wed Aug 01 00:23:22 2007 +0100 @@ -0,0 +1,18 @@ +-module(amqp_util). + +-include("rabbit.hrl"). +-include("rabbit_framing.hrl"). + +-export([message_payload/1,channel_server_name/2,network_server_name/2]). + +channel_server_name(Channel, Host) -> + list_to_atom(lists:flatten(["ch",integer_to_list(Channel),"@",Host])). + +network_server_name(Channel,Host) -> + list_to_atom(lists:flatten(["amqp_client@",Host])). + +message_payload(Message) -> + (Message#basic_message.content)#content.payload_fragments_rev. + + + diff -r af848798cc65 src/rabbit.erl --- a/src/rabbit.erl Sat Jun 16 13:12:20 2007 +0100 +++ b/src/rabbit.erl Wed Aug 01 00:23:22 2007 +0100 @@ -73,7 +73,7 @@ start(normal, []) -> end, {ok, Host} = application:get_env(listen_host), {ok, Port} = application:get_env(listen_port), - gen_server:start_link(?MODULE, [Host, Port], []). + gen_server:start_link({local, rabbit},?MODULE, [Host, Port], []). stop(_State) -> ok. @@ -223,8 +223,14 @@ code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) -> State. -handle_call(_Request, _From, State) -> - {reply, ignored, State}. +handle_call({Type, Request}, {Pid,Ref}, State) -> + spawn(rabbit_reader, handle_input, [Type, Request, Pid]), + {reply, ignored, State}; + +handle_call({Type, {User,VHost}, Request}, {Pid,Ref}, State) -> + Connection = #connection{user = #user{username = User} , vhost = VHost, reader_pid = Pid}, + ChPid = rabbit_reader:handle_input(Type, Request, Connection), + {reply, ChPid, State}. handle_cast(_Request, State) -> {noreply, State}. diff -r af848798cc65 src/rabbit_amqqueue.erl --- a/src/rabbit_amqqueue.erl Sat Jun 16 13:12:20 2007 +0100 +++ b/src/rabbit_amqqueue.erl Wed Aug 01 00:23:22 2007 +0100 @@ -28,7 +28,7 @@ -export([lookup/1, lookup_or_die/2, stat/1, stat_all/0, deliver/3]). -export([add_binding/4, binding_forcibly_removed/2]). -export([claim_queue/2]). --export([basic_get/3, basic_consume/7, basic_cancel/4]). +-export([basic_get/3, basic_consume/8, basic_cancel/4]). -export([notify_sent/1]). -import(mnesia). @@ -175,12 +175,12 @@ basic_get(#amqqueue{pid = QPid}, NoAck, basic_get(#amqqueue{pid = QPid}, NoAck, WriterPid) -> gen_server:cast(QPid, {basic_get, NoAck, WriterPid}). -basic_consume(Q, NoAck, ReaderPid, WriterPid, <<>>, ExclusiveConsume, NoWait) -> - basic_consume(Q, NoAck, ReaderPid, WriterPid, +basic_consume(Q, NoAck, ReaderPid, WriterPid, AckPid, <<>>, ExclusiveConsume, NoWait) -> + basic_consume(Q, NoAck, ReaderPid, WriterPid, AckPid, list_to_binary(rabbit_gensym:gensym("amq.ctag")), ExclusiveConsume, NoWait); -basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, WriterPid, +basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, WriterPid, AckPid, ConsumerTag, ExclusiveConsume, NoWait) -> - case gen_server:call(QPid, {basic_consume, NoAck, ReaderPid, WriterPid, + case gen_server:call(QPid, {basic_consume, NoAck, ReaderPid, WriterPid, AckPid, ConsumerTag, ExclusiveConsume, NoWait}) of ok -> {ok, ConsumerTag}; diff -r af848798cc65 src/rabbit_amqqueue_process.erl --- a/src/rabbit_amqqueue_process.erl Sat Jun 16 13:12:20 2007 +0100 +++ b/src/rabbit_amqqueue_process.erl Wed Aug 01 00:23:22 2007 +0100 @@ -303,7 +303,7 @@ handle_call({deliver, Message}, _From, S {reply, false, State} end; -handle_call({basic_consume, NoAck, ReaderPid, WriterPid, ConsumerTag, ExclusiveConsume, NoWait}, +handle_call({basic_consume, NoAck, ReaderPid, WriterPid, AckPid, ConsumerTag, ExclusiveConsume, NoWait}, _From, State = #q{owner = Owner, exclusive_consumer = ExistingHolder, round_robin = RoundRobin}) -> @@ -327,7 +327,7 @@ handle_call({basic_consume, NoAck, Reade true -> ExistingHolder end, round_robin = queue:in({WriterPid, Consumer}, RoundRobin)}, - ok = maybe_send(NoWait, WriterPid, + ok = maybe_send(NoWait, AckPid, #'basic.consume_ok'{consumer_tag = ConsumerTag}), {reply, ok, run_poke_burst(State1)} end diff -r af848798cc65 src/rabbit_channel.erl --- a/src/rabbit_channel.erl Sat Jun 16 13:12:20 2007 +0100 +++ b/src/rabbit_channel.erl Wed Aug 01 00:23:22 2007 +0100 @@ -25,7 +25,7 @@ -include("rabbit.hrl"). -export([read_frame/0, read_method/0, read_method/1]). --export([start/4]). +-export([start/3,start/4]). read_frame() -> receive @@ -92,10 +92,36 @@ read_method(ExpectedMethodName) -> tx, reader_pid, writer_pid, + acknowledge_pid, + direct = false, username, virtual_host, most_recently_declared_queue, next_ticket}). + +start(Connection, Channel, Timeout) -> + ReaderPid = Connection#connection.reader_pid, + receive + {Sender, #'channel.close_ok'{}} -> + % We are required by the spec to ignore unsolicited close-ok messages. + exit(normal); + {Sender, #'channel.open'{}} -> + rabbit_log:info("~p: Opening channel ~p~n", [ReaderPid, Channel]), + Sender ! {self(), {Channel, #'channel.open_ok'{} } }, + Tx0 = rabbit_transaction:start(), + Tx1 = rabbit_transaction:set_writer_pid(Tx0, self()), + mainloop_without_framing(#ch{ channel = Channel, + tx = Tx1, + reader_pid = ReaderPid, + writer_pid = Connection#connection.writer_pid, + acknowledge_pid = ReaderPid, + direct = true, + username = (Connection#connection.user)#user.username, + virtual_host = Connection#connection.vhost, + most_recently_declared_queue = <<>>, + next_ticket = 101 + }) + end. start(ReaderPid, Channel, Sock, Connection) -> case read_method() of @@ -112,6 +138,7 @@ start(ReaderPid, Channel, Sock, Connecti tx = Tx1, reader_pid = ReaderPid, writer_pid = WriterPid, + acknowledge_pid = WriterPid, username = (Connection#connection.user)#user.username, virtual_host = Connection#connection.vhost, most_recently_declared_queue = <<>>, @@ -121,11 +148,33 @@ start(ReaderPid, Channel, Sock, Connecti rabbit_misc:die(channel_error, rabbit_misc:method_record_type(MethodRecord)) end. -send(#ch{ writer_pid = W }, MethodRecord) -> - ok = rabbit_writer:send_command(W, MethodRecord). - -%%send(#ch{ writer_pid = W }, MethodRecord, Content) -> -%% ok = rabbit_writer:send_command(W, MethodRecord, Content). +send(#ch{ channel = Channel, writer_pid = W, direct = Direct }, MethodRecord) -> + case Direct of + true -> + W ! {self(), Channel, MethodRecord}, + ok; + _ -> + ok = rabbit_writer:send_command(W, MethodRecord) + end. + +mainloop_without_framing(State) -> + receive + {method, MethodRecord} -> + State1 = handle_method(MethodRecord, <<>>, State), + mainloop_without_framing(State1); + {method, MethodRecord, Content} -> + State1 = handle_method(MethodRecord, Content, State), + mainloop_without_framing(State1); + {method, MethodRecord, Content, SubscriberPid} -> + WriterPid = State#ch.writer_pid, + State0 = State#ch{writer_pid = SubscriberPid}, + State1 = handle_method(MethodRecord, Content, State0), + State2 = State1#ch{writer_pid = WriterPid}, + mainloop_without_framing(State2); + Other -> + rabbit_log:error("Unexpected ch~p content: ~p~n", [State#ch.channel, Other]), + mainloop_without_framing(State) + end. mainloop(State) -> case read_frame() of @@ -279,14 +328,15 @@ handle_method(#'basic.consume'{ticket = exclusive = ExclusiveConsume, nowait = NoWait}, _, State = #ch{ reader_pid = ReaderPid, - writer_pid = WriterPid }) -> + writer_pid = WriterPid, + acknowledge_pid = AckPid}) -> case get_consumer_mapping(ConsumerTag) of undefined -> ActualQueueName = expand_queue_name_shortcut(QueueNameBin, State, 'basic.consume'), rabbit_ticket:check_ticket(TicketNumber, ActualQueueName, #ticket.read_flag, 'basic.consume'), Q = rabbit_amqqueue:lookup_or_die(ActualQueueName, 'basic.consume'), - case rabbit_amqqueue:basic_consume(Q, NoAck, ReaderPid, WriterPid, + case rabbit_amqqueue:basic_consume(Q, NoAck, ReaderPid, WriterPid, AckPid, ConsumerTag, ExclusiveConsume, NoWait) of {ok, ActualConsumerTag} -> put_consumer_mapping(ActualConsumerTag, ActualQueueName), diff -r af848798cc65 src/rabbit_channel0.erl --- a/src/rabbit_channel0.erl Sat Jun 16 13:12:20 2007 +0100 +++ b/src/rabbit_channel0.erl Wed Aug 01 00:23:22 2007 +0100 @@ -24,7 +24,13 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([start/4]). +-export([start/3,start/4]). + +start(ReaderPid, ChannelNum, Timeout) -> + {ok, UserDetails} = do_connection_start(ReaderPid, Timeout), + ok = do_connection_tune(ReaderPid, Timeout), + {ok, VHostPath} = do_connection_open(ReaderPid, UserDetails, Timeout), + wait_for_close(ReaderPid). start(ReaderPid, _ChannelNum, Sock, InitialConnection) -> {ok, UserDetails} = do_connection_start(Sock), @@ -61,6 +67,44 @@ read_method(Expected) -> ?LOGMESSAGE(in, 0, M, C), Result. +connection_start_properties() -> + #'connection.start'{version_major = 8, + version_minor = 1, + server_properties = + [{<<"product">>, longstr, <<"RabbitMQ">>}, + {<<"version">>, longstr, <<"1.0">>}, + {<<"platform">>, longstr, <<"Erlang/OTP">>}, + {<<"copyright">>, longstr, <<"LShift Ltd">>}], + mechanisms = <<"PLAIN">>, + locales = <<"en_US">> }. + +connection_tune_properties() -> + #'connection.tune'{channel_max = 0, + frame_max = 131072, %% set to zero once QPid fix their negotiation + heartbeat = 0 }. + +connection_ok_properties() -> + #'connection.open_ok'{known_hosts = <<>>}. + +do_connection_start(Pid, Timeout) when is_pid(Pid) -> + Pid ! {self(), connection_start_properties()}, + receive + {Sender, #'connection.start_ok'{mechanism = Mechanism, response = LoginTable}} -> + case { lists:keysearch(<<"LOGIN">>, 1, LoginTable), + lists:keysearch(<<"PASSWORD">>, 1, LoginTable)} of + {{value, {_, UsernameBin}}, + {value, {_, PasswordBin}}} -> + {ok, _U} = rabbit_access_control:user_pass_login(UsernameBin, PasswordBin); + _ -> + rabbit_misc:die(access_refused) + end; + _ -> + rabbit_misc:die(protocol_mismatch) + after Timeout -> + ?LOGDEBUG("Connection start: Pid ~p timing out~n",[Pid]), + rabbit_misc:die(client_timeout) + end. + do_connection_start(Sock) -> ok = rabbit_writer:internal_send_command (Sock, 0, @@ -76,7 +120,20 @@ do_connection_start(Sock) -> {ok, #'connection.start_ok'{mechanism = Mechanism, response = Response}, _} = read_method('connection.start_ok'), {ok, _U} = rabbit_access_control:check_login(Mechanism, Response). - + +do_connection_tune(Pid, Timeout) when is_pid(Pid) -> + Pid ! { self(), connection_tune_properties() } , + receive + {Sender, #'connection.tune_ok'{channel_max = ChannelMax} } -> + rabbit_log:info("Tuned channel_max = ~p.~n", [ChannelMax]); + _ -> + rabbit_misc:die(protocol_mismatch) + after Timeout -> + ?LOGDEBUG("Connection start: Pid ~p timing out~n",[Pid]), + rabbit_misc:die(client_timeout) + end, + ok. + do_connection_tune(Sock) -> ok = rabbit_writer:internal_send_command (Sock, 0, @@ -99,6 +156,25 @@ do_connection_open(Sock, UserDetails) -> ok = rabbit_writer:internal_send_command(Sock, 0, #'connection.open_ok'{known_hosts = <<>>}), {ok, VHostPath}. +do_connection_open(Pid, UserDetails, Timeout) when is_pid(Pid) -> + receive + {Sender, #'connection.open'{virtual_host = VHostPath} } -> + rabbit_access_control:check_vhost_access(UserDetails, VHostPath), + Pid ! { self(), {0, connection_ok_properties() } }, + {ok, VHostPath}; + _ -> + rabbit_misc:die(protocol_mismatch) + after Timeout -> + ?LOGDEBUG("Connection start: Pid ~p timing out~n",[Pid]), + rabbit_misc:die(client_timeout) + end. + +wait_for_close(Pid) when is_pid(Pid) -> + receive + {Sender,'connection.close'} -> + Sender ! { self(), 0 , #'connection.close_ok'{} } + end; + wait_for_close(Sock) -> {ok, _, _} = read_method('connection.close'), - ok = rabbit_writer:internal_send_command(Sock, 0, #'connection.close_ok'{}). + ok = rabbit_writer:internal_send_command(Sock, 0, #'connection.close_ok'{} ). diff -r af848798cc65 src/rabbit_framing.erl --- a/src/rabbit_framing.erl Sat Jun 16 13:12:20 2007 +0100 +++ b/src/rabbit_framing.erl Wed Aug 01 00:23:22 2007 +0100 @@ -1355,9 +1355,10 @@ encode_method_fields(#'connection.start_ F0Tab = rabbit_binary_generator:generate_table(F0), F0Len = size(F0Tab), F1Len = size(F1), - F2Len = size(F2), + F2Tab = rabbit_binary_generator:generate_table(F2), + F2Len = size(F2Tab), F3Len = size(F3), - <>; + <>; encode_method_fields(#'connection.secure'{challenge = F0}) -> F0Len = size(F0), <>; diff -r af848798cc65 src/rabbit_reader.erl --- a/src/rabbit_reader.erl Sat Jun 16 13:12:20 2007 +0100 +++ b/src/rabbit_reader.erl Wed Aug 01 00:23:22 2007 +0100 @@ -26,7 +26,9 @@ -export([accept_and_start/1]). -export([init/1, start_connection/1]). +-export([handle_input/3]). -export([lookup_amqp_exception/1]). +-export([analyze_frame/2]). -import(gen_tcp). -import(fprof). @@ -117,7 +119,7 @@ mainloop(State = #v1{recv_ref = RecvRef, {inet_async, _Sock, AsyncRef, Status} when AsyncRef == RecvRef -> case Status of {ok, Data} -> - %%?LOGDEBUG("Reader data (~p): ~p~n", [size(Data), Data]), + io:format("Reader data (~p): ~p~n", [size(Data), Data]), mainloop(handle_input(State#v1.callback, Data, State#v1{recv_ref = none})); {error, closed} -> rabbit_log:info("Connection closed abruptly.~n"), @@ -176,6 +178,7 @@ handle_frame(Type, Channel, Payload, #v1 State; AnalyzedFrame -> %%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]), + io:format("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]), case get({channel, Channel}) of {chpid, ChPid} -> ChPid ! {frame, Channel, AnalyzedFrame}, @@ -219,24 +222,32 @@ handle_input({frame_payload, Type, Chann %%?LOGDEBUG("Frame completed: ~p/~p/~p~n", [Type, Channel, Payload]), State1 = handle_frame(Type, Channel, Payload, State), switch_callback(State1, frame_header, 7); - _ -> - rabbit_log:error("Bad payload received - wrong frame end marker?~n"), + Other -> + rabbit_log:error("Bad payload received - wrong frame end marker? ~p~n",[binary_to_list(Other)]), exit(reader_bad_payload) end; +handle_input(handshake, + <<"AMQP",1,1,?PROTOCOL_VERSION_MAJOR,?PROTOCOL_VERSION_MINOR>>, + Pid) when is_pid(Pid)-> + open_channel(0, rabbit_channel0, start, Pid); + handle_input(handshake, <<"AMQP",1,1,?PROTOCOL_VERSION_MAJOR,?PROTOCOL_VERSION_MINOR>>, State) -> {State1, _ChPid} = open_channel(0, rabbit_channel0, start, State), switch_callback(State1#v1{connection = (State1#v1.connection)#connection{timeout_sec = ?NORMAL_TIMEOUT}, - connection_state = starting}, frame_header, 7); - + connection_state = starting}, frame_header, 7); + +handle_input(open_channel, {Channel, OutOfBand}, Connection)-> + open_channel(Connection, Channel, rabbit_channel, start); + handle_input(handshake, Other, #v1{sock = Sock}) -> rabbit_log:error("Bad header received: ~p~n", [Other]), ok = gen_tcp:send(Sock, <<"AMQP",1,1,?PROTOCOL_VERSION_MAJOR,?PROTOCOL_VERSION_MINOR>>), exit(reader_bad_header); - + handle_input(Callback, timeout, #v1{connection = #connection{timeout_sec = TimeoutSec}}) -> rabbit_log:error("Reader timeout (~p s) in state ~p~n", [TimeoutSec, Callback]), @@ -247,11 +258,24 @@ handle_input(Callback, Data, _State) -> rabbit_log:error("Bad input to reader in state ~p: ~p~n", [Callback, Data]), exit(reader_bad_input). +open_channel(Channel, Mod, Fun, State) when is_pid(State) -> + ChPid = spawn_link(Mod, Fun, [State, Channel, ?NORMAL_TIMEOUT]), + open_channel_internal(Channel,ChPid), + {State, ChPid}; + +open_channel(Connection, Channel, Mod, Fun) when is_integer(Channel) -> + ChPid = spawn_link(Mod, Fun, [Connection, Channel, ?NORMAL_TIMEOUT]), + open_channel_internal(Channel,ChPid), + {Connection, ChPid}; + open_channel(Channel, Mod, Fun, State) -> ChPid = spawn_link(Mod, Fun, [self(), Channel, State#v1.sock, State#v1.connection]), + open_channel_internal(Channel,ChPid), + {State,ChPid}. + +open_channel_internal(Channel, ChPid) -> put({channel, Channel}, {chpid, ChPid}), - put({chpid, ChPid}, {channel, Channel}), - {State, ChPid}. + put({chpid, ChPid}, {channel, Channel}). %--------------------------------------------------------------------------- diff -r af848798cc65 src/rabbit_writer.erl --- a/src/rabbit_writer.erl Sat Jun 16 13:12:20 2007 +0100 +++ b/src/rabbit_writer.erl Wed Aug 01 00:23:22 2007 +0100 @@ -24,7 +24,7 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([start/4, shutdown/1, mainloop/1]). +-export([start/1, start/4, shutdown/1, mainloop/1]). -export([send_command/2, send_command/3, deliver/6, deliver/7, pause/1, unpause/2]). -export([internal_send_command/3, internal_send_command/5]). @@ -38,6 +38,9 @@ start(Sock, Channel, Connection, Tx) -> connection = Connection, next_tag = 10000, tx = Tx}]). + +start(Sock) -> + spawn_link(?MODULE, mainloop, [#wstate{sock = Sock}]). mainloop(State) -> receive