Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "dataforge"
version = "0.1.7"
version = "0.1.8"
description = "Synthetic relational dataset generator for data engineering studies"
authors = [
{name = "Carlos Oliveira", email = "papodedados@gmail.com"}
Expand Down
57 changes: 48 additions & 9 deletions src/dataforge/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
from pathlib import Path

Expand Down Expand Up @@ -168,6 +169,7 @@ def _write_batch(
batch: int,
recurrence: bool,
partition_map: dict[str, str] | None = None,
max_workers: int = 16,
) -> list[tuple[Path, str]]:
"""Escreve um batch de datasets. Retorna lista de (local_path, remote_sub) onde
remote_sub é o caminho relativo ao prefix: 'dataset/table_name/[col=val/]filename.ext'."""
Expand Down Expand Up @@ -205,19 +207,48 @@ def _write_batch(
partition_by = partition_map.get(name) or partition_map.get("*")

if partition_by and partition_by in df.columns:
# Escrita particionada: uma subpasta por valor da coluna
# Escrita particionada: uma subpasta por valor da coluna — paralela
appendable = fmt in ("csv", "json")
for val, group in df.groupby(partition_by):

def _write_partition(
val_group,
_fmt=fmt,
_ext=ext,
_name=name,
_partition_by=partition_by,
_ts=ts,
_recurrence=recurrence,
_appendable=appendable,
):
val, group = val_group
safe_val = str(val).replace("/", "-").replace(" ", "_")
if recurrence and not appendable:
filename = f"{name}_{ts}{ext}"
else:
filename = f"{name}{ext}"
filename = (
f"{_name}_{_ts}{_ext}"
if _recurrence and not _appendable
else f"{_name}{_ext}"
)
dest = (
out_path / schema.name / name / f"{partition_by}={safe_val}" / filename
out_path
/ schema.name
/ _name
/ f"{_partition_by}={safe_val}"
/ filename
)
_write_df_direct(group, fmt, dest, json_mode)
remote_sub = f"{schema.name}/{name}/{partition_by}={safe_val}/{filename}"
_write_df_direct(group, _fmt, dest, json_mode)
remote_sub = f"{schema.name}/{_name}/{_partition_by}={safe_val}/{filename}"
return dest, remote_sub

partitions = list(df.groupby(partition_by))
max_workers = min(max_workers, len(partitions))
results: list[tuple[Path, str]] = []
with ThreadPoolExecutor(max_workers=max_workers) as pool:
futures = {pool.submit(_write_partition, vg): vg for vg in partitions}
for fut in as_completed(futures):
dest, remote_sub = fut.result()
results.append((dest, remote_sub))

# Exibe e registra em ordem determinística
for dest, remote_sub in sorted(results, key=lambda x: str(x[0])):
click.echo(f" [{fmt}] {dest}")
written.append((dest, remote_sub))
else:
Expand Down Expand Up @@ -421,6 +452,12 @@ def cli():
"Example: orders:created_at:1:days or sales:amount:100:value"
),
)
@click.option(
"--workers",
default=16,
type=int,
help="Max parallel threads for partitioned writes. Default: 16.",
)
def generate(
domain,
config,
Expand All @@ -442,6 +479,7 @@ def generate(
recurrence,
count,
increment,
workers,
):
"""Generate synthetic datasets and optionally write to files, cloud or SQL.

Expand Down Expand Up @@ -501,6 +539,7 @@ def generate(
batch=batch,
recurrence=is_recurrent,
partition_map=_parse_partition_by(partition_by) if partition_by else None,
max_workers=workers,
)

# Upload
Expand Down
101 changes: 86 additions & 15 deletions src/dataforge/core/registry.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from __future__ import annotations

import random
import re
from datetime import date as _date
from datetime import timedelta as _timedelta

# Dtypes that support min/max ranges
RANGE_SUPPORTED_DTYPES = {"int", "float", "date"}
Expand All @@ -20,29 +22,98 @@ def _normalize_date_value(value: str) -> str:
)


def _generate_floats(n: int, min_value: float, max_value: float) -> list[float]:
    """Generate ``n`` floats from a shifted, clamped log-normal distribution.

    Real monetary amounts tend to be log-normal: many small/medium values and
    a few very large ones. The distribution is anchored so that its median
    sits at about 20% of the ``[min_value, max_value]`` span (mean roughly
    35%), with a long upper tail.

    Args:
        n: Number of values to produce.
        min_value: Lower bound of the range.
        max_value: Upper bound of the range.

    Returns:
        A list of ``n`` floats, each shifted by ``min_value``, clamped into
        ``[min_value, max_value]`` and rounded to 2 decimals. When the range
        is empty or inverted, ``n`` copies of ``round(min_value, 2)``.
    """
    import math

    width = max_value - min_value
    if width <= 0:
        return [round(min_value, 2)] * n

    # For a log-normal, median = exp(mu) and mean = exp(mu + sigma**2 / 2),
    # so mu is the log of the desired median offset above min_value.
    sigma = 1.1  # long tail, realistic for transaction amounts
    median_target = min_value + width * 0.20
    mu = math.log(max(median_target - min_value, 1e-9))

    def _draw() -> float:
        # Shift the raw draw into the range, clamp both ends, keep 2 decimals.
        shifted = random.lognormvariate(mu, sigma) + min_value
        return round(min(max(shifted, min_value), max_value), 2)

    return [_draw() for _ in range(n)]


def _resolve_date(f, value) -> _date:
    """Convert a range bound (``min``/``max``) into a ``date`` object.

    Args:
        f: Object exposing ``date_between`` (presumably a Faker instance —
            confirm against callers). Only used for string values.
        value: The bound to convert. Strings are normalized with
            ``_normalize_date_value`` and parsed by ``f.date_between``;
            ``datetime`` values are truncated to their date; ``date`` values
            are returned as-is; anything else must have an ISO ``YYYY-MM-DD``
            string representation.

    Returns:
        The resolved ``date``.

    Raises:
        ValueError: If a non-string, non-date value does not stringify to an
            ISO date.
    """
    from datetime import datetime as _datetime

    if isinstance(value, str):
        normalized = _normalize_date_value(value)
        # Same value for both bounds: date_between is used purely as a parser
        # for the normalized string, not to pick a random date.
        return f.date_between(start_date=normalized, end_date=normalized)

    # datetime is a subclass of date, so it must be checked first. Its str()
    # includes a time component that date.fromisoformat() rejects.
    if isinstance(value, _datetime):
        return value.date()
    if isinstance(value, _date):
        return value
    return _date.fromisoformat(str(value))


def _generate_dates(f, n: int, min_value, max_value) -> list[str]:
    """Generate ``n`` ISO dates that mimic real transaction patterns.

    Every calendar day in the inclusive range gets a weight that is the
    product of three independent effects:

    1. Random base noise (log-normal, sigma 0.6) so that "hot" and "cold"
       days appear organically, with no fixed pattern.
    2. Day of week: Monday-Friday weigh 1.0, Saturday/Sunday 0.5, i.e.
       weekdays receive about twice the weekend volume.
    3. Month end: the last 3 days of each month get a 1.5x spike, simulating
       closings, payments and renewals.

    Args:
        f: Passed through to ``_resolve_date`` to resolve string bounds.
        n: Number of dates to draw.
        min_value: Lower bound of the range (anything ``_resolve_date``
            accepts).
        max_value: Upper bound of the range (anything ``_resolve_date``
            accepts).

    Returns:
        ``n`` dates as ``YYYY-MM-DD`` strings, all within the resolved range
        (bounds included). If the bounds are inverted they are swapped.
    """
    import calendar
    import math

    start = _resolve_date(f, min_value)
    end = _resolve_date(f, max_value)
    if end < start:
        # Tolerate inverted bounds instead of sampling outside the range.
        start, end = end, start

    # Inclusive range. A single-day range yields exactly one candidate, so no
    # date past the requested maximum can ever be produced.
    all_dates = [
        start + _timedelta(days=offset) for offset in range((end - start).days + 1)
    ]

    weights = []
    for day in all_dates:
        # 1. Log-normal noise: mean 0, sigma 0.6 -> mostly between 0.5x and 2x
        noise = math.exp(random.gauss(0, 0.6))

        # 2. weekday(): Mon=0 ... Sun=6; weekends carry 50% of the volume
        weekday_factor = 1.0 if day.weekday() < 5 else 0.5

        # 3. Month-end spike on the last 3 days of the month
        last_day = calendar.monthrange(day.year, day.month)[1]
        month_end_factor = 1.5 if day.day >= last_day - 2 else 1.0

        weights.append(noise * weekday_factor * month_end_factor)

    # One call with k=n builds the cumulative weights once instead of once per
    # draw; it consumes the same random() sequence as n separate k=1 calls.
    return [day.isoformat() for day in random.choices(all_dates, weights=weights, k=n)]


FAKER_REGISTRY: dict[str, callable] = {
"uuid": lambda f, n, **kw: [f.uuid4() for _ in range(n)],
"int_seq": lambda f, n, seq_start=1, **kw: list(range(seq_start, seq_start + n)),
"int": lambda f, n, min_value=0, max_value=100_000, **kw: [
f.random_int(min=int(min_value), max=int(max_value)) for _ in range(n)
],
"float": lambda f, n, min_value=0, max_value=10_000, **kw: [
round(f.pyfloat(min_value=float(min_value), max_value=float(max_value)), 2)
for _ in range(n)
],
"float": lambda f, n, min_value=0, max_value=10_000, **kw: _generate_floats(
n, float(min_value), float(max_value)
),
"str": lambda f, n, **kw: [f.word() for _ in range(n)],
"bool": lambda f, n, **kw: [f.boolean() for _ in range(n)],
"date": lambda f, n, min_value="-3y", max_value="today", **kw: [
f.date_between(
start_date=_normalize_date_value(min_value)
if isinstance(min_value, str)
else _date.fromisoformat(str(min_value)),
end_date=_normalize_date_value(max_value)
if isinstance(max_value, str)
else _date.fromisoformat(str(max_value)),
).isoformat()
for _ in range(n)
],
"date": lambda f, n, min_value="-3y", max_value="today", **kw: _generate_dates(
f, n, min_value, max_value
),
"name": lambda f, n, **kw: [f.name() for _ in range(n)],
"email": lambda f, n, **kw: [f.email() for _ in range(n)],
"phone": lambda f, n, **kw: [f.phone_number() for _ in range(n)],
Expand Down
25 changes: 20 additions & 5 deletions src/dataforge/frontend/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import React, { useState, useRef, useCallback, useEffect } from 'react';
import dagre from 'dagre';
import ReactFlow, { Background, Controls, ConnectionLineType, useNodesState, useEdgesState, MarkerType } from 'reactflow';
import type { Edge, Node } from 'reactflow';
import 'reactflow/dist/style.css';
import { Plus, Download, FileJson, Upload, Trash2, Key, Link as LinkIcon, X, Network, Play, BookOpen, Search, Sparkles } from 'lucide-react';
import 'reactflow/dist/style.css'; // ReactFlow v11 has no non-dist CSS export — necessary exception
import { Plus, Download, FileJson, Trash2, Key, Link as LinkIcon, X, Network, Play, BookOpen, Search, Sparkles } from 'lucide-react';

const FAKER_CATALOG: { category: string; color: string; methods: { name: string; example: string }[] }[] = [
{ category: 'Person', color: '#60a5fa', methods: [
Expand Down Expand Up @@ -441,8 +441,6 @@ export default function App() {
return `${driver}://${creds}${form.host}${port}/${form.database}`;
};

const computedDbUrl = dbAdvanced ? runConfig.dbUrl : buildDbUrl(dbForm);

const handleTestDbConnection = async () => {
setDbTestStatus('testing');
setDbTestError('');
Expand Down Expand Up @@ -670,6 +668,7 @@ export default function App() {
count: string,
tablesToInclude: string[],
columnsFilter: string,
workers: string,
increments: Array<{ table: string; column: string; step: string; unit: string }>,
cloudCreds: {
gcsJson: string,
Expand Down Expand Up @@ -697,6 +696,7 @@ export default function App() {
tablesToInclude: [],
columnsFilter: '',
increments: [],
workers: '16',
cloudCreds: {
gcsJson: '',
s3AccessKey: '',
Expand All @@ -705,6 +705,8 @@ export default function App() {
azureConnStr: '',
},
});
const computedDbUrl = dbAdvanced ? runConfig.dbUrl : buildDbUrl(dbForm);

const [runLogs, setRunLogs] = useState('');
const [isRunning, setIsRunning] = useState(false);
const runAbortRef = useRef<AbortController | null>(null);
Expand Down Expand Up @@ -739,6 +741,7 @@ export default function App() {
tables: runConfig.tablesToInclude.length > 0 ? runConfig.tablesToInclude : undefined,
columns: runConfig.columnsFilter.trim() ? runConfig.columnsFilter.trim().split('\n').filter(Boolean) : undefined,
increments: runConfig.increments.filter(i => i.table && i.column && i.step !== ''),
workers: runConfig.workers !== '' ? parseInt(runConfig.workers) : 16,
cloudCreds: runConfig.destination === 'cloud' ? runConfig.cloudCreds : undefined,
})
});
Expand Down Expand Up @@ -805,7 +808,7 @@ export default function App() {
</div>
<div style={{ marginLeft: 'auto', display: 'flex', alignItems: 'center', gap: '1rem' }}>
<span style={{ fontFamily: 'var(--font-mono)', fontSize: '0.72rem', color: 'var(--text-subtle)', background: 'rgba(34,211,238,0.06)', border: '1px solid rgba(34,211,238,0.12)', borderRadius: '999px', padding: '0.2rem 0.75rem', letterSpacing: '0.04em' }}>
v{import.meta.env.VITE_APP_VERSION}
v{__APP_VERSION__}
</span>
<a href="https://ckoliveiraa.github.io/DataForge/" target="_blank" rel="noopener noreferrer"
style={{ display: 'flex', alignItems: 'center', gap: '0.4rem', color: 'var(--text-muted)', textDecoration: 'none', fontSize: '0.85rem', transition: 'color var(--duration-base) var(--ease-out)' }}
Expand Down Expand Up @@ -1823,6 +1826,18 @@ export default function App() {
</div>
</div>
)}
{Object.values(runConfig.partitionByTable).some(v => v) && (
<div>
<label style={{ display: 'block', marginBottom: '0.5rem', color: '#cbd5e1', fontSize: '0.85rem' }}>Partition Workers <span style={{ color: '#64748b' }}>(parallel threads)</span></label>
<input type="number" value={runConfig.workers} onChange={e => setRunConfig(r => ({...r, workers: e.target.value}))} style={{ width: '100%', padding: '0.5rem' }} placeholder="16" min="1" />
{(() => {
const w = parseInt(runConfig.workers);
if (w > 64) return <p style={{ margin: '0.35rem 0 0', fontSize: '0.75rem', color: '#f87171' }}>⚠ Above 64 threads may cause diminishing returns or instability. Recommended: 16–32 for local SSD, up to 64 for network storage.</p>;
if (w > 32) return <p style={{ margin: '0.35rem 0 0', fontSize: '0.75rem', color: '#fb923c' }}>⚠ Above 32 threads only helps with slow network storage (Docker volumes, S3). Local SSD saturates earlier.</p>;
return null;
})()}
</div>
)}
{/* Column Filters disabled */}
</div>

Expand Down
9 changes: 2 additions & 7 deletions src/dataforge/frontend/src/vite-env.d.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
/// <reference types="vite/client" />

interface ImportMetaEnv {
readonly VITE_APP_VERSION: string
}

interface ImportMeta {
readonly env: ImportMetaEnv
}
// Injected at build time via vite.config.ts `define` from pyproject.toml
declare const __APP_VERSION__: string
14 changes: 12 additions & 2 deletions src/dataforge/frontend/vite.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ const cliRunnerPlugin = () => ({
req.on('end', () => {
try {
const data = JSON.parse(body);
const { yamlStr, formats, outputDir, uploadTarget, bucket, prefix, partitionByTable, jsonMode, seed, dbUrl, ifExists, dbSchema, recurrence, count, credentials, rows, tables: tablesToInclude, columns: columnsToInclude, increments, cloudCreds } = data;
const { yamlStr, formats, outputDir, uploadTarget, bucket, prefix, partitionByTable, jsonMode, seed, dbUrl, ifExists, dbSchema, recurrence, count, credentials, rows, tables: tablesToInclude, columns: columnsToInclude, increments, cloudCreds, workers } = data;

const baseDir = resolve(__dirname, '../../../');
// Cloud and database-only runs use a temp dir that is cleaned up after
Expand Down Expand Up @@ -184,6 +184,9 @@ const cliRunnerPlugin = () => ({
}
}
}
if (workers !== undefined && workers !== null && workers !== '') {
args.push('--workers', String(workers));
}

const venvPath = resolve(baseDir, '.venv', 'Scripts', 'python.exe');
const pyExec = existsSync(venvPath) ? venvPath : 'python';
Expand Down Expand Up @@ -780,7 +783,9 @@ DATASET DESCRIPTION:
export default defineConfig({
plugins: [react(), cliRunnerPlugin()],
define: {
'import.meta.env.VITE_APP_VERSION': JSON.stringify(appVersion),
// Injected at build time from pyproject.toml — not available as a .env file
// because the version is the Python package's single source of truth.
__APP_VERSION__: JSON.stringify(appVersion),
},
server: {
host: '0.0.0.0', // Required for Docker
Expand All @@ -789,6 +794,11 @@ export default defineConfig({
allow: ['../schemas', '.']
}
},
// dagre is a CommonJS library — pre-bundle it so Vite doesn't fail in dev mode
// See: https://vitejs.dev/config/dep-optimization-options#optimizedeps-include
optimizeDeps: {
include: ['dagre'],
},
build: {
rollupOptions: {
output: {
Expand Down
Loading