异步优先的外部应用适配器开发框架
由 广东轻亿云软件科技有限公司 开发
「轻易云数据集成平台」核心组件
数据集成,简单看得见
轻易云是一款企业级数据集成平台,专注于帮助企业快速、高效地打通各类业务系统之间的数据通道。
| 亮点 | 描述 |
|---|---|
| 即插即用 | 无需复杂开发,配置即连接,支持 500+ 主流应用系统对接 |
| 全程可视 | 数据流动、转换过程、执行状态一目了然 |
| 高性能引擎 | 基于优化的异步执行引擎,轻松处理海量数据同步 |
| 企业级可靠 | 完善的认证管理、重试机制、错误追踪 |
QData Adapter Base 是轻易云数据集成平台的适配器开发框架,为对接金蝶、用友、吉客云等 ERP/CRM/WMS 系统提供标准化的开发基座。
- 异步优先 — 基于
httpx的高性能异步 HTTP 客户端 - 多认证方式 — OAuth2、HMAC 签名、API Key、Session 登录
- 自动重试 — 基于
tenacity的指数退避重试,5xx / 网络错误自动重试 - 组合器模式 — 一个平台多套接口体系(如标准 API + 奇门 API)
- 插件架构 — Entry Points 动态发现适配器,即装即用
- 类型安全 — 完整类型标注 + Pydantic 数据验证 + py.typed
- 连接测试 — 结构化的
TestConnectionResult连接检测
pip install qdata-adapter-base开发模式:
pip install qdata-adapter-base[dev]源码安装:
git clone https://github.com/qeasy/qdata-adapter-base.git
cd qdata-adapter-base
pip install -e ".[dev]"from qdata_adapter import BaseAppAdapter, ConnectorContext, TestConnectionResult
from typing import AsyncIterator, Any
import time
class MyERPAdapter(BaseAppAdapter):
app_code = "my_erp"
adapter_version = "1.0.0"
async def authenticate(self) -> dict[str, Any]:
response = await self.http_client.post(
"/api/auth/login",
json={
"username": self.context.auth_config["username"],
"password": self.context.auth_config["password"],
},
)
return {"access_token": response["token"], "expires_in": 3600}
async def refresh_token(self) -> dict[str, Any]:
return await self.authenticate()
async def list_objects(
self, object_type: str, filters: dict | None = None, page_size: int = 100
) -> AsyncIterator[dict[str, Any]]:
await self.ensure_authenticated()
page = 1
while True:
response = await self.http_client.get(
f"/api/{object_type}",
params={"page": page, "page_size": page_size, **(filters or {})},
)
for item in response.get("data", []):
yield item
if len(response.get("data", [])) < page_size:
break
page += 1
async def get_object(self, object_type: str, object_id: str) -> dict[str, Any]:
await self.ensure_authenticated()
return await self.http_client.get(f"/api/{object_type}/{object_id}")
async def test_connection(self) -> TestConnectionResult:
start = time.time()
try:
await self.authenticate()
return TestConnectionResult.connected(
duration_ms=int((time.time() - start) * 1000),
)
except Exception as e:
return TestConnectionResult.network_error(str(e))import asyncio
from qdata_adapter import ConnectorContext, AdapterRegistry
# 注册适配器
AdapterRegistry.register(MyERPAdapter)
async def main():
context = ConnectorContext(
connector_id="my-connector-001",
app_software_code="my_erp",
base_url="https://api.my-erp.com",
auth_config={"username": "admin", "password": "secret"},
)
adapter = AdapterRegistry.create_adapter("my_erp", context)
# 测试连接
result = await adapter.test_connection()
print(f"连接状态: {result.status}, 耗时: {result.duration_ms}ms")
# 查询数据
async for order in adapter.list_objects("orders", {"status": "pending"}):
print(f"Order: {order['id']}")
asyncio.run(main())| 组件 | 说明 |
|---|---|
ConnectorContext |
连接器上下文 — Pydantic 数据验证,敏感信息脱敏 |
BaseAppAdapter |
适配器抽象基类 — 定义标准接口,管理认证生命周期 |
AdapterRegistry |
适配器注册中心 — 手动注册 + Entry Points 自动发现 |
HttpClient |
异步 HTTP 客户端 — 重试、超时、错误转换 |
TestConnectionResult |
连接测试结果 — 结构化的健康检查响应 |
TokenCacheProtocol |
Token 缓存协议 — 可插拔(内存 / Redis / 自定义) |
AdapterError
├── AuthenticationError
│ └── TokenExpiredError
├── AdapterConnectionError
│ └── AdapterTimeoutError
├── ResponseError
│ ├── RateLimitError
│ └── NotFoundError
├── ValidationError
├── ConfigurationError
└── NodeError
├── NodeConfigError
├── NodeExecutionError
├── NodeInputError / NodeOutputError
└── ConnectorNotFoundError / ConnectorInactiveError
当一个平台有多套接口体系时(如吉客云的标准接口 + 奇门接口),使用组合器模式:
JkyAdapter(唯一外部入口)
├── settings["interface"] = "standard" → JkyStandardInterface
└── settings["interface"] = "qimen" → JkyQimenInterface
适配器包通过 Entry Points 注册,即装即用:
# pyproject.toml
[project.entry-points."qdata.adapters"]
jky = "qdata_adapter_jky:JkyAdapter"
kingdee = "qdata_adapter_kingdee:KingdeeAdapter"git clone https://github.com/qeasy/qdata-adapter-base.git
cd qdata-adapter-base
python -m venv .venv && source .venv/bin/activate
pip install -e ".[dev]"
make test # 运行测试
make lint # 代码检查
make format # 代码格式化
make check # lint + 类型检查
make test-cov # 测试 + 覆盖率
make build # 构建 wheel本项目采用 MIT License — 详见 LICENSE。
广东轻亿云软件科技有限公司 专注数据集成与处理,提供企业级 ETL/ELT 解决方案
| 官网 | https://www.qeasy.cloud |
| 开源项目 | opensource@qeasy.cloud |
| 商业咨询 | vincent@qeasy.cloud |
Powered by 广东轻亿云软件科技有限公司