diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..a4ca38c --- /dev/null +++ b/.flake8 @@ -0,0 +1,14 @@ +[flake8] +max-line-length = 79 +extend-ignore = E501, W503, E203, E402, E712 +exclude = + .git, + backend/alembic/versions/*, + backend/db/base.py, + __pycache__, + .tox, + .eggs, + *.egg, + .venv, + venv, + alembic/ \ No newline at end of file diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml new file mode 100644 index 0000000..4b42419 --- /dev/null +++ b/.github/workflows/main.yml @@ -0,0 +1,93 @@ +name: Derbit Client workflow + +on: + push: + branches: + - "**" + pull_request: + +jobs: + lint: + name: Lint & Tests + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: "3.11" + + - name: Upgrade pip + run: python -m pip install --upgrade pip + + - name: Install dependencies + run: pip install -r requirements.txt + + - name: Black check + run: black --check . + + - name: Isort check + run: isort --check-only . + + - name: Flake8 check + run: flake8 . 
+ + - name: Run pytest + run: pytest -v + + push_branch_dev_to_docker_hub: + name: Build and Push Docker(dev) + runs-on: ubuntu-latest + needs: lint + + if: github.ref == 'refs/heads/dev' + + steps: + - name: Check out the repo + uses: actions/checkout@v4 + + - name: Setup Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Login to Docker + uses: docker/login-action@v3 + with: + username: ${{ secrets.DOCKER_USERNAME }} + password: ${{ secrets.DOCKER_PASSWORD }} + + - name: Push to Docker Hub + uses: docker/build-push-action@v5 + with: + push: true + tags: | + dmsn/derbit_client:dev + + push_branch_main_to_docker_hub: + name: Build and Push Docker(prod) + runs-on: ubuntu-latest + needs: lint + + if: github.ref == 'refs/heads/main' + + steps: + - name: Check out the repo + uses: actions/checkout@v4 + + - name: Setup Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Login to Docker + uses: docker/login-action@v3 + with: + username: ${{ secrets.DOCKER_USERNAME }} + password: ${{ secrets.DOCKER_PASSWORD }} + + - name: Push to Docker Hub + uses: docker/build-push-action@v5 + with: + push: true + tags: | + dmsn/derbit_client:prod \ No newline at end of file diff --git a/.gitignore b/.gitignore index b7faf40..42eec05 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ # Byte-compiled / optimized / DLL files +__pycache__ __pycache__/ *.py[codz] *$py.class @@ -205,3 +206,24 @@ cython_debug/ marimo/_static/ marimo/_lsp/ __marimo__/ + + +logs/ +*.log + + +*.sqlite +*.db +*.sqlite3 + +*.sql +*.dump +pgdata/ +/postgres-data/ + + +# IDE +.vscode/ +.idea/ + +.pytest_cache/ \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..de5c2d0 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,9 @@ +FROM python:3.11-slim +LABEL maintainer="Dmitry Titenkov " +LABEL version="1.0" +RUN mkdir /app +COPY requirements.txt /app +RUN pip3 install -r /app/requirements.txt --no-cache-dir -vvv +COPY . 
/app +WORKDIR /app +CMD [ "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--reload" ] \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..65eb0da --- /dev/null +++ b/Makefile @@ -0,0 +1,17 @@ +all: build run-services migrate up + +build: + @echo "Сборка образов..." + docker-compose build + +run-services: + @echo "Запуск сервисов..." + docker-compose up -d db redis backend + +migrate: + @echo "Применение миграций..." + docker-compose exec backend alembic upgrade head + +up: + @echo "Запуск всех сервисов..." + docker-compose up -d \ No newline at end of file diff --git a/README.md b/README.md index 09dfbf1..4f68912 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,138 @@ -# Derbit Client + -Клиент каждую минуту забирает с биржи текущую цену btc_usd и eth_usd (index price валюты) после чего сохраняет в базу данных тикер валюты, текущую цену и время в UNIX timestamp +## Derbit Client + +Клиент для криптобиржи Deribit + +[![Derbit Client Lint annd Tests](https://img.shields.io/github/actions/workflow/status/dmsnback/derbit_client/main.yml?branch=main&style=flat-square&label=Derbit_Client%20Lint)](https://github.com/dmsnback/derbit_client/actions/workflows/main.yml) +[![Derbit Client Docker Dev](https://img.shields.io/github/actions/workflow/status/dmsnback/derbit_client/main.yml?branch=dev&style=flat-square&label=Derbit_Client%20Docker%20Dev)](https://github.com/dmsnback/derbit_client/actions/workflows/main.yml) +[![Derbit Client Docker Prod](https://img.shields.io/github/actions/workflow/status/dmsnback/derbit_client/main.yml?branch=main&style=flat-square&label=Derbit_Client%20Docker%20Prod)](https://github.com/dmsnback/derbit_client/actions/workflows/main.yml) + + +- [Описание](#Описание) +- [Технологии](#Технологии) +- [Тестирование](#Тестирование) +- [Таблица эндпоинтов](#Таблица) +- [Шаблон заполнения .env-файла](#Шаблон) +- [Запуск проекта на локальной машине](#Запуск) +- [Автор](#Автор) + + + +### 
Описание + +Асинхронный клиент для криптобиржи Deribit. +Сервис периодически получает ```index price``` BTC_USD и ETH_USD, +сохраняет данные в базу тикер валюты, текущую цену и время в ```UNIX timestamp```. + +Приложение написано с использованием **асинхронного FastAPI**, **SQLAlchemy**, **PostgreSQL**, **Celery** и **Redis**. + +В проекте настроен **CI/CD pipeline** с использованием **GitHub Actions**: + +```md +- Автоматическая проверка кода (black, isort, flake8) +- Запуск unit-тестов (`pytest`) +- Сборка Docker-образа +- Публикация образа в **Docker Hub** при пуше в соответствующие ветки +``` + +```md +Проект адаптирован для использования **PostgreSQL** и развёртывания в контейнерах **Docker**. +``` + +> [Вернуться в начало](#Начало) + + + +### Технологии + +[![Python](https://img.shields.io/badge/Python-1000?style=for-the-badge&logo=python&logoColor=ffffff&labelColor=000000&color=000000)](https://www.python.org) +[![FastAPI](https://img.shields.io/badge/FastAPI-1000?style=for-the-badge&logo=fastapi&logoColor=ffffff&labelColor=000000&color=000000)](https://fastapi.tiangolo.com) +[![Celery](https://img.shields.io/badge/Celery-1000?style=for-the-badge&logo=celery&logoColor=ffffff&labelColor=000000&color=000000)](https://docs.celeryq.dev/en/stable/index.html) +[![Redis](https://img.shields.io/badge/Redis-1000?style=for-the-badge&logo=redis&logoColor=ffffff&labelColor=000000&color=000000)](https://redis-docs.ru) +[![aiohttp](https://img.shields.io/badge/aiohttp-1000?style=for-the-badge&logo=aiohttp&logoColor=ffffff&labelColor=000000&color=000000)](https://github.com/aio-libs/aiohttp?ysclid=mkqid6e88x702921033) +[![SQLAlchemy](https://img.shields.io/badge/SQLAlchemy-1000?style=for-the-badge&logo=sqlalchemy&logoColor=ffffff&labelColor=000000&color=000000)](https://www.sqlalchemy.org) +[![Pydantic](https://img.shields.io/badge/Pydantic_V2-1000?style=for-the-badge&logo=Pydantic&logoColor=ffffff&labelColor=000000&color=000000)](https://docs.pydantic.dev/latest/) 
+[![Docker](https://img.shields.io/badge/Docker-1000?style=for-the-badge&logo=docker&logoColor=ffffff&labelColor=000000&color=000000)](https://www.docker.com) +[![Postgres](https://img.shields.io/badge/Postgres-1000?style=for-the-badge&logo=postgresql&logoColor=ffffff&labelColor=000000&color=000000)](https://www.postgresql.org) +[![Pytest](https://img.shields.io/badge/Pytest-1000?style=for-the-badge&logo=pytest&logoColor=ffffff&labelColor=000000&color=000000)](https://docs.pytest.org/en/stable/index.htmlc) +[![GitHub Actions](https://img.shields.io/badge/github%20actions-%232671E5.svg?style=for-the-badge&logo=githubactions&logoColor=ffffff&labelColor=000000&color=000000)](https://github.com/features/actions) + +> [Вернуться в начало](#Начало) + + + +### Тестирование + +В проекте реализованы **unit-тесты** с использованием `pytest` и `pytest-asyncio`. + +- Тестируется CRUD-логика работы с ценами +- Асинхронные операции с базой данных +- Для тестов используется изолированная база данных (SQLite) + +Запуск тестов локально: + +```python +pytest -v +``` + +> [Вернуться в начало](#Начало) + + + +### Таблица эндпоинтов + +**Prices** + +|Метод|URL|Описание| +|:-:|:-:|:-:| +|GET|/all/{ticker}|Получение всех сохраненных данных по указанной валюте| +|GET|/latest/{ticker}|Получение последней цены валюты| +|GET|/filter_by_date/{ticker}|Получение цены валюты с фильтром по дате| + +> [Вернуться в начало](#Начало) + + + +### Шаблон заполнения .env-файла + +> `env.example` с дефолтнными значениями расположен в корневой папке + +```python +POSTGRES_DB = derbit_db # Имя базы дданнных +POSTGRES_USER = postgres # Имя юзера PostgreSQL +POSTGRES_PASSWORD = yourpassword # Пароль юзера PostgreSQL +DATABASE_URL = postgresql+asyncpg://postgres:yourpassword@db:5432/derbit_db # Указываем адрес БД +``` + +> [Вернуться в начало](#Начало) + + + +### Запуск проекта на локальной машине + +- Склонируйте репозиторий + +```python +git clone git@github.com:dmsnback/derbit_client.git +``` + +- Запускаем 
проект в **Docker** + +```python +make all +``` + +- Документация к API станет доступна по адресу: + +[http://localhost:8000/docs/](http://localhost:8000/docs/) + +> [Вернуться в начало](#Начало) + + + +### Автор + +- [Титенков Дмитрий](https://github.com/dmsnback) + +> [Вернуться в начало](#Начало) diff --git a/alembic.ini b/alembic.ini new file mode 100644 index 0000000..df80d65 --- /dev/null +++ b/alembic.ini @@ -0,0 +1,149 @@ +# A generic, single database configuration. + +[alembic] +# path to migration scripts. +# this is typically a path given in POSIX (e.g. forward slashes) +# format, relative to the token %(here)s which refers to the location of this +# ini file +script_location = %(here)s/alembic + +# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s +# Uncomment the line below if you want the files to be prepended with date and time +# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file +# for all available tokens +# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s +# Or organize into date-based subdirectories (requires recursive_version_locations = true) +# file_template = %%(year)d/%%(month).2d/%%(day).2d_%%(hour).2d%%(minute).2d_%%(second).2d_%%(rev)s_%%(slug)s + +# sys.path path, will be prepended to sys.path if present. +# defaults to the current working directory. for multiple paths, the path separator +# is defined by "path_separator" below. +prepend_sys_path = . + +# timezone to use when rendering the date within the migration file +# as well as the filename. +# If specified, requires the tzdata library which can be installed by adding +# `alembic[tz]` to the pip requirements. 
+# string value is passed to ZoneInfo() +# leave blank for localtime +# timezone = + +# max length of characters to apply to the "slug" field +# truncate_slug_length = 40 + +# set to 'true' to run the environment during +# the 'revision' command, regardless of autogenerate +# revision_environment = false + +# set to 'true' to allow .pyc and .pyo files without +# a source .py file to be detected as revisions in the +# versions/ directory +# sourceless = false + +# version location specification; This defaults +# to /versions. When using multiple version +# directories, initial revisions must be specified with --version-path. +# The path separator used here should be the separator specified by "path_separator" +# below. +# version_locations = %(here)s/bar:%(here)s/bat:%(here)s/alembic/versions + +# path_separator; This indicates what character is used to split lists of file +# paths, including version_locations and prepend_sys_path within configparser +# files such as alembic.ini. +# The default rendered in new alembic.ini files is "os", which uses os.pathsep +# to provide os-dependent path splitting. +# +# Note that in order to support legacy alembic.ini files, this default does NOT +# take place if path_separator is not present in alembic.ini. If this +# option is omitted entirely, fallback logic is as follows: +# +# 1. Parsing of the version_locations option falls back to using the legacy +# "version_path_separator" key, which if absent then falls back to the legacy +# behavior of splitting on spaces and/or commas. +# 2. Parsing of the prepend_sys_path option falls back to the legacy +# behavior of splitting on spaces, commas, or colons. +# +# Valid values for path_separator are: +# +# path_separator = : +# path_separator = ; +# path_separator = space +# path_separator = newline +# +# Use os.pathsep. Default configuration used for new projects. 
+path_separator = os + + +# set to 'true' to search source files recursively +# in each "version_locations" directory +# new in Alembic version 1.10 +# recursive_version_locations = false + +# the output encoding used when revision files +# are written from script.py.mako +# output_encoding = utf-8 + +# database URL. This is consumed by the user-maintained env.py script only. +# other means of configuring database URLs may be customized within the env.py +# file. +sqlalchemy.url = driver://user:pass@localhost/dbname + + +[post_write_hooks] +# post_write_hooks defines scripts or Python functions that are run +# on newly generated revision scripts. See the documentation for further +# detail and examples + +# format using "black" - use the console_scripts runner, against the "black" entrypoint +# hooks = black +# black.type = console_scripts +# black.entrypoint = black +# black.options = -l 79 REVISION_SCRIPT_FILENAME + +# lint with attempts to fix using "ruff" - use the module runner, against the "ruff" module +# hooks = ruff +# ruff.type = module +# ruff.module = ruff +# ruff.options = check --fix REVISION_SCRIPT_FILENAME + +# Alternatively, use the exec runner to execute a binary found on your PATH +# hooks = ruff +# ruff.type = exec +# ruff.executable = ruff +# ruff.options = check --fix REVISION_SCRIPT_FILENAME + +# Logging configuration. This is also consumed by the user-maintained +# env.py script only. 
+[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARNING +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARNING +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S diff --git a/alembic/README b/alembic/README new file mode 100644 index 0000000..e0d0858 --- /dev/null +++ b/alembic/README @@ -0,0 +1 @@ +Generic single-database configuration with an async dbapi. \ No newline at end of file diff --git a/alembic/env.py b/alembic/env.py new file mode 100644 index 0000000..6c25da4 --- /dev/null +++ b/alembic/env.py @@ -0,0 +1,95 @@ +import asyncio +import os +from logging.config import fileConfig + +from dotenv import load_dotenv +from sqlalchemy import pool +from sqlalchemy.engine import Connection +from sqlalchemy.ext.asyncio import async_engine_from_config + +from alembic import context +from app.core.base import Base + +load_dotenv('.env') + +# this is the Alembic Config object, which provides +# access to the values within the .ini file in use. +config = context.config +config.set_main_option("sqlalchemy.url", os.environ["DATABASE_URL"]) + +# Interpret the config file for Python logging. +# This line sets up loggers basically. +if config.config_file_name is not None: + fileConfig(config.config_file_name) + +# add your model's MetaData object here +# for 'autogenerate' support +# from myapp import mymodel +# target_metadata = mymodel.Base.metadata +target_metadata = Base.metadata + +# other values from the config, defined by the needs of env.py, +# can be acquired: +# my_important_option = config.get_main_option("my_important_option") +# ... etc. 
+ + +def run_migrations_offline() -> None: + """Run migrations in 'offline' mode. + + This configures the context with just a URL + and not an Engine, though an Engine is acceptable + here as well. By skipping the Engine creation + we don't even need a DBAPI to be available. + + Calls to context.execute() here emit the given string to the + script output. + + """ + url = config.get_main_option("sqlalchemy.url") + context.configure( + url=url, + target_metadata=target_metadata, + literal_binds=True, + dialect_opts={"paramstyle": "named"}, + ) + + with context.begin_transaction(): + context.run_migrations() + + +def do_run_migrations(connection: Connection) -> None: + context.configure(connection=connection, target_metadata=target_metadata) + + with context.begin_transaction(): + context.run_migrations() + + +async def run_async_migrations() -> None: + """In this scenario we need to create an Engine + and associate a connection with the context. + + """ + + connectable = async_engine_from_config( + config.get_section(config.config_ini_section, {}), + prefix="sqlalchemy.", + poolclass=pool.NullPool, + ) + + async with connectable.connect() as connection: + await connection.run_sync(do_run_migrations) + + await connectable.dispose() + + +def run_migrations_online() -> None: + """Run migrations in 'online' mode.""" + + asyncio.run(run_async_migrations()) + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() diff --git a/alembic/script.py.mako b/alembic/script.py.mako new file mode 100644 index 0000000..1101630 --- /dev/null +++ b/alembic/script.py.mako @@ -0,0 +1,28 @@ +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision | comma,n} +Create Date: ${create_date} + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + +# revision identifiers, used by Alembic. 
+revision: str = ${repr(up_revision)} +down_revision: Union[str, Sequence[str], None] = ${repr(down_revision)} +branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)} +depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)} + + +def upgrade() -> None: + """Upgrade schema.""" + ${upgrades if upgrades else "pass"} + + +def downgrade() -> None: + """Downgrade schema.""" + ${downgrades if downgrades else "pass"} diff --git a/alembic/versions/d32199ed3463_price_migration.py b/alembic/versions/d32199ed3463_price_migration.py new file mode 100644 index 0000000..0af6e75 --- /dev/null +++ b/alembic/versions/d32199ed3463_price_migration.py @@ -0,0 +1,42 @@ +"""Price migration + +Revision ID: d32199ed3463 +Revises: +Create Date: 2026-01-19 11:06:51.963663 + +""" +from typing import Sequence, Union + +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = 'd32199ed3463' +down_revision: Union[str, Sequence[str], None] = None +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('prices', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('ticker', sa.String(), nullable=False), + sa.Column('price', sa.Float(), nullable=False), + sa.Column('timestamp', sa.BigInteger(), nullable=False), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_prices_ticker'), 'prices', ['ticker'], unique=False) + op.create_index(op.f('ix_prices_timestamp'), 'prices', ['timestamp'], unique=False) + # ### end Alembic commands ### + + +def downgrade() -> None: + """Downgrade schema.""" + # ### commands auto generated by Alembic - please adjust! 
### + op.drop_index(op.f('ix_prices_timestamp'), table_name='prices') + op.drop_index(op.f('ix_prices_ticker'), table_name='prices') + op.drop_table('prices') + # ### end Alembic commands ### diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/celery_app.py b/app/celery_app.py new file mode 100644 index 0000000..239aa9c --- /dev/null +++ b/app/celery_app.py @@ -0,0 +1,22 @@ +from celery import Celery + +app = Celery( + "derbit_worker", + broker="redis://redis:6379/0", + backend="redis://redis:6379/0", +) + +app.conf.worker_pool = "solo" +app.conf.worker_concurrency = 1 +app.conf.task_acks_late = False + +app.autodiscover_tasks(["app.worker"]) + +app.conf.beat_schedule = { + "fetch_prices_every_minute": { + "task": "app.worker.tasks.fetch_prices", + "schedule": 60.0, + } +} + +app.conf.timezone = "UTC" diff --git a/app/core/__init__.py b/app/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/core/base.py b/app/core/base.py new file mode 100644 index 0000000..5a3eb96 --- /dev/null +++ b/app/core/base.py @@ -0,0 +1,2 @@ +from app.core.database import Base # noqa +from app.models.prices import Price # noqa diff --git a/app/core/config.py b/app/core/config.py new file mode 100644 index 0000000..3e09595 --- /dev/null +++ b/app/core/config.py @@ -0,0 +1,14 @@ +from pydantic_settings import BaseSettings + + +class Settings(BaseSettings): + app_title: str = "Клиент для криптобиржи Deribit" + description: str = ( + "Клиент каждую минуту забирает с биржи текущую цену btc_usd и eth_usd (index price валюты) после чего сохраняет в базу данных тикер валюты, текущую цену и время в UNIX timestamp" + ) + database_url: str + + model_config = {"env_file": ".env", "extra": "ignore"} + + +settings = Settings() diff --git a/app/core/database.py b/app/core/database.py new file mode 100644 index 0000000..fa2b68a --- /dev/null +++ b/app/core/database.py @@ -0,0 +1,5 @@ +from sqlalchemy.orm import 
DeclarativeBase + + +class Base(DeclarativeBase): + pass diff --git a/app/core/logging.py b/app/core/logging.py new file mode 100644 index 0000000..78004f0 --- /dev/null +++ b/app/core/logging.py @@ -0,0 +1,28 @@ +import logging +from logging.handlers import RotatingFileHandler +from pathlib import Path + +LOG_DIR = Path("logs") +LOG_DIR.mkdir(exist_ok=True) + +LOG_FILE = LOG_DIR / "app.log" + + +def setup_logging() -> None: + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s: - |%(levelname)s| %(name)s | %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + handlers=[ + logging.StreamHandler(), + RotatingFileHandler( + filename=LOG_FILE, + maxBytes=1024 * 1024 * 5, + backupCount=5, + encoding="utf-8", + ), + ], + ) + + +logging.getLogger("uvicorn.access").setLevel(logging.WARNING) diff --git a/app/core/session.py b/app/core/session.py new file mode 100644 index 0000000..cb7a8ee --- /dev/null +++ b/app/core/session.py @@ -0,0 +1,12 @@ +from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine + +from app.core.config import settings + +engine = create_async_engine(settings.database_url, future=True) + +async_session = async_sessionmaker(engine, expire_on_commit=False) + + +async def get_session(): + async with async_session() as session: + yield session diff --git a/app/crud/__init__.py b/app/crud/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/crud/price_crud.py b/app/crud/price_crud.py new file mode 100644 index 0000000..ad26da7 --- /dev/null +++ b/app/crud/price_crud.py @@ -0,0 +1,100 @@ +import logging + +from sqlalchemy import select +from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.prices import Price + +logger = logging.getLogger(__name__) + + +class CRUDPrice: + async def save_price( + self, session: AsyncSession, ticker: str, price: float, timestamp: int + ): + try: + new_price = Price(ticker=ticker, price=price, timestamp=timestamp) + 
session.add(new_price) + await session.flush() + await session.commit() + logger.info(f"Получение данных по валюте {ticker}") + return new_price + except SQLAlchemyError as e: + logger.error( + f"Ошибка при получении данных по валюте {ticker}: {e}" + ) + raise + + async def get_all( + self, ticker: str, size: int, offset: int, session: AsyncSession + ) -> list[Price]: + try: + query = ( + select(Price) + .where(Price.ticker == ticker) + .order_by(Price.timestamp.desc()) + .limit(size) + .offset(offset) + ) + result = await session.execute(query) + prices = result.scalars().all() + logger.info( + f"Получение всех сохраненных данных по валюте: {ticker}" + ) + return prices + except SQLAlchemyError as e: + logger.error( + f"Ошибка при получении всех сохраненных данных по валюте {ticker}: {e}" + ) + raise + + async def get_latest(sellf, ticker: str, session: AsyncSession): + try: + query = ( + select(Price) + .where(Price.ticker == ticker) + .order_by(Price.timestamp.desc()) + .limit(1) + ) + result = await session.execute(query) + price = result.scalar_one_or_none() + logger.info(f"Получение последней цены валюты: {ticker}") + return price + except SQLAlchemyError as e: + logger.error( + f"Ошибка получении последней цены валюты {ticker}: {e}" + ) + raise + + async def get_by_date( + self, + ticker: str, + size: int, + offset: int, + session: AsyncSession, + start: int | None = None, + end: int | None = None, + ) -> list[Price]: + try: + query = select(Price).where(Price.ticker == ticker) + if start is not None and end is not None and start > end: + raise ValueError("Start не может быть больше end") + if start is not None: + query = query.where(Price.timestamp >= start) + if end is not None: + query = query.where(Price.timestamp <= end) + query = ( + query.order_by(Price.timestamp.desc()) + .limit(size) + .offset(offset) + ) + result = await session.execute(query) + prices = result.scalars().all() + logger.info(f"Получение цены валюты {ticker} с фильтром по дате") + 
return prices + except SQLAlchemyError as e: + logger.error( + f"Ошибка при получении цены валюты {ticker} с фильтром по дате: {e}" + ) + raise diff --git a/app/main.py b/app/main.py new file mode 100644 index 0000000..e0c9f85 --- /dev/null +++ b/app/main.py @@ -0,0 +1,28 @@ +import logging +from contextlib import asynccontextmanager + +from fastapi import FastAPI + +from app.core.config import settings +from app.core.logging import setup_logging +from app.routers.prices import price_router + +setup_logging() + +logger = logging.getLogger(__name__) + + +@asynccontextmanager +async def lifespan(app: FastAPI): + logger.info("Приложение derbitclient запущено") + yield + logger.info("Приложение derbitclient остановлено") + + +app = FastAPI( + title=settings.app_title, + description=settings.description, + lifespan=lifespan, +) + +app.include_router(price_router) diff --git a/app/models/__init__.py b/app/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/models/prices.py b/app/models/prices.py new file mode 100644 index 0000000..13767cd --- /dev/null +++ b/app/models/prices.py @@ -0,0 +1,15 @@ +from sqlalchemy import BigInteger, Float, Integer, String +from sqlalchemy.orm import Mapped, mapped_column + +from app.core.database import Base + + +class Price(Base): + __tablename__ = "prices" + + id: Mapped[int] = mapped_column(Integer, primary_key=True) + ticker: Mapped[str] = mapped_column(String, index=True, nullable=False) + price: Mapped[float] = mapped_column(Float, nullable=False) + timestamp: Mapped[int] = mapped_column( + BigInteger, index=True, nullable=False + ) diff --git a/app/routers/__init__.py b/app/routers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/routers/prices.py b/app/routers/prices.py new file mode 100644 index 0000000..9280fcc --- /dev/null +++ b/app/routers/prices.py @@ -0,0 +1,111 @@ +from fastapi import APIRouter, Depends, HTTPException, Query +from sqlalchemy.ext.asyncio import AsyncSession 
+ +from app.core.session import get_session +from app.crud.price_crud import CRUDPrice +from app.schemas.prices import PriceReadSchema + +crud_price = CRUDPrice() + +price_router = APIRouter( + prefix="/prices", + tags=["Prices"], +) + + +@price_router.get( + "/all/{ticker}", + response_model=list[PriceReadSchema], + summary="Получение всех сохраненных данных по указанной валюте", + description=""" + Возвращает все записи цен для указанного тикера. + + Параметры: + - ticker: Название тикера (BTC_USD, ETH_USD) + """, +) +async def get_all( + ticker: str, + page: int = Query(1, ge=1), + size: int = Query(10, ge=1, le=100), + session: AsyncSession = Depends(get_session), +) -> list[PriceReadSchema]: + ticker = ticker.upper() + offset = (page - 1) * size + prices = await crud_price.get_all(ticker, size, offset, session) + try: + if not prices: + raise HTTPException(status_code=404, detail="Данные не найдены") + return prices + except Exception as e: + raise HTTPException( + status_code=500, detail=f"Внутренняя ошибка сервера: {e}" + ) + + +@price_router.get( + "/latest/{ticker}", + response_model=PriceReadSchema, + summary="Получение последней цены валюты", + description=""" + Возвращает последнюю запись цены для указанного тикера. + + Параметры: + - ticker: Название тикера (BTC_USD, ETH_USD) + """, +) +async def get_latest( + ticker: str, session: AsyncSession = Depends(get_session) +) -> PriceReadSchema: + ticker = ticker.upper() + prices = await crud_price.get_latest(ticker, session) + try: + if not prices: + raise HTTPException(status_code=404, detail="Данные не найдены") + return prices + except Exception as e: + raise HTTPException( + status_code=500, detail=f"Внутренняя ошибка сервера: {e}" + ) + + +@price_router.get( + "/filter_by_date/{ticker}", + response_model=list[PriceReadSchema], + summary="Получение цены валюты с фильтром по дате", + description=""" + Возвращает цены для указанного тикера с фильтрацией по дате. 
+ + Параметры: + - ticker: Название тикера (BTC_USD, ETH_USD) + - start: Начальная дата в формате timestamp + - end: Конечная дата в формате timestamp + """, +) +async def get_by_date( + ticker: str, + page: int = Query(1, ge=1), + size: int = Query(10, ge=1, le=100), + start: int | None = Query( + default=None, description="Начальная дата в формате timestamp" + ), + end: int | None = Query( + default=None, description="Конечная дата в формате timestamp" + ), + session: AsyncSession = Depends(get_session), +) -> list[PriceReadSchema]: + try: + ticker = ticker.upper() + offset = (page - 1) * size + prices = await crud_price.get_by_date( + ticker, size, offset, session, start, end + ) + if not prices: + raise HTTPException(status_code=404, detail="Данные не найдены") + return prices + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + raise HTTPException( + status_code=500, detail=f"Внутренняя ошибка сервера: {e}" + ) diff --git a/app/schemas/__init__.py b/app/schemas/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/schemas/prices.py b/app/schemas/prices.py new file mode 100644 index 0000000..38cf802 --- /dev/null +++ b/app/schemas/prices.py @@ -0,0 +1,12 @@ +from pydantic import BaseModel + + +class PriceBaseSchema(BaseModel): + id: int + ticker: str + price: float + timestamp: int + + +class PriceReadSchema(PriceBaseSchema): + pass diff --git a/app/services/__init__.py b/app/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/services/derbit_client.py b/app/services/derbit_client.py new file mode 100644 index 0000000..61774f2 --- /dev/null +++ b/app/services/derbit_client.py @@ -0,0 +1,19 @@ +import aiohttp + + +class DerbitClient: + BASE_URL = "https://test.deribit.com/api/v2/public" + + def __init__(self, session: aiohttp.ClientSession): + self.session = session + + async def get_index_price(self, ticker: str) -> float: + """Получает index price для 
указаного тикера (btc_usd или eth_usd)""" + url = f"{self.BASE_URL}/get_index_price" + params = {"index_name": ticker.lower()} + async with self.session.get(url=url, params=params) as response: + data = await response.json() + return data["result"]["index_price"] + + async def close(self): + await self.session.close() diff --git a/app/worker/__init__.py b/app/worker/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/worker/tasks.py b/app/worker/tasks.py new file mode 100644 index 0000000..a6994f2 --- /dev/null +++ b/app/worker/tasks.py @@ -0,0 +1,42 @@ +import asyncio +import logging +import time + +import aiohttp + +from app.celery_app import app +from app.core.session import get_session +from app.crud.price_crud import CRUDPrice +from app.services.derbit_client import DerbitClient + +logger = logging.getLogger(__name__) + + +@app.task(ignore_result=True) +def fetch_prices(): + asyncio.get_event_loop().run_until_complete(run()) + + +async def run(): + async with aiohttp.ClientSession() as http_session: + crud_price = CRUDPrice() + client = DerbitClient(http_session) + + async for db_session in get_session(): + for ticker in ["btc_usd", "eth_usd"]: + try: + price = await client.get_index_price(ticker) + + await crud_price.save_price( + ticker=ticker.upper(), + price=price, + timestamp=int(time.time()), + session=db_session, + ) + logger.info( + f"Получение данных по валюте: {ticker.upper()}" + ) + except Exception as e: + logger.error( + f"Ошибка получения данных по валюте {ticker.upper()}: {e}" + ) diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..1b8f14b --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,51 @@ +version: '3.8' + +services: + db: + image: postgres:15.0-alpine + volumes: + - postgres_data:/var/lib/postgresql/data/ + ports: + - "5432:5432" + env_file: + - ./.env + redis: + image: redis:7 + ports: + - "6379:6379" + + backend: + image: dmsn/derbit_client:prod + restart: always + depends_on: 
+      - db
+      - redis
+    env_file:
+      - ./.env
+    ports:
+      - "8000:8000"
+    volumes:
+      - .:/app
+    command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
+
+  worker:
+    build: .
+    user: "1001:1001"
+    command: celery -A app.celery_app.app worker --loglevel=info
+    volumes:
+      - .:/app
+    depends_on:
+      - redis
+      - db
+
+  beat:
+    build: .
+    command: celery -A app.celery_app.app beat --loglevel=info
+    volumes:
+      - .:/app
+    depends_on:
+      - redis
+      - db
+
+volumes:
+  postgres_data:
\ No newline at end of file
diff --git a/env.example b/env.example
new file mode 100644
index 0000000..370301b
--- /dev/null
+++ b/env.example
@@ -0,0 +1,7 @@
+DATABASE_URL=postgresql+asyncpg://postgres:postgres@db:5432/derbit_db
+
+POSTGRES_DB=derbit_db
+POSTGRES_USER=postgres
+POSTGRES_PASSWORD=postgres
+POSTGRES_HOST=db
+POSTGRES_PORT=5432
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..b6fed53
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,20 @@
+[tool.black]
+line-length = 79
+target-version = ['py38', 'py39', 'py310','py311']
+extend-exclude = '''
+(
+    \.venv
+  | venv
+  | build
+  | dist
+  | alembic
+)
+'''
+[tool.isort]
+profile = "black"
+line_length = 79
+multi_line_output = 3
+force_grid_wrap = 0
+skip_gitignore = true
+skip_glob = ["**/alembic/*"]
+src_paths = ["app"]
\ No newline at end of file
diff --git a/pytest.ini b/pytest.ini
new file mode 100644
index 0000000..d280de0
--- /dev/null
+++ b/pytest.ini
@@ -0,0 +1,2 @@
+[pytest]
+asyncio_mode = auto
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..93c1066
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,67 @@
+aiohappyeyeballs==2.6.1
+aiohttp==3.13.3
+aiosignal==1.4.0
+aiosqlite==0.22.1
+alembic==1.18.1
+amqp==5.3.1
+annotated-doc==0.0.4
+annotated-types==0.7.0
+anyio==4.12.1
+asyncpg==0.31.0
+attrs==25.4.0
+billiard==4.2.4
+black==25.12.0
+celery==5.6.2
+click==8.3.1
+click-didyoumean==0.3.1
+click-plugins==1.1.1.2 +click-repl==0.3.0 +coverage==7.13.1 +fastapi==0.128.0 +flake8==7.3.0 +frozenlist==1.8.0 +greenlet==3.3.0 +h11==0.16.0 +httptools==0.7.1 +idna==3.11 +iniconfig==2.3.0 +isort==7.0.0 +kombu==5.6.2 +Mako==1.3.10 +MarkupSafe==3.0.3 +mccabe==0.7.0 +multidict==6.7.0 +mypy_extensions==1.1.0 +packaging==25.0 +pathspec==1.0.3 +platformdirs==4.5.1 +pluggy==1.6.0 +prompt_toolkit==3.0.52 +propcache==0.4.1 +pycodestyle==2.14.0 +pydantic==2.12.5 +pydantic-settings==2.12.0 +pydantic_core==2.41.5 +pyflakes==3.4.0 +Pygments==2.19.2 +pytest==9.0.2 +pytest-asyncio==1.3.0 +python-dateutil==2.9.0.post0 +python-dotenv==1.2.1 +pytokens==0.3.0 +PyYAML==6.0.3 +redis==7.1.0 +six==1.17.0 +SQLAlchemy==2.0.45 +starlette==0.50.0 +typing-inspection==0.4.2 +typing_extensions==4.15.0 +tzdata==2025.3 +tzlocal==5.3.1 +uvicorn==0.40.0 +uvloop==0.22.1 +vine==5.1.0 +watchfiles==1.1.1 +wcwidth==0.2.14 +websockets==16.0 +yarl==1.22.0 diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_crud_price.py b/tests/test_crud_price.py new file mode 100644 index 0000000..c4377c9 --- /dev/null +++ b/tests/test_crud_price.py @@ -0,0 +1,93 @@ +import pytest +from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine +from sqlalchemy.orm import sessionmaker + +from app.crud.price_crud import CRUDPrice +from app.models.prices import Base + +DATABASE_URL = "sqlite+aiosqlite:///:memory:" + +engine = create_async_engine(DATABASE_URL, echo=False) +AsyncSessionLocal = sessionmaker( + engine, class_=AsyncSession, expire_on_commit=False +) + +crud_price = CRUDPrice() + + +@pytest.fixture +async def session(): + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + async with AsyncSessionLocal() as session_local: + yield session_local + + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.drop_all) + + await engine.dispose() + + +@pytest.mark.asyncio +async def 
test_save_price(session):
+    """Тест сохранения в БД"""
+    price = await crud_price.save_price(
+        session, ticker="BTC_USD", price=5000.00, timestamp=100
+    )
+    assert price.ticker == "BTC_USD"
+    assert price.price == 5000.00
+    assert price.timestamp == 100
+
+
+@pytest.mark.asyncio
+async def test_get_all(session):
+    """Тест получения всех записей и работы пагинации"""
+    for i in range(5):
+        await crud_price.save_price(
+            session, ticker="BTC_USD", price=5000.00 + i, timestamp=100 + i
+        )
+
+    prices = await crud_price.get_all(
+        "BTC_USD", session=session, size=3, offset=0
+    )
+    assert len(prices) == 3
+
+
+@pytest.mark.asyncio
+async def test_get_latest(session):
+    """Тест получения последней цены."""
+    await crud_price.save_price(
+        session, ticker="BTC_USD", price=6000.00, timestamp=500
+    )
+    latest_price = await crud_price.get_latest("BTC_USD", session=session)
+    assert latest_price.ticker == "BTC_USD"
+    assert latest_price.price == 6000.00
+    assert latest_price.timestamp == 500
+
+
+@pytest.mark.asyncio
+async def test_get_by_date(session):
+    """Тест получения цены с фильтрацией по дате, ValueError когда start > end"""
+    await crud_price.save_price(
+        session, ticker="BTC_USD", price=6000.00, timestamp=700
+    )
+    await crud_price.save_price(
+        session, ticker="BTC_USD", price=5000.00, timestamp=800
+    )
+    await crud_price.save_price(
+        session, ticker="BTC_USD", price=4000.00, timestamp=900
+    )
+
+    prices = await crud_price.get_by_date(
+        "BTC_USD", size=3, offset=0, start=750, end=900, session=session
+    )
+    assert len(prices) == 2
+
+    for price in prices:
+        assert 750 <= price.timestamp <= 900
+
+    with pytest.raises(ValueError):
+        await crud_price.get_by_date(
+            "BTC_USD", size=3, offset=0, start=900, end=750, session=session
+        )