Skip to content

vincent067/qdata-adapter-base

Repository files navigation

QData Adapter Base

异步优先的外部应用适配器开发框架

广东轻亿云软件科技有限公司 开发
「轻易云数据集成平台」核心组件

Python 3.11+ License: MIT PyPI version Downloads CI Coverage


关于轻易云数据集成平台

数据集成,简单看得见

轻易云是一款企业级数据集成平台,专注于帮助企业快速、高效地打通各类业务系统之间的数据通道。

轻易云
亮点 描述
即插即用 无需复杂开发,配置即连接,支持 500+ 主流应用系统对接
全程可视 数据流动、转换过程、执行状态一目了然
高性能引擎 基于优化的异步执行引擎,轻松处理海量数据同步
企业级可靠 完善的认证管理、重试机制、错误追踪

QData Adapter Base 是轻易云数据集成平台的适配器开发框架,为对接金蝶、用友、吉客云等 ERP/CRM/WMS 系统提供标准化的开发基座。


特性

  • 异步优先 — 基于 httpx 的高性能异步 HTTP 客户端
  • 多认证方式 — OAuth2、HMAC 签名、API Key、Session 登录
  • 自动重试 — 基于 tenacity 的指数退避重试,5xx / 网络错误自动重试
  • 组合器模式 — 一个平台多套接口体系(如标准 API + 奇门 API)
  • 插件架构 — Entry Points 动态发现适配器,即装即用
  • 类型安全 — 完整类型标注 + Pydantic 数据验证 + py.typed
  • 连接测试 — 结构化的 TestConnectionResult 连接检测

安装

pip install qdata-adapter-base

开发模式:

pip install qdata-adapter-base[dev]

源码安装:

git clone https://github.com/qeasy/qdata-adapter-base.git
cd qdata-adapter-base
pip install -e ".[dev]"

快速开始

创建适配器

from qdata_adapter import BaseAppAdapter, ConnectorContext, TestConnectionResult
from typing import AsyncIterator, Any
import time


class MyERPAdapter(BaseAppAdapter):
    app_code = "my_erp"
    adapter_version = "1.0.0"

    async def authenticate(self) -> dict[str, Any]:
        response = await self.http_client.post(
            "/api/auth/login",
            json={
                "username": self.context.auth_config["username"],
                "password": self.context.auth_config["password"],
            },
        )
        return {"access_token": response["token"], "expires_in": 3600}

    async def refresh_token(self) -> dict[str, Any]:
        return await self.authenticate()

    async def list_objects(
        self, object_type: str, filters: dict | None = None, page_size: int = 100
    ) -> AsyncIterator[dict[str, Any]]:
        await self.ensure_authenticated()
        page = 1
        while True:
            response = await self.http_client.get(
                f"/api/{object_type}",
                params={"page": page, "page_size": page_size, **(filters or {})},
            )
            for item in response.get("data", []):
                yield item
            if len(response.get("data", [])) < page_size:
                break
            page += 1

    async def get_object(self, object_type: str, object_id: str) -> dict[str, Any]:
        await self.ensure_authenticated()
        return await self.http_client.get(f"/api/{object_type}/{object_id}")

    async def test_connection(self) -> TestConnectionResult:
        start = time.time()
        try:
            await self.authenticate()
            return TestConnectionResult.connected(
                duration_ms=int((time.time() - start) * 1000),
            )
        except Exception as e:
            return TestConnectionResult.network_error(str(e))

使用适配器

import asyncio
from qdata_adapter import ConnectorContext, AdapterRegistry

# 注册适配器
AdapterRegistry.register(MyERPAdapter)

async def main():
    context = ConnectorContext(
        connector_id="my-connector-001",
        app_software_code="my_erp",
        base_url="https://api.my-erp.com",
        auth_config={"username": "admin", "password": "secret"},
    )

    adapter = AdapterRegistry.create_adapter("my_erp", context)

    # 测试连接
    result = await adapter.test_connection()
    print(f"连接状态: {result.status}, 耗时: {result.duration_ms}ms")

    # 查询数据
    async for order in adapter.list_objects("orders", {"status": "pending"}):
        print(f"Order: {order['id']}")

asyncio.run(main())

核心组件

组件 说明
ConnectorContext 连接器上下文 — Pydantic 数据验证,敏感信息脱敏
BaseAppAdapter 适配器抽象基类 — 定义标准接口,管理认证生命周期
AdapterRegistry 适配器注册中心 — 手动注册 + Entry Points 自动发现
HttpClient 异步 HTTP 客户端 — 重试、超时、错误转换
TestConnectionResult 连接测试结果 — 结构化的健康检查响应
TokenCacheProtocol Token 缓存协议 — 可插拔(内存 / Redis / 自定义)

异常体系

AdapterError
├── AuthenticationError
│   └── TokenExpiredError
├── AdapterConnectionError
│   └── AdapterTimeoutError
├── ResponseError
│   ├── RateLimitError
│   └── NotFoundError
├── ValidationError
├── ConfigurationError
└── NodeError
    ├── NodeConfigError
    ├── NodeExecutionError
    ├── NodeInputError / NodeOutputError
    └── ConnectorNotFoundError / ConnectorInactiveError

组合器模式

当一个平台有多套接口体系时(如吉客云的标准接口 + 奇门接口),使用组合器模式:

JkyAdapter(唯一外部入口)
├── settings["interface"] = "standard" → JkyStandardInterface
└── settings["interface"] = "qimen"   → JkyQimenInterface

适配器包通过 Entry Points 注册,即装即用:

# pyproject.toml
[project.entry-points."qdata.adapters"]
jky = "qdata_adapter_jky:JkyAdapter"
kingdee = "qdata_adapter_kingdee:KingdeeAdapter"

开发

git clone https://github.com/qeasy/qdata-adapter-base.git
cd qdata-adapter-base

python -m venv .venv && source .venv/bin/activate
pip install -e ".[dev]"

make test          # 运行测试
make lint          # 代码检查
make format        # 代码格式化
make check         # lint + 类型检查
make test-cov      # 测试 + 覆盖率
make build         # 构建 wheel

许可证

本项目采用 MIT License — 详见 LICENSE


关于我们

广东轻亿云软件科技有限公司 专注数据集成与处理,提供企业级 ETL/ELT 解决方案

官网 https://www.qeasy.cloud
开源项目 opensource@qeasy.cloud
商业咨询 vincent@qeasy.cloud

Powered by 广东轻亿云软件科技有限公司

About

QData Adapter Base is an async-first Python framework for building enterprise application adapters. It standardizes integrations with ERP/CRM/WMS systems via OAuth2/HMAC/API Key auth, auto-retry mechanisms, and plugin-based discovery. Built on httpx + tenacity + Pydantic for type-safe, production-ready connectors.

Resources

License

Contributing

Stars

Watchers

Forks

Packages

 
 
 

Contributors