-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
265 lines (207 loc) · 9.47 KB
/
utils.py
File metadata and controls
265 lines (207 loc) · 9.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
###############################################################################################
'''
Collection of helper functions used throughout the colocalization analysis pipeline.
- Load configuration, directory structure and metadata (YAML/HDF5)
- Load, save and filter localization data
- Load and apply binary masks to localizations
- Derive unspecific localizations for dark time calibration
- Define and apply spatial patches to cells
- Simulate localization datasets and save them for further analysis
'''
###############################################################################################
import os
import h5py
import numpy as np
import pandas as pd
import yaml
import cv2
# from scipy.optimize import curve_fit
from typing import Dict
###############################################################################################
def load_config(config_path: str) -> dict:
"""Load configuration from YAML file"""
with open(config_path, 'r') as f:
return yaml.safe_load(f)
def get_directory_structure(input_dir: str) -> Dict:
structure = {}
protein_names = []
condition_names = []
for condition in os.listdir(input_dir):
condition_path = os.path.join(input_dir, condition)
if os.path.isdir(condition_path):
condition_names.append(condition)
structure[condition] = {}
for protein in os.listdir(condition_path):
protein_names.append(protein)
protein_path = os.path.join(condition_path, protein)
if os.path.isdir(protein_path):
structure[condition][protein] = []
for file in os.listdir(protein_path):
if file.endswith('.hdf5'):
structure[condition][protein].append(
os.path.join(protein_path, file)
)
protein_names = list(set(protein_names))
return structure, protein_names, condition_names
def load_hdf5(file_path):
with h5py.File(file_path, 'r') as hdf5_file:
locs_data = hdf5_file['locs'][:]
info = load_info(file_path)
return locs_data, info
def save_hdf5(file_path, locs, info):
with h5py.File(file_path, "w") as locs_file:
locs_file.create_dataset("locs", data=locs)
save_info(file_path, info)
def load_info(file_path):
path_base = file_path.rsplit(".", 1)[0]
filename = path_base + ".yaml"
try:
with open(filename, "r") as info_file:
info = list(yaml.load_all(info_file, Loader=yaml.UnsafeLoader))
except FileNotFoundError as e:
print(f"Could not find metadata file: {filename}")
raise FileNotFoundError(e)
return info
def save_info(path, info):
yaml_path = path.replace('.hdf5', '.yaml')
with open(yaml_path, "w") as file:
yaml.dump_all(info, file, default_flow_style=False)
def load_mask(mask_path):
if not os.path.exists(mask_path):
print(f"Warning: Mask file not found: {mask_path}")
return None
mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
if mask is None:
print(f"Warning: Could not load mask file: {mask_path}")
return None
# else:
# print(f"Mask successfully loaded: {mask_path}")
return mask
def load_and_filter_locs(file_path, filter_params, filter_enabled):
locs_data, info = load_hdf5(file_path)
if filter_enabled:
locs_data = filter_locs(locs_data, filter_params)
return locs_data, info
def filter_locs(locs_data, filter_params):
df = pd.DataFrame(locs_data)
df = df.dropna()
for key, value in filter_params.items():
if key == 'enabled':
continue
if value is not None and key in df.columns:
df = df[(df[key] >= value[0]) & (df[key] <= value[1])]
return np.rec.array(df.to_records(index=False))
def mask_locs(locs_data, mask):
if mask is None:
# print("Warning: No mask provided, returning original data")
return locs_data, locs_data
x = locs_data['x']
y = locs_data['y']
# Ensure coordinates are within mask bounds
x_idx = np.clip(x.astype(int), 0, mask.shape[1] - 1)
y_idx = np.clip(y.astype(int), 0, mask.shape[0] - 1)
valid_points = mask[y_idx, x_idx] == 255
outside_points = mask[y_idx, x_idx] == 0
filtered_locs = locs_data[valid_points]
unspecific_locs = locs_data[outside_points]
# Unspecific locs for dark time calibration
unspecific_df = pd.DataFrame(unspecific_locs)
unspecific_df = unspecific_df[(unspecific_df['n'] > 10) & (unspecific_df['n'] < 50)]
# Convert back to np.rec.array
unspecific_locs = np.rec.array(unspecific_df.to_records(index=False)) if len(unspecific_df) > 0 else np.array([])
return filtered_locs, unspecific_locs
def save_filtered_data(file_path, locs_data, unspecific_locs, info, out_path):
basename = os.path.basename(file_path).replace(".hdf5", '')
save_hdf5((os.path.join(out_path, f"{basename}_locs.hdf5")), locs_data, info)
save_hdf5((os.path.join(out_path, f"{basename}_unsp.hdf5")), unspecific_locs, info)
def save_colocalizing_protein_data(locs_data, coloc_indices, protein, other_protein,
file_path, condition_out_path, patch_id):
if len(coloc_indices) > 0:
coloc_data = locs_data[coloc_indices]
original_filename = os.path.basename(file_path)
base_name = original_filename.replace('.hdf5', '')
# Templates for saving results:
new_filename = f"{base_name}_patch{patch_id}_coloc_{other_protein}.hdf5"
# new_filename = f"{base_name}_patch{patch_id}_coloc_{other_protein}.hdf5" if patch_id is not None else f"{base_name}_coloc_{other_protein}.hdf5"
new_file_path = os.path.join(condition_out_path, new_filename)
info = load_info(file_path)
save_hdf5(new_file_path, coloc_data, info)
def get_patches(mask, pxl_size, patch_size):
"""
Divide mask into non-overlapping patches of patch_size x patch_size
Only keep patches fully inside the mask
Returns: list of (patch_id, (x0, y0, x1, y1)) for each valid patch
"""
patch_size_px = int(patch_size / pxl_size)
# Add this check to prevent zero step size
if patch_size_px <= 0:
print(f"Warning: patch_size ({patch_size} nm) is too small compared to pixel_size ({pxl_size} nm). Skipping patching.")
return []
h, w = mask.shape
patches = []
patch_id = 0
for y0 in range(0, h, patch_size_px):
for x0 in range(0, w, patch_size_px):
x1 = x0 + patch_size_px
y1 = y0 + patch_size_px
if x1 > w or y1 > h:
continue
patch_mask = mask[y0:y1, x0:x1]
# if np.all(patch_mask == 255): # fully inside mask
if np.any(patch_mask == 255): # some can be not fully inside
patches.append((patch_id, (x0, y0, x1, y1)))
patch_id += 1
return patches
# def get_patches(mask, pxl_size, patch_size):
# """
# Divide mask into non-overlapping patches
# Keep patches that have any overlap with the mask (they can be not fully inside the mask)
# Returns: list of (patch_id, (x0, y0, x1, y1)) for each valid patch.
# """
# patch_size_px = int(patch_size / pxl_size)
# if patch_size_px <= 0:
# print(f"Warning: patch_size ({patch_size} nm) is too small compared to pixel_size ({pxl_size} nm). Skipping patching.")
# return []
# h, w = mask.shape
# patches = []
# patch_id = 0
# for y0 in range(0, h, patch_size_px):
# for x0 in range(0, w, patch_size_px):
# x1 = x0 + patch_size_px
# y1 = y0 + patch_size_px
# if x1 > w or y1 > h:
# continue
# patch_mask = mask[y0:y1, x0:x1]
# # keep patch if it has ANY overlap with mask (not fully inside)
# if np.any(patch_mask == 255): # Changed from np.all to np.any (from previous version)
# patches.append((patch_id, (x0, y0, x1, y1)))
# patch_id += 1
# return patches
def assign_locs_to_patches(locs_data, patches):
# If loc is not assigned -> patch -1
patch_ids = np.full(len(locs_data), -1, dtype=int)
for patch_id, (x0, y0, x1, y1) in patches:
in_patch = (
(locs_data['x'] >= x0) & (locs_data['x'] < x1) &
(locs_data['y'] >= y0) & (locs_data['y'] < y1)
)
patch_ids[in_patch] = patch_id
return patch_ids
def simulate_locs_data(locs_data, n_simulations=5, random_seed=None):
"""
Create simulated data by randomizing localization coordinates and properties
"""
if random_seed is not None:
np.random.seed(random_seed)
simulated_data = locs_data.copy()
# Randomize coordinates within the entire dataset
x_min, x_max = locs_data['x'].min(), locs_data['x'].max()
y_min, y_max = locs_data['y'].min(), locs_data['y'].max()
simulated_data['x'] = np.random.uniform(x_min, x_max, len(locs_data))
simulated_data['y'] = np.random.uniform(y_min, y_max, len(locs_data))
# Randomize other properties
# for col in ['n_locs', 'area', 'convexhull', 'dark_mean', 'Td']:
for col in ['n_locs', 'area', 'convexhull']:
if col in simulated_data.dtype.names:
simulated_data[col] = np.random.permutation(locs_data[col])
return simulated_data