Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IProfiledCommand data expansion #1454

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
84 changes: 56 additions & 28 deletions src/StackExchange.Redis/ConnectionMultiplexer.cs
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -2222,9 +2222,10 @@ private bool PrepareToPushMessageToBridge<T>(Message message, ResultProcessor<T>
}
}

var profilingSession = _profilingSessionProvider?.Invoke();

if (server != null)
{
var profilingSession = _profilingSessionProvider?.Invoke();
if (profilingSession != null)
{
message.SetProfileStorage(ProfiledCommand.NewWithContext(profilingSession, server));
Expand All @@ -2242,9 +2243,17 @@ private bool PrepareToPushMessageToBridge<T>(Message message, ResultProcessor<T>
Trace("Queueing on server: " + message);
return true;
}
Trace("No server or server unavailable - aborting: " + message);
return false;
else
{
if (profilingSession != null)
{
message.SetProfileStorage(ProfiledCommand.NewWithContext(profilingSession, server));
}
Trace("No server or server unavailable - aborting: " + message);
return false;
}
}

private ValueTask<WriteResult> TryPushMessageToBridgeAsync<T>(Message message, ResultProcessor<T> processor, IResultBox<T> resultBox, ref ServerEndPoint server)
=> PrepareToPushMessageToBridge(message, processor, resultBox, ref server) ? server.TryWriteAsync(message) : new ValueTask<WriteResult>(WriteResult.NoConnectionAvailable);

Expand Down Expand Up @@ -2750,16 +2759,22 @@ internal Task<T> ExecuteAsyncImpl<T>(Message message, ResultProcessor<T> process
var write = TryPushMessageToBridgeAsync(message, processor, source, ref server);
if (!write.IsCompletedSuccessfully) return ExecuteAsyncImpl_Awaited<T>(this, write, tcs, message, server);

var result = write.Result;
if (tcs == null)
{
if (result != WriteResult.Success && message.HasPerformance)
{
var ex = GetException(result, message, server);
message.SetExceptionAndComplete(ex, null);
}
return CompletedTask<T>.Default(null); // F+F explicitly does not get async-state
}
else
{
var result = write.Result;
if (result != WriteResult.Success)
{
var ex = GetException(result, message, server);
message.SetExceptionAndComplete(ex, null);
ThrowFailed(tcs, ex);
}
return tcs.Task;
Expand All @@ -2772,6 +2787,7 @@ private static async Task<T> ExecuteAsyncImpl_Awaited<T>(ConnectionMultiplexer @
if (result != WriteResult.Success)
{
var ex = @this.GetException(result, message, server);
message.SetExceptionAndComplete(ex, null);
ThrowFailed(tcs, ex);
}
return tcs == null ? default(T) : await tcs.Task.ForAwait();
Expand Down Expand Up @@ -2818,42 +2834,54 @@ internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, Ser
if (message.IsFireAndForget)
{
#pragma warning disable CS0618
TryPushMessageToBridgeSync(message, processor, null, ref server);
var result = TryPushMessageToBridgeSync(message, processor, null, ref server);
if (result != WriteResult.Success && message.HasPerformance)
{
message.SetExceptionAndComplete(GetException(result, message, server), null);
}
#pragma warning restore CS0618
Interlocked.Increment(ref fireAndForgets);
return default(T);
}
else
{
var source = SimpleResultBox<T>.Get();

lock (source)
try
{
var source = SimpleResultBox<T>.Get();

lock (source)
{
#pragma warning disable CS0618
var result = TryPushMessageToBridgeSync(message, processor, source, ref server);
var result = TryPushMessageToBridgeSync(message, processor, source, ref server);
#pragma warning restore CS0618
if (result != WriteResult.Success)
{
throw GetException(result, message, server);
}
if (result != WriteResult.Success)
{
throw GetException(result, message, server);
}

if (Monitor.Wait(source, TimeoutMilliseconds))
{
Trace("Timeley response to " + message);
}
else
{
Trace("Timeout performing " + message);
Interlocked.Increment(ref syncTimeouts);
throw ExceptionFactory.Timeout(this, null, message, server);
// very important not to return "source" to the pool here
if (Monitor.Wait(source, TimeoutMilliseconds))
{
Trace("Timeley response to " + message);
}
else
{
Trace("Timeout performing " + message);
Interlocked.Increment(ref syncTimeouts);
throw ExceptionFactory.Timeout(this, null, message, server);
// very important not to return "source" to the pool here
}
}
// snapshot these so that we can recycle the box
var val = source.GetResult(out var ex, canRecycle: true); // now that we aren't locking it...
if (ex != null) throw ex;
Trace(message + " received " + val);
return val;
}
catch (Exception ex)
{
message.SetExceptionAndComplete(ex, null);
throw;
}
// snapshot these so that we can recycle the box
var val = source.GetResult(out var ex, canRecycle: true); // now that we aren't locking it...
if (ex != null) throw ex;
Trace(message + " received " + val);
return val;
}
}

Expand Down
7 changes: 7 additions & 0 deletions src/StackExchange.Redis/Interfaces/IKeysMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace StackExchange.Redis
{
internal interface IKeysMessage
{
RedisKey[] Keys { get; }
}
}
7 changes: 7 additions & 0 deletions src/StackExchange.Redis/Interfaces/IValuesMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace StackExchange.Redis
{
internal interface IValuesMessage
{
RedisValue[] Values { get; }
}
}