API Endpoints Implementation
This document describes the implementation details of the HTTP API endpoints.
Router Setup
// gateway-http/src/main.rs
pub fn create_router(state: AppState) -> Router {
let api_routes = Router::new()
// Health & Status
.route("/health", get(routes::health))
.route("/stats", get(routes::stats))
// Server Management
.route("/servers", get(routes::list_servers))
.route("/servers/:id", get(routes::get_server))
.route("/servers/:id/start", post(routes::start_server))
.route("/servers/:id/stop", post(routes::stop_server))
// MCP Communication
.route("/mcp", post(routes::mcp_request))
.route("/rpc", post(routes::rpc_request))
// Admin
.route("/admin/reload", post(routes::reload_catalog));
// Apply middleware
Router::new()
.merge(api_routes)
.layer(TraceLayer::new_for_http())
.layer(CorsLayer::permissive())
.layer(auth_layer(state.config.api_key.clone()))
.with_state(state)
}
Application State
// gateway-http/src/state.rs
#[derive(Clone)]
pub struct AppState {
pub manager: Arc<ServerManager>,
pub watcher: Arc<CatalogWatcher>,
pub config: Arc<Config>,
pub start_time: Instant,
}
impl AppState {
pub fn new(config: Config, catalog: Catalog) -> Self {
let registry = Arc::new(ServerRegistry::new(catalog.servers));
let manager = Arc::new(ServerManager::new(
registry.clone(),
ManagerConfig::from(&config),
));
let watcher = Arc::new(CatalogWatcher::new(
config.catalog_path.clone(),
manager.clone(),
));
Self {
manager,
watcher,
config: Arc::new(config),
start_time: Instant::now(),
}
}
}
Handler Implementations
Health Check
// gateway-http/src/routes.rs
pub async fn health() -> impl IntoResponse {
Json(json!({
"status": "ok"
}))
}
List Servers
pub async fn list_servers(
State(state): State<AppState>,
Query(params): Query<ListParams>,
) -> Result<impl IntoResponse, AppError> {
let registry = state.manager.registry().await;
let stats = state.manager.get_stats().await;
let servers: Vec<ServerInfo> = registry
.all()
.filter(|s| {
// Filter by tag if specified
params.tag.as_ref().map_or(true, |tag| s.tags.contains(tag))
})
.map(|server| {
let server_stats = stats.servers.get(&server.id);
ServerInfo {
id: server.id.clone(),
display_name: server.display_name.clone(),
description: server.description.clone(),
status: server_stats
.map(|s| s.status.clone())
.unwrap_or(ServerStatus::Stopped),
tags: server.tags.clone(),
runtime_type: server.runtime.type_name(),
started_at: server_stats.and_then(|s| s.started_at),
last_activity: server_stats.map(|s| s.last_activity),
request_count: server_stats.map(|s| s.request_count).unwrap_or(0),
}
})
.collect();
Ok(Json(servers))
}
#[derive(Deserialize)]
pub struct ListParams {
tag: Option<String>,
}
#[derive(Serialize)]
pub struct ServerInfo {
id: String,
display_name: String,
description: Option<String>,
status: ServerStatus,
tags: Vec<String>,
runtime_type: String,
started_at: Option<DateTime<Utc>>,
last_activity: Option<DateTime<Utc>>,
request_count: u64,
}
Get Server
pub async fn get_server(
State(state): State<AppState>,
Path(id): Path<String>,
) -> Result<impl IntoResponse, AppError> {
let registry = state.manager.registry().await;
let server = registry
.get(&id)
.ok_or(AppError::NotFound(format!("Server '{}' not found", id)))?;
let stats = state.manager.get_stats().await;
let server_stats = stats.servers.get(&id);
Ok(Json(ServerDetail {
id: server.id.clone(),
display_name: server.display_name.clone(),
description: server.description.clone(),
status: server_stats
.map(|s| s.status.clone())
.unwrap_or(ServerStatus::Stopped),
runtime: RuntimeInfo::from(&server.runtime),
env: server.env.clone(),
tags: server.tags.clone(),
started_at: server_stats.and_then(|s| s.started_at),
last_activity: server_stats.map(|s| s.last_activity),
request_count: server_stats.map(|s| s.request_count).unwrap_or(0),
uptime_seconds: server_stats
.and_then(|s| s.uptime)
.map(|d| d.as_secs()),
}))
}
Start Server
pub async fn start_server(
State(state): State<AppState>,
Path(id): Path<String>,
) -> Result<impl IntoResponse, AppError> {
// Check if server exists
let registry = state.manager.registry().await;
if !registry.contains(&id) {
return Err(AppError::NotFound(format!("Server '{}' not found", id)));
}
// Check current status
let status = state.manager.get_status(&id).await?;
if status == ServerStatus::Running {
return Ok(Json(json!({
"id": id,
"status": "running",
"message": "Server already running"
})));
}
// Start the server
state.manager.start_server(&id).await?;
Ok(Json(json!({
"id": id,
"status": "running",
"message": "Server started successfully"
})))
}
Stop Server
pub async fn stop_server(
State(state): State<AppState>,
Path(id): Path<String>,
) -> Result<impl IntoResponse, AppError> {
let status = state.manager.get_status(&id).await?;
if status == ServerStatus::Stopped {
return Ok(Json(json!({
"id": id,
"status": "stopped",
"message": "Server already stopped"
})));
}
state.manager.stop_server(&id).await?;
Ok(Json(json!({
"id": id,
"status": "stopped",
"message": "Server stopped successfully"
})))
}
MCP Request
pub async fn mcp_request(
State(state): State<AppState>,
Json(request): Json<McpRequest>,
) -> Result<impl IntoResponse, AppError> {
// Extract server ID and message
let server_id = request.server_id.clone();
let message = McpMessage {
jsonrpc: request.jsonrpc,
id: request.id,
method: request.method,
params: request.params,
result: None,
error: None,
};
// Get connection (auto-starts if needed)
let mut connection = state.manager.get_connection(&server_id).await?;
// Send request
connection.send(&message).await?;
// Receive response
let response = connection.recv().await?
.ok_or(AppError::Internal("No response from server".to_string()))?;
Ok(Json(response))
}
#[derive(Deserialize)]
pub struct McpRequest {
server_id: String,
jsonrpc: String,
id: Option<serde_json::Value>,
method: Option<String>,
params: Option<serde_json::Value>,
}
Raw RPC Request
pub async fn rpc_request(
State(state): State<AppState>,
headers: HeaderMap,
Json(message): Json<McpMessage>,
) -> Result<impl IntoResponse, AppError> {
// Get server ID from header
let server_id = headers
.get("X-Server-ID")
.and_then(|v| v.to_str().ok())
.ok_or(AppError::BadRequest("X-Server-ID header required".to_string()))?
.to_string();
// Get connection
let mut connection = state.manager.get_connection(&server_id).await?;
// Send and receive
connection.send(&message).await?;
let response = connection.recv().await?
.ok_or(AppError::Internal("No response from server".to_string()))?;
Ok(Json(response))
}
Statistics
pub async fn stats(
State(state): State<AppState>,
) -> Result<impl IntoResponse, AppError> {
let manager_stats = state.manager.get_stats().await;
Ok(Json(json!({
"uptime_seconds": state.start_time.elapsed().as_secs(),
"total_servers": manager_stats.total_servers,
"active_servers": manager_stats.active_servers,
"last_reload": state.watcher.last_reload().await,
"servers": manager_stats.servers.iter().map(|(id, s)| {
(id.clone(), json!({
"status": s.status,
"request_count": s.request_count,
"error_count": s.error_count,
"uptime_seconds": s.uptime.map(|d| d.as_secs()),
}))
}).collect::<HashMap<_, _>>()
})))
}
Reload Catalog
pub async fn reload_catalog(
State(state): State<AppState>,
) -> Result<impl IntoResponse, AppError> {
let result = state.watcher.reload().await?;
Ok(Json(json!({
"status": "ok",
"message": "Catalog reloaded",
"servers_loaded": result.total,
"servers_added": result.added,
"servers_removed": result.removed
})))
}
Error Handling
// gateway-http/src/routes.rs
pub enum AppError {
NotFound(String),
BadRequest(String),
Unauthorized(String),
Internal(String),
BadGateway(String),
}
impl IntoResponse for AppError {
fn into_response(self) -> Response {
let (status, message) = match self {
AppError::NotFound(msg) => (StatusCode::NOT_FOUND, msg),
AppError::BadRequest(msg) => (StatusCode::BAD_REQUEST, msg),
AppError::Unauthorized(msg) => (StatusCode::UNAUTHORIZED, msg),
AppError::Internal(msg) => (StatusCode::INTERNAL_SERVER_ERROR, msg),
AppError::BadGateway(msg) => (StatusCode::BAD_GATEWAY, msg),
};
let body = Json(json!({
"error": status.canonical_reason().unwrap_or("Error"),
"message": message
}));
(status, body).into_response()
}
}
impl From<GatewayError> for AppError {
fn from(e: GatewayError) -> Self {
match e {
GatewayError::ServerNotFound(id) => {
AppError::NotFound(format!("Server '{}' not found", id))
}
GatewayError::RuntimeError(e) => {
AppError::BadGateway(e.to_string())
}
GatewayError::ConfigError(e) => {
AppError::Internal(e)
}
_ => AppError::Internal(e.to_string()),
}
}
}
Authentication Middleware
// gateway-http/src/auth.rs
pub fn auth_layer(api_key: Option<String>) -> Option<AuthLayer> {
api_key.map(|key| AuthLayer::new(key))
}
pub struct AuthLayer {
api_key: String,
}
impl AuthLayer {
pub fn new(api_key: String) -> Self {
Self { api_key }
}
}
impl<S> Layer<S> for AuthLayer {
type Service = AuthMiddleware<S>;
fn layer(&self, inner: S) -> Self::Service {
AuthMiddleware {
inner,
api_key: self.api_key.clone(),
}
}
}
impl<S, B> Service<Request<B>> for AuthMiddleware<S> {
// ... implementation that checks Authorization or X-API-Key headers
// Skips auth for /health endpoint
}