[Feature Request] Handle pipeline failure #432
Comments
What about using DataPipeline.Files.LogEntries to know what happened? Work is always done on a file, and a global log can be extracted by iterating over the list of files. FailedStep can be calculated from Steps and RemainingSteps; there's also CompletedSteps if needed.
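As a rough sketch of that idea, the failed step and a combined log could be derived along these lines. The `PipelineSketch`, `FileSketch`, and `LogEntrySketch` types are simplified stand-ins, not the real `DataPipeline` classes. Only the names `Steps`, `RemainingSteps`, `CompletedSteps`, `Files`, and `LogEntries` (with `Source` and `Text`) come from this thread, and treating the first remaining step as the failed one is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified stand-ins for illustration only; the real DataPipeline types have more members.
public sealed record LogEntrySketch(string Source, string Text);

public sealed class FileSketch
{
    public string Name { get; set; } = string.Empty;
    public List<LogEntrySketch>? LogEntries { get; set; }
}

public sealed class PipelineSketch
{
    public List<string> Steps { get; set; } = new();
    public List<string> RemainingSteps { get; set; } = new();
    public List<FileSketch> Files { get; set; } = new();

    // Steps that are no longer in RemainingSteps are considered completed.
    public IEnumerable<string> CompletedSteps => this.Steps.Except(this.RemainingSteps);
}

public static class PipelineDiagnostics
{
    // Assumption: when a pipeline has stopped, the failed step is the first one still pending.
    public static string? GetFailedStep(PipelineSketch pipeline)
        => pipeline.RemainingSteps.FirstOrDefault();

    // Flattens the per-file log entries into a single list of "source: text" lines.
    public static List<string> GetGlobalLog(PipelineSketch pipeline)
        => pipeline.Files
            .Where(f => f.LogEntries is not null)
            .SelectMany(f => f.LogEntries!)
            .Select(l => $"{l.Source}: {l.Text}")
            .ToList();
}
```

With this shape, nothing extra has to be stored on the pipeline: both values are computed on demand from data that is already persisted with it.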
I was thinking about a generic

    public override async Task RunPipelineAsync(DataPipeline pipeline, CancellationToken cancellationToken = default)
    {
        // Files must be uploaded before starting any other task
        await this.UploadFilesAsync(pipeline, cancellationToken).ConfigureAwait(false);
        await this.UpdatePipelineStatusAsync(pipeline, cancellationToken).ConfigureAwait(false);

        while (!pipeline.Complete)
        {
            string currentStepName = pipeline.RemainingSteps.First();

            if (!this._handlers.TryGetValue(currentStepName, out var stepHandler))
            {
                // Record the failure on the pipeline before throwing, so the status is persisted
                pipeline.LastUpdate = DateTimeOffset.UtcNow;
                pipeline.Failed = true;
                pipeline.Logs = $"No handler found for step '{currentStepName}'";
                await this.UpdatePipelineStatusAsync(pipeline, cancellationToken).ConfigureAwait(false);

                this.Log.LogError("No handler found for step '{0}'", currentStepName);
                throw new OrchestrationException($"No handler found for step '{currentStepName}'");
            }

            try
            {
                // Run handler
                (bool success, DataPipeline updatedPipeline) = await stepHandler
                    .InvokeAsync(pipeline, this.CancellationTokenSource.Token)
                    .ConfigureAwait(false);

                pipeline = updatedPipeline;
                pipeline.LastUpdate = DateTimeOffset.UtcNow;

                if (success)
                {
                    this.Log.LogInformation("Handler '{0}' processed pipeline '{1}/{2}' successfully", currentStepName, pipeline.Index, pipeline.DocumentId);
                    pipeline.MoveToNextStep();
                    await this.UpdatePipelineStatusAsync(pipeline, cancellationToken).ConfigureAwait(false);
                }
                else
                {
                    // Collect the log entries of all files into a single failure description
                    pipeline.Failed = true;
                    pipeline.Logs = string.Join(", ", pipeline.Files.Where(f => f.LogEntries is not null).SelectMany(f => f.LogEntries!).Select(l => $"{l.Source}: {l.Text}"));
                    await this.UpdatePipelineStatusAsync(pipeline, cancellationToken).ConfigureAwait(false);

                    this.Log.LogError("Handler '{0}' failed to process pipeline '{1}/{2}'", currentStepName, pipeline.Index, pipeline.DocumentId);
                    throw new OrchestrationException($"Pipeline error, step {currentStepName} failed");
                }
            }
            catch (Exception ex)
            {
                // Get the exception message and its inner message, which in some cases is more descriptive.
                var failureReason = ex.Message;
                if (ex.InnerException is not null)
                {
                    failureReason += $" ({ex.InnerException.Message})";
                }

                pipeline.LastUpdate = DateTimeOffset.UtcNow;
                pipeline.Failed = true;
                pipeline.Logs = failureReason;
                await this.UpdatePipelineStatusAsync(pipeline, cancellationToken).ConfigureAwait(false);

                this.Log.LogError(ex, "Handler '{0}' failed to process pipeline '{1}/{2}'", currentStepName, pipeline.Index, pipeline.DocumentId);
                throw new OrchestrationException($"Pipeline error, step {currentStepName} failed", ex);
            }
        }

        await this.CleanUpAfterCompletionAsync(pipeline, cancellationToken).ConfigureAwait(false);
        this.Log.LogInformation("Pipeline '{0}/{1}' complete", pipeline.Index, pipeline.DocumentId);
    }

Note: I have renamed the
I have tried to put my idea into a PR: #443, so you can see whether it is the approach you have in mind.
Context / Scenario
Currently, pipeline failures aren't handled at all:
kernel-memory/service/Abstractions/Pipeline/DataPipeline.cs
Lines 434 to 439 in 775301a
The problem
It is important to keep track of errors that occur during pipeline execution.
Proposed solution
We can add a couple of properties to DataPipeline.cs:
FailureReason can be useful to immediately obtain information about the problem, but it is not strictly necessary to implement this feature.
Then, we need to handle exceptions during pipeline execution, both in InProcessPipelineOrchestrator.cs and in DistributedPipelineOrchestrator.cs.
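A minimal sketch of how the two pieces could fit together, assuming a boolean `Failed` flag alongside `FailureReason`. `PipelineState`, `OrchestratorSketch.RunSteps`, and the delegate-based handlers are hypothetical simplifications, not the actual `DataPipeline` or orchestrator code.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, trimmed-down pipeline state; the real DataPipeline has many more members.
public sealed class PipelineState
{
    public Queue<string> RemainingSteps { get; } = new();
    public bool Failed { get; set; }            // assumed flag, set when a step cannot complete
    public string? FailureReason { get; set; }  // optional human-readable explanation
    public bool Complete => !this.Failed && this.RemainingSteps.Count == 0;
}

public static class OrchestratorSketch
{
    // Runs the steps in order and records the first failure on the pipeline instead of losing it.
    public static void RunSteps(PipelineState pipeline, IReadOnlyDictionary<string, Action> handlers)
    {
        while (pipeline.RemainingSteps.Count > 0)
        {
            string step = pipeline.RemainingSteps.Peek();
            try
            {
                handlers[step]();               // a missing handler also ends up in the catch block
                pipeline.RemainingSteps.Dequeue();
            }
            catch (Exception ex)
            {
                pipeline.Failed = true;
                pipeline.FailureReason = $"Step '{step}' failed: {ex.Message}";
                return;                         // stop processing; the failed step stays in RemainingSteps
            }
        }
    }
}
```

Leaving the failed step at the head of `RemainingSteps` means a caller can still tell which step failed, even if it only looks at the step lists.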
Finally, after updating the DataPipelineStatus.cs class accordingly, we just need this code:
Importance
would be great to have