-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
128 lines (101 loc) · 4.38 KB
/
main.py
File metadata and controls
128 lines (101 loc) · 4.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import argparse
import json
import sys
from parser.csv_parser import parse_csv
from parser.json_parser import parse_json
from parser.xml_parser import parse_xml
from processor.cleaner import clean
from processor.transformer import filter_data, transform
from processor.aggregator import aggregate
from processor.validator import validate_schema
from output.writer import write_json
from utils.logger import log, banner, success_box
from utils.batch import get_files_from_batch, merge_results
# Dispatch table: maps a format key (from --format or per-file auto-detection)
# to the parser function for that format. Each parser takes a file path and
# returns parsed records.
FORMAT_PARSERS = {
    'csv': parse_csv,
    'json': parse_json,
    'xml': parse_xml,
}
def parse_args():
    """Define and evaluate the command-line interface.

    Returns the ``argparse.Namespace`` holding all CLI options; input and
    output paths are mandatory, everything else has a default.
    """
    cli = argparse.ArgumentParser(
        description="🛠️ CLI Data Parser — reads CSV/JSON/XML and outputs structured JSON",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    # Bind the method once; every option below is registered through it.
    add = cli.add_argument
    add('--input', required=True,
        help='Input file, directory, or glob pattern (e.g. data/*.csv)')
    add('--format', choices=['csv', 'json', 'xml'], default=None,
        help='Input format (auto-detected if not set)')
    add('--output', required=True,
        help='Output JSON file path')
    add('--filter', default=None,
        help='Filter expression e.g. "age>25" or "age>25 AND status=active"')
    add('--aggregate', action='store_true',
        help='Include aggregated summary in output')
    add('--schema', default=None,
        help='Path to JSON schema file for validation')
    add('--no-clean', action='store_true',
        help='Skip data cleaning step')
    add('--no-transform', action='store_true',
        help='Skip data transformation step')
    add('--chunk-size', type=int, default=10000,
        help='Chunk size for large file processing (default: 10000)')
    return cli.parse_args()
def load_schema(schema_path: str) -> dict:
    """Load a JSON schema file.

    Parameters
    ----------
    schema_path : str
        Path to a JSON file containing the validation schema.

    Returns
    -------
    dict
        The parsed schema, or an empty dict (after logging an error) when
        the file cannot be read or is not valid JSON — callers treat a
        falsy result as "no schema to apply".
    """
    try:
        # Narrowed from a blanket `except Exception`: only I/O failures and
        # malformed JSON are expected here; anything else is a real bug and
        # should propagate instead of being silently mapped to {}.
        with open(schema_path, encoding='utf-8') as f:
            return json.load(f)
    except (OSError, json.JSONDecodeError) as e:
        log("ERROR", f"[SCHEMA] Could not load schema: {e}")
        return {}
def run():
    """Execute the whole pipeline driven by the CLI arguments:

    resolve inputs → parse → clean → validate → filter → transform →
    aggregate → write JSON output.
    Exits with status 1 when no input files resolve or nothing parses.
    """
    args = parse_args()
    banner("CLI Data Parser v2.0")

    # ── STEP 1: RESOLVE FILES ──────────────────────
    resolved = get_files_from_batch(args.input)
    if not resolved:
        log("ERROR", "No valid input files found. Exiting.")
        sys.exit(1)

    # An explicit --format overrides whatever was auto-detected per file.
    if args.format:
        resolved = [(path, args.format) for path, _ in resolved]

    # ── STEP 2: PARSE (batch or single) ───────────
    parsed_chunks = []
    for path, fmt in resolved:
        log("INFO", f"Parsing '{path}' as {fmt.upper()}...")
        parse_fn = FORMAT_PARSERS.get(fmt)
        if parse_fn is None:
            log("ERROR", f"Unsupported format: {fmt}")
            continue
        # CSV is the only parser that takes a chunk size for large files.
        if fmt == 'csv':
            parsed_chunks.append(parse_csv(path, chunk_size=args.chunk_size))
        else:
            parsed_chunks.append(parse_fn(path))

    data = merge_results(parsed_chunks)
    if not data:
        log("ERROR", "No data parsed. Exiting.")
        sys.exit(1)

    # ── STEP 3: CLEAN ──────────────────────────────
    if not args.no_clean:
        data = clean(data)

    # ── STEP 4: SCHEMA VALIDATION ──────────────────
    invalid_records = []
    if args.schema:
        schema = load_schema(args.schema)
        if schema:
            data, invalid_records = validate_schema(data, schema)

    # ── STEP 5: FILTER ─────────────────────────────
    if args.filter:
        data = filter_data(data, args.filter)

    # ── STEP 6: TRANSFORM ──────────────────────────
    if not args.no_transform:
        data = transform(data)

    # ── STEP 7: AGGREGATE ──────────────────────────
    summary = aggregate(data) if args.aggregate else None

    # ── STEP 8: OUTPUT ─────────────────────────────
    write_json(data, args.output, summary=summary, invalid=invalid_records or None)
    success_box(f"Done! {len(data)} records saved to '{args.output}'")
# Script entry point: run the pipeline only when executed directly,
# not when imported as a module.
if __name__ == '__main__':
    run()