Durable event journal for BeamAgent.
BeamAgent.Journal stores replayable canonical domain events such as run and
step lifecycle transitions, artifact changes, and control mutations. It is
distinct from the live event stream:
BeamAgent.event_subscribe/1streams transient session activityBeamAgent.Journalpersists append-only BeamAgent domain events for replay
This is the Elixir facade over the Erlang :beam_agent_journal public
module. The implementation is ETS-backed and process-free.
Summary
Types
Error returned when appending an invalid or inconsistent journal event.
Canonical journal entry.
Exact-match filter accepted by list/1 and stream_from/2.
Envelope passed to append/2.
Journal event type identifier.
Journal tag value.
Functions
Acknowledge a journal entry for a consumer id.
Append a normalized BeamAgent domain event to the durable journal.
Clear all journal events and acknowledgements.
Ensure the journal ETS tables exist.
Fetch a journal entry by id.
List all journal entries, oldest first.
List journal entries with exact-match filters.
Replay journal entries after the given cursor.
Replay journal entries after the given cursor with additional filters.
Types
@type append_error() :: :already_exists | :session_id_required_for_thread | {:invalid_event, :event_id | :payload | :run_id | :session_id | :tags | :thread_id | :timestamp} | {:invalid_event_type, binary()}
Error returned when appending an invalid or inconsistent journal event.
@type entry() :: %{ :event_id => binary(), :event_type => event_type(), :sequence => pos_integer(), :timestamp => integer(), :payload => map(), :tags => [tag()], optional(:session_id) => binary(), optional(:thread_id) => binary(), optional(:run_id) => binary() }
Canonical journal entry.
@type event_filter() :: %{ optional(:event_id) => binary(), optional(:event_type) => event_type(), optional(:session_id) => binary(), optional(:thread_id) => binary(), optional(:run_id) => binary(), optional(:tag) => tag(), optional(:since) => integer(), optional(:limit) => pos_integer() }
Exact-match filter accepted by list/1 and stream_from/2.
@type event_input() :: %{ optional(:event_id) => binary(), optional(:session_id) => binary(), optional(:thread_id) => binary(), optional(:run_id) => binary(), optional(:timestamp) => integer(), optional(:tags) => [tag()], optional(:payload) => map() }
Envelope passed to append/2.
Journal event type identifier.
Journal tag value.
Functions
Acknowledge a journal entry for a consumer id.
@spec append(event_type(), event_input()) :: {:ok, entry()} | {:error, append_error()}
Append a normalized BeamAgent domain event to the durable journal.
@spec clear() :: :ok
Clear all journal events and acknowledgements.
@spec ensure_tables() :: :ok
Ensure the journal ETS tables exist.
Fetch a journal entry by id.
@spec list() :: {:ok, [entry()]}
List all journal entries, oldest first.
@spec list(event_filter()) :: {:ok, [entry()]} | {:error, term()}
List journal entries with exact-match filters.
@spec stream_from(non_neg_integer()) :: {:ok, [entry()]} | {:error, term()}
Replay journal entries after the given cursor.
@spec stream_from(non_neg_integer(), event_filter()) :: {:ok, [entry()]} | {:error, term()}
Replay journal entries after the given cursor with additional filters.