

This reference is auto-generated from SDK docstrings. Run `python scripts/generate_api_docs.py` to regenerate.

## Constructor

```python
def __init__(
    *,
    base_url: str = "https://api.starlight-search.com",
    api_key: str,
    project_id: str | None = None,
    timeout: float | httpx.Timeout = 60.0,
) -> None
```

Create a Reflect client for a project.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `base_url` | `str` | `"https://api.starlight-search.com"` | Reflect API base URL (e.g. `http://localhost:8000`). |
| `api_key` | `str` | required | Plaintext API key (e.g. `rf_live_…`). |
| `project_id` | `str \| None` | `None` | Project identifier. If the project does not exist yet, it will be created automatically (master keys only). Defaults to `"default"` when omitted. |
| `timeout` | `float \| httpx.Timeout` | `60.0` | Request timeout in seconds. |

```python
from reflect_sdk import ReflectClient

client = ReflectClient(
    base_url="http://localhost:8000",
    api_key="rf_live_abc123_secret456",
    project_id="my-project",
)
```

## Class methods

### bootstrap

```python
@classmethod
def bootstrap(
    *,
    base_url: str,
    bootstrap_token: str,
    user_id: str,
    project_id: str,
    key_label: str = "Default Admin Key",
    environment: str = "live",
    timeout: float | httpx.Timeout = 60.0,
) -> BootstrapResponse
```

Bootstrap a project and its first admin API key using a server bootstrap token. The returned BootstrapResponse includes the plaintext key.

```python
info = ReflectClient.bootstrap(
    base_url="http://localhost:8000",
    bootstrap_token="your-admin-token",
    user_id="user-1",
    project_id="new-project",
)
# info.api_key contains the plaintext key
```

## Instance methods

### health

```python
def health() -> dict[str, str]
```

Return the API health status. No authentication required.
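
A quick liveness check. A minimal sketch; the exact keys of the returned dict are not documented here, so the "status" key below is an assumption:

```python
status = client.health()
print(status)  # e.g. {"status": "ok"}; exact keys depend on the server
```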

### query_memories_async

```python
async def query_memories_async(
    *,
    task: str,
    limit: int = 10,
    lambda_: float = 0.5,
    metadata_filter: dict[str, Any] | None = None,
    similarity_threshold: float | None = None,
    mmr_lambda: float = 0.7,
) -> list[MemoryResponse]
```

Async version of query_memories; see query_memories below for parameter details. A short usage sketch follows the table.

| Parameter | Type | Default |
| --- | --- | --- |
| `task` | `str` | required |
| `limit` | `int` | `10` |
| `lambda_` | `float` | `0.5` |
| `metadata_filter` | `dict[str, Any] \| None` | `None` |
| `similarity_threshold` | `float \| None` | `None` |
| `mmr_lambda` | `float` | `0.7` |
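
A minimal async sketch (reuses the client from the constructor example):

```python
import asyncio

async def main() -> None:
    memories = await client.query_memories_async(
        task="Summarize an article about climate change",
        limit=5,
    )
    # Only the id field of MemoryResponse is confirmed by examples in this
    # reference; anything else would be an assumption.
    print([m.id for m in memories])

asyncio.run(main())
```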

### augment_with_memories_async

```python
async def augment_with_memories_async(
    *,
    task: str,
    limit: int = 10,
    lambda_: float = 0.5,
    metadata_filter: dict[str, Any] | None = None,
    similarity_threshold: float | None = None,
    mmr_lambda: float = 0.7,
) -> AugmentedTask
```

Async version of augment_with_memories; see augment_with_memories below for parameter details.

| Parameter | Type | Default |
| --- | --- | --- |
| `task` | `str` | required |
| `limit` | `int` | `10` |
| `lambda_` | `float` | `0.5` |
| `metadata_filter` | `dict[str, Any] \| None` | `None` |
| `similarity_threshold` | `float \| None` | `None` |
| `mmr_lambda` | `float` | `0.7` |

### query_memories

```python
def query_memories(
    *,
    task: str,
    limit: int = 10,
    lambda_: float = 0.5,
    metadata_filter: dict[str, Any] | None = None,
    similarity_threshold: float | None = None,
    mmr_lambda: float = 0.7,
) -> list[MemoryResponse]
```

Retrieve memories by semantic similarity and Q-value ranking.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `task` | `str` | required | Task description to search against. |
| `limit` | `int` | `10` | Maximum number of memories to return. |
| `lambda_` | `float` | `0.5` | Blend between similarity (1.0) and Q-value (0.0). |
| `metadata_filter` | `dict[str, Any] \| None` | `None` | Optional additional metadata key/value pairs that memories must match (ANDed with internal filters). |
| `similarity_threshold` | `float \| None` | `None` | Optional minimum cosine similarity override. When omitted, the server's configured default is used. |
| `mmr_lambda` | `float` | `0.7` | MMR diversity weight applied after the utility blend: 1.0 disables diversity (pure utility ranking), 0.0 is pure diversity. The default of 0.7 returns a relevance-leaning set with mild redundancy suppression. |

Returns: List of MemoryResponse objects, ranked by blended score.
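
A retrieval sketch; the metadata_filter keys are hypothetical, and only the id field of MemoryResponse is confirmed by the examples in this reference:

```python
memories = client.query_memories(
    task="Fix a failing SQL aggregation query",
    limit=5,
    lambda_=0.5,  # equal blend of similarity and Q-value
    metadata_filter={"environment": "staging"},  # hypothetical filter keys
)
retrieved_ids = [m.id for m in memories]  # pass to create_trace later
```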

### get_skill

```python
def get_skill() -> SkillResponse | None
```

Return the project skill, or None if no skill has been created yet.

### get_skill_async

```python
async def get_skill_async() -> SkillResponse | None
```

Async variant of get_skill.

### create_skill

```python
def create_skill(
    *,
    n_passed: int | None = None,
    n_failed: int | None = None,
) -> SkillResponse
```

Create the project skill (see the sketch after the table).

| Parameter | Type | Default |
| --- | --- | --- |
| `n_passed` | `int \| None` | `None` |
| `n_failed` | `int \| None` | `None` |
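
A minimal sketch; the semantics of n_passed and n_failed are not documented here, so both are left at their defaults:

```python
skill = client.create_skill()

# Read it back; get_skill returns None when no skill exists yet.
current = client.get_skill()
if current is not None:
    print("project skill exists")
```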

### augment_with_memories

```python
def augment_with_memories(
    *,
    task: str,
    limit: int = 10,
    lambda_: float = 0.5,
    metadata_filter: dict[str, Any] | None = None,
    similarity_threshold: float | None = None,
    mmr_lambda: float = 0.7,
) -> AugmentedTask
```

Query memories and format them into the task text for prompt augmentation.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `task` | `str` | required | Task to augment. |
| `limit` | `int` | `10` | Maximum memories to retrieve. |
| `lambda_` | `float` | `0.5` | Blend between similarity and Q-value. |
| `metadata_filter` | `dict[str, Any] \| None` | `None` | Optional additional metadata key/value pairs that memories must match (ANDed with internal filters). |
| `similarity_threshold` | `float \| None` | `None` | Optional minimum cosine similarity override. When omitted, the server's configured default is used. |
| `mmr_lambda` | `float` | `0.7` | MMR diversity weight applied after the utility blend; 1.0 disables diversity. |

Returns: AugmentedTask with augmented_task (task + memory blocks) and memories.
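
A sketch of the augment-then-act flow; call_your_llm stands in for your own model call:

```python
augmented = client.augment_with_memories(task="Summarize this article: ...")
answer = call_your_llm(augmented.augmented_task)  # placeholder for your LLM call
memory_ids = [m.id for m in augmented.memories]   # keep these for create_trace
```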

### trace

```python
def trace(
    *,
    task: str,
    limit: int = 10,
    lambda_: float = 0.5,
    metadata_filter: dict[str, Any] | None = None,
    similarity_threshold: float | None = None,
    mmr_lambda: float = 0.7,
    blocking: bool = False,
    auto_fail_on_exception: bool = True,
    reference_context: str | None = None,
    alpha: float | None = None,
) -> Generator[TraceContext, None, None]
```

Context manager that retrieves memories and auto-submits the trace. On entry, memories are queried and made available via ctx.augmented_task and ctx.memories. Call ctx.set_output(...) inside the block to record your agent's result. On exit, the trace is submitted with the correct retrieved_memory_ids automatically. A usage sketch follows the table.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `task` | `str` | required | Task description for memory retrieval and trace logging. |
| `limit` | `int` | `10` | Maximum memories to retrieve. |
| `lambda_` | `float` | `0.5` | Blend between similarity and Q-value. |
| `metadata_filter` | `dict[str, Any] \| None` | `None` | |
| `similarity_threshold` | `float \| None` | `None` | |
| `mmr_lambda` | `float` | `0.7` | |
| `blocking` | `bool` | `False` | If True, wait for memory creation before returning from the context (uses create_trace_and_wait). |
| `auto_fail_on_exception` | `bool` | `True` | If True and an unhandled exception occurs after set_output was called, the trace is submitted with result="fail" and the exception message as feedback_text. |
| `reference_context` | `str \| None` | `None` | |
| `alpha` | `float \| None` | `None` | |
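
A minimal sketch of the context-manager flow; my_agent is a placeholder, and the exact signature of ctx.set_output is not documented here, so passing only the final answer is an assumption:

```python
with client.trace(task="Summarize this article: ...") as ctx:
    answer = my_agent.solve(ctx.augmented_task)  # placeholder agent
    ctx.set_output(answer)  # assumed: set_output accepts the final answer
# On exit, the trace is submitted with retrieved_memory_ids filled in.
```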

### trace_async

```python
def trace_async(
    *,
    task: str,
    limit: int = 10,
    lambda_: float = 0.5,
    metadata_filter: dict[str, Any] | None = None,
    similarity_threshold: float | None = None,
    mmr_lambda: float = 0.7,
    blocking: bool = False,
    auto_fail_on_exception: bool = True,
    reference_context: str | None = None,
    alpha: float | None = None,
) -> AsyncGenerator[TraceContext, None]
```

Async version of trace; see trace above for parameter details. A short sketch follows the table.

| Parameter | Type | Default |
| --- | --- | --- |
| `task` | `str` | required |
| `limit` | `int` | `10` |
| `lambda_` | `float` | `0.5` |
| `metadata_filter` | `dict[str, Any] \| None` | `None` |
| `similarity_threshold` | `float \| None` | `None` |
| `mmr_lambda` | `float` | `0.7` |
| `blocking` | `bool` | `False` |
| `auto_fail_on_exception` | `bool` | `True` |
| `reference_context` | `str \| None` | `None` |
| `alpha` | `float \| None` | `None` |
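
An async sketch, assuming trace_async is entered with async with like its sync counterpart:

```python
async with client.trace_async(task="Summarize this article: ...") as ctx:
    answer = await my_async_agent.solve(ctx.augmented_task)  # placeholder agent
    ctx.set_output(answer)
```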

### create_trace

```python
def create_trace(
    *,
    task: str,
    trajectory: TrajectoryInput,
    final_response: str | None = None,
    retrieved_memory_ids: Sequence[str] = (),
    model: str | None = None,
    metadata: dict[str, Any] | None = None,
    review_result: str | None = None,
    feedback_text: str | None = None,
    reference_context: str | None = None,
    alpha: float | None = None,
) -> TraceCreateResponse
```

Record your agent's run without blocking your application. Call this after your agent finishes a task to send the full conversation to Reflect for storage. The call returns immediately with a TraceCreateResponse; the trace is ingested in the background, so your agent can move on to the next request without waiting.

If you already know whether the response was correct (e.g. you compared it to an expected answer), pass review_result to include an inline review. Reflect then generates a reflection and stores it as a new memory in the background, so future runs of your agent can learn from this outcome. If you don't know the result yet, omit review_result and review later via review_trace or the web dashboard.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `task` | `str` | required | What your agent was asked to do, e.g. the user's question or the job description. Reflect uses this to match memories on future runs, so be descriptive. |
| `trajectory` | `TrajectoryInput` | required | The conversation between the user and your agent. Pass a list of {"role": ..., "content": ...} message dicts (the same format most LLM APIs return), or a JSON string that deserializes to such a list. |
| `final_response` | `str \| None` | `None` | Your agent's final answer. When None, Reflect extracts it from the last "assistant" message in the trajectory automatically. |
| `retrieved_memory_ids` | `Sequence[str]` | `()` | IDs of the memories your agent used during this run (from query_memories or augment_with_memories). Passing these lets Reflect update their Q-values when a review comes in, reinforcing helpful memories and down-ranking unhelpful ones. |
| `model` | `str \| None` | `None` | The model your agent used (e.g. "gpt-5.4-mini"). Shown in the dashboard for filtering and analysis. |
| `metadata` | `dict[str, Any] \| None` | `None` | Any extra context you want to attach, e.g. {"customer_id": "c42", "environment": "staging"}. Visible in the dashboard and useful for filtering. |
| `review_result` | `str \| None` | `None` | Judge whether the response was correct: "pass" or "success" if it was, "fail" or "failure" if not. When provided, Reflect generates a reflection from the conversation and stores it as a memory so your agent improves over time. |
| `feedback_text` | `str \| None` | `None` | When the response failed, explain what went wrong, e.g. "Missed the WHERE clause" or "Gave an answer about the wrong product". This feedback is included in the generated reflection so your agent learns the specific mistake. Ignored when review_result is None. |
| `reference_context` | `str \| None` | `None` | |
| `alpha` | `float \| None` | `None` | |

Returns: A TraceCreateResponse with the trace id and its ingest_status (typically "queued").

**Example: log your agent's run for later review**

```python
# After your agent responds to a user...
submission = client.create_trace(
    task="Summarize this article about climate change",
    trajectory=[
        {"role": "user", "content": "Summarize this article: ..."},
        {"role": "assistant", "content": "Here is a summary: ..."},
    ],
    model="gpt-5.4-mini",
    metadata={"user_id": "u123"},
)
# Returns immediately; your app continues serving requests.
# Review this trace later in the dashboard or via review_trace().
```

**Example: log and review in one call (auto-graded)**

```python
# Compare the agent's answer to the expected answer...
is_correct = agent_answer.strip() == expected_answer.strip()

submission = client.create_trace(
    task=problem_description,
    trajectory=messages,
    retrieved_memory_ids=[m.id for m in memories],
    model="gpt-5.4-mini",
    review_result="pass" if is_correct else "fail",
    feedback_text=None if is_correct else f"Expected {expected_answer}",
)
# Reflect generates a reflection in the background.
# Next time your agent sees a similar task, it can retrieve
# this memory to avoid repeating the same mistake.
```
"success" and "failure" are aliases for the API’s "pass" and "fail". The SDK maps them automatically.

### create_trace_async

```python
async def create_trace_async(
    *,
    task: str,
    trajectory: TrajectoryInput,
    final_response: str | None = None,
    retrieved_memory_ids: Sequence[str] = (),
    model: str | None = None,
    metadata: dict[str, Any] | None = None,
    review_result: str | None = None,
    feedback_text: str | None = None,
    reference_context: str | None = None,
    alpha: float | None = None,
) -> TraceCreateResponse
```

Async variant of create_trace. Same parameters and return type.

### wait_for_trace

```python
def wait_for_trace(
    *,
    trace_id: str,
    require_reviewed: bool = False,
    poll_interval: float = 0.25,
    wait_timeout: float = 60.0,
) -> TraceResponse
```

Poll until a previously submitted trace has finished processing, then return it. A sketch follows the table.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `trace_id` | `str` | required | ID of the trace to wait for. |
| `require_reviewed` | `bool` | `False` | If True, also wait until the trace's review has been processed. |
| `poll_interval` | `float` | `0.25` | How often (in seconds) to poll. |
| `wait_timeout` | `float` | `60.0` | Maximum seconds to wait before raising TimeoutError. |
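
A sketch pairing create_trace with wait_for_trace; the id attribute on TraceCreateResponse is an assumed name, inferred from "the trace id" in the create_trace Returns note:

```python
submission = client.create_trace(task=task, trajectory=messages)
trace = client.wait_for_trace(
    trace_id=submission.id,  # assumed attribute name on TraceCreateResponse
    require_reviewed=False,
    wait_timeout=120.0,
)
```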

### wait_for_trace_async

```python
async def wait_for_trace_async(
    *,
    trace_id: str,
    require_reviewed: bool = False,
    poll_interval: float = 0.25,
    wait_timeout: float = 60.0,
) -> TraceResponse
```

Async variant of wait_for_trace. Same parameters and return type. Uses asyncio.sleep between polls.

### create_trace_and_wait

```python
def create_trace_and_wait(
    *,
    task: str,
    trajectory: TrajectoryInput,
    final_response: str | None = None,
    retrieved_memory_ids: Sequence[str] = (),
    model: str | None = None,
    metadata: dict[str, Any] | None = None,
    review_result: str | None = None,
    feedback_text: str | None = None,
    reference_context: str | None = None,
    alpha: float | None = None,
    poll_interval: float = 0.25,
    wait_timeout: float = 60.0,
) -> TraceResponse
```

Record your agent's run and wait until the memory is created. Use this when your next step depends on the trace (and its memory) being fully processed, for example:

- Evaluation loops where you run multiple tasks in sequence and need each memory to exist before the next task starts, so your agent can learn from earlier mistakes within the same run.
- Tests where you want to assert on the created memory or the final review status.
- Scripts and pipelines where you need confirmation that the reflection was stored before moving on.

This method submits the trace, then polls until it's done:

- Without review_result: waits until the trace is stored.
- With review_result: waits until the review is processed, the reflection is generated, and the memory is saved. The returned TraceResponse will have review_status == "reviewed" and a populated created_memory_id.

If your application serves real-time traffic and you don't want to block, use create_trace instead; it returns immediately while processing happens in the background.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `task` | `str` | required | What your agent was asked to do. Reflect uses this to match memories on future runs, so be descriptive. |
| `trajectory` | `TrajectoryInput` | required | The conversation between the user and your agent: a list of {"role": ..., "content": ...} message dicts, or a JSON string. |
| `final_response` | `str \| None` | `None` | Your agent's final answer. When None, Reflect extracts it from the last assistant message in the trajectory. |
| `retrieved_memory_ids` | `Sequence[str]` | `()` | IDs of the memories your agent used (from query_memories or augment_with_memories). Passing these lets Reflect update their Q-values based on the review. |
| `model` | `str \| None` | `None` | The model your agent used (e.g. "gpt-5.4-mini"). Shown in the dashboard for filtering. |
| `metadata` | `dict[str, Any] \| None` | `None` | Extra context to attach, e.g. {"source": "eval_pipeline", "run_id": "r42"}. |
| `review_result` | `str \| None` | `None` | Judge whether the response was correct: "pass" / "success" or "fail" / "failure". When provided, this method waits for the reflection to be generated and stored as a memory before returning. |
| `feedback_text` | `str \| None` | `None` | When the response failed, explain what went wrong so the reflection captures the specific mistake. Ignored when review_result is None. |
| `reference_context` | `str \| None` | `None` | |
| `alpha` | `float \| None` | `None` | |
| `poll_interval` | `float` | `0.25` | How often (in seconds) to check whether processing is done. |
| `wait_timeout` | `float` | `60.0` | Maximum seconds to wait; raises TimeoutError if the trace is still processing when this elapses. Increase it if your reflections use a slow model. |

Returns: The fully processed TraceResponse with the attached ReviewResponse and created_memory_id (when reviewed).

Raises:

- RuntimeError: if processing fails, e.g. the LLM errored while generating the reflection. Check trace.last_ingest_error for details.
- TimeoutError: if processing doesn't finish within wait_timeout seconds.

**Example: evaluation loop that learns across tasks**

```python
for problem in problems:
    # Retrieve memories from previous tasks in this run
    augmented = client.augment_with_memories(task=problem.question)
    answer = my_agent.solve(augmented.augmented_task)
    is_correct = answer.strip() == problem.expected.strip()

    # Build the trajectory in the documented message-dict format.
    augmented_messages = [
        {"role": "user", "content": augmented.augmented_task},
        {"role": "assistant", "content": answer},
    ]

    trace = client.create_trace_and_wait(
        task=problem.question,
        trajectory=augmented_messages,
        final_response=answer,
        retrieved_memory_ids=[m.id for m in augmented.memories],
        model="gpt-5.4-mini",
        review_result="pass" if is_correct else "fail",
        feedback_text=None if is_correct else f"Expected {problem.expected}",
    )
    # The memory is now stored; the next iteration can retrieve it.
```

**Example: interactive CLI with human review**

```python
answer = my_agent.solve(task)
result = input("Was this correct? [y/n]: ")

trace = client.create_trace_and_wait(
    task=task,
    trajectory=messages,
    final_response=answer,
    retrieved_memory_ids=[m.id for m in memories],
    review_result="pass" if result == "y" else "fail",
    feedback_text=input("Feedback: ") if result != "y" else None,
)
print(f"Memory created: {trace.created_memory_id}")
```
"success" and "failure" are aliases for the API’s "pass" and "fail". The SDK maps them automatically.

### create_trace_and_wait_async

```python
async def create_trace_and_wait_async(
    *,
    task: str,
    trajectory: TrajectoryInput,
    final_response: str | None = None,
    retrieved_memory_ids: Sequence[str] = (),
    model: str | None = None,
    metadata: dict[str, Any] | None = None,
    review_result: str | None = None,
    feedback_text: str | None = None,
    reference_context: str | None = None,
    alpha: float | None = None,
    poll_interval: float = 0.25,
    wait_timeout: float = 60.0,
) -> TraceResponse
```

Async variant of create_trace_and_wait. Same parameters and return type. Uses asyncio.sleep between polls.

### list_traces

```python
def list_traces(
    *,
    review_status: str | None = None,
) -> list[TraceResponse]
```

List traces for the project.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `review_status` | `str \| None` | `None` | Filter by "pending", "reviewed", or None for all. |

Returns: List of TraceResponse objects.
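
A filtering sketch; the id attribute on TraceResponse is an assumed name:

```python
pending = client.list_traces(review_status="pending")
for t in pending:
    print(t.id)  # assumed attribute name on TraceResponse
```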

### get_trace

```python
def get_trace(
    trace_id: str,
) -> TraceResponse
```

Fetch a single trace by ID.

| Parameter | Type | Default |
| --- | --- | --- |
| `trace_id` | `str` | required |

### get_trace_async

```python
async def get_trace_async(
    trace_id: str,
) -> TraceResponse
```

Async variant of get_trace. Same parameters and return type.

### review_trace

```python
def review_trace(
    *,
    trace_id: str,
    result: str,
    feedback_text: str | None = None,
    alpha: float | None = None,
) -> TraceResponse
```

Submit a deferred review for a trace.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `trace_id` | `str` | required | ID of the trace to review. |
| `result` | `str` | required | "pass" / "fail", or "success" / "failure" (aliases). |
| `feedback_text` | `str \| None` | `None` | Optional human feedback. |
| `alpha` | `float \| None` | `None` | Optional Q-learning step size override for this review. When omitted, the server's configured default is used. Must be in [0, 1]. |

Returns: The updated TraceResponse with the review attached.
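
A deferred-review sketch, continuing the create_trace example above (submission.id is an assumed attribute name):

```python
reviewed = client.review_trace(
    trace_id=submission.id,  # assumed attribute name on TraceCreateResponse
    result="fail",
    feedback_text="Summary missed the article's main conclusion",
)
```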

### delete_traces

```python
def delete_traces(
    trace_ids: Sequence[str],
) -> DeleteTracesResponse
```

Delete the given traces.

| Parameter | Type | Default |
| --- | --- | --- |
| `trace_ids` | `Sequence[str]` | required |

### list_api_keys

```python
def list_api_keys() -> list[ApiKeyResponse]
```

List the project's API keys.

### create_api_key

```python
def create_api_key(
    *,
    label: str,
    scopes: Sequence[str],
    environment: str = "live",
) -> ApiKeyCreateResponse
```

Create a new API key for the project (see the sketch after the table).

| Parameter | Type | Default |
| --- | --- | --- |
| `label` | `str` | required |
| `scopes` | `Sequence[str]` | required |
| `environment` | `str` | `"live"` |
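
A creation sketch; the valid scope strings are not documented here, so the values below are hypothetical:

```python
created = client.create_api_key(
    label="CI pipeline key",
    scopes=["traces:read", "traces:write"],  # hypothetical scope names
    environment="live",
)
# Assumed: like bootstrap, the create response carries the plaintext key,
# so store it somewhere safe now.
```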

### revoke_api_key

```python
def revoke_api_key(
    key_id: str,
) -> ApiKeyResponse
```

Revoke an API key by its ID.

| Parameter | Type | Default |
| --- | --- | --- |
| `key_id` | `str` | required |

### delete_project

```python
def delete_project() -> DeleteProjectResponse
```

Delete the current project.