GeminiEx (beam_agent_ex v0.1.0)


Elixir wrapper for the Gemini CLI agent SDK.

Provides idiomatic Elixir access to the Gemini CLI's ACP transport. Sessions are persistent and multi-turn, and they stream JSON-RPC notifications over the official Gemini CLI's ACP mode.

Quick Start

{:ok, session} = GeminiEx.start_session(cli_path: "gemini")
{:ok, messages} = GeminiEx.query(session, "What is 2+2?")
GeminiEx.stop(session)

Streaming

session
|> GeminiEx.stream!("Explain quantum computing")
|> Enum.each(&IO.inspect/1)

Hooks

hook = GeminiEx.sdk_hook(:post_tool_use, fn ctx ->
  IO.inspect(ctx, label: "tool used")
  :ok
end)
{:ok, session} = GeminiEx.start_session(cli_path: "gemini", sdk_hooks: [hook])

Functions

abort(session)

@spec abort(pid()) :: :ok | {:error, term()}

Abort the current query. Alias for interrupt/1.

account_info(session)

@spec account_info(pid()) :: {:ok, map()} | {:error, term()}

Get account information.

active_betas(session)

@spec active_betas(pid()) :: {:ok, list()} | {:error, term()}

List active beta features from the system init data.

api_key_source(session)

@spec api_key_source(pid()) :: {:ok, binary() | nil} | {:error, term()}

Get the API key source from the system init data.

block_to_message(block)

@spec block_to_message(content_block()) :: %{
  type: :raw | :text | :thinking | :tool_result | :tool_use,
  content: term(),
  raw: term(),
  tool_input: term(),
  tool_name: term(),
  tool_use_id: term()
}

Convert a single content_block into a flat message.

child_spec(opts)

@spec child_spec(keyword() | map()) :: Supervisor.child_spec()

Supervisor child specification for a gemini_cli_session process.

Accepts a keyword list or a map. Uses :session_id from opts as the child id when available.
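As a sketch of how this child spec might be used, a session can be placed under a standard supervisor. The :cli_path option is borrowed from the Quick Start. The "main" session id and the MyApp.AgentSupervisor module name are hypothetical, and it is an assumption that the remaining options are forwarded to start_session/1 unchanged.

```elixir
defmodule MyApp.AgentSupervisor do
  # Hypothetical supervisor module; only GeminiEx.child_spec/1 comes from this page.
  use Supervisor

  def start_link(opts \\ []) do
    Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    children = [
      # :session_id doubles as the child id, per the description above.
      GeminiEx.child_spec(session_id: "main", cli_path: "gemini")
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end
```

Giving each child a distinct :session_id should let several sessions share one supervisor without child id clashes.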

cli_version(session)

@spec cli_version(pid()) :: {:ok, binary() | nil} | {:error, term()}

Get the CLI version from the system init data.

command_run(session, command, opts \\ %{})

@spec command_run(pid(), binary(), map()) ::
  {:ok, %{exit_code: integer(), output: binary()}}
  | {:error,
     {:port_exit, term()} | {:port_failed, term()} | {:timeout, timeout()}}

Run a command via universal command execution.
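The return type above has two success shapes worth separating (zero and non-zero exit codes) plus three error tuples. A minimal sketch of one way to classify them follows. CommandExample and the tuples it returns are hypothetical; only the input shapes come from the spec.

```elixir
defmodule CommandExample do
  # Hypothetical helper: turns a command_run/3 result into a simpler tuple.
  def describe({:ok, %{exit_code: 0, output: output}}) do
    {:ok, String.trim(output)}
  end

  # A non-zero exit code still arrives as {:ok, ...} in the spec.
  def describe({:ok, %{exit_code: code, output: output}}) do
    {:error, {:exit, code, output}}
  end

  def describe({:error, {:timeout, after_ms}}) do
    {:error, {:timed_out, after_ms}}
  end

  def describe({:error, {kind, detail}}) when kind in [:port_exit, :port_failed] do
    {:error, {:port, detail}}
  end
end

# Usage against a live session (not run here; "mix test" is a placeholder command):
#   session |> GeminiEx.command_run("mix test") |> CommandExample.describe()
```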

current_model(session)

@spec current_model(pid()) :: {:ok, binary() | nil} | {:error, term()}

Get the current model from session info.

Extracts from the session's model field or system init data.

current_permission_mode(session)

@spec current_permission_mode(pid()) ::
  {:ok, atom() | binary() | nil} | {:error, term()}

Get the current permission mode from session info.

delete_session(session_id)

@spec delete_session(binary()) :: :ok

Delete a session and its messages.

extract_todos(messages)

@spec extract_todos([message_map()]) :: [todo_item()]

Extract all TodoWrite items from a list of messages.

filter_todos(todos, status)

Filter todo items by status.

flatten_assistant(message)

@spec flatten_assistant(map()) :: [map()]

Flatten an assistant message (with content_blocks) into individual messages.

fork_session(session, opts)

@spec fork_session(pid(), map()) :: {:ok, session_info_map()} | {:error, :not_found}

Fork a tracked session into a new session ID.

get_session(session_id)

@spec get_session(binary()) :: {:ok, session_info_map()} | {:error, :not_found}

Get session metadata by ID.

get_session_messages(session_id)

@spec get_session_messages(binary()) :: {:ok, [message_map()]} | {:error, :not_found}

Get messages for a session.

get_session_messages(session_id, opts)

@spec get_session_messages(binary(), message_filter_opts()) ::
  {:ok, [message_map()]} | {:error, :not_found}

Get messages with options.

health(session)

@spec health(pid()) :: atom()

Query session health.

interrupt(session)

@spec interrupt(pid()) :: :ok | {:error, term()}

Interrupt the current query.

list_agents(session)

@spec list_agents(pid()) :: {:ok, list()} | {:error, term()}

List available agents from the system init data.

list_mcp_servers(session)

@spec list_mcp_servers(pid()) :: {:ok, list()} | {:error, term()}

List configured MCP servers from the system init data.

list_plugins(session)

@spec list_plugins(pid()) :: {:ok, list()} | {:error, term()}

List available plugins from the system init data.

list_sessions()

@spec list_sessions() :: {:ok, [session_info_map()]}

List all tracked sessions.

list_sessions(opts)

@spec list_sessions(session_filter_opts()) :: {:ok, [session_info_map()]}

List sessions with filters.

list_skills(session)

@spec list_skills(pid()) :: {:ok, list()} | {:error, term()}

List available skills from the system init data.

list_tools(session)

@spec list_tools(pid()) :: {:ok, list()} | {:error, term()}

List available tools from the system init data.

mcp_server(name, tools)

@spec mcp_server(binary(), [
  %{
    description: binary(),
    handler: (map() -> {term(), term()}),
    input_schema: map(),
    name: binary()
  }
]) :: mcp_server_def()

Create an in-process MCP server definition.

mcp_server_status(session)

@spec mcp_server_status(pid()) :: {:ok, %{required(binary()) => map()}}

Get status of all MCP servers.

mcp_tool(name, description, input_schema, handler)

@spec mcp_tool(binary(), binary(), map(), (map() ->
                                       {:error, binary()} | {:ok, [map()]})) ::
  mcp_tool_def()

Create an in-process MCP tool definition.
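To illustrate how mcp_tool/4 and mcp_server/2 might fit together, the sketch below defines one tool and wraps it in a server definition. The handler follows the {:ok, [map()]} | {:error, binary()} contract from the spec. Three things here are assumptions rather than documented behaviour: the %{type: "text", text: ...} content shape, the string keys of the input map, and the JSON Schema layout of input_schema. McpExample, "add" and "calculator" are hypothetical names.

```elixir
defmodule McpExample do
  # Hypothetical handler: returns {:ok, [map()]} or {:error, binary()} per the spec.
  def add(%{"a" => a, "b" => b}) when is_number(a) and is_number(b) do
    # The content map shape is an assumption, not taken from this page.
    {:ok, [%{type: "text", text: to_string(a + b)}]}
  end

  def add(_input), do: {:error, "expected numeric \"a\" and \"b\""}

  def server do
    # Assumed to be a JSON Schema object describing the tool input.
    schema = %{
      "type" => "object",
      "properties" => %{"a" => %{"type" => "number"}, "b" => %{"type" => "number"}},
      "required" => ["a", "b"]
    }

    tool = GeminiEx.mcp_tool("add", "Add two numbers", schema, &add/1)
    GeminiEx.mcp_server("calculator", [tool])
  end
end
```

Keeping the handler as a named function makes it easy to exercise on its own before it is wired into a session.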

message_to_block(message)

@spec message_to_block(map()) :: content_block()

Convert a single flat message into a content_block.

messages_to_blocks(messages)

@spec messages_to_blocks([map()]) :: [content_block()]

Convert a list of flat messages into content_block format.

normalize_messages(messages)

@spec normalize_messages([map()]) :: [map()]

Normalize a list of messages from any adapter into a uniform flat stream.

Claude produces assistant messages with nested content_blocks. All other adapters (including Gemini) produce individual typed messages. This function flattens both into a uniform stream where each message has a single, specific type — never nested content_blocks.

Examples

messages
|> GeminiEx.normalize_messages()
|> Enum.filter(&(&1.type == :text))
|> Enum.map_join("", & &1.content)

output_style(session)

@spec output_style(pid()) :: {:ok, binary() | nil} | {:error, term()}

Get the output style from the system init data.

query(session, prompt, params \\ %{})

@spec query(pid(), binary(), query_opts()) ::
  {:ok, [message_map()]} | {:error, term()}

Send a query and collect all response messages (blocking).

Returns {:ok, messages}, where messages is a list of beam_agent_core message maps. Uses a deadline-based timeout, so :timeout bounds the whole query rather than each individual message.

Options

  • :timeout - total query timeout in ms (default: 120_000)
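A minimal sketch of a blocking query with a shorter timeout and explicit error handling. Passing the option as %{timeout: ...} is an assumption based on the %{} default for params. QueryExample and its functions are hypothetical, and the shape of the error reason is not specified on this page, so it is passed through untouched.

```elixir
defmodule QueryExample do
  # Hypothetical helper; only GeminiEx.query/3 and normalize_messages/1 come from this page.
  def ask(session, prompt, timeout_ms \\ 30_000) do
    case GeminiEx.query(session, prompt, %{timeout: timeout_ms}) do
      {:ok, messages} ->
        {:ok, messages |> GeminiEx.normalize_messages() |> text_of()}

      {:error, reason} ->
        # The reason is an opaque term() in the spec, so return it unchanged.
        {:error, reason}
    end
  end

  # Pure helper: joins the content of :text messages and ignores everything else.
  def text_of(messages) do
    messages
    |> Enum.filter(&(Map.get(&1, :type) == :text))
    |> Enum.map_join("", &Map.get(&1, :content, ""))
  end
end
```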

reconnect_mcp_server(session, server_name)

@spec reconnect_mcp_server(pid(), binary()) ::
  {:ok, %{required(<<_::48>>) => <<_::88>>}} | {:error, :not_found}

Reconnect a failed MCP server.

revert_session(session, selector)

@spec revert_session(pid(), map()) ::
  {:ok, session_info_map()} | {:error, :invalid_selector | :not_found}

Revert the visible session history to a prior boundary.

rewind_files(session, checkpoint_uuid)

@spec rewind_files(pid(), binary()) ::
  :ok | {:error, :not_found | {:restore_failed, binary(), atom()}}

Revert file changes to a checkpoint via universal checkpointing.

sdk_hook(event, callback)

@spec sdk_hook(atom(), hook_callback()) :: %{event: atom(), callback: hook_callback()}

Create an SDK lifecycle hook.

sdk_hook(event, callback, matcher)

@spec sdk_hook(atom(), hook_callback(), %{tool_name: binary()}) :: %{
  event: atom(),
  callback: hook_callback(),
  matcher: %{tool_name: binary()},
  compiled_re: {:re_pattern, term(), term(), term(), term()}
}

Create an SDK lifecycle hook with a matcher.
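A sketch of a hook restricted to a single tool. The :post_tool_use event and the :ok return value are taken from the Hooks example at the top of this page. The "write_file" tool name is hypothetical. The compiled_re field in the return type suggests that :tool_name is compiled as a regular expression, but that is an inference from the spec rather than documented behaviour.

```elixir
defmodule HookExample do
  # Hypothetical helper that builds a hook for one tool only.
  def write_audit_hook do
    GeminiEx.sdk_hook(
      :post_tool_use,
      fn ctx ->
        IO.inspect(ctx, label: "file tool used")
        :ok
      end,
      # "write_file" is a placeholder tool name, not one confirmed by this page.
      %{tool_name: "write_file"}
    )
  end
end

# Usage (not run here):
#   GeminiEx.start_session(cli_path: "gemini", sdk_hooks: [HookExample.write_audit_hook()])
```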

send_control(session, method, params \\ %{})

@spec send_control(pid(), binary(), map()) :: {:ok, term()} | {:error, term()}

Send a raw control message via universal control dispatch.

server_health(session)

@spec server_health(pid()) ::
  {:ok,
   %{
     adapter: :gemini_cli,
     health: :active_query | :connecting | :error | :initializing | :ready
   }}

Check server health. Maps to session health for Gemini.
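The health atoms in the spec lend themselves to pattern matching. The sketch below treats :ready as the only state in which a new query should be sent, which is an assumption, and polls until that state is reached. HealthExample, the attempt count and the interval are all hypothetical.

```elixir
defmodule HealthExample do
  # Hypothetical helper: true only when the session reports :ready.
  def can_query?({:ok, %{health: :ready}}), do: true
  def can_query?({:ok, %{health: _other}}), do: false

  # Polls a zero-arity check function until it reports :ready or attempts run out.
  def await_ready(check, attempts \\ 20, interval_ms \\ 100)

  def await_ready(_check, 0, _interval_ms), do: {:error, :not_ready}

  def await_ready(check, attempts, interval_ms) do
    if can_query?(check.()) do
      :ok
    else
      Process.sleep(interval_ms)
      await_ready(check, attempts - 1, interval_ms)
    end
  end
end

# Usage against a live session (not run here):
#   HealthExample.await_ready(fn -> GeminiEx.server_health(session) end)
```

Taking the check as a function keeps the polling loop independent of any particular session.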

session_info(session)

@spec session_info(pid()) :: {:ok, map()} | {:error, term()}

Query session info.

set_max_thinking_tokens(session, max_tokens)

@spec set_max_thinking_tokens(pid(), pos_integer()) ::
  {:ok, %{max_thinking_tokens: pos_integer()}}

Set maximum thinking tokens via universal control.

set_mcp_servers(session, servers)

@spec set_mcp_servers(pid(), [%{name: binary(), tools: [map()], version: binary()}]) ::
  {:error, :not_found} | {:ok, %{required(binary()) => binary()}}

Replace MCP server configurations.

set_model(session, model)

@spec set_model(pid(), binary()) :: {:ok, binary()} | {:error, term()}

Change the model at runtime.

set_permission_mode(session, mode)

@spec set_permission_mode(pid(), binary()) :: {:ok, binary()} | {:error, term()}

Change the permission mode at runtime via universal control.

share_session(session)

@spec share_session(pid()) :: {:ok, share_info()} | {:error, :not_found}

Create or replace share state for the current session.

share_session(session, opts)

@spec share_session(pid(), map()) :: {:ok, share_info()} | {:error, :not_found}

start_session(opts)

@spec start_session(keyword() | map()) :: {:ok, pid()} | {:error, term()}

Start a persistent Gemini CLI ACP session.

stop(session)

@spec stop(pid()) :: :ok

Stop a session.

stop_task(session, task_id)

@spec stop_task(pid(), binary()) :: :ok | {:error, :not_found}

Stop a running agent task via universal task tracking.

stream(session, prompt, params \\ %{})

@spec stream(pid(), binary(), map()) :: Enumerable.t()

Returns a Stream that yields {:ok, msg} or {:error, reason} tuples.

Non-raising variant of stream!/3.
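Because stream/3 yields tagged tuples, the caller decides what to do on the first error. One possible approach, sketched below, collects messages until an error appears and then returns the error together with whatever arrived before it. StreamExample and the three-element error tuple are hypothetical conventions, not part of GeminiEx.

```elixir
defmodule StreamExample do
  # Hypothetical helper. Accepts any enumerable of {:ok, msg} | {:error, reason}
  # tuples, which is the shape stream/3 is documented to yield.
  def collect(tagged) do
    result =
      Enum.reduce_while(tagged, {:ok, []}, fn
        {:ok, msg}, {:ok, acc} -> {:cont, {:ok, [msg | acc]}}
        {:error, reason}, {:ok, acc} -> {:halt, {:error, reason, acc}}
      end)

    # Messages were prepended, so restore arrival order before returning.
    case result do
      {:ok, acc} -> {:ok, Enum.reverse(acc)}
      {:error, reason, acc} -> {:error, reason, Enum.reverse(acc)}
    end
  end
end

# Usage against a live session (not run here):
#   session |> GeminiEx.stream("Explain OTP") |> StreamExample.collect()
```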

stream!(session, prompt, params \\ %{})

@spec stream!(pid(), binary(), map()) :: Enumerable.t()

Returns a Stream that yields messages as they arrive.

Raises on errors. Uses Stream.resource/3 under the hood.

The query is dispatched to the CLI immediately when stream!/3 is called. Message consumption is lazy/pull-based.

Example

session
|> GeminiEx.stream!("Explain OTP")
|> Enum.each(fn msg -> IO.puts(msg.content) end)

submit_feedback(session, feedback)

@spec submit_feedback(pid(), map()) :: :ok

Submit feedback via universal feedback tracking.

summarize_session(session)

@spec summarize_session(pid()) :: {:ok, summary_info()} | {:error, :not_found}

Generate and store a summary for the current session.

summarize_session(session, opts)

@spec summarize_session(pid(), map()) :: {:ok, summary_info()} | {:error, :not_found}

supported_agents(session)

@spec supported_agents(pid()) :: {:ok, list()} | {:error, term()}

List available agents.

supported_commands(session)

@spec supported_commands(pid()) :: {:ok, list()} | {:error, term()}

List available slash commands.

supported_models(session)

@spec supported_models(pid()) :: {:ok, list()} | {:error, term()}

List available models.

thread_archive(session, thread_id)

@spec thread_archive(pid(), binary()) :: {:ok, thread_info()} | {:error, :not_found}

Archive a thread.

thread_fork(session, thread_id)

@spec thread_fork(pid(), binary()) :: {:ok, thread_info()} | {:error, :not_found}

Fork an existing thread.

thread_fork(session, thread_id, opts)

@spec thread_fork(pid(), binary(), thread_opts()) ::
  {:ok, thread_info()} | {:error, :not_found}

thread_list(session)

@spec thread_list(pid()) :: {:ok, [thread_info()]}

List all threads for this session.

thread_read(session, thread_id)

@spec thread_read(pid(), binary()) ::
  {:ok, %{thread: thread_info(), messages: [map()]}} | {:error, :not_found}

Read thread metadata, optionally including visible messages.

thread_read(session, thread_id, opts)

@spec thread_read(pid(), binary(), map()) ::
  {:ok, %{thread: thread_info(), messages: [map()]}} | {:error, :not_found}

thread_resume(session, thread_id)

@spec thread_resume(pid(), binary()) :: {:ok, thread_info()} | {:error, :not_found}

Resume an existing thread.

thread_rollback(session, thread_id, selector)

@spec thread_rollback(pid(), binary(), map()) ::
  {:ok, thread_info()} | {:error, :invalid_selector | :not_found}

Roll back the visible thread history.

thread_start(session, opts \\ %{})

@spec thread_start(pid(), thread_opts()) ::
  {:ok,
   %{
     archived: false,
     created_at: integer(),
     message_count: 0,
     metadata: map(),
     name: binary(),
     session_id: binary(),
     status: :active,
     thread_id: binary(),
     updated_at: integer(),
     visible_message_count: 0,
     parent_thread_id: binary()
   }}

Start a new conversation thread.
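The thread functions can be combined into a simple branching workflow. The sketch below starts a thread with default options, forks it, and archives the original. ThreadExample is a hypothetical module, and the calls rely only on the specs shown on this page; the :thread_id key is read from the map that thread_start/2 is documented to return.

```elixir
defmodule ThreadExample do
  # Hypothetical helper: start a thread, fork it, then archive the original.
  def branch(session) do
    # thread_start/2 has no error clause in its spec, so match directly.
    {:ok, %{thread_id: thread_id}} = GeminiEx.thread_start(session)

    with {:ok, fork} <- GeminiEx.thread_fork(session, thread_id),
         {:ok, _archived} <- GeminiEx.thread_archive(session, thread_id) do
      {:ok, fork}
    end
  end
end
```

If either step returns {:error, :not_found}, the with expression passes that tuple straight back to the caller.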

thread_unarchive(session, thread_id)

@spec thread_unarchive(pid(), binary()) :: {:ok, thread_info()} | {:error, :not_found}

Unarchive a thread.

todo_summary(todos)

@spec todo_summary([todo_item()]) :: %{
  :total => non_neg_integer(),
  required(atom()) => non_neg_integer()
}

Get a summary of todo counts by status.
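A sketch of how the todo helpers might be chained to produce a one-line progress report. Per the spec, the summary map always has a :total key and otherwise uses status atoms as keys. TodoExample is a hypothetical module, and the status names that actually occur depend on the agent output.

```elixir
defmodule TodoExample do
  # Hypothetical helper: extract todos from messages and format the counts.
  def progress(messages) do
    messages
    |> GeminiEx.extract_todos()
    |> GeminiEx.todo_summary()
    |> format()
  end

  # Pure helper: renders per-status counts in sorted order, then the total.
  def format(summary) do
    by_status =
      summary
      |> Map.delete(:total)
      |> Enum.sort()
      |> Enum.map_join(", ", fn {status, count} -> "#{status}: #{count}" end)

    "#{by_status} (total: #{summary.total})"
  end
end
```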

toggle_mcp_server(session, server_name, enabled)

@spec toggle_mcp_server(pid(), binary(), boolean()) ::
  {:ok, %{required(<<_::48>>) => <<_::56>>}} | {:error, :not_found}

Enable or disable an MCP server.

turn_respond(session, request_id, params)

@spec turn_respond(pid(), binary(), map()) ::
  :ok | {:error, :not_found | :already_resolved}

Respond to an agent request via universal turn response.

unrevert_session(session)

@spec unrevert_session(pid()) :: {:ok, session_info_map()} | {:error, :not_found}

Clear any stored session revert state.

unshare_session(session)

@spec unshare_session(pid()) :: :ok | {:error, :not_found}

Revoke share state for the current session.

working_directory(session)

@spec working_directory(pid()) :: {:ok, binary() | nil} | {:error, term()}

Get the working directory from the system init data.