diff --git a/field_synthesis/field_synthesis.py b/field_synthesis/field_synthesis.py index b5bd2e4..06c887a 100644 --- a/field_synthesis/field_synthesis.py +++ b/field_synthesis/field_synthesis.py @@ -11,8 +11,9 @@ class FieldSynthesis(): """ Třída pro syntézu prostorových polí pomocí generování kotevních bodů a jejich následného míchání. - """ - """ + + Assumption: The set of points where the input and output fields live is the same. + Inicializuje parametry syntézy pole. Args: @@ -21,13 +22,31 @@ class FieldSynthesis(): num_source (int): Počet dostupných zdrojů (typů polí). dimension (int): Dimenze prostoru (2D, 3D). seed (int): Seed pro generátor náhodných čísel. + + TODO: add parameter point_coords shape=(K, dimension) + """ area_size: int + # TODO: compute from point_coords as volume of the axis aligned bounding box. + # must be float. + # next, minimum distance is vol ** (1/dim) / safety_factor , e.g. saftey_factor = 2.0 + # should be in interval [1, 2]. + count_points: int + # Suggested number of anchor points, could be refined due to minimal distance. + num_source: int + # Number of field samples we will mix; + # TODO: remove this, we will know that only + # after passing the field samples to the mixing function. + dimension: int = 2 + # TODO: remove, diven by the shape of point_coords + free_space_ratio: float = 0.4 + # TODO: Remove, given indirectly through `count_points` + seed: int = 42 # self.min_distance = self.calc_distance() @@ -108,6 +127,9 @@ def spatial_points(self, target_points, k_neighbors=5) -> list: Poté aplikujeme vektorizovanou masku pro filtrování sousedů, kteří jsou dále než 2 * D. Args: target_points (np.ndarray): Pole cílových bodů, pro které chceme naj + + TODO: could be refactored into cached_property since target_points == point_coords are given + in constructor. """ tree = cKDTree(self.anchor_points) @@ -135,19 +157,25 @@ def mix_fields(self, target_points) -> np.ndarray: Pokud má sousedů více, provedeme průměr jejich hodnot. Pokud žádného Args: target_points (np.ndarray): Pole cílových bodů, pro které chceme získat smíšené hodnoty. + TODO: pass in field samples, field_samples shape = (N, K) + N .. number of samples + K ... number of points == len(point_coords) """ neighbor_indices_list = self.spatial_points(target_points) + # List of lists of anchor neighbours of each point. mixed_results = [] - for neighbors in neighbor_indices_list: + for i_point, neighbors in enumerate(neighbor_indices_list): if len(neighbors) == 0: mixed_results.append(np.nan) elif len(neighbors) == 1: mixed_results.append(self.fields_indices[neighbors[0]]) else: - # "Provedeme průměr, pokud jich zbyde více + # TODO: use the field samples and compute mean of their values + # mixed_samples = field_samples[self.fields_indices[neighbors], i_point] # shape (len(neighbours), ) + # value = np.mean( mixed_samples) values = self.fields_indices[neighbors] mixed_results.append(np.mean(values)) - return np.array(mixed_results) \ No newline at end of file + return np.array(mixed_results) diff --git a/field_synthesis/generate.py b/field_synthesis/generate.py index afebbff..822aeaa 100644 --- a/field_synthesis/generate.py +++ b/field_synthesis/generate.py @@ -6,67 +6,69 @@ from field_synthesis import FieldSynthesis def load_data(zip_path): - """Витягує координати та всі сампли з одного ZIP-архіву.""" + """Extrahuje souřadnice a všechny vzorky z jednoho ZIP archivu.""" coords = None all_values = [] with zipfile.ZipFile(zip_path, 'r') as z: file_list = z.namelist() + # Načtení souřadnic coords_file = [f for f in file_list if 'coords' in f and f.endswith('.npz')][0] with z.open(coords_file) as f: data = np.load(io.BytesIO(f.read())) - coords = data[data.files[0]] if isinstance(data, np.lib.npyio.NpzFile) else data - + # Vybereme první pole z npz souboru + coords = data[data.files[0]] + + # Načtení hodnot polí value_files = [f for f in file_list if 'values' in f and f.endswith('.npz')] - print(f"Found {len(value_files)} source samples in ZIP.") + print(f"Nalezeno {len(value_files)} zdrojových vzorků v ZIPu.") for name in sorted(value_files): with z.open(name) as f: data = np.load(io.BytesIO(f.read())) - val = data[data.files[0]] if isinstance(data, np.lib.npyio.NpzFile) else data + val = data[data.files[0]] all_values.append(val) return coords, np.array(all_values) def main(): parser = argparse.ArgumentParser() - parser.add_argument("archive", help="Path to cond_tensors.zip") - parser.add_argument("-o", "--output", default="synthesis_results.zip") - parser.add_argument("--anchors", type=int, default=100) - parser.add_argument("--count", type=int, default=200) - parser.add_argument("--seed", type=int, default=42) + parser.add_argument("archive", help="Cesta k vstupnímu ZIP archivu (cond_tensors.zip)") + parser.add_argument("-o", "--output", default="synthesis_results.zip", help="Název výstupního souboru") + parser.add_argument("--anchors", type=int, default=100, help="Počet kotevních bodů (anchor points)") + parser.add_argument("--count", type=int, default=200, help="Počet polí k vygenerování") + parser.add_argument("--seed", type=int, default=42, help="Seed pro generátor náhodných čísel") args = parser.parse_args() + # 1. Načtení dat coords, samples_raw = load_data(args.archive) - samples = samples_raw[:, :, 0] + # Předpokládáme, že samples_raw má tvar (N, K, 1), převedeme na (N, K) + if samples_raw.ndim == 3: + samples = samples_raw[:, :, 0] + else: + samples = samples_raw - max_range = np.max(coords) + # 2. Inicializace FieldSynthesis + # Souřadnice bodů předáváme přímo do konstruktoru fs = FieldSynthesis( - area_size=max_range, + point_coords=coords, count_points=args.anchors, - num_source=len(samples), - dimension=3, seed=args.seed ) - print(f"Synthesizing {args.count} fields...") + print(f"Syntetizuji {args.count} polí...") + # 3. Generování nových polí with zipfile.ZipFile(args.output, 'w', compression=zipfile.ZIP_DEFLATED) as out_z: for i in range(args.count): - res = fs.mix_fields(coords) + # Každé volání mix_fields vytvoří unikátní pole díky vnitřnímu stavu RNG + res = fs.mix_fields(samples) + # Uložení výsledku do paměti (io.BytesIO) buffer = io.BytesIO() np.save(buffer, res) filename = f"field_{i:04d}.npy" - out_z.writestr(filename, buffer.getvalue()) - - if (i + 1) % 50 == 0: - print(f"Progress: {i + 1}/{args.count}") - - print(f"Done! Results in {args.output}") - -if __name__ == "__main__": - main() \ No newline at end of file + out_z.writestr(filename, buffer.getvalue()) \ No newline at end of file