Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 114 additions & 8 deletions src/dataforge/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,33 @@ def _parse_partition_by(partition_by: tuple[str, ...]) -> dict[str, str]:
return result


def _parse_date_granularity(specs: tuple[str, ...]) -> dict[str, str]:
"""Converte 'table:gran' ou 'gran' em {table: gran}. Chave '*' = todas as tabelas."""
result: dict[str, str] = {}
for spec in specs:
if ":" in spec:
table, gran = spec.split(":", 1)
result[table.strip()] = gran.strip()
else:
result["*"] = spec.strip()
return result


def _truncate_date_value(val, granularity: str) -> str:
"""Trunca um valor de data conforme granularidade ('year' → YYYY, 'month' → YYYY-MM)."""
import pandas as pd

try:
ts = pd.Timestamp(val)
if granularity == "year":
return str(ts.year)
elif granularity == "month":
return f"{ts.year:04d}-{ts.month:02d}"
except Exception:
pass
return str(val).replace("/", "-").replace(" ", "_")


def _write_batch(
datasets: dict,
formats: tuple[str, ...],
Expand All @@ -170,6 +197,7 @@ def _write_batch(
recurrence: bool,
partition_map: dict[str, str] | None = None,
max_workers: int = 16,
date_granularity_map: dict[str, str] | None = None,
) -> list[tuple[Path, str]]:
"""Escreve um batch de datasets. Retorna lista de (local_path, remote_sub) onde
remote_sub é o caminho relativo ao prefix: 'dataset/table_name/[col=val/]filename.ext'."""
Expand All @@ -188,12 +216,16 @@ def _write_batch(
written.append((path, f"{schema.name}/{name}/{path.name}"))

elif fmt == "json" and recurrence and batch > 1:
paths = _append_json_flat(datasets, out_path, schema.name, partition_map)
paths = _append_json_flat(
datasets, out_path, schema.name, partition_map, date_granularity_map
)
for p, remote_sub in paths:
written.append((p, remote_sub))

elif fmt == "csv" and recurrence and batch > 1:
paths = _append_csv(datasets, out_path, schema.name, partition_map)
paths = _append_csv(
datasets, out_path, schema.name, partition_map, date_granularity_map
)
for p, remote_sub in paths:
written.append((p, remote_sub))

Expand All @@ -206,6 +238,13 @@ def _write_batch(
if partition_map:
partition_by = partition_map.get(name) or partition_map.get("*")

# Resolve granularidade para esta tabela
date_granularity: str | None = None
if date_granularity_map:
date_granularity = date_granularity_map.get(name) or date_granularity_map.get(
"*"
)

if partition_by and partition_by in df.columns:
# Escrita particionada: uma subpasta por valor da coluna — paralela
appendable = fmt in ("csv", "json")
Expand All @@ -219,9 +258,13 @@ def _write_partition(
_ts=ts,
_recurrence=recurrence,
_appendable=appendable,
_date_granularity=date_granularity,
):
val, group = val_group
safe_val = str(val).replace("/", "-").replace(" ", "_")
if _date_granularity:
safe_val = _truncate_date_value(val, _date_granularity)
else:
safe_val = str(val).replace("/", "-").replace(" ", "_")
filename = (
f"{_name}_{_ts}{_ext}"
if _recurrence and not _appendable
Expand All @@ -238,7 +281,17 @@ def _write_partition(
remote_sub = f"{schema.name}/{_name}/{_partition_by}={safe_val}/{filename}"
return dest, remote_sub

partitions = list(df.groupby(partition_by))
if date_granularity:
_grp_col = f"__partition_key_{partition_by}__"
df[_grp_col] = df[partition_by].apply(
lambda v, _g=date_granularity: _truncate_date_value(v, _g)
)
partitions = list(df.groupby(_grp_col))
df.drop(columns=[_grp_col], inplace=True)
for _key, _grp in partitions:
_grp.drop(columns=[_grp_col], inplace=True, errors="ignore")
else:
partitions = list(df.groupby(partition_by))
max_workers = min(max_workers, len(partitions))
results: list[tuple[Path, str]] = []
with ThreadPoolExecutor(max_workers=max_workers) as pool:
Expand Down Expand Up @@ -274,15 +327,34 @@ def _append_csv(
out_path: Path,
dataset_name: str,
partition_map: dict[str, str] | None = None,
date_granularity_map: dict[str, str] | None = None,
) -> list[tuple[Path, str]]:
written: list[tuple[Path, str]] = []
for name, df in datasets.items():
partition_by = (
(partition_map.get(name) or partition_map.get("*")) if partition_map else None
)
date_granularity = (
(date_granularity_map.get(name) or date_granularity_map.get("*"))
if date_granularity_map
else None
)
if partition_by and partition_by in df.columns:
for val, group in df.groupby(partition_by):
safe_val = str(val).replace("/", "-").replace(" ", "_")
if date_granularity:
_grp_col = f"__pk_{partition_by}__"
df = df.copy()
df[_grp_col] = df[partition_by].apply(
lambda v, _g=date_granularity: _truncate_date_value(v, _g)
)
groups = [(k, g.drop(columns=[_grp_col])) for k, g in df.groupby(_grp_col)]
else:
groups = list(df.groupby(partition_by))
for val, group in groups:
safe_val = (
str(val).replace("/", "-").replace(" ", "_")
if not date_granularity
else str(val)
)
dest_dir = out_path / dataset_name / name / f"{partition_by}={safe_val}"
dest_dir.mkdir(parents=True, exist_ok=True)
path = dest_dir / f"{name}.csv"
Expand All @@ -306,15 +378,34 @@ def _append_json_flat(
out_path: Path,
dataset_name: str,
partition_map: dict[str, str] | None = None,
date_granularity_map: dict[str, str] | None = None,
) -> list[tuple[Path, str]]:
written: list[tuple[Path, str]] = []
for name, df in datasets.items():
partition_by = (
(partition_map.get(name) or partition_map.get("*")) if partition_map else None
)
date_granularity = (
(date_granularity_map.get(name) or date_granularity_map.get("*"))
if date_granularity_map
else None
)
if partition_by and partition_by in df.columns:
for val, group in df.groupby(partition_by):
safe_val = str(val).replace("/", "-").replace(" ", "_")
if date_granularity:
_grp_col = f"__pk_{partition_by}__"
df = df.copy()
df[_grp_col] = df[partition_by].apply(
lambda v, _g=date_granularity: _truncate_date_value(v, _g)
)
groups = [(k, g.drop(columns=[_grp_col])) for k, g in df.groupby(_grp_col)]
else:
groups = list(df.groupby(partition_by))
for val, group in groups:
safe_val = (
str(val).replace("/", "-").replace(" ", "_")
if not date_granularity
else str(val)
)
dest_dir = out_path / dataset_name / name / f"{partition_by}={safe_val}"
dest_dir.mkdir(parents=True, exist_ok=True)
path = dest_dir / f"{name}.json"
Expand Down Expand Up @@ -414,6 +505,17 @@ def cli():
multiple=True,
help="Partition output Hive-style. Use 'column' (all tables) or 'table:column' (per table). Repeatable.",
)
@click.option(
"--partition-date-granularity",
"partition_date_granularity",
multiple=True,
help=(
"Truncate date partition values. "
"Use 'granularity' (all tables) or 'table:granularity' (per table). "
"Granularity options: year (→ YYYY) or month (→ YYYY-MM). "
"Example: --partition-date-granularity month or --partition-date-granularity orders:year"
),
)
# SQL loading
@click.option(
"--db-url",
Expand Down Expand Up @@ -473,6 +575,7 @@ def generate(
prefix,
credentials,
partition_by,
partition_date_granularity,
db_url,
if_exists,
db_schema,
Expand Down Expand Up @@ -540,6 +643,9 @@ def generate(
recurrence=is_recurrent,
partition_map=_parse_partition_by(partition_by) if partition_by else None,
max_workers=workers,
date_granularity_map=_parse_date_granularity(partition_date_granularity)
if partition_date_granularity
else None,
)

# Upload
Expand Down
108 changes: 84 additions & 24 deletions src/dataforge/frontend/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,24 @@ const VALID_DTYPES = [

const nodeTypes = { tableNode: TableNode };

function newId(): string {
if (typeof crypto !== 'undefined' && typeof (crypto as any).randomUUID === 'function') {
return (crypto as any).randomUUID() as string;
}
// fallback for non-secure contexts (HTTP via network IP)
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, c => {
const r = Math.random() * 16 | 0;
return (c === 'x' ? r : (r & 0x3 | 0x8)).toString(16);
});
}

const DATE_DTYPES = new Set(['date', 'date_of_birth', 'past_date', 'future_date', 'iso8601']);
const DATE_FAKER_METHODS = new Set(['date', 'date_of_birth', 'past_date', 'future_date', 'iso8601', 'date_time', 'date_time_between']);

function isDateColumn(col: Column): boolean {
return DATE_DTYPES.has(col.dtype) || DATE_FAKER_METHODS.has(col.fakerProvider);
}


export default function App() {
const [domain, setDomain] = useState("custom");
Expand Down Expand Up @@ -234,7 +252,7 @@ export default function App() {
columns: [
...table.columns,
{
id: crypto.randomUUID(),
id: newId(),
name: `col_${table.columns.length + 1}`,
dtype: 'str',
isPrimaryKey: false,
Expand Down Expand Up @@ -276,11 +294,11 @@ export default function App() {
}, []);

const addTable = () => {
const newId = crypto.randomUUID();
const tableId = newId();
setTables([
...tables,
{
id: newId,
id: tableId,
name: `table_${tables.length + 1}`,
rows: 1000,
columns: [],
Expand Down Expand Up @@ -659,6 +677,7 @@ export default function App() {
bucket: string,
prefix: string,
partitionByTable: Record<string, string>,
partitionDateGranularity: Record<string, string>,
jsonMode: string,
seed: string,
dbUrl: string,
Expand Down Expand Up @@ -686,6 +705,7 @@ export default function App() {
bucket: '',
prefix: 'datasets/',
partitionByTable: {},
partitionDateGranularity: {},
jsonMode: 'flat',
seed: '',
dbUrl: '',
Expand Down Expand Up @@ -731,6 +751,7 @@ export default function App() {
bucket: runConfig.bucket.trim(),
prefix: runConfig.prefix.trim() || domain,
partitionByTable: Object.keys(runConfig.partitionByTable).length > 0 ? runConfig.partitionByTable : undefined,
partitionDateGranularity: Object.keys(runConfig.partitionDateGranularity).length > 0 ? runConfig.partitionDateGranularity : undefined,
jsonMode: runConfig.jsonMode,
seed: runConfig.seed !== '' ? parseInt(runConfig.seed) : undefined,
dbUrl: runConfig.destination === 'database' ? computedDbUrl || undefined : undefined,
Expand Down Expand Up @@ -1802,27 +1823,66 @@ export default function App() {
{tables.length > 0 && (
<div style={{ marginBottom: '0.75rem' }}>
<label style={{ display: 'block', marginBottom: '0.5rem', color: '#cbd5e1', fontSize: '0.85rem' }}>Partition By <span style={{ color: '#64748b' }}>(per table — Hive-style)</span></label>
<div style={{ display: 'flex', flexDirection: 'column', gap: '0.4rem' }}>
{tables.map(t => (
<div key={t.id} style={{ display: 'flex', alignItems: 'center', gap: '0.5rem' }}>
<span style={{ width: '120px', fontSize: '0.8rem', color: '#94a3b8', overflow: 'hidden', textOverflow: 'ellipsis', whiteSpace: 'nowrap', flexShrink: 0 }}>{t.name}</span>
<select
value={runConfig.partitionByTable[t.name] || ''}
onChange={e => {
const col = e.target.value;
const next = { ...runConfig.partitionByTable };
if (col) next[t.name] = col; else delete next[t.name];
setRunConfig(r => ({ ...r, partitionByTable: next }));
}}
style={{ flex: 1, padding: '0.35rem 0.5rem', background: 'rgba(255,255,255,0.04)', border: '1px solid rgba(255,255,255,0.1)', borderRadius: '6px', color: runConfig.partitionByTable[t.name] ? '#e2e8f0' : '#475569', fontSize: '0.82rem' }}
>
<option value="">— no partition —</option>
{t.columns.map(c => (
<option key={c.id} value={c.name} style={{ color: 'black' }}>{c.name}</option>
))}
</select>
</div>
))}
<div style={{ display: 'flex', flexDirection: 'column', gap: '0.6rem' }}>
{tables.map(t => {
const col = runConfig.partitionByTable[t.name] || '';
const gran = runConfig.partitionDateGranularity[t.name] || '';
const colObj = t.columns.find(c => c.name === col);
const showGranularity = !!col && !!colObj && isDateColumn(colObj);
return (
<div key={t.id} style={{ display: 'flex', flexDirection: 'column', gap: '0.3rem' }}>
<div style={{ display: 'flex', alignItems: 'center', gap: '0.5rem' }}>
<span style={{ width: '120px', fontSize: '0.8rem', color: '#94a3b8', overflow: 'hidden', textOverflow: 'ellipsis', whiteSpace: 'nowrap', flexShrink: 0 }}>{t.name}</span>
<select
value={col}
onChange={e => {
const val = e.target.value;
const next = { ...runConfig.partitionByTable };
if (val) next[t.name] = val; else delete next[t.name];
// clear granularity when column is removed
const nextGran = { ...runConfig.partitionDateGranularity };
if (!val) delete nextGran[t.name];
setRunConfig(r => ({ ...r, partitionByTable: next, partitionDateGranularity: nextGran }));
}}
style={{ flex: 1, padding: '0.35rem 0.5rem', background: 'rgba(255,255,255,0.04)', border: '1px solid rgba(255,255,255,0.1)', borderRadius: '6px', color: col ? '#e2e8f0' : '#475569', fontSize: '0.82rem' }}
>
<option value="">— no partition —</option>
{t.columns.map(c => (
<option key={c.id} value={c.name} style={{ color: 'black' }}>{c.name}</option>
))}
</select>
</div>
{showGranularity && (
<div style={{ display: 'flex', alignItems: 'center', gap: '0.4rem', paddingLeft: '128px' }}>
{[{ value: '', label: 'Full' }, { value: 'year', label: 'YYYY' }, { value: 'month', label: 'YYYY-MM' }].map(opt => (
<button
key={opt.value}
onClick={() => {
const nextGran = { ...runConfig.partitionDateGranularity };
if (opt.value) nextGran[t.name] = opt.value; else delete nextGran[t.name];
setRunConfig(r => ({ ...r, partitionDateGranularity: nextGran }));
}}
style={{
padding: '0.2rem 0.6rem',
borderRadius: '4px',
border: gran === opt.value ? '1px solid #6366f1' : '1px solid rgba(255,255,255,0.08)',
background: gran === opt.value ? 'rgba(99,102,241,0.2)' : 'rgba(255,255,255,0.03)',
color: gran === opt.value ? '#a5b4fc' : '#475569',
fontSize: '0.75rem',
cursor: 'pointer',
}}
>{opt.label}</button>
))}
{gran && (
<span style={{ fontSize: '0.72rem', color: '#475569', fontFamily: 'monospace' }}>
{col}={gran === 'year' ? '2024' : '2024-04'}/
</span>
)}
</div>
)}
</div>
);
})}
</div>
</div>
)}
Expand Down
Loading
Loading