Elixir wrapper for the Copilot CLI agent SDK.
Provides idiomatic Elixir access to copilot_session (Erlang/OTP
gen_statem) with lazy streaming via Stream.resource/3.
Quick Start
{:ok, session} = CopilotEx.start_session(cli_path: "copilot")
{:ok, messages} = CopilotEx.query(session, "What is 2 + 2?")
CopilotEx.stop(session)Streaming
session
|> CopilotEx.stream!("Explain OTP supervision trees")
|> Enum.each(fn msg ->
case msg.type do
:text -> IO.write(msg.content)
:result -> IO.puts("\nDone!")
_ -> :ok
end
end)Session Options
:cli_path- Path to the Copilot CLI executable (default:"copilot"):work_dir- Working directory for the CLI subprocess:env- Environment variables as[{key, value}]charlists:buffer_max- Max raw binary buffer in bytes (default: 2MB):session_id- Resume a previous session (binary):model- Model to use (binary):system_message- System prompt sent to the Copilot session:permission_mode- Permission mode (binary):permission_handler-fn(request, invocation, opts) -> resultcallback:available_tools- Allowed tool list passed to Copilot:excluded_tools- Disallowed tool list passed to Copilot:mcp_servers- MCP server configurations (map):output_format- Structured output JSON schema (map):reasoning_effort- Reasoning effort level:sdk_mcp_servers- In-process MCP servers (list of server maps). All adapters share this unified API viabeam_agent_tool_registry.:sdk_hooks- SDK lifecycle hooks (list of hook maps):user_input_handler- User input request handler function
In-Process MCP Tools
Register tools via the unified beam_agent_tool_registry API (same as all adapters):
tool = :beam_agent_tool_registry.tool("weather", "Get weather",
%{"type" => "object",
"properties" => %{"city" => %{"type" => "string"}}},
fn args ->
city = Map.get(args, "city", "unknown")
{:ok, [%{type: :text, text: "72F in #{city}"}]}
end)
server = :beam_agent_tool_registry.server("my-tools", [tool])
{:ok, session} = CopilotEx.start_session(
cli_path: "copilot",
sdk_mcp_servers: [server]
)Permission Handling
Register a handler for Copilot permission requests (fail-closed by default):
handler = fn request, _invocation, _opts ->
case request do
%{"kind" => "file_write"} -> {:allow, %{}}
_ -> {:deny, "Not allowed"}
end
end
{:ok, session} = CopilotEx.start_session(
cli_path: "copilot",
permission_handler: handler
)
Summary
Functions
Abort the current active query. Alias for interrupt/1.
Get account information.
List active beta features from the system init data.
Get the API key source from the system init data.
Convert a single content_block into a flat message.
Supervisor child specification for a copilot_session process.
Get the CLI version from the system init data.
Run a command via universal command execution.
Get the current model from session info.
Get the current permission mode from session info.
Delete a native Copilot session by id.
Delete a session and its messages.
Extract all TodoWrite items from a list of messages.
Filter todo items by status.
Flatten an assistant message (with content_blocks) into individual messages.
Fork a tracked session into a new session ID.
Get native Copilot authentication status.
Get the last native Copilot session id.
Get native Copilot session metadata by id.
Get session metadata by ID.
Get messages for a session.
Get messages with options.
Get native Copilot CLI status information.
Get the current health/state of a session.
Interrupt/abort the current active query.
List available agents from the system init data.
List configured MCP servers from the system init data.
List available plugins from the system init data.
List native Copilot sessions.
List all tracked sessions.
List sessions with filters.
List available skills from the system init data.
List available tools from the system init data.
Create an in-process MCP server definition.
Get status of all MCP servers.
Create an in-process MCP tool definition.
Convert a single flat message into a content_block.
Convert a list of flat messages into content_block format.
List native Copilot models with metadata.
Normalize a list of messages from any adapter into a uniform flat stream.
Get the output style from the system init data.
Send a query and collect all response messages (blocking).
Reconnect a failed MCP server.
Resume a native Copilot session by id.
Revert the visible session history to a prior boundary.
Revert file changes to a checkpoint via universal checkpointing.
Create an SDK lifecycle hook.
Create an SDK lifecycle hook with a matcher filter.
Send an arbitrary JSON-RPC command to the Copilot CLI.
Send a raw control message. Delegates to native Copilot implementation.
Check server health. Maps to session health for Copilot.
Destroy the current native Copilot session.
Fetch native Copilot events/messages for the current session.
Query session info (adapter, session_id, model, etc.).
Set maximum thinking tokens via universal control.
Replace MCP server configurations.
Change the model at runtime during a session.
Change the permission mode at runtime via universal control.
Create or replace share state for the current session.
Start a new Copilot CLI session.
Gracefully stop a session, closing the CLI subprocess.
Stop a running agent task via universal task tracking.
Returns a Stream that yields {:ok, msg} or {:error, reason} tuples.
Returns a Stream that yields messages as they arrive.
Submit feedback via universal feedback tracking.
Generate and store a summary for the current session.
List available agents.
List available slash commands.
List available models.
Archive a thread.
Fork an existing thread.
List all threads for this session.
Read thread metadata, optionally including visible messages.
Resume an existing thread.
Rollback the visible thread history.
Start a new conversation thread.
Unarchive a thread.
Get a summary of todo counts by status.
Enable or disable an MCP server.
Respond to an agent request via universal turn response.
Clear any stored session revert state.
Revoke share state for the current session.
Get the working directory from the system init data.
Types
@type hook_context() :: %{ event: atom(), agent_id: binary(), agent_transcript_path: binary(), agent_type: binary(), content: binary(), duration_ms: non_neg_integer(), interrupt: boolean(), params: map(), permission_prompt_tool_name: binary(), permission_suggestions: [any()], prompt: binary(), reason: term(), session_id: binary(), stop_hook_active: boolean(), stop_reason: atom() | binary(), system_info: map(), tool_input: map(), tool_name: binary(), tool_use_id: binary(), updated_permissions: map() }
@type message() :: %{ type: atom(), content: binary(), content_blocks: [any()], duration_api_ms: non_neg_integer(), duration_ms: non_neg_integer(), error_info: map(), errors: [any()], event_type: binary(), fast_mode_state: map(), is_error: boolean(), is_replay: boolean(), is_using_overage: boolean(), message_id: binary(), model: binary(), model_usage: map(), num_turns: non_neg_integer(), overage_disabled_reason: binary(), overage_resets_at: number(), overage_status: binary(), parent_tool_use_id: :null | binary(), permission_denials: [any()], rate_limit_status: binary(), rate_limit_type: binary(), raw: map(), request: map(), request_id: binary(), resets_at: number(), response: map(), session_id: binary(), stop_reason: binary(), stop_reason_atom: stop_reason(), structured_output: term(), subtype: binary(), surpassed_threshold: number(), system_info: map(), thread_id: binary(), timestamp: integer(), tool_input: map(), tool_name: binary(), tool_use_id: binary(), total_cost_usd: number(), usage: map(), utilization: number(), uuid: binary() }
@type message_filter_opts() :: %{ optional(:include_hidden) => boolean(), optional(:limit) => pos_integer(), optional(:offset) => non_neg_integer(), optional(:types) => [atom()] }
@type query_opts() :: %{ optional(:agent) => binary(), optional(:allowed_tools) => [binary()], optional(:approval_policy) => binary(), optional(:attachments) => [map()], optional(:cwd) => binary(), optional(:disallowed_tools) => [binary()], optional(:effort) => binary(), optional(:max_budget_usd) => number(), optional(:max_tokens) => pos_integer(), optional(:max_turns) => pos_integer(), optional(:mode) => binary(), optional(:model) => binary(), optional(:model_id) => binary(), optional(:output_format) => :json_schema | :text | binary() | map(), optional(:permission_mode) => :accept_edits | :bypass_permissions | :default | :dont_ask | :plan | binary(), optional(:provider) => map(), optional(:provider_id) => binary(), optional(:sandbox_mode) => binary(), optional(:summary) => binary(), optional(:system) => binary() | map(), optional(:system_prompt) => binary() | %{preset: binary(), type: :preset, append: binary()}, optional(:thinking) => map(), optional(:timeout) => timeout(), optional(:tools) => [any()] | map() }
@type session_list_opts() :: %{ optional(:adapter) => atom(), optional(:cwd) => binary(), optional(:limit) => pos_integer(), optional(:model) => binary(), optional(:since) => integer() }
@type stop_reason() ::
:end_turn
| :max_tokens
| :stop_sequence
| :refusal
| :tool_use_stop
| :unknown_stop
@type summary_info() :: %{ content: binary(), generated_at: integer(), generated_by: binary(), message_count: non_neg_integer(), session_id: binary() }
@type thread_info() :: %{ created_at: integer(), message_count: non_neg_integer(), session_id: binary(), status: :active | :archived | :completed | :paused, thread_id: binary(), updated_at: integer(), visible_message_count: non_neg_integer(), archived: boolean(), archived_at: integer(), metadata: map(), name: binary(), parent_thread_id: binary(), summary: map() }
@type thread_read_result() :: %{thread: thread_info(), messages: [map()]}
Functions
Abort the current active query. Alias for interrupt/1.
Get account information.
List active beta features from the system init data.
Get the API key source from the system init data.
@spec block_to_message(content_block()) :: flat_message()
Convert a single content_block into a flat message.
@spec child_spec(keyword() | map()) :: Supervisor.child_spec()
Supervisor child specification for a copilot_session process.
Accepts keyword list or map. Uses :session_id from opts as child id
when available.
Examples
children = [
{CopilotEx, cli_path: "copilot", work_dir: "/my/project"}
]
Supervisor.start_link(children, strategy: :one_for_one)
Get the CLI version from the system init data.
@spec command_run(pid(), binary(), map()) :: {:error, {:port_exit, term()} | {:port_failed, term()} | {:timeout, timeout()}} | {:ok, %{exit_code: integer(), output: binary()}}
Run a command via universal command execution.
Get the current model from session info.
Extracts from the session's model field or system init data.
Get the current permission mode from session info.
Delete a native Copilot session by id.
@spec delete_session(binary()) :: :ok
Delete a session and its messages.
Extract all TodoWrite items from a list of messages.
@spec filter_todos([todo_item()], BeamAgent.Todo.todo_status()) :: [todo_item()]
Filter todo items by status.
Flatten an assistant message (with content_blocks) into individual messages.
@spec fork_session(pid(), map()) :: {:error, :not_found} | {:ok, session_info()}
Fork a tracked session into a new session ID.
Get native Copilot authentication status.
Get the last native Copilot session id.
Get native Copilot session metadata by id.
@spec get_session(binary()) :: {:error, :not_found} | {:ok, session_info()}
Get session metadata by ID.
Get messages for a session.
@spec get_session_messages(binary(), message_filter_opts()) :: {:error, :not_found} | {:ok, [message()]}
Get messages with options.
Get native Copilot CLI status information.
@spec health(pid()) :: :connecting | :initializing | :ready | :active_query | :error
Get the current health/state of a session.
Examples
:ready = CopilotEx.health(session)
Interrupt/abort the current active query.
Examples
:ok = CopilotEx.interrupt(session)
List available agents from the system init data.
List configured MCP servers from the system init data.
List available plugins from the system init data.
List native Copilot sessions.
@spec list_sessions() :: {:ok, [session_info()]}
List all tracked sessions.
@spec list_sessions(session_list_opts()) :: {:ok, [session_info()]}
List sessions with filters.
List available skills from the system init data.
List available tools from the system init data.
@spec mcp_server( binary(), [ %{ description: binary(), handler: (map() -> {term(), term()}), input_schema: map(), name: binary() } ] ) :: mcp_server_def()
Create an in-process MCP server definition.
Get status of all MCP servers.
@spec mcp_tool(binary(), binary(), map(), (map() -> {:error, binary()} | {:ok, [map()]})) :: mcp_tool_def()
Create an in-process MCP tool definition.
@spec message_to_block(map()) :: content_block()
Convert a single flat message into a content_block.
@spec messages_to_blocks([map()]) :: [content_block()]
Convert a list of flat messages into content_block format.
List native Copilot models with metadata.
Normalize a list of messages from any adapter into a uniform flat stream.
Claude produces assistant messages with nested content_blocks.
All other adapters (including Copilot) produce individual typed messages.
This function flattens both into a uniform stream where each message has
a single, specific type — never nested content_blocks.
Examples
CopilotEx.normalize_messages(messages)
|> Enum.filter(& &1.type == :text)
|> Enum.map(& &1.content)
|> Enum.join("")
Get the output style from the system init data.
@spec query(pid(), binary(), query_opts()) :: {:error, term()} | {:ok, [message()]}
Send a query and collect all response messages (blocking).
Returns {:ok, messages} once the query completes (session.idle).
Uses deadline-based timeout.
Options
:timeout- total query timeout in ms (default: 120_000)
Examples
{:ok, messages} = CopilotEx.query(session, "Hello!")
last = List.last(messages)
IO.puts(last.content)
@spec reconnect_mcp_server(pid(), binary()) :: {:ok, %{required(<<_::48>>) => <<_::88>>}} | {:error, :not_found}
Reconnect a failed MCP server.
Resume a native Copilot session by id.
@spec revert_session(pid(), map()) :: {:error, :invalid_selector | :not_found} | {:ok, session_info()}
Revert the visible session history to a prior boundary.
@spec rewind_files(pid(), binary()) :: :ok | {:error, :not_found | {:restore_failed, binary(), atom()}}
Revert file changes to a checkpoint via universal checkpointing.
@spec sdk_hook(atom(), (hook_context() -> :ok | {:deny, binary()})) :: %{ callback: (hook_context() -> :ok | {:deny, binary()}), event: atom() }
Create an SDK lifecycle hook.
Hooks fire at the shared BeamAgent.Hooks lifecycle points, including
blocking events like :pre_tool_use, :user_prompt_submit, and
:permission_request, plus notification events such as :post_tool_use,
:post_tool_use_failure, :session_start, :session_end,
:subagent_start, :subagent_stop, :pre_compact, :notification,
:config_change, :task_completed, and :teammate_idle.
Examples
hook = CopilotEx.sdk_hook(:pre_tool_use, fn ctx ->
case ctx.tool_name do
"Bash" -> {:deny, "No shell access"}
_ -> :ok
end
end)
{:ok, session} = CopilotEx.start_session(sdk_hooks: [hook])
@spec sdk_hook( atom(), (hook_context() -> :ok | {:deny, binary()}), %{tool_name: binary()} ) :: %{ callback: (hook_context() -> :ok | {:deny, binary()}), event: atom(), matcher: %{tool_name: binary()}, compiled_re: {:re_pattern, term(), term(), term(), term()} }
Create an SDK lifecycle hook with a matcher filter.
Examples
hook = CopilotEx.sdk_hook(:pre_tool_use,
fn _ctx -> {:deny, "blocked"} end,
%{tool_name: "Bash"})
Send an arbitrary JSON-RPC command to the Copilot CLI.
Examples
{:ok, result} = CopilotEx.send_command(session, "config.get", %{})
Send a raw control message. Delegates to native Copilot implementation.
@spec server_health(pid()) :: {:ok, %{ adapter: :copilot, health: :active_query | :connecting | :error | :initializing | :ready }}
Check server health. Maps to session health for Copilot.
Destroy the current native Copilot session.
Fetch native Copilot events/messages for the current session.
Query session info (adapter, session_id, model, etc.).
Examples
{:ok, info} = CopilotEx.session_info(session)
info.copilot_session_id
@spec set_max_thinking_tokens(pid(), pos_integer()) :: {:ok, %{max_thinking_tokens: pos_integer()}}
Set maximum thinking tokens via universal control.
@spec set_mcp_servers(pid(), [%{name: binary(), tools: [map()], version: binary()}]) :: {:error, :not_found} | {:ok, %{required(binary()) => binary()}}
Replace MCP server configurations.
Change the model at runtime during a session.
Examples
{:ok, _} = CopilotEx.set_model(session, "gpt-4o")
Change the permission mode at runtime via universal control.
Start a new Copilot CLI session.
Returns {:ok, pid} on success. The session process speaks
full bidirectional JSON-RPC 2.0 over Content-Length framed stdio.
Examples
{:ok, session} = CopilotEx.start_session(cli_path: "copilot")
{:ok, session} = CopilotEx.start_session(
cli_path: "copilot",
model: "gpt-4o",
permission_mode: "acceptEdits"
)
@spec stop(pid()) :: :ok
Gracefully stop a session, closing the CLI subprocess.
Stop a running agent task via universal task tracking.
@spec stream(pid(), binary(), map()) :: Enumerable.t()
Returns a Stream that yields {:ok, msg} or {:error, reason} tuples.
Non-raising variant of stream!/3.
@spec stream!(pid(), binary(), map()) :: Enumerable.t()
Returns a Stream that yields messages as they arrive.
Raises on errors. Uses Stream.resource/3 under the hood with
demand-driven backpressure — the gen_statem only delivers the
next message when the stream consumer requests it.
The query is dispatched to the CLI immediately when stream!/3
is called (not lazily on first consumption). Message consumption
is lazy/pull-based.
The stream halts automatically when a :result or terminal :error
message is received. Note: Copilot can emit non-terminal :error
messages (warnings), so the halt condition checks is_error: true
— unlike other adapters where all :error messages are terminal.
Examples
CopilotEx.stream!(session, "Explain GenServer")
|> Stream.filter(& &1.type == :text)
|> Enum.map(& &1.content)
|> Enum.join("")
# With options
CopilotEx.stream!(session, "Hello", %{timeout: 60_000})
|> Enum.to_list()
Submit feedback via universal feedback tracking.
@spec summarize_session(pid()) :: {:error, :not_found} | {:ok, summary_info()}
Generate and store a summary for the current session.
@spec summarize_session(pid(), map()) :: {:error, :not_found} | {:ok, summary_info()}
List available agents.
List available slash commands.
List available models.
@spec thread_archive(pid(), binary()) :: {:error, :not_found} | {:ok, thread_info()}
Archive a thread.
@spec thread_fork(pid(), binary()) :: {:error, :not_found} | {:ok, thread_info()}
Fork an existing thread.
@spec thread_fork(pid(), binary(), thread_start_opts()) :: {:error, :not_found} | {:ok, thread_info()}
@spec thread_list(pid()) :: {:ok, [thread_info()]}
List all threads for this session.
@spec thread_read(pid(), binary()) :: {:error, :not_found} | {:ok, thread_read_result()}
Read thread metadata, optionally including visible messages.
@spec thread_read(pid(), binary(), map()) :: {:error, :not_found} | {:ok, thread_read_result()}
@spec thread_resume(pid(), binary()) :: {:error, :not_found} | {:ok, thread_info()}
Resume an existing thread.
@spec thread_rollback(pid(), binary(), map()) :: {:error, :invalid_selector | :not_found} | {:ok, thread_info()}
Rollback the visible thread history.
@spec thread_start(pid(), thread_start_opts()) :: {:ok, thread_start_result()}
Start a new conversation thread.
@spec thread_unarchive(pid(), binary()) :: {:error, :not_found} | {:ok, thread_info()}
Unarchive a thread.
@spec todo_summary([todo_item()]) :: %{ :total => non_neg_integer(), required(atom()) => non_neg_integer() }
Get a summary of todo counts by status.
@spec toggle_mcp_server(pid(), binary(), boolean()) :: {:ok, %{required(<<_::48>>) => <<_::56>>}} | {:error, :not_found}
Enable or disable an MCP server.
Respond to an agent request via universal turn response.
@spec unrevert_session(pid()) :: {:error, :not_found} | {:ok, session_info()}
Clear any stored session revert state.
Get the working directory from the system init data.