BeamAgent.Journal (beam_agent_ex v0.1.0)

Copy Markdown View Source

Durable event journal for BeamAgent.

BeamAgent.Journal stores replayable canonical domain events such as run and step lifecycle transitions, artifact changes, and control mutations. It is distinct from the live event stream:

This is the Elixir facade over the Erlang :beam_agent_journal public module. The implementation is ETS-backed and process-free.

Summary

Types

Error returned when appending an invalid or inconsistent journal event.

Canonical journal entry.

Exact-match filter accepted by list/1 and stream_from/2.

Envelope passed to append/2.

Journal event type identifier.

Journal tag value.

Functions

Acknowledge a journal entry for a consumer id.

Append a normalized BeamAgent domain event to the durable journal.

Clear all journal events and acknowledgements.

Ensure the journal ETS tables exist.

Fetch a journal entry by id.

List all journal entries, oldest first.

List journal entries with exact-match filters.

Replay journal entries after the given cursor.

Replay journal entries after the given cursor with additional filters.

Types

append_error()

@type append_error() ::
  :already_exists
  | :session_id_required_for_thread
  | {:invalid_event,
     :event_id
     | :payload
     | :run_id
     | :session_id
     | :tags
     | :thread_id
     | :timestamp}
  | {:invalid_event_type, binary()}

Error returned when appending an invalid or inconsistent journal event.

entry()

@type entry() :: %{
  :event_id => binary(),
  :event_type => event_type(),
  :sequence => pos_integer(),
  :timestamp => integer(),
  :payload => map(),
  :tags => [tag()],
  optional(:session_id) => binary(),
  optional(:thread_id) => binary(),
  optional(:run_id) => binary()
}

Canonical journal entry.

event_filter()

@type event_filter() :: %{
  optional(:event_id) => binary(),
  optional(:event_type) => event_type(),
  optional(:session_id) => binary(),
  optional(:thread_id) => binary(),
  optional(:run_id) => binary(),
  optional(:tag) => tag(),
  optional(:since) => integer(),
  optional(:limit) => pos_integer()
}

Exact-match filter accepted by list/1 and stream_from/2.

event_input()

@type event_input() :: %{
  optional(:event_id) => binary(),
  optional(:session_id) => binary(),
  optional(:thread_id) => binary(),
  optional(:run_id) => binary(),
  optional(:timestamp) => integer(),
  optional(:tags) => [tag()],
  optional(:payload) => map()
}

Envelope passed to append/2.

event_type()

@type event_type() :: atom() | binary()

Journal event type identifier.

tag()

@type tag() :: atom() | binary()

Journal tag value.

Functions

ack(consumer_id, event_id)

@spec ack(binary(), binary()) :: :ok | {:error, :not_found}

Acknowledge a journal entry for a consumer id.

append(event_type, event)

@spec append(event_type(), event_input()) :: {:ok, entry()} | {:error, append_error()}

Append a normalized BeamAgent domain event to the durable journal.

clear()

@spec clear() :: :ok

Clear all journal events and acknowledgements.

ensure_tables()

@spec ensure_tables() :: :ok

Ensure the journal ETS tables exist.

get(event_id)

@spec get(binary()) :: {:ok, entry()} | {:error, :not_found}

Fetch a journal entry by id.

list()

@spec list() :: {:ok, [entry()]}

List all journal entries, oldest first.

list(filter)

@spec list(event_filter()) :: {:ok, [entry()]} | {:error, term()}

List journal entries with exact-match filters.

stream_from(cursor)

@spec stream_from(non_neg_integer()) :: {:ok, [entry()]} | {:error, term()}

Replay journal entries after the given cursor.

stream_from(cursor, filter)

@spec stream_from(non_neg_integer(), event_filter()) ::
  {:ok, [entry()]} | {:error, term()}

Replay journal entries after the given cursor with additional filters.