Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 98 additions & 108 deletions field_synthesis/field_synthesis.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,36 @@


@dataclass(frozen=True)
class FieldSynthesis():
class FieldSynthesis:

"""
Třída pro syntézu prostorových polí pomocí generování kotevních bodů
a jejich následného míchání.
*
"""
"""
Inicializuje parametry syntézy pole.

Args:
area_size (float): Délka strany pracovní oblasti.
count_points (int): Požadovaný počet bodů k vygenerování.
num_source (int): Počet dostupných zdrojů (typů polí).
dimension (int): Dimenze prostoru (2D, 3D).
seed (int): Seed pro generátor náhodných čísel.
"""
point_coords: np.ndarray # Shape (K, dimension)
count_points: int = 100 # Počet kotevních bodů (anchor points)
safety_factor: float = 1.5 # Pro výpočet min_distance
seed: int = 42

area_size: int
count_points: int
num_source: int
dimension: int = 2
free_space_ratio: float = 0.4
seed: int = 42
@property
def dimension(self) -> int:
return self.point_coords.shape[1]

# self.min_distance = self.calc_distance()
# self.anchor_points = None
# self.field_indices = None
@cached_property
def area_stats(self) -> dict:
"""Vypočítá bounding box a objem pracovní oblasti."""
min_bounds = np.min(self.point_coords, axis=0)
max_bounds = np.max(self.point_coords, axis=0)
sides = max_bounds - min_bounds
volume = np.prod(sides)
return {
"min": min_bounds,
"max": max_bounds,
"volume": volume,
"sides": sides
}

@cached_property
def rng(self) -> np.random.Generator:
Expand All @@ -41,113 +45,99 @@ def rng(self) -> np.random.Generator:
@cached_property
def min_distance(self) -> float:
"""
Odhad distance D pro zadaný počet bodů a stranu krychle.
D se odhaduje na základě poměru volného prostoru a obsazeného prostoru.
Výpočet minimální vzdálenosti na základě objemu a počtu bodů.
D = (Vol / N)^(1/dim) / safety_factor
"""
if self.count_points <= 0 or self.area_size <= 0:
vol = self.area_stats["volume"]
if self.count_points <= 0 or vol <= 0:
return 0.0

total_vol = self.area_size ** self.dimension
occupied_ratio = 1.0 - self.free_space_ratio
vol_per_point = (total_vol * occupied_ratio) / self.count_points

if self.dimension == 2:
return math.sqrt(vol_per_point)
else:
return vol_per_point ** (1/self.dimension)

vol_per_point = vol / self.count_points
return (vol_per_point ** (1 / self.dimension)) / self.safety_factor

@cached_property
def anchor_points(self) -> np.ndarray:
"""
Funkce vygeneruje pole nahodnych bodu.

:param count_points: pocet nahodnych bodu.
:param min_distance: minimalni vzdalenost mezi bodami.
:param area_size: velokost matici.
:return: vrati pole koordinat [[x1, y1], [x2, y2], ...]
"""
if self.count_points <= 0 or self.area_size <= 0:
return np.zeros((0, 2))

if self.min_distance < 0:
self.min_distance = 0

radius = self.min_distance/self.area_size

if radius > 1:
radius = 0.99
try:
engine = PoissonDisk(d=self.dimension, radius=radius, seed=42)
"""Generuje kotevní body pomocí Poisson Disk Sampling v rámci bounding boxu."""
if self.count_points <= 0:
return np.zeros((0, self.dimension))

#ten algorytm je nakladny na cas a resurs pocitace
# points = engine.fill_space() * area_size
# Normalizovaný rádius pro PoissonDisk (v jednotkách [0, 1])
# Používáme průměrnou stranu pro normalizaci
avg_side = np.mean(self.area_stats["sides"])
radius = self.min_distance / avg_side if avg_side > 0 else 0.1
radius = min(max(radius, 0.01), 0.99)

# if (len(points) > count_points):
# return points[:count_points]

return engine.random(self.count_points) * self.area_size
try:
engine = PoissonDisk(d=self.dimension, radius=radius, seed=self.seed)
# Vygenerujeme body a roztáhneme je na rozměry bounding boxu
points = engine.random(self.count_points)
return self.area_stats["min"] + points * self.area_stats["sides"]
except Exception:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Neodchytávat.

