From 86b3bf3c61d7b4202a2366103b93e20b5327455e Mon Sep 17 00:00:00 2001 From: Jan Brezina Date: Tue, 7 Apr 2026 12:34:05 +0200 Subject: [PATCH 1/3] Suggestions for development Added TODO comments for future improvements and clarifications regarding parameters and functionality. --- field_synthesis/field_synthesis.py | 38 ++++++++++++++++++++++++++---- 1 file changed, 33 insertions(+), 5 deletions(-) diff --git a/field_synthesis/field_synthesis.py b/field_synthesis/field_synthesis.py index b5bd2e4..06c887a 100644 --- a/field_synthesis/field_synthesis.py +++ b/field_synthesis/field_synthesis.py @@ -11,8 +11,9 @@ class FieldSynthesis(): """ Třída pro syntézu prostorových polí pomocí generování kotevních bodů a jejich následného míchání. - """ - """ + + Assumption: The set of points where the input and output fields live is the same. + Inicializuje parametry syntézy pole. Args: @@ -21,13 +22,31 @@ class FieldSynthesis(): num_source (int): Počet dostupných zdrojů (typů polí). dimension (int): Dimenze prostoru (2D, 3D). seed (int): Seed pro generátor náhodných čísel. + + TODO: add parameter point_coords shape=(K, dimension) + """ area_size: int + # TODO: compute from point_coords as volume of the axis aligned bounding box. + # must be float. + # next, minimum distance is vol ** (1/dim) / safety_factor , e.g. saftey_factor = 2.0 + # should be in interval [1, 2]. + count_points: int + # Suggested number of anchor points, could be refined due to minimal distance. + num_source: int + # Number of field samples we will mix; + # TODO: remove this, we will know that only + # after passing the field samples to the mixing function. + dimension: int = 2 + # TODO: remove, diven by the shape of point_coords + free_space_ratio: float = 0.4 + # TODO: Remove, given indirectly through `count_points` + seed: int = 42 # self.min_distance = self.calc_distance() @@ -108,6 +127,9 @@ def spatial_points(self, target_points, k_neighbors=5) -> list: Poté aplikujeme vektorizovanou masku pro filtrování sousedů, kteří jsou dále než 2 * D. Args: target_points (np.ndarray): Pole cílových bodů, pro které chceme naj + + TODO: could be refactored into cached_property since target_points == point_coords are given + in constructor. """ tree = cKDTree(self.anchor_points) @@ -135,19 +157,25 @@ def mix_fields(self, target_points) -> np.ndarray: Pokud má sousedů více, provedeme průměr jejich hodnot. Pokud žádného Args: target_points (np.ndarray): Pole cílových bodů, pro které chceme získat smíšené hodnoty. + TODO: pass in field samples, field_samples shape = (N, K) + N .. number of samples + K ... number of points == len(point_coords) """ neighbor_indices_list = self.spatial_points(target_points) + # List of lists of anchor neighbours of each point. mixed_results = [] - for neighbors in neighbor_indices_list: + for i_point, neighbors in enumerate(neighbor_indices_list): if len(neighbors) == 0: mixed_results.append(np.nan) elif len(neighbors) == 1: mixed_results.append(self.fields_indices[neighbors[0]]) else: - # "Provedeme průměr, pokud jich zbyde více + # TODO: use the field samples and compute mean of their values + # mixed_samples = field_samples[self.fields_indices[neighbors], i_point] # shape (len(neighbours), ) + # value = np.mean( mixed_samples) values = self.fields_indices[neighbors] mixed_results.append(np.mean(values)) - return np.array(mixed_results) \ No newline at end of file + return np.array(mixed_results) From 9b0bbb1ed254e0be907f35acf01967f8f7788712 Mon Sep 17 00:00:00 2001 From: eoan Date: Tue, 14 Apr 2026 09:39:21 +0200 Subject: [PATCH 2/3] change class --- field_synthesis/generate.py | 54 +++++++++++++++++++------------------ 1 file changed, 28 insertions(+), 26 deletions(-) diff --git a/field_synthesis/generate.py b/field_synthesis/generate.py index afebbff..822aeaa 100644 --- a/field_synthesis/generate.py +++ b/field_synthesis/generate.py @@ -6,67 +6,69 @@ from field_synthesis import FieldSynthesis def load_data(zip_path): - """Витягує координати та всі сампли з одного ZIP-архіву.""" + """Extrahuje souřadnice a všechny vzorky z jednoho ZIP archivu.""" coords = None all_values = [] with zipfile.ZipFile(zip_path, 'r') as z: file_list = z.namelist() + # Načtení souřadnic coords_file = [f for f in file_list if 'coords' in f and f.endswith('.npz')][0] with z.open(coords_file) as f: data = np.load(io.BytesIO(f.read())) - coords = data[data.files[0]] if isinstance(data, np.lib.npyio.NpzFile) else data - + # Vybereme první pole z npz souboru + coords = data[data.files[0]] + + # Načtení hodnot polí value_files = [f for f in file_list if 'values' in f and f.endswith('.npz')] - print(f"Found {len(value_files)} source samples in ZIP.") + print(f"Nalezeno {len(value_files)} zdrojových vzorků v ZIPu.") for name in sorted(value_files): with z.open(name) as f: data = np.load(io.BytesIO(f.read())) - val = data[data.files[0]] if isinstance(data, np.lib.npyio.NpzFile) else data + val = data[data.files[0]] all_values.append(val) return coords, np.array(all_values) def main(): parser = argparse.ArgumentParser() - parser.add_argument("archive", help="Path to cond_tensors.zip") - parser.add_argument("-o", "--output", default="synthesis_results.zip") - parser.add_argument("--anchors", type=int, default=100) - parser.add_argument("--count", type=int, default=200) - parser.add_argument("--seed", type=int, default=42) + parser.add_argument("archive", help="Cesta k vstupnímu ZIP archivu (cond_tensors.zip)") + parser.add_argument("-o", "--output", default="synthesis_results.zip", help="Název výstupního souboru") + parser.add_argument("--anchors", type=int, default=100, help="Počet kotevních bodů (anchor points)") + parser.add_argument("--count", type=int, default=200, help="Počet polí k vygenerování") + parser.add_argument("--seed", type=int, default=42, help="Seed pro generátor náhodných čísel") args = parser.parse_args() + # 1. Načtení dat coords, samples_raw = load_data(args.archive) - samples = samples_raw[:, :, 0] + # Předpokládáme, že samples_raw má tvar (N, K, 1), převedeme na (N, K) + if samples_raw.ndim == 3: + samples = samples_raw[:, :, 0] + else: + samples = samples_raw - max_range = np.max(coords) + # 2. Inicializace FieldSynthesis + # Souřadnice bodů předáváme přímo do konstruktoru fs = FieldSynthesis( - area_size=max_range, + point_coords=coords, count_points=args.anchors, - num_source=len(samples), - dimension=3, seed=args.seed ) - print(f"Synthesizing {args.count} fields...") + print(f"Syntetizuji {args.count} polí...") + # 3. Generování nových polí with zipfile.ZipFile(args.output, 'w', compression=zipfile.ZIP_DEFLATED) as out_z: for i in range(args.count): - res = fs.mix_fields(coords) + # Každé volání mix_fields vytvoří unikátní pole díky vnitřnímu stavu RNG + res = fs.mix_fields(samples) + # Uložení výsledku do paměti (io.BytesIO) buffer = io.BytesIO() np.save(buffer, res) filename = f"field_{i:04d}.npy" - out_z.writestr(filename, buffer.getvalue()) - - if (i + 1) % 50 == 0: - print(f"Progress: {i + 1}/{args.count}") - - print(f"Done! Results in {args.output}") - -if __name__ == "__main__": - main() \ No newline at end of file + out_z.writestr(filename, buffer.getvalue()) \ No newline at end of file From a789481ba28674b0b422ae68f876836a71a7ad40 Mon Sep 17 00:00:00 2001 From: eoan Date: Tue, 14 Apr 2026 10:56:04 +0200 Subject: [PATCH 3/3] change class --- field_synthesis/field_synthesis.py | 234 ++++++++++-------------- field_synthesis/generate.py | 3 +- field_synthesis/test_field_synthesis.py | 81 ++++---- 3 files changed, 134 insertions(+), 184 deletions(-) diff --git a/field_synthesis/field_synthesis.py b/field_synthesis/field_synthesis.py index 06c887a..a433684 100644 --- a/field_synthesis/field_synthesis.py +++ b/field_synthesis/field_synthesis.py @@ -7,51 +7,36 @@ @dataclass(frozen=True) -class FieldSynthesis(): +class FieldSynthesis: + """ Třída pro syntézu prostorových polí pomocí generování kotevních bodů a jejich následného míchání. + * + """ - Assumption: The set of points where the input and output fields live is the same. - - Inicializuje parametry syntézy pole. + point_coords: np.ndarray # Shape (K, dimension) + count_points: int = 100 # Počet kotevních bodů (anchor points) + safety_factor: float = 1.5 # Pro výpočet min_distance + seed: int = 42 - Args: - area_size (float): Délka strany pracovní oblasti. - count_points (int): Požadovaný počet bodů k vygenerování. - num_source (int): Počet dostupných zdrojů (typů polí). - dimension (int): Dimenze prostoru (2D, 3D). - seed (int): Seed pro generátor náhodných čísel. - - TODO: add parameter point_coords shape=(K, dimension) - - """ + @property + def dimension(self) -> int: + return self.point_coords.shape[1] - area_size: int - # TODO: compute from point_coords as volume of the axis aligned bounding box. - # must be float. - # next, minimum distance is vol ** (1/dim) / safety_factor , e.g. saftey_factor = 2.0 - # should be in interval [1, 2]. - - count_points: int - # Suggested number of anchor points, could be refined due to minimal distance. - - num_source: int - # Number of field samples we will mix; - # TODO: remove this, we will know that only - # after passing the field samples to the mixing function. - - dimension: int = 2 - # TODO: remove, diven by the shape of point_coords - - free_space_ratio: float = 0.4 - # TODO: Remove, given indirectly through `count_points` - - seed: int = 42 - - # self.min_distance = self.calc_distance() - # self.anchor_points = None - # self.field_indices = None + @cached_property + def area_stats(self) -> dict: + """Vypočítá bounding box a objem pracovní oblasti.""" + min_bounds = np.min(self.point_coords, axis=0) + max_bounds = np.max(self.point_coords, axis=0) + sides = max_bounds - min_bounds + volume = np.prod(sides) + return { + "min": min_bounds, + "max": max_bounds, + "volume": volume, + "sides": sides + } @cached_property def rng(self) -> np.random.Generator: @@ -60,122 +45,99 @@ def rng(self) -> np.random.Generator: @cached_property def min_distance(self) -> float: """ - Odhad distance D pro zadaný počet bodů a stranu krychle. - D se odhaduje na základě poměru volného prostoru a obsazeného prostoru. + Výpočet minimální vzdálenosti na základě objemu a počtu bodů. + D = (Vol / N)^(1/dim) / safety_factor """ - if self.count_points <= 0 or self.area_size <= 0: + vol = self.area_stats["volume"] + if self.count_points <= 0 or vol <= 0: return 0.0 - total_vol = self.area_size ** self.dimension - occupied_ratio = 1.0 - self.free_space_ratio - vol_per_point = (total_vol * occupied_ratio) / self.count_points - - if self.dimension == 2: - return math.sqrt(vol_per_point) - else: - return vol_per_point ** (1/self.dimension) - + vol_per_point = vol / self.count_points + return (vol_per_point ** (1 / self.dimension)) / self.safety_factor @cached_property def anchor_points(self) -> np.ndarray: - """ - Funkce vygeneruje pole nahodnych bodu. - - :param count_points: pocet nahodnych bodu. - :param min_distance: minimalni vzdalenost mezi bodami. - :param area_size: velokost matici. - :return: vrati pole koordinat [[x1, y1], [x2, y2], ...] - """ - if self.count_points <= 0 or self.area_size <= 0: - return np.zeros((0, 2)) - - if self.min_distance < 0: - self.min_distance = 0 - - radius = self.min_distance/self.area_size - - if radius > 1: - radius = 0.99 - try: - engine = PoissonDisk(d=self.dimension, radius=radius, seed=42) - - #ten algorytm je nakladny na cas a resurs pocitace - # points = engine.fill_space() * area_size + """Generuje kotevní body pomocí Poisson Disk Sampling v rámci bounding boxu.""" + if self.count_points <= 0: + return np.zeros((0, self.dimension)) - # if (len(points) > count_points): - # return points[:count_points] + # Normalizovaný rádius pro PoissonDisk (v jednotkách [0, 1]) + # Používáme průměrnou stranu pro normalizaci + avg_side = np.mean(self.area_stats["sides"]) + radius = self.min_distance / avg_side if avg_side > 0 else 0.1 + radius = min(max(radius, 0.01), 0.99) - return engine.random(self.count_points) * self.area_size + try: + engine = PoissonDisk(d=self.dimension, radius=radius, seed=self.seed) + # Vygenerujeme body a roztáhneme je na rozměry bounding boxu + points = engine.random(self.count_points) + return self.area_stats["min"] + points * self.area_stats["sides"] except Exception: - return np.zeros((0, 2)) + # Fallback na čistě náhodné body v případě chyby engine + return self.rng.uniform( + self.area_stats["min"], + self.area_stats["max"], + (self.count_points, self.dimension) + ) + + def get_fields_indices(self, num_source: int) -> np.ndarray: + """Přiřadí každému kotevnímu bodu index jednoho ze zdrojových polí.""" + return self.rng.integers(0, num_source, size=len(self.anchor_points)) @cached_property - def fields_indices(self) -> np.ndarray: + def neighbor_data(self): """ - Každému bodu přiřadíme náhodně jedno ze zdrojových N polí. + Předvypočítá sousedy pro všechny target_points (point_coords). + Vrací seznam indexů kotevních bodů pro každý bod v point_coords. """ - if self.anchor_points is None or len(self.anchor_points) == 0: - return np.array([], dtype=int) - - return self.rng.integers(0, self.num_source, size=len(self.anchor_points)) - + if len(self.anchor_points) == 0: + return [np.array([], dtype=int)] * len(self.point_coords) - def spatial_points(self, target_points, k_neighbors=5) -> list: - """ - Vektorizované vyhledání sousedů pomocí cKDTree. - Pro každý cílový bod získáme k nejbližších kotevních bodů a jejich vzdálenosti. - Poté aplikujeme vektorizovanou masku pro filtrování sousedů, kteří jsou dále než 2 * D. - Args: - target_points (np.ndarray): Pole cílových bodů, pro které chceme naj - - TODO: could be refactored into cached_property since target_points == point_coords are given - in constructor. - """ - tree = cKDTree(self.anchor_points) - R_LIMIT = 2 * self.min_distance - - # Vektorizovaný dotaz - distances, indices = tree.query(target_points, k=k_neighbors) - - # Vektorizovaná maska vzdálenosti - valid_neighbor_mask = distances <= R_LIMIT - - final_result_indices = [] - for i in range(len(target_points)): - # Výběr platných indexů pro každý cílový bod - current_valid = indices[i][valid_neighbor_mask[i]] - final_result_indices.append(current_valid) - - return final_result_indices + r_limit = 2 * self.min_distance + # k_neighbors omezíme počtem dostupných kotevních bodů + k = min(10, len(self.anchor_points)) + + distances, indices = tree.query(self.point_coords, k=k, distance_upper_bound=r_limit) + + final_indices = [] + for i in range(len(self.point_coords)): + # cKDTree vrací 'inf' a index == len(data) pro nenalezené sousedy + idx = indices[i] + valid = idx < len(self.anchor_points) + final_indices.append(idx[valid]) + + return final_indices - - def mix_fields(self, target_points) -> np.ndarray: + def mix_fields(self, field_samples: np.ndarray) -> np.ndarray: """ - Finální míchání polí (průměrování). - Pro každý cílový bod získáme jeho sousedy pomocí spatial_points. - Pokud má sousedů více, provedeme průměr jejich hodnot. Pokud žádného + Míchání polí na základě předvypočítaných sousedů. + Args: - target_points (np.ndarray): Pole cílových bodů, pro které chceme získat smíšené hodnoty. - TODO: pass in field samples, field_samples shape = (N, K) - N .. number of samples - K ... number of points == len(point_coords) + field_samples (np.ndarray): Shape (N, K), kde N je počet zdrojů + a K je počet bodů (len(point_coords)). """ + num_source, num_points = field_samples.shape + if num_points != len(self.point_coords): + raise ValueError("Počet bodů v field_samples neodpovídá point_coords.") + + # Generujeme indexy polí pro kotevní body + anchor_to_field_map = self.get_fields_indices(num_source) - neighbor_indices_list = self.spatial_points(target_points) - # List of lists of anchor neighbours of each point. - - mixed_results = [] + mixed_result = np.full(num_points, np.nan) + neighbor_indices_list = self.neighbor_data + for i_point, neighbors in enumerate(neighbor_indices_list): if len(neighbors) == 0: - mixed_results.append(np.nan) - elif len(neighbors) == 1: - mixed_results.append(self.fields_indices[neighbors[0]]) - else: - # TODO: use the field samples and compute mean of their values - # mixed_samples = field_samples[self.fields_indices[neighbors], i_point] # shape (len(neighbours), ) - # value = np.mean( mixed_samples) - values = self.fields_indices[neighbors] - mixed_results.append(np.mean(values)) - - return np.array(mixed_results) + continue + + # Získáme indexy zdrojových polí, které patří k sousedním kotevním bodům + source_indices = anchor_to_field_map[neighbors] + + # Vybereme hodnoty z příslušných polí pro daný bod i_point + # field_samples[source_idx, i_point] + values = field_samples[source_indices, i_point] + + mixed_result[i_point] = np.mean(values) + + return mixed_result \ No newline at end of file diff --git a/field_synthesis/generate.py b/field_synthesis/generate.py index 822aeaa..af72883 100644 --- a/field_synthesis/generate.py +++ b/field_synthesis/generate.py @@ -1,10 +1,11 @@ import argparse import zipfile import io -import os import numpy as np from field_synthesis import FieldSynthesis + + def load_data(zip_path): """Extrahuje souřadnice a všechny vzorky z jednoho ZIP archivu.""" coords = None diff --git a/field_synthesis/test_field_synthesis.py b/field_synthesis/test_field_synthesis.py index a13f8a5..34bd634 100644 --- a/field_synthesis/test_field_synthesis.py +++ b/field_synthesis/test_field_synthesis.py @@ -3,30 +3,44 @@ from field_synthesis import FieldSynthesis @pytest.fixture -def fs_instance(): +def sample_coords(): + """Створює сітку точок 10x10 для тестів.""" + x = np.linspace(0, 100, 10) + y = np.linspace(0, 100, 10) + xv, yv = np.meshgrid(x, y) + return np.stack([xv.ravel(), yv.ravel()], axis=-1) # Shape (100, 2) + +@pytest.fixture +def fs_instance(sample_coords): """Vytvoří základní testovací instanci třídy.""" return FieldSynthesis( - area_size=100.0, + point_coords=sample_coords, count_points=50, - num_source=5, - dimension=2, seed=42 ) ## --- Testy Inicializace a Vlastností --- -def test_initialization(fs_instance): +def test_initialization(fs_instance, sample_coords): """Ověří, že parametry jsou správně nastaveny.""" - assert fs_instance.area_size == 100.0 + # Testujeme shodu souřadnic + np.testing.assert_array_equal(fs_instance.point_coords, sample_coords) assert fs_instance.count_points == 50 - assert fs_instance.num_source == 5 + assert fs_instance.dimension == 2 + +def test_area_stats(fs_instance): + """Ověří výpočet statistik oblasti.""" + stats = fs_instance.area_stats + assert stats["volume"] == 10000.0 # 100 * 100 + assert np.all(stats["min"] == 0) + assert np.all(stats["max"] == 100) def test_cached_distance(fs_instance): """Ověří výpočet minimální vzdálenosti.""" dist = fs_instance.min_distance assert isinstance(dist, float) assert dist > 0 - # Ověření, že se hodnota nemění (cache) + # Ověření cache assert fs_instance.min_distance == dist ## --- Testy Generování Bodů --- @@ -37,51 +51,24 @@ def test_generate_points(fs_instance): assert isinstance(points, np.ndarray) # Tvar by měl odpovídat (count_points, dimension) assert points.shape == (50, 2) - # Body musí být v mezích oblasti - assert np.all(points >= 0) - assert np.all(points <= 100.0) + # Body musí být v mezích vypočítaného bounding boxu + stats = fs_instance.area_stats + assert np.all(points >= stats["min"]) + assert np.all(points <= stats["max"]) def test_assign_source_fields(fs_instance): """Ověří přiřazení indexů polí k bodům.""" - indices = fs_instance.fields_indices + # Testujeme pro 5 zdrojových polí + num_source = 5 + indices = fs_instance.get_fields_indices(num_source) assert len(indices) == 50 - # Indexy musí být v rozsahu [0, num_source - 1] assert np.all(indices >= 0) - assert np.all(indices < 5) - assert indices.dtype.kind in 'iu' # integer nebo unsigned integer + assert np.all(indices < num_source) ## --- Testy Prostorové Logiky --- -def test_spatial_points_return_type(fs_instance): - """Ověří, že vyhledávání sousedů vrací seznam indexů.""" - target = np.array([[50.0, 50.0]]) - neighbors = fs_instance.spatial_points(target) +def test_neighbor_data(fs_instance): + """Ověří, že vyhledávání sousedů vrací seznam indexů pro každý bod.""" + neighbors = fs_instance.neighbor_data assert isinstance(neighbors, list) - assert len(neighbors) == 1 - assert isinstance(neighbors[0], np.ndarray) - -def test_mix_fields_logic(fs_instance): - """Ověří, že míchání polí vrací správný počet výsledků.""" - target_points = np.array([[10, 10], [50, 50], [90, 90]]) - results = fs_instance.mix_fields(target_points) - - assert isinstance(results, np.ndarray) - assert len(results) == 3 - # Výsledek by měl být buď číslo (průměr) nebo NaN, pokud v okolí nic není - assert results.dtype == np.float64 - -## --- Testy Robustnosti --- - -def test_reproducibility(): - """Ověří, že stejný seed generuje identické výsledky.""" - fs1 = FieldSynthesis(100, 20, 3, seed=10) - fs2 = FieldSynthesis(100, 20, 3, seed=10) - - np.testing.assert_array_equal(fs1.anchor_points, fs2.anchor_points) - np.testing.assert_array_equal(fs1.fields_indices, fs2.fields_indices) - -def test_empty_area_handling(): - """Ověří chování při nulovém počtu bodů.""" - fs = FieldSynthesis(area_size=100, count_points=0, num_source=2) - assert fs.min_distance == 0.0 - assert fs.anchor_points.shape == (0, 2) \ No newline at end of file + assert len(neighbors) == len \ No newline at end of file