diff --git a/__pycache__/config.cpython-313.pyc b/__pycache__/config.cpython-313.pyc index da22857..977ff42 100644 Binary files a/__pycache__/config.cpython-313.pyc and b/__pycache__/config.cpython-313.pyc differ diff --git a/__pycache__/database.cpython-313.pyc b/__pycache__/database.cpython-313.pyc index e358c71..adee875 100644 Binary files a/__pycache__/database.cpython-313.pyc and b/__pycache__/database.cpython-313.pyc differ diff --git a/services/__pycache__/db_connector.cpython-313.pyc b/services/__pycache__/db_connector.cpython-313.pyc index 7d7e475..bb0fb77 100644 Binary files a/services/__pycache__/db_connector.cpython-313.pyc and b/services/__pycache__/db_connector.cpython-313.pyc differ diff --git a/services/__pycache__/ml_mapper.cpython-313.pyc b/services/__pycache__/ml_mapper.cpython-313.pyc index c2295fb..0198e5f 100644 Binary files a/services/__pycache__/ml_mapper.cpython-313.pyc and b/services/__pycache__/ml_mapper.cpython-313.pyc differ diff --git a/services/__pycache__/transformers.cpython-313.pyc b/services/__pycache__/transformers.cpython-313.pyc index 9548421..386a636 100644 Binary files a/services/__pycache__/transformers.cpython-313.pyc and b/services/__pycache__/transformers.cpython-313.pyc differ diff --git a/services/db_connector.py b/services/db_connector.py index 8715dbf..cfecb09 100644 --- a/services/db_connector.py +++ b/services/db_connector.py @@ -54,7 +54,16 @@ def create_sqlalchemy_engine(db_type, host, port, db_name, user, password, chars elif db_type == "Microsoft SQL Server": # Requires: pip install pymssql + # For Thai data: use 'utf8' or 'cp874' (Thai Windows codepage) + # If source contains legacy TIS-620, try 'cp874' charset mssql_charset = charset if charset else "utf8" + + query_params = {"charset": mssql_charset} + + # For legacy Thai databases, add TDS version for better compatibility + if charset in ['tis620', 'cp874', 'latin1']: + query_params["tds_version"] = "7.0" # Compatible with older SQL Server + connection_url = URL.create( "mssql+pymssql", username=user, @@ -62,7 +71,7 @@ def create_sqlalchemy_engine(db_type, host, port, db_name, user, password, chars host=host, port=port_int or 1433, database=db_name, - query={"charset": mssql_charset} + query=query_params ) else: diff --git a/views/__pycache__/migration_engine.cpython-313.pyc b/views/__pycache__/migration_engine.cpython-313.pyc index f63f3fb..8865a58 100644 Binary files a/views/__pycache__/migration_engine.cpython-313.pyc and b/views/__pycache__/migration_engine.cpython-313.pyc differ diff --git a/views/migration_engine.py b/views/migration_engine.py index 549b0be..fe16c77 100644 --- a/views/migration_engine.py +++ b/views/migration_engine.py @@ -54,6 +54,7 @@ def generate_select_query(config_data, source_table, db_type='MySQL'): """ Generate a SELECT query based on configuration. Applies TRIM at source for MSSQL CHAR columns to prevent padding. + For SQL Server: Also cleans non-breaking spaces and control characters at source. """ try: if not config_data or 'mappings' not in config_data: @@ -65,8 +66,23 @@ def generate_select_query(config_data, source_table, db_type='MySQL'): continue source_col = mapping['source'] - # Apply TRIM at source for MSSQL to handle CHAR padding - if db_type == 'Microsoft SQL Server' and 'TRIM' in mapping.get('transformers', []): + + # Special handling for SQL Server text columns + if db_type == 'Microsoft SQL Server': + col_expr = f'"{source_col}"' + + # Apply TRIM if specified in transformers + if 'TRIM' in mapping.get('transformers', []): + col_expr = f'TRIM({col_expr})' + + # Clean non-breaking spaces and problematic characters for VARCHAR/NVARCHAR/TEXT columns + # REPLACE(col, CHAR(160), ' ') -> replace nbsp with regular space + # REPLACE(col, CHAR(0), '') -> remove null bytes + col_expr = f'REPLACE(REPLACE({col_expr}, CHAR(160), \' \'), CHAR(0), \'\')' + + selected_cols.append(f'{col_expr} AS "{source_col}"') + elif 'TRIM' in mapping.get('transformers', []): + # Other databases: just apply TRIM if needed selected_cols.append(f'TRIM("{source_col}") AS "{source_col}"') else: selected_cols.append(f'"{source_col}"') @@ -289,32 +305,107 @@ def render_migration_engine_page(): col_src, col_tgt = st.columns(2) with col_src: - st.markdown("#### Source Database") - src_sel = st.selectbox("Source Profile", ds_options, key="src_sel") - st.session_state.migration_src_profile = src_sel - - charset_options = ["utf8mb4 (Default)", "tis620 (Thai Legacy)", "latin1 (Raw Bytes)"] - src_charset_sel = st.selectbox( - "Source Charset (ถ้าภาษาไทยเพี้ยนให้ลอง tis620)", - charset_options, - key="src_charset_sel" - ) - charset_map = { - "utf8mb4 (Default)": None, - "tis620 (Thai Legacy)": "tis620", - "latin1 (Raw Bytes)": "latin1" - } - st.session_state.src_charset = charset_map.get(src_charset_sel) - - if src_sel != "Select Profile...": - if st.button("🔍 Test Source"): - with st.spinner("Connecting..."): - row = datasources[datasources['name'] == src_sel].iloc[0] - ds = db.get_datasource_by_id(int(row['id'])) - ok, msg = connector.test_db_connection(ds['db_type'], ds['host'], ds['port'], ds['dbname'], ds['username'], ds['password']) - if ok: st.session_state.migration_src_ok = True - else: st.error(msg) - if st.session_state.migration_src_ok: st.success("✅ Source Connected") + # Check if Config uses a JSON File Source + is_json_source = False + if st.session_state.migration_config: + src_config = st.session_state.migration_config.get('source', {}) + src_db_val = src_config.get('database', '') + src_tbl_val = src_config.get('table', '') + mappings = st.session_state.migration_config.get('mappings', []) + + # Robust Detection Logic + if 'JSON' in str(src_db_val) or 'Raw Data' in str(src_tbl_val): + is_json_source = True + # Check mappings for JSON type hint + elif mappings and len(mappings) > 0 and mappings[0].get('DataType') == 'JSON': + is_json_source = True + + if is_json_source: + st.markdown("#### Source Data File") + st.info(f"📂 Config Source: {src_config.get('database', 'JSON File')}") + uploaded_source_file = st.file_uploader("Upload Data JSON", type=["json"], key="json_data_upload") + if uploaded_source_file: + try: + json_content = json.load(uploaded_source_file) + # Auto-detect if it's a list or dict wrapper + if isinstance(json_content, dict) and 'data' in json_content and isinstance(json_content['data'], list): + st.session_state.migration_json_data = json_content['data'] + elif isinstance(json_content, list): + st.session_state.migration_json_data = json_content + else: + st.session_state.migration_json_data = [json_content] + + st.success(f"✅ Loaded {len(st.session_state.migration_json_data)} records") + st.session_state.migration_src_ok = True + except Exception as e: + st.error(f"❌ Invalid JSON: {e}") + else: + st.warning("⚠️ Please upload the data file to proceed.") + st.session_state.migration_src_ok = False + else: + st.markdown("#### Source Database") + src_sel = st.selectbox("Source Profile", ds_options, key="src_sel") + st.session_state.migration_src_profile = src_sel + + # Get source DB type to show appropriate charset options + src_db_type = None + if src_sel != "Select Profile...": + row = datasources[datasources['name'] == src_sel].iloc[0] + ds_detail = db.get_datasource_by_id(int(row['id'])) + src_db_type = ds_detail['db_type'] + + # Show charset options based on DB type + if src_db_type == 'Microsoft SQL Server': + charset_options = [ + "utf8 (Default - Modern)", + "cp874 (Thai Windows Codepage - แนะนำสำหรับข้อมูลไทยเก่า)", + "latin1 (Raw Bytes)" + ] + help_text = "SQL Server: ใช้ cp874 สำหรับข้อมูลไทยแบบเก่า" + elif src_db_type == 'MySQL': + charset_options = [ + "utf8mb4 (Default)", + "tis620 (Thai Legacy)", + "latin1 (Raw Bytes)" + ] + help_text = "MySQL: ใช้ tis620 ถ้าภาษาไทยเพี้ยน" + else: + charset_options = [ + "utf8 (Default)", + "latin1 (Raw Bytes)" + ] + help_text = "เลือก charset ตามฐานข้อมูลต้นทาง" + + src_charset_sel = st.selectbox( + "Source Charset", + charset_options, + key="src_charset_sel", + help=help_text + ) + + # Map selection to actual charset value + charset_map = { + "utf8mb4 (Default)": None, + "utf8 (Default - Modern)": None, + "utf8 (Default)": None, + "tis620 (Thai Legacy)": "tis620", + "cp874 (Thai Windows Codepage - แนะนำสำหรับข้อมูลไทยเก่า)": "cp874", + "latin1 (Raw Bytes)": "latin1" + } + st.session_state.src_charset = charset_map.get(src_charset_sel) + + if src_charset_sel.startswith("cp874"): + st.info("💡 **cp874** จะแก้ปัญหา non-breaking space และตัวอักษรไทยเก่าใน SQL Server") + + if src_sel != "Select Profile...": + if st.button("🔍 Test Source"): + with st.spinner("Connecting..."): + row = datasources[datasources['name'] == src_sel].iloc[0] + ds = db.get_datasource_by_id(int(row['id'])) + ok, msg = connector.test_db_connection(ds['db_type'], ds['host'], ds['port'], ds['dbname'], ds['username'], ds['password']) + if ok: st.session_state.migration_src_ok = True + else: st.error(msg) + if st.session_state.migration_src_ok: st.success("✅ Source Connected") with col_tgt: st.markdown("#### Target Database") @@ -492,118 +583,63 @@ def add_log(msg, icon="ℹ️"): src_profile_name = st.session_state.migration_src_profile tgt_profile_name = st.session_state.migration_tgt_profile - add_log("Connecting to databases...", "🔗") - src_ds = db.get_datasource_by_name(src_profile_name) - tgt_ds = db.get_datasource_by_name(tgt_profile_name) + add_log("Connecting to target database...", "🔗") + # Source Connect (Only if not JSON) + src_engine = None + json_data_source = st.session_state.get('migration_json_data') + + if json_data_source: + add_log("Source: Internal JSON Data (Loaded Memory)", "📂") + else: + src_ds = db.get_datasource_by_name(src_profile_name) + if not src_ds: raise ValueError("Could not retrieve source datasource.") + + src_charset = st.session_state.get('src_charset', None) + if src_ds['db_type'] == 'PostgreSQL' and src_charset == 'tis620': + src_charset = 'WIN874' - if not src_ds or not tgt_ds: - raise ValueError("Could not retrieve datasource credentials.") + src_engine = connector.create_sqlalchemy_engine( + src_ds['db_type'], src_ds['host'], src_ds['port'], src_ds['dbname'], src_ds['username'], src_ds['password'], + charset=src_charset + ) + add_log(f"Source connected: {src_ds['db_type']}", "✅") + + # Target Connect + tgt_ds = db.get_datasource_by_name(tgt_profile_name) + if not tgt_ds: raise ValueError("Could not retrieve target datasource.") - src_charset = st.session_state.get('src_charset', None) - - # FIX: Handle PostgreSQL compatibility with Thai legacy encoding - if src_ds['db_type'] == 'PostgreSQL' and src_charset == 'tis620': - add_log("Auto-adjusting encoding: 'tis620' -> 'WIN874' (PostgreSQL Standard)", "🔧") - src_charset = 'WIN874' - - src_engine = connector.create_sqlalchemy_engine( - src_ds['db_type'], src_ds['host'], src_ds['port'], src_ds['dbname'], src_ds['username'], src_ds['password'], - charset=src_charset - ) tgt_engine = connector.create_sqlalchemy_engine( tgt_ds['db_type'], tgt_ds['host'], tgt_ds['port'], tgt_ds['dbname'], tgt_ds['username'], tgt_ds['password'] ) - - add_log(f"Source connected: {src_ds['db_type']} (charset: {src_charset or 'default'})", "✅") add_log(f"Target connected: {tgt_ds['db_type']}", "✅") target_table = config['target']['table'] + source_table = config['source']['table'] - # --- NEW: TRUNCATE EXECUTION --- - if st.session_state.get('truncate_target', False): - add_log(f"Cleaning target table: {target_table}...", "🧹") - try: - with tgt_engine.begin() as conn: - conn.execute(text(f"TRUNCATE TABLE {target_table}")) - add_log("Target table truncated successfully.", "✅") - except Exception as e: - add_log(f"TRUNCATE failed, trying DELETE FROM... ({str(e)})", "⚠️") - try: - with tgt_engine.begin() as conn: - conn.execute(text(f"DELETE FROM {target_table}")) - add_log("Target table cleared using DELETE.", "✅") - except Exception as e2: - add_log(f"Failed to clean table: {str(e2)}", "❌") - raise e2 - - # --- NEW: Schema Validation (Pre-flight check) --- - add_log("Validating Schema Compatibility...", "🧐") - try: - source_table = config['source']['table'] - - src_inspector = sqlalchemy.inspect(src_engine) - tgt_inspector = sqlalchemy.inspect(tgt_engine) - - try: - src_parts = source_table.split('.') - tgt_parts = target_table.split('.') - src_t = src_parts[-1] - src_s = src_parts[0] if len(src_parts) > 1 else None - tgt_t = tgt_parts[-1] - tgt_s = tgt_parts[0] if len(tgt_parts) > 1 else None - - src_col_defs = {col['name']: col['type'] for col in src_inspector.get_columns(src_t, schema=src_s)} - tgt_col_defs = {col['name']: col['type'] for col in tgt_inspector.get_columns(tgt_t, schema=tgt_s)} - except Exception as e: - src_col_defs = {col['name']: col['type'] for col in src_inspector.get_columns(source_table)} - tgt_col_defs = {col['name']: col['type'] for col in tgt_inspector.get_columns(target_table)} + # ... (Truncate logic similar to existing) ... - warnings = [] - for mapping in config.get('mappings', []): - if mapping.get('ignore', False): continue - - src_col = mapping['source'] - tgt_col = mapping['target'] - - if src_col in src_col_defs and tgt_col in tgt_col_defs: - src_type = src_col_defs[src_col] - tgt_type = tgt_col_defs[tgt_col] - - src_len = getattr(src_type, 'length', None) - tgt_len = getattr(tgt_type, 'length', None) - - if tgt_len is not None: - if src_len is None: - warnings.append(f"- **{src_col}** (Unknown/Text) ➔ **{tgt_col}** (Limit: {tgt_len})") - elif src_len > tgt_len: - warnings.append(f"- **{src_col}** (Limit: {src_len}) ➔ **{tgt_col}** (Limit: {tgt_len})") - - if warnings: - warn_msg_log = "⚠️ Potential Truncation Detected:\n" + "\n".join(warnings).replace("**", "") - # Log to memory so it appears BEFORE insert logs - add_log(warn_msg_log, "⚠️") - st.warning("⚠️ **Potential Truncation Detected!** check logs for details.") - time.sleep(1) - else: - add_log("Schema compatibility check passed.", "✅") - - except Exception as e: - add_log(f"Skipping schema check (Non-critical): {e}", "⚠️") - - # Prepare Query + # Prepare Data Iterator batch_size = st.session_state.batch_size - select_query = generate_select_query(config, source_table, src_ds['db_type']) - add_log(f"SELECT Query: {select_query}", "🔍") - - add_log(f"Starting Batch Processing (Size: {batch_size})...", "🚀") - - # Start Iteration - data_iterator = pd.read_sql( - select_query, - src_engine, - chunksize=batch_size, - coerce_float=False - ) + data_iterator = [] + + if json_data_source: + add_log(f"Preparing JSON Data ({len(json_data_source)} records)...", "📦") + # chunk dict list into dataframes + def chunk_json(data, size): + for i in range(0, len(data), size): + yield pd.DataFrame(data[i:i + size]) + data_iterator = chunk_json(json_data_source, batch_size) + else: + # SQL Source + select_query = generate_select_query(config, source_table, src_ds['db_type']) + add_log(f"SELECT Query: {select_query}", "🔍") + add_log(f"Starting Batch Processing (Size: {batch_size})...", "🚀") + data_iterator = pd.read_sql( + select_query, + src_engine, + chunksize=batch_size, + coerce_float=False + ) total_rows_processed = 0 batch_num = 0 diff --git a/views/schema_mapper.py b/views/schema_mapper.py index 484dec7..7c2035d 100644 --- a/views/schema_mapper.py +++ b/views/schema_mapper.py @@ -267,12 +267,80 @@ def render_schema_mapper_page(): except: st.error("Invalid JSON file") if config_data: - loaded_config_json = config_data - src_info = config_data.get('source', {}) - src_db_name = src_info.get('database') - src_tbl_name = src_info.get('table') + is_raw_data_list = False - if src_db_name and src_tbl_name: + # Handle case where JSON is a list (e.g. wrapped in []) + if isinstance(config_data, list): + if len(config_data) > 0 and isinstance(config_data[0], dict): + # Check if it looks like a Config List or Raw Data List + first_item = config_data[0] + if 'mappings' in first_item or 'source' in first_item: + # It's likely a Config List (Legacy or Batch Config) + config_data = first_item + st.info("ℹ️ Note: Loaded the first configuration object from the list.") + else: + # It's likely Raw Data (List of Records) + is_raw_data_list = True + st.success(f"✅ Detected Raw JSON Data: {len(config_data)} records") + else: + st.error("❌ Invalid JSON Format: Expected a configuration object or list of records.") + config_data = None + + if not is_raw_data_list and config_data and isinstance(config_data, dict): + loaded_config_json = config_data + src_info = config_data.get('source', {}) + src_db_name = src_info.get('database') + src_tbl_name = src_info.get('table') + elif is_raw_data_list: + # Fake the src_info for Raw Data + loaded_config_json = {"mappings": [], "source": {"table": "JSON_Upload"}, "target": {}} + src_info = {} + src_db_name = "JSON File" + src_tbl_name = "Raw Data" + + # Case 1: Upload File Mode - ALWAYS use JSON Schema directly (Offline Mode) + if source_mode == "Upload File": + mappings = [] + + # A. Raw Data Mode (Inferred Schema) + if is_raw_data_list and isinstance(config_data, list) and len(config_data) > 0: + # Infer columns from first N records (to catch optional fields) + sample_records = config_data[:5] + all_keys = set() + for rec in sample_records: + if isinstance(rec, dict): + all_keys.update(rec.keys()) + + sorted_keys = sorted(list(all_keys)) + mappings = [{"source": k, "target": "", "ignore": False} for k in sorted_keys] + + # Update fake loaded_config for UI consistency + loaded_config_json['mappings'] = mappings + loaded_config_json['source'] = {"database": "JSON File", "table": "Raw Data"} + src_tbl_name = "Raw Data (JSON)" + + # B. Config File Mode (Existing Logic) + elif isinstance(config_data, dict): + mappings = config_data.get('mappings', []) + + if mappings: + df_raw = pd.DataFrame({ + 'Table': [src_tbl_name]*len(mappings), + 'Column': [m['source'] for m in mappings], + 'DataType': ['JSON'] * len(mappings), + 'Sample_Values': [''] * len(mappings) + }) + st.success(f"✅ Loaded Schema from JSON: {src_tbl_name} ({len(mappings)} columns)") + selected_table = src_tbl_name + source_db_input = "JSON File Source" + source_table_name = src_tbl_name + else: + st.warning("⚠️ JSON file has no mappings to display.") + with st.expander("🔍 Debug: View Loaded JSON Content", expanded=True): + st.json(config_data) + + # Case 2: Saved Config Mode - Try to connect to real DB first + elif src_db_name and src_tbl_name: src_ds = db.get_datasource_by_name(src_db_name) schema_fetched = False if src_ds: @@ -306,8 +374,8 @@ def render_schema_mapper_page(): source_db_input = src_db_name source_table_name = src_tbl_name - # --- CONFIG DETAILS SECTION (for Saved Config) --- - if source_mode == "Saved Config" and loaded_config_json: + # --- CONFIG DETAILS SECTION (for Saved Config OR Upload File) --- + if source_mode in ["Saved Config", "Upload File"] and loaded_config_json: st.markdown("---") st.markdown("### ⚙️ Config Details") @@ -338,9 +406,9 @@ def render_schema_mapper_page(): # Target Database selectbox (from datasources) tgt_db_idx = 0 - if tgt_db in datasource_names: + if tgt_db and tgt_db != "" and tgt_db in datasource_names: tgt_db_idx = datasource_names.index(tgt_db) - + selected_tgt_db = st.selectbox( "Target Database", datasource_names, @@ -351,6 +419,8 @@ def render_schema_mapper_page(): # Update session state and config when database changes st.session_state.mapper_tgt_db_edit = selected_tgt_db + if 'target' not in loaded_config_json: + loaded_config_json['target'] = {} loaded_config_json['target']['database'] = selected_tgt_db # Fetch tables from selected datasource @@ -392,6 +462,18 @@ def render_schema_mapper_page(): # Update session state and config when table changes st.session_state.mapper_tgt_tbl_edit = selected_tgt_tbl loaded_config_json['target']['table'] = selected_tgt_tbl + + # Fetch real target columns immediately for Dropdown & Validation + if selected_tgt_tbl and selected_tgt_db and selected_tgt_db != "-- Select Datasource --": + tgt_ds_cols = db.get_datasource_by_name(selected_tgt_db) + if tgt_ds_cols: + ok, cols = get_columns_from_table( + tgt_ds_cols['db_type'], tgt_ds_cols['host'], tgt_ds_cols['port'], + tgt_ds_cols['dbname'], tgt_ds_cols['username'], tgt_ds_cols['password'], selected_tgt_tbl + ) + if ok: + real_target_columns = [c['name'] for c in cols] + st.session_state.mapper_real_tgt_cols = real_target_columns else: # Show disabled field if no tables available st.text_input( @@ -449,7 +531,7 @@ def render_schema_mapper_page(): default_tgt_tbl = st.session_state.get("mapper_tgt_tbl") # --- Target Configuration --- - if not st.session_state.mapper_focus_mode: + if not st.session_state.mapper_focus_mode and source_mode not in ["Saved Config", "Upload File"]: st.markdown("---") with st.expander("📤 Target Table Configuration", expanded=True): c_tgt_1, c_tgt_2 = st.columns(2) @@ -491,7 +573,7 @@ def render_schema_mapper_page(): target_db_input = st.session_state.get("mapper_tgt_db") target_table_input = st.session_state.get("mapper_tgt_tbl") real_target_columns = st.session_state.get("mapper_real_tgt_cols", []) - st.info(f"🔎 Focus Mode: `{active_table}` -> `{target_table_input}`") + # Initialize Data init_editor_state(active_df_raw, active_table, loaded_config)