BeamAgent.Routines (beam_agent_ex v0.1.0)

Copy Markdown View Source

Canonical routines and scheduled execution for BeamAgent.

BeamAgent.Routines manages durable job records for delayed or recurring work. It intentionally does not start a scheduler daemon inside BeamAgent. Instead, callers create jobs, inspect due work, and invoke run_due/1 from a process they already own.

Supported schedules:

  • one-shot (type: :once)
  • interval (type: :interval)

Supported targets:

  • :run for canonical run creation without backend execution
  • :query for live-session or routed-session prompt execution

Summary

Functions

Cancel a routine job.

Clear all routines state.

Create a routine job.

Alias for list_due/0.

Alias for list_due/1.

Ensure the routines ETS tables exist.

Fetch a routine job by id.

List all routine jobs.

List routine jobs with exact-match filters.

List jobs that are due as of the current time.

List jobs due according to an explicit due filter.

Return the earliest next-run timestamp in the routines due index.

Execute all currently due jobs using default runner options.

Execute currently due jobs from the calling process.

Execute a routine job immediately without changing its normal cadence.

Update a routine job.

Types

due_filter()

@type due_filter() :: %{
  optional(:at) => integer(),
  optional(:limit) => pos_integer(),
  optional(:include_claimed) => boolean()
}

job_filter()

@type job_filter() :: %{
  optional(:job_id) => binary(),
  optional(:state) => job_state(),
  optional(:schedule_type) => atom(),
  optional(:target_type) => atom(),
  optional(:due_before) => integer(),
  optional(:limit) => pos_integer()
}

job_input()

@type job_input() :: %{
  optional(:job_id) => binary(),
  :schedule => schedule(),
  :target => target(),
  optional(:payload) => term(),
  optional(:metadata) => map(),
  optional(:routing_policy) => map(),
  optional(:retry_policy) => retry_policy(),
  optional(:idempotency_key) => binary(),
  optional(:state) => :active | :paused,
  optional(:next_run_at) => integer()
}

job_patch()

@type job_patch() :: %{
  optional(:schedule) => schedule(),
  optional(:target) => target(),
  optional(:payload) => term(),
  optional(:metadata) => map(),
  optional(:routing_policy) => map(),
  optional(:retry_policy) => retry_policy(),
  optional(:idempotency_key) => binary(),
  optional(:state) => :active | :paused,
  optional(:next_run_at) => integer()
}

job_record()

@type job_record() :: %{
  :job_id => binary(),
  :schedule => schedule(),
  :target => target(),
  :routing_policy => map(),
  :retry_policy => retry_policy(),
  :idempotency_key => binary(),
  :state => job_state(),
  :metadata => map(),
  :attempt_count => non_neg_integer(),
  :created_at => integer(),
  :updated_at => integer(),
  optional(:payload) => term(),
  optional(:next_run_at) => integer(),
  optional(:current_run_id) => binary(),
  optional(:current_slot_at) => integer(),
  optional(:last_run_id) => binary(),
  optional(:last_run_at) => integer(),
  optional(:last_result) => map(),
  optional(:last_error) => term(),
  optional(:cancelled_at) => integer(),
  optional(:completed_at) => integer()
}

job_state()

@type job_state() ::
  :active
  | :running
  | :retry_waiting
  | :paused
  | :completed
  | :exhausted
  | :cancelled

retry_policy()

@type retry_policy() :: %{
  optional(:max_attempts) => pos_integer(),
  optional(:backoff_ms) => non_neg_integer()
}

run_due_result()

@type run_due_result() :: %{
  job_id: binary(),
  run: BeamAgent.Runs.run(),
  slot_at: integer()
}

schedule()

@type schedule() ::
  %{type: :once, at: integer()}
  | %{
      :type => :interval,
      :every_ms => pos_integer(),
      optional(:start_at) => integer(),
      optional(:catch_up) => boolean()
    }

session_target()

@type session_target() :: %{kind: :live, ref: pid()} | %{kind: :routed, opts: map()}

target()

@type target() ::
  %{
    :type => :run,
    optional(:scope) => binary() | map(),
    optional(:run_opts) => map()
  }
  | %{
      :type => :query,
      :session => session_target(),
      :prompt => binary(),
      optional(:query_opts) => map(),
      optional(:thread) => thread_target(),
      optional(:stop_session) => boolean()
    }

thread_target()

@type thread_target() :: %{thread_id: binary()} | %{start: map()}

Functions

cancel(job_id)

@spec cancel(binary()) :: :ok

Cancel a routine job.

clear()

@spec clear() :: :ok

Clear all routines state.

create(job)

@spec create(job_input()) :: {:ok, job_record()} | {:error, term()}

Create a routine job.

due()

@spec due() :: {:ok, [job_record()]}

Alias for list_due/0.

due(filter)

@spec due(due_filter()) :: {:ok, [job_record()]} | {:error, term()}

Alias for list_due/1.

ensure_tables()

@spec ensure_tables() :: :ok

Ensure the routines ETS tables exist.

get(job_id)

@spec get(binary()) :: {:ok, job_record()} | {:error, :not_found}

Fetch a routine job by id.

list()

@spec list() :: {:ok, [job_record()]}

List all routine jobs.

list(filter)

@spec list(job_filter()) :: {:ok, [job_record()]} | {:error, term()}

List routine jobs with exact-match filters (see job_filter() for the supported keys).

list_due()

@spec list_due() :: {:ok, [job_record()]}

List jobs that are due as of the current time.

list_due(filter)

@spec list_due(due_filter()) :: {:ok, [job_record()]} | {:error, term()}

List jobs due according to an explicit due filter.

next_due_at()

@spec next_due_at() :: {:ok, integer()} | {:error, :none}

Return the earliest next-run timestamp in the routines due index.

run_due()

@spec run_due() :: {:ok, [run_due_result()]}

Execute all currently due jobs using default runner options.

run_due(opts)

@spec run_due(map()) :: {:ok, [run_due_result()]}

Execute currently due jobs from the calling process.

run_now(job_id)

@spec run_now(binary()) :: {:ok, BeamAgent.Runs.run()} | {:error, :not_found}

Execute a routine job immediately without changing its normal cadence.

update(job_id, patch)

@spec update(binary(), job_patch()) :: {:ok, job_record()} | {:error, term()}

Update a routine job.