The pgEdge RAG Server provides a REST API for querying RAG pipelines.
By default, the server listens on http://localhost:8080. All endpoints use
the /v1 API version prefix.
The server implements RFC 8631
for API documentation discovery. All JSON responses include a Link header:
```
Link: </v1/openapi.json>; rel="service-desc"
```
This allows tools like restish to automatically discover and use the API schema.
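As a quick check, a client can read this header from any endpoint; a minimal sketch using the `requests` library and the default address:

```python
import requests

# Every JSON response carries a Link header pointing at the OpenAPI document,
# e.g. </v1/openapi.json>; rel="service-desc".
resp = requests.get("http://localhost:8080/v1/health")
print(resp.headers.get("Link"))
```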
Get the OpenAPI v3 specification for the API.
GET /v1/openapi.json
Returns an OpenAPI 3.0.3 specification document describing all API endpoints, request/response schemas, and error formats.
| Status Code | Description |
|---|---|
| 200 | OpenAPI specification |
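For instance, the specification can be fetched and its advertised paths listed; a small sketch assuming the default address (the exact paths depend on the running server):

```python
import requests

# Fetch the OpenAPI document and list the endpoints it describes.
spec = requests.get("http://localhost:8080/v1/openapi.json").json()
print(spec.get("openapi"))      # specification version, e.g. "3.0.3"
for path in spec.get("paths", {}):
    print(path)
```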
Check if the server is running and healthy.
GET /v1/health
```json
{
  "status": "healthy"
}
```

| Status Code | Description |
|---|---|
| 200 | Server is healthy |
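A minimal liveness probe built on this endpoint might look like the following (assuming the default address):

```python
import requests

# Consider the server healthy only on HTTP 200 with {"status": "healthy"}.
resp = requests.get("http://localhost:8080/v1/health", timeout=5)
print(resp.status_code == 200 and resp.json().get("status") == "healthy")
```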
Get a list of all available RAG pipelines.
GET /v1/pipelines
```json
{
  "pipelines": [
    {
      "name": "my-docs",
      "description": "Search my documentation"
    },
    {
      "name": "knowledge-base",
      "description": "Corporate knowledge base"
    }
  ]
}
```

| Status Code | Description |
|---|---|
| 200 | List of pipelines |
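In Python, the same listing can be fetched and printed per pipeline; a sketch assuming the default address:

```python
import requests

# Print each configured pipeline's name and description.
resp = requests.get("http://localhost:8080/v1/pipelines")
for pipeline in resp.json()["pipelines"]:
    print(f"{pipeline['name']}: {pipeline['description']}")
```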
Execute a RAG query against a specific pipeline.
POST /v1/pipelines/{name}
| Parameter | Description |
|---|---|
| `name` | Pipeline name (from config) |
```json
{
  "query": "How do I configure replication?",
  "stream": false,
  "top_n": 10,
  "filter": {
    "conditions": [
      {"column": "product", "operator": "=", "value": "pgEdge"},
      {"column": "version", "operator": "=", "value": "v5.0"}
    ],
    "logic": "AND"
  },
  "include_sources": true,
  "messages": [
    {"role": "user", "content": "What is pgEdge?"},
    {"role": "assistant", "content": "pgEdge is a distributed PostgreSQL platform..."}
  ]
}
```

| Field | Type | Required | Description |
|---|---|---|---|
| `query` | string | Yes | The question to answer |
| `stream` | boolean | No | Enable streaming response (SSE) |
| `top_n` | integer | No | Override default result limit |
| `filter` | object | No | Structured filter to apply to results |
| `include_sources` | boolean | No | Include source documents (default: false) |
| `messages` | array | No | Previous conversation history for context |
The filter parameter accepts a structured filter object with conditions
and operators. This is useful when your data contains multiple products or
versions and you want to restrict results. API filters must use this
structured format for security (parameterized queries prevent SQL injection).
If the pipeline configuration also specifies a filter, both filters are combined using AND logic.
Filter examples:
Single condition:
```json
{
  "conditions": [
    {"column": "product", "operator": "=", "value": "pgAdmin"}
  ]
}
```

Multiple conditions with AND:

```json
{
  "conditions": [
    {"column": "product", "operator": "=", "value": "pgAdmin"},
    {"column": "version", "operator": ">=", "value": "v8.0"}
  ],
  "logic": "AND"
}
```

Multiple conditions with OR:

```json
{
  "conditions": [
    {"column": "status", "operator": "=", "value": "published"},
    {"column": "status", "operator": "=", "value": "draft"}
  ],
  "logic": "OR"
}
```

Supported operators: `=`, `!=`, `<`, `>`, `<=`, `>=`, `LIKE`, `ILIKE`, `IN`, `NOT IN`, `IS NULL`, `IS NOT NULL`
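As one more illustration, a pattern match with `LIKE` can be sent as part of a query; the `product` column here is the hypothetical one used in the examples above, and the sketch assumes the default address and the `requests` library:

```python
import requests

# Restrict retrieval to rows whose product value starts with "pg" (SQL LIKE pattern).
payload = {
    "query": "How do I configure backups?",
    "filter": {
        "conditions": [
            {"column": "product", "operator": "LIKE", "value": "pg%"}
        ]
    },
}
resp = requests.post("http://localhost:8080/v1/pipelines/my-docs", json=payload)
print(resp.json()["answer"])
```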
Each entry in `messages` has the following fields:

| Field | Type | Description |
|---|---|---|
| `role` | string | Message role: `user` or `assistant` |
| `content` | string | Message content |
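For example, a follow-up question can carry the earlier turns so the answer stays in context; a sketch assuming the default address and the `my-docs` pipeline from the examples:

```python
import requests

# Send prior turns in "messages" so the follow-up question is answered in context.
payload = {
    "query": "How do I install it?",
    "messages": [
        {"role": "user", "content": "What is pgEdge?"},
        {"role": "assistant", "content": "pgEdge is a distributed PostgreSQL platform..."},
    ],
}
resp = requests.post("http://localhost:8080/v1/pipelines/my-docs", json=payload)
print(resp.json()["answer"])
```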
A successful response includes the generated answer and token usage:

```json
{
  "answer": "To configure replication, you need to...",
  "tokens_used": 1523
}
```

When `include_sources` is `true`:
```json
{
  "answer": "To configure replication, you need to...",
  "sources": [
    {
      "id": "doc-123",
      "content": "Replication is configured by...",
      "score": 0.95
    },
    {
      "id": "doc-456",
      "content": "The replication settings include...",
      "score": 0.87
    }
  ],
  "tokens_used": 1523
}
```

| Field | Type | Description |
|---|---|---|
| `answer` | string | The generated answer |
| `sources` | array | Source documents (only if requested) |
| `tokens_used` | integer | Total tokens consumed by the request |
| Field | Type | Description |
|---|---|---|
| `id` | string | Document identifier (if available) |
| `content` | string | Document text content |
| `score` | number | Relevance score (higher is better) |
When `stream` is `true`, the response uses Server-Sent Events (SSE).
Headers:
```
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
```
Event Format:
Each event is a JSON object sent as an SSE data line:
data: {"type": "chunk", "content": "To configure "}
data: {"type": "chunk", "content": "replication, "}
data: {"type": "chunk", "content": "you need to..."}
data: {"type": "done"}
| Type | Description | Fields |
|---|---|---|
| `chunk` | Partial response content | `content` |
| `done` | Stream completed successfully | - |
| `error` | An error occurred | `error` |
Error responses use a consistent JSON format:

```json
{
  "error": {
    "code": "PIPELINE_NOT_FOUND",
    "message": "pipeline not found: unknown-pipeline"
  }
}
```

| Status Code | Error Code | Description |
|---|---|---|
| 400 | `INVALID_REQUEST` | Invalid request body or query |
| 404 | `PIPELINE_NOT_FOUND` | Pipeline does not exist |
| 405 | `METHOD_NOT_ALLOWED` | Wrong HTTP method |
| 500 | `EXECUTION_ERROR` | Pipeline execution failed |
| 500 | `INTERNAL_ERROR` | Unexpected server error |
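Clients can branch on the status code and the `error` object; a minimal sketch assuming the default address:

```python
import requests

resp = requests.post(
    "http://localhost:8080/v1/pipelines/unknown-pipeline",
    json={"query": "anything"},
)
if resp.ok:
    print(resp.json()["answer"])
else:
    # Error bodies carry a machine-readable code and a human-readable message.
    err = resp.json()["error"]
    print(f"{resp.status_code} {err['code']}: {err['message']}")
```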
Example requests using curl:

List pipelines:

```bash
curl http://localhost:8080/v1/pipelines
```

Simple query:
```bash
curl -X POST http://localhost:8080/v1/pipelines/my-docs \
  -H "Content-Type: application/json" \
  -d '{"query": "How do I get started?"}'
```

Query with filter:
```bash
curl -X POST http://localhost:8080/v1/pipelines/my-docs \
  -H "Content-Type: application/json" \
  -d '{
    "query": "How do I configure backups?",
    "filter": {
      "conditions": [
        {"column": "product", "operator": "=", "value": "pgAdmin"},
        {"column": "version", "operator": "=", "value": "v9.0"}
      ],
      "logic": "AND"
    }
  }'
```

Streaming query:
```bash
curl -X POST http://localhost:8080/v1/pipelines/my-docs \
  -H "Content-Type: application/json" \
  -N \
  -d '{"query": "Explain the architecture", "stream": true}'
```

Python examples, using the `requests` library:

Non-streaming:
```python
import requests

response = requests.post(
    "http://localhost:8080/v1/pipelines/my-docs",
    # Request sources explicitly so data["sources"] is present in the response.
    json={"query": "How do I configure SSL?", "include_sources": True},
)
data = response.json()
print(data["answer"])
for source in data["sources"]:
    print(f"- {source['content'][:100]}... (score: {source['score']:.2f})")
```

Streaming:
```python
import json
import requests

response = requests.post(
    "http://localhost:8080/v1/pipelines/my-docs",
    json={"query": "Explain the setup process", "stream": True},
    stream=True,
)
for line in response.iter_lines():
    if line and line.startswith(b"data: "):
        event = json.loads(line[6:])
        if event["type"] == "chunk":
            print(event["content"], end="", flush=True)
        elif event["type"] == "done":
            print()  # newline at end
```

JavaScript examples:

Non-streaming:
```javascript
const response = await fetch("http://localhost:8080/v1/pipelines/my-docs", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ query: "How do I get started?" }),
});
const data = await response.json();
console.log(data.answer);
```

Streaming with fetch (EventSource cannot send POST requests, so the stream is read manually):
```javascript
// Read the SSE stream with fetch and print chunks as they arrive.
const response = await fetch("http://localhost:8080/v1/pipelines/my-docs", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ query: "Explain the setup", stream: true }),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  // Decode incrementally; each SSE event arrives as a "data: {...}" line.
  const text = decoder.decode(value, { stream: true });
  for (const line of text.split("\n")) {
    if (line.startsWith("data: ")) {
      const event = JSON.parse(line.slice(6));
      if (event.type === "chunk") {
        process.stdout.write(event.content); // Node.js; use another sink in a browser
      }
    }
  }
}
```

The server does not implement rate limiting. If needed, use a reverse proxy (nginx, Caddy, etc.) or an API gateway in front of the server.
The server does not implement authentication. For production deployments, place the server behind an authenticating proxy or API gateway.