Control plane for session configuration, permissions, tasks, and feedback.
This module is the public API for the BeamAgent control layer. It manages per-session configuration state, permission and approval workflows, task lifecycle tracking, user feedback collection, and pending request/response handling for turn-based agent interactions.
All state is ETS-backed, keyed by session ID, and persists for the node lifetime or until explicitly cleared. The control layer works identically across all five backends (Claude, Codex, Gemini, OpenCode, Copilot).
When to use directly vs through BeamAgent
Use this module directly when you need to configure runtime session settings,
manage background tasks, submit feedback, or handle turn-based pending
requests without going through the higher-level BeamAgent API.
Quick example
# Set session permission mode:
:ok = BeamAgent.Control.set_permission_mode(session_id, "acceptEdits")
# Configure thinking token budget:
:ok = BeamAgent.Control.set_max_thinking_tokens(session_id, 8192)
# Dispatch a named control method:
{:ok, _} = BeamAgent.Control.dispatch(session_id, "setModel", %{
"model" => "claude-sonnet-4-6"
})
# Register and stop a background task:
:ok = BeamAgent.Control.register_task(session_id, "task-abc", self())
:ok = BeamAgent.Control.stop_task(session_id, "task-abc")Core concepts
Session Config: an ETS-backed key-value store scoped to a session ID. Arbitrary atom keys map to arbitrary term values. Convenience accessors exist for common keys (
:permission_mode,:max_thinking_tokens).Task Registration: long-running background tasks can be registered with a session so that they can be listed, monitored, and stopped via the control dispatch protocol. Each registered task also creates a linked
BeamAgent.Runsrecord so task history survives after the live task entry is removed.Callback Broker: sessions can register callback functions for permission handling, approval decisions, and user input prompts. The broker invokes these callbacks safely (catching exceptions) and falls back to configured defaults when no handler is registered.
Pending Requests: turn-based interaction protocol where the agent stores a pending request (e.g., asking for user input) and the consumer resolves it later with a response.
Architecture deep dive
This module is a thin Elixir facade that defdelegates every call to
:beam_agent_control. The underlying implementation lives in
:beam_agent_control_core, which owns five ETS tables: config, tasks,
feedback, callbacks, and pending. Task registration is bridged into
BeamAgent.Runs, so the live task table can stay ephemeral while durable run
history remains queryable.
See also: BeamAgent.Runtime, BeamAgent.Catalog, BeamAgent.
Summary
Functions
Append audio to an active realtime session (storage layer).
Append text to an active realtime session (storage layer).
Clear all control state across every session.
Clear all universal collaboration state (review and realtime sessions).
Clear all configuration for a session.
Clear all feedback entries for a session.
Clear all callback handlers for a session.
List available collaboration modes.
List canonical collaboration modes (storage layer).
Delete a persisted session from the backend server.
Dispatch a named control method to the appropriate handler.
Ensure all control ETS tables exist.
List available experimental features.
List experimental features with filter options.
List universal experimental features (storage layer).
Get all configuration for a session as a map.
Get a configuration value for a session.
Get all feedback entries for a session, in submission order.
Get the maximum thinking token budget for a session.
Get the response for a pending request.
Get the permission mode for a session.
Retrieve a single persisted session by its identifier.
List all pending requests for a session, sorted oldest first.
List all sub-agents registered on the backend server.
List all persisted sessions known to the backend server.
List all tasks registered for a session.
Register callback handlers for a session.
Register an active task for a session.
Request an approval decision through the session's callback broker.
Request permission through the session's callback broker.
Request user input through the session's callback broker.
Resolve a pending request with a response.
Start a code review session.
Check the health of the backend server.
Set a configuration value for a session.
Set the maximum thinking token budget for a session.
Set the permission mode for a session.
Start a universal realtime session for a thread (storage layer).
Start a universal code review session (storage layer).
Stop and tear down an active realtime session (storage layer).
Stop a running task by sending an interrupt to its process.
Store a pending request from the agent.
Submit feedback for a session.
Append audio data to an active realtime thread.
Append text data to an active realtime thread.
Start a realtime thread for audio/text streaming.
Stop an active realtime thread.
Interrupt an active turn in a thread.
Steer an active turn by injecting additional input mid-conversation.
Steer an active turn with additional options.
Unregister a task, removing it from the session's task list.
Functions
Append audio to an active realtime session (storage layer).
Use thread_realtime_append_audio/3 for the session-pid-based API. This
function operates directly on a session_id binary.
Returns {:ok, result}.
Append text to an active realtime session (storage layer).
Use thread_realtime_append_text/3 for the session-pid-based API. This
function operates directly on a session_id binary.
Returns {:ok, result}.
@spec clear() :: :ok
Clear all control state across every session.
Deletes all objects from every control ETS table. Use this for test cleanup
or node-wide reset. Individual session cleanup should use clear_config/1,
clear_feedback/1, and clear_session_callbacks/1 instead.
Returns :ok.
@spec clear_collaboration() :: :ok
Clear all universal collaboration state (review and realtime sessions).
Returns :ok.
@spec clear_config(binary()) :: :ok
Clear all configuration for a session.
@spec clear_feedback(binary()) :: :ok
Clear all feedback entries for a session.
@spec clear_session_callbacks(binary()) :: :ok
Clear all callback handlers for a session.
Removes every registered callback handler stored for session_id. After
this call, the session operates as if no callbacks were ever registered; the
permission_default reverts to :deny.
Returns :ok.
List available collaboration modes.
Returns the modes supported by the backend (e.g., pair programming, review, teaching).
Parameters
session-- pid of a running session.
Returns
{:ok, modes}or{:error, reason}.
@spec collaboration_modes(binary()) :: {:ok, %{modes: [map(), ...], session_id: binary(), source: :universal}}
List canonical collaboration modes (storage layer).
Use collaboration_mode_list/1 for the session-pid-based API with native
backend routing. This function operates directly on a session_id binary
and returns only the universal collaboration modes.
Returns {:ok, %{session_id: binary(), source: :universal, modes: [map()]}}.
Delete a persisted session from the backend server.
Parameters
session-- pid of a running session.session_id-- binary session identifier to delete.
Returns
{:ok, result}or{:error, reason}.
@spec dispatch(binary(), binary(), map()) :: {:ok, %{ optional(:model) => atom() | binary() | map(), optional(:permission_mode) => atom() | binary() }} | {:error, :not_found | {:invalid_param, :max_thinking_tokens} | {:missing_param, :max_thinking_tokens | :model | :permission_mode | :task_id} | {:unknown_method, binary()}}
Dispatch a named control method to the appropriate handler.
Supported methods:
"setModel"— set the model; requires"model"key in params"setPermissionMode"— set the permission mode; requires"permissionMode"key"setMaxThinkingTokens"— set the thinking token budget; requires"maxThinkingTokens"key"stopTask"— stop a running background task; requires"taskId"key
Returns {:ok, result_map} or {:error, reason}.
Example
{:ok, %{model: "claude-sonnet-4-6"}} =
BeamAgent.Control.dispatch(session_id, "setModel", %{"model" => "claude-sonnet-4-6"})
{:error, {:unknown_method, "noSuchMethod"}} =
BeamAgent.Control.dispatch(session_id, "noSuchMethod", %{})
@spec ensure_tables() :: :ok
Ensure all control ETS tables exist.
Creates the config, tasks, feedback, callbacks, and pending tables if they do not already exist. Idempotent and safe to call from any process.
Returns :ok.
List available experimental features.
Parameters
session-- pid of a running session.
Returns
{:ok, features}or{:error, reason}.
List experimental features with filter options.
Parameters
session-- pid of a running session.opts-- filter options map.
Returns
{:ok, features}or{:error, reason}.
@spec experimental_features(binary(), map()) :: {:ok, %{features: [map(), ...], session_id: binary(), source: :universal}}
List universal experimental features (storage layer).
Use experimental_feature_list/1 or experimental_feature_list/2 for the
session-pid-based API with native backend routing. This function operates
directly on a session_id binary and returns only the universal feature set.
Returns {:ok, %{session_id: binary(), source: :universal, features: [map()]}}.
Get all configuration for a session as a map.
Returns {:ok, map} — an empty map when nothing is set.
@spec get_config(binary(), atom()) :: {:ok, atom() | binary() | map() | pos_integer()} | {:error, :not_set}
Get a configuration value for a session.
Returns {:ok, value} or {:error, :not_set} when the key has not been
written.
Get all feedback entries for a session, in submission order.
@spec get_max_thinking_tokens(binary()) :: {:ok, pos_integer()} | {:error, :not_set}
Get the maximum thinking token budget for a session.
Returns {:ok, tokens} or {:error, :not_set} when no budget has been
configured.
Get the response for a pending request.
Returns {:ok, response_map} if resolved, {:error, :pending} if still
awaiting a response, or {:error, :not_found} if no such request exists.
Get the permission mode for a session.
Returns {:ok, mode} or {:error, :not_set} when no mode has been configured.
Retrieve a single persisted session by its identifier.
Parameters
session-- pid of a running session.session_id-- binary session identifier.
Returns
{:ok, session_map}or{:error, :not_found}.
@spec list_pending_requests(binary()) :: {:ok, [:beam_agent_control_core.pending_request()]}
List all pending requests for a session, sorted oldest first.
Each entry is a map with :request_id, :session_id, :request, :status,
:created_at, and optionally :response and :resolved_at. Sensitive fields
inside request/response payloads are redacted for display-safe reads.
List all sub-agents registered on the backend server.
Parameters
session-- pid of a running session.
Returns
{:ok, agents}or{:error, reason}.
List all persisted sessions known to the backend server.
Parameters
session-- pid of a running session.
Returns
{:ok, sessions}or{:error, reason}.
@spec list_tasks(binary()) :: {:ok, [:beam_agent_control_core.task_meta()]}
List all tasks registered for a session.
Returns {:ok, tasks} where each task map contains :task_id, :session_id,
:pid, :started_at (millisecond timestamp), :status (:running or
:stopped), and an optional :run_id. Stopped tasks also carry
:stopped_at.
Register callback handlers for a session.
The opts map may contain:
:permission_handler—fun(method, params, context)returning a permission result tuple:permission_default—:allowor:deny(default:deny):approval_handler—fun(method, params, context)returning:accept,:accept_for_session,:decline, or:cancel:user_input_handler—fun(request, context)returning{:ok, response}or any term
Returns :ok.
Register an active task for a session.
Associates a task ID and owning process with the session. The task is initially
marked as running. Use stop_task/2 to signal the task to stop, and
unregister_task/2 to remove it after completion.
Each registered task also creates a linked canonical run. list_tasks/1
exposes that linkage through the optional :run_id field.
Example
:ok = BeamAgent.Control.register_task(session_id, "task-abc-123", self())
{:ok, [%{task_id: "task-abc-123", status: :running}]} =
BeamAgent.Control.list_tasks(session_id)
@spec request_approval(binary(), binary(), map(), map()) :: :accept | :accept_for_session | :decline | :cancel
Request an approval decision through the session's callback broker.
Invokes the registered approval_handler (or adapts the
permission_handler to approval semantics).
Returns :accept, :accept_for_session, :decline, or :cancel.
@spec request_permission(binary(), binary(), map(), map()) :: :beam_agent_core.permission_result()
Request permission through the session's callback broker.
Invokes the registered permission_handler (or falls back to the
approval_handler adapted to permission semantics, or the
permission_default). The handler is called safely — exceptions are caught
and the default is returned.
Returns a permission result:
{:allow, params}— permission granted{:allow, params, override_default}— granted, with updated session default{:deny, reason}— permission denied{:deny, reason, cancelled}— denied, with cancellation flag
Request user input through the session's callback broker.
Stores a pending request, then invokes the registered user_input_handler
if one exists. If the handler responds, the pending request is resolved
immediately. If no handler is registered or the handler fails, the request
remains pending for external resolution via resolve_pending_request/3.
Returns {:ok, response} when the handler responds, or {:ok, pending_info}
when the request is awaiting external resolution.
@spec resolve_pending_request(binary(), binary(), map()) :: :ok | {:error, :not_found | :already_resolved}
Resolve a pending request with a response.
Marks the pending request as resolved and publishes a
pending_request_resolved event.
Returns :ok, {:error, :not_found}, or {:error, :already_resolved}.
Start a code review session.
Initialises a review workflow where the agent reviews code changes.
Parameters
session-- pid of a running session.opts-- review options map (e.g.,:diff,:branch,:files).
Returns
{:ok, result}or{:error, reason}.
Check the health of the backend server.
Returns a status map with health indicators including the backend name, session identifier, and uptime in milliseconds.
Parameters
session-- pid of a running session.
Returns
{:ok, health_map}or{:error, reason}.
Set a configuration value for a session.
Stores an arbitrary term under the given atom key, scoped to the session ID. Overwrites any previous value for the same key.
@spec set_max_thinking_tokens(binary(), pos_integer()) :: :ok
Set the maximum thinking token budget for a session.
tokens must be a positive integer. Used by backends that support extended
thinking (e.g., Claude) to cap the number of tokens the model may use for
internal reasoning.
Set the permission mode for a session.
The permission mode controls how the agent handles tool execution approvals.
Common values include "acceptEdits", "auto", and "manual". The exact
interpretation depends on the backend.
Example
:ok = BeamAgent.Control.set_permission_mode(session_id, "acceptEdits")
@spec start_realtime(binary(), map()) :: {:ok, %{ backend: any(), event_count: 1, input_summary: %{audio_chunks: 0, text_chunks: 0}, inputs: [], mode: any(), output_events: [map(), ...], params: map(), realtime_id: binary(), session_id: binary(), source: :universal, started_at: integer(), status: :active, thread_id: binary(), transport: any(), transport_metadata: map(), updated_at: integer(), voice_enabled: boolean() }}
Start a universal realtime session for a thread (storage layer).
Use thread_realtime_start/2 for the session-pid-based API with native
backend routing. This function operates directly on a session_id binary and
stores the realtime session in ETS via the universal collaboration layer.
Returns {:ok, realtime_session}.
@spec start_review(binary(), map()) :: {:ok, %{ backend: any(), comments: [map()], created_at: integer(), issues: [map()], mode: any(), params: map(), participants: [map()], resolutions: [map()], review_id: binary(), review_metrics: %{ comment_count: non_neg_integer(), issue_count: non_neg_integer(), participant_count: non_neg_integer(), resolution_count: non_neg_integer() }, session_id: binary(), source: any(), stage: any(), stage_history: [map(), ...], status: :active, target: any(), thread_id: binary(), updated_at: integer() }}
Start a universal code review session (storage layer).
Use review_start/2 for the session-pid-based API with native backend
routing. This function operates directly on a session_id binary and stores
the review session in ETS via the universal collaboration layer.
params may include :files, :diff, :review_type, :mode, :target,
:participants, :comments, :issues, and :resolutions.
Returns {:ok, review_session}.
Stop and tear down an active realtime session (storage layer).
Use thread_realtime_stop/2 for the session-pid-based API. This function
operates directly on a session_id binary.
Returns {:ok, result}.
Stop a running task by sending an interrupt to its process.
Attempts a gen_statem interrupt call first, falling back to
Process.exit(pid, :shutdown) if the call fails.
Returns :ok if the task was found and signaled, or {:error, :not_found}.
Stopping a task also cancels its linked run in BeamAgent.Runs.
Store a pending request from the agent.
Called when the agent asks for user input or needs a response before it can
continue. A pending_request_stored event is published on the session's event
bus.
Submit feedback for a session.
Feedback entries are accumulated in submission order. Each entry is augmented
with a submitted_at timestamp, session_id, and sequence number.
@spec thread_realtime_append_audio(pid() | binary(), binary(), map()) :: {:ok, map()} | {:error, term()}
Append audio data to an active realtime thread.
Parameters
session-- pid of a running session.thread_id-- binary thread identifier.opts-- audio data options (contains encoded audio bytes).
Returns
{:ok, result}or{:error, reason}.
@spec thread_realtime_append_text(pid() | binary(), binary(), map()) :: {:ok, map()} | {:error, term()}
Append text data to an active realtime thread.
Parameters
session-- pid of a running session.thread_id-- binary thread identifier.opts-- text data options (contains the text to append).
Returns
{:ok, result}or{:error, reason}.
Start a realtime thread for audio/text streaming.
Initiates a WebSocket or streaming connection for real-time interactions.
Parameters
session-- pid of a running session.opts-- realtime session options map.
Returns
{:ok, result}or{:error, reason}.
Stop an active realtime thread.
Closes the streaming connection and finalizes the realtime session.
Parameters
session-- pid of a running session.thread_id-- binary thread identifier.
Returns
{:ok, result}or{:error, reason}.
Interrupt an active turn in a thread.
Cancels the agent's in-progress response for the specified turn.
Parameters
session-- pid of a running session.thread_id-- binary thread identifier.turn_id-- binary turn identifier.
Returns
{:ok, result}or{:error, reason}.
Steer an active turn by injecting additional input mid-conversation.
Allows you to redirect or refine the agent's current turn within a thread.
Parameters
session-- pid of a running session.thread_id-- binary thread identifier.turn_id-- binary identifier of the active turn.input-- steering input (binary prompt or list of content block maps).
Returns
{:ok, result}or{:error, reason}.
@spec turn_steer(pid(), binary(), binary(), binary() | [map()], map()) :: {:ok, map()} | {:error, term()}
Steer an active turn with additional options.
Same as turn_steer/4 but accepts backend-specific options such as
:model or :system_prompt overrides for the steered response.
Parameters
session-- pid of a running session.thread_id-- binary thread identifier.turn_id-- binary turn identifier.input-- steering input.opts-- backend-specific options map.
Returns
{:ok, result}or{:error, reason}.
Unregister a task, removing it from the session's task list.
Use this after a task has completed or been cleaned up.