return np.zeros((0, 2))
# Fallback na čistě náhodné body v případě chyby engine
return self.rng.uniform(
self.area_stats["min"],
self.area_stats["max"],
(self.count_points, self.dimension)
)

def get_fields_indices(self, num_source: int) -> np.ndarray:
"""Přiřadí každému kotevnímu bodu index jednoho ze zdrojových polí."""
return self.rng.integers(0, num_source, size=len(self.anchor_points))

@cached_property
def fields_indices(self) -> np.ndarray:
def neighbor_data(self):
"""
Každému bodu přiřadíme náhodně jedno ze zdrojových N polí.
Předvypočítá sousedy pro všechny target_points (point_coords).
Vrací seznam indexů kotevních bodů pro každý bod v point_coords.
"""
if self.anchor_points is None or len(self.anchor_points) == 0:
return np.array([], dtype=int)

return self.rng.integers(0, self.num_source, size=len(self.anchor_points))

if len(self.anchor_points) == 0:
return [np.array([], dtype=int)] * len(self.point_coords)

def spatial_points(self, target_points, k_neighbors=5) -> list:
"""
Vektorizované vyhledání sousedů pomocí cKDTree.
Pro každý cílový bod získáme k nejbližších kotevních bodů a jejich vzdálenosti.
Poté aplikujeme vektorizovanou masku pro filtrování sousedů, kteří jsou dále než 2 * D.
Args:
target_points (np.ndarray): Pole cílových bodů, pro které chceme naj
"""

tree = cKDTree(self.anchor_points)
R_LIMIT = 2 * self.min_distance

# Vektorizovaný dotaz
distances, indices = tree.query(target_points, k=k_neighbors)

# Vektorizovaná maska vzdálenosti
valid_neighbor_mask = distances <= R_LIMIT

final_result_indices = []
for i in range(len(target_points)):
# Výběr platných indexů pro každý cílový bod
current_valid = indices[i][valid_neighbor_mask[i]]
final_result_indices.append(current_valid)

return final_result_indices
r_limit = 2 * self.min_distance
# k_neighbors omezíme počtem dostupných kotevních bodů
k = min(10, len(self.anchor_points))

distances, indices = tree.query(self.point_coords, k=k, distance_upper_bound=r_limit)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment it arrays of shape (len(point_coords), k).

final_indices = []
for i in range(len(self.point_coords)):
# cKDTree vrací 'inf' a index == len(data) pro nenalezené sousedy
idx = indices[i]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

idx -> candidate_indices

valid = idx < len(self.anchor_points)
final_indices.append(idx[valid])

return final_indices
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

List[List[int]]



def mix_fields(self, target_points) -> np.ndarray:
def mix_fields(self, field_samples: np.ndarray) -> np.ndarray:
"""
Finální míchání polí (průměrování).
Pro každý cílový bod získáme jeho sousedy pomocí spatial_points.
Pokud má sousedů více, provedeme průměr jejich hodnot. Pokud žádného
Míchání polí na základě předvypočítaných sousedů.

Args:
target_points (np.ndarray): Pole cílových bodů, pro které chceme získat smíšené hodnoty.
field_samples (np.ndarray): Shape (N, K), kde N je počet zdrojů
a K je počet bodů (len(point_coords)).
"""
num_source, num_points = field_samples.shape
if num_points != len(self.point_coords):
raise ValueError("Počet bodů v field_samples neodpovídá point_coords.")

# Generujeme indexy polí pro kotevní body
anchor_to_field_map = self.get_fields_indices(num_source)

neighbor_indices_list = self.spatial_points(target_points)

mixed_results = []
for neighbors in neighbor_indices_list:
mixed_result = np.full(num_points, np.nan)
neighbor_indices_list = self.neighbor_data

for i_point, neighbors in enumerate(neighbor_indices_list):
if len(neighbors) == 0:
mixed_results.append(np.nan)
elif len(neighbors) == 1:
mixed_results.append(self.fields_indices[neighbors[0]])
else:
# "Provedeme průměr, pokud jich zbyde více
values = self.fields_indices[neighbors]
mixed_results.append(np.mean(values))

return np.array(mixed_results)
continue

