-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdecode_N.py
More file actions
173 lines (151 loc) · 7.9 KB
/
decode_N.py
File metadata and controls
173 lines (151 loc) · 7.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
import numpy as np
import math
import re
import itertools
import pandas as pd
import argparse
import os
from Fast_functions import *
# --- Command-line interface ------------------------------------------------
# Configuration for decoding: a pooling-strategy matrix (WA), a readout
# (inline string or path to a CSV of readouts), and optional decoding knobs.
parser = argparse.ArgumentParser(description='Parse some arguments')
parser.add_argument('--differentiate', type=int, default=-1,
                    help='The maximum number of positives in the pooling strategy '
                         '(-1 means no limit; defaults to the number of compounds).')
parser.add_argument('--path_to_WA', type=str,
                    help="A string argument containing the path to the pooling strategy file.")
parser.add_argument('--readout', type=str,
                    help="A string either containing the readout or containing a path to a csv "
                         "of the readout (readout in the form 0,1,0,1,0,0 or 3,6,14)")
parser.add_argument('--extensive_search', type=str, default='False',
                    help="Whether to search all possibilities (True) or use a faster "
                         "exclusion based implementation (False, default)")
parser.add_argument('--min_signal', type=float, default=None,
                    help='Minimum signal threshold for constrained continuous decoding.')
parser.add_argument('--diluting', type=str, default='True',
                    help='Whether to use dilution scaling in continuous decoding (True/False).')
args = parser.parse_args()
args_dict = vars(args)
path_to_wa = args_dict['path_to_WA']
diff = args_dict['differentiate']
readout_in = args_dict['readout']
min_signal = args_dict['min_signal']
# Accept a range of truthy spellings ("1", "true", "yes", ...) for --diluting.
diluting_in = str(args_dict['diluting']).strip().lower()
diluting = diluting_in in ['1', 'true', 't', 'yes', 'y']
# Both the pooling matrix path and a readout are mandatory.
if not path_to_wa or not readout_in:
    raise SystemExit('Please provide both --path_to_WA and --readout.')
# Load the pooling-strategy matrix; per the shape usage below, rows index
# compounds and columns index pools.
WA_df = pd.read_csv(path_to_wa, index_col=0)
WA = WA_df.values
n_compounds, n_pools = WA.shape
# -1 is the "unset" sentinel: allow up to every compound to be positive.
if diff == -1:
    diff = n_compounds
