Streaming lets you receive task execution updates in real time, as they happen, rather than waiting for the entire task to complete. This is particularly useful for long-running tasks where you want to show progress to users or process responses incrementally.

How streaming works

When you use the streaming API, you receive events as they occur:
  • Token events: Individual tokens from the LLM response as they’re generated
  • Step events: Information about tool calls and intermediate steps during task execution
  • Done event: Final completion status and token usage statistics

Example

Here’s a complete example of how to use streaming with the Lumo SDK:
streaming_example.py
"""Streaming example for the Lumo SDK."""

from time import sleep

from lumo_sdk import (
    LumoClient,
    StreamTokenEvent,
    StreamStepEvent,
    StreamDoneEvent,
)

# Initialize the client
client = LumoClient(api_key="<api-key-here>")

# Stream a task
print("Streaming task execution...\n")
print("=" * 60)

response_text = ""
step_counter = 0

for event in client.stream_task(
    task="What is the weather in Berlin?",
    model="gpt-4.1-nano",
    base_url="https://api.openai.com/v1/chat/completions",
    tools=["ExaSearchTool", "VisitWebsite"],
    max_steps=5,
):
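    if isinstance(event, StreamTokenEvent):
        # Print tokens as they arrive
        token = event.content
        response_text += token
        # Optional pause that only paces terminal output for readability;
        # remove it to print tokens as fast as they arrive
        sleep(0.01)
        print(token, end="", flush=True)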

    elif isinstance(event, StreamStepEvent):
        # Handle step events with tool calls
        step_counter += 1
        step_data = event.step

        print("\n" + "=" * 60)
        print(f"Step {step_counter}")
        print("=" * 60)

        # Display tool calls if present
        tool_calls = step_data.tool_calls
        if tool_calls:
            print(f"\nTools used: {len(tool_calls)}")
            for tool_call in tool_calls:
                tool_name = tool_call.function.name
                tool_args = tool_call.function.arguments
                print(f"\n  Tool: {tool_name}")
                if tool_args:
                    print("  Arguments:")
                    if isinstance(tool_args, dict):
                        for key, value in tool_args.items():
                            print(f"    {key}: {value}")
                    else:
                        print(f"    {tool_args}")

        # Display step output if present
        output = step_data.llm_output
        if output:
            print(f"\nOutput: {output}")

        print("\n" + "=" * 60)
        print("\nContinuing response...\n")

    elif isinstance(event, StreamDoneEvent):
        print("=" * 60)
        print("Token Usage:")
        print("=" * 60)
        print(event.token_usage)
        print("\n" + "=" * 60)
        print("✅ Task completed!")
        print("=" * 60)
        break

print(f"\n\nFinal response length: {len(response_text)} characters")
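
The example above prints `tool_call.function.arguments` field by field when it is a dict and as a single value otherwise. If the arguments can also arrive as a JSON-encoded string (an assumption; this page does not state the type), a small helper can normalize them before display. The sketch below is illustrative only: `normalize_tool_args` and the `"raw"` fallback key are hypothetical names, not part of the Lumo SDK.

```python
import json
from typing import Any


def normalize_tool_args(raw: Any) -> dict:
    """Return tool-call arguments as a dict, whatever form they arrive in.

    Hypothetical helper, not part of the Lumo SDK.
    """
    if raw is None:
        return {}
    if isinstance(raw, dict):
        return raw
    if isinstance(raw, str):
        try:
            parsed = json.loads(raw)
        except json.JSONDecodeError:
            # Not valid JSON: keep the text under a hypothetical "raw" key.
            return {"raw": raw}
        # Valid JSON that is not an object (e.g. a list) is wrapped too.
        return parsed if isinstance(parsed, dict) else {"raw": parsed}
    # Any other type is wrapped unchanged.
    return {"raw": raw}
```

With this in place, the step handler could call `normalize_tool_args(tool_call.function.arguments)` once and always iterate over `.items()`.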

Event types

StreamTokenEvent

Received when new tokens are generated by the LLM. Contains:
  • content: The token string to append to the response

StreamStepEvent

Received when a step completes (e.g., after tool execution). Contains:
  • step: Step data including tool calls and LLM output

StreamDoneEvent

Received when the task completes. Contains:
  • token_usage: Token usage statistics for the entire task
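
Event-handling logic can be exercised without a network call by feeding it stand-in events. The following is a minimal sketch under stated assumptions: the `Fake*` dataclasses are hypothetical and mirror only the fields listed above (`content`, `step`, `token_usage`); they are not the SDK's classes, and the sample values, including the shape of `token_usage`, are made up for illustration.

```python
from dataclasses import dataclass, field
from typing import Any, Iterable


# Hypothetical stand-ins that mirror only the fields documented above.
@dataclass
class FakeTokenEvent:
    content: str


@dataclass
class FakeStepEvent:
    step: Any


@dataclass
class FakeDoneEvent:
    token_usage: Any


@dataclass
class StreamSummary:
    text: str = ""
    steps: list = field(default_factory=list)
    token_usage: Any = None


def summarize(events: Iterable[Any]) -> StreamSummary:
    """Fold a stream of events into the final text, steps, and usage."""
    summary = StreamSummary()
    for event in events:
        if isinstance(event, FakeTokenEvent):
            summary.text += event.content
        elif isinstance(event, FakeStepEvent):
            summary.steps.append(event.step)
        elif isinstance(event, FakeDoneEvent):
            summary.token_usage = event.token_usage
            break  # stop reading once the done event arrives
    return summary


# Made-up events in the order a stream might deliver them.
fake_stream = [
    FakeTokenEvent("Sunny"),
    FakeStepEvent({"tool_calls": []}),
    FakeTokenEvent(", 21 C"),
    FakeDoneEvent({"total_tokens": 12}),
]
result = summarize(fake_stream)
print(result.text)
```

To use the same function with real events, the `isinstance` checks would be switched to `StreamTokenEvent`, `StreamStepEvent`, and `StreamDoneEvent`.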

Benefits of streaming

  • Real-time feedback: Show progress to users as the task executes
  • Better UX: Display partial results immediately instead of waiting for completion
  • Debugging: Monitor tool calls and intermediate steps as they happen
  • Efficiency: Process responses incrementally for large outputs
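
As one illustration of incremental processing, tokens can be regrouped into complete lines as they arrive, so downstream code (a logger or a parser, for instance) never waits for the full response. This is a sketch only: `iter_lines` is a hypothetical helper, and the token strings stand in for the `content` values of successive token events.

```python
from typing import Iterable, Iterator


def iter_lines(tokens: Iterable[str]) -> Iterator[str]:
    """Yield complete lines as soon as enough tokens have arrived."""
    buffer = ""
    for token in tokens:
        buffer += token
        # A single token may contain zero, one, or several newlines.
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            yield line
    if buffer:
        # Flush the remainder when the stream ends without a newline.
        yield buffer


lines = list(iter_lines(["Hel", "lo\nWor", "ld\n", "done"]))
print(lines)  # ['Hello', 'World', 'done']
```

Because `iter_lines` is a generator, it holds only the current partial line in memory, however long the full response grows.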