From 098a5b51952106a59916ac6e98cc8006044e7012 Mon Sep 17 00:00:00 2001 From: Carlos Oliveira Date: Sat, 11 Apr 2026 13:21:12 -0300 Subject: [PATCH] =?UTF-8?q?feat:=20distribui=C3=A7=C3=A3o=20realista=20de?= =?UTF-8?q?=20datas=20e=20floats,=20workers=20configur=C3=A1vel=20e=20bump?= =?UTF-8?q?=200.1.8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Float usa distribuição log-normal (mediana ~20% do range, cauda longa) - Date usa distribuição realista: ruído log-normal + efeito dia da semana + spike fim de mês - Escrita de partições paralelizada com ThreadPoolExecutor (flag --workers, default 16) - Campo Partition Workers na UI junto à seção de particionamento - Aviso laranja acima de 32 threads, vermelho acima de 64 - Correção: computedDbUrl movido para após declaração do runConfig - Remoção de import não utilizado (Upload) no App.tsx - Bump de versão: 0.1.7 -> 0.1.8 --- pyproject.toml | 2 +- src/dataforge/cli.py | 57 +++++++++++-- src/dataforge/core/registry.py | 101 +++++++++++++++++++---- src/dataforge/frontend/src/App.tsx | 25 ++++-- src/dataforge/frontend/src/vite-env.d.ts | 9 +- src/dataforge/frontend/vite.config.ts | 14 +++- 6 files changed, 169 insertions(+), 39 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 44a3ccd..22ada9b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "dataforge" -version = "0.1.7" +version = "0.1.8" description = "Synthetic relational dataset generator for data engineering studies" authors = [ {name = "Carlos Oliveira", email = "papodedados@gmail.com"} diff --git a/src/dataforge/cli.py b/src/dataforge/cli.py index dfcc321..48728e7 100644 --- a/src/dataforge/cli.py +++ b/src/dataforge/cli.py @@ -2,6 +2,7 @@ import sys import time +from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime, timedelta from pathlib import Path @@ -168,6 +169,7 @@ def _write_batch( batch: int, recurrence: bool, partition_map: dict[str, str] | None = None, + max_workers: int = 16, ) -> list[tuple[Path, str]]: """Escreve um batch de datasets. Retorna lista de (local_path, remote_sub) onde remote_sub é o caminho relativo ao prefix: 'dataset/table_name/[col=val/]filename.ext'.""" @@ -205,19 +207,48 @@ def _write_batch( partition_by = partition_map.get(name) or partition_map.get("*") if partition_by and partition_by in df.columns: - # Escrita particionada: uma subpasta por valor da coluna + # Escrita particionada: uma subpasta por valor da coluna — paralela appendable = fmt in ("csv", "json") - for val, group in df.groupby(partition_by): + + def _write_partition( + val_group, + _fmt=fmt, + _ext=ext, + _name=name, + _partition_by=partition_by, + _ts=ts, + _recurrence=recurrence, + _appendable=appendable, + ): + val, group = val_group safe_val = str(val).replace("/", "-").replace(" ", "_") - if recurrence and not appendable: - filename = f"{name}_{ts}{ext}" - else: - filename = f"{name}{ext}" + filename = ( + f"{_name}_{_ts}{_ext}" + if _recurrence and not _appendable + else f"{_name}{_ext}" + ) dest = ( - out_path / schema.name / name / f"{partition_by}={safe_val}" / filename + out_path + / schema.name + / _name + / f"{_partition_by}={safe_val}" + / filename ) - _write_df_direct(group, fmt, dest, json_mode) - remote_sub = f"{schema.name}/{name}/{partition_by}={safe_val}/{filename}" + _write_df_direct(group, _fmt, dest, json_mode) + remote_sub = f"{schema.name}/{_name}/{_partition_by}={safe_val}/{filename}" + return dest, remote_sub + + partitions = list(df.groupby(partition_by)) + max_workers = min(max_workers, len(partitions)) + results: list[tuple[Path, str]] = [] + with ThreadPoolExecutor(max_workers=max_workers) as pool: + futures = {pool.submit(_write_partition, vg): vg for vg in partitions} + for fut in as_completed(futures): + dest, remote_sub = fut.result() + results.append((dest, remote_sub)) + + # Exibe e registra em ordem determinística + for dest, remote_sub in sorted(results, key=lambda x: str(x[0])): click.echo(f" [{fmt}] {dest}") written.append((dest, remote_sub)) else: @@ -421,6 +452,12 @@ def cli(): "Example: orders:created_at:1:days or sales:amount:100:value" ), ) +@click.option( + "--workers", + default=16, + type=int, + help="Max parallel threads for partitioned writes. Default: 16.", +) def generate( domain, config, @@ -442,6 +479,7 @@ def generate( recurrence, count, increment, + workers, ): """Generate synthetic datasets and optionally write to files, cloud or SQL. @@ -501,6 +539,7 @@ def generate( batch=batch, recurrence=is_recurrent, partition_map=_parse_partition_by(partition_by) if partition_by else None, + max_workers=workers, ) # Upload diff --git a/src/dataforge/core/registry.py b/src/dataforge/core/registry.py index cb0947f..c5fe5fb 100644 --- a/src/dataforge/core/registry.py +++ b/src/dataforge/core/registry.py @@ -1,7 +1,9 @@ from __future__ import annotations +import random import re from datetime import date as _date +from datetime import timedelta as _timedelta # Dtypes that support min/max ranges RANGE_SUPPORTED_DTYPES = {"int", "float", "date"} @@ -20,29 +22,98 @@ def _normalize_date_value(value: str) -> str: ) +def _generate_floats(n: int, min_value: float, max_value: float) -> list[float]: + """Gera floats com distribuição log-normal, espelhando valores monetários reais. + + Valores monetários reais seguem uma distribuição log-normal: muitas transações + pequenas/médias e poucas muito grandes. A distribuição é calibrada para que + a mediana fique em ~20% do range e a média em ~35%, com cauda longa para cima. + """ + import math + + span = max_value - min_value + if span <= 0: + return [round(min_value, 2)] * n + + # Calibra mu/sigma para que a mediana fique em ~20% do span + # log-normal: mediana = exp(mu), mean = exp(mu + sigma²/2) + target_median = min_value + span * 0.20 + sigma = 1.1 # cauda longa — realista para transações + mu = math.log(max(target_median - min_value, 1e-9)) + + results = [] + for _ in range(n): + raw = random.lognormvariate(mu, sigma) + # Clamp dentro do range e arredonda para 2 casas + value = round(min(max(raw + min_value, min_value), max_value), 2) + results.append(value) + return results + + +def _resolve_date(f, value) -> _date: + """Converte min/max para objeto date.""" + if not isinstance(value, str): + return _date.fromisoformat(str(value)) + normalized = _normalize_date_value(value) + return f.date_between(start_date=normalized, end_date=normalized) + + +def _generate_dates(f, n: int, min_value, max_value) -> list[str]: + """Gera n datas simulando o padrão de transações reais. + + Combina três efeitos independentes para cada data do range: + + 1. Ruído base aleatório (log-normal) — dias "quentes" e "frios" surgem + organicamente, sem padrão fixo. + 2. Dia da semana — seg-sex recebem ~2× mais volume que sáb-dom, + refletindo o comportamento típico de transações comerciais. + 3. Fim de mês — os últimos 3 dias do mês recebem um spike de 1.5×, + simulando fechamentos, pagamentos e renovações. + + O produto dos três fatores forma o peso final de cada data. + """ + import math + + start = _resolve_date(f, min_value) + end = _resolve_date(f, max_value) + total_days = max((end - start).days, 1) + + # Gera todas as datas do range + all_dates = [start + _timedelta(days=d) for d in range(total_days + 1)] + + weights = [] + for d in all_dates: + # 1. Ruído log-normal: media=0, sigma=0.6 → maioria entre 0.5x e 2x + noise = math.exp(random.gauss(0, 0.6)) + + # 2. Efeito dia da semana: seg=0 ... dom=6; fim de semana leva 50% do volume + weekday_factor = 1.0 if d.weekday() < 5 else 0.5 + + # 3. Spike de fim de mês: últimos 3 dias do mês + import calendar + + last_day = calendar.monthrange(d.year, d.month)[1] + month_end_factor = 1.5 if d.day >= last_day - 2 else 1.0 + + weights.append(noise * weekday_factor * month_end_factor) + + return [random.choices(all_dates, weights=weights, k=1)[0].isoformat() for _ in range(n)] + + FAKER_REGISTRY: dict[str, callable] = { "uuid": lambda f, n, **kw: [f.uuid4() for _ in range(n)], "int_seq": lambda f, n, seq_start=1, **kw: list(range(seq_start, seq_start + n)), "int": lambda f, n, min_value=0, max_value=100_000, **kw: [ f.random_int(min=int(min_value), max=int(max_value)) for _ in range(n) ], - "float": lambda f, n, min_value=0, max_value=10_000, **kw: [ - round(f.pyfloat(min_value=float(min_value), max_value=float(max_value)), 2) - for _ in range(n) - ], + "float": lambda f, n, min_value=0, max_value=10_000, **kw: _generate_floats( + n, float(min_value), float(max_value) + ), "str": lambda f, n, **kw: [f.word() for _ in range(n)], "bool": lambda f, n, **kw: [f.boolean() for _ in range(n)], - "date": lambda f, n, min_value="-3y", max_value="today", **kw: [ - f.date_between( - start_date=_normalize_date_value(min_value) - if isinstance(min_value, str) - else _date.fromisoformat(str(min_value)), - end_date=_normalize_date_value(max_value) - if isinstance(max_value, str) - else _date.fromisoformat(str(max_value)), - ).isoformat() - for _ in range(n) - ], + "date": lambda f, n, min_value="-3y", max_value="today", **kw: _generate_dates( + f, n, min_value, max_value + ), "name": lambda f, n, **kw: [f.name() for _ in range(n)], "email": lambda f, n, **kw: [f.email() for _ in range(n)], "phone": lambda f, n, **kw: [f.phone_number() for _ in range(n)], diff --git a/src/dataforge/frontend/src/App.tsx b/src/dataforge/frontend/src/App.tsx index d2ee125..0262dec 100644 --- a/src/dataforge/frontend/src/App.tsx +++ b/src/dataforge/frontend/src/App.tsx @@ -2,8 +2,8 @@ import React, { useState, useRef, useCallback, useEffect } from 'react'; import dagre from 'dagre'; import ReactFlow, { Background, Controls, ConnectionLineType, useNodesState, useEdgesState, MarkerType } from 'reactflow'; import type { Edge, Node } from 'reactflow'; -import 'reactflow/dist/style.css'; -import { Plus, Download, FileJson, Upload, Trash2, Key, Link as LinkIcon, X, Network, Play, BookOpen, Search, Sparkles } from 'lucide-react'; +import 'reactflow/dist/style.css'; // ReactFlow v11 has no non-dist CSS export — necessary exception +import { Plus, Download, FileJson, Trash2, Key, Link as LinkIcon, X, Network, Play, BookOpen, Search, Sparkles } from 'lucide-react'; const FAKER_CATALOG: { category: string; color: string; methods: { name: string; example: string }[] }[] = [ { category: 'Person', color: '#60a5fa', methods: [ @@ -441,8 +441,6 @@ export default function App() { return `${driver}://${creds}${form.host}${port}/${form.database}`; }; - const computedDbUrl = dbAdvanced ? runConfig.dbUrl : buildDbUrl(dbForm); - const handleTestDbConnection = async () => { setDbTestStatus('testing'); setDbTestError(''); @@ -670,6 +668,7 @@ export default function App() { count: string, tablesToInclude: string[], columnsFilter: string, + workers: string, increments: Array<{ table: string; column: string; step: string; unit: string }>, cloudCreds: { gcsJson: string, @@ -697,6 +696,7 @@ export default function App() { tablesToInclude: [], columnsFilter: '', increments: [], + workers: '16', cloudCreds: { gcsJson: '', s3AccessKey: '', @@ -705,6 +705,8 @@ export default function App() { azureConnStr: '', }, }); + const computedDbUrl = dbAdvanced ? runConfig.dbUrl : buildDbUrl(dbForm); + const [runLogs, setRunLogs] = useState(''); const [isRunning, setIsRunning] = useState(false); const runAbortRef = useRef(null); @@ -739,6 +741,7 @@ export default function App() { tables: runConfig.tablesToInclude.length > 0 ? runConfig.tablesToInclude : undefined, columns: runConfig.columnsFilter.trim() ? runConfig.columnsFilter.trim().split('\n').filter(Boolean) : undefined, increments: runConfig.increments.filter(i => i.table && i.column && i.step !== ''), + workers: runConfig.workers !== '' ? parseInt(runConfig.workers) : 16, cloudCreds: runConfig.destination === 'cloud' ? runConfig.cloudCreds : undefined, }) }); @@ -805,7 +808,7 @@ export default function App() {
- v{import.meta.env.VITE_APP_VERSION} + v{__APP_VERSION__}
)} + {Object.values(runConfig.partitionByTable).some(v => v) && ( +
+ + setRunConfig(r => ({...r, workers: e.target.value}))} style={{ width: '100%', padding: '0.5rem' }} placeholder="16" min="1" /> + {(() => { + const w = parseInt(runConfig.workers); + if (w > 64) return

⚠ Above 64 threads may cause diminishing returns or instability. Recommended: 16–32 for local SSD, up to 64 for network storage.

; + if (w > 32) return

⚠ Above 32 threads only helps with slow network storage (Docker volumes, S3). Local SSD saturates earlier.

; + return null; + })()} +
+ )} {/* Column Filters disabled */} diff --git a/src/dataforge/frontend/src/vite-env.d.ts b/src/dataforge/frontend/src/vite-env.d.ts index 4c755bf..245d782 100644 --- a/src/dataforge/frontend/src/vite-env.d.ts +++ b/src/dataforge/frontend/src/vite-env.d.ts @@ -1,9 +1,4 @@ /// -interface ImportMetaEnv { - readonly VITE_APP_VERSION: string -} - -interface ImportMeta { - readonly env: ImportMetaEnv -} +// Injected at build time via vite.config.ts `define` from pyproject.toml +declare const __APP_VERSION__: string diff --git a/src/dataforge/frontend/vite.config.ts b/src/dataforge/frontend/vite.config.ts index a1eec92..d69f356 100644 --- a/src/dataforge/frontend/vite.config.ts +++ b/src/dataforge/frontend/vite.config.ts @@ -83,7 +83,7 @@ const cliRunnerPlugin = () => ({ req.on('end', () => { try { const data = JSON.parse(body); - const { yamlStr, formats, outputDir, uploadTarget, bucket, prefix, partitionByTable, jsonMode, seed, dbUrl, ifExists, dbSchema, recurrence, count, credentials, rows, tables: tablesToInclude, columns: columnsToInclude, increments, cloudCreds } = data; + const { yamlStr, formats, outputDir, uploadTarget, bucket, prefix, partitionByTable, jsonMode, seed, dbUrl, ifExists, dbSchema, recurrence, count, credentials, rows, tables: tablesToInclude, columns: columnsToInclude, increments, cloudCreds, workers } = data; const baseDir = resolve(__dirname, '../../../'); // Cloud and database-only runs use a temp dir that is cleaned up after @@ -184,6 +184,9 @@ const cliRunnerPlugin = () => ({ } } } + if (workers !== undefined && workers !== null && workers !== '') { + args.push('--workers', String(workers)); + } const venvPath = resolve(baseDir, '.venv', 'Scripts', 'python.exe'); const pyExec = existsSync(venvPath) ? venvPath : 'python'; @@ -780,7 +783,9 @@ DATASET DESCRIPTION: export default defineConfig({ plugins: [react(), cliRunnerPlugin()], define: { - 'import.meta.env.VITE_APP_VERSION': JSON.stringify(appVersion), + // Injected at build time from pyproject.toml — not available as a .env file + // because the version is the Python package's single source of truth. + __APP_VERSION__: JSON.stringify(appVersion), }, server: { host: '0.0.0.0', // Required for Docker @@ -789,6 +794,11 @@ export default defineConfig({ allow: ['../schemas', '.'] } }, + // dagre is a CommonJS library — pre-bundle it so Vite doesn't fail in dev mode + // See: https://vitejs.dev/config/dep-optimization-options#optimizedeps-include + optimizeDeps: { + include: ['dagre'], + }, build: { rollupOptions: { output: {