BeamAgent.Runs (beam_agent_ex v0.1.0)

Copy Markdown View Source

Canonical run and step lifecycle for BeamAgent.

BeamAgent.Runs provides durable units of work that sit above transient task pids. A run can be scoped to a session, a thread, and an optional parent run. A step belongs to a run and inherits its session/thread scope.

This is the Elixir facade over the Erlang :beam_agent_runs public module. The underlying implementation is ETS-backed through :beam_agent_runs_core and :beam_agent_runs_store, so the API works uniformly across all BeamAgent backends.

Quick example

{:ok, run} =
  BeamAgent.Runs.start_run(%{session_id: "sess_001", thread_id: "thread_abc"}, %{
    kind: :workflow,
    input: %{goal: "Ship the feature"}
  })

{:ok, step} = BeamAgent.Runs.start_step(run.run_id, %{kind: :review})
{:ok, _step} = BeamAgent.Runs.complete_step(run.run_id, step.step_id, %{status: :ok})
{:ok, _run} = BeamAgent.Runs.complete_run(run.run_id, %{summary: "Done"})

Core concepts

  • Run scope: runs can reference :session_id, :thread_id, and :parent_run_id. Child runs inherit parent scope unless the caller provides the same explicit values.

  • Step inheritance: every step inherits its parent run's session/thread scope and is keyed by {run_id, step_id}.

  • Terminal safety: runs cannot complete while they still have active steps. Failing or cancelling a run cascades terminal state to active steps.

Summary

Types

Run record.

Filter map accepted by list_runs/1.

Options for start_run/2.

Run status atom.

Run scope passed to start_run/2.

Run shape returned by start_run/2.

Running step shape returned by start_step/2.

Step record.

Options for start_step/2.

Step status atom.

Terminal run shape returned by complete/fail/cancel operations.

Terminal step shape returned by complete/fail/cancel operations.

Functions

Cancel a run and cascade cancellation to active steps.

Cancel a running step.

Clear all run and step data.

Complete a run once all of its steps are terminal.

Complete a running step.

Ensure the runs ETS tables exist.

Fail a run and cascade failure to active steps.

Get a run by id.

Get a step by run id and step id.

List all runs without filters.

List runs with exact-match filters.

List steps for a run, oldest first.

Start a run with the given scope and options.

Start a step within a run.

Types

run()

@type run() :: %{
  :run_id => binary(),
  :kind => atom() | binary(),
  :status => run_status(),
  :metadata => map(),
  :created_at => integer(),
  :updated_at => integer(),
  optional(:session_id) => binary(),
  optional(:thread_id) => binary(),
  optional(:parent_run_id) => binary(),
  optional(:input) => term(),
  optional(:output) => term(),
  optional(:error) => term(),
  optional(:cancel_reason) => term(),
  optional(:completed_at) => integer()
}

Run record.

run_filter()

@type run_filter() :: %{
  optional(:session_id) => binary(),
  optional(:thread_id) => binary(),
  optional(:parent_run_id) => binary(),
  optional(:kind) => atom() | binary(),
  optional(:status) => run_status(),
  optional(:since) => integer(),
  optional(:limit) => pos_integer()
}

Filter map accepted by list_runs/1.

Supported keys:

  • :session_id
  • :thread_id
  • :parent_run_id
  • :kind
  • :status
  • :since
  • :limit

run_opts()

@type run_opts() :: %{
  optional(:run_id) => binary(),
  optional(:kind) => atom() | binary(),
  optional(:input) => term(),
  optional(:metadata) => map()
}

Options for start_run/2.

Supported keys:

  • :run_id — explicit run identifier
  • :kind — atom or binary classifier for the run
  • :input — arbitrary input payload
  • :metadata — arbitrary metadata map

run_status()

@type run_status() :: :running | :completed | :failed | :cancelled

Run status atom.

Values: :running, :completed, :failed, :cancelled.

scope()

@type scope() ::
  binary()
  | %{
      optional(:session_id) => binary(),
      optional(:thread_id) => binary(),
      optional(:parent_run_id) => binary()
    }

Run scope passed to start_run/2.

Use either a binary session id or a map containing any of :session_id, :thread_id, and :parent_run_id.

started_run()

@type started_run() :: %{
  :run_id => binary(),
  :kind => atom() | binary(),
  :status => :running,
  :metadata => map(),
  :created_at => integer(),
  :updated_at => integer(),
  :input => term(),
  optional(:session_id) => binary(),
  optional(:thread_id) => binary(),
  optional(:parent_run_id) => binary()
}

Run shape returned by start_run/2.

started_step()

@type started_step() :: %{
  :step_id => binary(),
  :run_id => binary(),
  :kind => atom() | binary(),
  :status => :running,
  :metadata => map(),
  :created_at => integer(),
  :updated_at => integer(),
  :input => term(),
  optional(:session_id) => binary(),
  optional(:thread_id) => binary()
}

Running step shape returned by start_step/2.

step()

@type step() :: %{
  :step_id => binary(),
  :run_id => binary(),
  :kind => atom() | binary(),
  :status => step_status(),
  :metadata => map(),
  :created_at => integer(),
  :updated_at => integer(),
  optional(:session_id) => binary(),
  optional(:thread_id) => binary(),
  optional(:input) => term(),
  optional(:output) => term(),
  optional(:error) => term(),
  optional(:cancel_reason) => term(),
  optional(:completed_at) => integer()
}

