Skip to content

Commit

Permalink
Replace dicts with maps for queue mirroring logic.
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniil Fedotov committed Apr 21, 2017
1 parent b5a07fa commit e07ca0e
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 45 deletions.
2 changes: 1 addition & 1 deletion src/rabbit_amqqueue_process.erl
Expand Up @@ -99,7 +99,7 @@
-spec info_keys() -> rabbit_types:info_keys().
-spec init_with_backing_queue_state
(rabbit_types:amqqueue(), atom(), tuple(), any(),
[rabbit_types:delivery()], pmon:pmon(), dict:dict()) ->
[rabbit_types:delivery()], pmon:pmon(), gb_trees:tree()) ->
#q{}.

%%----------------------------------------------------------------------------
Expand Down
28 changes: 14 additions & 14 deletions src/rabbit_mirror_queue_master.erl
Expand Up @@ -57,13 +57,13 @@
coordinator :: pid(),
backing_queue :: atom(),
backing_queue_state :: any(),
seen_status :: dict:dict(),
seen_status :: map(),
confirmed :: [rabbit_guid:guid()],
known_senders :: sets:set()
}.
-spec promote_backing_queue_state
(rabbit_amqqueue:name(), pid(), atom(), any(), pid(), [any()],
dict:dict(), [pid()]) ->
map(), [pid()]) ->
master_state().

