
Observability

The HPD.MultiAgent.Observability namespace provides two drop-in observers for monitoring workflow executions: MetricsObserver for execution metrics, and TracingObserver for distributed tracing via OpenTelemetry.

Register observers on a WorkflowEventCoordinator; no reference to HPD.Events is needed.


MetricsObserver

MetricsObserver collects per-workflow and per-node execution metrics in memory. By default it retains data for the last 100 completed workflows; pass maxCompletedWorkflows to adjust:

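csharp
var metrics = new MetricsObserver();                              // default: last 100
var metrics500 = new MetricsObserver(maxCompletedWorkflows: 500); // keep last 500

To start collecting, add the observer to a coordinator and pass the coordinator to the workflow:

csharp
using System;
using System.Linq;
using HPD.MultiAgent;
using HPD.MultiAgent.Observability;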

var metrics = new MetricsObserver();
var coordinator = new WorkflowEventCoordinator();
coordinator.AddObserver(metrics);

await foreach (var evt in workflow.ExecuteStreamingAsync(input, coordinator))
{
    if (evt is TextDeltaEvent delta)
        Console.Write(delta.Text);
}

// After completion, inspect metrics. Only one workflow has run on this
// observer, so First() returns that run.
var completed = metrics.CompletedWorkflows.First();
Console.WriteLine($"Duration:  {completed.Duration}");
Console.WriteLine($"Nodes:     {completed.TotalNodesExecuted} executed, {completed.FailedNodes} failed");
Console.WriteLine($"Tokens:    {completed.TotalTokens} total ({completed.TotalInputTokens} in / {completed.TotalOutputTokens} out)");
Console.WriteLine($"Tools:     {completed.TotalToolCalls} tool calls");

WorkflowMetrics properties

| Property | Type | Description |
| --- | --- | --- |
| ExecutionId | string | Unique ID for this run |
| WorkflowName | string? | Workflow name (if set) |
| StartedAt | DateTimeOffset | When execution started |
| CompletedAt | DateTimeOffset? | When execution completed |
| Duration | TimeSpan | Total execution time |
| Success | bool? | Whether the workflow succeeded |
| TotalNodesExecuted | int | Number of nodes that ran |
| SuccessfulNodes | int | Nodes that succeeded |
| FailedNodes | int | Nodes that failed |
| SkippedNodes | int | Nodes that were skipped |
| TotalTokens | int | Combined input + output tokens |
| TotalInputTokens | int | Input tokens across all nodes |
| TotalOutputTokens | int | Output tokens across all nodes |
| TotalToolCalls | int | Tool invocations across all nodes |
| IterationCount | int | Loop iterations (cyclic graphs) |
| NodeMetrics | ConcurrentDictionary<string, NodeMetrics> | Per-node breakdown |

NodeMetrics properties

| Property | Type | Description |
| --- | --- | --- |
| NodeId | string | Node ID |
| StartedAt | DateTimeOffset? | When the node started |
| Duration | TimeSpan? | Node execution time |
| Success | bool? | Whether the node succeeded |
| WasSkipped | bool | Whether the node was skipped |
| SkipReason | string? | Reason for skipping |
| ErrorMessage | string? | Error message if failed |
| RetryCount | int | Number of retry attempts |
| InputTokens | int | Input tokens for this node |
| OutputTokens | int | Output tokens for this node |
| ToolCallCount | int | Tool calls made by this node |
| ToolsCalled | List<string> | Names of tools called |
| RequiredApproval | bool | Whether approval was requested |
| ApprovalGranted | bool? | Approval outcome |
| ApprovalWaitTime | TimeSpan? | Time spent waiting for approval |
| Iteration | int | Which iteration this node ran in |

Live updates

Use OnMetricsUpdated and OnWorkflowCompleted to react in real time:

csharp
var metrics = new MetricsObserver();

metrics.OnWorkflowCompleted += m =>
{
    Console.WriteLine($"Workflow finished: {m.TotalTokens} tokens used");
};

metrics.OnMetricsUpdated += m =>
{
    // Called after every node starts or completes
};

Querying active workflows

csharp
// Active (still running)
foreach (var wf in metrics.ActiveWorkflows)
    Console.WriteLine($"Running: {wf.ExecutionId} ({wf.Duration.TotalSeconds:F1}s)");

// Recently completed (last 100 by default)
var last = metrics.CompletedWorkflows.Last();
var slowestNode = last.NodeMetrics.Values
    .OrderByDescending(n => n.Duration)
    .First();
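
NodeMetrics.Duration is a nullable TimeSpan, so a node that never ran (for example, a skipped one) may have no duration, and First() throws if no node qualifies. The sketch below ranks nodes by duration while leaving those cases out. It runs on hypothetical stand-in tuples that copy only the NodeMetrics field names used here, not on the library types, so treat it as an illustration of the query rather than library code.

```csharp
using System;
using System.Linq;

// Hypothetical stand-in rows; the field names mirror NodeMetrics, the values are made up.
var nodes = new (string NodeId, TimeSpan? Duration, int InputTokens, int OutputTokens)[]
{
    ("researcher", TimeSpan.FromSeconds(4.2), 1200, 300),
    ("factChecker", null, 0, 0), // no duration recorded for this node
    ("writer", TimeSpan.FromSeconds(2.5), 900, 650),
};

// Keep only nodes that have a duration, slowest first.
var ranked = nodes
    .Where(n => n.Duration.HasValue)
    .OrderByDescending(n => n.Duration.GetValueOrDefault())
    .Select(n => (
        n.NodeId,
        Seconds: n.Duration.GetValueOrDefault().TotalSeconds,
        Tokens: n.InputTokens + n.OutputTokens))
    .ToList();

foreach (var row in ranked)
    Console.WriteLine($"{row.NodeId}: {row.Seconds:F1}s, {row.Tokens} tokens");
```

With real data, the same query shape should apply to last.NodeMetrics.Values; FirstOrDefault() in place of First() avoids an exception when the list is empty.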

TracingObserver

TracingObserver creates OpenTelemetry-compatible System.Diagnostics.Activity spans for each workflow and node execution. Use it with any OTel-compatible backend (Jaeger, Zipkin, OTLP, etc.).

csharp
using HPD.MultiAgent;
using HPD.MultiAgent.Observability;

var tracing = new TracingObserver();
var coordinator = new WorkflowEventCoordinator();
coordinator.AddObserver(tracing);

await foreach (var evt in workflow.ExecuteStreamingAsync(input, coordinator))
{
    // events flow through...
}

tracing.Dispose();

Register TracingObserver.ActivitySourceName with your OTel pipeline so the spans are exported:

csharp
// In your ASP.NET Core / hosted service setup:
services.AddOpenTelemetry()
    .WithTracing(builder => builder
        .AddSource(TracingObserver.ActivitySourceName)  // "HPD.MultiAgent"
        .AddOtlpExporter());

Span structure

Each workflow run produces a span hierarchy:

Workflow:MyWorkflow          (root span, ActivityKind.Internal)
  ├── Node:researcher        (child span per node)
  ├── Node:factChecker
  └── Node:writer

Note: The root span name is set to Workflow:MyWorkflow once the WorkflowStartedEvent arrives. It briefly starts as Workflow:unnamed during the first few milliseconds of graph initialization before the name is patched in.

Span tags

Workflow spans:

| Tag | Description |
| --- | --- |
| workflow.execution_id | Unique execution ID |
| workflow.node_count | Number of nodes |
| workflow.success | True/false |
| workflow.duration_ms | Total duration |
| workflow.successful_nodes | Count |
| workflow.failed_nodes | Count |
| workflow.skipped_nodes | Count |

Node spans:

| Tag | Description |
| --- | --- |
| node.id | Node ID |
| node.handler | Handler name |
| node.layer | Execution layer index |
| workflow.execution_id | Parent workflow ID |
| node.success | True/false |
| node.duration_ms | Execution time |
| node.skipped | True if skipped |
| node.skip_reason | Reason (if skipped) |
| node.error | Error message (if failed) |
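
To look at the span hierarchy locally without an exporter, a System.Diagnostics.ActivityListener can subscribe to the activity source directly. The sketch below is self-contained: a stand-in ActivitySource emits two spans shaped like the ones described above, so the listener has something to capture. The stand-in source, the "demo-run" ID, and the tag values are illustrative assumptions; in a real application TracingObserver would create the spans, and only the listener part would be needed.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

// Assumption: this matches TracingObserver.ActivitySourceName ("HPD.MultiAgent").
const string sourceName = "HPD.MultiAgent";
var stopped = new List<(string Name, string Parent)>();

// Listen to the source and record each span (with its parent) when it stops.
using var listener = new ActivityListener
{
    ShouldListenTo = source => source.Name == sourceName,
    Sample = (ref ActivityCreationOptions<ActivityContext> options) =>
        ActivitySamplingResult.AllDataAndRecorded,
    ActivityStopped = activity =>
        stopped.Add((activity.OperationName, activity.Parent?.OperationName ?? "(root)")),
};
ActivitySource.AddActivityListener(listener);

// Stand-in for TracingObserver: one workflow span with one child node span.
using var standIn = new ActivitySource(sourceName);
using (var workflow = standIn.StartActivity("Workflow:MyWorkflow", ActivityKind.Internal))
{
    workflow?.SetTag("workflow.execution_id", "demo-run");
    using (var node = standIn.StartActivity("Node:researcher"))
    {
        node?.SetTag("node.id", "researcher");
        node?.SetTag("node.success", true);
    }
}

// Child spans stop before their parent, so the node is listed first.
foreach (var (name, parent) in stopped)
    Console.WriteLine($"{name} <- {parent}");
```

Without a listener or an OTel pipeline subscribed to the source, StartActivity returns null and no spans are created, which is why the AddSource registration matters.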

Using a custom ActivitySource

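csharp
using System.Diagnostics;

var source = new ActivitySource("MyApp.Workflows", "2.0.0");
var tracing = new TracingObserver(source);

// With a custom source, pass its name ("MyApp.Workflows") to AddSource(...)
// in the OTel setup so these spans are exported.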

Cleanup

TracingObserver implements IDisposable. Call Dispose() when done to stop any open spans and dispose the ActivitySource:

csharp
tracing.Dispose();

Using both observers together

csharp
using System;
using System.Linq;
using HPD.MultiAgent;
using HPD.MultiAgent.Observability;

var metrics = new MetricsObserver();
var tracing = new TracingObserver();

var coordinator = new WorkflowEventCoordinator();
coordinator.AddObserver(metrics);
coordinator.AddObserver(tracing);

await foreach (var evt in workflow.ExecuteStreamingAsync(input, coordinator))
{
    if (evt is TextDeltaEvent delta)
        Console.Write(delta.Text);
}

tracing.Dispose();

var result = metrics.CompletedWorkflows.First();
Console.WriteLine($"Done in {result.Duration}, {result.TotalTokens} tokens");

WorkflowEventCoordinator helpers

| Member | Description |
| --- | --- |
| AddObserver<TEvent>(observer) | Register an observer. Multiple observers are called in registration order. |
| HasObservers | bool; true if at least one observer is registered. Useful for conditional dispatch in custom event loops. |
| DispatchToObserversAsync(evt, ct) | Manually push an event to all registered observers (called automatically by ExecuteStreamingAsync). |
| Approve(requestId, ...) | Approve a pending NodeApprovalRequestEvent. |
| Deny(requestId, reason) | Deny a pending NodeApprovalRequestEvent. |
| Dispose() | Releases internal resources. Always dispose when done. |

Released under the MIT License.