BeamAgent.Orchestrator (beam_agent_ex v0.1.0)

Copy Markdown View Source

Canonical BeamAgent orchestration primitives.

BeamAgent.Orchestrator exposes process-free parent-child execution mechanics over canonical runs, sessions, threads, and the durable journal. It does not start a worker pool or scheduler inside BeamAgent.

The source of truth for child execution still lives in BeamAgent.Runs. The orchestrator layer adds explicit cross-session lineage and convenience APIs for delegation, collection, and cancellation.

Summary

Functions

await(run_id, timeout) — Wait for a run to reach a terminal state by polling the canonical run store.

cancel(run_id, reason) — Cancel a run and any active orchestrated descendants.

clear() — Clear all orchestrator lineage state.

collect(run_id, opts) — Collect the canonical orchestration view for a run.

delegate(parent, task, opts) — Create a delegated child run under a parent run.

ensure_tables() — Ensure the orchestrator ETS tables exist.

list_children(parent) — List direct orchestrator children for a parent run.

spawn(parent, opts) — Create a child orchestration record, optionally opening a child session or thread substrate.

status(run_id) — Return a summary status map for a run and its direct children.

Types

await_result()

@type await_result() :: %{
  :status => :completed | :failed | :cancelled,
  :run => BeamAgent.Runs.run(),
  optional(:output) => term(),
  optional(:error) => term(),
  optional(:cancel_reason) => term()
}

child()

@type child() :: %{
  :relation => :spawned | :delegated,
  :substrate => :run | :session | :thread | :session_thread,
  :parent_run_id => binary(),
  :run => BeamAgent.Runs.run(),
  :metadata => map(),
  optional(:task) => term(),
  optional(:session_id) => binary(),
  optional(:thread_id) => binary(),
  optional(:session_ref) => pid(),
  optional(:owns_session) => boolean(),
  optional(:stop_session) => boolean(),
  optional(:thread) => map()
}

child_status()

@type child_status() :: %{
  :run => BeamAgent.Runs.run(),
  :step_count => non_neg_integer(),
  :active_step_count => non_neg_integer(),
  :child_count => non_neg_integer(),
  :active_child_count => non_neg_integer(),
  :awaitable => boolean(),
  optional(:relation) => :spawned | :delegated,
  optional(:parent_run_id) => binary(),
  optional(:substrate) => :run | :session | :thread | :session_thread,
  optional(:session_id) => binary(),
  optional(:thread_id) => binary(),
  optional(:metadata) => map(),
  optional(:task) => term()
}

collect_opts()

@type collect_opts() :: %{
  optional(:include_steps) => boolean(),
  optional(:include_journal) => boolean(),
  optional(:include_descendants) => boolean()
}

collect_result()

@type collect_result() :: %{
  :run => BeamAgent.Runs.run(),
  :children => [child()],
  optional(:descendants) => [child()],
  optional(:steps) => [BeamAgent.Runs.step()],
  optional(:journal) => [BeamAgent.Journal.entry()],
  optional(:link) => map()
}

parent()

@type parent() :: binary() | BeamAgent.Runs.run()

session_target()

@type session_target() ::
  :inherit
  | :none
  | binary()
  | pid()
  | %{:kind => :live, :ref => pid(), optional(:stop_session) => boolean()}
  | %{kind: :session_id, id: binary()}
  | %{:kind => :routed, :opts => map(), optional(:stop_session) => boolean()}

spawn_opts()

@type spawn_opts() :: %{
  optional(:run_id) => binary(),
  optional(:kind) => atom() | binary(),
  optional(:input) => term(),
  optional(:metadata) => map(),
  optional(:session) => session_target(),
  optional(:thread) => thread_target()
}

thread_target()

@type thread_target() ::
  :inherit | :none | binary() | %{thread_id: binary()} | %{start: map()}

Functions

await(run_id, timeout)

@spec await(binary(), non_neg_integer()) ::
  {:ok, await_result()} | {:error, :timeout | :not_found | term()}

Wait for a run to reach a terminal state by polling the canonical run store.

cancel(run_id, reason)

@spec cancel(binary(), term()) :: :ok | {:error, term()}

Cancel a run and any active orchestrated descendants.

clear()

@spec clear() :: :ok

Clear all orchestrator lineage state.

collect(run_id, opts)

@spec collect(binary(), collect_opts()) :: {:ok, collect_result()} | {:error, term()}

Collect the canonical orchestration view for a run.

delegate(parent, task, opts)

@spec delegate(parent(), term(), map()) ::
  {:ok, BeamAgent.Runs.run()} | {:error, term()}

Create a delegated child run under a parent run.

ensure_tables()

@spec ensure_tables() :: :ok

Ensure the orchestrator ETS tables exist.

list_children(parent)

@spec list_children(parent()) ::
  {:ok, [child()]} | {:error, :parent_not_found | {:invalid_parent, binary()}}

List direct orchestrator children for a parent run.

spawn(parent, opts)

@spec spawn(parent(), spawn_opts()) :: {:ok, child()} | {:error, term()}

Create a child orchestration record, optionally opening a child session or thread substrate.

status(run_id)

@spec status(binary()) :: {:ok, child_status()} | {:error, :not_found}

Return a summary status map for a run and its direct children.