Skip to main content

Runtime Abstraction

The runtime layer provides a unified interface for connecting to MCP servers regardless of how they're deployed.

Core Traits

Runtime Trait

// gateway-core/src/runtime/mod.rs

#[async_trait]
pub trait Runtime: Send + Sync {
/// Connect to the MCP server and return a connection handle
async fn connect(&self) -> Result<Box<dyn BackendConnection>, GatewayError>;
}

The Runtime trait is implemented by each runtime type (local process, remote SSE, Docker, etc.).

BackendConnection Trait

#[async_trait]
pub trait BackendConnection: Send + Sync {
/// Send a message to the MCP server
async fn send(&mut self, message: &McpMessage) -> Result<(), GatewayError>;

/// Receive the next message from the MCP server
async fn recv(&mut self) -> Result<Option<McpMessage>, GatewayError>;

/// Close the connection gracefully
async fn close(&mut self) -> Result<(), GatewayError>;
}

McpMessage

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpMessage {
pub jsonrpc: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub id: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub method: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub params: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub result: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<serde_json::Value>,
}

impl McpMessage {
pub fn is_request(&self) -> bool {
self.method.is_some() && self.id.is_some()
}

pub fn is_response(&self) -> bool {
self.result.is_some() || self.error.is_some()
}

pub fn is_notification(&self) -> bool {
self.method.is_some() && self.id.is_none()
}
}

Local Process Runtime

Spawns MCP servers as child processes with stdio communication.

Implementation

// gateway-core/src/runtime/local_process.rs

pub struct LocalProcessRuntime {
command: String,
args: Vec<String>,
working_dir: Option<PathBuf>,
env: HashMap<String, String>,
}

#[async_trait]
impl Runtime for LocalProcessRuntime {
async fn connect(&self) -> Result<Box<dyn BackendConnection>, GatewayError> {
let mut cmd = Command::new(&self.command);
cmd.args(&self.args)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::inherit());

if let Some(dir) = &self.working_dir {
cmd.current_dir(dir);
}

for (key, value) in &self.env {
cmd.env(key, value);
}

let child = cmd.spawn()?;

Ok(Box::new(LocalProcessConnection::new(child)))
}
}

Connection Implementation

pub struct LocalProcessConnection {
child: Child,
stdin: ChildStdin,
stdout: BufReader<ChildStdout>,
}

#[async_trait]
impl BackendConnection for LocalProcessConnection {
async fn send(&mut self, message: &McpMessage) -> Result<(), GatewayError> {
let json = serde_json::to_string(message)?;
self.stdin.write_all(json.as_bytes()).await?;
self.stdin.write_all(b"\n").await?;
self.stdin.flush().await?;
Ok(())
}

async fn recv(&mut self) -> Result<Option<McpMessage>, GatewayError> {
let mut line = String::new();
let bytes = self.stdout.read_line(&mut line).await?;
if bytes == 0 {
return Ok(None);
}
let message = serde_json::from_str(&line)?;
Ok(Some(message))
}

async fn close(&mut self) -> Result<(), GatewayError> {
self.child.kill().await?;
Ok(())
}
}

Remote SSE Runtime

Connects to HTTP-based MCP servers.

Implementation

// gateway-core/src/runtime/remote_sse.rs

pub struct RemoteSseRuntime {
url: String,
headers: HashMap<String, String>,
client: reqwest::Client,
}

#[async_trait]
impl Runtime for RemoteSseRuntime {
async fn connect(&self) -> Result<Box<dyn BackendConnection>, GatewayError> {
// Verify server is reachable
let mut request = self.client.get(&self.url);
for (key, value) in &self.headers {
request = request.header(key, value);
}
request.send().await?.error_for_status()?;

Ok(Box::new(RemoteSseConnection::new(
self.url.clone(),
self.headers.clone(),
self.client.clone(),
)))
}
}

Connection Implementation

pub struct RemoteSseConnection {
url: String,
headers: HashMap<String, String>,
client: reqwest::Client,
}

#[async_trait]
impl BackendConnection for RemoteSseConnection {
async fn send(&mut self, message: &McpMessage) -> Result<(), GatewayError> {
let mut request = self.client.post(&self.url);
for (key, value) in &self.headers {
request = request.header(key, value);
}
request
.json(message)
.send()
.await?
.error_for_status()?;
Ok(())
}

async fn recv(&mut self) -> Result<Option<McpMessage>, GatewayError> {
// For SSE, messages come via the same connection
// Implementation depends on the specific SSE protocol
todo!("SSE message receiving")
}

async fn close(&mut self) -> Result<(), GatewayError> {
// HTTP connections are stateless
Ok(())
}
}

Creating a Runtime from Config

// gateway-core/src/runtime/mod.rs

impl RuntimeConfig {
pub fn create_runtime(&self) -> Result<Box<dyn Runtime>, GatewayError> {
match self {
RuntimeConfig::LocalProcess { command, args, working_dir } => {
Ok(Box::new(LocalProcessRuntime::new(
command.clone(),
args.clone(),
working_dir.clone(),
)))
}
RuntimeConfig::RemoteSse { url, headers } => {
Ok(Box::new(RemoteSseRuntime::new(
url.clone(),
headers.clone(),
)))
}
RuntimeConfig::Docker { .. } => {
Err(GatewayError::UnsupportedRuntime("docker".to_string()))
}
// ... other runtime types
}
}
}

Adding a New Runtime

To add a new runtime type:

1. Define Configuration

// In config.rs
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "type", rename_all = "kebab-case")]
pub enum RuntimeConfig {
LocalProcess { ... },
RemoteSse { ... },
// Add your new runtime
MyRuntime {
#[serde(default)]
option1: String,
option2: Option<i32>,
},
}

2. Implement Runtime Trait

// Create gateway-core/src/runtime/my_runtime.rs

pub struct MyRuntime {
option1: String,
option2: Option<i32>,
}

impl MyRuntime {
pub fn new(option1: String, option2: Option<i32>) -> Self {
Self { option1, option2 }
}
}

#[async_trait]
impl Runtime for MyRuntime {
async fn connect(&self) -> Result<Box<dyn BackendConnection>, GatewayError> {
// Your implementation
Ok(Box::new(MyConnection::new()))
}
}

3. Implement BackendConnection

pub struct MyConnection {
// Connection state
}

#[async_trait]
impl BackendConnection for MyConnection {
async fn send(&mut self, message: &McpMessage) -> Result<(), GatewayError> {
// Send implementation
}

async fn recv(&mut self) -> Result<Option<McpMessage>, GatewayError> {
// Receive implementation
}

async fn close(&mut self) -> Result<(), GatewayError> {
// Cleanup implementation
}
}

4. Update Module Exports

// In runtime/mod.rs
mod my_runtime;
pub use my_runtime::MyRuntime;

impl RuntimeConfig {
pub fn create_runtime(&self) -> Result<Box<dyn Runtime>, GatewayError> {
match self {
// ... existing cases
RuntimeConfig::MyRuntime { option1, option2 } => {
Ok(Box::new(MyRuntime::new(option1.clone(), *option2)))
}
}
}
}

5. Add Tests

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn test_my_runtime_connect() {
let runtime = MyRuntime::new("test".to_string(), Some(42));
let connection = runtime.connect().await;
assert!(connection.is_ok());
}
}

Runtime Selection Guidelines

Use CaseRecommended Runtime
Local developmentlocal-process
Testinglocal-process
Cloud-hosted MCP serviceremote-sse
Self-hosted remote serverremote-sse
Isolated executiondocker (v0.2)
Kubernetes deploymentk8s-* (v0.3)