diff --git a/import/migrate/neo4j/complete_migration/complete_migration.py b/import/migrate/neo4j/complete_migration/complete_migration.py index 150b8c4..6386930 100644 --- a/import/migrate/neo4j/complete_migration/complete_migration.py +++ b/import/migrate/neo4j/complete_migration/complete_migration.py @@ -28,7 +28,20 @@ def ensure_neo4j_has_data(): session.run("MATCH (n) DETACH DELETE n") print("Creating complete dataset in Neo4j...") - + with driver.session() as session: + try: + session.run("CREATE CONSTRAINT my_constraint FOR (n:Label1) REQUIRE n.id IS UNIQUE") + except Exception as e: + print(f"Constraint creation failed: {e}") + try: + session.run("CREATE CONSTRAINT my_constraint_not_null FOR (n:Label1) REQUIRE n.name IS NOT NULL") + except Exception as e: + print(f"Constraint creation failed: {e}") + try: + session.run("CREATE INDEX node_range_index_name FOR (n:Label1) ON (n.created_at)") + except Exception as e: + print(f"Index creation failed: {e}") + # Insert all nodes for all labels with driver.session() as session: for label in LABELS: @@ -132,7 +145,7 @@ def execute_query(query): # Get comprehensive schema information using apoc.meta.schema() schema_result = list(memgraph.execute_and_fetch( """ - call migrate_neo4j_driver2.neo4j("CALL apoc.meta.schema() YIELD value RETURN value", {host: "neo4j", port: 7687}) YIELD row RETURN row.value as schema + CALL migrate_neo4j_driver2.neo4j("CALL apoc.meta.schema() YIELD value RETURN value", {host: "neo4j", port: 7687}) YIELD row RETURN row.value AS schema """ )) @@ -152,8 +165,8 @@ def execute_query(query): # Create indexes for all discovered labels for label in discovered_labels: + # Later we will inspect the schema for constraints and indices memgraph.execute(f"CREATE INDEX ON :{label}") - memgraph.execute(f"CREATE INDEX ON :{label}(id)") print("[Worker 1] Starting migration of nodes...") @@ -163,7 +176,7 @@ def execute_query(query): execute_query( f""" call migrate_neo4j_driver2.neo4j( - "MATCH (n:{label}) RETURN 
elementId(n) AS elementId, labels(n) as labels, properties(n) AS props", + "MATCH (n:{label}) RETURN elementId(n) AS elementId, labels(n) AS labels, properties(n) AS props", {{host: "neo4j", port: 7687}} ) YIELD row MERGE (n:{label}:__MigrationNode__ {{__elementId__: row.elementId}}) @@ -180,7 +193,7 @@ def execute_query(query): print(f"[Worker 1] Migrating {rel_type} relationships...") execute_query( f""" - call migrate_neo4j_driver2.neo4j( + CALL migrate_neo4j_driver2.neo4j( "MATCH (a)-[r:{rel_type}]->(b) RETURN elementId(a) AS from_elementId, elementId(b) AS to_elementId, properties(r) AS rel_props", {{host: "neo4j", port: 7687}} ) YIELD row @@ -192,7 +205,51 @@ def execute_query(query): ) print(f"[Worker 1] Completed migration of {rel_type} relationships") - print("[Worker 1] Migration complete.") + print("[Worker 1] Completed migration of relationships.") + + print("[Worker 1] Starting migration of indices...") + indexes_result = list(memgraph.execute_and_fetch( + """ + CALL migrate_neo4j_driver2.neo4j("SHOW INDEXES", {host: "neo4j", port: 7687}) YIELD row RETURN row + """ + )) + range_indexes_result = [x["row"] for x in indexes_result if x["row"]["type"] in ["RANGE", "TEXT"] and len(x["row"]["labelsOrTypes"]) == 1] + for index in range_indexes_result: + props = ",".join(index["properties"]) + label_or_type = index["labelsOrTypes"][0] + if index["entityType"] == "NODE": + memgraph.execute(f"CREATE INDEX ON :{label_or_type}({props})") + elif index["entityType"] == "RELATIONSHIP" and len(index["properties"]) == 1: + memgraph.execute(f"CREATE EDGE INDEX ON :{label_or_type}({props})") + + fulltext_indexes_result = [x["row"] for x in indexes_result if x["row"]["type"] == "FULLTEXT" and len(x["row"]["labelsOrTypes"]) == 1] + for index in fulltext_indexes_result: + text_index_name = index["name"] + props = ",".join(index["properties"]) + label_or_type = index["labelsOrTypes"][0] + if index["entityType"] == "NODE": + memgraph.execute(f"CREATE TEXT INDEX 
{text_index_name} ON :{label_or_type}({props})") + + print("[Worker 1] Completed migration of indices.") + + print("[Worker 1] Starting migration of constraints...") + constraints_result = list(memgraph.execute_and_fetch( + """ + CALL migrate_neo4j_driver2.neo4j("SHOW CONSTRAINTS", {host: "neo4j", port: 7687}) YIELD row RETURN row + """ + )) + constraints_result = [x["row"] for x in constraints_result if x["row"]["entityType"] == "NODE" and x["row"]["type"] in ["UNIQUENESS", "NODE_PROPERTY_EXISTENCE", "NODE_PROPERTY_TYPE", "NODE_KEY"] and len(x["row"]["labelsOrTypes"]) == 1] + for constraint in constraints_result: + label = constraint["labelsOrTypes"][0] + props = [f"n.{x}" for x in constraint["properties"]] + joined_props = ", ".join(props) + if constraint["type"] == "UNIQUENESS": + memgraph.execute(f"CREATE CONSTRAINT ON (n:{label}) ASSERT {joined_props} IS UNIQUE") + elif constraint["type"] == "NODE_PROPERTY_EXISTENCE": + memgraph.execute(f"CREATE CONSTRAINT ON (n:{label}) ASSERT EXISTS ({joined_props})") + + + print("[Worker 1] Completed migration of constraints.") # Verify migration results print("[Worker 1] Verifying migration results...") @@ -209,6 +266,9 @@ def execute_query(query): print("[Worker 1] Creating snapshot...") memgraph.execute("CREATE SNAPSHOT") + print("[Worker 1] Created snapshot.") + print("[Worker 1] Migration complete.") + except Exception as e: print(f"[Worker 1] Error during migration: {e}")