if readout_in.lower().endswith('csv'):
    # --- CSV readout: load a batch of readouts from file. ------------------
    # Three progressively more permissive parsing strategies are tried:
    #   1) pandas with the first column as the row index (readout IDs),
    #   2) pandas with no header, first column treated as IDs,
    #   3) a hand-rolled line parser handling pipe delimiters and quoted
    #      fields with inconsistent column counts.
    # Each stage falls through to the next only on ParserError/EmptyDataError.
    # Try reading with index_col=0 first (if CSV has row IDs)
    try:
        readout_df_with_index = pd.read_csv(readout_in, index_col=0, dtype=str)
        # NOTE(review): hasattr(df, 'index') is always True for a DataFrame,
        # so readout_ids is effectively always taken from the index here.
        readout_ids = readout_df_with_index.index.tolist() if hasattr(readout_df_with_index, 'index') else None
        # Get data without index for processing
        readout_df = readout_df_with_index.reset_index(drop=True)
        readout_df = readout_df.astype(str)
    except (pd.errors.ParserError, pd.errors.EmptyDataError):
        # If that fails, try reading without index
        try:
            readout_df = pd.read_csv(readout_in, header=None, dtype=str)
            # First column should be the readout IDs
            if readout_df.shape[1] > 1:
                readout_ids = readout_df.iloc[:, 0].tolist()
                readout_df = readout_df.iloc[:, 1:].reset_index(drop=True)
            else:
                # Single-column file: every row is data, no IDs available.
                readout_ids = None
        except (pd.errors.ParserError, pd.errors.EmptyDataError):
            # Last resort: manually parse for inconsistent column counts or special delimiters
            readout_data = []  # per-row lists of string values
            readout_ids = []   # parallel list of row identifiers
            with open(readout_in, 'r') as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    # First, check if line uses pipe delimiter (ID|data)
                    if '|' in line:
                        parts = line.split('|')
                        if len(parts) >= 2:
                            readout_ids.append(parts[0])
                            # Treat rest as comma-separated data
                            data_str = '|'.join(parts[1:])
                            remaining = [x.strip() for x in data_str.split(',') if x.strip()]
                            readout_data.append(remaining)
                        continue
                    # Otherwise parse with quote handling for comma-separated
                    parts = []
                    current = ""
                    in_quotes = False
                    for char in line:
                        if char == '"':
                            # Toggle quoted state; commas inside quotes are literal.
                            in_quotes = not in_quotes
                        elif char == ',' and not in_quotes:
                            parts.append(current.strip(' "'))
                            current = ""
                        else:
                            current += char
                    if current:
                        parts.append(current.strip(' "'))
                    if len(parts) > 0:
                        # First field is the row ID, the rest are data values.
                        readout_ids.append(parts[0])
                        # For remaining parts, collect all commas into a single comma-separated string
                        remaining = parts[1:]
                        # If we have multiple remaining parts, they were split on commas - rejoin them
                        if len(remaining) > 1:
                            # Multiple parts mean they were comma-separated (e.g., 1,2,3)
                            remaining = [','.join(remaining)]
                        # Now split the comma-separated string into list of values
                        if remaining and remaining[0]:
                            values = remaining[0].split(',')
                            remaining = [x.strip() for x in values]
                        readout_data.append(remaining)
            # Create DataFrame from parsed data
            # Pad rows to same length if needed
            max_cols = max(len(row) for row in readout_data) if readout_data else 0
            for row in readout_data:
                while len(row) < max_cols:
                    row.append('')
            readout_df = pd.DataFrame(readout_data, dtype=str)
if readout_df is None or readout_df.empty:
raise SystemExit('Error: Readout CSV appears to be empty.')
df_out = decode_multi_readout_df(readout_df, WA, diff, min_signal=min_signal, diluting=diluting, readout_ids=readout_ids)
if isinstance(df_out, pd.DataFrame) and not df_out.empty:
df_with_index = df_out.copy()
if "readout_id" in df_with_index.columns:
df_with_index = df_with_index.set_index("readout_id")
else:
df_with_index.index = [f"Readout {i + 1}" for i in range(len(df_with_index))]
if "decoder_output" in df_with_index.columns:
def format_output(val):
if isinstance(val, np.ndarray):
# Format numpy array to single line with space-separated values
return " ".join([str(x) for x in val])
if isinstance(val, list):
return " ".join([str(x) for x in val])
if isinstance(val, str):
# Remove newlines and brackets
val = val.replace('\n', ' ').replace(' ', ' ').strip()
if val.startswith("[") and val.endswith("]"):
val = val[1:-1].strip()
return val
return str(val)
df_with_index["decoder_output"] = df_with_index["decoder_output"].apply(format_output)
df_with_index.columns = [col.replace("_", " ").title() for col in df_with_index.columns]
else:
df_with_index = df_out
decoded_csv = "decoded_readouts.csv"
if isinstance(df_out, pd.DataFrame) and not df_out.empty:
df_with_index.to_csv(decoded_csv, index=True)
else:
df_with_index.to_csv(decoded_csv, index=False)
else:
file_name = os.path.basename(path_to_wa)
single_result = decode_single_readout_payload(readout_in, n_pools, WA, diff, min_signal=min_signal, diluting=diluting)
if single_result.get("decoded_type") == "error":
text_msg = str(single_result.get("decoder_output"))
else:
text_msg = (
f"Processing file {file_name} with max. {diff} positive samples.\n"
f"Readout: {readout_in}\n"
f"Decoded type: {single_result.get('decoded_type')}\n"
f"Decoder output: {single_result.get('decoder_output')}"
)
decoded_txt = "decoder_output.txt"
txt = f"Uploaded file: {file_name}\nReadout: {readout_in}\n\n{text_msg}"
with open(decoded_txt, 'w+') as f:
f.write(txt)
print(text_msg)