Observability
The HPD.MultiAgent.Observability namespace provides two drop-in observers for monitoring workflow executions: MetricsObserver for execution metrics, and TracingObserver for distributed tracing via OpenTelemetry.
Register observers on a WorkflowEventCoordinator — no reference to HPD.Events is needed.
MetricsObserver
MetricsObserver collects per-workflow and per-node execution metrics in memory. By default it retains data for the last 100 completed workflows; pass maxCompletedWorkflows to adjust:
var metrics = new MetricsObserver(); // default: last 100
var metrics = new MetricsObserver(maxCompletedWorkflows: 500); // keep last 500

using HPD.MultiAgent;
using HPD.MultiAgent.Observability;
var metrics = new MetricsObserver();
var coordinator = new WorkflowEventCoordinator();
coordinator.AddObserver(metrics);
await foreach (var evt in workflow.ExecuteStreamingAsync(input, coordinator))
{
    if (evt is TextDeltaEvent delta)
        Console.Write(delta.Text);
}
// After completion, inspect metrics
var completed = metrics.CompletedWorkflows.First();
Console.WriteLine($"Duration: {completed.Duration}");
Console.WriteLine($"Nodes: {completed.TotalNodesExecuted} executed, {completed.FailedNodes} failed");
Console.WriteLine($"Tokens: {completed.TotalTokens} total ({completed.TotalInputTokens} in / {completed.TotalOutputTokens} out)");
Console.WriteLine($"Tools: {completed.TotalToolCalls} tool calls");
WorkflowMetrics properties
| Property | Type | Description |
|---|---|---|
| ExecutionId | string | Unique ID for this run |
| WorkflowName | string? | Workflow name (if set) |
| StartedAt | DateTimeOffset | When execution started |
| CompletedAt | DateTimeOffset? | When execution completed |
| Duration | TimeSpan | Total execution time |
| Success | bool? | Whether the workflow succeeded |
| TotalNodesExecuted | int | Number of nodes that ran |
| SuccessfulNodes | int | Nodes that succeeded |
| FailedNodes | int | Nodes that failed |
| SkippedNodes | int | Nodes that were skipped |
| TotalTokens | int | Combined input + output tokens |
| TotalInputTokens | int | Input tokens across all nodes |
| TotalOutputTokens | int | Output tokens across all nodes |
| TotalToolCalls | int | Tool invocations across all nodes |
| IterationCount | int | Loop iterations (cyclic graphs) |
| NodeMetrics | ConcurrentDictionary<string, NodeMetrics> | Per-node breakdown |
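The aggregate fields in this table relate to the per-node breakdown in a simple way. As a rough illustration, the sketch below recomputes workflow-level totals from per-node values. It uses plain tuples as hypothetical stand-ins for NodeMetrics entries, invents its sample values, and does not call the library. Whether MetricsObserver derives its totals exactly this way is an assumption.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

// Hypothetical stand-in for WorkflowMetrics.NodeMetrics: tuples replace the real NodeMetrics type.
var nodeMetrics = new ConcurrentDictionary<string,
    (int InputTokens, int OutputTokens, int ToolCallCount, bool? Success, bool WasSkipped)>();
nodeMetrics["researcher"]  = (1200, 300, 2, true,  false);
nodeMetrics["factChecker"] = (800,  150, 1, false, false);
nodeMetrics["writer"]      = (0,    0,   0, null,  true); // skipped, so no outcome

var nodes = nodeMetrics.Values.ToList();

// Token and tool totals are sums over all nodes.
int totalInputTokens  = nodes.Sum(n => n.InputTokens);
int totalOutputTokens = nodes.Sum(n => n.OutputTokens);
int totalTokens       = totalInputTokens + totalOutputTokens;
int totalToolCalls    = nodes.Sum(n => n.ToolCallCount);

// Outcome counters; Success is nullable, so compare against true/false explicitly.
int successfulNodes = nodes.Count(n => n.Success == true);
int failedNodes     = nodes.Count(n => n.Success == false);
int skippedNodes    = nodes.Count(n => n.WasSkipped);

Console.WriteLine($"Tokens: {totalTokens} ({totalInputTokens} in / {totalOutputTokens} out)");
Console.WriteLine($"Tools: {totalToolCalls}");
Console.WriteLine($"Nodes: {successfulNodes} ok, {failedNodes} failed, {skippedNodes} skipped");
```

The same pattern applies to a real WorkflowMetrics instance by reading NodeMetrics.Values instead of the stand-in dictionary.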
NodeMetrics properties
| Property | Type | Description |
|---|---|---|
| NodeId | string | Node ID |
| StartedAt | DateTimeOffset? | When the node started |
| Duration | TimeSpan? | Node execution time |
| Success | bool? | Whether the node succeeded |
| WasSkipped | bool | Whether the node was skipped |
| SkipReason | string? | Reason for skipping |
| ErrorMessage | string? | Error message if failed |
| RetryCount | int | Number of retry attempts |
| InputTokens | int | Input tokens for this node |
| OutputTokens | int | Output tokens for this node |
| ToolCallCount | int | Tool calls made by this node |
| ToolsCalled | List<string> | Names of tools called |
| RequiredApproval | bool | Whether approval was requested |
| ApprovalGranted | bool? | Approval outcome |
| ApprovalWaitTime | TimeSpan? | Time spent waiting for approval |
| Iteration | int | Which iteration this node ran in |
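Several NodeMetrics fields are nullable (Duration, Success, ApprovalWaitTime), presumably because a node may still be running or may never have requested approval. The following is a minimal sketch of null-safe queries over such data. The tuples are hypothetical stand-ins for NodeMetrics, and the sample values are invented for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins for NodeMetrics entries; values are invented.
var nodes = new List<(string NodeId, TimeSpan? Duration, bool? Success, string? ErrorMessage, TimeSpan? ApprovalWaitTime)>
{
    ("researcher",  TimeSpan.FromSeconds(4.2), true,  null,      null),
    ("factChecker", TimeSpan.FromSeconds(9.5), false, "timeout", TimeSpan.FromSeconds(3)),
    ("writer",      null,                      null,  null,      null), // not finished yet
};

// Slowest finished node: skip entries whose Duration is not set.
var slowest = nodes
    .Where(n => n.Duration.HasValue)
    .OrderByDescending(n => n.Duration.GetValueOrDefault())
    .First();

// Failed nodes with their error messages (Success == false excludes null).
var failures = nodes
    .Where(n => n.Success == false)
    .Select(n => $"{n.NodeId}: {n.ErrorMessage ?? "(no message)"}")
    .ToList();

// Total approval wait; nodes without an approval step contribute zero.
var approvalWait = nodes.Aggregate(
    TimeSpan.Zero,
    (sum, n) => sum + (n.ApprovalWaitTime ?? TimeSpan.Zero));

Console.WriteLine($"Slowest: {slowest.NodeId} ({slowest.Duration})");
Console.WriteLine($"Failures: {string.Join(", ", failures)}");
Console.WriteLine($"Approval wait: {approvalWait}");
```

Filtering on HasValue before sorting keeps unfinished nodes out of the ranking rather than relying on how null keys sort.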
Live updates
Use OnMetricsUpdated and OnWorkflowCompleted to react in real time:
var metrics = new MetricsObserver();
metrics.OnWorkflowCompleted += m =>
{
    Console.WriteLine($"Workflow finished: {m.TotalTokens} tokens used");
};
metrics.OnMetricsUpdated += m =>
{
    // Called after every node starts or completes
};
Querying active workflows
// Active (still running)
foreach (var wf in metrics.ActiveWorkflows)
    Console.WriteLine($"Running: {wf.ExecutionId} ({wf.Duration.TotalSeconds:F1}s)");
// Recently completed (last 100 by default)
var last = metrics.CompletedWorkflows.Last();
var slowestNode = last.NodeMetrics.Values
    .OrderByDescending(n => n.Duration)
    .First();
TracingObserver
TracingObserver creates OpenTelemetry-compatible System.Diagnostics.Activity spans for each workflow and node execution. Use it with any OTel-compatible backend (Jaeger, Zipkin, OTLP, etc.).
using HPD.MultiAgent;
using HPD.MultiAgent.Observability;
var tracing = new TracingObserver();
var coordinator = new WorkflowEventCoordinator();
coordinator.AddObserver(tracing);
await foreach (var evt in workflow.ExecuteStreamingAsync(input, coordinator))
{
    // events flow through...
}
tracing.Dispose();
Register TracingObserver.ActivitySourceName with your OTel pipeline so the spans are exported:
// In your ASP.NET Core / hosted service setup:
services.AddOpenTelemetry()
    .WithTracing(builder => builder
        .AddSource(TracingObserver.ActivitySourceName) // "HPD.MultiAgent"
        .AddOtlpExporter());
Span structure
Each workflow run produces a span hierarchy:
Workflow:MyWorkflow (root span, ActivityKind.Internal)
├── Node:researcher (child span per node)
├── Node:factChecker
└── Node:writer
Note: The root span name is set to Workflow:MyWorkflow once the WorkflowStartedEvent arrives. It briefly starts as Workflow:unnamed during the first few milliseconds of graph initialization before the name is patched in.
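To make the hierarchy and the renaming step concrete, here is a small sketch that reproduces the same shape using only System.Diagnostics. It does not use TracingObserver. The source name Demo.Workflows is hypothetical, and patching the name through Activity.DisplayName is an assumption about how such a rename could be done, not a statement about the library's internals.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

// Hypothetical source name for this demo; not the library's ActivitySource.
using var source = new ActivitySource("Demo.Workflows");

// Without a listener, StartActivity returns null, so register one that samples everything.
var finished = new List<string>();
using var listener = new ActivityListener
{
    ShouldListenTo = s => s.Name == "Demo.Workflows",
    Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllDataAndRecorded,
    ActivityStopped = a =>
    {
        var parent = a.Parent?.DisplayName ?? "none";
        finished.Add($"{a.DisplayName} (parent: {parent})");
    },
};
ActivitySource.AddActivityListener(listener);

// The root span starts with a placeholder name.
using (var root = source.StartActivity("Workflow:unnamed", ActivityKind.Internal))
{
    if (root is null) throw new InvalidOperationException("No listener is sampling this source.");

    // Patch the real name in once it is known (assumption: via DisplayName).
    root.DisplayName = "Workflow:MyWorkflow";

    foreach (var nodeId in new[] { "researcher", "factChecker", "writer" })
    {
        // Activity.Current is the root here, so each node span becomes its child.
        using var node = source.StartActivity($"Node:{nodeId}");
    }
}

// Spans are recorded in the order they stopped: children first, root last.
finished.ForEach(Console.WriteLine);
```

Because child spans read the parent's DisplayName when they stop, they report the patched name even though the root was created as Workflow:unnamed.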
Span tags
Workflow spans:
| Tag | Description |
|---|---|
| workflow.execution_id | Unique execution ID |
| workflow.node_count | Number of nodes |
| workflow.success | True/false |
| workflow.duration_ms | Total duration |
| workflow.successful_nodes | Count |
| workflow.failed_nodes | Count |
| workflow.skipped_nodes | Count |
Node spans:
| Tag | Description |
|---|---|
| node.id | Node ID |
| node.handler | Handler name |
| node.layer | Execution layer index |
| workflow.execution_id | Parent workflow ID |
| node.success | True/false |
| node.duration_ms | Execution time |
| node.skipped | True if skipped |
| node.skip_reason | Reason (if skipped) |
| node.error | Error message (if failed) |
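Tags like these can also be read in-process, without an exporter, through an ActivityListener. The sketch below emits two node spans by hand, using tag names from the tables above, and collects the failed ones. It does not use TracingObserver. The source name Demo.Workflows is hypothetical, and storing node.success as a bool rather than a string is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

// Hypothetical source name for this demo.
using var source = new ActivitySource("Demo.Workflows");

// Collect failed node spans by inspecting tags when each span stops.
var failedNodes = new List<string>();
using var listener = new ActivityListener
{
    ShouldListenTo = s => s.Name == "Demo.Workflows",
    Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllDataAndRecorded,
    ActivityStopped = a =>
    {
        // Assumption: node.success is stored as a boxed bool.
        if (a.GetTagItem("node.success") is false)
            failedNodes.Add($"{a.GetTagItem("node.id")}: {a.GetTagItem("node.error")}");
    },
};
ActivitySource.AddActivityListener(listener);

// A successful node span.
using (var ok = source.StartActivity("Node:researcher"))
{
    ok?.SetTag("node.id", "researcher");
    ok?.SetTag("node.success", true);
}

// A failed node span with an error message.
using (var failed = source.StartActivity("Node:factChecker"))
{
    failed?.SetTag("node.id", "factChecker");
    failed?.SetTag("node.success", false);
    failed?.SetTag("node.error", "timeout");
}

Console.WriteLine(string.Join(", ", failedNodes));
```

To try the same listener against real spans, point ShouldListenTo at TracingObserver.ActivitySourceName and check the actual tag value types first.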
Using a custom ActivitySource
var source = new ActivitySource("MyApp.Workflows", "2.0.0");
var tracing = new TracingObserver(source);
Cleanup
TracingObserver implements IDisposable. Call Dispose() when done to stop any open spans and dispose the ActivitySource:
tracing.Dispose();
Using both observers together
using HPD.MultiAgent;
using HPD.MultiAgent.Observability;
var metrics = new MetricsObserver();
var tracing = new TracingObserver();
var coordinator = new WorkflowEventCoordinator();
coordinator.AddObserver(metrics);
coordinator.AddObserver(tracing);
await foreach (var evt in workflow.ExecuteStreamingAsync(input, coordinator))
{
    if (evt is TextDeltaEvent delta)
        Console.Write(delta.Text);
}
tracing.Dispose();
var result = metrics.CompletedWorkflows.First();
Console.WriteLine($"Done in {result.Duration}, {result.TotalTokens} tokens");
WorkflowEventCoordinator helpers
| Member | Description |
|---|---|
| AddObserver<TEvent>(observer) | Register an observer. Multiple observers are called in registration order. |
| HasObservers | bool; true if at least one observer is registered. Useful for conditional dispatch in custom event loops. |
| DispatchToObserversAsync(evt, ct) | Manually push an event to all registered observers (called automatically by ExecuteStreamingAsync). |
| Approve(requestId, ...) | Approve a pending NodeApprovalRequestEvent. |
| Deny(requestId, reason) | Deny a pending NodeApprovalRequestEvent. |
| Dispose() | Releases internal resources. Always dispose when done. |