Canonical routines and scheduled execution for BeamAgent.
BeamAgent.Routines manages durable job records for delayed or recurring
work. It intentionally does not start a scheduler daemon inside BeamAgent.
Instead, callers create jobs, inspect due work, and invoke run_due/1 from
a process they already own.
Supported schedules:
- one-shot (type: :once)
- interval (type: :interval)
Supported targets:
- :run for canonical run creation without backend execution
- :query for live-session or routed-session prompt execution
Summary
Functions
Cancel a routine job.
Clear all routines state.
Create a routine job.
Alias for list_due/0.
Alias for list_due/1.
Ensure the routines ETS tables exist.
Fetch a routine job by id.
List all routine jobs.
List routine jobs with exact-match filters.
List jobs currently due as of now.
List jobs due according to an explicit due filter.
Return the earliest next-run timestamp in the routines due index.
Execute all currently due jobs using default runner options.
Execute currently due jobs from the calling process.
Execute a routine job immediately without changing its normal cadence.
Update a routine job.
Types
@type due_filter() :: %{ optional(:at) => integer(), optional(:limit) => pos_integer(), optional(:include_claimed) => boolean() }
@type job_input() :: %{ optional(:job_id) => binary(), :schedule => schedule(), :target => target(), optional(:payload) => term(), optional(:metadata) => map(), optional(:routing_policy) => map(), optional(:retry_policy) => retry_policy(), optional(:idempotency_key) => binary(), optional(:state) => :active | :paused, optional(:next_run_at) => integer() }
@type job_patch() :: %{ optional(:schedule) => schedule(), optional(:target) => target(), optional(:payload) => term(), optional(:metadata) => map(), optional(:routing_policy) => map(), optional(:retry_policy) => retry_policy(), optional(:idempotency_key) => binary(), optional(:state) => :active | :paused, optional(:next_run_at) => integer() }
@type job_record() :: %{ :job_id => binary(), :schedule => schedule(), :target => target(), :routing_policy => map(), :retry_policy => retry_policy(), :idempotency_key => binary(), :state => job_state(), :metadata => map(), :attempt_count => non_neg_integer(), :created_at => integer(), :updated_at => integer(), optional(:payload) => term(), optional(:next_run_at) => integer(), optional(:current_run_id) => binary(), optional(:current_slot_at) => integer(), optional(:last_run_id) => binary(), optional(:last_run_at) => integer(), optional(:last_result) => map(), optional(:last_error) => term(), optional(:cancelled_at) => integer(), optional(:completed_at) => integer() }
@type job_state() ::
:active
| :running
| :retry_waiting
| :paused
| :completed
| :exhausted
| :cancelled
@type retry_policy() :: %{ optional(:max_attempts) => pos_integer(), optional(:backoff_ms) => non_neg_integer() }
@type run_due_result() :: %{ job_id: binary(), run: BeamAgent.Runs.run(), slot_at: integer() }
@type schedule() :: %{type: :once, at: integer()} | %{ :type => :interval, :every_ms => pos_integer(), optional(:start_at) => integer(), optional(:catch_up) => boolean() }
@type target() :: %{ :type => :run, optional(:scope) => binary() | map(), optional(:run_opts) => map() } | %{ :type => :query, :session => session_target(), :prompt => binary(), optional(:query_opts) => map(), optional(:thread) => thread_target(), optional(:stop_session) => boolean() }
Functions
@spec cancel(binary()) :: :ok
Cancel a routine job.
@spec clear() :: :ok
Clear all routines state.
@spec create(job_input()) :: {:ok, job_record()} | {:error, term()}
Create a routine job.
@spec due() :: {:ok, [job_record()]}
Alias for list_due/0.
@spec due(due_filter()) :: {:ok, [job_record()]} | {:error, term()}
Alias for list_due/1.
@spec ensure_tables() :: :ok
Ensure the routines ETS tables exist.
@spec get(binary()) :: {:ok, job_record()} | {:error, :not_found}
Fetch a routine job by id.
@spec list() :: {:ok, [job_record()]}
List all routine jobs.
@spec list(job_filter()) :: {:ok, [job_record()]} | {:error, term()}
List routine jobs with exact-match filters.
@spec list_due() :: {:ok, [job_record()]}
List jobs currently due as of now.
@spec list_due(due_filter()) :: {:ok, [job_record()]} | {:error, term()}
List jobs due according to an explicit due filter.
@spec next_due_at() :: {:ok, integer()} | {:error, :none}
Return the earliest next-run timestamp in the routines due index.
@spec run_due() :: {:ok, [run_due_result()]}
Execute all currently due jobs using default runner options.
@spec run_due(map()) :: {:ok, [run_due_result()]}
Execute currently due jobs from the calling process.
@spec run_now(binary()) :: {:ok, BeamAgent.Runs.run()} | {:error, :not_found}
Execute a routine job immediately without changing its normal cadence.
@spec update(binary(), job_patch()) :: {:ok, job_record()} | {:error, term()}
Update a routine job.