Version: v1.7.0

h2oGPTe REST API: Guide

Overview​

This guide shows how to call the h2oGPTe REST API directly with raw HTTP requests from Python's requests library (no SDK required). It walks through a complete workflow: check server health, create a collection, upload and ingest a document, run agent queries (streaming and non-streaming), inspect results, and clean up.

Prerequisites​

Before you begin, you need:

  • Python 3.x installed
  • The requests library (for example, python -m pip install requests)
  • An h2oGPTe global API key — see APIs to create one
Note: API key required

All API calls require a global API key passed as a Bearer token.

Note: API endpoints
  • API base URL: https://YOUR_H2OGPTE_URL/api/v1
  • OpenAPI spec: https://YOUR_H2OGPTE_URL/api-spec.yaml
  • Swagger UI: https://YOUR_H2OGPTE_URL/swagger-ui/

API reference at a glance​

Use this table to quickly find the detailed sections below.

| Category | Method | Endpoint | Notes |
| --- | --- | --- | --- |
| Health | GET | /rpc/health/readiness | Readiness probe |
| Health | GET | /rpc/health/liveness | Liveness probe |
| Models | GET | /api/v1/models | List LLMs |
| Models (OpenAI) | GET | /openai_api/v1/models | OpenAI-compatible list |
| Collections | POST | /api/v1/collections | Create collection |
| Collections | GET | /api/v1/collections | List collections |
| Collections | GET | /api/v1/collections/{id} | Get single collection |
| Collections | DELETE | /api/v1/collections/{id} | Delete collection |
| Uploads | PUT | /api/v1/uploads | Upload a file |
| Ingestion | POST | /api/v1/uploads/{id}/ingest | Ingest into collection |
| Chat | POST | /api/v1/chats | Create session |
| Chat | GET | /api/v1/chats | List sessions |
| Chat | GET | /api/v1/chats/{id} | Get session detail |
| Chat | DELETE | /api/v1/chats/{id} | Delete session |
| Chat | GET | /api/v1/chats/{id}/messages | Get messages |
| Chat | GET | /api/v1/chats/{id}/questions | Suggested follow-ups |
| Completions | POST | /api/v1/chats/{id}/completions | Query LLM / Agent |
| Completions (OpenAI) | POST | /openai_api/v1/chat/completions | OpenAI-compatible chat |
| Messages | GET | /api/v1/messages/{id}/meta | Message metadata |
| Messages | GET | /api/v1/messages/{id}/references | RAG citations |
| Agent Files | GET | /api/v1/chats/{id}/agent_server_files | List agent files |
| Agent Files | DELETE | /api/v1/chats/{id}/agent_server_files | Delete agent files |
| Agent Dirs | GET | /api/v1/chats/{id}/agent_server_directories/stats | Session dir stats |
| Agent Dirs | GET | /api/v1/chats/{id}/agent_server_directories/{name}/stats | Single dir stats |
| Agent Dirs | GET | /api/v1/agents/directory_stats | All sessions dir stats |
| Agent Tools | GET | /api/v1/agents/tools | List available tools |
| Agent Tools | GET | /api/v1/agents/tool_preference | Get tool preference |
| File Download | GET | /file?id={doc_id}&name={filename} | Download agent file |
| Agent Tools | GET | /api/v1/agents/custom_tools/{tool_id} | Get single custom tool |
| Agent Tools | GET | /api/v1/agents/custom_tools/{tool_id}/download | Download tool file |
| Memory Blocks | POST | /api/v1/memory_blocks | Create memory block |
| Memory Blocks | GET | /api/v1/memory_blocks | List memory blocks |
| Memory Blocks | GET | /api/v1/memory_blocks/{id} | Get memory block |
| Memory Blocks | PUT | /api/v1/memory_blocks/{id} | Update memory block |
| Memory Blocks | DELETE | /api/v1/memory_blocks/{id} | Delete memory block |
| Memory Blocks | PUT | /api/v1/memory_blocks/{id}/permissions/{username} | Share memory block |
| Memory Blocks | DELETE | /api/v1/memory_blocks/{id}/permissions/{username} | Unshare memory block |
| Memory Blocks | PUT | /api/v1/memory_blocks/{id}/privacy | Set public/private |
| Memory Blocks | GET | /api/v1/users/current/default_memory_blocks | Get defaults |
| Memory Blocks | PUT | /api/v1/users/current/default_memory_blocks | Set default |

Key agent llm_args parameters​

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| use_agent | bool | false | Enable AI agent with tool access |
| agent_accuracy | string | "standard" | Effort: "quick", "basic", "standard", or "maximum" |
| agent_max_turns | int or "auto" | "auto" | Max agent iterations |
| agent_tools | string or list | "auto" | Tool selection: "auto", "all", or list of names |
| agent_type | string | "auto" | Agent type: "auto", "general", "task", "deep_research", "coder", "search" |
| agent_total_timeout | int | 3600 | Total timeout in seconds |
| agent_stream_files | bool | true | Stream files as they are generated |
| temperature | float | 0.0 | LLM sampling temperature |
| max_new_tokens | int | 1536 | Max output tokens |
| memory_block_id | string (UUID) | null | Attach a memory block by ID |
| memory_block_name | string | null | Attach a memory block by name |

The generation approach is controlled by a top-level rag_config parameter in the request body:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| rag_config.rag_type | string | "auto" | Generation approach: "auto", "llm_only", "agent_only", "rag", "agentic_rag", "rlm_rag", "fast_agentic_rag", "hyde1", "hyde2", "rag+", "all_data" |
Note

The system automatically enables agent mode for agentic_rag, rlm_rag, and fast_agentic_rag. For agent_only, you must set "use_agent": true explicitly in llm_args; if it is omitted, the system silently falls back to rag mode. The RLM RAG example below demonstrates the full request structure. To use a different approach, swap the rag_type value.
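The agent_only rule can be captured in a small payload builder. This is a minimal sketch rather than part of the API: build_completion_payload is a hypothetical helper name, and the field names simply mirror the request bodies used in this guide.

```python
import copy


def build_completion_payload(message, rag_type="auto", llm="auto", llm_args=None, stream=False):
    """Build a request body for /chats/{id}/completions (hypothetical helper)."""
    args = copy.deepcopy(llm_args) if llm_args else {}
    if rag_type == "agent_only":
        # agent_only needs use_agent set explicitly; without it the
        # request silently falls back to rag mode.
        args["use_agent"] = True
    return {
        "message": message,
        "llm": llm,
        "stream": stream,
        "llm_args": args,
        "rag_config": {"rag_type": rag_type},
    }


payload = build_completion_payload("Summarize the data", rag_type="agent_only")
print(payload["llm_args"])
```

For rlm_rag, agentic_rag, and fast_agentic_rag the helper leaves llm_args untouched, because agent mode is enabled on the server side for those values.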

Setup and configuration​

Every request needs the base URL and an Authorization header. Set these once and reuse them throughout.

import requests
import json
import os
import time
import io
from pathlib import Path
from datetime import datetime
from urllib.parse import quote
from pprint import pprint

# Configuration
BASE_URL = "https://YOUR_H2OGPTE_URL"  # no trailing slash
API_KEY = "YOUR_API_KEY"

API_V1 = f"{BASE_URL}/api/v1"
HEADERS = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json",
}

# Helper functions

def _handle(resp: requests.Response, label: str = "") -> dict:
    """Raise on HTTP error and return parsed JSON."""
    if not resp.ok:
        print(f"[{resp.status_code}] {label}: {resp.text[:400]}")
    resp.raise_for_status()
    try:
        return resp.json()
    except Exception:
        # Empty or non-JSON body (for example, 204 No Content)
        return {"raw": resp.text}


def api_get(path, params=None):
    """GET {API_V1}/{path}"""
    return _handle(
        requests.get(f"{API_V1}/{path}", headers=HEADERS, params=params),
        label=f"GET {path}"
    )


def api_post(path, body=None, files=None, params=None):
    """POST {API_V1}/{path}"""
    if files:
        # multipart: drop Content-Type so requests sets the boundary automatically
        hdrs = {k: v for k, v in HEADERS.items() if k != "Content-Type"}
        return _handle(
            requests.post(f"{API_V1}/{path}", headers=hdrs, files=files, params=params),
            label=f"POST {path}"
        )
    return _handle(
        requests.post(f"{API_V1}/{path}", headers=HEADERS, json=body, params=params),
        label=f"POST {path}"
    )


def api_put(path, files=None, body=None, params=None):
    """PUT {API_V1}/{path}"""
    if files:
        # multipart: drop Content-Type so requests sets the boundary automatically
        hdrs = {k: v for k, v in HEADERS.items() if k != "Content-Type"}
        return _handle(
            requests.put(f"{API_V1}/{path}", headers=hdrs, files=files, params=params),
            label=f"PUT {path}"
        )
    return _handle(
        requests.put(f"{API_V1}/{path}", headers=HEADERS, json=body, params=params),
        label=f"PUT {path}"
    )


def api_delete(path, body=None, params=None):
    """DELETE {API_V1}/{path}"""
    return _handle(
        requests.delete(f"{API_V1}/{path}", headers=HEADERS, json=body, params=params),
        label=f"DELETE {path}"
    )


def api_patch(path, body=None):
    """PATCH {API_V1}/{path}"""
    return _handle(
        requests.patch(f"{API_V1}/{path}", headers=HEADERS, json=body),
        label=f"PATCH {path}"
    )

print(f"Configured: {API_V1}")

Check server health​

You can check server readiness using health probe endpoints outside of /api/v1.

resp = requests.get(f"{BASE_URL}/rpc/health/readiness", headers=HEADERS)
print(f"Health status: {resp.status_code} {resp.text.strip()[:120]}")

A 200 response means the server is ready to accept requests.

You can also check liveness:

resp = requests.get(f"{BASE_URL}/rpc/health/liveness", headers=HEADERS)
print(f"Liveness: {resp.status_code} {resp.text.strip()[:120]}")

List available LLMs​

GET /api/v1/models returns all language models currently loaded on the server.

models_resp = api_get("models")
models = models_resp if isinstance(models_resp, list) else models_resp.get("data", [])

print(f"Available models ({len(models)}):")
for m in models[:10]:  # show first 10
    name = m.get("name") or m.get("id") or str(m)
    print(f" • {name}")

# Pick a default model (or use 'auto' for automatic routing)
DEFAULT_LLM = "auto"
if models:
    first_model_name = models[0].get("name") or models[0].get("id")
    print(f"\nUsing DEFAULT_LLM = '{DEFAULT_LLM}' (set to a specific model name if preferred)")

You can pass a specific model name in subsequent requests or use "auto" for automatic routing.

Manage collections​

Use a collection to group related documents for RAG (Retrieval-Augmented Generation) queries.

Create a collection​

collection_payload = {
"name": f"demo-agent-csv-{int(time.time())}",
"description": "Demo collection for REST API agent file demo",
}
coll = api_post("collections", body=collection_payload)
collection_id = coll["id"]
print(f"Created collection: {coll['name']} (id={collection_id})")

List collections​

cols = api_get("collections", params={"limit": 5, "sort_column": "updated_at", "ascending": False})
print("Your collections (most recent 5):")
for c in cols:
    print(f" • [{c['id'][:8]}...] {c['name']} docs={c.get('document_count', 0)}")

Get a single collection​

coll_detail = api_get(f"collections/{collection_id}")
print("Collection detail:")
pprint({k: v for k, v in coll_detail.items() if k in [
    "id", "name", "description", "document_count", "document_size", "updated_at"
]})

Upload and ingest documents​

To add a document to a collection, follow a two-step process:

  1. Upload the raw file bytes with PUT /api/v1/uploads to get an upload ID.
  2. Ingest the upload into a collection with POST /api/v1/uploads/{upload_id}/ingest.

Upload a file​

sample_csv_content = """product,category,revenue,units_sold,month
Widget A,Electronics,12500,250,January
Widget B,Electronics,8300,166,January
Gadget X,Accessories,4200,420,January
Widget A,Electronics,14200,284,February
"""

csv_filename = "sales_data.csv"
csv_bytes = sample_csv_content.encode("utf-8")
print(f"Created sample CSV ({len(csv_bytes)} bytes): {csv_filename}")
print(sample_csv_content)

upload_resp = api_put(
    "uploads",
    files={"file": (csv_filename, csv_bytes, "text/csv")},
)
upload_id = upload_resp["id"]
print(f"Uploaded: id={upload_id} filename={upload_resp['filename']}")
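When uploading files from disk rather than an in-memory string, the multipart `files` argument can be assembled by a small helper. The sketch below is illustrative only: build_upload_files is a hypothetical name, and guessing the MIME type from the file extension is an assumption about what is convenient on the client side, not a documented server requirement.

```python
import mimetypes
from pathlib import Path


def build_upload_files(path, content=None):
    """Return a `files` mapping suitable for requests (hypothetical helper).

    If content is None, the bytes are read from `path`.
    """
    p = Path(path)
    data = content if content is not None else p.read_bytes()
    # Guess the MIME type from the extension; fall back to a generic type.
    mime, _ = mimetypes.guess_type(p.name)
    return {"file": (p.name, data, mime or "application/octet-stream")}


files = build_upload_files("reports/notes.txt", content=b"hello")
print(files["file"][0], files["file"][2])
```

The returned mapping can be passed as the files argument of the api_put helper defined in the setup section.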

Ingest into the collection​

resp = requests.post(
    f"{API_V1}/uploads/{upload_id}/ingest",
    headers=HEADERS,
    params={
        "collection_id": collection_id,
        "gen_doc_summaries": False,
        "gen_doc_questions": False,
    },
)
print(f"Ingest status: {resp.status_code}")
# 204 No Content = success

# Wait briefly for ingestion to complete
time.sleep(3)

# Verify document appeared in collection
coll_after = api_get(f"collections/{collection_id}")
print(f"Collection now has {coll_after.get('document_count', 0)} document(s)")
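A fixed three-second sleep may be too short for larger files. One alternative is to poll the collection until its document count reaches the expected value. The sketch below is a hedged illustration: wait_for_documents is a hypothetical helper, and it assumes that document_count only increases once ingestion of a document has finished. The fetch function is injected so that the logic can be tried without a server.

```python
import time


def wait_for_documents(fetch_collection, expected=1, timeout=60.0, interval=2.0, sleep=time.sleep):
    """Poll until the collection reports at least `expected` documents.

    fetch_collection is any zero-argument callable returning the collection
    dict, for example: lambda: api_get(f"collections/{collection_id}").
    Returns the last document_count seen; raises TimeoutError on timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        count = fetch_collection().get("document_count", 0) or 0
        if count >= expected:
            return count
        if time.monotonic() >= deadline:
            raise TimeoutError(f"only {count} of {expected} document(s) after {timeout}s")
        sleep(interval)


# Offline demonstration with a stand-in for the API call.
_responses = iter([{"document_count": 0}, {"document_count": 0}, {"document_count": 1}])
count = wait_for_documents(lambda: next(_responses), sleep=lambda seconds: None)
print(count)
```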

Create chat sessions​

Use a chat session to provide context for a conversation with an LLM, optionally backed by a collection for RAG. For pure agent tasks (no RAG), create a session without a collection.

Create a session (agent-only, no collection)​

session_resp = api_post("chats", body={})
session_id_agent = session_resp["id"]
print(f"Created agent-only session: {session_id_agent}")

Create a session with a collection (for RAG)​

session_rag_resp = requests.post(
    f"{API_V1}/chats",
    headers=HEADERS,
    json={},
    params={"collection_id": collection_id},
)
session_rag_resp.raise_for_status()
session_id_rag = session_rag_resp.json()["id"]
print(f"Created RAG session: {session_id_rag} (collection={collection_id[:8]}...)")

List chat sessions​

sessions = api_get("chats", params={"limit": 5})
print(f"Recent sessions ({len(sessions)}):")
for s in sessions:
    print(f" • [{s['id']}] collection={str(s.get('collection_id', '-'))[:12]} "
          f"updated={s['updated_at'][:19]}")

Send an agent query (non-streaming)​

POST /api/v1/chats/{session_id}/completions

Key agent parameters in llm_args:

| Parameter | Type | Description |
| --- | --- | --- |
| use_agent | bool | Enable the AI agent |
| agent_accuracy | string | Effort level: "quick", "basic", "standard", "maximum" |
| agent_max_turns | int or "auto" | Max agent iterations |
| agent_tools | string or list | "auto", "all", or list of specific tool names |
| agent_total_timeout | int | Wall-clock budget in seconds (default 3600) |
| agent_stream_files | bool | Whether agent-generated files are streamed back |

agent_prompt = (
    "Analyze the sales data and perform the following tasks:\n"
    "- Calculate total revenue and total units sold per product\n"
    "- Calculate total revenue per category\n"
    "- Find the best-selling product by revenue\n"
    "- Save the summary as a CSV file named 'sales_summary.csv'\n"
    "- Save a month-over-month revenue report as 'monthly_revenue.csv'\n"
    "Provide a brief written summary of the findings."
)

completion_payload = {
    "message": agent_prompt,
    "llm": DEFAULT_LLM,
    "stream": False,
    "llm_args": {
        "use_agent": True,
        "agent_accuracy": "standard",  # quick | basic | standard | maximum
        "agent_max_turns": "auto",     # or an integer
        "agent_tools": "auto",         # or a list of specific tool names
        "agent_total_timeout": 300,    # seconds
        "agent_stream_files": True,    # stream files as they are generated
        "temperature": 0.0,
        "max_new_tokens": 4096,
    },
    "rag_config": {
        "rag_type": "llm_only"  # no RAG for a pure agent task
    },
    "include_chat_history": "off",
}

print("Sending agent request (may take 30–120 s) …")
t0 = time.time()

resp = requests.post(
    f"{API_V1}/chats/{session_id_agent}/completions",
    headers=HEADERS,
    json=completion_payload,
    timeout=360,  # client-side timeout, slightly above agent_total_timeout
)
resp.raise_for_status()
completion = resp.json()

message_id = completion["message_id"]
body = completion["body"]

print(f"\nCompleted in {time.time()-t0:.1f}s")
print(f"Message ID : {message_id}")
print(f"\n--- Agent Response ---\n")
print(body[:2000])

Send an agent query (streaming)​

To receive a streaming JSONL response, set "stream": true. Each line is a JSON object with body (incremental text) and finished (bool). The final message (finished: true) contains the message_id.

streaming_payload = {
    "message": "Write a short Python script that generates fibonacci numbers up to 100 and saves them to 'fibonacci.csv'.",
    "llm": DEFAULT_LLM,
    "stream": True,
    "llm_args": {
        "use_agent": True,
        "agent_accuracy": "quick",
        "agent_total_timeout": 120,
        "temperature": 0.0,
    },
    "rag_config": {"rag_type": "llm_only"},
    "include_chat_history": "off",
}

stream_session = api_post("chats", body={})
stream_session_id = stream_session["id"]
print(f"Streaming session: {stream_session_id}")

print("\nStreaming response tokens as they arrive …\n")
full_response = ""
stream_msg_id = None
t0 = time.time()

with requests.post(
    f"{API_V1}/chats/{stream_session_id}/completions",
    headers=HEADERS,
    json=streaming_payload,
    stream=True,
    timeout=180,
) as stream_resp:
    stream_resp.raise_for_status()

    for raw_line in stream_resp.iter_lines():
        if not raw_line:
            continue
        line = raw_line if isinstance(raw_line, str) else raw_line.decode("utf-8")
        try:
            delta = json.loads(line)
        except json.JSONDecodeError:
            continue

        if "error" in delta:
            print(f"\n[Stream error] {delta['error']}")
            break

        chunk = delta.get("body", "")
        full_response += chunk
        print(chunk, end="", flush=True)

        if delta.get("finished"):
            stream_msg_id = delta.get("message_id")
            break

print(f"\n\n--- Streaming complete in {time.time()-t0:.1f}s ---")
print(f"\n\nStream message_id: {stream_msg_id}")
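The JSONL handling can also be factored into a pure function, which makes it possible to try the parsing logic on sample lines without a live server. This is a sketch under the format described in this section (body, finished, message_id, and an optional error key); parse_stream_lines is a hypothetical name and the sample lines are made up for illustration.

```python
import json


def parse_stream_lines(lines):
    """Accumulate text from JSONL stream lines; return (text, message_id)."""
    parts, message_id = [], None
    for raw in lines:
        if not raw:
            continue  # keep-alive or blank line
        line = raw.decode("utf-8") if isinstance(raw, bytes) else raw
        try:
            delta = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip anything that is not a complete JSON value
        if not isinstance(delta, dict):
            continue
        if "error" in delta:
            raise RuntimeError(str(delta["error"]))
        parts.append(delta.get("body", ""))
        if delta.get("finished"):
            message_id = delta.get("message_id")
            break
    return "".join(parts), message_id


sample = [
    b'{"body": "Hello", "finished": false}',
    b"",
    b'{"body": ", world", "finished": false}',
    b'{"body": "", "finished": true, "message_id": "abc-123"}',
]
print(parse_stream_lines(sample))
```

With a real response, the same function could be called as parse_stream_lines(stream_resp.iter_lines()), at the cost of not printing chunks as they arrive.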

Send an RLM RAG query (collection-backed)​

RLM RAG (Recursive Language Model) uses an agent that programmatically analyzes documents through Python code execution and follow-up LLM calls. Use RLM RAG when your question requires multi-step reasoning across documents.

RLM RAG uses the same /api/v1/chats/{id}/completions endpoint as other queries. Link the chat session to a collection before sending the request.

rlm_payload = {
    "message": "Analyze the key financial trends across all quarterly reports and identify any inconsistencies.",
    "llm": DEFAULT_LLM,
    "stream": False,  # set True for incremental output
    "rag_config": {
        "rag_type": "rlm_rag"
    },
    "llm_args": {
        "temperature": 0.0,
    },
}

print("Sending RLM RAG request (may take several minutes) …")
t0 = time.time()

resp = requests.post(
    f"{API_V1}/chats/{session_id_rag}/completions",
    headers=HEADERS,
    json=rlm_payload,
    timeout=600,
)
resp.raise_for_status()
completion = resp.json()

print(f"\nCompleted in {time.time()-t0:.1f}s")
print(f"Message ID : {completion['message_id']}")
print("\n--- RLM RAG Response ---\n")
print(completion["body"][:2000])

Tip

RLM RAG requires a longer timeout than standard RAG queries. Set timeout to at least 600 seconds.

Other agent-backed RAG approaches

To use Agentic RAG or Fast Agentic RAG instead, swap the rag_type value in the examples above:

  • "agentic_rag" — gives the agent a document search tool for multiple search-and-analyze cycles across the collection.
  • "fast_agentic_rag" — pushes document contexts directly into the agent's system prompt for lower latency.

The rest of the request structure stays the same. The system auto-enables agent mode for all three approaches (rlm_rag, agentic_rag, and fast_agentic_rag).

Python client library​

With the h2oGPTe Python client library, the equivalent call is:

# client is assumed to be an already authenticated h2oGPTe client instance (creation not shown)
# use_agent is auto-enabled for rlm_rag, so there is no need to set it explicitly
with client.connect(session_id_rag) as session:
    reply = session.query(
        message="Analyze the key financial trends across all quarterly reports.",
        llm=DEFAULT_LLM,
        rag_config={"rag_type": "rlm_rag"},
        timeout=600,
    )

Control chat message generation​

While a chat query is actively generating a streaming response, you can control the generation state using the following endpoints:

  • Pause Generation: POST /api/v1/messages/{question_id}/pause. Halts the message streaming temporarily. The stream can be resumed later.
  • Resume Generation: POST /api/v1/messages/{question_id}/resume. Resumes a previously paused message stream.
  • Stop Generation: POST /api/v1/messages/{question_id}/stop. Permanently cancels the message generation.
  • Finish Generation: POST /api/v1/messages/{question_id}/finish. Signals the LLM to complete its current thought and finish naturally, providing a more coherent ending than an immediate stop.

A successful request to any of these endpoints returns a 204 No Content response.

Example:

# 1. Get the question_id of the currently generating message.
# session_id is the ID of the chat session that is currently streaming
# (for example, stream_session_id from the streaming example).
# Fetch the recent history and isolate the most recent user prompt.
messages = api_get(f"chats/{session_id}/messages", params={"offset": 0, "limit": 20})

# Filter for top-level questions (where reply_to is missing/null)
questions = [m for m in messages if not m.get("reply_to")]

if not questions:
    # Without a real question ID, the control calls below would fail
    raise RuntimeError("No user message found in this session")

# Grab the most recent question's ID (assuming chronological order)
question_id = questions[-1]["id"]

# ------------------------------------------------------------------
# 2. Use the retrieved question_id to control the active stream.
#    The calls below are independent; send only the one you need.
# ------------------------------------------------------------------

# Pause a streaming response
api_post(f"messages/{question_id}/pause")

# Resume it
api_post(f"messages/{question_id}/resume")

# Immediately stop the generation permanently
api_post(f"messages/{question_id}/stop")

# Signal the LLM to naturally complete its thought and finish
api_post(f"messages/{question_id}/finish")
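The lookup and the endpoint construction can be separated into two small pure functions, which keeps a missing question ID from turning into a request against a non-existent message. This is a sketch only: latest_question_id and control_path are hypothetical names, and the selection relies on the same assumption as the example above, namely that messages are returned in chronological order.

```python
CONTROL_ACTIONS = ("pause", "resume", "stop", "finish")


def latest_question_id(messages):
    """Return the id of the last message without reply_to, or None."""
    questions = [m for m in messages if not m.get("reply_to")]
    return questions[-1]["id"] if questions else None


def control_path(question_id, action):
    """Build the path (relative to /api/v1) for a generation-control call."""
    if action not in CONTROL_ACTIONS:
        raise ValueError(f"unknown action: {action!r}")
    if not question_id:
        raise ValueError("no question_id available")
    return f"messages/{question_id}/{action}"


# Made-up history: two user questions and one assistant reply.
history = [
    {"id": "q1", "reply_to": None},
    {"id": "a1", "reply_to": "q1"},
    {"id": "q2"},
]
print(control_path(latest_question_id(history), "pause"))
```

The resulting path can be passed directly to the api_post helper from the setup section.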

Inspect chat message history​

Use GET /api/v1/chats/{session_id}/messages to retrieve all messages in a session. Messages without reply_to are user messages; those with reply_to are LLM responses.

messages = api_get(f"chats/{session_id_agent}/messages", params={"offset": 0, "limit": 20})

print(f"Messages in session (total shown: {len(messages)}):")
for msg in messages:
    role = "USER" if not msg.get("reply_to") else "ASSISTANT"
    content_preview = msg.get("content", "")[:120].replace("\n", " ")
    has_refs = msg.get("has_references", False)
    print(f" [{role}] id={msg['id'][:8]}... refs={has_refs}")
    print(f" {content_preview!r}")
    print()

Retrieve message metadata​

Use GET /api/v1/messages/{message_id}/meta?info_type=<type> to retrieve metadata attached to a specific message.

Common info_type values:

| info_type | Content |
| --- | --- |
| usage_stats | JSON: token counts and cost |
| prompt_raw | Text: final prompt sent to LLM |

Agent session metadata info_type values:

| info_type | Content |
| --- | --- |
| agent_files | JSON: [{doc_id: filename}, ...], new files generated by the agent |
| agent_files_old | JSON: same format, files from earlier turns |
| agent_chat_history | JSON: full agent reasoning trace |
| agent_chat_history_md | Markdown: human-readable agent trace |
| agent_analysis | Text: agent self-analysis |

Note

Some deployments accept additional agent_* metadata types that are not listed in the OpenAPI enum.

Get agent-generated file list​

resp = requests.get(
    f"{API_V1}/messages/{message_id}/meta",
    headers=HEADERS,
    params={"info_type": "agent_files"},
)
resp.raise_for_status()
agent_files_meta = resp.json()

# Show raw metadata (truncated)
print(f"agent_files meta (raw): {json.dumps(agent_files_meta, indent=2)[:500]}")

# Parse the content: it is a JSON string of [{doc_id: filename}, ...]
agent_file_map = {}
if agent_files_meta:
    try:
        file_list = json.loads(agent_files_meta[0]["content"])
        for entry in file_list:
            agent_file_map.update(entry)
    except (KeyError, json.JSONDecodeError) as e:
        print(f"Could not parse agent_files content: {e}")

print(f"\nAgent-generated files ({len(agent_file_map)}):")
for doc_id, fname in agent_file_map.items():
    print(f" • {fname} (doc_id={doc_id[:16]}...)")

Get usage stats​

resp = requests.get(
    f"{API_V1}/messages/{message_id}/meta",
    headers=HEADERS,
    params={"info_type": "usage_stats"},
)
resp.raise_for_status()

if resp.json():
    usage = json.loads(resp.json()[0]["content"])
    pprint(usage)

Get agent reasoning trace​

resp = requests.get(
    f"{API_V1}/messages/{message_id}/meta",
    headers=HEADERS,
    params={"info_type": "agent_chat_history_md"},
)
resp.raise_for_status()

if resp.json():
    print(resp.json()[0]["content"][:1500])
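All three metadata examples read the content of the first list entry and, for JSON types, decode it a second time. That pattern can be wrapped in one function. The following is a minimal sketch, assuming the response shape used in this section (a list of objects with a content string); meta_content and parse_agent_files are hypothetical names, and the sample data is invented.

```python
import json


def meta_content(meta, as_json=False, default=None):
    """Return the content of the first metadata entry, optionally JSON-decoded."""
    try:
        content = meta[0]["content"]
    except (IndexError, KeyError, TypeError):
        return default
    if not as_json:
        return content
    try:
        return json.loads(content)
    except (TypeError, json.JSONDecodeError):
        return default


def parse_agent_files(meta):
    """Flatten [{doc_id: filename}, ...] into one {doc_id: filename} dict."""
    merged = {}
    for entry in meta_content(meta, as_json=True, default=[]):
        merged.update(entry)
    return merged


sample_meta = [
    {"content": json.dumps([{"doc-1": "sales_summary.csv"}, {"doc-2": "monthly_revenue.csv"}])}
]
print(parse_agent_files(sample_meta))
print(meta_content([], as_json=True, default={}))
```

Returning a default instead of raising keeps scripts running when a message has no metadata of the requested type.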

List agent server files​

Use GET /api/v1/chats/{session_id}/agent_server_files to list all files the agent wrote to its working directory during this session.

Each AgentServerFile record includes: id, filename, bytes, created_at (Unix timestamp), purpose, and object.

server_files = api_get(f"chats/{session_id_agent}/agent_server_files")

print(f"Agent server files ({len(server_files)}):")
print("-" * 70)
for f in server_files:
    created = datetime.fromtimestamp(f.get("created_at", 0)).strftime("%Y-%m-%d %H:%M:%S")
    size_kb = f.get("bytes", 0) / 1024
    print(f" filename : {f.get('filename')}")
    print(f" id : {f.get('id')}")
    print(f" size : {size_kb:.2f} KB ({f.get('bytes')} bytes)")
    print(f" created : {created}")
    print(f" purpose : {f.get('purpose')}")
    print("-" * 70)

View agent directory statistics​

Use the following three endpoints to inspect the agent's working directory:

  • GET /api/v1/chats/{session_id}/agent_server_directories/stats — per-session directory stats
  • GET /api/v1/chats/{session_id}/agent_server_directories/{dir_name}/stats — single directory detail
  • GET /api/v1/agents/directory_stats — all sessions across the entire account

Per-session directory stats​

dir_stats = api_get(
    f"chats/{session_id_agent}/agent_server_directories/stats",
    params={"detail_level": 1},
)

print(f"Agent directory stats for session {session_id_agent[:8]}...:")
print(f"Number of directories: {len(dir_stats)}")
print()
for d in dir_stats:
    print(f" Directory ID : {d.get('id')}")
    print(f" Size : {d.get('size_human_readable', d.get('size_bytes', '?'))}")
    print(f" Files : {d.get('file_count', 0)}")
    print(f" Directories : {d.get('directory_count', 0)}")
    print(f" Created : {d.get('created_date', '?')}")
    print(f" Modified : {d.get('modified_date', '?')}")
    print(f" Is empty : {d.get('is_empty', '?')}")
    print(f" Top contents : {d.get('top_level_contents', [])}")
    if d.get("files"):
        print(" Files list:")
        for file_info in d["files"]:
            print(f" • {file_info.get('name')} "
                  f"({file_info.get('size_human_readable', '?')}) "
                  f"modified={file_info.get('modified_date', '?')}")
    print()

Use detail_level=1 to also get per-file metadata within each directory.

Stats for a specific directory​

if dir_stats:
    first_dir_id = dir_stats[0].get("id")
    if first_dir_id:
        single_dir = api_get(
            f"chats/{session_id_agent}/agent_server_directories/{quote(first_dir_id, safe='')}/stats",
            params={"detail_level": 1},
        )
        print(f"Stats for directory '{first_dir_id}':")
        pprint({k: v for k, v in single_dir.items() if k != "files"})
        if single_dir.get("files"):
            print(f"\nFiles in directory ({len(single_dir['files'])}):")
            for fi in single_dir["files"]:
                print(f" • {fi.get('name')} size={fi.get('size_human_readable', '?')} "
                      f"is_dir={fi.get('is_directory', False)}")
else:
    print("No directories found. Run the agent query examples first.")

Global agent directory stats​

global_stats = api_get("agents/directory_stats", params={"offset": 0, "limit": 10})

print(f"Global agent directory stats ({len(global_stats)} sessions):")
for session_entry in global_stats[:3]:  # show first 3 sessions
    # Fall back to placeholders so a missing field does not break slicing
    sess_id = session_entry.get("agent_chat_session_id") or "?"
    preview = (session_entry.get("chat_preview") or "")[:80]
    dirs = session_entry.get("stats", [])
    total_files = sum(d.get("file_count", 0) for d in dirs)
    total_size = sum(d.get("size_bytes", 0) for d in dirs)
    print(f" Session: {sess_id[:8]}...")
    print(f" Preview: {preview!r}")
    print(f" Total files: {total_files} Total size: {total_size/1024:.1f} KB")
    print()

Download agent-generated files​

You can download agent files from h2oGPTe's object storage using:

GET /file?id={doc_id}&name={filename}
Authorization: Bearer YOUR_API_KEY

The doc_id comes from message metadata (info_type=agent_files).

caution

The /file endpoint is outside of /api/v1.

DOWNLOAD_DIR = Path("./agent_downloads")
DOWNLOAD_DIR.mkdir(exist_ok=True)

download_headers = {"Authorization": f"Bearer {API_KEY}"}

if not agent_file_map:
    print("No agent files found in message metadata. Run the agent query and agent file list examples first.")
else:
    print(f"Downloading {len(agent_file_map)} agent file(s) to {DOWNLOAD_DIR}/")
    print()

    for doc_id, filename in agent_file_map.items():
        # Build download URL (note: /file is NOT under /api/v1)
        dl_url = f"{BASE_URL}/file?id={quote(doc_id, safe='')}&name={quote(filename, safe='')}"

        dl_resp = requests.get(dl_url, headers=download_headers, timeout=60)

        if dl_resp.ok:
            safe_name = Path(filename).name  # strip any path component for safety
            out_path = DOWNLOAD_DIR / safe_name
            with open(out_path, "wb") as fp:
                fp.write(dl_resp.content)
            size_kb = len(dl_resp.content) / 1024
            print(f" ✓ Downloaded: {safe_name} ({size_kb:.2f} KB) → {out_path}")
        else:
            print(f" ✗ Failed to download {filename}: HTTP {dl_resp.status_code}")
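Because the /file endpoint sits outside /api/v1, it can help to build its URL in one place. The sketch below is a hypothetical helper (build_file_url is not part of the API) that percent-encodes both query values with the standard library, equivalent to the manual quote calls above.

```python
from urllib.parse import quote, urlencode


def build_file_url(base_url, doc_id, filename):
    """Build the /file download URL (note: not under /api/v1)."""
    # quote_via=quote encodes spaces as %20 rather than "+"
    query = urlencode({"id": doc_id, "name": filename}, quote_via=quote, safe="")
    return f"{base_url.rstrip('/')}/file?{query}"


print(build_file_url("https://YOUR_H2OGPTE_URL/", "abc 123", "monthly revenue.csv"))
```

Stripping a trailing slash from the base URL guards against a doubled slash if BASE_URL is configured with one.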

Download custom tool files​

For a simpler approach using the Python SDK, see Download a custom tool file.

Get a custom agent tool​

Retrieve details for a single custom agent tool by its ID:

tool_id = "YOUR_TOOL_ID"
tool = api_get(f"agents/custom_tools/{tool_id}")
print(f"Tool: {tool.get('tool_name')} Type: {tool.get('tool_type')}")
print(f"File: {tool.get('file_name')}")

Download a custom agent tool file​

Download the source file (.py or .zip) associated with a custom agent tool. Only local_mcp, browser_action, and general_code tool types support file download.

GET /api/v1/agents/custom_tools/{tool_id}/download
Authorization: Bearer YOUR_API_KEY

tool_id = "YOUR_TOOL_ID"
dl_resp = requests.get(
    f"{API_V1}/agents/custom_tools/{tool_id}/download",
    headers=HEADERS,
    timeout=60,
)

if dl_resp.ok:
    cd = dl_resp.headers.get("Content-Disposition", "")
    filename = cd.split("filename=")[-1].strip('"') if "filename=" in cd else "tool_file"
    # Keep only the base name so the file cannot be written outside ./downloads
    out_path = Path("./downloads") / Path(filename).name
    out_path.parent.mkdir(exist_ok=True)
    with open(out_path, "wb") as f:
        f.write(dl_resp.content)
    print(f"Downloaded tool to: {out_path} ({len(dl_resp.content) / 1024:.1f} KB)")
else:
    print(f"Download failed: HTTP {dl_resp.status_code}")

Note

The download endpoint returns application/octet-stream with a Content-Disposition header containing the original filename. You can download only tools that you own.
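Splitting the Content-Disposition header on "filename=" works for simple values but can misread headers with extra parameters. A more tolerant option is to let the standard library's email header parser extract the filename. This is a sketch, not the documented approach: filename_from_content_disposition is a hypothetical helper, and the sample header values are invented.

```python
from email.message import Message
from pathlib import PurePosixPath


def filename_from_content_disposition(header, default="tool_file"):
    """Extract a safe base filename from a Content-Disposition header value."""
    if not header:
        return default
    msg = Message()
    msg["Content-Disposition"] = header
    name = msg.get_filename()
    if not name:
        return default
    # Keep only the final path component so the file stays in the target directory.
    base = PurePosixPath(name.replace("\\", "/")).name
    return base or default


print(filename_from_content_disposition('attachment; filename="my_tool.py"'))
print(filename_from_content_disposition('attachment; filename="../../etc/passwd"'))
print(filename_from_content_disposition(""))
```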

Discover agent tools​

List all available agent tools​

tools = api_get("agents/tools")
print(f"Available agent tools ({len(tools)}):")
for tool in tools[:20]:  # show first 20
    name = tool.get("name") or tool.get("id")
    desc = str(tool.get("description", ""))[:80]
    print(f" • {name:<40} {desc}")

Get user's tool preference​

try:
    tool_pref = api_get("agents/tool_preference")
    print(f"Tool preferences ({len(tool_pref)}):")
    for t in tool_pref:
        print(f" • {t}")
except Exception as e:
    print(f"Could not retrieve tool preference: {e}")

Use additional chat endpoints​

Get suggested follow-up questions​

try:
    questions = api_get(
        f"chats/{session_id_agent}/questions",
        params={"limit": 5},
    )
    print(f"Suggested follow-up questions ({len(questions)}):")
    for q in questions:
        print(f" • {q.get('question')}")
except Exception as e:
    print(f"Suggested questions not available: {e}")

Get session details​

sess_detail = api_get(f"chats/{session_id_agent}")
print("Agent session details:")
pprint({k: v for k, v in sess_detail.items() if k in [
    "id", "name", "collection_id", "latest_message_content", "updated_at"
]})

Get message references (RAG citations)​

# Only populated for RAG sessions (collection-backed)
try:
    references = api_get(f"messages/{message_id}/references")
    print(f"References for message {message_id[:8]}...: ({len(references)})")
    for ref in references[:3]:
        # A missing score would otherwise make the :.3f format raise a TypeError
        score = ref.get("score")
        score_text = f"{score:.3f}" if score is not None else "n/a"
        print(f" • doc={ref.get('document_name')} score={score_text} pages={ref.get('pages')}")
except Exception as e:
    print(f"References not available (expected for llm_only sessions): {e}")

Delete agent files​

Use DELETE /api/v1/chats/{session_id}/agent_server_files to remove all files the agent wrote to its working directory for this session.

files_before = api_get(f"chats/{session_id_agent}/agent_server_files")
print(f"Files before deletion: {len(files_before)}")
for f in files_before:
    print(f" • {f.get('filename')} ({f.get('bytes')} bytes)")

del_resp = requests.delete(
    f"{API_V1}/chats/{session_id_agent}/agent_server_files",
    headers=HEADERS,
)
print(f"Delete status: {del_resp.status_code}")
# 200 = success, 204 = success (no content), 409 = conflict (deletion in progress)

# Verify deletion
files_after = api_get(f"chats/{session_id_agent}/agent_server_files")
print(f"Files after deletion: {len(files_after)}")

OpenAI-compatible API​

h2oGPTe also exposes OpenAI-compatible endpoints at /openai_api/v1/. You can use any OpenAI-compatible client library (including the standard OpenAI Python client) with these endpoints. For endpoint descriptions, request/response examples, and feature support tables, see OpenAI-compatible REST API.

Clean up resources​

Delete all the resources you created during this session.

sessions_to_delete = [
    session_id_agent,
    session_id_rag,
    stream_session_id,
]

# Delete chat sessions
for sid in sessions_to_delete:
    try:
        r = requests.delete(
            f"{API_V1}/chats/{sid}",
            headers=HEADERS,
        )
        print(f"DELETE chat {sid[:8]}... → {r.status_code}")
    except Exception as e:
        print(f"Could not delete session {sid[:8]}...: {e}")

# Delete collection
try:
    r = requests.delete(
        f"{API_V1}/collections/{collection_id}",
        headers=HEADERS,
    )
    print(f"DELETE collection {collection_id[:8]}... → {r.status_code}")
except Exception as e:
    print(f"Could not delete collection: {e}")

print("\nCleanup complete.")
note

If you delete a resource that has already been deleted, the API returns 404. This is safe to ignore in cleanup scripts.

Manage memory blocks​

Memory blocks store persistent text that carries across chat sessions. For an overview, see Memory blocks.

Create a memory block​

data = {
    "name": "Project Knowledge",
    "content": "Project reference: PRJ-2024-0142\nBudget: $50,000",
    "system_prompt_amendment": "Save key project decisions and milestones.",
    "injection_mode": "system_prompt",  # system_prompt | user_instruction | agent_file
    "access_mode": "read_write",        # read_write | read | write
    "max_content_length": 10000,        # 0 = unlimited
}
block = api_post("memory_blocks", data)
memory_block_id = block["id"]
print(f"Created memory block: {memory_block_id}")

Request body fields:

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| name | string | Yes | N/A | Unique per user. |
| content | string | No | "" | Initial text content. |
| system_prompt_amendment | string | No | "" | Instructions for how the LLM or agent uses the memory. |
| injection_mode | string | No | "system_prompt" | system_prompt, user_instruction, or agent_file. |
| access_mode | string | No | "read_write" | read, write, or read_write. |
| max_content_length | integer | No | 10000 | Maximum characters. 0 = unlimited. |

Response: 201 Created with the MemoryBlock object.
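The field rules in the table can be checked on the client before sending the request. The following is a minimal sketch: `check_memory_block_body` is a hypothetical helper, and the assumption that content length is measured in Python characters is mine. The server remains the authority on validation.

```python
INJECTION_MODES = {"system_prompt", "user_instruction", "agent_file"}
ACCESS_MODES = {"read", "write", "read_write"}


def check_memory_block_body(body):
    """Return a list of problems found in a create-request body (empty if none)."""
    problems = []
    if not body.get("name"):
        problems.append("name is required")
    if body.get("injection_mode", "system_prompt") not in INJECTION_MODES:
        problems.append("injection_mode must be one of " + ", ".join(sorted(INJECTION_MODES)))
    if body.get("access_mode", "read_write") not in ACCESS_MODES:
        problems.append("access_mode must be one of " + ", ".join(sorted(ACCESS_MODES)))
    limit = body.get("max_content_length", 10000)
    if not isinstance(limit, int) or limit < 0:
        problems.append("max_content_length must be a non-negative integer")
    elif limit > 0 and len(body.get("content", "")) > limit:
        # 0 means unlimited, so the length check applies only to positive limits.
        problems.append("content is longer than max_content_length")
    return problems


print(check_memory_block_body({"name": "Project Knowledge", "access_mode": "read"}))
```

An empty list means the body satisfies the documented constraints.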

List memory blocks​

blocks = api_get("memory_blocks", params={"limit": 10, "name_filter": "Project"})
for b in blocks["items"]:
    print(f"{b['name']} (updated {b['updated_at']})")
print(f"Total: {blocks['total']}")

Query parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| offset | integer | 0 | Items to skip. |
| limit | integer | 100 | Maximum items to return. |
| name_filter | string | N/A | Substring match on name. |
| updated_after | string (ISO 8601) | N/A | Return blocks updated after this timestamp. |

Get a memory block​

block = api_get(f"memory_blocks/{memory_block_id}")
pprint(block)
tip

To look up by name, use the list endpoint with name_filter:

blocks = api_get("memory_blocks", params={"name_filter": "Project Knowledge", "limit": 1})
block = blocks["items"][0]
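Because name_filter is a substring match, the first item is not guaranteed to be the block with exactly that name (for example, a block named "Project Knowledge v2" also matches). The following is a minimal sketch of an exact-name lookup over the returned items; `find_exact` is a hypothetical helper.

```python
def find_exact(items, name):
    """Return the first block whose name equals `name` exactly, or None."""
    for block in items:
        if block.get("name") == name:
            return block
    return None


# Sample items shaped like the list response; the values are made up.
sample_items = [
    {"id": "a", "name": "Project Knowledge v2"},
    {"id": "b", "name": "Project Knowledge"},
]
print(find_exact(sample_items, "Project Knowledge"))
```

When relying on this, request more than one item (a larger limit) so that the exact match is included in the page.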

Update a memory block​

Include only the fields to change. Omitted fields remain unchanged.

api_put(f"memory_blocks/{memory_block_id}", {
    "content": "Project reference: PRJ-2024-0142\nBudget: $75,000 (revised Q2)",
})

Delete a memory block​

Owner only.

api_delete(f"memory_blocks/{memory_block_id}")

Share a memory block​

Grant read and/or edit permissions to a user. Owner only.

api_put(f"memory_blocks/{memory_block_id}/permissions/bob", {
    "permissions": ["h2ogpte/memory_block/read", "h2ogpte/memory_block/edit"],
})
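To avoid typing the permission strings by hand, the request body can be built from two flags. The following is a minimal sketch: `permissions_body` is a hypothetical helper, and rejecting an empty permission list is my assumption rather than documented API behavior.

```python
READ = "h2ogpte/memory_block/read"
EDIT = "h2ogpte/memory_block/edit"


def permissions_body(read=True, edit=False):
    """Build the request body for sharing a memory block."""
    permissions = []
    if read:
        permissions.append(READ)
    if edit:
        permissions.append(EDIT)
    if not permissions:
        # Assumption: to revoke all access, call the unshare endpoint instead.
        raise ValueError("grant at least one permission")
    return {"permissions": permissions}


print(permissions_body(read=True, edit=True))
```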

Unshare a memory block​

api_delete(f"memory_blocks/{memory_block_id}/permissions/bob")

Set memory block privacy​

Toggle public visibility. Owner only.

api_put(f"memory_blocks/{memory_block_id}/privacy", {"is_public": True})

Get default memory blocks​

defaults = api_get("users/current/default_memory_blocks")
if defaults["defaults"]["llm"]:
    print(f"Default LLM block: {defaults['defaults']['llm']['name']}")
if defaults["defaults"]["agent"]:
    print(f"Default agent block: {defaults['defaults']['agent']['name']}")

Response:

| Field | Type | Description |
| --- | --- | --- |
| defaults.llm | MemoryBlock or null | Default memory block for LLM chats. |
| defaults.agent | MemoryBlock or null | Default memory block for agent chats. |
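Since either scope can be null, it helps to read the response defensively. The following is a minimal sketch that reduces the response to one name per scope; `default_block_names` is a hypothetical helper, and the sample response is made up to match the shape in the table.

```python
def default_block_names(response):
    """Map each scope ("llm", "agent") to its default block name, or None."""
    defaults = response.get("defaults") or {}
    return {
        scope: (defaults.get(scope) or {}).get("name")
        for scope in ("llm", "agent")
    }


sample_response = {"defaults": {"llm": {"name": "Project Knowledge"}, "agent": None}}
print(default_block_names(sample_response))
```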

Set a default memory block​

Each scope (llm or agent) has at most one default. Setting a new default replaces the previous one. To clear a default, pass null (None in Python) as memory_block_id.

# Set a default for LLM chats
api_put("users/current/default_memory_blocks", {
    "scope": "llm",
    "memory_block_id": memory_block_id,
})

# Clear the default for agent chats
api_put("users/current/default_memory_blocks", {
    "scope": "agent",
    "memory_block_id": None,
})

Request body fields:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| scope | string | Yes | llm or agent. |
| memory_block_id | string (UUID) or null | No | Memory block to set as default. Omit the field or pass null to clear the current default. |

Response: 200 OK with the scope and MemoryBlock object (or null if cleared).
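The request body for setting and clearing a default differs only in memory_block_id. The following is a minimal sketch that builds the body, checks the scope locally, and shows how Python's None is serialized as JSON null; `default_block_body` is a hypothetical helper.

```python
import json

VALID_SCOPES = ("llm", "agent")


def default_block_body(scope, memory_block_id=None):
    """Build the body for setting (or, with None, clearing) a default block."""
    if scope not in VALID_SCOPES:
        raise ValueError(f"scope must be one of {VALID_SCOPES}, got {scope!r}")
    return {"scope": scope, "memory_block_id": memory_block_id}


# None becomes null in the JSON that is sent to the server.
print(json.dumps(default_block_body("agent")))
```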

Use a memory block in chat​

Pass memory_block_id or memory_block_name in llm_args:

completion = api_post(f"chats/{session_id}/completions", {
    "message": "Summarize our project status.",
    "llm_args": {"memory_block_id": memory_block_id},
})

With an agent:

completion = api_post(f"chats/{session_id}/completions", {
    "message": "Analyze Q1 data and save key findings to memory.",
    "llm_args": {
        "memory_block_id": memory_block_id,
        "use_agent": True,
        "max_time": 90,
    },
})
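Both examples build llm_args by hand. The following is a minimal sketch of a helper that accepts either identifier and merges any extra arguments; `memory_llm_args` is hypothetical, and requiring exactly one of the two identifiers is my assumption (the guide only says to pass one or the other).

```python
def memory_llm_args(block_id=None, block_name=None, **extra):
    """Build llm_args that reference a memory block by ID or by name."""
    if (block_id is None) == (block_name is None):
        # Assumption: pass exactly one identifier to avoid ambiguity.
        raise ValueError("pass exactly one of block_id or block_name")
    args = dict(extra)
    if block_id is not None:
        args["memory_block_id"] = block_id
    else:
        args["memory_block_name"] = block_name
    return args


print(memory_llm_args(block_name="Project Knowledge", use_agent=True, max_time=90))
```

The returned dictionary can be passed as the `llm_args` value in the completion request body.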

Memory block object​

| Field | Type | Description |
| --- | --- | --- |
| id | string (UUID) | Unique identifier. |
| name | string | Memory block name. |
| content | string | Text content. |
| system_prompt_amendment | string | System prompt amendment. |
| injection_mode | string | system_prompt, user_instruction, or agent_file. |
| access_mode | string | read, write, or read_write. |
| user_id | string (UUID) | Owner ID. |
| username | string | Owner username. |
| created_at | string (ISO 8601) | Creation timestamp. |
| updated_at | string (ISO 8601) | Last update timestamp. |
| is_public | boolean | Publicly accessible. |
| is_owner | boolean | Authenticated user is the owner. |
| can_edit | boolean | Authenticated user has edit permission. |
| max_content_length | integer | Maximum characters. 0 = unlimited. |

Memory block error responses​

| Status code | Description |
| --- | --- |
| 401 Unauthorized | Missing or invalid API key. |
| 403 Forbidden | Insufficient permission (for example, deleting a memory block you do not own). |
| 404 Not Found | Memory block does not exist or is not accessible. |
| 409 Conflict | A memory block with the same name already exists for this user. |
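A script can turn these status codes into readable messages. The following is a minimal sketch: `explain_status` is a hypothetical helper, the messages are copied from the table, and treating every status below 400 as success is an assumption.

```python
MEMORY_BLOCK_ERRORS = {
    401: "Missing or invalid API key.",
    403: "Insufficient permission.",
    404: "Memory block does not exist or is not accessible.",
    409: "A memory block with the same name already exists for this user.",
}


def explain_status(status_code):
    """Return a message for an error status, or None when there is no error."""
    if status_code < 400:
        # Assumption: statuses below 400 indicate success.
        return None
    return MEMORY_BLOCK_ERRORS.get(status_code, f"Unexpected error (HTTP {status_code}).")


for code in (201, 409, 500):
    print(code, explain_status(code))
```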
