-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtree_preprocessing.py
More file actions
396 lines (304 loc) · 11.8 KB
/
Copy pathtree_preprocessing.py
File metadata and controls
396 lines (304 loc) · 11.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
"""
Tree Preprocessing Module for TreeHarmonizer
This module handles the normalization of arbitrary newick trees to the format
required by TreeHarmonizer. The expected format includes:
1. An "extra root" node - a single-child root wrapping the entire tree
2. "Private nodes" - intermediate nodes between each leaf and its parent
Trees not in this format are automatically transformed.
"""
from ete3 import Tree
# Constants for naming synthetic nodes
ROOT_NAME = "__TH_ROOT__"
PRIVATE_SUFFIX = "_private"
class TreeMetadata:
"""
Metadata about tree structure and naming mappings.
This class tracks the relationship between original and transformed
node names, enabling downstream code to work with arbitrary input trees.
"""
def __init__(self, original_newick, was_transformed, leaf_names=None,
private_node_to_leaf=None, leaf_to_private_node=None,
root_name="", had_extra_root=False, had_private_nodes=False):
"""
Initialize TreeMetadata.
Args:
original_newick: The original newick string before transformation
was_transformed: Whether the tree was transformed
leaf_names: List of leaf/sample node names
private_node_to_leaf: Dict mapping private node names to leaf names
leaf_to_private_node: Dict mapping leaf names to private node names
root_name: Name of the root node
had_extra_root: Whether the original tree had an extra root
had_private_nodes: Whether the original tree had private nodes
"""
self.original_newick = original_newick
self.was_transformed = was_transformed
self.leaf_names = leaf_names if leaf_names is not None else []
self.private_node_to_leaf = private_node_to_leaf if private_node_to_leaf is not None else {}
self.leaf_to_private_node = leaf_to_private_node if leaf_to_private_node is not None else {}
self.root_name = root_name
self.had_extra_root = had_extra_root
self.had_private_nodes = had_private_nodes
def is_leaf(self, node_name):
"""Check if node name is a leaf/sample node."""
return node_name in self.leaf_names
def is_private_node(self, node_name):
"""Check if node is a private node (directly above a leaf)."""
return node_name in self.private_node_to_leaf
def is_internal_node(self, node_name):
"""
Check if node is a true internal node.
Internal nodes are not: root, private nodes, or leaves.
"""
return (
not self.is_leaf(node_name) and
not self.is_private_node(node_name) and
node_name != self.root_name
)
def get_leaf_for_private(self, private_name):
"""Get the leaf name for a given private node."""
return self.private_node_to_leaf.get(private_name)
def get_private_for_leaf(self, leaf_name):
"""Get the private node name for a given leaf."""
return self.leaf_to_private_node.get(leaf_name)
def has_extra_root(tree: Tree) -> bool:
"""
Detect if tree has an "extra root" node.
An extra root exists if:
1. Root has exactly one child
2. That child's descendant leaves equal the root's leaves
Args:
tree: An ete3 Tree object
Returns:
True if extra root exists, False if it needs to be added
"""
root = tree.get_tree_root()
children = root.get_children()
# Must have exactly one child
if len(children) != 1:
return False
# Child's leaves must equal root's leaves
root_leaves = set(root.get_leaf_names())
child_leaves = set(children[0].get_leaf_names())
return root_leaves == child_leaves
def has_private_nodes(tree: Tree) -> bool:
"""
Detect if tree has "private nodes" before each leaf.
Private nodes exist if each leaf's parent:
1. Has exactly one child (the leaf)
2. Is not the root
Args:
tree: An ete3 Tree object
Returns:
True if all leaves have private nodes, False otherwise
"""
root = tree.get_tree_root()
leaves = tree.get_leaves()
if not leaves:
return False
for leaf in leaves:
parent = leaf.up
# Leaf's parent must exist and not be root
if parent is None or parent == root:
return False
# Parent must have exactly one child (the leaf)
if len(parent.get_children()) != 1:
return False
return True
def detect_tree_structure(tree: Tree) -> dict:
"""
Analyze tree structure and return a report.
Args:
tree: An ete3 Tree object
Returns:
Dictionary with structure analysis results
"""
extra_root = has_extra_root(tree)
private_nodes = has_private_nodes(tree)
return {
'has_extra_root': extra_root,
'has_private_nodes': private_nodes,
'leaf_names': tree.get_leaf_names(),
'needs_transformation': not (extra_root and private_nodes)
}
def add_extra_root(tree: Tree, root_name: str = ROOT_NAME) -> Tree:
"""
Wrap tree with an extra single-child root node.
Before: ((A,B)C)D; (D is current root)
After: (((A,B)C)D)__TH_ROOT__;
Args:
tree: An ete3 Tree object (modified in place)
root_name: Name for the new root node
Returns:
The modified tree
"""
# Create new root
new_root = Tree()
new_root.name = root_name
# Get current root and make it a child of new root
current_root = tree.get_tree_root()
# Detach current root and add as child of new root
new_root.add_child(current_root)
return new_root
def add_private_nodes(tree: Tree, suffix: str = PRIVATE_SUFFIX) -> tuple:
"""
Insert private nodes between each leaf and its parent.
Before: (A,B)C;
After: ((A)A_private,(B)B_private)C;
Args:
tree: An ete3 Tree object (modified in place)
suffix: Suffix to add to leaf name for private node
Returns:
Tuple of (modified tree, mapping dict {private_name: leaf_name})
"""
mapping = {}
leaves = tree.get_leaves()
root = tree.get_tree_root()
for leaf in leaves:
parent = leaf.up
if parent is None:
continue
# Create private node name
private_name = f"{leaf.name}{suffix}"
mapping[private_name] = leaf.name
# Check if parent already has only this leaf as child AND is not the root
# (meaning private node structure already exists for this leaf)
# If parent IS the root, we still need to add a private node
if len(parent.get_children()) == 1 and parent != root:
continue
# Create new private node
private_node = Tree()
private_node.name = private_name
# Detach leaf from parent
leaf.detach()
# Add leaf as child of private node
private_node.add_child(leaf)
# Add private node to original parent
parent.add_child(private_node)
return tree, mapping
def _build_metadata_from_tree(tree: Tree, original_newick: str,
was_transformed: bool,
had_extra_root: bool,
had_private_nodes: bool) -> TreeMetadata:
"""
Build TreeMetadata from the current tree state.
Args:
tree: The (possibly transformed) tree
original_newick: The original input newick string
was_transformed: Whether any transformation was applied
had_extra_root: Whether original tree had extra root
had_private_nodes: Whether original tree had private nodes
Returns:
TreeMetadata instance
"""
root = tree.get_tree_root()
leaves = tree.get_leaves()
# Build leaf list
leaf_names = [leaf.name for leaf in leaves]
# Build private node mappings
private_node_to_leaf = {}
leaf_to_private_node = {}
for leaf in leaves:
parent = leaf.up
if parent is not None and parent != root:
# Check if parent is a private node (has only this leaf as child)
if len(parent.get_children()) == 1:
private_node_to_leaf[parent.name] = leaf.name
leaf_to_private_node[leaf.name] = parent.name
return TreeMetadata(
original_newick=original_newick,
was_transformed=was_transformed,
leaf_names=leaf_names,
private_node_to_leaf=private_node_to_leaf,
leaf_to_private_node=leaf_to_private_node,
root_name=root.name,
had_extra_root=had_extra_root,
had_private_nodes=had_private_nodes
)
def normalize_tree(newick_string: str, verbose: bool = False) -> tuple:
"""
Main entry point: detect and transform tree as needed.
This function analyzes the input tree and adds any missing structural
elements (extra root, private nodes) required by TreeHarmonizer.
Args:
newick_string: Newick format tree string
verbose: If True, print information about transformations
Returns:
Tuple of (normalized newick string, TreeMetadata object)
"""
# Parse tree
tree = Tree(newick_string, format=1)
# Detect current structure
structure = detect_tree_structure(tree)
had_extra_root = structure['has_extra_root']
had_private_nodes = structure['has_private_nodes']
was_transformed = False
if verbose:
print(f"TreeHarmonizer: Analyzing input tree...")
print(f" - Detected {len(structure['leaf_names'])} leaf nodes (samples)")
# Add private nodes first (before adding root, so tree structure is correct)
if not had_private_nodes:
tree, _ = add_private_nodes(tree)
was_transformed = True
if verbose:
print(f" - Added private nodes for variant placement")
# Add extra root if needed
if not had_extra_root:
tree = add_extra_root(tree)
was_transformed = True
if verbose:
print(f" - Added root node: {ROOT_NAME}")
if verbose:
if was_transformed:
print(f" - Tree normalization complete.")
else:
print(f" - Tree already in required format, no transformation needed.")
# Build metadata
metadata = _build_metadata_from_tree(
tree,
newick_string,
was_transformed,
had_extra_root,
had_private_nodes
)
# Return normalized newick string and metadata
normalized_newick = tree.write(format=1)
return normalized_newick, metadata
def validate_tree_for_placement(newick_string: str) -> None:
"""
Validate that a newick string is suitable for TreeHarmonizer.
Raises ValueError with descriptive message if:
- Tree cannot be parsed
- Tree has no leaves
- Leaf names contain reserved prefixes/suffixes
Args:
newick_string: Newick format tree string
Raises:
ValueError: If tree is invalid for placement
"""
try:
tree = Tree(newick_string, format=1)
except Exception as e:
raise ValueError(f"Failed to parse newick string: {e}")
leaves = tree.get_leaves()
if not leaves:
raise ValueError("Tree has no leaf nodes (samples)")
# Check for reserved naming patterns
for leaf in leaves:
if leaf.name.endswith(PRIVATE_SUFFIX):
raise ValueError(
f"Leaf name '{leaf.name}' ends with reserved suffix '{PRIVATE_SUFFIX}'. "
"Please rename your samples to avoid this suffix."
)
if leaf.name == ROOT_NAME:
raise ValueError(
f"Leaf name '{leaf.name}' is a reserved name. "
"Please rename this sample."
)
# Check all nodes have names
for node in tree.traverse():
if not node.name or node.name.strip() == "":
raise ValueError(
"Tree contains unnamed nodes. All nodes (root, internal, and leaves) "
"must have names in the newick string."
)