Elixir wrapper for the Codex CLI agent SDK.
Provides idiomatic Elixir access to the Codex app-server, direct realtime
voice, and exec transports. For most use cases, use start_session/1 for
full bidirectional JSON-RPC, start_realtime/1 for direct realtime voice,
or start_exec/1 for simpler one-shot queries.
Quick Start
{:ok, session} = CodexEx.start_session(cli_path: "codex")
{:ok, messages} = CodexEx.query(session, "What is 2+2?")
CodexEx.stop(session)Streaming
session
|> CodexEx.stream!("Explain quantum computing")
|> Enum.each(&IO.inspect/1)Thread Management (app-server only)
{:ok, %{"threadId" => tid}} = CodexEx.thread_start(session, %{})
{:ok, messages} = CodexEx.query(session, "Follow-up question")
Summary
Types
A content block used in message-to-block conversions.
A flat message produced from a content block.
Hook callback function type.
Hook context map passed to SDK lifecycle hook callbacks.
SDK hook definition returned by sdk_hook/2,3.
MCP server definition map.
MCP tool definition map.
A message map as returned by beam_agent_core.
Message filter options for get_session_messages/2.
Query parameter map accepted by query/3 and send_query/4.
Server health status map.
Session filter options for list_sessions/1.
Session store entry map.
Share state information.
Stop reason atoms returned by the backend.
Session summary information.
Todo item extracted from messages.
Functions
Abort the current turn. Alias for interrupt/1.
Get account information.
Start a native account login flow.
Cancel a native account login flow.
Log out of the native account session.
Read native account rate limits.
List active beta features from the system init data.
Get the API key source from the system init data.
List native apps/connectors.
Convert a single content_block into a flat message.
Supervisor child specification for a codex_session process.
Get the CLI version from the system init data.
List native collaboration mode presets.
Run a command in the Codex sandbox.
Write stdin to a running command execution.
Apply a batch of native config edits.
Read the effective native config.
Read native config requirements.
Write a single native config value.
Get the current model from session info.
Get the current permission mode from session info.
Delete a session and its messages.
Supervisor child specification for a codex_exec process.
List native experimental feature flags.
Detect migratable external agent config artifacts.
Import selected external agent config artifacts.
Extract all TodoWrite items from a list of messages.
Filter todo items by status.
Flatten an assistant message (with content_blocks) into individual messages.
Fork a tracked session into a new session ID.
Run a fuzzy file search.
Start a native fuzzy file search session.
Stop a native fuzzy file search session.
Update a native fuzzy file search session query.
Get session metadata by ID.
Get messages for a session.
Get messages with options.
Query session health.
Interrupt the current turn.
List available agents from the system init data.
List configured MCP servers from the system init data.
List available plugins from the system init data.
List all tracked sessions.
List sessions with filters.
List available skills from the system init data.
List available tools from the system init data.
Create an in-process MCP server definition.
Start an MCP server OAuth login flow.
Reload native MCP server config from disk.
Get status of all MCP servers.
List native MCP server status entries.
Create an in-process MCP tool definition.
Convert a single flat message into a content_block.
Convert a list of flat messages into content_block format.
List native models.
Normalize a list of messages from any adapter into a uniform flat stream.
Get the output style from the system init data.
Send a query and collect all response messages (blocking).
Reconnect a failed MCP server.
Revert the visible session history to a prior boundary.
Start a native review request.
Revert file changes to a checkpoint via universal checkpointing.
Create an SDK lifecycle hook.
Create an SDK lifecycle hook with a matcher.
Send a raw control message to the session.
Check server health. Maps to session health for Codex.
Query session info.
Set maximum thinking tokens via universal control.
Replace MCP server configurations.
Change the model at runtime.
Change the permission mode at runtime.
Create or replace share state for the current session.
Enable or disable a native skill by path.
List native skills.
Export a native remote skill.
List native remote skills.
Start a Codex exec session (one-shot JSONL queries).
Start a Codex direct realtime session (native realtime websocket voice/text).
Start a Codex app-server session (full bidirectional JSON-RPC).
Stop a session.
Stop a running agent task via universal task tracking.
Returns a Stream that yields {:ok, msg} or {:error, reason} tuples.
Returns a Stream that yields messages as they arrive.
Submit a feedback report to the Codex server.
Generate and store a summary for the current session.
List available agents.
List available slash commands.
List available models.
Archive a thread.
Fork an existing thread.
List all threads.
List all currently loaded threads.
Patch stored thread metadata.
Set a thread display name.
Read a stored thread.
Append an audio chunk to a native realtime thread session.
Append text to a native realtime thread session.
Start a native realtime session for a thread.
Stop a native realtime thread session.
Resume an existing thread by ID.
Rollback a thread.
Start a new conversation thread.
Unarchive a thread.
Unsubscribe the current connection from a thread.
Get a summary of todo counts by status.
Enable or disable an MCP server.
Interrupt an in-flight turn explicitly by thread and turn id.
Respond to an agent request (approval, user input, etc.).
Steer an in-flight turn with additional input.
Clear any stored session revert state.
Revoke share state for the current session.
Start Windows sandbox setup.
Get the working directory from the system init data.
Types
@type content_block() :: %{ :type => :raw | :text | :thinking | :tool_result | :tool_use, optional(:content) => binary(), optional(:id) => binary(), optional(:input) => map(), optional(:name) => binary(), optional(:raw) => map(), optional(:text) => binary(), optional(:thinking) => binary(), optional(:tool_use_id) => binary() }
A content block used in message-to-block conversions.
@type flat_message() :: %{ :type => :raw | :text | :thinking | :tool_result | :tool_use, optional(:content) => term(), optional(:raw) => term(), optional(:tool_input) => term(), optional(:tool_name) => term(), optional(:tool_use_id) => term() }
A flat message produced from a content block.
@type hook_callback() :: (hook_context() -> :ok | {:deny, binary()})
Hook callback function type.
@type hook_context() :: %{ :event => atom(), optional(:agent_id) => binary(), optional(:agent_transcript_path) => binary(), optional(:agent_type) => binary(), optional(:content) => binary(), optional(:duration_ms) => non_neg_integer(), optional(:interrupt) => boolean(), optional(:params) => map(), optional(:permission_prompt_tool_name) => binary(), optional(:permission_suggestions) => [any()], optional(:prompt) => binary(), optional(:reason) => term(), optional(:session_id) => binary(), optional(:stop_hook_active) => boolean(), optional(:stop_reason) => atom() | binary(), optional(:system_info) => map(), optional(:tool_input) => map(), optional(:tool_name) => binary(), optional(:tool_use_id) => binary(), optional(:updated_permissions) => map() }
Hook context map passed to SDK lifecycle hook callbacks.
@type hook_def() :: %{ :callback => hook_callback(), :event => atom(), optional(:matcher) => %{tool_name: binary()}, optional(:compiled_re) => {:re_pattern, term(), term(), term(), term()} }
SDK hook definition returned by sdk_hook/2,3.
@type mcp_server_def() :: %{ name: binary(), tools: [mcp_tool_def()], version: binary() }
MCP server definition map.
@type mcp_tool_def() :: %{ description: binary(), handler: (map() -> {:error, binary()} | {:ok, [any()]}), input_schema: map(), name: binary() }
MCP tool definition map.
@type message() :: %{ :type => atom(), optional(:content) => binary(), optional(:content_blocks) => [any()], optional(:duration_api_ms) => non_neg_integer(), optional(:duration_ms) => non_neg_integer(), optional(:error_info) => map(), optional(:errors) => [any()], optional(:event_type) => binary(), optional(:fast_mode_state) => map(), optional(:is_error) => boolean(), optional(:is_replay) => boolean(), optional(:is_using_overage) => boolean(), optional(:message_id) => binary(), optional(:model) => binary(), optional(:model_usage) => map(), optional(:num_turns) => non_neg_integer(), optional(:overage_disabled_reason) => binary(), optional(:overage_resets_at) => number(), optional(:overage_status) => binary(), optional(:parent_tool_use_id) => :null | binary(), optional(:permission_denials) => [any()], optional(:rate_limit_status) => binary(), optional(:rate_limit_type) => binary(), optional(:raw) => map(), optional(:request) => map(), optional(:request_id) => binary(), optional(:resets_at) => number(), optional(:response) => map(), optional(:session_id) => binary(), optional(:stop_reason) => binary(), optional(:stop_reason_atom) => stop_reason(), optional(:structured_output) => term(), optional(:subtype) => binary(), optional(:surpassed_threshold) => number(), optional(:system_info) => map(), optional(:thread_id) => binary(), optional(:timestamp) => integer(), optional(:tool_input) => map(), optional(:tool_name) => binary(), optional(:tool_use_id) => binary(), optional(:total_cost_usd) => number(), optional(:usage) => map(), optional(:utilization) => number(), optional(:uuid) => binary() }
A message map as returned by beam_agent_core.
@type message_filter_opts() :: %{ optional(:include_hidden) => boolean(), optional(:limit) => pos_integer(), optional(:offset) => non_neg_integer(), optional(:types) => [atom()] }
Message filter options for get_session_messages/2.
@type query_params() :: %{ optional(:agent) => binary(), optional(:allowed_tools) => [binary()], optional(:approval_policy) => binary(), optional(:attachments) => [map()], optional(:cwd) => binary(), optional(:disallowed_tools) => [binary()], optional(:effort) => binary(), optional(:max_budget_usd) => number(), optional(:max_tokens) => pos_integer(), optional(:max_turns) => pos_integer(), optional(:mode) => binary(), optional(:model) => binary(), optional(:model_id) => binary(), optional(:output_format) => :json_schema | :text | binary() | map(), optional(:permission_mode) => :accept_edits | :bypass_permissions | :default | :dont_ask | :plan | binary(), optional(:provider) => map(), optional(:provider_id) => binary(), optional(:sandbox_mode) => binary(), optional(:summary) => binary(), optional(:system) => binary() | map(), optional(:system_prompt) => binary() | %{preset: binary(), type: :preset, append: binary()}, optional(:thinking) => map(), optional(:timeout) => timeout(), optional(:tools) => [any()] | map() }
Query parameter map accepted by query/3 and send_query/4.
@type server_health_info() :: %{
adapter: :codex,
health:
:active_query | :active_turn | :connecting | :error | :initializing | :ready
}
Server health status map.
@type session_filter_opts() :: %{ optional(:adapter) => atom(), optional(:cwd) => binary(), optional(:limit) => pos_integer(), optional(:model) => binary(), optional(:since) => integer() }
Session filter options for list_sessions/1.
@type session_store_entry() :: %{ session_id: binary(), adapter: atom(), created_at: integer(), cwd: binary(), extra: map(), message_count: non_neg_integer(), model: binary(), updated_at: integer() }
Session store entry map.
@type stop_reason() ::
:end_turn
| :max_tokens
| :refusal
| :stop_sequence
| :tool_use_stop
| :unknown_stop
Stop reason atoms returned by the backend.
@type summary_info() :: %{ content: binary(), generated_at: integer(), generated_by: binary(), message_count: non_neg_integer(), session_id: binary() }
Session summary information.
@type todo_item() :: %{ :content => binary(), :status => :completed | :in_progress | :pending, optional(:active_form) => binary() }
Todo item extracted from messages.
Functions
Abort the current turn. Alias for interrupt/1.
Get account information.
Start a native account login flow.
Cancel a native account login flow.
Log out of the native account session.
Read native account rate limits.
List active beta features from the system init data.
Get the API key source from the system init data.
List native apps/connectors.
@spec block_to_message(content_block()) :: flat_message()
Convert a single content_block into a flat message.
@spec child_spec(keyword() | map()) :: Supervisor.child_spec()
Supervisor child specification for a codex_session process.
Accepts keyword list or map. Uses :session_id from opts as child id
when available.
Get the CLI version from the system init data.
List native collaboration mode presets.
Run a command in the Codex sandbox.
Returns {:error, :not_supported} for exec sessions.
Write stdin to a running command execution.
Apply a batch of native config edits.
Read the effective native config.
Read native config requirements.
Write a single native config value.
Get the current model from session info.
Extracts from the session's model field or system init data.
Get the current permission mode from session info.
@spec delete_session(binary()) :: :ok
Delete a session and its messages.
@spec exec_child_spec(keyword() | map()) :: Supervisor.child_spec()
Supervisor child specification for a codex_exec process.
Accepts keyword list or map. Uses :session_id from opts as child id
when available.
List native experimental feature flags.
Detect migratable external agent config artifacts.
Import selected external agent config artifacts.
@spec extract_todos([BeamAgent.message()]) :: [todo_item()]
Extract all TodoWrite items from a list of messages.
@spec filter_todos([BeamAgent.Todo.todo_item()], BeamAgent.Todo.todo_status()) :: [ BeamAgent.Todo.todo_item() ]
Filter todo items by status.
Flatten an assistant message (with content_blocks) into individual messages.
@spec fork_session(pid(), map()) :: {:ok, session_store_entry()} | {:error, :not_found}
Fork a tracked session into a new session ID.
Run a fuzzy file search.
Start a native fuzzy file search session.
Stop a native fuzzy file search session.
@spec fuzzy_file_search_session_update(pid(), binary(), binary()) :: {:ok, map()} | {:error, term()}
Update a native fuzzy file search session query.
@spec get_session(binary()) :: {:ok, session_store_entry()} | {:error, :not_found}
Get session metadata by ID.
Get messages for a session.
@spec get_session_messages(binary(), message_filter_opts()) :: {:ok, [message()]} | {:error, :not_found}
Get messages with options.
Query session health.
Interrupt the current turn.
List available agents from the system init data.
List configured MCP servers from the system init data.
List available plugins from the system init data.
@spec list_sessions() :: {:ok, [session_store_entry()]}
List all tracked sessions.
@spec list_sessions(session_filter_opts()) :: {:ok, [session_store_entry()]}
List sessions with filters.
List available skills from the system init data.
List available tools from the system init data.
@spec mcp_server(binary(), [mcp_tool_def()]) :: mcp_server_def()
Create an in-process MCP server definition.
Start an MCP server OAuth login flow.
Reload native MCP server config from disk.
Get status of all MCP servers.
List native MCP server status entries.
@spec mcp_tool(binary(), binary(), map(), (map() -> {:ok, [map()]} | {:error, binary()})) :: mcp_tool_def()
Create an in-process MCP tool definition.
@spec message_to_block(map()) :: content_block()
Convert a single flat message into a content_block.
@spec messages_to_blocks([map()]) :: [content_block()]
Convert a list of flat messages into content_block format.
List native models.
Normalize a list of messages from any adapter into a uniform flat stream.
Claude produces assistant messages with nested content_blocks.
All other adapters (including Codex) produce individual typed messages.
This function flattens both into a uniform stream where each message has
a single, specific type — never nested content_blocks.
Examples
CodexEx.normalize_messages(messages)
|> Enum.filter(& &1.type == :text)
|> Enum.map(& &1.content)
|> Enum.join("")
Get the output style from the system init data.
@spec query(pid(), binary(), query_params()) :: {:ok, [message()]} | {:error, term()}
Send a query and collect all response messages (blocking).
Returns {:ok, messages} where messages is a list of beam_agent_core
message maps. Uses deadline-based timeout.
Options
:timeout- total query timeout in ms (default: 120_000)
@spec reconnect_mcp_server(pid(), binary()) :: {:ok, %{required(<<_::48>>) => <<_::88>>}} | {:error, :not_found}
Reconnect a failed MCP server.
@spec revert_session(pid(), map()) :: {:ok, session_store_entry()} | {:error, :invalid_selector | :not_found}
Revert the visible session history to a prior boundary.
Start a native review request.
@spec rewind_files(pid(), binary()) :: :ok | {:error, :not_found | {:restore_failed, binary(), atom()}}
Revert file changes to a checkpoint via universal checkpointing.
@spec sdk_hook(atom(), hook_callback()) :: %{event: atom(), callback: hook_callback()}
Create an SDK lifecycle hook.
@spec sdk_hook(atom(), hook_callback(), %{tool_name: binary()}) :: hook_def()
Create an SDK lifecycle hook with a matcher.
Send a raw control message to the session.
Low-level interface for sending arbitrary JSON-RPC control messages.
App-server sessions dispatch normally; exec sessions return
{:error, :not_supported}.
@spec server_health(pid()) :: {:ok, server_health_info()}
Check server health. Maps to session health for Codex.
Query session info.
@spec set_max_thinking_tokens(pid(), pos_integer()) :: {:ok, %{max_thinking_tokens: pos_integer()}}
Set maximum thinking tokens via universal control.
@spec set_mcp_servers(pid(), [mcp_server_def()]) :: {:ok, map()} | {:error, :not_found}
Replace MCP server configurations.
Change the model at runtime.
Change the permission mode at runtime.
Enable or disable a native skill by path.
List native skills.
Export a native remote skill.
List native remote skills.
Start a Codex exec session (one-shot JSONL queries).
Start a Codex direct realtime session (native realtime websocket voice/text).
Start a Codex app-server session (full bidirectional JSON-RPC).
@spec stop(pid()) :: :ok
Stop a session.
Stop a running agent task via universal task tracking.
@spec stream(pid(), binary(), map()) :: Enumerable.t()
Returns a Stream that yields {:ok, msg} or {:error, reason} tuples.
Non-raising variant of stream!/3.
@spec stream!(pid(), binary(), map()) :: Enumerable.t()
Returns a Stream that yields messages as they arrive.
Raises on errors. Uses Stream.resource/3 under the hood.
The query is dispatched to the CLI immediately when stream!/3
is called. Message consumption is lazy/pull-based.
Example
session
|> CodexEx.stream!("Explain OTP")
|> Enum.each(fn msg -> IO.puts(msg.content) end)
Submit a feedback report to the Codex server.
@spec summarize_session(pid()) :: {:ok, summary_info()} | {:error, :not_found}
Generate and store a summary for the current session.
@spec summarize_session(pid(), map()) :: {:ok, summary_info()} | {:error, :not_found}
List available agents.
List available slash commands.
List available models.
Archive a thread.
Fork an existing thread.
List all threads.
List all currently loaded threads.
Patch stored thread metadata.
Set a thread display name.
Read a stored thread.
Append an audio chunk to a native realtime thread session.
Append text to a native realtime thread session.
Start a native realtime session for a thread.
Stop a native realtime thread session.
Resume an existing thread by ID.
Rollback a thread.
Start a new conversation thread.
Unarchive a thread.
Unsubscribe the current connection from a thread.
@spec todo_summary([todo_item()]) :: %{ :total => non_neg_integer(), required(atom()) => non_neg_integer() }
Get a summary of todo counts by status.
@spec toggle_mcp_server(pid(), binary(), boolean()) :: {:ok, %{required(<<_::48>>) => <<_::56>>}} | {:error, :not_found}
Enable or disable an MCP server.
Interrupt an in-flight turn explicitly by thread and turn id.
Respond to an agent request (approval, user input, etc.).
@spec turn_steer( pid(), binary(), binary(), binary() | [%{required(binary()) => term()}] ) :: {:ok, map()} | {:error, term()}
Steer an in-flight turn with additional input.
@spec unrevert_session(pid()) :: {:ok, session_store_entry()} | {:error, :not_found}
Clear any stored session revert state.
Start Windows sandbox setup.
Get the working directory from the system init data.