-spec sender_death_fun() -> death_fun().
Expand Down Expand Up @@ -127,7 +127,7 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS,
seen_status = dict:new(),
seen_status = #{},
confirmed = [],
known_senders = sets:new(),
wait_timeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000) };
Expand Down Expand Up @@ -266,7 +266,7 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, Flow,
seen_status = SS,
backing_queue = BQ,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
false = maps:is_key(MsgId, SS), %% ASSERTION
ok = gm:broadcast(GM, {publish, ChPid, Flow, MsgProps, Msg},
rabbit_basic:msg_size(Msg)),
BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS),
Expand All @@ -281,7 +281,7 @@ batch_publish(Publishes, ChPid, Flow,
lists:foldl(fun ({Msg = #basic_message { id = MsgId },
MsgProps, _IsDelivered}, {Pubs, false, Sizes}) ->
{[{Msg, MsgProps, true} | Pubs], %% [0]
false = dict:is_key(MsgId, SS), %% ASSERTION
false = maps:is_key(MsgId, SS), %% ASSERTION
Sizes + rabbit_basic:msg_size(Msg)}
end, {[], false, 0}, Publishes),
Publishes2 = lists:reverse(Publishes1),
Expand All @@ -298,7 +298,7 @@ publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
seen_status = SS,
backing_queue = BQ,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
false = maps:is_key(MsgId, SS), %% ASSERTION
ok = gm:broadcast(GM, {publish_delivered, ChPid, Flow, MsgProps, Msg},
rabbit_basic:msg_size(Msg)),
{AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, Flow, BQS),
Expand All @@ -313,7 +313,7 @@ batch_publish_delivered(Publishes, ChPid, Flow,
{false, MsgSizes} =
lists:foldl(fun ({Msg = #basic_message { id = MsgId }, _MsgProps},
{false, Sizes}) ->
{false = dict:is_key(MsgId, SS), %% ASSERTION
{false = maps:is_key(MsgId, SS), %% ASSERTION
Sizes + rabbit_basic:msg_size(Msg)}
end, {false, 0}, Publishes),
ok = gm:broadcast(GM, {batch_publish_delivered, ChPid, Flow, Publishes},
Expand All @@ -326,7 +326,7 @@ discard(MsgId, ChPid, Flow, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
seen_status = SS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
false = maps:is_key(MsgId, SS), %% ASSERTION
ok = gm:broadcast(GM, {discard, ChPid, Flow, MsgId}),
ensure_monitoring(ChPid,
State #state { backing_queue_state =
Expand All @@ -353,7 +353,7 @@ drain_confirmed(State = #state { backing_queue = BQ,
lists:foldl(
fun (MsgId, {MsgIdsN, SSN}) ->
%% We will never see 'discarded' here
case dict:find(MsgId, SSN) of
case maps:find(MsgId, SSN) of
error ->
{[MsgId | MsgIdsN], SSN};
{ok, published} ->
Expand All @@ -364,7 +364,7 @@ drain_confirmed(State = #state { backing_queue = BQ,
%% consequently we need to filter out the
%% confirm here. We will issue the confirm
%% when we see the publish from the channel.
{MsgIdsN, dict:store(MsgId, confirmed, SSN)};
{MsgIdsN, maps:put(MsgId, confirmed, SSN)};
{ok, confirmed} ->
%% Well, confirms are racy by definition.
{[MsgId | MsgIdsN], SSN}
Expand Down Expand Up @@ -457,7 +457,7 @@ msg_rates(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
info(backing_queue_status,
State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
BQ:info(backing_queue_status, BQS) ++
[ {mirror_seen, dict:size(State #state.seen_status)},
[ {mirror_seen, maps:size(State #state.seen_status)},
{mirror_senders, sets:size(State #state.known_senders)} ];
info(Item, #state { backing_queue = BQ, backing_queue_state = BQS }) ->
BQ:info(Item, BQS).
Expand All @@ -480,7 +480,7 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% it.

%% We will never see {published, ChPid, MsgSeqNo} here.
case dict:find(MsgId, SS) of
case maps:find(MsgId, SS) of
error ->
%% We permit the underlying BQ to have a peek at it, but
%% only if we ourselves are not filtering out the msg.
Expand All @@ -494,7 +494,7 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% immediately after calling is_duplicate). The msg is
%% invalid. We will not see this again, nor will we be
%% further involved in confirming this message, so erase.
{true, State #state { seen_status = dict:erase(MsgId, SS) }};
{true, State #state { seen_status = maps:remove(MsgId, SS) }};
{ok, Disposition}
when Disposition =:= confirmed
%% It got published when we were a slave via gm, and
Expand All @@ -509,7 +509,7 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% Message was discarded while we were a slave. Confirm now.
%% As above, amqqueue_process will have the entry for the
%% msg_id_to_channel mapping.
{true, State #state { seen_status = dict:erase(MsgId, SS),
{true, State #state { seen_status = maps:remove(MsgId, SS),
confirmed = [MsgId | Confirmed] }}
end.

Expand Down
56 changes: 28 additions & 28 deletions src/rabbit_mirror_queue_slave.erl
Expand Up @@ -129,10 +129,10 @@ handle_go(Q = #amqqueue{name = QName}) ->
rate_timer_ref = undefined,
sync_timer_ref = undefined,

sender_queues = dict:new(),
msg_id_ack = dict:new(),
sender_queues = #{},
msg_id_ack = #{},

msg_id_status = dict:new(),
msg_id_status = #{},
known_senders = pmon:new(delegate),

depth_delta = undefined
Expand Down Expand Up @@ -310,7 +310,7 @@ handle_cast({sync_start, Ref, Syncer},
State1 = #state{rate_timer_ref = TRef} = ensure_rate_timer(State),
S = fun({MA, TRefN, BQSN}) ->
State1#state{depth_delta = undefined,
msg_id_ack = dict:from_list(MA),
msg_id_ack = maps:from_list(MA),
rate_timer_ref = TRefN,
backing_queue_state = BQSN}
end,
Expand Down Expand Up @@ -546,7 +546,7 @@ send_or_record_confirm(published, #delivery { sender = ChPid,
id = MsgId,
is_persistent = true } },
MS, #state { q = #amqqueue { durable = true } }) ->
dict:store(MsgId, {published, ChPid, MsgSeqNo} , MS);
maps:put(MsgId, {published, ChPid, MsgSeqNo} , MS);
send_or_record_confirm(_Status, #delivery { sender = ChPid,
confirm = true,
msg_seq_no = MsgSeqNo },
Expand All @@ -559,20 +559,20 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
lists:foldl(
fun (MsgId, {CMsN, MSN} = Acc) ->
%% We will never see 'discarded' here
case dict:find(MsgId, MSN) of
case maps:find(MsgId, MSN) of
error ->
%% If it needed confirming, it'll have
%% already been done.
Acc;
{ok, published} ->
%% Still not seen it from the channel, just
%% record that it's been confirmed.
{CMsN, dict:store(MsgId, confirmed, MSN)};
{CMsN, maps:put(MsgId, confirmed, MSN)};
{ok, {published, ChPid, MsgSeqNo}} ->
%% Seen from both GM and Channel. Can now
%% confirm.
{rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMsN),
dict:erase(MsgId, MSN)};
maps:remove(MsgId, MSN)};
{ok, confirmed} ->
%% It's already been confirmed. This is
%% probably it's been both sync'd to disk
Expand Down Expand Up @@ -672,21 +672,21 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
%% Master, or MTC in queue_process.

St = [published, confirmed, discarded],
SS = dict:filter(fun (_MsgId, Status) -> lists:member(Status, St) end, MS),
AckTags = [AckTag || {_MsgId, AckTag} <- dict:to_list(MA)],
SS = maps:filter(fun (_MsgId, Status) -> lists:member(Status, St) end, MS),
AckTags = [AckTag || {_MsgId, AckTag} <- maps:to_list(MA)],

MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
QName, CPid, BQ, BQS, GM, AckTags, SS, MPids),

MTC = dict:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) ->
MTC = maps:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) ->
gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0);
(_Msgid, _Status, MTC0) ->
MTC0
end, gb_trees:empty(), MS),
Deliveries = [promote_delivery(Delivery) ||
{_ChPid, {PubQ, _PendCh, _ChState}} <- dict:to_list(SQ),
{_ChPid, {PubQ, _PendCh, _ChState}} <- maps:to_list(SQ),
Delivery <- queue:to_list(PubQ)],
AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- dict:to_list(SQ)],
AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- maps:to_list(SQ)],
KS1 = lists:foldl(fun (ChPid0, KS0) ->
pmon:demonitor(ChPid0, KS0)
end, KS, AwaitGmDown),
Expand Down Expand Up @@ -798,20 +798,20 @@ forget_sender(Down1, Down2) when Down1 =/= Down2 -> true.
maybe_forget_sender(ChPid, ChState, State = #state { sender_queues = SQ,
msg_id_status = MS,
known_senders = KS }) ->
case dict:find(ChPid, SQ) of
case maps:find(ChPid, SQ) of
error ->
State;
{ok, {MQ, PendCh, ChStateRecord}} ->
case forget_sender(ChState, ChStateRecord) of
true ->
credit_flow:peer_down(ChPid),
State #state { sender_queues = dict:erase(ChPid, SQ),
State #state { sender_queues = maps:remove(ChPid, SQ),
msg_id_status = lists:foldl(
fun dict:erase/2,
fun maps:remove/2,
MS, sets:to_list(PendCh)),
known_senders = pmon:demonitor(ChPid, KS) };
false ->
SQ1 = dict:store(ChPid, {MQ, PendCh, ChState}, SQ),
SQ1 = maps:put(ChPid, {MQ, PendCh, ChState}, SQ),
State #state { sender_queues = SQ1 }
end
end.
Expand All @@ -823,32 +823,32 @@ maybe_enqueue_message(
send_mandatory(Delivery), %% must do this before confirms
State1 = ensure_monitoring(ChPid, State),
%% We will never see {published, ChPid, MsgSeqNo} here.
case dict:find(MsgId, MS) of
case maps:find(MsgId, MS) of
error ->
{MQ, PendingCh, ChState} = get_sender_queue(ChPid, SQ),
MQ1 = queue:in(Delivery, MQ),
SQ1 = dict:store(ChPid, {MQ1, PendingCh, ChState}, SQ),
SQ1 = maps:put(ChPid, {MQ1, PendingCh, ChState}, SQ),
State1 #state { sender_queues = SQ1 };
{ok, Status} ->
MS1 = send_or_record_confirm(
Status, Delivery, dict:erase(MsgId, MS), State1),
Status, Delivery, maps:remove(MsgId, MS), State1),
SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
State1 #state { msg_id_status = MS1,
sender_queues = SQ1 }
end.

get_sender_queue(ChPid, SQ) ->
case dict:find(ChPid, SQ) of
case maps:find(ChPid, SQ) of
error -> {queue:new(), sets:new(), running};
{ok, Val} -> Val
end.

remove_from_pending_ch(MsgId, ChPid, SQ) ->
case dict:find(ChPid, SQ) of
case maps:find(ChPid, SQ) of
error ->
SQ;
{ok, {MQ, PendingCh, ChState}} ->
dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh), ChState},
maps:put(ChPid, {MQ, sets:del_element(MsgId, PendingCh), ChState},
SQ)
end.

Expand All @@ -865,7 +865,7 @@ publish_or_discard(Status, ChPid, MsgId,
case queue:out(MQ) of
{empty, _MQ2} ->
{MQ, sets:add_element(MsgId, PendingCh),
dict:store(MsgId, Status, MS)};
maps:put(MsgId, Status, MS)};
{{value, Delivery = #delivery {
message = #basic_message { id = MsgId } }}, MQ2} ->
{MQ2, PendingCh,
Expand All @@ -880,7 +880,7 @@ publish_or_discard(Status, ChPid, MsgId,
%% expecting any confirms from us.
{MQ, PendingCh, MS}
end,
SQ1 = dict:store(ChPid, {MQ1, PendingCh1, ChState}, SQ),
SQ1 = maps:put(ChPid, {MQ1, PendingCh1, ChState}, SQ),
State1 #state { sender_queues = SQ1, msg_id_status = MS1 }.


Expand Down Expand Up @@ -1002,17 +1002,17 @@ msg_ids_to_acktags(MsgIds, MA) ->
{AckTags, MA1} =
lists:foldl(
fun (MsgId, {Acc, MAN}) ->
case dict:find(MsgId, MA) of
case maps:find(MsgId, MA) of
error -> {Acc, MAN};
{ok, AckTag} -> {[AckTag | Acc], dict:erase(MsgId, MAN)}
{ok, AckTag} -> {[AckTag | Acc], maps:remove(MsgId, MAN)}
end
end, {[], MA}, MsgIds),
{lists:reverse(AckTags), MA1}.

maybe_store_ack(false, _MsgId, _AckTag, State) ->
State;
maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA }) ->
State #state { msg_id_ack = dict:store(MsgId, AckTag, MA) }.
State #state { msg_id_ack = maps:put(MsgId, AckTag, MA) }.

set_delta(0, State = #state { depth_delta = undefined }) ->
ok = record_synchronised(State#state.q),
Expand Down
5 changes: 3 additions & 2 deletions src/rabbit_queue_consumers.erl
Expand Up @@ -64,7 +64,8 @@
-spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'.
-spec inactive(state()) -> boolean().
-spec all(state()) -> [{ch(), rabbit_types:ctag(), boolean(),
non_neg_integer(), rabbit_framing:amqp_table()}].
non_neg_integer(), rabbit_framing:amqp_table(),
rabbit_types:username()}].
-spec count() -> non_neg_integer().
-spec unacknowledged_message_count() -> non_neg_integer().
-spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(),
Expand Down Expand Up @@ -280,7 +281,7 @@ subtract_acks([T | TL] = AckTags, Prefix, CTagCounts, AckQ) ->
orddict:update_counter(CTag, 1, CTagCounts), QTail);
{{value, V}, QTail} ->
subtract_acks(AckTags, [V | Prefix], CTagCounts, QTail);
{empty, _} ->
{empty, _} ->
subtract_acks([], Prefix, CTagCounts, AckQ)
end.

Expand Down

0 comments on commit e07ca0e

Please sign in to comment.