-
Notifications
You must be signed in to change notification settings - Fork 0
Nodianosh class #7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,32 +7,36 @@ | |
|
|
||
|
|
||
| @dataclass(frozen=True) | ||
| class FieldSynthesis(): | ||
| class FieldSynthesis: | ||
|
|
||
| """ | ||
| Třída pro syntézu prostorových polí pomocí generování kotevních bodů | ||
| a jejich následného míchání. | ||
| * | ||
| """ | ||
| """ | ||
| Inicializuje parametry syntézy pole. | ||
|
|
||
| Args: | ||
| area_size (float): Délka strany pracovní oblasti. | ||
| count_points (int): Požadovaný počet bodů k vygenerování. | ||
| num_source (int): Počet dostupných zdrojů (typů polí). | ||
| dimension (int): Dimenze prostoru (2D, 3D). | ||
| seed (int): Seed pro generátor náhodných čísel. | ||
| """ | ||
| point_coords: np.ndarray # Shape (K, dimension) | ||
| count_points: int = 100 # Počet kotevních bodů (anchor points) | ||
| safety_factor: float = 1.5 # Pro výpočet min_distance | ||
| seed: int = 42 | ||
|
|
||
| area_size: int | ||
| count_points: int | ||
| num_source: int | ||
| dimension: int = 2 | ||
| free_space_ratio: float = 0.4 | ||
| seed: int = 42 | ||
| @property | ||
| def dimension(self) -> int: | ||
| return self.point_coords.shape[1] | ||
|
|
||
| # self.min_distance = self.calc_distance() | ||
| # self.anchor_points = None | ||
| # self.field_indices = None | ||
| @cached_property | ||
| def area_stats(self) -> dict: | ||
| """Vypočítá bounding box a objem pracovní oblasti.""" | ||
| min_bounds = np.min(self.point_coords, axis=0) | ||
| max_bounds = np.max(self.point_coords, axis=0) | ||
| sides = max_bounds - min_bounds | ||
| volume = np.prod(sides) | ||
| return { | ||
| "min": min_bounds, | ||
| "max": max_bounds, | ||
| "volume": volume, | ||
| "sides": sides | ||
| } | ||
|
|
||
| @cached_property | ||
| def rng(self) -> np.random.Generator: | ||
|
|
@@ -41,113 +45,99 @@ def rng(self) -> np.random.Generator: | |
| @cached_property | ||
| def min_distance(self) -> float: | ||
| """ | ||
| Odhad distance D pro zadaný počet bodů a stranu krychle. | ||
| D se odhaduje na základě poměru volného prostoru a obsazeného prostoru. | ||
| Výpočet minimální vzdálenosti na základě objemu a počtu bodů. | ||
| D = (Vol / N)^(1/dim) / safety_factor | ||
| """ | ||
| if self.count_points <= 0 or self.area_size <= 0: | ||
| vol = self.area_stats["volume"] | ||
| if self.count_points <= 0 or vol <= 0: | ||
| return 0.0 | ||
|
|
||
| total_vol = self.area_size ** self.dimension | ||
| occupied_ratio = 1.0 - self.free_space_ratio | ||
| vol_per_point = (total_vol * occupied_ratio) / self.count_points | ||
|
|
||
| if self.dimension == 2: | ||
| return math.sqrt(vol_per_point) | ||
| else: | ||
| return vol_per_point ** (1/self.dimension) | ||
|
|
||
| vol_per_point = vol / self.count_points | ||
| return (vol_per_point ** (1 / self.dimension)) / self.safety_factor | ||
|
|
||
| @cached_property | ||
| def anchor_points(self) -> np.ndarray: | ||
| """ | ||
| Funkce vygeneruje pole nahodnych bodu. | ||
|
|
||
| :param count_points: pocet nahodnych bodu. | ||
| :param min_distance: minimalni vzdalenost mezi bodami. | ||
| :param area_size: velokost matici. | ||
| :return: vrati pole koordinat [[x1, y1], [x2, y2], ...] | ||
| """ | ||
| if self.count_points <= 0 or self.area_size <= 0: | ||
| return np.zeros((0, 2)) | ||
|
|
||
| if self.min_distance < 0: | ||
| self.min_distance = 0 | ||
|
|
||
| radius = self.min_distance/self.area_size | ||
|
|
||
| if radius > 1: | ||
| radius = 0.99 | ||
| try: | ||
| engine = PoissonDisk(d=self.dimension, radius=radius, seed=42) | ||
| """Generuje kotevní body pomocí Poisson Disk Sampling v rámci bounding boxu.""" | ||
| if self.count_points <= 0: | ||
| return np.zeros((0, self.dimension)) | ||
|
|
||
| #ten algorytm je nakladny na cas a resurs pocitace | ||
| # points = engine.fill_space() * area_size | ||
| # Normalizovaný rádius pro PoissonDisk (v jednotkách [0, 1]) | ||
| # Používáme průměrnou stranu pro normalizaci | ||
| avg_side = np.mean(self.area_stats["sides"]) | ||
| radius = self.min_distance / avg_side if avg_side > 0 else 0.1 | ||
| radius = min(max(radius, 0.01), 0.99) | ||
|
|
||
| # if (len(points) > count_points): | ||
| # return points[:count_points] | ||
|
|
||
| return engine.random(self.count_points) * self.area_size | ||
| try: | ||
| engine = PoissonDisk(d=self.dimension, radius=radius, seed=self.seed) | ||
| # Vygenerujeme body a roztáhneme je na rozměry bounding boxu | ||
| points = engine.random(self.count_points) | ||
| return self.area_stats["min"] + points * self.area_stats["sides"] | ||
| except Exception: | ||
| return np.zeros((0, 2)) | ||
| # Fallback na čistě náhodné body v případě chyby engine | ||
| return self.rng.uniform( | ||
| self.area_stats["min"], | ||
| self.area_stats["max"], | ||
| (self.count_points, self.dimension) | ||
| ) | ||
|
|
||
| def get_fields_indices(self, num_source: int) -> np.ndarray: | ||
| """Přiřadí každému kotevnímu bodu index jednoho ze zdrojových polí.""" | ||
| return self.rng.integers(0, num_source, size=len(self.anchor_points)) | ||
|
|
||
| @cached_property | ||
| def fields_indices(self) -> np.ndarray: | ||
| def neighbor_data(self): | ||
| """ | ||
| Každému bodu přiřadíme náhodně jedno ze zdrojových N polí. | ||
| Předvypočítá sousedy pro všechny target_points (point_coords). | ||
| Vrací seznam indexů kotevních bodů pro každý bod v point_coords. | ||
| """ | ||
| if self.anchor_points is None or len(self.anchor_points) == 0: | ||
| return np.array([], dtype=int) | ||
|
|
||
| return self.rng.integers(0, self.num_source, size=len(self.anchor_points)) | ||
|
|
||
| if len(self.anchor_points) == 0: | ||
| return [np.array([], dtype=int)] * len(self.point_coords) | ||
|
|
||
| def spatial_points(self, target_points, k_neighbors=5) -> list: | ||
| """ | ||
| Vektorizované vyhledání sousedů pomocí cKDTree. | ||
| Pro každý cílový bod získáme k nejbližších kotevních bodů a jejich vzdálenosti. | ||
| Poté aplikujeme vektorizovanou masku pro filtrování sousedů, kteří jsou dále než 2 * D. | ||
| Args: | ||
| target_points (np.ndarray): Pole cílových bodů, pro které chceme naj | ||
| """ | ||
|
|
||
| tree = cKDTree(self.anchor_points) | ||
| R_LIMIT = 2 * self.min_distance | ||
|
|
||
| # Vektorizovaný dotaz | ||
| distances, indices = tree.query(target_points, k=k_neighbors) | ||
|
|
||
| # Vektorizovaná maska vzdálenosti | ||
| valid_neighbor_mask = distances <= R_LIMIT | ||
|
|
||
| final_result_indices = [] | ||
| for i in range(len(target_points)): | ||
| # Výběr platných indexů pro každý cílový bod | ||
| current_valid = indices[i][valid_neighbor_mask[i]] | ||
| final_result_indices.append(current_valid) | ||
|
|
||
| return final_result_indices | ||
| r_limit = 2 * self.min_distance | ||
| # k_neighbors omezíme počtem dostupných kotevních bodů | ||
| k = min(10, len(self.anchor_points)) | ||
|
|
||
| distances, indices = tree.query(self.point_coords, k=k, distance_upper_bound=r_limit) | ||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Comment it arrays of shape (len(point_coords), k). |
||
| final_indices = [] | ||
| for i in range(len(self.point_coords)): | ||
| # cKDTree vrací 'inf' a index == len(data) pro nenalezené sousedy | ||
| idx = indices[i] | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. idx -> candidate_indices |
||
| valid = idx < len(self.anchor_points) | ||
| final_indices.append(idx[valid]) | ||
|
|
||
| return final_indices | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. List[List[int]] |
||
|
|
||
|
|
||
| def mix_fields(self, target_points) -> np.ndarray: | ||
| def mix_fields(self, field_samples: np.ndarray) -> np.ndarray: | ||
| """ | ||
| Finální míchání polí (průměrování). | ||
| Pro každý cílový bod získáme jeho sousedy pomocí spatial_points. | ||
| Pokud má sousedů více, provedeme průměr jejich hodnot. Pokud žádného | ||
| Míchání polí na základě předvypočítaných sousedů. | ||
|
|
||
| Args: | ||
| target_points (np.ndarray): Pole cílových bodů, pro které chceme získat smíšené hodnoty. | ||
| field_samples (np.ndarray): Shape (N, K), kde N je počet zdrojů | ||
| a K je počet bodů (len(point_coords)). | ||
| """ | ||
| num_source, num_points = field_samples.shape | ||
| if num_points != len(self.point_coords): | ||
| raise ValueError("Počet bodů v field_samples neodpovídá point_coords.") | ||
|
|
||
| # Generujeme indexy polí pro kotevní body | ||
| anchor_to_field_map = self.get_fields_indices(num_source) | ||
|
|
||
| neighbor_indices_list = self.spatial_points(target_points) | ||
|
|
||
| mixed_results = [] | ||
| for neighbors in neighbor_indices_list: | ||
| mixed_result = np.full(num_points, np.nan) | ||
| neighbor_indices_list = self.neighbor_data | ||
|
|
||
| for i_point, neighbors in enumerate(neighbor_indices_list): | ||
| if len(neighbors) == 0: | ||
| mixed_results.append(np.nan) | ||
| elif len(neighbors) == 1: | ||
| mixed_results.append(self.fields_indices[neighbors[0]]) | ||
| else: | ||
| # "Provedeme průměr, pokud jich zbyde více | ||
| values = self.fields_indices[neighbors] | ||
| mixed_results.append(np.mean(values)) | ||
|
|
||
| return np.array(mixed_results) | ||
| continue | ||
|
|
||
| # Získáme indexy zdrojových polí, které patří k sousedním kotevním bodům | ||
| source_indices = anchor_to_field_map[neighbors] | ||
|
|
||
| # Vybereme hodnoty z příslušných polí pro daný bod i_point | ||
| # field_samples[source_idx, i_point] | ||
| values = field_samples[source_indices, i_point] | ||
|
|
||
| mixed_result[i_point] = np.mean(values) | ||
|
|
||
| return mixed_result | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,72 +1,75 @@ | ||
| import argparse | ||
| import zipfile | ||
| import io | ||
| import os | ||
| import numpy as np | ||
| from field_synthesis import FieldSynthesis | ||
|
|
||
|
|
||
|
|
||
| def load_data(zip_path): | ||
| """Витягує координати та всі сампли з одного ZIP-архіву.""" | ||
| """Extrahuje souřadnice a všechny vzorky z jednoho ZIP archivu.""" | ||
| coords = None | ||
| all_values = [] | ||
|
|
||
| with zipfile.ZipFile(zip_path, 'r') as z: | ||
| file_list = z.namelist() | ||
|
|
||
| # Načtení souřadnic | ||
| coords_file = [f for f in file_list if 'coords' in f and f.endswith('.npz')][0] | ||
| with z.open(coords_file) as f: | ||
| data = np.load(io.BytesIO(f.read())) | ||
| coords = data[data.files[0]] if isinstance(data, np.lib.npyio.NpzFile) else data | ||
|
|
||
| # Vybereme první pole z npz souboru | ||
| coords = data[data.files[0]] | ||
|
|
||
| # Načtení hodnot polí | ||
| value_files = [f for f in file_list if 'values' in f and f.endswith('.npz')] | ||
| print(f"Found {len(value_files)} source samples in ZIP.") | ||
| print(f"Nalezeno {len(value_files)} zdrojových vzorků v ZIPu.") | ||
|
|
||
| for name in sorted(value_files): | ||
| with z.open(name) as f: | ||
| data = np.load(io.BytesIO(f.read())) | ||
| val = data[data.files[0]] if isinstance(data, np.lib.npyio.NpzFile) else data | ||
| val = data[data.files[0]] | ||
| all_values.append(val) | ||
|
|
||
| return coords, np.array(all_values) | ||
|
|
||
| def main(): | ||
| parser = argparse.ArgumentParser() | ||
| parser.add_argument("archive", help="Path to cond_tensors.zip") | ||
| parser.add_argument("-o", "--output", default="synthesis_results.zip") | ||
| parser.add_argument("--anchors", type=int, default=100) | ||
| parser.add_argument("--count", type=int, default=200) | ||
| parser.add_argument("--seed", type=int, default=42) | ||
| parser.add_argument("archive", help="Cesta k vstupnímu ZIP archivu (cond_tensors.zip)") | ||
| parser.add_argument("-o", "--output", default="synthesis_results.zip", help="Název výstupního souboru") | ||
| parser.add_argument("--anchors", type=int, default=100, help="Počet kotevních bodů (anchor points)") | ||
| parser.add_argument("--count", type=int, default=200, help="Počet polí k vygenerování") | ||
| parser.add_argument("--seed", type=int, default=42, help="Seed pro generátor náhodných čísel") | ||
| args = parser.parse_args() | ||
|
|
||
| # 1. Načtení dat | ||
| coords, samples_raw = load_data(args.archive) | ||
|
|
||
| samples = samples_raw[:, :, 0] | ||
| # Předpokládáme, že samples_raw má tvar (N, K, 1), převedeme na (N, K) | ||
| if samples_raw.ndim == 3: | ||
| samples = samples_raw[:, :, 0] | ||
| else: | ||
| samples = samples_raw | ||
|
|
||
| max_range = np.max(coords) | ||
| # 2. Inicializace FieldSynthesis | ||
| # Souřadnice bodů předáváme přímo do konstruktoru | ||
| fs = FieldSynthesis( | ||
| area_size=max_range, | ||
| point_coords=coords, | ||
| count_points=args.anchors, | ||
| num_source=len(samples), | ||
| dimension=3, | ||
| seed=args.seed | ||
| ) | ||
|
|
||
| print(f"Synthesizing {args.count} fields...") | ||
| print(f"Syntetizuji {args.count} polí...") | ||
|
|
||
| # 3. Generování nových polí | ||
| with zipfile.ZipFile(args.output, 'w', compression=zipfile.ZIP_DEFLATED) as out_z: | ||
| for i in range(args.count): | ||
| res = fs.mix_fields(coords) | ||
| # Každé volání mix_fields vytvoří unikátní pole díky vnitřnímu stavu RNG | ||
| res = fs.mix_fields(samples) | ||
|
|
||
| # Uložení výsledku do paměti (io.BytesIO) | ||
| buffer = io.BytesIO() | ||
| np.save(buffer, res) | ||
|
|
||
| filename = f"field_{i:04d}.npy" | ||
| out_z.writestr(filename, buffer.getvalue()) | ||
|
|
||
| if (i + 1) % 50 == 0: | ||
| print(f"Progress: {i + 1}/{args.count}") | ||
|
|
||
| print(f"Done! Results in {args.output}") | ||
|
|
||
| if __name__ == "__main__": | ||
| main() | ||
| out_z.writestr(filename, buffer.getvalue()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Neodchytávat.