Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISubsriber.WithPrefix and IConnectionMultiplexer.WithPrefix implementations #896

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Current package versions:
- Adds: `last-in` and `cur-in` (bytes) to timeout exceptions to help identify timeouts that were just-behind another large payload off the wire ([#2276 by NickCraver](https://github.com/StackExchange/StackExchange.Redis/pull/2276))
- Adds: general-purpose tunnel support, with HTTP proxy "connect" support included ([#2274 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2274))
- Removes: Package dependency (`System.Diagnostics.PerformanceCounter`) ([#2285 by NickCraver](https://github.com/StackExchange/StackExchange.Redis/pull/2285))
- Adds: `.WithChannelPrefix()` to `ISubscriber` for key-preifixing channels like the existing `.WithKeyPreifx()` on `IDatabase` ([#896 by pecanw](https://github.com/StackExchange/StackExchange.Redis/pull/896))

## 2.6.70

Expand Down
122 changes: 122 additions & 0 deletions src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedSubscriber.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
using System;
using System.Net;
using System.Threading.Tasks;

namespace StackExchange.Redis.KeyspaceIsolation
{
internal class KeyPrefixedSubscriber : ISubscriber
{
public ISubscriber Inner { get; }

internal RedisChannel Prefix { get; }

public KeyPrefixedSubscriber(ISubscriber inner, byte[] prefix)
{
Inner = inner;
Prefix = new RedisChannel(prefix, RedisChannel.PatternMode.Literal);
}

public IConnectionMultiplexer Multiplexer => Inner.Multiplexer;

public EndPoint? IdentifyEndpoint(RedisChannel channel, CommandFlags flags = CommandFlags.None) => Inner.IdentifyEndpoint(ToInner(channel), flags);

public Task<EndPoint?> IdentifyEndpointAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None) => Inner.IdentifyEndpointAsync(ToInner(channel), flags);

public bool IsConnected(RedisChannel channel = default(RedisChannel)) => Inner.IsConnected(ToInner(channel));

public TimeSpan Ping(CommandFlags flags = CommandFlags.None) => Inner.Ping(flags);

public Task<TimeSpan> PingAsync(CommandFlags flags = CommandFlags.None) => Inner.PingAsync(flags);

public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) => Inner.Publish(ToInner(channel), message, flags);

public Task<long> PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) => Inner.PublishAsync(ToInner(channel), message, flags);

public ChannelMessageQueue Subscribe(RedisChannel channel, CommandFlags flags = CommandFlags.None) =>
Inner.Subscribe(ToInner(channel), flags);

public void Subscribe(RedisChannel channel, Action<RedisChannel, RedisValue> handler, CommandFlags flags = CommandFlags.None) =>
Inner.Subscribe(ToInner(channel), ToInner(handler), flags);

public Task<ChannelMessageQueue> SubscribeAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None) =>
Inner.SubscribeAsync(ToInner(channel), flags);

public Task SubscribeAsync(RedisChannel channel, Action<RedisChannel, RedisValue> handler, CommandFlags flags = CommandFlags.None) =>
Inner.SubscribeAsync(ToInner(channel), ToInner(handler), flags);

public EndPoint? SubscribedEndpoint(RedisChannel channel) => Inner.SubscribedEndpoint(ToInner(channel));

public bool TryWait(Task task) => Inner.TryWait(task);

public void Unsubscribe(RedisChannel channel, Action<RedisChannel, RedisValue>? handler = null, CommandFlags flags = CommandFlags.None) =>
Inner.Unsubscribe(ToInner(channel), ToInner(handler), flags);

public void UnsubscribeAll(CommandFlags flags = CommandFlags.None)
{
if (Prefix.IsNullOrEmpty)
{
Inner.UnsubscribeAll(flags);
}
else
{
Inner.Unsubscribe(new RedisChannel(Prefix + "*", RedisChannel.PatternMode.Pattern), null, flags);
}
}

public Task UnsubscribeAllAsync(CommandFlags flags = CommandFlags.None) => Prefix.IsNullOrEmpty
? Inner.UnsubscribeAllAsync(flags)
: Inner.UnsubscribeAsync(new RedisChannel(Prefix + "*", RedisChannel.PatternMode.Pattern), null, flags);

public Task UnsubscribeAsync(RedisChannel channel, Action<RedisChannel, RedisValue>? handler = null, CommandFlags flags = CommandFlags.None) =>
Inner.UnsubscribeAsync(ToInner(channel), ToInner(handler), flags);

public void Wait(Task task) => Inner.Wait(task);

public T Wait<T>(Task<T> task) => Inner.Wait(task);

public void WaitAll(params Task[] tasks) => Inner.WaitAll(tasks);

protected Action<RedisChannel, RedisValue> ToInner(Action<RedisChannel, RedisValue>? handler) => (channel, value) => handler?.Invoke(ToOuter(channel), value);

public RedisChannel ToInner(RedisChannel outer)
{
if (Prefix.IsNullOrEmpty) return outer;

if (outer.IsNullOrEmpty) return Prefix;

byte[] outerArr = outer!;
byte[] prefixArr = Prefix!;

var innerArr = new byte[prefixArr.Length + outerArr.Length];
Buffer.BlockCopy(prefixArr, 0, innerArr, 0, prefixArr.Length);
Buffer.BlockCopy(outerArr, 0, innerArr, prefixArr.Length, outerArr.Length);

var patternMode = outer.IsPatternBased ? RedisChannel.PatternMode.Pattern : RedisChannel.PatternMode.Literal;

return new RedisChannel(innerArr, patternMode);
}

public RedisChannel ToOuter(RedisChannel inner)
{
if (Prefix.IsNullOrEmpty || inner.IsNullOrEmpty) return inner;

byte[] innerArr = inner!;
byte[] prefixArr = Prefix!;

if (innerArr.Length <= prefixArr.Length) return inner;

for (var i = 0; i < prefixArr.Length; i++)
{
if (prefixArr[i] != innerArr[i]) return inner;
}

var outerLength = innerArr.Length - prefixArr.Length;
var outerArr = new byte[outerLength];
Buffer.BlockCopy(innerArr, prefixArr.Length, outerArr, 0, outerLength);

var patternMode = inner.IsPatternBased ? RedisChannel.PatternMode.Pattern : RedisChannel.PatternMode.Literal;

return new RedisChannel(outerArr, patternMode);
}
}
}
47 changes: 47 additions & 0 deletions src/StackExchange.Redis/KeyspaceIsolation/SubscriberExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
using System;

namespace StackExchange.Redis.KeyspaceIsolation
{
/// <summary>
/// Provides the <see cref="WithChannelPrefix"/> extension method to <see cref="ISubscriber"/>.
/// </summary>
public static class SubscriberExtensions
{
/// <summary>
/// Creates a new <see cref="ISubscriber"/> instance that provides an isolated channel namespace
/// of the specified underlying subscriber instance.
/// </summary>
/// <param name="subscriber">
/// The underlying subscriber instance that the returned instance shall use.
/// </param>
/// <param name="channelPrefix">
/// The prefix that defines a channel namespace isolation for the returned subscriber instance.
/// </param>
/// <returns>
/// A new <see cref="ISubscriber"/> instance that invokes the specified underlying
/// <paramref name="subscriber"/> but prepends the specified <paramref name="channelPrefix"/>
/// to all channel names and thus forms a logical channel namespace isolation.
/// </returns>
public static ISubscriber WithChannelPrefix(this ISubscriber subscriber, RedisChannel channelPrefix)
{
if (subscriber == null)
{
throw new ArgumentNullException(nameof(subscriber));
}

if (channelPrefix.IsNullOrEmpty)
{
throw new ArgumentNullException(nameof(channelPrefix));
}

if (subscriber is KeyPrefixedSubscriber wrapper)
{
// combine the channel prefix in advance to minimize indirection
channelPrefix = wrapper.ToInner(channelPrefix);
subscriber = wrapper.Inner;
}

return new KeyPrefixedSubscriber(subscriber, channelPrefix!);
}
}
}
2 changes: 2 additions & 0 deletions src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,7 @@ StackExchange.Redis.ITransaction.AddCondition(StackExchange.Redis.Condition! con
StackExchange.Redis.ITransaction.Execute(StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> bool
StackExchange.Redis.ITransaction.ExecuteAsync(StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<bool>!
StackExchange.Redis.KeyspaceIsolation.DatabaseExtensions
StackExchange.Redis.KeyspaceIsolation.SubscriberExtensions
StackExchange.Redis.LatencyHistoryEntry
StackExchange.Redis.LatencyHistoryEntry.DurationMilliseconds.get -> int
StackExchange.Redis.LatencyHistoryEntry.LatencyHistoryEntry() -> void
Expand Down Expand Up @@ -1629,6 +1630,7 @@ static StackExchange.Redis.HashEntry.implicit operator System.Collections.Generi
static StackExchange.Redis.HashEntry.operator !=(StackExchange.Redis.HashEntry x, StackExchange.Redis.HashEntry y) -> bool
static StackExchange.Redis.HashEntry.operator ==(StackExchange.Redis.HashEntry x, StackExchange.Redis.HashEntry y) -> bool
static StackExchange.Redis.KeyspaceIsolation.DatabaseExtensions.WithKeyPrefix(this StackExchange.Redis.IDatabase! database, StackExchange.Redis.RedisKey keyPrefix) -> StackExchange.Redis.IDatabase!
static StackExchange.Redis.KeyspaceIsolation.SubscriberExtensions.WithChannelPrefix(this StackExchange.Redis.ISubscriber! subscriber, StackExchange.Redis.RedisChannel channelPrefix) -> StackExchange.Redis.ISubscriber!
static StackExchange.Redis.Lease<T>.Create(int length, bool clear = true) -> StackExchange.Redis.Lease<T>!
static StackExchange.Redis.Lease<T>.Empty.get -> StackExchange.Redis.Lease<T>!
static StackExchange.Redis.ListPopResult.Null.get -> StackExchange.Redis.ListPopResult
Expand Down
76 changes: 76 additions & 0 deletions tests/StackExchange.Redis.Tests/KeyPrefixedSubscriberTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using StackExchange.Redis.KeyspaceIsolation;
using Xunit;
using Xunit.Abstractions;

namespace StackExchange.Redis.Tests
{
public class KeyPrefixedSubscriberTests : TestBase
{
public KeyPrefixedSubscriberTests(ITestOutputHelper output) : base(output)
{}

[Fact]
public async Task UsePrefixForChannel()
{
using (var client = Create(allowAdmin: true))
{
const string prefix1 = "(p1)-";
const string prefix2 = "(p2)-";

var s1 = client.GetSubscriber().WithChannelPrefix(prefix1);
var s12 = client.GetSubscriber().WithChannelPrefix(prefix1);
var s2 = client.GetSubscriber().WithChannelPrefix(prefix2);
var s = client.GetSubscriber();

var l1 = new List<string>();
var l12 = new List<string>();
var l2 = new List<string>();
var l = new List<string>();
var lAll = new List<string>();
var lT1 = new List<string>();
var c1 = new List<string>();

const string channelName = "test-channel";
s1.Subscribe(channelName, (channel, value) =>
{
c1.Add(channel!);
l1.Add(value!);
});
s12.Subscribe(channelName, (_channel, value) => l12.Add(value!));
s2.Subscribe(channelName, (_channel, value) => l2.Add(value!));
s.Subscribe(channelName, (_channel, value) => l.Add(value!));
s.Subscribe("*" + channelName, (_channel, value) => lAll.Add(value!));
s.Subscribe(prefix1 + channelName, (_channel, value) => lT1.Add(value!));

s1.Publish(channelName, "value1");
s.Publish(channelName, "value");

// Give some time to pub-sub
await Task.Delay(500);

Assert.Single(l1);
Assert.Equal("value1",l1[0]);

Assert.Single(l12);
Assert.Equal("value1",l12[0]);

Assert.Empty(l2);

Assert.Single(l);
Assert.Equal("value",l[0]);

Assert.Equal(2, lAll.Count);
Assert.Contains("value", lAll);
Assert.Contains("value1", lAll);

Assert.Single(lT1);
Assert.Equal("value1",lT1[0]);

Assert.Single(c1);
Assert.Equal(channelName,c1[0]);
}
}
}
}