
ab-dx/pathway-platform


Velora

A real-time data streaming and processing platform that ingests multi-source data (news, stock market, social media, and corporate filings) through a medallion architecture (Bronze → Silver → Gold layers) built on Pathway and Kafka, with agentic AI capabilities on top.


Overview

Velora is a market research platform built to stream diverse data sources in real time and process them through staged (Bronze, Silver, Gold) pipelines. The system leverages:

  • Pathway: A Python data stream processing framework for real-time analytics
  • Apache Kafka: Distributed event streaming platform for data transport
  • Firecrawl: Web scraping and data extraction service
  • Vector Stores: Semantic search and RAG (Retrieval Augmented Generation) capabilities
  • Prometheus & Grafana: Comprehensive monitoring and observability
  • Docker & Kubernetes: Container orchestration and deployment

Use Cases

  • Financial market intelligence and stock analysis
  • News sentiment analysis and company monitoring
  • Social media insights for brand/competitor tracking
  • Real-time data enrichment and vectorization
  • AI-powered semantic search and retrieval


Architecture

Architecture Flow Chart

┌─────────────────────────────────────────────────────────────────┐
│                     DATA SOURCES                                │
│  (GDELT News, Stock Feeds, Reddit/Social Media, NSE filings)    │
└────────────────┬────────────────────────────────┬───────────────┘
                 │                                │
         ┌───────▼──────────┐          ┌──────────▼─────────┐
         │  BRONZE LAYER    │          │  BRONZE LAYER      │
         │  (Raw Ingestion) │          │  (Raw Ingestion)   │
         └────────┬─────────┘          └──────────┬─────────┘
                  │                               │
                  └───────────────┬───────────────┘
                                  │
                        ┌─────────▼────────┐
                        │    KAFKA TOPICS  │
                        └─────────┬────────┘
                                  │
         ┌────────────────────────┼────────────────────────┐
         │                        │                        │
    ┌────▼───────┐          ┌──────▼──────┐          ┌─────▼──────┐
    │   SILVER   │          │   SILVER    │          │   SILVER   │
    │   LAYER    │          │   LAYER     │          │   LAYER    │
    │(Enrichment)│          │(Enrichment) │          │(Enrichment)│
    └────┬───────┘          └──────┬──────┘          └─────┬──────┘
         │                         │                       │
         └────────────────┬────────┴───────────────────────┘
                          │
                 ┌────────▼────────┐
                 │    KAFKA TOPICS │
                 └────────┬────────┘
                          │
    ┌─────────────────────┼─────────────────────┐
    │                     │                     │
┌───▼────────┐      ┌──────▼──────┐     ┌──────▼──────┐
│    GOLD    │      │    GOLD     │     │    GOLD     │
│   LAYER    │      │   LAYER     │     │   LAYER     │
│(Vector DB) │      │ (Vector DB) │     │ (Analytics) │
└────────────┘      └─────────────┘     └─────────────┘
       │                  │                    │
       └──────────────────┼────────────────────┘
                          │
              ┌───────────▼────────────┐
              │ Vector Store / APIs    │
              │ (Query & Retrieval)    │
              └────────────────────────┘
                          │
              ┌───────────▼────────────┐
              │  AI/ML Applications    │
              │  (Chatbots, Research)  │
              └────────────────────────┘
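The Bronze → Silver → Gold flow above can be illustrated in plain Python, with in-memory lists standing in for Kafka topics. This is a minimal sketch under stated assumptions: the `Topic` class, the field names (`id`, `text`, `clean_text`) and the enrichment step are hypothetical and do not come from the repository, whose real stages run on Pathway and Kafka.

```python
from dataclasses import dataclass, field


# Hypothetical in-memory stand-in for a Kafka topic.
@dataclass
class Topic:
    name: str
    messages: list = field(default_factory=list)

    def publish(self, record: dict) -> None:
        self.messages.append(record)


def bronze(raw_records, out: Topic) -> None:
    """Bronze: validate raw records, dropping ones missing required fields."""
    for rec in raw_records:
        if rec.get("id") and rec.get("text"):
            out.publish(dict(rec))


def silver(inp: Topic, out: Topic) -> None:
    """Silver: enrich each record (here, a hypothetical normalized-text field)."""
    for rec in inp.messages:
        out.publish({**rec, "clean_text": " ".join(rec["text"].lower().split())})


def gold(inp: Topic) -> dict:
    """Gold: build a queryable view over enriched records, keyed by id."""
    return {rec["id"]: rec["clean_text"] for rec in inp.messages}


raw = [
    {"id": "a1", "text": "  Markets   RALLY "},
    {"id": "a2", "text": ""},  # dropped at Bronze: empty text
    {"id": "a3", "text": "Rates hold"},
]
bronze_topic, silver_topic = Topic("bronze"), Topic("silver")
bronze(raw, bronze_topic)
silver(bronze_topic, silver_topic)
view = gold(silver_topic)
print(view)
```

Each stage only reads the previous stage's topic, which is what lets the real pipelines scale and restart independently.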

Project Structure

pathway-platform/
├── agentic-pipeline/              # AI-powered agentic systems
│   ├── chatbot/                   # LLM-based chatbot interface
│   ├── research_endpoint/         # Research tool endpoint
│   └── segraph/                   # SEO/semantic graph analysis
├── news-data-pipeline/            # GDELT News Processing
│   ├── gdelt_fetcher.py           # GDELT API data fetcher
│   ├── bronze_pipeline.py         # Raw data ingestion
│   ├── silver_pipeline.py         # Data enrichment with Firecrawl
│   ├── gold_pipeline.py           # Vector DB & storage
│   └── pathway_vector_pipeline.py # Vector embedding & storage
├── stock-data-pipelines/          # Stock Market Data Processing
│   ├── bronze-tier-pipeline/      # Raw stock data ingestion
│   ├── silver-tier-pipeline/      # Technical indicators & enrichment
│   ├── gold-tier-pipeline/        # Aggregation & analysis
│   ├── kite-stream-service/       # Real-time stock feed (Kite API)
│   ├── replay-stream-service/     # Historical data replay
│   └── backtesting-python/        # Backtesting & simulation engine
├── social-media-web-scrapping/    # Reddit & Social Media Analysis
│   ├── bronze_layer.py            # Raw post/comment ingestion
│   ├── silver_layer.py            # Text processing & cleaning
│   ├── gold_layer.py              # Company/sector analytics
│   └── pathway_vector_pipeline.py # Vector indexing
├── filings-pipeline/              # NSE Corporate Filings Processing
│   ├── nse_fetcher.py             # NSE filings data fetcher
│   ├── pathway_processor.py       # PDF parsing & data extraction
│   ├── pathway_rag.py             # Vector store & RAG implementation
│   ├── config.yaml                # Pipeline configuration
│   ├── create_topics.sh           # Kafka topic creation script
│   └── pathway_state/             # Persistent state storage
├── tradekit_rust/                 # High-Performance Backtesting Engine
│   ├── src/
│   │   ├── backtest_engine.rs     # Core backtesting engine (Rust)
│   │   ├── indicators.rs          # Technical indicators library
│   │   └── lib.rs                 # PyO3 Python bindings
│   ├── main.py                    # FastAPI server for backtest API
│   ├── agent.py                   # Backtesting agent orchestration
│   ├── worker.py                  # Background task worker
│   ├── controller.py              # Task management controller
│   ├── builder.py                 # Strategy builder utilities
│   ├── Cargo.toml                 # Rust dependencies
│   └── historical_data/           # Historical stock data storage
├── mcp-server/                    # Model Context Protocol Server
│   ├── main.py                    # MCP server implementation
│   ├── search_tool.py             # Web search integration
│   └── schemas/                   # Data schemas
├── nginx/                         # Nginx reverse proxy configuration
├── grafana/                       # Grafana dashboards & provisioning
├── jmx/                           # JMX monitoring for Kafka
├── k8s/                           # Kubernetes deployment manifests
├── docker/                        # Docker helper scripts
├── docker-compose.yml             # Complete development stack
├── prometheus.yml                 # Prometheus scrape configuration
└── imp-note.md                    # Important notes & gotchas

Features

Core Data Pipelines

  • GDELT News Pipeline: Real-time ingestion of global events, news, and trends from GDELT 2.0
  • Stock Market Pipeline: High-frequency stock data processing with technical indicators
  • Social Media Pipeline: Reddit and social media sentiment analysis
  • Web Scraping: Advanced scraping with Firecrawl for content extraction
  • Company Filings Pipeline: Automated ingestion of NSE corporate filings, with PDF parsing, data extraction, and indexing

Processing & Enrichment

  • Real-time Stream Processing: Using Pathway for low-latency transformations
  • Data Enrichment: Automated content scraping, extraction, and enhancement
  • Semantic Search: Vector embeddings and similarity-based retrieval
  • Company Tagging: Automatic company/ticker identification in content

AI & Analytics

  • AI-Powered Chatbot: Conversational interface with LLM integration
  • MCP Server: Model Context Protocol for seamless tool integration
  • Research Endpoint: Advanced analysis and insights generation
  • Vector Stores: Multiple vector stores for news and social media

Observability

  • Prometheus Monitoring: Metrics collection from all services
  • Grafana Dashboards: Visual monitoring and analytics
  • Health Checks: Service health verification and auto-restart
  • Structured Logging: Comprehensive logging across all pipelines

Deployment

  • Docker Compose: Complete local development environment
  • Kubernetes: Production-ready k8s manifests for all services
  • Multi-network: Isolated networks for different service tiers

Prerequisites

Software Requirements

  • Docker Desktop (version 20.10+)
  • Docker Compose (version 1.29+)
  • Python 3.12+ (for local development without Docker)
  • Git
  • Node.js 18+ (for chatbot interface)

Required API Keys & Credentials

Create a .env file in the project root, similar to the following example:

KITE_API_KEY=
KITE_ACCESS_TOKEN=
HIST_MONGODB_URI=


# Dev Details
NODE_ENV=DEVELOPMENT
BACKEND_URL=
FRONTEND_URL=

# Authentication Secret
BETTER_AUTH_SECRET=

# Google Authentication
GOOGLE_CLIENT_ID=
GOOGLE_CLIENT_SECRET=

CHATBOT_SECRET_KEY=

# Database
MONGO_BACKEND_URL=
MONGO_HISTORIAL_DATA_URL=

PATHWAY_MCP_URL=
BACKTEST_REMOTE_URL=
FIRECRAWL_URL=

# SMTP Details
SMTP_HOST=
SMTP_PORT=
SMTP_USER=
SMTP_PASS=
SMTP_FROM=

# OpenAI
OPENAI_API_KEY=

NEXT_PUBLIC_FRONTEND=
NEXT_PUBLIC_BACKEND=
NEXT_PUBLIC_CHATBOT_WS_URL=
NEXT_PUBLIC_WS=
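Before starting the stack, it can help to check that no required variable in .env was left empty. The helper below is a minimal, stdlib-only sketch that is not part of the repository. It handles only simple KEY=VALUE lines (no quoting or multi-line values, unlike tools such as python-dotenv), and which keys count as required is your choice.

```python
def parse_env(text: str) -> dict[str, str]:
    """Parse simple KEY=VALUE lines, skipping blank lines and # comments."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values


def missing_keys(values: dict[str, str], required: list[str]) -> list[str]:
    """Return the required keys that are absent or empty, in the given order."""
    return [k for k in required if not values.get(k)]
```

For example, `missing_keys(parse_env(open(".env").read()), ["OPENAI_API_KEY", "FIRECRAWL_URL"])` lists whichever of those two keys still need a value.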

Installation & Setup

Option 1: Docker Compose

Step 1: Clone Repository

git clone https://github.com/ab-dx/pathway-platform.git
cd pathway-platform

Step 2: Configure Environment

# Copy example environment file and fill with appropriate keys
cp .env.example .env

Step 3: Start Services

# Start all services in background
docker-compose up -d --build

# Or, to see logs in real-time
docker-compose up --build

Step 4: Verify Services

# Check all containers are running
docker-compose ps

# View logs for specific service
docker-compose logs -f gdelt-fetcher
docker-compose logs -f news-gold-pipeline

# Allow 30-60 seconds after startup for all services to become fully ready
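Rather than relying on a fixed wait, a script can poll until a service's port accepts TCP connections. The helper below is hypothetical tooling, not part of the repository. An open port only shows that a process is listening, not that it has finished initializing.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 60.0, interval: float = 1.0) -> bool:
    """Poll host:port until a TCP connection succeeds or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

For example, `wait_for_port("localhost", 29092)` blocks until Kafka's host listener (port 29092, per Service Details) is reachable, or returns False after 60 seconds.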

Option 2: Kubernetes Deployment

Prerequisites

  • kubectl configured to access your cluster
  • Docker images built and pushed to a container registry the cluster can pull from

Step 1: Apply Manifests

# Apply all manifests in the k8s/ directory
kubectl apply -f k8s/

Step 2: Monitor Deployment

kubectl get pods
kubectl get deployments
kubectl get svc

Quick Start

Running the Complete Stack

# Start all services
docker-compose up -d

Access Monitoring Dashboards

Grafana is served at http://localhost:3010 (see Grafana Dashboards). In the production environment, the monitoring dashboards are additionally reverse-proxied via NGINX.

Service Details

Kafka Network

Purpose: Core message bus for all data streams

  • Image: confluentinc/cp-kafka:7.5.0
  • Port: 9092 (internal), 29092 (host)

GDELT News Pipeline

Components

  1. gdelt-fetcher (8014)

    • Polls GDELT API every 15 minutes
    • Publishes to raw_gdelt_events topic
    • Metrics: Event count, API errors
  2. news-bronze-pipeline (8011)

    • Validates and normalizes raw events
    • Extracts key entities (companies, locations)
    • Publishes to processed_gdelt_events
  3. news-silver-pipeline (8012)

    • Fetches full article content via Firecrawl
    • Enriches with metadata and sentiment
    • Publishes to enriched_gdelt_events
  4. news-gold-pipeline (8765, 8013)

    • Creates vector embeddings
    • Stores in Pathway Vector Store

Key Endpoints:

POST /v1/retrieve - Semantic search with metadata filtering
GET /v1/statistics - Vector store statistics

Stock Data Pipeline

Components

  1. replay-stream-service or kite-stream-service

    • Generates/streams stock OHLC data
    • Publishes to raw_stock_data
  2. stock-bronze-tier-pipeline

    • Raw data validation and formatting
    • Deduplication
  3. stock-silver-tier-pipeline

    • Calculates technical indicators (MA, RSI, MACD, Bollinger Bands)
    • Stores recent data in Redis for quick access
  4. stock-gold-tier-pipeline

    • Aggregates signals across multiple timeframes
    • Generates trading signals

Social Media Pipeline

Components

  1. bronze-layer (8001)

    • Scrapes Reddit using PRAW
    • Ingests posts and comments
    • Publishes to bronze_reddit_* topics
  2. silver-layer (8003)

    • Text cleaning and preprocessing
    • Sentiment analysis
    • Company tagging
    • Publishes to silver_reddit_* topics
  3. gold-layer (8002)

    • Aggregates company sentiment
    • Sector-level analytics
    • Publishes analytics to gold_company_analytics
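Company tagging and company-level sentiment can be sketched as a whole-word alias lookup followed by a per-company average. The alias table, the per-post `sentiment` score field, and the averaging rule are hypothetical. The README does not describe how the silver layer scores sentiment or which companies it tracks.

```python
import re
from collections import defaultdict

# Hypothetical alias table mapping tickers to names seen in posts.
ALIASES = {
    "INFY": ["infosys", "infy"],
    "TCS": ["tcs", "tata consultancy"],
}
PATTERNS = {
    ticker: re.compile(r"\b(?:" + "|".join(map(re.escape, names)) + r")\b", re.IGNORECASE)
    for ticker, names in ALIASES.items()
}


def tag_companies(text: str) -> list[str]:
    """Return tickers whose aliases appear as whole words in the text."""
    return sorted(t for t, p in PATTERNS.items() if p.search(text))


def company_sentiment(posts: list[dict]) -> dict[str, float]:
    """Average each post's sentiment score over every company it mentions."""
    totals, counts = defaultdict(float), defaultdict(int)
    for post in posts:
        for ticker in tag_companies(post["text"]):
            totals[ticker] += post["sentiment"]
            counts[ticker] += 1
    return {t: totals[t] / counts[t] for t in totals}


posts = [
    {"text": "Infosys beat estimates", "sentiment": 0.8},
    {"text": "TCS and INFY both slipped", "sentiment": -0.4},
]
print(company_sentiment(posts))
```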

MCP Server

Purpose: Model Context Protocol server for LLM integration

  • Port: 8123
  • Language: Python
  • Key Tools:
    • News retrieval with semantic search
    • Social media sentiment queries
    • Web search integration
    • Stock data queries

PostgreSQL (NUQ)

Purpose: Structured data storage

  • Port: 5433
  • Credentials: postgres / postgres
  • Databases:
    • Firecrawl data
    • Job queues
    • Rate limiting

Monitoring & Observability

Prometheus Metrics

All services expose Prometheus metrics on dedicated ports, and the scrape targets are configured in prometheus.yml.
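Each scrape is a plain-text HTTP response in the Prometheus exposition format. In practice, services usually rely on a client library such as prometheus_client. The stdlib-only sketch below is illustrative and shows only what a scrape of `/metrics` returns. The metric name and the choice of port are hypothetical, and label values are not escaped.

```python
from http.server import BaseHTTPRequestHandler, HTTPServer
from threading import Lock


class Counter:
    """Minimal labeled counter rendered in the Prometheus text format."""

    def __init__(self, name: str, help_text: str):
        self.name, self.help_text = name, help_text
        self._values: dict[tuple, float] = {}
        self._lock = Lock()

    def inc(self, amount: float = 1.0, **labels: str) -> None:
        key = tuple(sorted(labels.items()))
        with self._lock:
            self._values[key] = self._values.get(key, 0.0) + amount

    def render(self) -> str:
        lines = [f"# HELP {self.name} {self.help_text}", f"# TYPE {self.name} counter"]
        with self._lock:
            for key, value in sorted(self._values.items()):
                label_str = ",".join(f'{k}="{v}"' for k, v in key)
                suffix = f"{{{label_str}}}" if label_str else ""
                lines.append(f"{self.name}{suffix} {value}")
        return "\n".join(lines) + "\n"


# Hypothetical metric, e.g. for gdelt-fetcher's event count.
EVENTS = Counter("gdelt_events_fetched_total", "Events fetched from GDELT.")


class MetricsHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path != "/metrics":
            self.send_error(404)
            return
        body = EVENTS.render().encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "text/plain; version=0.0.4")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)


def serve(port: int) -> None:
    """Serve /metrics on the given port until interrupted (blocking)."""
    HTTPServer(("0.0.0.0", port), MetricsHandler).serve_forever()
```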

Grafana Dashboards

Access at http://localhost:3010


Default credentials: admin / admin

Pre-configured Dashboards:

  • Kafka Cluster Overview (Kafbat)
  • News Pipeline Health
  • Stock Pipeline Performance
  • Agent Metrics
  • System Resource Usage

About

A Unified Platform for Actionable Market Intelligence (Inter IIT Tech Meet 14.0)