# Získáme indexy zdrojových polí, které patří k sousedním kotevním bodům
source_indices = anchor_to_field_map[neighbors]

# Vybereme hodnoty z příslušných polí pro daný bod i_point
# field_samples[source_idx, i_point]
values = field_samples[source_indices, i_point]

mixed_result[i_point] = np.mean(values)

return mixed_result
57 changes: 30 additions & 27 deletions field_synthesis/generate.py
Original file line number Diff line number Diff line change
@@ -1,72 +1,75 @@
import argparse
import zipfile
import io
import os
import numpy as np
from field_synthesis import FieldSynthesis



def load_data(zip_path):
"""Витягує координати та всі сампли з одного ZIP-архіву."""
"""Extrahuje souřadnice a všechny vzorky z jednoho ZIP archivu."""
coords = None
all_values = []

with zipfile.ZipFile(zip_path, 'r') as z:
file_list = z.namelist()

# Načtení souřadnic
coords_file = [f for f in file_list if 'coords' in f and f.endswith('.npz')][0]
with z.open(coords_file) as f:
data = np.load(io.BytesIO(f.read()))
coords = data[data.files[0]] if isinstance(data, np.lib.npyio.NpzFile) else data

# Vybereme první pole z npz souboru
coords = data[data.files[0]]

# Načtení hodnot polí
value_files = [f for f in file_list if 'values' in f and f.endswith('.npz')]
print(f"Found {len(value_files)} source samples in ZIP.")
print(f"Nalezeno {len(value_files)} zdrojových vzorků v ZIPu.")

for name in sorted(value_files):
with z.open(name) as f:
data = np.load(io.BytesIO(f.read()))
val = data[data.files[0]] if isinstance(data, np.lib.npyio.NpzFile) else data
val = data[data.files[0]]
all_values.append(val)

return coords, np.array(all_values)

def main():
parser = argparse.ArgumentParser()
parser.add_argument("archive", help="Path to cond_tensors.zip")
parser.add_argument("-o", "--output", default="synthesis_results.zip")
parser.add_argument("--anchors", type=int, default=100)
parser.add_argument("--count", type=int, default=200)
parser.add_argument("--seed", type=int, default=42)
parser.add_argument("archive", help="Cesta k vstupnímu ZIP archivu (cond_tensors.zip)")
parser.add_argument("-o", "--output", default="synthesis_results.zip", help="Název výstupního souboru")
parser.add_argument("--anchors", type=int, default=100, help="Počet kotevních bodů (anchor points)")
parser.add_argument("--count", type=int, default=200, help="Počet polí k vygenerování")
parser.add_argument("--seed", type=int, default=42, help="Seed pro generátor náhodných čísel")
args = parser.parse_args()

# 1. Načtení dat
coords, samples_raw = load_data(args.archive)

samples = samples_raw[:, :, 0]
# Předpokládáme, že samples_raw má tvar (N, K, 1), převedeme na (N, K)
if samples_raw.ndim == 3:
samples = samples_raw[:, :, 0]
else:
samples = samples_raw

max_range = np.max(coords)
# 2. Inicializace FieldSynthesis
# Souřadnice bodů předáváme přímo do konstruktoru
fs = FieldSynthesis(
area_size=max_range,
point_coords=coords,
count_points=args.anchors,
num_source=len(samples),
dimension=3,
seed=args.seed
)

print(f"Synthesizing {args.count} fields...")
print(f"Syntetizuji {args.count} polí...")

# 3. Generování nových polí
with zipfile.ZipFile(args.output, 'w', compression=zipfile.ZIP_DEFLATED) as out_z:
for i in range(args.count):
res = fs.mix_fields(coords)
# Každé volání mix_fields vytvoří unikátní pole díky vnitřnímu stavu RNG
res = fs.mix_fields(samples)

# Uložení výsledku do paměti (io.BytesIO)
buffer = io.BytesIO()
np.save(buffer, res)

filename = f"field_{i:04d}.npy"
out_z.writestr(filename, buffer.getvalue())

if (i + 1) % 50 == 0:
print(f"Progress: {i + 1}/{args.count}")

print(f"Done! Results in {args.output}")

if __name__ == "__main__":
main()
out_z.writestr(filename, buffer.getvalue())
Loading