
feat(ds): add cache #12453

Open · thalesmg wants to merge 6 commits into master from ds-cache-p2-mk4-m-20240131
Conversation

@thalesmg (Contributor) commented Feb 1, 2024

Fixes https://emqx.atlassian.net/browse/EMQX-10943

Release version: v/e5.6

Summary

PR Checklist

Please convert this PR to a draft if any of the following conditions are not met. Reviewers may skip the review until all the items are checked:

  • Added tests for the changes
  • Added property-based tests for code which performs user input validation
  • Changed lines covered in coverage report
  • Change log has been added to changes/(ce|ee)/(feat|perf|fix|breaking)-<PR-id>.en.md files
  • For internal contributor: there is a jira ticket to track this change
  • Created PR to emqx-docs if documentation update is required, or link to a follow-up jira ticket
  • Schema changes are backward compatible

Checklist for CI (.github/workflows) changes

  • If changed package build workflow, pass this action (manual trigger)
  • Change log has been added to changes/ dir for user-facing artifacts update

@thalesmg thalesmg mentioned this pull request Feb 1, 2024
@@ -763,7 +763,7 @@ enqueue_batch(IsReplay, BatchSize, Srs0, Session = #{inflight := Inflight0}, Cli
             true -> ItBegin0;
             false -> ItEnd0
         end,
-    case emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, BatchSize) of
+    case emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, BatchSize, #{use_cache => true}) of
Member
Why do we need a new option for the cache? Does it change the semantics of next?

Contributor Author

Semantics, no. But we need a way to call next without going through the cache. One example is the cache worker itself: when it fetches new messages to update the cache, it must bypass the cache rather than read through it.
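A minimal sketch of how such an option could dispatch (the `use_cache` key comes from this PR's diff; `emqx_ds_cache:next/3` returning `miss` is a hypothetical helper, and `?module(DB)` mirrors the existing `store_batch/3` dispatch):

```erlang
%% Sketch, not the PR's actual implementation: callers that pass
%% `use_cache => true` try the cache first; everyone else (notably
%% the cache worker) goes straight to the backend.
next(DB, Iter, BatchSize, #{use_cache := true}) ->
    case emqx_ds_cache:next(DB, Iter, BatchSize) of
        {ok, _ItNext, _Msgs} = Hit -> Hit;
        miss -> ?module(DB):next(DB, Iter, BatchSize)
    end;
next(DB, Iter, BatchSize, _Opts) ->
    %% Bypass path: always hit RPC/RocksDB directly.
    ?module(DB):next(DB, Iter, BatchSize).
```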

Member

This could be solved by having an internal API in the replication layer that always goes directly to RPC/RocksDB, and exposing the cached version through the emqx_ds callback module (similar to how egress workers intercept calls).

Exposing this option directly in the API only makes sense if we want to give API consumers a way to bypass the cache, but I can't come up with a situation where that would be needed.

Contributor Author (@thalesmg, Feb 2, 2024)

If the cache is to be used by other backends, then shouldn't it be controllable from emqx_ds, like egress is, instead of going directly to the replication_layer?

Contributor Author

Egress also has its options exposed:

-spec store_batch(db(), [emqx_types:message()], message_store_opts()) -> store_batch_result().
store_batch(DB, Msgs, Opts) ->
    ?module(DB):store_batch(DB, Msgs, Opts).

-spec store_batch(db(), [emqx_types:message()]) -> store_batch_result().
store_batch(DB, Msgs) ->
    store_batch(DB, Msgs, #{}).

Contributor Author

Inverted the behavior: the replication layer now uses the cache by default.
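A sketch of what that default could look like (function heads assumed, not taken from the PR):

```erlang
%% Sketch: the 3-arity entry point opts in to the cache by default;
%% internal callers opt out by passing #{use_cache => false}.
next(DB, Iter, BatchSize) ->
    next(DB, Iter, BatchSize, #{use_cache => true}).
```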

Member

Options exposed by egress actually change the semantics of the call: for example, async => true means the message can be lost before being persisted.

Contributor Author

Ok. Still, if I understand correctly, the cache could be used by other backends. If so, the way to call next without attempting to use the cache should live in emqx_ds, right?

@thalesmg thalesmg force-pushed the ds-cache-p2-mk4-m-20240131 branch 3 times, most recently from 599b503 to 4c04bc3 Compare February 2, 2024 18:37

-spec start_link(emqx_ds:db(), emqx_ds:stream(), emqx_ds:topic_filter(), emqx_ds:time()) ->
    supervisor:startchild_ret().
start_link(DB, Stream, TopicFilter, StartTime) ->
Member (@ieQu1, Feb 2, 2024)

I think in the long term the cache workers shouldn't care about topic filters, and should simply cache the entire contents of the stream. This would allow sharing the cache contents between clients that subscribe to different topics which happen to map to the same stream.

The consumer of the messages can then use the cached contents as a substitute for the RocksDB table (given that the key order is the same), and post-process the cached messages using the topic filter and start time stored in the iterator.
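A hedged sketch of that post-processing step (assuming the `#message{}` record from `emqx.hrl` and the `emqx_topic:match/2` helper; the entry shape and function name are illustrative):

```erlang
-include_lib("emqx/include/emqx.hrl").

%% Sketch: take entries from a whole-stream cache (already in key
%% order) and apply the per-iterator topic filter and start time
%% afterwards, instead of filtering at cache-population time.
filter_cached(CachedEntries, TopicFilter, StartTime) ->
    [{Key, Msg}
     || {Key, Msg = #message{topic = Topic, timestamp = Ts}} <- CachedEntries,
        Ts >= StartTime,
        emqx_topic:match(Topic, TopicFilter)].
```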

Contributor Author

The current API for creating iterators requires a topic filter to be provided, so I guess this will require new APIs to iterate over the whole stream?

Contributor Author

Or do you suggest tracking the stream with the # topic filter?

    gen_server:start_link(
        ?via(#?cache_worker{db = DB, stream = Stream}),
        ?MODULE,
        {DB, Stream, TopicFilter, StartTime},
Member

Why do we need to supply the topic filter and start time to the cache worker? It seems inconsistent with the process ID, which only allows one cache worker per DB and stream.
If different topic filters happen to map to the same stream, does it mean they will compete for the process registration?

Contributor Author

The current API for creating iterators requires a topic filter to be provided, so I don't think there's currently a way around it?

Contributor Author

If different topic filters happen to map to the same stream, does it mean they will compete for the process registration?

Indeed, if one misconfigures the topic filters as they are set up here, that would happen.

Contributor Author

Also, we've discussed that the first version of the cache could use statically configured topic filters, and that later versions could automatically track streams based on usage or other heuristics. So a future version might not need a specific topic filter and could use #.

Contributor Author

I've just found a fundamental problem with the cache as it stands: it currently relies only on seqnos to detect gaps, without checking that the message topic actually matches the iterator's topic filter... It'll require some rethinking. 🙈

@thalesmg thalesmg force-pushed the ds-cache-p2-mk4-m-20240131 branch 3 times, most recently from e31d32e to e9cafd6 Compare February 5, 2024 16:50
}).
-type cache_entry() :: #cache_entry{}.

-type seqno() :: non_neg_integer().
Member

Nit: it's better to avoid defining types in header files, since they'll end up duplicated in each module that includes the hrl.
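The usual alternative is to define the type once in its owning module and export it (sketch; the module name is assumed):

```erlang
%% In the owning module (name assumed for illustration):
-module(emqx_ds_cache).
-type seqno() :: non_neg_integer().
-export_type([seqno/0]).

%% Other modules then reference it as emqx_ds_cache:seqno()
%% instead of pulling a duplicate definition in via the hrl.
```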

-ifndef(EMQX_DS_CACHE_HRL).
-define(EMQX_DS_CACHE_HRL, true).

-define(CACHE_KEY(LASTKEY), {LASTKEY}).
Member (@ieQu1, Feb 7, 2024)

I see why the key is wrapped in a tuple (Erlang term order), but it will lead to extra memory allocations and de-allocations on the happy path. For performance reasons, we should try to store as little data as possible. Perhaps a better solution would be to add an "end_of_stream" flag to some external data structure associated with the stream cache, and check it when the next iterator returns an empty list, something like:

case next_cache(StreamCache, ...) of
    Ret = {ok, Messages = [], ItNext} ->
        case ets:lookup(StreamCache, ?EOF_KEY) of
            [_] ->
                {ok, end_of_stream};
            [] ->
                Ret
        end;
    ...

-record(cache_entry, {
    key :: ?CACHE_KEY(emqx_ds:message_key()) | ?EOS_KEY,
    seqno :: seqno(),
    inserted_at :: timestamp(),
Member

Can we use the timestamp from the key? We don't have to be very precise in the cache eviction flow.

Contributor Author

From the key inside the message, you mean? If we assume only more-or-less recent keys are added to the cache (i.e., we don't start caching from the distant past), I guess so.

If it's #cache_entry.key you mean, I think there's no way to extract a timestamp from the emqx_ds:message_key() in the general case without yet another callback. 😅

@thalesmg thalesmg force-pushed the ds-cache-p2-mk4-m-20240131 branch 2 times, most recently from 0a58027 to 0b22461 Compare February 7, 2024 19:19
@thalesmg (Contributor Author)

Using non-wildcard subscriptions (t/%n)

Left: without cache
Right: with cache
image

Using wildcard subscriptions (t/#)

Left: without cache
Right: with cache
image

@thalesmg thalesmg force-pushed the ds-cache-p2-mk4-m-20240131 branch 2 times, most recently from 4bc5ceb to 6a58997 Compare February 27, 2024 17:00
@thalesmg (Contributor Author)

I ran the test with 1k subscribers and 1k publishers (non-wildcard). Network usage drops a bit, but I guess that's because the received message rate also drops substantially... CPU usage is similar in both scenarios (very high, ~100%). With the cache, it stays at ~100% even at "rest"... 🙈

Curiously, loadgen (LG) CPU usage also seemed higher with the cache. 🤔

Left: without cache.
Right: with cache.

3rd image is LG.

emqttb

emqx

lg

@thalesmg thalesmg force-pushed the ds-cache-p2-mk4-m-20240131 branch 4 times, most recently from c86fd74 to 516c503 Compare March 5, 2024 12:12
@thalesmg thalesmg marked this pull request as ready for review March 5, 2024 13:59
@thalesmg thalesmg requested review from keynslug, a team and lafirest as code owners March 5, 2024 13:59