Step record.

step_opts()

@type step_opts() :: %{
  optional(:step_id) => binary(),
  optional(:kind) => atom() | binary(),
  optional(:input) => term(),
  optional(:metadata) => map()
}

Options for start_step/2.

Supported keys:

  • :step_id — explicit step identifier
  • :kind — atom or binary classifier for the step
  • :input — arbitrary input payload
  • :metadata — arbitrary metadata map

step_status()

@type step_status() :: :running | :completed | :failed | :cancelled

Step status atom.

Values: :running, :completed, :failed, :cancelled.

terminal_run()

@type terminal_run() :: %{
  :run_id => binary(),
  :kind => atom() | binary(),
  :status => :completed | :failed | :cancelled,
  :metadata => map(),
  :created_at => integer(),
  :updated_at => integer(),
  :completed_at => integer(),
  optional(:session_id) => binary(),
  optional(:thread_id) => binary(),
  optional(:parent_run_id) => binary(),
  optional(:input) => term(),
  optional(:output) => term(),
  optional(:error) => term(),
  optional(:cancel_reason) => term()
}

Terminal run shape returned by complete/fail/cancel operations.

terminal_step()

@type terminal_step() :: %{
  :step_id => binary(),
  :run_id => binary(),
  :kind => atom() | binary(),
  :status => :completed | :failed | :cancelled,
  :metadata => map(),
  :created_at => integer(),
  :updated_at => integer(),
  :completed_at => integer(),
  optional(:session_id) => binary(),
  optional(:thread_id) => binary(),
  optional(:input) => term(),
  optional(:output) => term(),
  optional(:error) => term(),
  optional(:cancel_reason) => term()
}

Terminal step shape returned by complete/fail/cancel operations.

Functions

cancel_run(run_id, reason)

@spec cancel_run(binary(), term()) ::
  {:ok, terminal_run()}
  | {:error,
     :not_found
     | {:invalid_status_transition, :cancelled | :completed | :failed,
        :cancelled}}

Cancel a run and cascade cancellation to active steps.

cancel_step(run_id, step_id, reason)

@spec cancel_step(binary(), binary(), term()) ::
  {:ok, terminal_step()}
  | {:error,
     :not_found
     | {:invalid_status_transition, :cancelled | :completed | :failed,
        :cancelled}}

Cancel a running step.

clear()

@spec clear() :: :ok

Clear all run and step data.

complete_run(run_id, result)

@spec complete_run(binary(), term()) ::
  {:ok, terminal_run()}
  | {:error,
     :active_steps
     | :not_found
     | {:invalid_status_transition, :cancelled | :completed | :failed,
        :completed}}

Complete a run once all of its steps are terminal.

complete_step(run_id, step_id, result)

@spec complete_step(binary(), binary(), term()) ::
  {:ok, terminal_step()}
  | {:error,
     :not_found
     | {:invalid_status_transition, :cancelled | :completed | :failed,
        :completed}}

Complete a running step.

ensure_tables()

@spec ensure_tables() :: :ok

Ensure the runs ETS tables exist.

fail_run(run_id, error_term)

@spec fail_run(binary(), term()) ::
  {:ok, terminal_run()}
  | {:error,
     :not_found
     | {:invalid_status_transition, :cancelled | :completed | :failed, :failed}}

Fail a run and cascade failure to active steps.

fail_step(run_id, step_id, error_term)

@spec fail_step(binary(), binary(), term()) ::
  {:ok, terminal_step()}
  | {:error,
     :not_found
     | {:invalid_status_transition, :cancelled | :completed | :failed, :failed}}

Fail a running step.

get_run(run_id)

@spec get_run(binary()) :: {:ok, run()} | {:error, :not_found}

Get a run by id.

get_step(run_id, step_id)

@spec get_step(binary(), binary()) :: {:ok, step()} | {:error, :not_found}

Get a step by run id and step id.

list_runs()

@spec list_runs() :: {:ok, [run()]}

List all runs without filters.

list_runs(filter)

@spec list_runs(run_filter()) ::
  {:ok, [run()]}
  | {:error,
     {:invalid_filter,
      :kind
      | :limit
      | :parent_run_id
      | :run_id
      | :session_id
      | :since
      | :status
      | :step_id
      | :thread_id}}

List runs with exact-match filters.

list_steps(run_id)

@spec list_steps(binary()) :: {:ok, [step()]} | {:error, :not_found}

List steps for a run, oldest first.

start_run(scope, opts)

@spec start_run(scope(), run_opts()) ::
  {:ok, started_run()}
  | {:error,
     :already_exists
     | :inconsistent_parent_scope
     | :parent_run_not_found
     | :session_id_required_for_thread
     | {:invalid_run_opt, :kind | :metadata | :run_id}
     | {:invalid_scope, atom()}
     | {:unsupported_scope_key, atom()}}

Start a run with the given scope and options.

start_step(run_id, opts)

@spec start_step(binary(), step_opts()) ::
  {:ok, started_step()}
  | {:error,
     :already_exists
     | :not_found
     | :run_not_active
     | {:invalid_step_opt, :kind | :metadata | :step_id}}

Start a step within a run.