Conversation thread lifecycle management for the BeamAgent SDK.
This module provides logical conversation threading within sessions. A thread groups related queries into a named conversation branch, enabling parallel workstreams, forking, archiving, and rollback within a single session.
Threads are scoped to a session: each session can have multiple threads, and
each thread tracks its own query history. Messages recorded against a thread
are also stored in the parent session for unified history via
BeamAgent.SessionStore.
When to use directly vs through BeamAgent
Most callers interact with threads through BeamAgent. Use this module
directly when you need fine-grained control over thread lifecycle, forking,
archiving, or rollback operations — for example, in a multi-agent orchestrator
or a conversation branching UI.
Quick example
# Start a new thread within a session:
{:ok, thread} = BeamAgent.Threads.thread_start("sess_001", %{name: "bug-investigation"})
# List all threads for the session:
{:ok, threads} = BeamAgent.Threads.thread_list("sess_001")
# Fork the thread to explore an alternative approach:
{:ok, fork} = BeamAgent.Threads.thread_fork("sess_001", thread.thread_id, %{
name: "alternative-design"
})
# Roll back the last 3 messages:
{:ok, _} = BeamAgent.Threads.thread_rollback("sess_001", thread.thread_id, %{count: 3})Core concepts
Thread: a named conversation branch within a session, identified by a binary thread ID. Each thread has its own message history, status (
:active,:paused,:completed,:archived), and a visible message count for rollback support.Active thread: each session tracks at most one active thread. Starting or resuming a thread sets it as the active thread for that session.
Forking:
thread_fork/3creates a new thread with a copy of the source thread's visible message history. The fork records itsparent_thread_idfor lineage tracking.Rollback:
thread_rollback/3hides messages beyond a boundary without deleting them. Thevisible_message_countfield controls which messages are visible when reading the thread.
Architecture deep dive
This module is a thin Elixir facade that defdelegates every call to the
Erlang :beam_agent_threads module.
The underlying implementation lives in :beam_agent_threads_core, which owns
two ETS tables: beam_agent_threads_core for thread metadata and
beam_agent_active_threads for active-thread tracking.
Thread messages are stored in the session-level message store with a
thread_id tag, keeping all messages queryable at both the session and thread
level.
See also: BeamAgent.SessionStore, BeamAgent.
Summary
Functions
Get the currently active thread for a session.
Archive a thread.
Clear all thread data from the store.
Clear the active thread for a session.
Delete a thread.
Ensure the thread store tables exist.
Fork an existing thread (storage layer).
Get metadata for a specific thread.
Get all visible messages for a specific thread.
List all threads for a session (storage layer).
Read a thread with its metadata.
Read a thread with optional message history.
Record a message against a thread.
Resume an existing thread by ID (storage layer).
Roll back the visible thread history (storage layer).
Set the active thread for a session.
Start a new conversation thread within a session.
Archive a thread.
Compact a thread by reducing its visible message history.
Count the number of threads for a session.
Fork an existing thread.
Fork an existing thread with options.
List all threads for a session, sorted by most-recently updated.
List threads for a session with backend-specific options.
List loaded (in-memory) threads for a session.
List loaded threads for a session with filter options.
Merge a metadata patch into a thread's metadata map.
Rename a thread.
Read a thread's metadata.
Read a thread with optional message history.
Resume an existing thread by ID, making it the active thread.
Resume an existing thread with backend-specific options.
Roll back the visible thread history.
Start a new conversation thread within a session.
Unarchive a thread, restoring it to active status.
Unsubscribe from a thread and clear it as the active thread if applicable.
Unarchive a thread.
Functions
Get the currently active thread for a session.
Returns {:ok, thread_id} with the active thread's binary ID, or
{:error, :none} if no thread is currently active.
@spec archive_thread(binary(), binary()) :: {:ok, :beam_agent_threads.thread_meta()} | {:error, :not_found}
Archive a thread.
Sets the thread status to :archived with an archived_at timestamp. The
thread data is preserved but marked as inactive.
Returns {:ok, updated_meta} or {:error, :not_found}.
@spec clear() :: :ok
Clear all thread data from the store.
Deletes every entry from the threads and active-threads tables. The tables themselves remain in place.
Returns :ok.
@spec clear_active_thread(binary()) :: :ok
Clear the active thread for a session.
After this call, the session has no active thread until one is started or resumed.
Returns :ok.
Delete a thread.
Removes the thread from the store. If this was the active thread for the session, the active thread is cleared.
Returns :ok.
@spec ensure_tables() :: :ok
Ensure the thread store tables exist.
Creates the beam_agent_threads_core and beam_agent_active_threads tables
if they do not already exist. Idempotent and safe to call from any process.
Returns :ok.
@spec fork_thread(binary(), binary(), :beam_agent_threads.thread_opts()) :: {:ok, :beam_agent_threads.thread_meta()} | {:error, :not_found | :message_limit_reached}
Fork an existing thread (storage layer).
Creates a new thread with a copy of the source thread's visible message
history. Each copied message has its thread_id rewritten to the new fork
ID. The fork records a parent_thread_id for lineage.
Returns {:ok, fork_meta}, {:error, :not_found}, or
{:error, :message_limit_reached}.
@spec get_thread(binary(), binary()) :: {:ok, :beam_agent_threads.thread_meta()} | {:error, :not_found}
Get metadata for a specific thread.
Returns {:ok, thread_meta} or {:error, :not_found}.
@spec get_thread_messages(binary(), binary()) :: {:ok, [:beam_agent_core.message()]} | {:error, :not_found}
Get all visible messages for a specific thread.
Filters session messages by thread_id tag and applies the thread's
visible_message_count boundary.
Returns {:ok, messages} sorted by recording order, or
{:error, :not_found} if the thread does not exist.
@spec list_threads(binary()) :: {:ok, [:beam_agent_threads.thread_meta()]}
List all threads for a session (storage layer).
Returns {:ok, threads} with all thread metadata maps, sorted by
updated_at descending (most recently updated first).
@spec read_thread(binary(), binary()) :: {:ok, %{ :thread => :beam_agent_threads.thread_meta(), optional(:messages) => [:beam_agent_core.message()] }} | {:error, :not_found}
Read a thread with its metadata.
Returns {:ok, %{thread: thread_meta}} on success or {:error, :not_found}.
Equivalent to read_thread/3 with an empty opts map.
@spec read_thread(binary(), binary(), map()) :: {:ok, %{ :thread => :beam_agent_threads.thread_meta(), optional(:messages) => [:beam_agent_core.message()] }} | {:error, :not_found}
Read a thread with optional message history.
opts may include:
:include_messages— iftrue, include the visible thread messages
Returns {:ok, result} where result contains :thread, optionally
:messages, or {:error, :not_found}.
@spec record_thread_message(binary(), binary(), :beam_agent_core.message()) :: :ok
Record a message against a thread.
Tags the message with the thread ID and stores it in both the thread metadata
(incrementing message_count and visible_message_count) and the
session-level message store for unified history.
Returns :ok.
@spec resume_thread(binary(), binary()) :: {:ok, :beam_agent_threads.thread_meta()} | {:error, :not_found}
Resume an existing thread by ID (storage layer).
Sets the thread status to :active and marks it as the active thread for
the session.
Returns {:ok, thread_meta} or {:error, :not_found}.
@spec rollback_thread(binary(), binary(), map()) :: {:ok, :beam_agent_threads.thread_meta()} | {:error, :not_found | :invalid_selector}
Roll back the visible thread history (storage layer).
Hides messages beyond a specified boundary without deleting them. The
visible_message_count field in thread metadata controls which messages are
visible when reading the thread.
selector is a map with one of:
:count— hide the last N visible messages:visible_message_count— set the visible boundary directly:message_idor:uuid— set the boundary to a specific message
Returns {:ok, updated_meta}, {:error, :not_found}, or
{:error, :invalid_selector}.
Set the active thread for a session.
Overwrites any previously active thread for this session.
Returns :ok.
@spec start_thread(binary(), :beam_agent_threads.thread_opts()) :: {:ok, :beam_agent_threads.thread_meta()}
Start a new conversation thread within a session.
Creates a thread entry in the store and sets it as the active thread for the
session. A thread ID is auto-generated if not provided in opts.
opts may include:
:name— human-readable thread name (binary):metadata— arbitrary metadata map:thread_id— explicit thread ID (binary, auto-generated if omitted):parent_thread_id— ID of the parent thread for lineage
Returns {:ok, thread_meta}.
Example
{:ok, thread} = BeamAgent.Threads.start_thread("sess_001", %{name: "bug-investigation"})
thread.thread_id # => "thread_a1b2c3d4"
Archive a thread.
Sets the thread status to :archived with an archived_at timestamp. The
thread data is preserved but marked as inactive.
Returns {:ok, updated_meta} or {:error, :not_found}.
Compact a thread by reducing its visible message history.
Uses thread_rollback internally with a selector derived from the
options map.
Parameters
session-- pid of a running session.opts-- compaction options map. Optional keys::thread_id-- target thread (defaults to active thread):count-- number of messages to hide from the end:visible_message_count-- set boundary directly:selector-- explicit rollback selector map
Returns
{:ok, result_map}or{:error, :not_found}.
@spec thread_count(binary()) :: non_neg_integer()
Count the number of threads for a session.
Returns the thread count as a non-negative integer.
Fork an existing thread.
Creates a new thread with a copy of the source thread's visible message
history. Each copied message has its thread_id rewritten to the new fork ID.
The fork records a parent_thread_id for lineage.
opts may include:
:thread_id— explicit ID for the fork (auto-generated if omitted):name— human-readable name for the fork:parent_thread_id— defaults to the sourcethread_id
Returns {:ok, fork_meta}, {:error, :not_found}, or
{:error, :message_limit_reached} if the per-session message limit
would be exceeded by copying messages into the fork.
Example
{:ok, fork} = BeamAgent.Threads.thread_fork("sess_001", "thread_abc", %{
name: "alternative-approach"
})
Fork an existing thread with options.
Same as thread_fork/2 but accepts an opts map for the fork's name,
explicit thread ID, and parent thread ID.
List all threads for a session, sorted by most-recently updated.
Returns {:ok, threads} with all thread metadata maps.
Example
{:ok, threads} = BeamAgent.Threads.thread_list("sess_001")
[%{thread_id: latest_id, name: name} | _] = threads
List threads for a session with backend-specific options.
Falls back to thread_list/1 if the backend does not support filtered
thread listing.
Parameters
session-- pid of a running session.opts-- backend-specific listing options.
Returns
{:ok, threads}or{:error, reason}.
List loaded (in-memory) threads for a session.
Returns threads with their active state, optionally filtered by the backend's native implementation.
Parameters
session-- pid of a running session.
Returns
{:ok, result_map}with:threads,:active_thread_id, and:count.{:error, reason}.
List loaded threads for a session with filter options.
Parameters
session-- pid of a running session.opts-- filter options map. Optional keys::include_archived-- include archived threads (defaulttrue):thread_id-- filter to a specific thread:status-- filter by thread status:limit-- maximum number of results
Returns
{:ok, result_map}or{:error, reason}.
Merge a metadata patch into a thread's metadata map.
Parameters
session-- pid of a running session.thread_id-- binary thread identifier.metadata_patch-- map of key-value pairs to merge into the thread's existing metadata.
Returns
{:ok, result_map}or{:error, :not_found}.
Rename a thread.
Parameters
session-- pid of a running session.thread_id-- binary thread identifier.name-- new thread name as a binary.
Returns
{:ok, result_map}or{:error, :not_found}.
Read a thread's metadata.
Returns {:ok, %{thread: thread_meta}} on success or {:error, :not_found}.
Equivalent to thread_read/3 with an empty opts map.
Read a thread with optional message history.
opts may include:
:include_messages— iftrue, the returned map includes a:messageskey with the thread's visible messages
Returns {:ok, result} where result contains at least a :thread key, or
{:error, :not_found}.
Resume an existing thread by ID, making it the active thread.
Sets the thread status to :active and marks it as the active thread for the
session.
Returns {:ok, thread_meta} or {:error, :not_found}.
Resume an existing thread with backend-specific options.
Like thread_resume/2 but passes additional options to the backend's
native implementation. Falls back to thread_resume/2 if the backend
does not support extended resume options.
Parameters
session-- pid of a running session.thread_id-- binary thread identifier.opts-- backend-specific resume options.
Returns
{:ok, thread_meta}or{:error, :not_found}.
Roll back the visible thread history.
Hides messages beyond a specified boundary without deleting them. The
visible_message_count field in thread metadata controls which messages are
visible when reading the thread.
selector is a map with one of:
:count— hide the last N visible messages:visible_message_count— set the visible boundary directly:message_idor:uuid— set the boundary to a specific message
Returns {:ok, updated_meta}, {:error, :not_found}, or
{:error, :invalid_selector}.
Example
# Hide the last 3 messages:
{:ok, _} = BeamAgent.Threads.thread_rollback("sess_001", "thread_abc", %{count: 3})
# Set boundary to a specific message:
{:ok, _} = BeamAgent.Threads.thread_rollback("sess_001", "thread_abc", %{uuid: "msg_xyz"})
Start a new conversation thread within a session.
Creates a thread entry in the store and sets it as the active thread for the
session. A thread ID is auto-generated if not provided in opts.
opts may include:
:name— human-readable thread name (binary):metadata— arbitrary metadata map:thread_id— explicit thread ID (binary, auto-generated if omitted):parent_thread_id— ID of the parent thread for lineage
Returns {:ok, thread_meta}.
Example
{:ok, thread} = BeamAgent.Threads.thread_start("sess_001", %{name: "feature-work"})
thread.thread_id # => "thread_a1b2c3d4"
Unarchive a thread, restoring it to active status.
Returns {:ok, updated_meta} or {:error, :not_found}.
Unsubscribe from a thread and clear it as the active thread if applicable.
Parameters
session-- pid of a running session.thread_id-- binary thread identifier.
Returns
{:ok, result_map}with:thread_idand:unsubscribedfields.{:error, :not_found}.
@spec unarchive_thread(binary(), binary()) :: {:ok, :beam_agent_threads.thread_meta()} | {:error, :not_found}
Unarchive a thread.
Restores an archived thread to active status and clears the archived flag.
Returns {:ok, updated_meta} or {:error, :not_found}.