Skip to content

Commit

Permalink
Merge pull request #1393 from rabbitmq/rabbitmq-server-batch-betas-lrb
Browse files Browse the repository at this point in the history
Don't use process dictionary for bump_reduce_memory_use message
  • Loading branch information
lukebakken committed Nov 2, 2017
2 parents 27bf1d6 + dff1d20 commit e811e30
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 19 deletions.
6 changes: 3 additions & 3 deletions src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1455,9 +1455,9 @@ handle_info({bump_credit, Msg}, State = #q{backing_queue = BQ,
credit_flow:handle_bump_msg(Msg),
noreply(State#q{backing_queue_state = BQ:resume(BQS)});
handle_info(bump_reduce_memory_use, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
put(waiting_bump, false),
noreply(State#q{backing_queue_state = BQ:resume(BQS)});
backing_queue_state = BQS0}) ->
BQS1 = BQ:handle_info(bump_reduce_memory_use, BQS0),
noreply(State#q{backing_queue_state = BQ:resume(BQS1)});

handle_info(Info, State) ->
{stop, {unhandled_info, Info}, State}.
Expand Down
6 changes: 5 additions & 1 deletion src/rabbit_mirror_queue_master.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1,
msg_rates/1, info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
zip_msgs_and_acks/4]).
zip_msgs_and_acks/4, handle_info/2]).

-export([start/2, stop/1, delete_crashed/1]).

Expand Down Expand Up @@ -447,6 +447,10 @@ handle_pre_hibernate(State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }.

handle_info(Msg, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state = BQ:handle_info(Msg, BQS) }.

resume(State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state = BQ:resume(BQS) }.
Expand Down
5 changes: 4 additions & 1 deletion src/rabbit_priority_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
handle_pre_hibernate/1, resume/1, msg_rates/1,
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
zip_msgs_and_acks/4]).
zip_msgs_and_acks/4, handle_info/2]).

-record(state, {bq, bqss, max_priority}).
-record(passthrough, {bq, bqs}).
Expand Down Expand Up @@ -393,6 +393,9 @@ handle_pre_hibernate(State = #state{bq = BQ}) ->
handle_pre_hibernate(State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough1(handle_pre_hibernate(BQS)).

handle_info(Msg, State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough1(handle_info(Msg, BQS)).

resume(State = #state{bq = BQ}) ->
foreach1(fun (_P, BQSN) -> BQ:resume(BQSN) end, State);
resume(State = #passthrough{bq = BQ, bqs = BQS}) ->
Expand Down
27 changes: 16 additions & 11 deletions src/rabbit_variable_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
handle_pre_hibernate/1, resume/1, msg_rates/1,
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
zip_msgs_and_acks/4, multiple_routing_keys/0]).
zip_msgs_and_acks/4, multiple_routing_keys/0, handle_info/2]).

-export([start/2, stop/1]).

Expand Down Expand Up @@ -325,7 +325,8 @@
memory_reduction_run_count,
%% Queue data is grouped by VHost. We need to store it
%% to work with queue index.
virtual_host
virtual_host,
waiting_bump = false
}).

-record(rates, { in, out, ack_in, ack_out, timestamp }).
Expand Down Expand Up @@ -911,6 +912,9 @@ timeout(State = #vqstate { index_state = IndexState }) ->
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.

handle_info(bump_reduce_memory_use, State = #vqstate{ waiting_bump = true }) ->
State#vqstate{ waiting_bump = false }.

resume(State) -> a(reduce_memory_use(State)).

msg_rates(#vqstate { rates = #rates { in = AvgIngressRate,
Expand Down Expand Up @@ -2466,21 +2470,16 @@ reduce_memory_use(State = #vqstate {
Blocked = credit_flow:blocked(),
case {Blocked, NeedResumeA2B orelse NeedResumeB2D} of
%% Credit bump will continue paging
{true, _} -> ok;
{true, _} -> State3;
%% Finished with paging
{false, false} -> ok;
{false, false} -> State3;
%% Planning next batch
{false, true} ->
%% We don't want to use self-credit-flow, because it's harder to
%% reason about. So the process sends a (prioritised) message to
%% itself and sets a waiting_bump value to keep the message box clean
case get(waiting_bump) of
true -> ok;
_ -> self() ! bump_reduce_memory_use,
put(waiting_bump, true)
end
end,
State3;
maybe_bump_reduce_memory_use(State3)
end;
%% When using lazy queues, there are no alphas, so we don't need to
%% call push_alphas_to_betas/2.
reduce_memory_use(State = #vqstate {
Expand All @@ -2506,6 +2505,12 @@ reduce_memory_use(State = #vqstate {
garbage_collect(),
State3.

maybe_bump_reduce_memory_use(State = #vqstate{ waiting_bump = true }) ->
State;
maybe_bump_reduce_memory_use(State) ->
self() ! bump_reduce_memory_use,
State#vqstate{ waiting_bump = true }.

limit_ram_acks(0, State) ->
{0, ui(State)};
limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA,
Expand Down
9 changes: 6 additions & 3 deletions test/channel_operation_timeout_test_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
handle_pre_hibernate/1, resume/1, msg_rates/1,
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
zip_msgs_and_acks/4]).
-export([start/2, stop/1]).
start/2, stop/1, zip_msgs_and_acks/4, handle_info/2]).

%%----------------------------------------------------------------------------
%% This test backing queue follows the variable queue implementation, with
Expand Down Expand Up @@ -91,7 +90,8 @@
memory_reduction_run_count,
%% Queue data is grouped by VHost. We need to store it
%% to work with queue index.
virtual_host
virtual_host,
waiting_bump = false
}).

-record(rates, { in, out, ack_in, ack_out, timestamp }).
Expand Down Expand Up @@ -285,6 +285,9 @@ timeout(State) ->
handle_pre_hibernate(State) ->
rabbit_variable_queue:handle_pre_hibernate(State).

handle_info(Msg, State) ->
rabbit_variable_queue:handle_info(Msg, State).

resume(State) -> rabbit_variable_queue:resume(State).

msg_rates(State) ->
Expand Down

0 comments on commit e811e30

Please sign in to comment.