diff --git a/src/dataforge/cli.py b/src/dataforge/cli.py
index 48728e7..03f5f9b 100644
--- a/src/dataforge/cli.py
+++ b/src/dataforge/cli.py
@@ -160,6 +160,33 @@ def _parse_partition_by(partition_by: tuple[str, ...]) -> dict[str, str]:
     return result
 
 
+def _parse_date_granularity(specs: tuple[str, ...]) -> dict[str, str]:
+    """Convert 'table:gran' or 'gran' into {table: gran}. Key '*' = all tables."""
+    result: dict[str, str] = {}
+    for spec in specs:
+        if ":" in spec:
+            table, gran = spec.split(":", 1)
+            result[table.strip()] = gran.strip()
+        else:
+            result["*"] = spec.strip()
+    return result
+
+
+def _truncate_date_value(val, granularity: str) -> str:
+    """Truncate a date value to the given granularity ('year' → YYYY, 'month' → YYYY-MM)."""
+    import pandas as pd
+
+    try:
+        ts = pd.Timestamp(val)
+        if granularity == "year":
+            return str(ts.year)
+        elif granularity == "month":
+            return f"{ts.year:04d}-{ts.month:02d}"
+    except Exception:
+        pass
+    return str(val).replace("/", "-").replace(" ", "_")
+
+
 def _write_batch(
     datasets: dict,
     formats: tuple[str, ...],
@@ -170,6 +197,7 @@ def _write_batch(
     recurrence: bool,
     partition_map: dict[str, str] | None = None,
     max_workers: int = 16,
+    date_granularity_map: dict[str, str] | None = None,
 ) -> list[tuple[Path, str]]:
     """Write a batch of datasets. Returns a list of (local_path, remote_sub) where remote_sub
     is the path relative to the prefix: 'dataset/table_name/[col=val/]filename.ext'."""
@@ -188,12 +216,16 @@ def _write_batch(
             written.append((path, f"{schema.name}/{name}/{path.name}"))
 
         elif fmt == "json" and recurrence and batch > 1:
-            paths = _append_json_flat(datasets, out_path, schema.name, partition_map)
+            paths = _append_json_flat(
+                datasets, out_path, schema.name, partition_map, date_granularity_map
+            )
             for p, remote_sub in paths:
                 written.append((p, remote_sub))
 
         elif fmt == "csv" and recurrence and batch > 1:
-            paths = _append_csv(datasets, out_path, schema.name, partition_map)
+            paths = _append_csv(
+                datasets, out_path, schema.name, partition_map, date_granularity_map
+            )
             for p, remote_sub in paths:
                 written.append((p, remote_sub))
 
@@ -206,6 +238,13 @@ def _write_batch(
                 if partition_map:
                     partition_by = partition_map.get(name) or partition_map.get("*")
 
+                # Resolve the granularity for this table
+                date_granularity: str | None = None
+                if date_granularity_map:
+                    date_granularity = date_granularity_map.get(name) or date_granularity_map.get(
+                        "*"
+                    )
+
                 if partition_by and partition_by in df.columns:
                     # Partitioned write: one subfolder per column value, in parallel
                     appendable = fmt in ("csv", "json")
@@ -219,9 +258,13 @@ def _write_partition(
                         _ts=ts,
                         _recurrence=recurrence,
                         _appendable=appendable,
+                        _date_granularity=date_granularity,
                     ):
                         val, group = val_group
-                        safe_val = str(val).replace("/", "-").replace(" ", "_")
+                        if _date_granularity:
+                            safe_val = _truncate_date_value(val, _date_granularity)
+                        else:
+                            safe_val = str(val).replace("/", "-").replace(" ", "_")
                         filename = (
                             f"{_name}_{_ts}{_ext}"
                             if _recurrence and not _appendable
@@ -238,7 +281,17 @@ def _write_partition(
                         remote_sub = f"{schema.name}/{_name}/{_partition_by}={safe_val}/{filename}"
                         return dest, remote_sub
 
-                    partitions = list(df.groupby(partition_by))
+                    if date_granularity:
+                        _grp_col = f"__partition_key_{partition_by}__"
+                        df[_grp_col] = df[partition_by].apply(
+                            lambda v, _g=date_granularity: _truncate_date_value(v, _g)
+                        )
+                        partitions = list(df.groupby(_grp_col))
+                        df.drop(columns=[_grp_col], inplace=True)
+                        for _key, _grp in partitions:
+                            _grp.drop(columns=[_grp_col], inplace=True, errors="ignore")
+                    else:
+                        partitions = list(df.groupby(partition_by))
                     max_workers = min(max_workers, len(partitions))
                     results: list[tuple[Path, str]] = []
                     with ThreadPoolExecutor(max_workers=max_workers) as pool:
@@ -274,15 +327,34 @@ def _append_csv(
     out_path: Path,
     dataset_name: str,
     partition_map: dict[str, str] | None = None,
+    date_granularity_map: dict[str, str] | None = None,
 ) -> list[tuple[Path, str]]:
     written: list[tuple[Path, str]] = []
     for name, df in datasets.items():
         partition_by = (
             (partition_map.get(name) or partition_map.get("*")) if partition_map else None
         )
+        date_granularity = (
+            (date_granularity_map.get(name) or date_granularity_map.get("*"))
+            if date_granularity_map
+            else None
+        )
         if partition_by and partition_by in df.columns:
-            for val, group in df.groupby(partition_by):
-                safe_val = str(val).replace("/", "-").replace(" ", "_")
+            if date_granularity:
+                _grp_col = f"__pk_{partition_by}__"
+                df = df.copy()
+                df[_grp_col] = df[partition_by].apply(
+                    lambda v, _g=date_granularity: _truncate_date_value(v, _g)
+                )
+                groups = [(k, g.drop(columns=[_grp_col])) for k, g in df.groupby(_grp_col)]
+            else:
+                groups = list(df.groupby(partition_by))
+            for val, group in groups:
+                safe_val = (
+                    str(val).replace("/", "-").replace(" ", "_")
+                    if not date_granularity
+                    else str(val)
+                )
                 dest_dir = out_path / dataset_name / name / f"{partition_by}={safe_val}"
                 dest_dir.mkdir(parents=True, exist_ok=True)
                 path = dest_dir / f"{name}.csv"
@@ -306,15 +378,34 @@ def _append_json_flat(
     out_path: Path,
     dataset_name: str,
     partition_map: dict[str, str] | None = None,
+    date_granularity_map: dict[str, str] | None = None,
 ) -> list[tuple[Path, str]]:
     written: list[tuple[Path, str]] = []
     for name, df in datasets.items():
         partition_by = (
             (partition_map.get(name) or partition_map.get("*")) if partition_map else None
         )
+        date_granularity = (
+            (date_granularity_map.get(name) or date_granularity_map.get("*"))
+            if date_granularity_map
+            else None
+        )
         if partition_by and partition_by in df.columns:
-            for val, group in df.groupby(partition_by):
-                safe_val = str(val).replace("/", "-").replace(" ", "_")
+            if date_granularity:
+                _grp_col = f"__pk_{partition_by}__"
+                df = df.copy()
+                df[_grp_col] = df[partition_by].apply(
+                    lambda v, _g=date_granularity: _truncate_date_value(v, _g)
+                )
+                groups = [(k, g.drop(columns=[_grp_col])) for k, g in df.groupby(_grp_col)]
+            else:
+                groups = list(df.groupby(partition_by))
+            for val, group in groups:
+                safe_val = (
+                    str(val).replace("/", "-").replace(" ", "_")
+                    if not date_granularity
+                    else str(val)
+                )
                 dest_dir = out_path / dataset_name / name / f"{partition_by}={safe_val}"
                 dest_dir.mkdir(parents=True, exist_ok=True)
                 path = dest_dir / f"{name}.json"
@@ -414,6 +505,17 @@ def cli():
     multiple=True,
     help="Partition output Hive-style. Use 'column' (all tables) or 'table:column' (per table). Repeatable.",
 )
+@click.option(
+    "--partition-date-granularity",
+    "partition_date_granularity",
+    multiple=True,
+    help=(
+        "Truncate date partition values. "
+        "Use 'granularity' (all tables) or 'table:granularity' (per table). "
+        "Granularity options: year (→ YYYY) or month (→ YYYY-MM). "
+        "Example: --partition-date-granularity month or --partition-date-granularity orders:year"
+    ),
+)
 # SQL loading
 @click.option(
     "--db-url",
@@ -473,6 +575,7 @@ def generate(
     prefix,
     credentials,
     partition_by,
+    partition_date_granularity,
     db_url,
     if_exists,
     db_schema,
@@ -540,6 +643,9 @@ def generate(
         recurrence=is_recurrent,
         partition_map=_parse_partition_by(partition_by) if partition_by else None,
         max_workers=workers,
+        date_granularity_map=_parse_date_granularity(partition_date_granularity)
+        if partition_date_granularity
+        else None,
     )
 
     # Upload
diff --git a/src/dataforge/frontend/src/App.tsx b/src/dataforge/frontend/src/App.tsx
index 0262dec..2244996 100644
--- a/src/dataforge/frontend/src/App.tsx
+++ b/src/dataforge/frontend/src/App.tsx
@@ -147,6 +147,24 @@ const VALID_DTYPES = [
 
 const nodeTypes = { tableNode: TableNode };
 
+function newId(): string {
+  if (typeof crypto !== 'undefined' && typeof (crypto as any).randomUUID === 'function') {
+    return (crypto as any).randomUUID() as string;
+  }
+  // fallback for non-secure contexts (HTTP via network IP)
+  return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, c => {
+    const r = Math.random() * 16 | 0;
+    return (c === 'x' ? r : (r & 0x3 | 0x8)).toString(16);
+  });
+}
+
+const DATE_DTYPES = new Set(['date', 'date_of_birth', 'past_date', 'future_date', 'iso8601']);
+const DATE_FAKER_METHODS = new Set(['date', 'date_of_birth', 'past_date', 'future_date', 'iso8601', 'date_time', 'date_time_between']);
+
+function isDateColumn(col: Column): boolean {
+  return DATE_DTYPES.has(col.dtype) || DATE_FAKER_METHODS.has(col.fakerProvider);
+}
+
 export default function App() {
   const [domain, setDomain] = useState("custom");
 
@@ -234,7 +252,7 @@ export default function App() {
          columns: [
            ...table.columns,
            {
-              id: crypto.randomUUID(),
+              id: newId(),
              name: `col_${table.columns.length + 1}`,
              dtype: 'str',
              isPrimaryKey: false,
@@ -276,11 +294,11 @@ export default function App() {
  }, []);
 
  const addTable = () => {
-    const newId = crypto.randomUUID();
+    const tableId = newId();
    setTables([
      ...tables,
      {
-        id: newId,
+        id: tableId,
        name: `table_${tables.length + 1}`,
        rows: 1000,
        columns: [],
@@ -659,6 +677,7 @@ export default function App() {
    bucket: string,
    prefix: string,
    partitionByTable: Record<string, string>,
+    partitionDateGranularity: Record<string, string>,
    jsonMode: string,
    seed: string,
    dbUrl: string,
@@ -686,6 +705,7 @@ export default function App() {
    bucket: '',
    prefix: 'datasets/',
    partitionByTable: {},
+    partitionDateGranularity: {},
    jsonMode: 'flat',
    seed: '',
    dbUrl: '',
@@ -731,6 +751,7 @@ export default function App() {
      bucket: runConfig.bucket.trim(),
      prefix: runConfig.prefix.trim() || domain,
      partitionByTable: Object.keys(runConfig.partitionByTable).length > 0 ? runConfig.partitionByTable : undefined,
+      partitionDateGranularity: Object.keys(runConfig.partitionDateGranularity).length > 0 ? runConfig.partitionDateGranularity : undefined,
      jsonMode: runConfig.jsonMode,
      seed: runConfig.seed !== '' ? parseInt(runConfig.seed) : undefined,
      dbUrl: runConfig.destination === 'database' ? computedDbUrl || undefined : undefined,
@@ -1802,27 +1823,66 @@
              {tables.length > 0 && (
-                  {tables.map(t => (
-                      {t.name}
-                  ))}
+                  {tables.map(t => {
+                    const col = runConfig.partitionByTable[t.name] || '';
+                    const gran = runConfig.partitionDateGranularity[t.name] || '';
+                    const colObj = t.columns.find(c => c.name === col);
+                    const showGranularity = !!col && !!colObj && isDateColumn(colObj);
+                    return (
+                        {t.name}
+                        {showGranularity && (
+                          {[{ value: '', label: 'Full' }, { value: 'year', label: 'YYYY' }, { value: 'month', label: 'YYYY-MM' }].map(opt => (
+                          ))}
+                          {gran && (
+                              {col}={gran === 'year' ? '2024' : '2024-04'}/
+                          )}
+                        )}
+                    );
+                  })}
              )}
diff --git a/src/dataforge/frontend/src/services/SchemaReader.ts b/src/dataforge/frontend/src/services/SchemaReader.ts
index 64a56dc..678090c 100644
--- a/src/dataforge/frontend/src/services/SchemaReader.ts
+++ b/src/dataforge/frontend/src/services/SchemaReader.ts
@@ -1,6 +1,16 @@
 import YAML from 'yaml';
 import type { Schema, Table, Column } from '../types/schema';
 
+function genId(): string {
+  if (typeof crypto !== 'undefined' && typeof (crypto as any).randomUUID === 'function') {
+    return (crypto as any).randomUUID() as string;
+  }
+  return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, c => {
+    const r = Math.random() * 16 | 0;
+    return (c === 'x' ? r : (r & 0x3 | 0x8)).toString(16);
+  });
+}
+
 export class SchemaReader {
  /**
   * Parses a YAML string and converts it into the internal React state representation.
@@ -38,7 +48,7 @@
      const columns: Column[] = Object.keys(rawCols).map(colName => {
        const cData = rawCols[colName];
        return {
-          id: crypto.randomUUID(),
+          id: genId(),
          name: colName,
          dtype: cData.dtype || 'str',
          isPrimaryKey: !!cData.primary_key,
@@ -54,7 +64,7 @@
      });
 
      return {
-        id: crypto.randomUUID(),
+        id: genId(),
        name: tableName,
        rows: tData.rows || 1000,
        columns
diff --git a/src/dataforge/frontend/vite.config.ts b/src/dataforge/frontend/vite.config.ts
index d69f356..99067e1 100644
--- a/src/dataforge/frontend/vite.config.ts
+++ b/src/dataforge/frontend/vite.config.ts
@@ -83,7 +83,7 @@ const cliRunnerPlugin = () => ({
      req.on('end', () => {
        try {
          const data = JSON.parse(body);
-          const { yamlStr, formats, outputDir, uploadTarget, bucket, prefix, partitionByTable, jsonMode, seed, dbUrl, ifExists, dbSchema, recurrence, count, credentials, rows, tables: tablesToInclude, columns: columnsToInclude, increments, cloudCreds, workers } = data;
+          const { yamlStr, formats, outputDir, uploadTarget, bucket, prefix, partitionByTable, partitionDateGranularity, jsonMode, seed, dbUrl, ifExists, dbSchema, recurrence, count, credentials, rows, tables: tablesToInclude, columns: columnsToInclude, increments, cloudCreds, workers } = data;
          const baseDir = resolve(__dirname, '../../../');
 
          // Cloud and database-only runs use a temp dir that is cleaned up after
@@ -125,6 +125,11 @@
              if (col) args.push('--partition-by', `${table}:${col}`);
            }
          }
+          if (partitionDateGranularity && typeof partitionDateGranularity === 'object') {
+            for (const [table, gran] of Object.entries(partitionDateGranularity as Record<string, string>)) {
+              if (gran === 'year' || gran === 'month') args.push('--partition-date-granularity', `${table}:${gran}`);
+            }
+          }
          // Resolve credentials: UI input takes priority, falls back to credentials/ folder
          const credentialsDir = resolve(baseDir, 'credentials');
          const extraEnv: Record<string, string> = {};