Agent Basic Utilities
Object Storage
This document provides guidance on how to use the ObjectStoreAPI class to interact with an S3-compatible object storage backend (such as AWS S3, MinIO, Ceph, etc.). The class supports uploading and downloading files using various data formats (file path, bytes, or in-memory buffers).
Introduction
The ObjectStoreAPI class provides a simple and extensible interface to:
- Upload files to an S3-compatible bucket.
- Download files from a bucket to a path, a buffer, or directly as bytes.
It supports:
- Credentials and region-based authentication.
- Multiple input and output types.
- Logging for success and error cases.
This can be extended or used with AWS S3, or with custom object storage systems that implement the S3 API (e.g., MinIO).
Import
from utils.object_store import ObjectStoreAPI
Usage
Configuration
config = {
    "aws_access_key_id": "your-access-key",
    "aws_secret_access_key": "your-secret-key",
    "region_name": "your-region"
}
Initialize the client
store = ObjectStoreAPI(config)
Upload file from file path
store.upload_file(bucket_name="my-bucket", object_key="my-folder/my-file.txt", data="/path/to/local/file.txt")
Upload file from bytes
binary_data = b"Hello, world!"
store.upload_file(bucket_name="my-bucket", object_key="data/hello.txt", data=binary_data)
Upload file from a BytesIO buffer
import io
buffer = io.BytesIO()
buffer.write(b"Buffered content")
buffer.seek(0)
store.upload_file(bucket_name="my-bucket", object_key="buffer/file.txt", data=buffer)
Download file to local path
store.download_file(bucket_name="my-bucket", object_key="my-folder/my-file.txt", destination="/path/to/save/file.txt")
Download file to a BytesIO buffer
import io
output_buffer = io.BytesIO()
store.download_file(bucket_name="my-bucket", object_key="data/hello.txt", destination=output_buffer)
print(output_buffer.getvalue())
Download file and get content as bytes
data = store.download_file(bucket_name="my-bucket", object_key="data/hello.txt")
print(data.decode())
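The upload examples above pass a file path, raw bytes, or a BytesIO buffer through the same data argument. How ObjectStoreAPI handles this internally is not shown here; the following is only a minimal sketch of one way such dispatch could work. The helper name open_upload_source is hypothetical and not part of the class.

```python
import io
import os
from typing import BinaryIO, Union

UploadSource = Union[str, bytes, bytearray, BinaryIO]


def open_upload_source(data: UploadSource) -> BinaryIO:
    """Return a readable binary stream for a path, raw bytes, or a buffer.

    Hypothetical helper: it only illustrates type-based dispatch.
    """
    if isinstance(data, (bytes, bytearray)):
        # Raw bytes are wrapped so they can be read like a file.
        return io.BytesIO(bytes(data))
    if isinstance(data, (str, os.PathLike)):
        # Strings are treated as local file paths.
        return open(data, "rb")
    if hasattr(data, "read"):
        # Already a file-like object (for example io.BytesIO); use it as is.
        return data
    raise TypeError(f"Unsupported upload source: {type(data).__name__}")


stream = open_upload_source(b"Hello, world!")
print(stream.read())
```

A stream normalized this way could then be passed to a single upload call such as boto3's upload_fileobj(Fileobj, Bucket, Key), which keeps the path, bytes, and buffer cases on one code path.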
Custom Object Storage
To use a custom object storage system like MinIO or Ceph:
- Ensure it supports the S3 API.
- Replace the configuration:
config = {
    "aws_access_key_id": "your-key",
    "aws_secret_access_key": "your-secret",
    "region_name": "us-east-1"
}
# If using a custom endpoint:
import boto3
store = ObjectStoreAPI(config)
store.s3_client = boto3.client(
    's3',
    endpoint_url="https://your-custom-object-store.local",
    aws_access_key_id=config["aws_access_key_id"],
    aws_secret_access_key=config["aws_secret_access_key"],
    region_name=config["region_name"]
)
This allows you to seamlessly use the same ObjectStoreAPI with private or self-hosted storage systems.
Agent Application Metrics
This document describes the AgentSpaceMetrics class, which provides a unified interface to collect, expose, and push runtime metrics in agent-based applications. It supports Prometheus-compatible counters, gauges, and histograms, and streams these metrics both via HTTP and Redis.
Introduction
AgentSpaceMetrics enables:
- Prometheus-compatible metrics registration and management.
- Push-based metric streaming to Redis for centralized collection.
- Custom labeling with subjectId, instanceId, and nodeId.
- Built-in exporter via HTTP for Prometheus scraping.
It is designed for AI agents or modular services operating in distributed systems where real-time and historical visibility into system behavior is required.
Importing
from agent_sdk.metrics import AgentSpaceMetrics
Metric Types
| Metric Type | Description | Use Case |
|---|---|---|
| Counter | A cumulative metric that increases over time. It cannot be decreased. | Request counts, error events, retries. |
| Gauge | A metric that represents a value that can go up and down. | Active connections, memory usage, queue length. |
| Histogram | A metric that samples observations into configurable buckets, and provides count, sum, and distribution. | Response latency, payload size, execution duration. |
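To make the Histogram row concrete: Prometheus histogram buckets are cumulative, so an observation is counted in every bucket whose upper bound is at least its value, plus an implicit +Inf bucket. The sketch below reproduces that counting in plain Python. cumulative_bucket_counts is a hypothetical helper for illustration and is not part of AgentSpaceMetrics.

```python
import math
from typing import Dict, List


def cumulative_bucket_counts(observations: List[float], buckets: List[float]) -> Dict[float, int]:
    """Count observations per cumulative bucket, the way Prometheus histograms do."""
    # Prometheus always adds a final +Inf bucket that holds every observation.
    bounds = sorted(buckets) + [math.inf]
    return {
        bound: sum(1 for value in observations if value <= bound)
        for bound in bounds
    }


counts = cumulative_bucket_counts([0.05, 0.9, 3.0], [0.1, 0.5, 1, 2, 5])
for bound, count in counts.items():
    print(f"le={bound}: {count}")
```

With these inputs the 0.9 observation lands in the buckets bounded by 1, 2, 5, and +Inf, but not in 0.1 or 0.5.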
Usage Examples
1. Initialize
metrics = AgentSpaceMetrics(subject_id="subject-xyz")
If subject_id is not provided, it will use:
- SUBJECT_ID from environment variables (default: default_subject)
- INSTANCE_ID from environment variables (default: executor)
- Redis host from METRICS_REDIS_HOST (default: localhost)
2. Register Metrics
metrics.register_counter("block_executions", "Number of executions", labelnames=["blockId"])
metrics.register_gauge("active_sessions", "Active user sessions")
metrics.register_histogram("execution_latency", "Execution latency in seconds", buckets=[0.1, 0.5, 1, 2, 5])
3. Update Metrics
metrics.increment_counter("block_executions", {"blockId": "abc123"})
metrics.set_gauge("active_sessions", 7)
metrics.observe_histogram("execution_latency", 0.9)
Each metric automatically includes the following labels:
| Label | Source | Description |
|---|---|---|
| subjectId | arg / SUBJECT_ID | Identifier for the agent subject |
| instanceId | INSTANCE_ID env | Unique name of the running agent |
| nodeId | detect_node_id() | Identifier of the physical/logical node |
Server
Start Prometheus Exporter + Redis Writer
metrics.start_http_server(port=8889)
This call:
- Starts a Prometheus-compatible HTTP endpoint at http://<host>:8889/metrics
- Launches a background Redis writer thread that pushes the current metrics every 5 seconds to the Redis list NODE_METRICS
Each Redis entry is a JSON string with this structure:
{
  "block_executions": {
    "sdk.instances.adhoc.block_executions_total": 5.0
  },
  "active_sessions": {
    "sdk.instances.adhoc.active_sessions": 3.0
  },
  "execution_latency": {
    "sdk.instances.adhoc.execution_latency_bucket": 2.0,
    ...
  },
  "subjectId": "subject-xyz",
  "instanceId": "executor",
  "nodeId": "agent-node-123"
}
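A consumer of the NODE_METRICS list typically needs to separate the three label fields from the metric samples. The following is a minimal sketch of that step, assuming entries follow the structure shown above. split_entry and the sample payload are illustrative and not part of the SDK.

```python
import json
from typing import Dict, Tuple

# Label keys taken from the entry structure documented above.
LABEL_KEYS = ("subjectId", "instanceId", "nodeId")


def split_entry(raw: str) -> Tuple[Dict[str, str], Dict[str, Dict[str, float]]]:
    """Split one JSON entry into (labels, metrics)."""
    entry = json.loads(raw)
    labels = {key: entry[key] for key in LABEL_KEYS if key in entry}
    metrics = {name: samples for name, samples in entry.items() if name not in LABEL_KEYS}
    return labels, metrics


# Sample entry built by hand for illustration.
sample = json.dumps({
    "block_executions": {"sdk.instances.adhoc.block_executions_total": 5.0},
    "subjectId": "subject-xyz",
    "instanceId": "executor",
    "nodeId": "agent-node-123",
})
labels, metrics = split_entry(sample)
print(labels["nodeId"], metrics["block_executions"])
```

With the redis-py client, the raw strings could be read with a call such as lrange("NODE_METRICS", 0, -1) before being passed to a function like this one.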
Summary
| Feature | Description |
|---|---|
| Prometheus Metrics | Exposed via HTTP endpoint |
| Redis Integration | Streams labeled metrics to NODE_METRICS queue |
| Auto Labeling | Adds subjectId, instanceId, nodeId to every sample |
| Threaded Execution | Non-blocking exporter and writer threads |
This utility provides a production-grade monitoring layer for agent systems and works seamlessly with Prometheus, Grafana, and Redis-based observability stacks.
Graph Database Integration - agent_sdk.graphs
This module provides a modular and extensible interface for working with graph databases. It supports:
- Remote Graph DBs like JanusGraph and ArangoDB
- Local in-memory graph engine for testing or transient execution
- A common abstract base class to enable custom database backends
Using Remote DBs
Remote DB clients are imported from:
from agent_sdk.graphs.db import JanusGraphDBClient, ArangoDBClient
JanusGraphDBClient
- Connects via: Gremlin WebSocket
- Target DB: JanusGraph, Apache TinkerPop-compatible stores
Initialization
client = JanusGraphDBClient(host="localhost", port=8182)
Methods
| Method | Description |
|---|---|
| add_vertex(label, properties) | Add a vertex with given label and properties |
| add_edge(out_v, in_v, label, properties) | Add an edge between two vertices |
| get_vertex_by_id(vertex_id) | Get vertex by ID |
| get_edge_by_id(edge_id) | Get edge by ID |
| delete_vertex(vertex_id) | Remove vertex from the graph |
| delete_edge(edge_id) | Remove edge from the graph |
| execute_query(query, bindings=None) | Execute custom Gremlin query |
| close() | Close connection |
ArangoDBClient
- Connects via: HTTP
- Target DB: ArangoDB (Multi-model DB with AQL)
Initialization
client = ArangoDBClient(
    host="localhost", port=8529,
    username="root", password="openSesame",
    db_name="graph_db"
)
Methods
| Method | Description |
|---|---|
| add_document(collection, document) | Add document to collection |
| get_document(collection, doc_id) | Get document by ID |
| update_document(collection, doc_id, update_fields) | Update document fields |
| delete_document(collection, doc_id) | Delete document |
| execute_query(query, bind_vars=None) | Execute AQL query |
| close() | Close connection |
Using Local In-Memory Graph
Imported from:
from agent_sdk.graphs.local import InMemoryGraphDB
This client offers lightweight, dependency-free graph functionality, perfect for testing, simulations, or agents without external storage access.
Initialization
graph = InMemoryGraphDB(directed=True)
Methods
| Method | Description |
|---|---|
| add_vertex(vertex) | Add vertex to the graph |
| add_edge(src, dest, **properties) | Add edge (with optional properties) |
| remove_vertex(vertex) | Delete a vertex and all connected edges |
| remove_edge(src, dest) | Delete edge between two nodes |
| find_edges_by_property(**filters) | Query edges by property match |
| find_edges_between(src, dest) | Find edges between two nodes |
| dfs(start_vertex) | Depth-first traversal |
| bfs(start_vertex) | Breadth-first traversal |
| find_path(start_vertex, end_vertex) | Returns one path (if exists) between two nodes |
| __str__() | String representation of graph |
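The traversal methods listed above are standard graph algorithms. As an illustration of what find_path could do, here is a minimal breadth-first sketch over a plain adjacency dictionary. It is not the InMemoryGraphDB implementation, and the adjacency format is an assumption made for this example.

```python
from collections import deque
from typing import Dict, Hashable, List, Optional


def find_path(adjacency: Dict[Hashable, List[Hashable]],
              start: Hashable, end: Hashable) -> Optional[List[Hashable]]:
    """Return one shortest path (by edge count) from start to end, or None."""
    # parents doubles as the visited set; None marks the start vertex,
    # so this sketch assumes None is never used as a vertex itself.
    parents = {start: None}
    queue = deque([start])
    while queue:
        current = queue.popleft()
        if current == end:
            # Walk the parent links back to the start, then reverse.
            path = []
            while current is not None:
                path.append(current)
                current = parents[current]
            return path[::-1]
        for neighbor in adjacency.get(current, ()):
            if neighbor not in parents:
                parents[neighbor] = current
                queue.append(neighbor)
    return None


graph = {"a": ["b", "c"], "b": ["d"], "c": ["d"], "d": []}
print(find_path(graph, "a", "d"))
```

Breadth-first search is used here because it returns a path with the fewest edges; a depth-first variant would also satisfy "one path, if it exists".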
Porting and Using Custom Graph DBs
All custom graph databases must inherit from:
from agent_sdk.graphs.abs import GraphDBInterface
Abstract Interface
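from abc import ABC, abstractmethod
class GraphDBInterface(ABC):
    @abstractmethod
    def add_vertex(self, label, properties): ...
    @abstractmethod
    def add_edge(self, out_v, in_v, label, properties=None): ...
    @abstractmethod
    def get_vertex_by_id(self, vertex_id): ...
    @abstractmethod
    def get_edge_by_id(self, edge_id): ...
    @abstractmethod
    def delete_vertex(self, vertex_id): ...
    @abstractmethod
    def delete_edge(self, edge_id): ...
    @abstractmethod
    def execute_query(self, query, bindings=None): ...
    @abstractmethod
    def close(self): ...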
Each method must be implemented to conform to your backend.
Example: Porting Neo4j
Here's how to port Neo4j using the neo4j Python driver:
Installation
pip install neo4j
Implementation
from neo4j import GraphDatabase
from agent_sdk.graphs.abs import GraphDBInterface


class Neo4jDBClient(GraphDBInterface):
    def __init__(self, uri, user, password):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    def close(self):
        self.driver.close()

    def _run(self, query, parameters=None):
        # Run a Cypher query and return each record as a plain dict.
        with self.driver.session() as session:
            result = session.run(query, parameters or {})
            return [record.data() for record in result]

    def add_vertex(self, label, properties):
        # Property values are sent as query parameters; the label is interpolated.
        props = ', '.join(f"{k}: ${k}" for k in properties)
        query = f"CREATE (n:{label} {{{props}}}) RETURN n"
        return self._run(query, properties)

    def add_edge(self, out_v, in_v, label, properties=None):
        props = ', '.join(f"{k}: ${k}" for k in (properties or {}))
        query = (
            f"MATCH (a), (b) WHERE id(a)=$out_id AND id(b)=$in_id "
            f"CREATE (a)-[r:{label} {{{props}}}]->(b) RETURN r"
        )
        return self._run(query, {**(properties or {}), "out_id": int(out_v), "in_id": int(in_v)})

    def get_vertex_by_id(self, vertex_id):
        return self._run("MATCH (n) WHERE id(n)=$id RETURN n", {"id": int(vertex_id)})

    def get_edge_by_id(self, edge_id):
        return self._run("MATCH ()-[r]->() WHERE id(r)=$id RETURN r", {"id": int(edge_id)})

    def delete_vertex(self, vertex_id):
        return self._run("MATCH (n) WHERE id(n)=$id DETACH DELETE n", {"id": int(vertex_id)})

    def delete_edge(self, edge_id):
        return self._run("MATCH ()-[r]->() WHERE id(r)=$id DELETE r", {"id": int(edge_id)})

    def execute_query(self, query, bindings=None):
        return self._run(query, bindings)
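The implementation above passes property values as query parameters but interpolates the label and the property names directly into the Cypher text. If those names can come from untrusted input, it may be worth validating them first. The sketch below shows one conservative approach. safe_identifier and build_create_vertex_query are hypothetical helpers, and the allowed character set is an assumption that is stricter than what Neo4j itself accepts.

```python
import re

# Conservative rule: letters, digits, and underscores, not starting with a digit.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def safe_identifier(name: str) -> str:
    """Return name unchanged if it is a plain identifier, else raise ValueError."""
    if not _IDENTIFIER.fullmatch(name):
        raise ValueError(f"Unsafe identifier for a Cypher query: {name!r}")
    return name


def build_create_vertex_query(label: str, properties: dict) -> str:
    """Build the same CREATE statement as add_vertex, with validated names."""
    label = safe_identifier(label)
    props = ", ".join(f"{safe_identifier(k)}: ${k}" for k in properties)
    return f"CREATE (n:{label} {{{props}}}) RETURN n"


print(build_create_vertex_query("Person", {"name": "Neo"}))
```

The property values still travel as parameters, so only the structural parts of the query need this check.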
Usage
neo_client = Neo4jDBClient(uri="bolt://localhost:7687", user="neo4j", password="password")
neo_client.add_vertex("Person", {"name": "Neo"})
neo_client.close()
Agent Config Store - agent_sdk.config_store
The Agent Config Store provides a modular configuration management system for agent-based applications. It allows developers to register configurable modules, query/update parameters, and execute module-specific commands through a simple REST API.
This is ideal for agents that need dynamic configurability during runtime.
Introduction
Modern AI agents and services often require fine-grained, runtime-tunable configuration, for example thresholds, operational modes, toggles, or restart/reset hooks.
This module helps expose and manage these configurations via:
- Module registration
- Key-value based config management
- Command execution (e.g., reset, print, toggle)
- RESTful API on port 7777
Importing
from agent_sdk.config_store import AgentConfigManager, start_server
Config Store Functions
Class: AgentConfigManager
This class manages all registered modules and provides helper methods to interact with them.
Initialization
manager = AgentConfigManager()
Methods
| Method | Description |
|---|---|
| register(module_name, module_object) | Register a module with the config manager |
| unregister(module_name) | Unregister a module |
| get(module_name, key) | Retrieve a key's value from the module |
| set(module_name, key, value) | Set a key's value inside the module |
| list() | List all modules and their available config/commands |
| list_commands(module_name) | List commands supported by the module |
| execute_config_command(module_name, command, command_data) | Execute a config command with input |
Registering Modules
To register a module, it must implement the following methods:
- get(key: str) -> Any
- set(key: str, value: Any) -> None
- list() -> List[str]
- list_commands() -> List[str]
- execute_config_command(command: str, command_data: dict) -> Any
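The five required methods can be written down as a typing.Protocol, which allows a quick structural check before registration. This is a sketch only: ConfigModule and ensure_config_module are hypothetical names, and whether AgentConfigManager performs any such check itself is not stated here.

```python
from typing import Any, List, Protocol, runtime_checkable


@runtime_checkable
class ConfigModule(Protocol):
    """Structural description of the module interface listed above."""

    def get(self, key: str) -> Any: ...
    def set(self, key: str, value: Any) -> None: ...
    def list(self) -> List[str]: ...
    def list_commands(self) -> List[str]: ...
    def execute_config_command(self, command: str, command_data: dict) -> Any: ...


def ensure_config_module(module: object) -> None:
    """Raise TypeError if the module lacks any of the required methods."""
    # isinstance against a runtime_checkable Protocol checks that the
    # methods exist; it does not validate their signatures.
    if not isinstance(module, ConfigModule):
        raise TypeError(f"{type(module).__name__} does not implement the config module interface")
```

Calling ensure_config_module(obj) just before manager.register(...) would surface a missing method at registration time rather than on the first REST call.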
Example
class ExampleModule:
    def __init__(self):
        self.params = {"mode": "auto", "threshold": 0.75}

    def get(self, key):
        return self.params[key]

    def set(self, key, value):
        self.params[key] = value

    def list(self):
        return list(self.params.keys())

    def list_commands(self):
        return ["reset", "print_config"]

    def execute_config_command(self, command, command_data):
        if command == "reset":
            self.params = {"mode": "auto", "threshold": 0.75}
            return {"status": "reset"}
        elif command == "print_config":
            return {"config": self.params}
        raise ValueError("Invalid command")


manager = AgentConfigManager()
manager.register("example", ExampleModule())
Starting the Config Server
from agent_sdk.config_store import start_server
start_server(port=7777)
This launches a Flask app on a background thread, making the REST APIs available at http://localhost:7777.
API Endpoints and Usage
Each API returns a JSON response with success: true/false and optionally data or message.
POST /execute
Executes a configuration command on the specified module.
Request body:
{
  "module_name": "example",
  "command": "reset",
  "command_data": {}
}
Curl example:
curl -X POST http://localhost:7777/execute \
-H "Content-Type: application/json" \
-d '{"module_name": "example", "command": "reset", "command_data": {}}'
GET /get/<module_name>/<key>
Retrieves a config key's value.
Curl example:
curl http://localhost:7777/get/example/mode
POST /set/<module_name>/<key>/<value>
Sets a config key to a new value.
Curl example:
curl -X POST http://localhost:7777/set/example/mode/manual
GET /list
Lists all registered modules along with their parameters and commands.
Curl example:
curl http://localhost:7777/list
GET /list_commands/<module_name>
Lists all supported commands for a given module.
Curl example:
curl http://localhost:7777/list_commands/example
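The same endpoints can be called from Python with the standard library alone. The sketch below only builds the requests for POST /set and POST /execute, so it can be inspected without a running server. The helper names and the BASE_URL default are assumptions for this example.

```python
import json
from typing import Optional
from urllib.parse import quote
from urllib.request import Request

BASE_URL = "http://localhost:7777"  # assumed default from the examples above


def build_set_request(module_name: str, key: str, value: object,
                      base_url: str = BASE_URL) -> Request:
    """Build POST /set/<module_name>/<key>/<value> with URL-escaped segments."""
    # The value travels in the URL path, so it is sent as text.
    path = "/".join(quote(str(part), safe="") for part in (module_name, key, value))
    return Request(f"{base_url}/set/{path}", method="POST")


def build_execute_request(module_name: str, command: str,
                          command_data: Optional[dict] = None,
                          base_url: str = BASE_URL) -> Request:
    """Build POST /execute with the JSON body shown above."""
    body = json.dumps({
        "module_name": module_name,
        "command": command,
        "command_data": command_data or {},
    }).encode("utf-8")
    return Request(f"{base_url}/execute", data=body,
                   headers={"Content-Type": "application/json"}, method="POST")


request = build_set_request("example", "mode", "manual")
print(request.get_method(), request.full_url)
```

Passing either request to urllib.request.urlopen sends it. Because /set carries the value in the path, a module's set method receives whatever text the server extracts from the URL, so numeric or boolean settings may need converting on the module side.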
Usage Example
from agent_sdk.config_store import AgentConfigManager, start_server


class ThresholdModule:
    def __init__(self):
        self.config = {"threshold": 0.5, "enabled": True}

    def get(self, key):
        return self.config[key]

    def set(self, key, value):
        self.config[key] = value

    def list(self):
        return list(self.config.keys())

    def list_commands(self):
        return ["toggle", "status"]

    def execute_config_command(self, command, command_data):
        if command == "toggle":
            self.config["enabled"] = not self.config["enabled"]
            return {"enabled": self.config["enabled"]}
        elif command == "status":
            return self.config
        else:
            raise ValueError("Invalid command")


manager = AgentConfigManager()
manager.register("threshold", ThresholdModule())
start_server(port=7777)
Once the server starts, you can interact with this module through the REST API.
Known Subjects Registry
Introduction
The Known Subjects Registry is a dynamic agent registry that manages and tracks runtime subjects (agents, processors, components, etc.) in an intelligent system. Each subject is a versioned, semantically described runtime unit that can be discovered and invoked.
This registry supports:
- Fetching and maintaining runtime metadata from a centralized REST API
- Registering agents by name and subject ID
- Querying and listing all available runtime subjects
- Providing LLM-compatible searchable representations for intelligent planners
Each subject includes details like its version, tags, role, supported DSLs, and runtime traits, making it highly suitable for use in autonomous workflows and agent orchestration.
Usage
Initialize the Registry
from agent_sdk.known_subjects import KnownSubjects
known_subjects = KnownSubjects(base_url="http://runtime-db-service")
Add an Agent
subject_id = "block.image_processor:1.0-stable"
known_subjects.add_agent("image_agent", subject_id)
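Subject IDs such as block.image_processor:1.0-stable follow the pattern type.name:version-tag. The sketch below splits such an ID into its parts, which can help when validating input before calling add_agent. parse_subject_id is a hypothetical helper, and the exact grammar (for example, whether names may contain dots) is an assumption.

```python
import re
from typing import NamedTuple


class SubjectId(NamedTuple):
    subject_type: str
    name: str
    version: str
    tag: str


# Assumed grammar: <type>.<name>:<version>-<tag>
_PATTERN = re.compile(
    r"(?P<subject_type>[^.:]+)\.(?P<name>[^:]+):(?P<version>[^-]+)-(?P<tag>.+)"
)


def parse_subject_id(subject_id: str) -> SubjectId:
    """Split a subject ID into type, name, version, and tag."""
    match = _PATTERN.fullmatch(subject_id)
    if match is None:
        raise ValueError(f"Not a valid subject ID: {subject_id!r}")
    return SubjectId(**match.groupdict())


print(parse_subject_id("block.image_processor:1.0-stable"))
```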
Remove an Agent
known_subjects.remove_agent("image_agent")
List Registered Agents
agents = known_subjects.list_agents()
for name, subject in agents.items():
    print(f"{name}: {subject.subject_description}")
Query All Runtime Subjects (from backend)
all_subjects = known_subjects.list_runtime_subjects()
Run a Custom Query
query = {"subject_type": "block", "subject_search_tags": ["text", "nlp"]}
results = known_subjects.query_runtime_subjects(query)
Searchable Representation
The method get_searchable_representation() returns a list of structured records that describe each known agent. This is particularly useful for use in LLM-based planners and agent selectors.
searchable = known_subjects.get_searchable_representation()
Each entry includes:
| Field | Description |
|---|---|
| id | Unique subject identifier (type.name:version-tag) |
| name | Subject name |
| type | Subject type (e.g., block, processor, etc.) |
| description | Human-readable description of what the subject does |
| tags | Semantic search tags |
| traits | Execution/environmental traits (e.g., gpu, batching) |
| role | Subject's system role (generator, router, evaluator, etc.) |
| schema | DSL input schema (if defined) |
| agent_name | Name used in the registry (user-defined alias) |
This output can be directly used by an LLM planner to select appropriate agents for downstream tasks.
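Before handing the records to a planner, it is often useful to narrow them down and render them as short text lines. The sketch below filters entries by tags and role and formats them for a prompt. It assumes each entry is a dict with the fields from the table above; the helper names and the sample records are made up for illustration.

```python
from typing import Dict, Iterable, List, Optional


def select_candidates(entries: Iterable[Dict], required_tags: Iterable[str],
                      role: Optional[str] = None) -> List[Dict]:
    """Keep entries that carry every required tag and, if given, the role."""
    wanted = set(required_tags)
    return [
        entry for entry in entries
        if wanted.issubset(entry.get("tags", []))
        and (role is None or entry.get("role") == role)
    ]


def to_prompt_lines(entries: Iterable[Dict]) -> List[str]:
    """Render one compact line per agent for inclusion in an LLM prompt."""
    return [f"- {e['agent_name']} ({e['id']}): {e['description']}" for e in entries]


# Hypothetical records shaped like get_searchable_representation() output.
records = [
    {"id": "block.summarizer:1.0-stable", "agent_name": "text_summarizer",
     "description": "Summarizes text", "tags": ["text", "nlp"], "role": "generator"},
    {"id": "block.image_processor:1.0-stable", "agent_name": "image_agent",
     "description": "Processes images", "tags": ["vision"], "role": "generator"},
]
print("\n".join(to_prompt_lines(select_candidates(records, ["text", "nlp"]))))
```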
Example
from agent_sdk.known_subjects import KnownSubjects

# Initialize the registry
known_subjects = KnownSubjects("http://runtime-db-service")
# Register an agent by its unique subject_id
known_subjects.add_agent("text_summarizer", "block.summarizer:1.0-stable")
# View registered agents
print("Registered Agents:")
for name, agent in known_subjects.list_agents().items():
    print(f"{name} -> {agent.subject_description}")
# Get structured view for LLM integration
llm_input = known_subjects.get_searchable_representation()
for entry in llm_input:
    print(f"Agent {entry['agent_name']} - Role: {entry['role']} - Tags: {entry['tags']}")
Agent Chat Server
Introduction
The Agent Chat Server is a WebSocket-based real-time messaging server that enables multi-user, session-specific chat functionality. It supports persistent storage via a pluggable cache backend (context_cache) and real-time message broadcasting to all connected clients in a session. The server integrates with a ChatManager that handles message storage, retrieval, filtering, and broadcast logic.
Importing
To import the required modules:
from agents_sdk.chat import start_chat_server
from agents_sdk.chat.utils.chat import ChatManager
Usage Guide
1. Initialize the Chat Manager
chat_manager = ChatManager()
Optionally, pass a cache backend that supports .GET(key, namespace) and .SET(key, value, namespace).
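A cache backend only needs the two methods named above. The following is a minimal in-memory sketch of such an object, useful for tests. InMemoryContextCache is a hypothetical class, returning None on a miss is an assumption, and how the backend is passed to ChatManager (positional or by keyword) is not specified here.

```python
from typing import Any, Dict, Tuple


class InMemoryContextCache:
    """Dictionary-backed cache exposing GET(key, namespace) and SET(key, value, namespace)."""

    def __init__(self) -> None:
        # Entries are keyed by (namespace, key) so namespaces never collide.
        self._store: Dict[Tuple[str, str], Any] = {}

    def GET(self, key: str, namespace: str) -> Any:
        # Assumption: a missing entry yields None rather than raising.
        return self._store.get((namespace, key))

    def SET(self, key: str, value: Any, namespace: str) -> None:
        self._store[(namespace, key)] = value


cache = InMemoryContextCache()
cache.SET("abc123", [{"role": "user", "content": "Hello, agent!"}], "chat")
print(cache.GET("abc123", "chat"))
```

A Redis-backed variant would keep the same two method signatures and serialize the value (for example as JSON) before storing it.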
2. Start the Chat Server
start_chat_server(chat_manager, host="0.0.0.0", port=8765)
This launches the WebSocket server in a daemon thread at ws://0.0.0.0:8765.
3. WebSocket Client Communication
Clients must connect to the server and send messages using JSON format:
{
  "session_id": "abc123",
  "role": "user",
  "content": "Hello, agent!"
}
The server will respond with the same data format after storing the message.
4. Server-Initiated Message Broadcast
You can send a message from the server to all clients in a session:
chat_manager.send_message("abc123", "System broadcast message.")
To enable this, you must set the WebSocketServer as the sender after startup:
from agents_sdk.chat import WebSocketServer

# Optional: register sender if you want server-initiated messages
server = WebSocketServer(chat_manager)
chat_manager.set_sender(server)
Full Example
from agents_sdk.chat import start_chat_server, WebSocketServer
from agents_sdk.chat.utils.chat import ChatManager
import time
# Step 1: Create ChatManager
chat_manager = ChatManager()
# Step 2: Start chat server in background thread
start_chat_server(chat_manager)
# Optional: Allow time for the server to boot
time.sleep(2)
# Step 3: Send a message to connected clients
chat_manager.send_message("abc123", "Welcome to the session!")
WebSocketServer API
| Method | Description |
|---|---|
| start() | Starts the WebSocket server on the specified host and port. |
| send_message(session_id, message) | Broadcasts a message to all clients connected to the session. |
| handler(websocket, path) | Internal coroutine for handling client messages. |
ChatManager API
| Method | Description |
|---|---|
| add_message(session_id, role, content, timestamp) | Appends a message to session history. |
| list_messages(session_id) | Lists all messages in a session. |
| get_message(session_id, index) | Returns message at specified index. |
| delete_message(session_id, index) | Deletes message at index. |
| get_user_messages(session_id) | Filters messages with role "user". |
| get_system_messages(session_id) | Filters messages with role "system". |
| get_messages_by_timestamp(session_id, start, end) | Filters messages within the timestamp range. |
| set_sender(sender) | Registers a sender for system messages. |
| send_message(session_id, content) | Sends system message to all clients in the session. |
Notes
- Clients must handle WebSocket reconnects and JSON parsing.
- Extend ChatManager to integrate audit logs or external databases.
- To use Redis as context_cache, implement an object with GET and SET methods per session.
Agent P2P Module
Introduction
The Agent P2P module enables decentralized communication between AI agents using WebSocket and HTTP-based messaging. It provides abstractions for sending messages, dispatching API calls, exchanging files, and remote task execution through a common interface.
This module is designed to support real-time agent-to-agent coordination in a distributed AI system, where each agent can expose WebSocket and REST endpoints for peer communication.
Importing
To use the module, import everything from:
from agent_sdk.p2p import *
This gives you access to all core classes, methods, and helper functions required to initiate peer-to-peer messaging.
Usage Guide
1. Start P2P Servers
Each agent should start its WebSocket and API servers to receive tasks and messages:
start_p2p_servers(
    api_port=6666,
    ws_host="0.0.0.0",
    ws_port=8766,
    max_threads=4,
    handler_function=my_task_handler  # Optional callback
)
2. Send and Receive Messages
p2p = P2PManager()
response = p2p.send_recv("agent_123", {"type": "ping"})
print(response)
3. Send API Requests
data = {"task": "classify", "payload": {...}}
response = p2p.send_api("agent_123", data)
4. Send Files
# Read the file inside a with block so the handle is closed promptly
with open("image.jpg", "rb") as f:
    file = P2PFile("image.jpg", f.read())
response = p2p.send_api("agent_123", {"meta": "image"}, files={"file": file.file})
5. Estimation API Usage
estimator = AgentEstimator(base_url="http://agent_123.local:6666")
result = estimator.estimate(query={"task": "search", "keywords": ["AI", "agents"]})
Estimator API
The AgentEstimator provides a wrapper around a standard REST endpoint (/estimate) exposed by agents.
Example:
estimator = AgentEstimator(base_url="http://localhost:6666")
result = estimator.estimate(query={"input": "some data"})
Response:
- Returns data if success is true
- Raises an exception otherwise
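The success/data convention described above can be sketched as a small unwrapping function. This is not the AgentEstimator source: EstimateError, unwrap_estimate, and the optional message field are assumptions used only to illustrate the behavior.

```python
from typing import Any, Dict


class EstimateError(RuntimeError):
    """Raised when the remote agent reports an unsuccessful estimate."""


def unwrap_estimate(payload: Dict[str, Any]) -> Any:
    """Return payload['data'] when success is true; raise otherwise."""
    if payload.get("success") is True:
        return payload.get("data")
    # The 'message' key is an assumed field for carrying error details.
    raise EstimateError(payload.get("message", "estimate request failed"))


print(unwrap_estimate({"success": True, "data": {"cost": 1.5}}))
```

Callers can then wrap estimator.estimate(...) in a try/except block and treat a raised exception as "this peer cannot take the task".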
Classes and Methods
AgentEstimator
| Method | Description |
|---|---|
| __init__(base_url) | Initializes with the base API URL of the remote agent. |
| estimate(query) | Sends a POST request to /estimate endpoint. Returns result if success is true. |
P2PFile
| Attribute | Description |
|---|---|
| filename | Name of the file |
| file | A BytesIO buffer holding file content |

| Method | Description |
|---|---|
| __init__(filename, source) | Accepts bytes, BytesIO, or file path and loads it into memory. |
P2PManager
Provides methods for agent-to-agent messaging.
| Method | Description |
|---|---|
| send_recv(subject_id, message) | Sends a message and waits for response (WebSocket). |
| send(subject_id, message) | Sends a fire-and-forget message (WebSocket). |
| send_api(subject_id, data, files=None) | Sends an API request (HTTP). |
| estimate(subject_id, query) | Sends a query to the estimation endpoint of another agent. |
start_p2p_servers(...)
| Parameter | Description |
|---|---|
| api_port | Port on which REST API server listens |
| ws_host | WebSocket server host (usually "0.0.0.0" or "localhost") |
| ws_port | WebSocket server port |
| max_threads | Max number of threads to handle tasks |
| handler_function | Optional task handler callback |
register_p2p_handler_callback(name, callback_func)
Registers a handler function under a name for dynamic invocation.
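Name-based registration usually comes down to a dictionary that maps names to callables. The sketch below illustrates that pattern in isolation. HandlerRegistry is a hypothetical class, and the single-argument callback signature is an assumption, since the signature expected by register_p2p_handler_callback is not documented here.

```python
from typing import Any, Callable, Dict

Handler = Callable[[dict], Any]  # assumed callback shape: payload in, result out


class HandlerRegistry:
    """Minimal name-to-callback registry with dynamic dispatch."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}

    def register(self, name: str, callback: Handler) -> None:
        if not callable(callback):
            raise TypeError("callback must be callable")
        # Registering the same name again replaces the earlier handler.
        self._handlers[name] = callback

    def dispatch(self, name: str, payload: dict) -> Any:
        try:
            handler = self._handlers[name]
        except KeyError:
            raise KeyError(f"No handler registered under {name!r}") from None
        return handler(payload)


registry = HandlerRegistry()
registry.register("ping", lambda payload: {"type": "pong", "echo": payload})
print(registry.dispatch("ping", {"type": "ping"}))
```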