Streaming
Streaming lets you subscribe to updates of the agent run as it proceeds. This is useful for showing end-user progress updates and partial responses in real-time.
Quick Start
To stream, call StreamWithResult(), which returns a *stream.Result. Use the StreamEvents() method to get an iterator over streaming events:
result, err := runner.StreamWithResult(ctx, agent, messages)
if err != nil {
return err
}
for event, err := range result.StreamEvents(ctx) {
if err != nil {
return err
}
// Handle event
}
Event Types
There are three main types of streaming events:
1. Raw Response Events
RawResponseEvent contains raw events passed directly from the LLM in OpenAI Responses API format. These are useful for streaming response messages to the user as they're generated.
Example - Streaming text token-by-token:
for event, err := range result.StreamEvents(ctx) {
if err != nil {
return err
}
if raw, ok := event.(*stream.RawResponseEvent); ok {
if raw.Type == "response.output_text.delta" {
if data, ok := raw.Data.(map[string]any); ok {
if delta, ok := data["delta"].(string); ok {
fmt.Print(delta) // Print each token as it arrives
}
}
}
}
}
Common raw event types:
- response.created - Response started
- response.output_text.delta - Text content delta
- response.function_call_arguments.delta - Function argument delta
- response.output_item.done - Output item completed
- response.completed - Response finished
2. Run Item Events
RunItemEvent represents high-level semantic events that inform you when an item has been fully generated. This allows progress updates at the level of "message generated", "tool ran", etc.
Example - Tracking agent actions:
for event, err := range result.StreamEvents(ctx) {
if err != nil {
return err
}
if runItem, ok := event.(*stream.RunItemEvent); ok {
switch runItem.Name {
case string(stream.MessageOutputCreated):
fmt.Println("💬 Agent responded")
case string(stream.ToolCalled):
fmt.Println("🔧 Tool called")
case string(stream.ToolOutput):
fmt.Println("📤 Tool completed")
case string(stream.HandoffRequested):
fmt.Println("🔄 Handoff requested")
case string(stream.HandoffOccurred):
fmt.Println("✅ Handoff completed")
}
}
}
Available RunItemEvent names:
- MessageOutputCreated - New message output
- ToolCalled - Tool was called
- ToolOutput - Tool output received
- HandoffRequested - Handoff requested
- HandoffOccurred - Handoff completed
- ReasoningItemCreated - Reasoning item created
3. Approval Required Events
ApprovalRequiredEvent is emitted when a tool call requires human approval before execution can proceed. The stream will terminate with a ToolApprovalRequiredError after this event. Use Runner.Resume() to continue.
for event, err := range result.StreamEvents(ctx) {
if err != nil {
var approvalErr *agents.ToolApprovalRequiredError
if errors.As(err, &approvalErr) {
// Collect approval decisions, then resume
result, err = runner.Resume(ctx, approvalErr.State, approvals)
}
return err
}
if approval, ok := event.(*stream.ApprovalRequiredEvent); ok {
fmt.Printf("⚠️ Tool approval needed: %v\n", approval.Requests)
}
}
4. Agent Updated Events
AgentUpdatedEvent provides updates when the current agent changes (e.g., during handoffs).
Example:
for event, err := range result.StreamEvents(ctx) {
if err != nil {
return err
}
if agentUpdate, ok := event.(*stream.AgentUpdatedEvent); ok {
fmt.Printf("👤 Now talking to: %v\n", agentUpdate.NewAgent)
}
}
Cancellation
You can cancel a streaming run using the Cancel() method with two modes:
Immediate Cancellation
Stops immediately, cancels all tasks, and clears event queues:
Graceful Cancellation
Completes the current turn gracefully before stopping:
Example with context:
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
result, err := runner.StreamWithResult(ctx, agent, messages)
// Cancel after 5 seconds
go func() {
time.Sleep(5 * time.Second)
result.Cancel(stream.CancelAfterTurn)
}()
for event, err := range result.StreamEvents(ctx) {
// Handle events
}
Examples
Example 1: Basic Text Streaming
See examples/18_streaming_basic for a complete example of streaming text responses.
Example 2: Function Argument Streaming
See examples/19_streaming_function_args for real-time function call argument streaming.
Example 3: Semantic Event Streaming
See examples/20_streaming_semantic for high-level progress tracking with semantic events.
Best Practices
1. Choose the Right Event Type
- Raw events: Use when you need token-by-token updates or real-time function arguments
- Semantic events: Use for high-level progress indicators and UI state updates
- Agent events: Use to track multi-agent conversations
2. Handle Errors Properly
Always check for errors in the event loop:
for event, err := range result.StreamEvents(ctx) {
if err != nil {
log.Printf("Stream error: %v", err)
return err
}
// Handle event
}
3. Use Context for Timeout Control
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
result, err := runner.StreamWithResult(ctx, agent, messages)
4. Filter Events for Performance
If you only need certain events, filter early:
for event, err := range result.StreamEvents(ctx) {
if err != nil {
return err
}
// Ignore raw events if you only care about semantic events
if _, ok := event.(*stream.RawResponseEvent); ok {
continue
}
// Process only semantic events
if runItem, ok := event.(*stream.RunItemEvent); ok {
// Handle
}
}
Comparison with Python SDK
The Go streaming API is designed for parity with the Python SDK:
| Feature | Python | Go |
|---|---|---|
| Raw events | RawResponsesStreamEvent |
*stream.RawResponseEvent |
| Semantic events | RunItemStreamEvent |
*stream.RunItemEvent |
| Approval events | — | *stream.ApprovalRequiredEvent |
| Agent updates | AgentUpdatedStreamEvent |
*stream.AgentUpdatedEvent |
| Iteration | async for event in result.stream_events() |
for event, err := range result.StreamEvents(ctx) |
| Cancellation | result.cancel(mode="after_turn") |
result.Cancel(stream.CancelAfterTurn) |
Technical Details
Iterator Pattern
The Go SDK uses Go 1.23+ iterators (iter.Seq2[Event, error]) for idiomatic iteration:
This provides: - Automatic cleanup when iteration stops - Context cancellation support - Error handling in the iteration - Memory-efficient streaming
Thread Safety
All stream.Result methods are thread-safe and can be called concurrently.
Event Ordering
Events include sequence numbers to maintain ordering even when generated concurrently.