Elixir wrapper for the Gemini CLI agent SDK.
Provides idiomatic Elixir access to the Gemini CLI ACP transport. Sessions are persistent and multi-turn, and they stream JSON-RPC notifications from the official Gemini CLI running in ACP mode.
Quick Start
{:ok, session} = GeminiEx.start_session(cli_path: "gemini")
{:ok, messages} = GeminiEx.query(session, "What is 2+2?")
GeminiEx.stop(session)
Streaming
session
|> GeminiEx.stream!("Explain quantum computing")
|> Enum.each(&IO.inspect/1)
Hooks
hook = GeminiEx.sdk_hook(:post_tool_use, fn ctx ->
  IO.inspect(ctx, label: "tool used")
  :ok
end)
{:ok, session} = GeminiEx.start_session(cli_path: "gemini", sdk_hooks: [hook])
Summary
Functions
Abort the current query. Alias for interrupt/1.
Get account information.
List active beta features from the system init data.
Get the API key source from the system init data.
Convert a single content_block into a flat message.
Supervisor child specification for a gemini_cli_session process.
Get the CLI version from the system init data.
Run a command via universal command execution.
Get the current model from session info.
Get the current permission mode from session info.
Delete a session and its messages.
Extract all TodoWrite items from a list of messages.
Filter todo items by status.
Flatten an assistant message (with content_blocks) into individual messages.
Fork a tracked session into a new session ID.
Get session metadata by ID.
Get messages for a session.
Get messages with options.
Query session health.
Interrupt the current query.
List available agents from the system init data.
List configured MCP servers from the system init data.
List available plugins from the system init data.
List all tracked sessions.
List sessions with filters.
List available skills from the system init data.
List available tools from the system init data.
Create an in-process MCP server definition.
Get status of all MCP servers.
Create an in-process MCP tool definition.
Convert a single flat message into a content_block.
Convert a list of flat messages into content_block format.
Normalize a list of messages from any adapter into a uniform flat stream.
Get the output style from the system init data.
Send a query and collect all response messages (blocking).
Reconnect a failed MCP server.
Revert the visible session history to a prior boundary.
Revert file changes to a checkpoint via universal checkpointing.
Create an SDK lifecycle hook.
Create an SDK lifecycle hook with a matcher.
Send a raw control message via universal control dispatch.
Check server health. Maps to session health for Gemini.
Query session info.
Set maximum thinking tokens via universal control.
Replace MCP server configurations.
Change the model at runtime.
Change the permission mode at runtime via universal control.
Create or replace share state for the current session.
Start a persistent Gemini CLI ACP session.
Stop a session.
Stop a running agent task via universal task tracking.
Returns a Stream that yields {:ok, msg} or {:error, reason} tuples.
Returns a Stream that yields messages as they arrive.
Submit feedback via universal feedback tracking.
Generate and store a summary for the current session.
List available agents.
List available slash commands.
List available models.
Archive a thread.
Fork an existing thread.
List all threads for this session.
Read thread metadata, optionally including visible messages.
Resume an existing thread.
Roll back the visible thread history.
Start a new conversation thread.
Unarchive a thread.
Get a summary of todo counts by status.
Enable or disable an MCP server.
Respond to an agent request via universal turn response.
Clear any stored session revert state.
Revoke share state for the current session.
Get the working directory from the system init data.
Functions
Abort the current query. Alias for interrupt/1.
Get account information.
List active beta features from the system init data.
Get the API key source from the system init data.
@spec block_to_message(content_block()) :: %{ type: :raw | :text | :thinking | :tool_result | :tool_use, content: term(), raw: term(), tool_input: term(), tool_name: term(), tool_use_id: term() }
Convert a single content_block into a flat message.
@spec child_spec(keyword() | map()) :: Supervisor.child_spec()
Supervisor child specification for a gemini_cli_session process.
Accepts a keyword list or a map. Uses :session_id from the options as the
child id when available.
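The id rule above can be sketched with a stand-in module. This is not the GeminiEx implementation: `ChildSpecSketch`, its `start_link/1` placeholder, and the fallback to the module name when `:session_id` is absent are all assumptions made for illustration.

```elixir
defmodule ChildSpecSketch do
  # Hypothetical stand-in, not the GeminiEx implementation.
  # Accepts a keyword list or a map and prefers :session_id as the child id.
  def child_spec(opts) when is_list(opts), do: child_spec(Map.new(opts))

  def child_spec(opts) when is_map(opts) do
    %{
      # Assumption: fall back to the module name when no :session_id is given.
      id: Map.get(opts, :session_id, __MODULE__),
      start: {__MODULE__, :start_link, [opts]}
    }
  end

  # Placeholder process so the spec can actually be started in a demo.
  def start_link(opts), do: Agent.start_link(fn -> opts end)
end

spec_a = ChildSpecSketch.child_spec(session_id: "review-bot")
spec_b = ChildSpecSketch.child_spec(%{session_id: "triage-bot"})

# Distinct ids let both children live under the same supervisor.
{:ok, sup} = Supervisor.start_link([spec_a, spec_b], strategy: :one_for_one)
IO.inspect(Supervisor.count_children(sup))
```

Using a per-session id matters because a supervisor rejects two children with the same id, so two sessions without distinct ids could not share one supervisor.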
Get the CLI version from the system init data.
@spec command_run(pid(), binary(), map()) :: {:ok, %{exit_code: integer(), output: binary()}} | {:error, {:port_exit, term()} | {:port_failed, term()} | {:timeout, timeout()}}
Run a command via universal command execution.
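A caller typically has to handle every return shape listed in the spec. The sketch below pattern-matches on those shapes only; `CommandResultSketch` and the wording of its messages are hypothetical, and nothing here calls the real `command_run/3`.

```elixir
defmodule CommandResultSketch do
  # Maps each return shape from the command_run/3 spec to a log-friendly string.
  def describe({:ok, %{exit_code: 0, output: output}}),
    do: "ok: " <> String.trim(output)

  def describe({:ok, %{exit_code: code, output: output}}),
    do: "exit #{code}: " <> String.trim(output)

  def describe({:error, {:timeout, limit}}),
    do: "timed out after #{inspect(limit)}"

  def describe({:error, {:port_exit, reason}}),
    do: "port exited: #{inspect(reason)}"

  def describe({:error, {:port_failed, reason}}),
    do: "port failed: #{inspect(reason)}"
end

# Hand-built results standing in for real command_run/3 return values.
IO.puts(CommandResultSketch.describe({:ok, %{exit_code: 0, output: "hello\n"}}))
IO.puts(CommandResultSketch.describe({:error, {:timeout, 5_000}}))
```

Note that a non-zero exit code still arrives as `{:ok, ...}` according to the spec, so success has to be checked on `exit_code`, not on the tuple tag.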
Get the current model from session info.
Extracts from the session's model field or system init data.
Get the current permission mode from session info.
@spec delete_session(binary()) :: :ok
Delete a session and its messages.
@spec extract_todos([message_map()]) :: [todo_item()]
Extract all TodoWrite items from a list of messages.
@spec filter_todos([BeamAgent.Todo.todo_item()], BeamAgent.Todo.todo_status()) :: [ BeamAgent.Todo.todo_item() ]
Filter todo items by status.
Flatten an assistant message (with content_blocks) into individual messages.
Fork a tracked session into a new session ID.
@spec get_session(binary()) :: {:ok, session_info_map()} | {:error, :not_found}
Get session metadata by ID.
@spec get_session_messages(binary()) :: {:ok, [message_map()]} | {:error, :not_found}
Get messages for a session.
@spec get_session_messages(binary(), message_filter_opts()) :: {:ok, [message_map()]} | {:error, :not_found}
Get messages with options.
Query session health.
Interrupt the current query.
List available agents from the system init data.
List configured MCP servers from the system init data.
List available plugins from the system init data.
@spec list_sessions() :: {:ok, [session_info_map()]}
List all tracked sessions.
@spec list_sessions(session_filter_opts()) :: {:ok, [session_info_map()]}
List sessions with filters.
List available skills from the system init data.
List available tools from the system init data.
@spec mcp_server(binary(), [ %{ description: binary(), handler: (map() -> {term(), term()}), input_schema: map(), name: binary() } ]) :: mcp_server_def()
Create an in-process MCP server definition.
Get status of all MCP servers.
@spec mcp_tool(binary(), binary(), map(), (map() -> {:error, binary()} | {:ok, [map()]})) :: mcp_tool_def()
Create an in-process MCP tool definition.
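The spec fixes the handler contract: it takes a map and returns `{:ok, [map()]}` or `{:error, binary()}`. Below is a minimal handler written against that contract. The string-keyed input, the `%{type: "text", text: ...}` result map, and the argument order shown in the final comment (name, description, input schema, handler) are assumptions, not confirmed by the documentation above.

```elixir
defmodule AddToolSketch do
  # Handler shaped after the mcp_tool/4 spec:
  # (map() -> {:ok, [map()]} | {:error, binary()})
  def handle(%{"a" => a, "b" => b}) when is_number(a) and is_number(b) do
    # Assumption: results are content maps with :type and :text keys.
    {:ok, [%{type: "text", text: to_string(a + b)}]}
  end

  def handle(_other), do: {:error, "expected numeric \"a\" and \"b\""}

  # JSON Schema style description of the input, passed as the map() argument.
  def input_schema do
    %{
      "type" => "object",
      "properties" => %{"a" => %{"type" => "number"}, "b" => %{"type" => "number"}},
      "required" => ["a", "b"]
    }
  end
end

IO.inspect(AddToolSketch.handle(%{"a" => 1, "b" => 2}))
IO.inspect(AddToolSketch.handle(%{"a" => "one"}))

# Assumed argument order (name, description, input_schema, handler):
# tool = GeminiEx.mcp_tool("add", "Add two numbers",
#          AddToolSketch.input_schema(), &AddToolSketch.handle/1)
```

Returning `{:error, binary()}` for bad input, rather than raising, keeps the handler inside the return type the spec declares.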
@spec message_to_block(map()) :: content_block()
Convert a single flat message into a content_block.
@spec messages_to_blocks([map()]) :: [content_block()]
Convert a list of flat messages into content_block format.
Normalize a list of messages from any adapter into a uniform flat stream.
Claude produces assistant messages with nested content_blocks.
All other adapters (including Gemini) produce individual typed messages.
This function flattens both into a uniform stream where each message has
a single, specific type — never nested content_blocks.
Examples
GeminiEx.normalize_messages(messages)
|> Enum.filter(& &1.type == :text)
|> Enum.map(& &1.content)
|> Enum.join("")
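To make the flattening concrete, here is a rough sketch of the idea on plain maps. It is not the library's code: `FlattenSketch` is hypothetical, and the `:content_blocks` key and the `:assistant` type are assumed field names based on the description above.

```elixir
defmodule FlattenSketch do
  # Hypothetical illustration; :content_blocks is an assumed field name.
  def flatten(messages), do: Enum.flat_map(messages, &flatten_one/1)

  # A nested assistant message becomes one flat message per block.
  defp flatten_one(%{type: :assistant, content_blocks: blocks}) when is_list(blocks),
    do: blocks

  # Already-flat messages pass through unchanged.
  defp flatten_one(message), do: [message]
end

# Nested shape (as described for Claude) and flat shape (other adapters).
nested = [
  %{
    type: :assistant,
    content_blocks: [%{type: :thinking, content: "hmm"}, %{type: :text, content: "4"}]
  }
]

flat = [%{type: :text, content: "2+2 is "}, %{type: :text, content: "4"}]

# The same downstream pipeline works on both inputs once they are flattened.
text_of = fn messages ->
  messages
  |> FlattenSketch.flatten()
  |> Enum.filter(&(&1.type == :text))
  |> Enum.map_join("", & &1.content)
end

IO.puts(text_of.(nested))
IO.puts(text_of.(flat))
```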
Get the output style from the system init data.
Send a query and collect all response messages (blocking).
Returns {:ok, messages} where messages is a list of beam_agent_core
message maps. Uses a deadline-based timeout.
Options
:timeout - total query timeout in ms (default: 120_000)
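One way to read "deadline-based" is that a single time budget covers the whole query, and each wait uses only the time that remains. The sketch below shows that pattern with a plain mailbox loop. `DeadlineSketch`, the `{:msg, _}` and `:done` message tags, and the `{:error, :timeout}` result are hypothetical and do not describe the library's internals.

```elixir
defmodule DeadlineSketch do
  # Collects messages until :done arrives or the overall deadline passes.
  def collect(timeout_ms) do
    deadline = System.monotonic_time(:millisecond) + timeout_ms
    loop(deadline, [])
  end

  defp loop(deadline, acc) do
    # Each receive waits only for the time left until the shared deadline.
    remaining = max(deadline - System.monotonic_time(:millisecond), 0)

    receive do
      {:msg, msg} -> loop(deadline, [msg | acc])
      :done -> {:ok, Enum.reverse(acc)}
    after
      remaining -> {:error, :timeout}
    end
  end
end

# Simulate a reply that completes well inside the budget.
send(self(), {:msg, "a"})
send(self(), {:msg, "b"})
send(self(), :done)
IO.inspect(DeadlineSketch.collect(1_000))

# With nothing in the mailbox, the deadline is what ends the wait.
IO.inspect(DeadlineSketch.collect(10))
```

With a per-message timeout, a slow but steady stream could run indefinitely; a shared deadline bounds the total time regardless of how many messages arrive.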
@spec reconnect_mcp_server(pid(), binary()) :: {:ok, %{required(<<_::48>>) => <<_::88>>}} | {:error, :not_found}
Reconnect a failed MCP server.
@spec revert_session(pid(), map()) :: {:ok, session_info_map()} | {:error, :invalid_selector | :not_found}
Revert the visible session history to a prior boundary.
@spec rewind_files(pid(), binary()) :: :ok | {:error, :not_found | {:restore_failed, binary(), atom()}}
Revert file changes to a checkpoint via universal checkpointing.
Create an SDK lifecycle hook.
@spec sdk_hook(atom(), hook_callback(), %{tool_name: binary()}) :: %{ event: atom(), callback: hook_callback(), matcher: %{tool_name: binary()}, compiled_re: {:re_pattern, term(), term(), term(), term()} }
Create an SDK lifecycle hook with a matcher.
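The `compiled_re` field in the spec suggests that the matcher's `tool_name` is compiled as a regular expression. The sketch below illustrates that reading with Erlang's `:re` module. `MatcherSketch`, the `:skipped` result, and the presence of a `:tool_name` key in the hook context are assumptions; how GeminiEx actually applies the pattern is not stated above.

```elixir
defmodule MatcherSketch do
  # Hypothetical: builds a hook-like map with a precompiled pattern.
  def hook(event, callback, %{tool_name: pattern}) do
    {:ok, compiled} = :re.compile(pattern)
    %{event: event, callback: callback, matcher: %{tool_name: pattern}, compiled_re: compiled}
  end

  # Runs the callback only when the tool name matches the pattern.
  # Assumption: the context map carries the tool name under :tool_name.
  def maybe_run(%{compiled_re: re, callback: callback}, %{tool_name: name} = ctx) do
    case :re.run(name, re, capture: :none) do
      :match -> callback.(ctx)
      :nomatch -> :skipped
    end
  end
end

hook = MatcherSketch.hook(:post_tool_use, fn ctx -> {:ran, ctx.tool_name} end, %{tool_name: "^Write"})

IO.inspect(MatcherSketch.maybe_run(hook, %{tool_name: "WriteFile"}))
IO.inspect(MatcherSketch.maybe_run(hook, %{tool_name: "ReadFile"}))

# The equivalent library call, per the spec above, would be:
# GeminiEx.sdk_hook(:post_tool_use, callback, %{tool_name: "^Write"})
```

Compiling the pattern once when the hook is created avoids recompiling it on every tool event.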
Send a raw control message via universal control dispatch.
@spec server_health(pid()) :: {:ok, %{ adapter: :gemini_cli, health: :active_query | :connecting | :error | :initializing | :ready }}
Check server health. Maps to session health for Gemini.
Query session info.
@spec set_max_thinking_tokens(pid(), pos_integer()) :: {:ok, %{max_thinking_tokens: pos_integer()}}
Set maximum thinking tokens via universal control.
@spec set_mcp_servers(pid(), [%{name: binary(), tools: [map()], version: binary()}]) :: {:error, :not_found} | {:ok, %{required(binary()) => binary()}}
Replace MCP server configurations.
Change the model at runtime.
Change the permission mode at runtime via universal control.
Start a persistent Gemini CLI ACP session.
@spec stop(pid()) :: :ok
Stop a session.
Stop a running agent task via universal task tracking.
@spec stream(pid(), binary(), map()) :: Enumerable.t()
Returns a Stream that yields {:ok, msg} or {:error, reason} tuples.
Non-raising variant of stream!/3.
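Because each element is a tagged tuple, the consumer decides what to do on failure. The sketch below collects messages and stops at the first error. `TaggedStreamSketch` is hypothetical, and a hand-built list stands in for the stream that `stream/3` would return; the `:closed` reason is made up for the demo.

```elixir
defmodule TaggedStreamSketch do
  # Collects messages until the first {:error, reason}, then stops pulling.
  def collect(stream) do
    stream
    |> Enum.reduce_while({:ok, []}, fn
      {:ok, msg}, {:ok, acc} -> {:cont, {:ok, [msg | acc]}}
      {:error, reason}, {:ok, acc} -> {:halt, {:error, reason, Enum.reverse(acc)}}
    end)
    |> case do
      {:ok, acc} -> {:ok, Enum.reverse(acc)}
      error -> error
    end
  end
end

# Stand-in for GeminiEx.stream(session, "prompt", %{}): a list of tagged tuples.
fake_stream = [
  {:ok, %{type: :text, content: "Hi"}},
  {:error, :closed},
  {:ok, %{type: :text, content: "never reached"}}
]

IO.inspect(TaggedStreamSketch.collect(fake_stream))
```

Returning the messages gathered before the error lets a caller show partial output instead of discarding it.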
@spec stream!(pid(), binary(), map()) :: Enumerable.t()
Returns a Stream that yields messages as they arrive.
Raises on errors. Uses Stream.resource/3 under the hood.
The query is dispatched to the CLI immediately when stream!/3
is called. Message consumption is lazy/pull-based.
Example
session
|> GeminiEx.stream!("Explain OTP")
|> Enum.each(fn msg -> IO.puts(msg.content) end)
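The split between eager dispatch and lazy consumption can be illustrated with `Stream.resource/3` and a process mailbox. This is a toy model, not the library's implementation: `LazyStreamSketch`, the spawned worker that stands in for the CLI, and the `{ref, ...}` message shapes are all assumptions.

```elixir
defmodule LazyStreamSketch do
  # Hypothetical stand-in for a session: the "query" is dispatched right away,
  # and replies land in the caller's mailbox.
  def stream!(prompt) do
    caller = self()
    ref = make_ref()

    # Eager part: work starts before anyone consumes the stream.
    spawn(fn ->
      for word <- String.split(prompt) do
        send(caller, {ref, {:msg, String.upcase(word)}})
      end

      send(caller, {ref, :done})
    end)

    # Lazy part: each element is pulled from the mailbox on demand.
    Stream.resource(
      fn -> ref end,
      fn ref ->
        receive do
          {^ref, {:msg, msg}} -> {[msg], ref}
          {^ref, :done} -> {:halt, ref}
        after
          5_000 -> raise "timed out waiting for a message"
        end
      end,
      fn _ref -> :ok end
    )
  end
end

stream = LazyStreamSketch.stream!("explain otp")
IO.inspect(Enum.to_list(stream))
```

In this sketch the replies are addressed to the process that called `stream!/1`, so the stream has to be consumed in that same process. Whether GeminiEx has the same constraint is not stated in the documentation above.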
Submit feedback via universal feedback tracking.
@spec summarize_session(pid()) :: {:ok, summary_info()} | {:error, :not_found}
Generate and store a summary for the current session.
List available agents.
List available slash commands.
List available models.
Archive a thread.
Fork an existing thread.
@spec thread_list(pid()) :: {:ok, [thread_info()]}
List all threads for this session.
@spec thread_read(pid(), binary()) :: {:ok, %{thread: thread_info(), messages: [map()]}} | {:error, :not_found}
Read thread metadata, optionally including visible messages.
Resume an existing thread.
@spec thread_rollback(pid(), binary(), map()) :: {:ok, thread_info()} | {:error, :invalid_selector | :not_found}
Roll back the visible thread history.
@spec thread_start(pid(), thread_opts()) :: {:ok, %{ archived: false, created_at: integer(), message_count: 0, metadata: map(), name: binary(), session_id: binary(), status: :active, thread_id: binary(), updated_at: integer(), visible_message_count: 0, parent_thread_id: binary() }}
Start a new conversation thread.
Unarchive a thread.
@spec todo_summary([todo_item()]) :: %{ :total => non_neg_integer(), required(atom()) => non_neg_integer() }
Get a summary of todo counts by status.
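The return type pairs a `:total` key with one count per status atom. A minimal sketch of how such a summary could be computed follows. `TodoSketch` is hypothetical, and the assumption that each todo item is a map with a `:status` atom is not confirmed by the specs above.

```elixir
defmodule TodoSketch do
  # Hypothetical: assumes each todo item is a map with a :status atom.
  def summary(todos) do
    todos
    |> Enum.frequencies_by(& &1.status)
    |> Map.put(:total, length(todos))
  end

  # Keeps only the items in the given status.
  def filter(todos, status), do: Enum.filter(todos, &(&1.status == status))
end

todos = [
  %{content: "write tests", status: :pending},
  %{content: "fix parser", status: :completed},
  %{content: "update docs", status: :pending}
]

IO.inspect(TodoSketch.summary(todos))
IO.inspect(TodoSketch.filter(todos, :pending))
```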
@spec toggle_mcp_server(pid(), binary(), boolean()) :: {:ok, %{required(<<_::48>>) => <<_::56>>}} | {:error, :not_found}
Enable or disable an MCP server.
Respond to an agent request via universal turn response.
@spec unrevert_session(pid()) :: {:ok, session_info_map()} | {:error, :not_found}
Clear any stored session revert state.
Get the working directory from the system init data.