This document describes MicroAPI's internal architecture, wire protocol, and design decisions.
```
┌──────────────────────────────────────────────────────────────┐
│ MicroAPI App                                                 │
│ ┌──────────┐   ┌────────────┐   ┌──────────┐   ┌───────────┐ │
│ │ Services │   │ Middleware │   │  Router  │   │ Lifecycle │ │
│ │ @method  │   │   Chain    │   │ dispatch │   │   hooks   │ │
│ └──────────┘   └────────────┘   └──────────┘   └───────────┘ │
│                              │                               │
│                      ┌───────┴───────┐                       │
│                      │   Protocol    │                       │
│                      │ Request/Resp  │                       │
│                      └───────┬───────┘                       │
│                              │                               │
│        ┌─────────────────────┴─────────────────────┐         │
│        │              Transport Layer              │         │
│        │  ┌──────┐ ┌──────┐ ┌────┐ ┌─────┐ ┌────┐  │         │
│        │  │ gRPC │ │ HTTP │ │ WS │ │Kafka│ │AMQP│  │         │
│        │  └──────┘ └──────┘ └────┘ └─────┘ └────┘  │         │
│        └───────────────────────────────────────────┘         │
└──────────────────────────────┬───────────────────────────────┘
                               │
                          ┌────┴────┐
                          │ Network │
                          └────┬────┘
                               │
┌──────────────────────────────┴───────────────────────────────┐
│ Client Side                                                  │
│  ┌────────────────┐  ┌─────────────┐  ┌──────────────────┐   │
│  │ Generated Lib  │  │ Connection  │  │ Transport Client │   │
│  │ users.get_user │──│ set_current │──│ HTTP/gRPC/WS/... │   │
│  └────────────────┘  └─────────────┘  └──────────────────┘   │
└──────────────────────────────────────────────────────────────┘
```
The `MicroAPI` class (`app.py`) is the central application object. It manages:
- Service registration
- Middleware chain
- Lifespan/lifecycle hooks
- Server startup and shutdown
- Code generation (on demand)
A `Service` (`service.py`) is a named group of RPC methods. The `@service.method` decorator introspects each function's signature to detect:
- Input type (a Pydantic `Schema`)
- Output type
- Stream input type (`types.Stream[T]`)
- RPC pattern (unary, server streaming, client streaming, or bidirectional streaming)
- Dependencies (`Depends()`)
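A minimal sketch of how signature introspection could classify the RPC pattern. It assumes streamed input is annotated as `Stream[T]` and streamed output as `Streaming[T]` (both replaced here by hypothetical local stand-ins) and treats the first annotated parameter as the input; the real decorator also handles `Depends()` parameters, which this sketch ignores.

```python
from typing import Any, Callable, Generic, TypeVar, get_args, get_origin, get_type_hints

T = TypeVar("T")


class Stream(Generic[T]):
    """Hypothetical stand-in for microapi.types.Stream (streamed input)."""


class Streaming(Generic[T]):
    """Hypothetical stand-in for microapi.types.Streaming (streamed output)."""


def detect_pattern(fn: Callable[..., Any]) -> tuple[str, Any, Any]:
    """Infer (pattern, input_type, output_type) from a handler's type hints."""
    hints = get_type_hints(fn)
    output_type = hints.pop("return", None)
    # Simplification: treat the first annotated parameter as the input.
    input_type = next(iter(hints.values()), None)

    streams_in = get_origin(input_type) is Stream
    streams_out = get_origin(output_type) is Streaming
    if streams_in:
        input_type = get_args(input_type)[0]  # Stream[T] -> T
    if streams_out:
        output_type = get_args(output_type)[0]  # Streaming[T] -> T

    if streams_in and streams_out:
        pattern = "bidi_streaming"
    elif streams_in:
        pattern = "client_streaming"
    elif streams_out:
        pattern = "server_streaming"
    else:
        pattern = "unary"
    return pattern, input_type, output_type
```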
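`Schema` (`schema.py`) is a Pydantic `BaseModel` subclass with a shared configuration:

```python
from pydantic import BaseModel, ConfigDict


class Schema(BaseModel):
    model_config = ConfigDict(
        from_attributes=True,   # allow validating from objects with attributes (e.g. ORM rows)
        populate_by_name=True,  # accept field names as well as aliases
    )
```

The `Router` (`routing.py`) dispatches incoming `Request` objects to the correct service method: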
- Look up the service by name
- Look up the method within the service
- Apply middleware chain
- Validate input via Pydantic
- Resolve dependencies
- Call the handler
- Serialize the response
`MiddlewareChain` (`middleware.py`) composes middleware functions that wrap request handling:

```
MW1 → MW2 → MW3 → Handler
```

Each middleware receives the request and a `call_next` function. The chain is built once and reused for all requests.
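A minimal sketch of building such a chain once, assuming each middleware is an async callable taking `(request, call_next)`. `build_chain` and `_bind` are hypothetical names, not MicroAPI's `MiddlewareChain` API.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[Any]]
Middleware = Callable[[Any, Handler], Awaitable[Any]]


def _bind(mw: Middleware, call_next: Handler) -> Handler:
    """Wrap one middleware around the next step of the chain."""
    async def call(request: Any) -> Any:
        return await mw(request, call_next)
    return call


def build_chain(middlewares: list[Middleware], handler: Handler) -> Handler:
    """Compose MW1 -> MW2 -> ... -> handler once; reuse the result per request."""
    call_next = handler
    for mw in reversed(middlewares):  # wrap from the innermost layer outwards
        call_next = _bind(mw, call_next)
    return call_next


log: list[str] = []


def tracing(name: str) -> Middleware:
    async def mw(request: Any, call_next: Handler) -> Any:
        log.append(f"{name}:before")
        response = await call_next(request)
        log.append(f"{name}:after")
        return response
    return mw


async def handler(request: Any) -> Any:
    log.append("handler")
    return {"echo": request}


chain = build_chain([tracing("mw1"), tracing("mw2")], handler)

if __name__ == "__main__":
    print(asyncio.run(chain("ping")), log)
```

The `_bind` helper gives each wrapper its own `call_next`; a closure defined directly inside the loop would late-bind to the loop variable's final value.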
The dependency resolver (`dependencies.py`) resolves `Depends()` markers in handler signatures. It:
- Inspects function parameters
- Detects `Depends` defaults
- Resolves dependencies recursively
- Caches resolved values per request
- Injects the current `Request` into any parameter type-hinted as `Request`
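`protocol.py` defines the internal message format:

```python
import uuid
from dataclasses import dataclass, field

# StatusCode is the status-code enum defined alongside these classes in protocol.py.


@dataclass
class Request:
    service: str
    method: str
    payload: dict | list | None
    metadata: dict[str, str]
    id: str = field(default_factory=lambda: str(uuid.uuid4()))  # auto-generated UUID


@dataclass
class Response:
    payload: dict | None
    error: str | None
    status_code: StatusCode
    metadata: dict[str, str]
```

`Envelope` is the wire-level message format used by transports: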
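```python
from dataclasses import dataclass
from enum import Enum


class MessageType(Enum):
    # Values match the "type" strings sent on the wire.
    REQUEST = "request"
    RESPONSE = "response"
    STREAM_PUSH = "stream_push"
    STREAM_END = "stream_end"


@dataclass
class Envelope:
    type: MessageType
    id: str
    service: str | None
    method: str | None
    payload: dict | list | None
    metadata: dict[str, str]
    error: str | None
    status_code: int
```

Envelopes are serialized to JSON with `orjson` for performance.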
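`serialization.py` provides fast JSON serialization built on `orjson`:

```
serialize(data)   → bytes   # dict/model → JSON bytes
deserialize(data) → dict    # bytes/str  → dict
to_dict(obj)      → dict    # model/dict → dict
```

Each transport implements two interfaces, `TransportServer` and `TransportClient`, plus a `Transport` factory that creates them: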
```python
from abc import ABC, abstractmethod
from collections.abc import AsyncIterator

from microapi.routing import Router


class TransportServer(ABC):
    @abstractmethod
    async def start(self, router: Router) -> None: ...

    @abstractmethod
    async def stop(self) -> None: ...


class TransportClient(ABC):
    @abstractmethod
    async def connect(self) -> None: ...

    @abstractmethod
    async def close(self) -> None: ...

    @abstractmethod
    async def request(
        self, service: str, method: str,
        payload: dict | None = None,
        metadata: dict[str, str] | None = None,
    ) -> dict: ...

    async def request_stream(
        self, service: str, method: str,
        payload: dict | None = None,
        metadata: dict[str, str] | None = None,
    ) -> AsyncIterator[dict]: ...


class Transport(ABC):
    @abstractmethod
    def create_server(self) -> TransportServer: ...

    @abstractmethod
    def create_client(self) -> TransportClient: ...
```

All transports exchange JSON envelopes. A request envelope looks like this:
```json
{
  "type": "request",
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "service": "users",
  "method": "get_user",
  "payload": {"user_id": 42},
  "metadata": {"authorization": "Bearer xxx"},
  "error": null,
  "status_code": 0
}
```

The `type` field takes one of four values:

| Type | Direction | Description |
|---|---|---|
| `request` | Client → Server | Initial RPC request |
| `response` | Server → Client | Final response |
| `stream_push` | Server → Client | One item in a stream |
| `stream_end` | Server → Client | End of stream |
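On the client side, a server-streaming call might consume these message types as follows. The sketch assumes messages for several calls can share one connection (as with WebSocket multiplexing by `id`) and that failures arrive in the `error` field; `iter_stream` is a hypothetical helper.

```python
from collections.abc import Iterable, Iterator


def iter_stream(envelopes: Iterable[dict], request_id: str) -> Iterator[dict]:
    """Yield stream_push payloads for one call until its stream_end arrives."""
    for env in envelopes:
        if env["id"] != request_id:
            continue  # belongs to a different call sharing the connection
        if env.get("error"):
            raise RuntimeError(f"{env.get('status_code')}: {env['error']}")
        if env["type"] == "stream_push":
            yield env["payload"]
        elif env["type"] == "stream_end":
            return  # anything after stream_end is ignored
        else:
            raise ValueError(f"unexpected message type: {env['type']!r}")
```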
Each transport carries envelopes in its own framing:
- HTTP: JSON request bodies; streaming responses as NDJSON (one JSON object per line)
- gRPC: length-prefixed frames over HTTP/2
- WebSocket: JSON text frames, multiplexed by the envelope `id`
- Kafka: JSON messages on configured topics
- RabbitMQ: JSON messages with correlation IDs
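The two byte-level framings can be sketched with the standard library. The gRPC part assumes MicroAPI follows the standard gRPC length-prefixed message layout (a 1-byte compression flag followed by a 4-byte big-endian length) with a JSON body; all function names are hypothetical.

```python
import json
import struct


def to_ndjson(items: list[dict]) -> bytes:
    """HTTP streaming body: one compact JSON object per line."""
    return b"".join(json.dumps(item, separators=(",", ":")).encode() + b"\n" for item in items)


def from_ndjson(body: bytes) -> list[dict]:
    return [json.loads(line) for line in body.splitlines() if line.strip()]


def grpc_frame(message: dict) -> bytes:
    """Frame = 1-byte compressed flag + 4-byte big-endian length + JSON body."""
    body = json.dumps(message, separators=(",", ":")).encode()
    return struct.pack(">BI", 0, len(body)) + body


def grpc_unframe(buffer: bytes) -> tuple[dict, bytes]:
    """Decode one frame; return (message, remaining bytes)."""
    if len(buffer) < 5:
        raise ValueError("incomplete frame header")
    compressed, length = struct.unpack(">BI", buffer[:5])
    if compressed:
        raise ValueError("compressed frames are not handled in this sketch")
    end = 5 + length
    if len(buffer) < end:
        raise ValueError("incomplete frame body")
    return json.loads(buffer[5:end]), buffer[end:]
```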
The Python client generator (`generator/python_gen.py`) runs in four steps:
- Collect schemas: walk all services and collect the unique Pydantic models.
- Generate `types.py`: convert each model to a `ClientSchema` subclass.
- Generate service modules: for each method, generate:
  - Unary → async function
  - Server streaming → async generator function
  - Client streaming → `ClientStream` subclass
  - Bidirectional → `ClientStream[T]` subclass with `next()` and `send()`
- Generate `__init__.py`: re-export the modules and schema types.
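As an illustration of the service-module step, a generator might emit source text like this for unary and server-streaming methods. `render_method`, `get_connection()`, and the use of `model_dump()` are assumptions for the sketch, not the names MicroAPI actually generates; it also assumes the connection's `request_stream` returns an async iterator.

```python
def render_method(
    service: str,
    method: str,
    input_type: str,
    output_type: str,
    server_streaming: bool = False,
) -> str:
    """Emit source for one generated client function (hypothetical output shape)."""
    call = f'"{service}", "{method}", payload=request.model_dump()'
    if server_streaming:
        # Server streaming -> async generator function
        return (
            f"async def {method}(request: {input_type}) -> AsyncIterator[{output_type}]:\n"
            f"    async for item in get_connection().request_stream({call}):\n"
            f"        yield {output_type}(**item)\n"
        )
    # Unary -> plain async function
    return (
        f"async def {method}(request: {input_type}) -> {output_type}:\n"
        f"    data = await get_connection().request({call})\n"
        f"    return {output_type}(**data)\n"
    )
```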
The Protobuf generator (`generator/protobuf_gen.py`):
- Maps Python types to Protobuf types
- Generates `message` definitions from schemas
- Generates `service` definitions from methods
- Marks streaming sides with the `stream` keyword
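A sketch of the type mapping and the `stream` keyword. The scalar table (for example `int` → `int64`) is an assumption rather than MicroAPI's actual mapping, and only lists of scalars are turned into `repeated` fields.

```python
from typing import Any, get_args, get_origin

# Assumed scalar mapping; the real generator's table may differ.
PY_TO_PROTO = {int: "int64", float: "double", str: "string", bool: "bool", bytes: "bytes"}


def proto_type(py_type: Any) -> str:
    if get_origin(py_type) is list:  # list[str] -> repeated string (scalars only)
        (item,) = get_args(py_type)
        return f"repeated {PY_TO_PROTO[item]}"
    return PY_TO_PROTO[py_type]


def render_message(name: str, fields: dict[str, Any]) -> str:
    lines = [f"message {name} {{"]
    for number, (field_name, py_type) in enumerate(fields.items(), start=1):
        lines.append(f"  {proto_type(py_type)} {field_name} = {number};")
    lines.append("}")
    return "\n".join(lines)


def render_rpc(method: str, request: str, response: str,
               client_stream: bool = False, server_stream: bool = False) -> str:
    req = f"stream {request}" if client_stream else request  # streamed input
    resp = f"stream {response}" if server_stream else response  # streamed output
    return f"  rpc {method}({req}) returns ({resp});"
```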
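`Stream[T]` (`types.py`) wraps an `asyncio.Queue`:

```python
import asyncio

_SENTINEL = object()  # marks the end of the stream inside the queue


class Stream[T]:  # PEP 695 generic syntax, requires Python 3.12+
    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._closed = False

    async def _feed(self, item: T) -> None:
        """Used by the transport to feed incoming items."""
        self._queue.put_nowait(item)

    async def _close(self) -> None:
        """Signal end of stream."""
        self._closed = True
        self._queue.put_nowait(_SENTINEL)

    def __aiter__(self) -> "Stream[T]":
        return self

    async def __anext__(self) -> T:
        item = await self._queue.get()
        if item is _SENTINEL:
            raise StopAsyncIteration
        return item
```

`ClientStream` buffers messages and sends them all when `end()` is called: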
```python
from microapi.transport.base import TransportClient


class ClientStream:
    def __init__(self, transport: TransportClient, service: str, method: str) -> None:
        # Constructor shown for completeness; the real signature may differ.
        self._transport = transport
        self._service = service
        self._method = method
        self._buffer: list = []

    async def _send_raw(self, payload: dict) -> None:
        # Nothing goes over the wire yet; messages are only buffered.
        self._buffer.append(payload)

    async def end(self) -> dict:
        # Send all buffered messages as a single JSON array in one request.
        return await self._transport.request(
            service=self._service,
            method=self._method,
            payload=self._buffer,
        )
```

The package is laid out as follows:

```
microapi/
├── __init__.py           # Public API exports
├── app.py                # MicroAPI application class
├── service.py            # Service & MethodInfo
├── schema.py             # Schema (Pydantic BaseModel)
├── types.py              # Stream[T], Streaming[T]
├── middleware.py         # Middleware & MiddlewareChain
├── dependencies.py       # Depends() resolver
├── protocol.py           # Request, Response, Envelope, StatusCode
├── serialization.py      # JSON serialization (orjson)
├── routing.py            # Router & request dispatch
├── lifecycle.py          # Lifespan type & default
├── exceptions.py         # Exception hierarchy
├── _logging.py           # Structured logging
├── hot_reload.py         # File watcher
├── transport/
│   ├── __init__.py       # Lazy import helpers
│   ├── base.py           # Abstract Transport/Server/Client
│   ├── grpc/
│   │   ├── __init__.py
│   │   ├── transport.py  # GRPCTransport factory
│   │   ├── server.py     # HTTP/2 server (h2)
│   │   └── client.py     # HTTP/2 client
│   ├── http/
│   │   ├── __init__.py
│   │   ├── transport.py  # HTTPTransport factory
│   │   ├── server.py     # aiohttp server
│   │   └── client.py     # aiohttp client
│   ├── websocket/
│   │   ├── __init__.py
│   │   ├── transport.py  # WebSocketTransport factory
│   │   ├── server.py     # websockets server
│   │   └── client.py     # websockets client
│   ├── kafka/
│   │   ├── __init__.py
│   │   └── transport.py  # KafkaTransport (server + client)
│   └── rabbitmq/
│       ├── __init__.py
│       └── transport.py  # RabbitMQTransport (server + client)
├── client/
│   ├── __init__.py
│   ├── base.py           # Connection, ClientSchema
│   └── stream.py         # ClientStream, ClientStreaming
├── generator/
│   ├── __init__.py
│   ├── python_gen.py     # Python client generator
│   └── protobuf_gen.py   # Protobuf generator
├── cli/
│   ├── __init__.py
│   └── main.py           # typer CLI app
└── utils/
    ├── __init__.py
    └── validators.py     # Host/port validators
```
MicroAPI ships its own gRPC implementation instead of the standard `grpcio` library, which:
- Has a complex C extension that can be hard to install
- Does not integrate well with asyncio
- Has its own thread pool that conflicts with async frameworks

MicroAPI's gRPC implementation is built on `h2` (a pure-Python HTTP/2 protocol library) and is fully async-native.
The wire format is JSON rather than Protobuf because:
- JSON is human-readable (easier to debug)
- No `.proto` compilation step is required
- It works naturally with Pydantic models
- `orjson` makes JSON serialization very fast

`.proto` files are still generated for cross-language interop.
Client streaming sends all buffered messages at once (as a JSON array) rather than streaming them incrementally. This simplifies:
- Transport implementations (all use the same `request()` method)
- Code generation (no transport-specific streaming protocols)
- Error handling (a single request/response cycle)

For true bidirectional streaming, use the WebSocket transport.
MicroAPI accepts lifespans both as raw async generator functions and as `@asynccontextmanager`-decorated functions. It uses `inspect.isasyncgenfunction()` to tell them apart and wraps raw generators automatically.
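The detection described above needs only `inspect` and `contextlib`. This sketch assumes a lifespan takes the app as its single argument; `normalize_lifespan` and `serve` are hypothetical names.

```python
import asyncio
import contextlib
import inspect
from contextlib import AbstractAsyncContextManager
from typing import Any, Callable


def normalize_lifespan(lifespan: Callable[[Any], Any]) -> Callable[[Any], AbstractAsyncContextManager]:
    """Wrap a raw async generator function so it can be used with `async with`."""
    if inspect.isasyncgenfunction(lifespan):
        return contextlib.asynccontextmanager(lifespan)
    return lifespan  # already an @asynccontextmanager-decorated function


events: list[str] = []


async def raw_lifespan(app: Any):
    events.append("startup")
    yield
    events.append("shutdown")


@contextlib.asynccontextmanager
async def decorated_lifespan(app: Any):
    events.append("startup")
    yield
    events.append("shutdown")


async def serve(lifespan: Callable[[Any], Any], app: Any = None) -> None:
    async with normalize_lifespan(lifespan)(app):
        events.append("serving")


if __name__ == "__main__":
    asyncio.run(serve(raw_lifespan))
    print(events)
```

The decorated function is a plain wrapper that returns a context manager, so `inspect.isasyncgenfunction()` reports `False` for it and it is passed through unchanged.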