Unified public API for the BeamAgent SDK -- an Elixir/OTP wrapper around five agentic coder backends: Claude, Codex, Gemini, OpenCode, and Copilot.
BeamAgent is the primary entry point for session lifecycle, queries,
streaming, threads, session history, durable runs, long-term memory, journal-backed
domain events, and policy-driven backend routing. Domain-specific features --
skills, apps, files, MCP, accounts, search, configuration, and more --
live in focused submodules (see Submodules below) and work identically
across all five backends thanks to native-first routing with universal
fallbacks.
Quick Start
Start a session, send a query, and process the response:
{:ok, session} =
BeamAgent.start_session(%{
backend: :auto,
routing: %{policy: :preferred_then_fallback, preferred_backends: [:claude, :codex]}
})
{:ok, messages} = BeamAgent.query(session, "What is the BEAM?")
for %{content: content} <- messages do
IO.puts(content)
end
:ok = BeamAgent.stop(session)Streaming with Events
Subscribe to events before sending a query for real-time streaming:
{:ok, session} = BeamAgent.start_session(%{backend: :claude})
{:ok, ref} = BeamAgent.event_subscribe(session)
{:ok, _messages} = BeamAgent.query(session, "Explain OTP")
defp loop(session, ref) do
case BeamAgent.receive_event(session, ref, 10_000) do
{:ok, %{type: :result}} ->
IO.puts("Done.")
{:ok, %{type: :text, content: content}} ->
IO.write(content)
loop(session, ref)
{:ok, _other} ->
loop(session, ref)
{:error, :complete} ->
IO.puts("Stream complete.")
{:error, :timeout} ->
IO.puts("Timed out.")
end
endOr use the convenience stream!/3 function for an Enumerable:
session
|> BeamAgent.stream!("Explain GenServer")
|> Enum.each(fn msg -> IO.write(msg[:content] || "") end)Key Concepts
Sessions
A session is a supervised gen_statem process that owns a single transport
connection to a backend CLI. Sessions are started with start_session/1
and stopped with stop/1. Each session has a unique binary session_id,
tracks message history, and can host multiple conversation threads.
Events
Events provide a streaming view of session activity. Call
event_subscribe/1 to register the calling process as a subscriber,
then receive_event/2 to pull events one at a time. Events are
delivered as normalized message/0 maps. The stream ends with an
{:error, :complete} sentinel after a result or error message.
Threads
Threads group related queries into named conversation contexts within
a session. Use thread_start/2 to create a thread, thread_resume/2 to
switch to it, and thread_list/1 to enumerate threads. Each thread
tracks its own message history as a subset of the session history.
Hooks
SDK-level lifecycle hooks fire at well-defined points (session start,
query start, tool use, etc.). Pass hook definitions in session opts
via the :sdk_hooks key. Hooks run in-process and cannot block the
engine state machine.
MCP (Model Context Protocol)
MCP lets you define custom tools as Erlang/Elixir functions that the
backend can invoke in-process. See BeamAgent.MCP for server
registration, status inspection, and runtime toggling.
Providers
Providers represent authentication/API endpoints for a backend. See
BeamAgent.Provider for provider selection and OAuth flows. Provider
management is most relevant for backends that support multiple API
endpoints (e.g., OpenCode with different LLM providers).
Architecture
Every public function in this module follows the native_or routing
pattern: it first attempts the backend's native implementation, and if
the backend returns {:error, {:unsupported_native_call, _}}, it falls
back to a universal implementation in one of the core modules.
The call chain is: BeamAgent -> :beam_agent -> :beam_agent_core ->
:beam_agent_router -> :beam_agent_session_engine -> backend handler.
This thin wrapper design means BeamAgent contains zero business logic --
it is purely a delegation layer.
Submodules
Domain-specific functions are organized into focused submodules.
BeamAgent retains session lifecycle, streaming, and convenience wrappers.
BeamAgent.Account-- authentication, login/logout, rate limitsBeamAgent.Artifacts-- typed artifact and context storageBeamAgent.Audit-- durable audit records layered on the journalBeamAgent.Apps-- project/app management and modesBeamAgent.Capabilities-- backend capability matrix and checksBeamAgent.Catalog-- tools, skills, plugins, agents, models, commandsBeamAgent.Checkpoint-- file checkpoint and rewind operationsBeamAgent.Command-- shell commands, session messages, async promptsBeamAgent.Config-- session configuration read/writeBeamAgent.Control-- turn steering, realtime, reviews, server managementBeamAgent.Context-- context pressure, summaries, and policy-driven compactionBeamAgent.File-- file search, read, list, and statusBeamAgent.Hooks-- SDK lifecycle hook definitions and dispatchBeamAgent.Journal-- durable canonical domain-event journalBeamAgent.Memory-- long-term memory and lexical recallBeamAgent.MCP-- MCP server/tool registration and managementBeamAgent.Orchestrator-- parent-child orchestration and delegation lineageBeamAgent.Policy-- reusable allow/deny policy profilesBeamAgent.Provider-- LLM provider selection, OAuth flowsBeamAgent.Raw-- escape-hatch functions for backend-native callsBeamAgent.Routing-- backend routing and policy-driven selectionBeamAgent.Routines-- durable routines and caller-driven scheduled executionBeamAgent.Runtime-- runtime state, model/mode switching, interruptsBeamAgent.Runs-- canonical run and step lifecycleBeamAgent.Search-- fuzzy file search sessionsBeamAgent.SessionStore-- session history storage and retrievalBeamAgent.Skills-- skill listing, remote export, configurationBeamAgent.Threads-- thread management within sessions
Core concepts
- Sessions: A session is a connection to one AI backend. You start one, send queries, and stop it when done. Think of it like a phone call to an AI.
- The five backends: BeamAgent wraps Claude, Codex, Gemini, OpenCode, and
Copilot. You can pick one directly or ask BeamAgent routing to select one
with
backend: :auto, and the API is the same for all of them. - Queries:
query/2sends a prompt and waits for the complete response.event_subscribe/1+receive_event/2gives you streaming responses piece by piece. - Session pid: Every session is an Erlang process. The
pidyou get fromstart_session/1is how you talk to it. Pass it as the first argument to every function. - Native-first routing: Most functions try the backend's own implementation first. If the backend doesn't support a feature natively, BeamAgent uses a universal OTP-based fallback. You don't need to think about this — it just works.
Architecture deep dive
- Delegation chain:
BeamAgentdelegates to:beam_agent(Erlang), which delegates to:beam_agent_core, which routes through:beam_agent_session_engine(agen_statem) to the backend handler. Zero business logic lives inBeamAgentor:beam_agent. - Session engine: Each session is a single
gen_statemprocess that owns the transport (Erlang port, HTTP client, or WebSocket). No additional processes are spawned per session. - native_or pattern: The Erlang
native_or/4macro triesAdapterModule:Function(Session, Args...)and falls back to a closure if the adapter doesn't export the function. Universal fallbacks use ETS-backed core modules (beam_agent_*_core). - Transport architecture: Three transport types --
beam_agent_transport_port(stdio),beam_agent_transport_http(HTTP), andbeam_agent_transport_ws(WebSocket). The handler'sinit_handler/1callback selects the transport. - Thick framework, thin adapters: The session engine handles lifecycle, queuing, telemetry, buffering, and consumer management. Backend handlers only implement protocol encoding/decoding and message normalization.
Summary
Types
Active share metadata returned by share_session/1 and share_session/2.
Backend identifier atom.
A normalized message map flowing through the SDK.
The normalized message type tag.
Permission mode controlling tool and edit approval.
Result from a permission handler callback.
Function that pulls the next message from a session event stream.
Session metadata map returned by list_sessions/0 and related functions.
Share metadata map returned by share_session/1 and share_session/2.
Normalized stop reason from the backend.
Summary metadata map returned by summarize_session/1 and summarize_session/2.
Predicate that determines if a message terminates collection.
Functions
Resolve the backend identifier for a running session.
Build a supervisor child spec for embedding a session in a supervision tree.
Collect messages from a subscription until a result message or deadline.
Collect messages with a custom terminal predicate.
Delete a session and all its messages from the universal store.
Stream session events as an Enumerable (returns tagged tuples).
Stream session events as an Enumerable (raises on errors).
Subscribe the calling process to streaming events from a session.
Remove an event subscription and flush any pending events from the mailbox.
Create a fork (copy) of a session's metadata and message history.
Get messages from the backend's native session store.
Get messages from the backend's native session store with options.
Get metadata for a specific session by identifier.
Get all messages for a session from the universal store.
Get messages for a session with filtering options.
Return the current health state of a session as an atom.
Initialize ETS tables with default settings (public access).
Initialize ETS tables with the given options.
List all registered backend identifiers.
List sessions from the backend's native session store.
List sessions from the backend's native session store with filters.
List all tracked sessions from the universal session store.
List tracked sessions with optional filters.
Generate a unique request identifier.
Normalize a raw wire-format message into the SDK message format.
Parse a raw permission mode value into a permission_mode/0 atom.
Parse a raw stop reason value into a stop_reason/0 atom.
Send a synchronous query to the session with default parameters.
Send a synchronous query with explicit parameters.
Receive the next event from a subscription with a 5-second default timeout.
Receive the next event from a subscription with an explicit timeout.
Revert a session's visible conversation state to a prior boundary.
Return the projected capabilities for a live session.
Retrieve metadata about a running session.
Change the model at runtime for a live session.
Change the permission mode at runtime for a live session.
Generate a shareable link/state for a session.
Generate a shareable link/state for a session with options.
Start a new agent session connected to a backend.
Stop a running session and close its transport connection.
Stream query responses as an Enumerable (returns tagged tuples).
Stream query responses as an Enumerable (raises on errors).
Generate and store a summary for a session's conversation history.
Generate and store a session summary with options.
Archive a thread, marking it as archived and inactive.
Compact a thread by reducing its visible message history.
Fork an existing thread, copying its visible message history.
Fork an existing thread with options.
List all threads for a session, sorted by updated_at descending.
List threads for a session with backend-specific options.
List loaded (in-memory) threads for a session.
List loaded threads for a session with filter options.
Merge a metadata patch into a thread's metadata map.
Rename a thread.
Read thread metadata and optionally its message history.
Read thread metadata with options.
Resume an existing thread by its identifier.
Resume an existing thread with backend-specific options.
Rollback a thread's visible message history to a prior boundary.
Start a new conversation thread within a session.
Unarchive a previously archived thread, restoring it to active status.
Unsubscribe from a thread and clear it as the active thread if applicable.
Clear any revert state and restore the full visible message history.
Revoke the current share state for a session.
Types
@type backend() :: :beam_agent.backend()
Backend identifier atom.
One of :claude, :codex, :gemini, :opencode, or :copilot.
Used throughout the SDK to select which backend adapter handles a session.
@type message() :: :beam_agent.message()
A normalized message map flowing through the SDK.
Every message carries a required :type field (see message_type/0) and
optional fields that vary by type. Common fields present on most messages:
:uuid (unique identifier), :session_id, :content, and :timestamp.
Result messages additionally carry :duration_ms, :num_turns,
:stop_reason_atom, :usage, and :total_cost_usd. Tool-use messages
carry :tool_name and :tool_input. Error messages carry :category
(an atom for structured error handling: :rate_limit,
:subscription_exhausted, :context_exceeded, :auth_expired,
:server_error, or :unknown), optionally :retry_after (seconds),
and optionally :error_type (backend-specific: :tool_error,
:session_error, :subagent_failed).
@type message_type() :: :beam_agent.message_type()
The normalized message type tag.
Values: :text, :assistant, :tool_use, :tool_result, :system,
:result, :error, :user, :control_request,
:control_response, :stream_event, :rate_limit_event,
:tool_progress, :tool_use_summary, :thinking, :auth_status,
:prompt_suggestion, :raw.
The :result type signals query completion. The :error type signals
a backend error; every error message carries a :category atom
(:rate_limit, :subscription_exhausted, :context_exceeded,
:auth_expired, :server_error, or :unknown) and optionally
:retry_after (seconds) when the backend provides it.
The :raw type preserves unrecognized wire messages
verbatim so normalization stays lossless.
@type permission_mode() :: :beam_agent.permission_mode()
Permission mode controlling tool and edit approval.
Values: :default (normal approval flow), :accept_edits (auto-approve
file edits), :bypass_permissions (approve everything),
:plan (read-only planning mode), :dont_ask (TypeScript SDK only,
auto-approve without prompting).
@type permission_result() :: :beam_agent.permission_result()
Result from a permission handler callback.
Variants:
{:allow, updated_input}-- approve with optional input modifications{:deny, reason}-- deny with a human-readable reason{:deny, reason, interrupt}-- deny and request turn interruption{:allow, updated_input, rule_update}-- approve with rule/permission updatesmap()-- richer structured result with keys like:behavior,:updated_input,:updated_permissions,:message, and:interrupt
@type receive_fun() :: :beam_agent.receive_fun()
Function that pulls the next message from a session event stream.
Signature: fun(session, ref, timeout) -> {:ok, message()} | {:error, term()}.
Used by collect_messages/4 and collect_messages/5 to abstract the
message retrieval mechanism.
@type session_info_map() :: %{ session_id: binary(), adapter: atom(), created_at: integer(), cwd: binary(), extra: map(), message_count: non_neg_integer(), model: binary(), updated_at: integer() }
Session metadata map returned by list_sessions/0 and related functions.
Contains :session_id, :adapter, :created_at, :cwd, :extra,
:message_count, :model, and :updated_at fields.
@type stop_reason() :: :beam_agent.stop_reason()
Normalized stop reason from the backend.
Values: :end_turn (normal completion), :max_tokens (output truncated),
:stop_sequence (custom stop sequence hit), :refusal (model declined),
:tool_use_stop (stopped for tool use), :unknown_stop (unrecognized).
Parsed from the binary wire format into atoms for pattern matching.
@type summary_info_map() :: %{ content: binary(), generated_at: integer(), generated_by: binary(), message_count: non_neg_integer(), session_id: binary() }
Summary metadata map returned by summarize_session/1 and summarize_session/2.
Contains :content, :generated_at, :generated_by, :message_count,
and :session_id fields.
@type terminal_pred() :: :beam_agent.terminal_pred()
Predicate that determines if a message terminates collection.
Returns true for messages that should halt the collect_messages loop
(the halting message is included in the result list). Returns false
for messages that should continue collection. The default predicate
checks for type: :result.
Functions
Resolve the backend identifier for a running session.
Parameters
session-- pid of a running session.
Returns
{:ok, backend}wherebackendis an atom like:claude,:codex,:gemini,:opencode, or:copilot.{:error, reason}if the backend cannot be determined.
Build a supervisor child spec for embedding a session in a supervision tree.
Returns an OTP child_spec map suitable for passing to
Supervisor.start_child/2 or including in a supervisor init/1 return value.
Parameters
opts-- session configuration map (same asstart_session/1).
Examples
child_spec = BeamAgent.child_spec(%{backend: :claude})
{:ok, _pid} = Supervisor.start_child(MySupervisor, child_spec)
Collect messages from a subscription until a result message or deadline.
Loops calling receive_fun to pull messages from the subscription
identified by ref. Accumulates messages until either a message with
type: :result arrives or the wall-clock deadline is reached.
Returns all collected messages in order.
This is the building block behind query/2 synchronous semantics.
Parameters
session-- pid of a running session.ref-- subscription reference.deadline-- monotonic time deadline in milliseconds.receive_fun-- areceive_fun/0that pulls the next message.
Returns
{:ok, messages}or{:error, reason}.
Collect messages with a custom terminal predicate.
Same as collect_messages/4 but stops when terminal_pred returns true
for a message instead of checking for type: :result. This allows callers
to define their own completion condition (e.g., stop on the first
:tool_use message, or after N text chunks).
Parameters
session-- pid of a running session.ref-- subscription reference.deadline-- monotonic time deadline in milliseconds.receive_fun-- areceive_fun/0that pulls the next message.terminal_pred-- aterminal_pred/0function.
Returns
{:ok, messages}or{:error, reason}.
@spec delete_session(binary()) :: :ok
Delete a session and all its messages from the universal store.
Also signals completion to any active event subscribers for that session. Idempotent -- deleting a nonexistent session is a no-op.
Parameters
session_id-- binary session identifier.
Returns
:ok
@spec event_stream(pid(), keyword() | map()) :: Enumerable.t()
Stream session events as an Enumerable (returns tagged tuples).
Like event_stream!/2 but wraps each event in {:ok, event} and errors
in {:error, reason} instead of raising. Suitable for pipelines that
need to handle errors gracefully.
Parameters
session-- pid of a running session.opts-- optional keyword list or map. Optional keys::timeout-- event receive timeout in milliseconds (default 30,000).
Returns
An Enumerable.t() of {:ok, message()} or {:error, reason} tuples.
@spec event_stream!(pid(), keyword() | map()) :: Enumerable.t()
Stream session events as an Enumerable (raises on errors).
Subscribes to session events and returns a lazy Stream that yields
each event as it arrives. Automatically unsubscribes on stream
completion. Raises on subscription failure or stream errors.
Parameters
session-- pid of a running session.opts-- optional keyword list or map. Optional keys::timeout-- event receive timeout in milliseconds (default 30,000).
Returns
An Enumerable.t() of message/0 maps.
Examples
session
|> BeamAgent.event_stream!()
|> Enum.take(10)
|> Enum.each(&IO.inspect/1)
Subscribe the calling process to streaming events from a session.
After subscribing, the caller receives events via receive_event/2 or
receive_event/3. Events are normalized message/0 maps delivered in
real time as the backend produces them. The stream ends with an
{:error, :complete} sentinel after a result or error message.
Parameters
session-- pid of a running session.
Returns
{:ok, ref}whererefis a unique subscription reference.{:error, reason}on failure.
Examples
{:ok, ref} = BeamAgent.event_subscribe(session)
{:ok, _messages} = BeamAgent.query(session, "Hello")
{:ok, event} = BeamAgent.receive_event(session, ref, 5_000)
Remove an event subscription and flush any pending events from the mailbox.
Parameters
session-- pid of a running session.ref-- subscription reference fromevent_subscribe/1.
Returns
{:ok, :ok}on success.{:error, :bad_ref}if the reference is invalid.
@spec fork_session(binary(), map()) :: {:ok, session_info_map()} | {:error, :not_found}
Create a fork (copy) of a session's metadata and message history.
The new session receives a copy of all messages and metadata from the source session.
Parameters
session_id-- binary id of the source session.opts-- fork options map. Optional keys::session_id-- explicit id for the fork (auto-generated if omitted):include_hidden-- include reverted messages (defaulttrue):extra-- additional metadata to merge into the fork
Returns
{:ok, fork_meta}or{:error, :not_found}.
Get messages from the backend's native session store.
Falls back to get_session_messages/1 if native message retrieval is
not supported by the backend.
Parameters
session_id-- binary session identifier.
Returns
{:ok, messages}or{:error, reason}.
Get messages from the backend's native session store with options.
Falls back to get_session_messages/2 if native retrieval is not supported.
Parameters
session_id-- binary session identifier.opts-- backend-specific message filter options.
Returns
{:ok, messages}or{:error, reason}.
@spec get_session(binary()) :: {:ok, session_info_map()} | {:error, :not_found}
Get metadata for a specific session by identifier.
Parameters
session_id-- binary session identifier.
Returns
{:ok, session_meta}or{:error, :not_found}.
Get all messages for a session from the universal store.
Returns the full message history in chronological order.
Parameters
session_id-- binary session identifier.
Returns
{:ok, messages}or{:error, :not_found}if no session exists with that identifier.
Examples
{:ok, messages} = BeamAgent.get_session_messages("sess_abc123")
IO.puts("Total messages: #{length(messages)}")
Get messages for a session with filtering options.
Parameters
session_id-- binary session identifier.opts-- filter map with optional keys::limit-- maximum number of messages to return:offset-- skip this many messages from the start:types-- list ofmessage_type/0atoms to include:include_hidden-- iftrue, include reverted/hidden messages
Returns
{:ok, messages}or{:error, :not_found}.
Return the current health state of a session as an atom.
Possible values depend on the session engine state: :connecting,
:initializing, :ready, :active_query, :error, or :unknown.
Parameters
session-- pid of a running session.
Initialize ETS tables with default settings (public access).
Equivalent to init(%{}). Must be called before any SDK functions that
touch ETS. This is idempotent -- calling it again after initialization is
a no-op.
Examples
:ok = BeamAgent.init()
Initialize ETS tables with the given options.
Options
:table_access--:public(default) or:hardened
In :public mode, all tables use public access. Any process can read and
write. In :hardened mode, a linked helper process is spawned to own
protected tables and proxy writes, while reads remain zero-cost from any
process.
This function is idempotent. Calling it again after initialization is a
no-op that returns :ok. Should be called early in the consumer's init/1
callback, before any SDK functions that touch ETS.
Examples
:ok = BeamAgent.init(%{table_access: :hardened})
List all registered backend identifiers.
Returns a list of atoms representing the backends available in this
build of the SDK (e.g., [:claude, :codex, :gemini, :opencode, :copilot]).
Examples
iex> backends = BeamAgent.list_backends()
iex> :claude in backends
true
List sessions from the backend's native session store.
Attempts to call the Claude backend's native session listing. Falls back
to list_sessions/0 if the backend does not support native session listing.
Returns
{:ok, sessions}or{:error, reason}.
List sessions from the backend's native session store with filters.
Like list_native_sessions/0 but passes filter options to the native call.
Falls back to list_sessions/1 if native listing is not supported.
Parameters
opts-- backend-specific filter options map.
Returns
{:ok, sessions}or{:error, reason}.
@spec list_sessions() :: {:ok, [session_info_map()]}
List all tracked sessions from the universal session store.
Returns session metadata maps sorted by updated_at descending.
Sessions are tracked automatically when messages are recorded.
Returns
{:ok, sessions}where each entry is asession_info_map/0.
Examples
{:ok, sessions} = BeamAgent.list_sessions()
for s <- sessions do
IO.puts("#{s.session_id} (#{s.model})")
end
@spec list_sessions(map()) :: {:ok, [session_info_map()]}
List tracked sessions with optional filters.
Parameters
opts-- filter map with optional keys::adapter-- filter by backend atom:cwd-- filter by working directory:model-- filter by model name:limit-- maximum number of results:since-- unix millisecond timestamp lower bound onupdated_at
Returns
{:ok, sessions}sorted byupdated_atdescending.
Generate a unique request identifier.
Produces a binary UUID suitable for use as a control message request_id
or query correlation identifier.
Returns
A binary UUID string.
Normalize a raw wire-format message into the SDK message format.
Converts a backend-specific message map into the canonical message/0
format used throughout the SDK. Applies type detection from the message
content, normalizes field names, and adds any missing required keys with
default values.
Parameters
raw-- a raw message map from the backend wire format.
Returns
A normalized message/0 map.
Parse a raw permission mode value into a permission_mode/0 atom.
Accepts binaries ("auto"), strings, or atoms and returns the
corresponding permission_mode/0 atom. Unrecognized values are
mapped to :default.
Parameters
mode-- the raw permission mode value.
Returns
A permission_mode/0 atom.
Parse a raw stop reason value into a stop_reason/0 atom.
Accepts binaries ("end_turn"), strings, or atoms and returns the
corresponding stop_reason/0 atom for pattern matching. Unrecognized
values are mapped to :unknown.
Parameters
reason-- the raw stop reason value.
Returns
A stop_reason/0 atom.
Send a synchronous query to the session with default parameters.
Blocks until the backend produces a complete response (a result-type message). All intermediate messages (text chunks, tool use, thinking, etc.) are collected and returned as a list.
Parameters
session-- pid of a running session.prompt-- the user prompt as a binary string.
Returns
{:ok, messages}wheremessagesis a list of normalizedmessage/0maps in chronological order.{:error, reason}on failure.
Examples
{:ok, session} = BeamAgent.start_session(%{backend: :claude})
{:ok, messages} = BeamAgent.query(session, "What is Erlang?")
result = List.last(messages)
IO.puts(result[:content])
Send a synchronous query with explicit parameters.
Like query/2 but accepts a query options map to control model selection,
tool permissions, timeout, output format, and other query-level settings.
Parameters
session-- pid of a running session.prompt-- the user prompt as a binary string.params-- query options map. Keys include:model,:max_turns,:permission_mode,:timeout,:max_tokens,:system_prompt,:allowed_tools,:disallowed_tools,:output_format,:thinking,:max_budget_usd,:agent, and:attachments.
Security
Attachment file paths are read without sandboxing. Do not pass untrusted
user input directly into the :attachments list. Path validation and
directory confinement, when required, are the caller's responsibility.
Returns
{:ok, messages}or{:error, reason}.
Examples
{:ok, messages} = BeamAgent.query(session, "Refactor this module", %{
model: "claude-sonnet-4-20250514",
max_turns: 5,
permission_mode: :accept_edits,
timeout: 120_000
})
Receive the next event from a subscription with a 5-second default timeout.
Equivalent to receive_event(session, ref, 5000).
Parameters
session-- pid of a running session.ref-- subscription reference fromevent_subscribe/1.
Returns
{:ok, event}whereeventis amessage/0map.{:error, :complete}when the stream has ended.{:error, :timeout}if no event arrives within 5 seconds.{:error, :bad_ref}if the subscription is invalid.
Receive the next event from a subscription with an explicit timeout.
Blocks the calling process until an event arrives, the stream completes, or the timeout expires.
Parameters
session-- pid of a running session.ref-- subscription reference fromevent_subscribe/1.timeout-- maximum wait time in milliseconds.
Returns
{:ok, event}-- amessage/0map.{:error, :complete}-- stream has ended.{:error, :timeout}-- no event within the timeout.{:error, :bad_ref}-- invalid subscription reference.
@spec revert_session(binary(), map()) :: {:ok, session_info_map()} | {:error, :invalid_selector | :not_found}
Revert a session's visible conversation state to a prior boundary.
The underlying message store remains append-only. Revert changes the
active view by storing a visible_message_count in the session metadata.
Parameters
session_id-- binary id of the source session.selector-- boundary selector map. Accepts one of:%{visible_message_count: n}-- set boundary to N messages%{message_id: id}-- set boundary to the message with this id%{uuid: id}-- set boundary to the message with this uuid
Returns
{:ok, updated_meta}or{:error, :not_found | :invalid_selector}.
Return the projected capabilities for a live session.
Resolves the backend from the running session process and returns the full
capability list. This is a convenience wrapper around
BeamAgent.Capabilities.for_session/1.
Parameters
session-- pid of a running session.
Returns
{:ok, caps}or{:error, reason}.
Retrieve metadata about a running session.
Returns a map containing :session_id, :backend, :model, current state,
working directory, and handler-specific metadata merged from the backend's
build_session_info callback.
Parameters
session-- pid of a running session.
Returns
{:ok, info_map}or{:error, reason}.
Change the model at runtime for a live session.
Sends a model-change request through the session engine to the backend
handler. Returns {:ok, model} on success or {:error, reason} if the
backend does not support runtime model switching.
Parameters
session-- pid of a running session.model-- the model identifier binary (e.g.,"claude-sonnet-4-6").
Returns
{:ok, model}or{:error, reason}.
Change the permission mode at runtime for a live session.
Sends a permission-mode-change request through the session engine to the
backend handler. Returns {:ok, mode} on success or {:error, reason} if
the backend does not support runtime permission mode changes.
Parameters
session-- pid of a running session.mode-- the permission mode binary (e.g.,"auto","acceptEdits").
Returns
{:ok, mode}or{:error, reason}.
Start a new agent session connected to a backend.
Launches a supervised gen_statem process that owns a transport connection
to the specified backend CLI. The session is ready to accept queries once
this call returns successfully.
Parameters
opts-- session configuration map. The:backendkey is required and must be one of:claude,:codex,:gemini,:opencode, or:copilot.
Returns
{:ok, pid}on success wherepidis the session process.{:error, reason}if the session could not be started.
Examples
{:ok, session} = BeamAgent.start_session(%{
backend: :claude,
model: "claude-sonnet-4-20250514",
system_prompt: "You are a helpful assistant.",
permission_mode: :default
})
Stop a running session and close its transport connection.
Gracefully shuts down the session gen_statem, closes the underlying
transport (port, HTTP, or WebSocket), and cleans up session state.
Parameters
session-- pid of a running session process.
Returns
:ok
@spec stream(pid(), binary(), keyword() | map()) :: Enumerable.t()
Stream query responses as an Enumerable (returns tagged tuples).
Like stream!/3 but wraps each message in {:ok, msg} and errors in
{:error, reason} instead of raising. Suitable for pipelines that need
to handle errors gracefully.
Parameters
session-- pid of a running session.prompt-- the user prompt as a binary string.params-- optional query parameters (keyword list or map).
Returns
An Enumerable.t() of {:ok, message()} or {:error, reason} tuples.
Examples
session
|> BeamAgent.stream("Explain OTP")
|> Enum.each(fn
{:ok, msg} -> IO.write(msg[:content] || "")
{:error, reason} -> IO.puts("Error: #{inspect(reason)}")
end)
@spec stream!(pid(), binary(), keyword() | map()) :: Enumerable.t()
Stream query responses as an Enumerable (raises on errors).
Sends a query and returns a lazy Stream that yields each response
message as it arrives. Raises on query failure or timeout.
Parameters
session-- pid of a running session.prompt-- the user prompt as a binary string.params-- optional query parameters (keyword list or map).
Returns
An Enumerable.t() of message/0 maps.
Examples
session
|> BeamAgent.stream!("Explain GenServer")
|> Enum.each(fn msg ->
IO.write(msg[:content] || "")
end)
@spec summarize_session(binary()) :: {:ok, summary_info_map()} | {:error, :not_found}
Generate and store a summary for a session's conversation history.
Produces a deterministic summary from the session's messages including the first user message and latest agent output.
Parameters
session_id-- binary id of the source session.
Returns
{:ok, summary_map}with:content,:generated_at,:message_count, and:generated_byfields.{:error, :not_found}.
@spec summarize_session(binary(), map()) :: {:ok, summary_info_map()} | {:error, :not_found}
Generate and store a session summary with options.
Parameters
session_id-- binary id of the source session.opts-- options map. Optional keys::content/:summary-- explicit summary text (skips auto-generation):generated_by-- attribution string (default"beam_agent_core")
Returns
{:ok, summary_map}or{:error, :not_found}.
Archive a thread, marking it as archived and inactive.
Parameters
session-- pid of a running session.thread_id-- binary thread identifier.
Returns
{:ok, updated_thread_meta}or{:error, :not_found}.
Compact a thread by reducing its visible message history.
Uses thread_rollback internally with a selector derived from the
options map. If no selector is provided, compacts to zero visible messages.
Parameters
session-- pid of a running session.opts-- compaction options map. Optional keys::thread_id-- target thread (defaults to active thread):count-- number of messages to hide from the end:visible_message_count-- set boundary directly:selector-- explicit rollback selector map
Returns
{:ok, result_map}or{:error, :not_found}.
Fork an existing thread, copying its visible message history.
Creates a new thread with a copy of all visible messages from the source
thread. Message thread_id fields are rewritten to the new thread id.
Parameters
session-- pid of a running session.thread_id-- binary identifier of the source thread.
Returns
{:ok, forked_thread_meta}on success.{:error, :not_found}if the source thread does not exist.{:error, :message_limit_reached}if the fork would exceed message limits.
@spec thread_fork(pid(), binary(), map()) :: {:ok, map()} | {:error, :not_found | :message_limit_reached}
Fork an existing thread with options.
Parameters
session-- pid of a running session.thread_id-- binary identifier of the source thread.opts-- fork options map. Optional keys::thread_id-- explicit id for the fork:name-- name for the forked thread:parent_thread_id-- override the parent reference
Returns
{:ok, forked_thread_meta}on success.{:error, :not_found}if the source thread does not exist.{:error, :message_limit_reached}if the fork would exceed message limits.
List all threads for a session, sorted by updated_at descending.
Parameters
session-- pid of a running session.
Returns
{:ok, thread_list}where each entry is a thread metadata map.{:error, reason}.
List threads for a session with backend-specific options.
Falls back to thread_list/1 if the backend does not support filtered
thread listing.
Parameters
session-- pid of a running session.opts-- backend-specific listing options.
Returns
{:ok, thread_list}or{:error, reason}.
List loaded (in-memory) threads for a session.
Returns threads with their active state, optionally filtered by the backend's native implementation.
Parameters
session-- pid of a running session.
Returns
{:ok, result_map}with:threads,:active_thread_id, and:countfields.{:error, reason}.
List loaded threads for a session with filter options.
Parameters
session-- pid of a running session.opts-- filter options map. Optional keys::include_archived-- include archived threads (defaulttrue):thread_id-- filter to a specific thread:status-- filter by thread status:limit-- maximum number of results
Returns
{:ok, result_map}or{:error, reason}.
Merge a metadata patch into a thread's metadata map.
Parameters
session-- pid of a running session.thread_id-- binary thread identifier.metadata_patch-- map of key-value pairs to merge into the thread's existing metadata.
Returns
{:ok, result_map}or{:error, :not_found}.
Rename a thread.
Parameters
session-- pid of a running session.thread_id-- binary thread identifier.name-- new thread name as a binary.
Returns
{:ok, result_map}or{:error, :not_found}.
Read thread metadata and optionally its message history.
Parameters
session-- pid of a running session.thread_id-- binary thread identifier.
Returns
{:ok, %{thread: thread_meta}}or{:error, :not_found}.
Read thread metadata with options.
Parameters
session-- pid of a running session.thread_id-- binary thread identifier.opts-- options map. Optional keys::include_messages-- iftrue, includes the:messageskey in the result
Returns
{:ok, %{thread: thread_meta, messages: [message()]}}or{:error, :not_found}.
Resume an existing thread by its identifier.
Sets the thread as the active thread for the session and updates its status to active. Subsequent queries will be associated with this thread.
Parameters
session-- pid of a running session.thread_id-- binary thread identifier.
Returns
{:ok, thread_meta}or{:error, :not_found}.
Examples
{:ok, thread} = BeamAgent.thread_resume(session, "thread_abc123")
IO.puts("Resumed: #{thread.name}")
Resume an existing thread with backend-specific options.
Like thread_resume/2 but passes additional options to the backend's
native implementation. Falls back to thread_resume/2 if the backend
does not support extended resume options.
Parameters
session-- pid of a running session.thread_id-- binary thread identifier.opts-- backend-specific resume options.
Returns
{:ok, thread_meta}or{:error, :not_found}.
Rollback a thread's visible message history to a prior boundary.
The underlying messages are preserved; only the visible window changes.
Parameters
session-- pid of a running session.thread_id-- binary thread identifier.selector-- boundary selector map. Accepts one of:%{count: n}-- hide the last N visible messages%{visible_message_count: n}-- set boundary directly%{message_id: id}or%{uuid: id}-- set boundary to a message
Returns
{:ok, updated_thread_meta}or{:error, :not_found | :invalid_selector}.
Start a new conversation thread within a session.
Creates a named thread that groups related queries. The new thread
becomes the active thread for the session. Thread messages are stored
as a subset of the session's message history, tagged with thread_id.
Parameters
session-- pid of a running session.opts-- thread options map. Optional keys::name-- human-readable thread name (defaults to thethread_id):thread_id-- explicit id (auto-generated if omitted):metadata-- arbitrary metadata map:parent_thread_id-- id of the parent thread (for fork lineage)
Returns
{:ok, thread_meta}with:thread_id,:session_id,:name,:status, and other metadata fields.{:error, reason}.
Examples
{:ok, thread} = BeamAgent.thread_start(session, %{
name: "refactor-discussion"
})
thread_id = thread.thread_id
{:ok, _messages} = BeamAgent.query(session, "Let's refactor the router")
Unarchive a previously archived thread, restoring it to active status.
Parameters
session-- pid of a running session.thread_id-- binary thread identifier.
Returns
{:ok, updated_thread_meta}or{:error, :not_found}.
Unsubscribe from a thread and clear it as the active thread if applicable.
Parameters
session-- pid of a running session.thread_id-- binary thread identifier.
Returns
{:ok, result_map}with:thread_idand:unsubscribedfields.{:error, :not_found}.
@spec unrevert_session(binary()) :: {:ok, session_info_map()} | {:error, :not_found}
Clear any revert state and restore the full visible message history.
Undoes a previous revert_session/2 call so all messages are visible again.
Parameters
session_id-- binary id of the source session.
Returns
{:ok, updated_meta}or{:error, :not_found}.