Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 33 additions & 5 deletions field_synthesis/field_synthesis.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ class FieldSynthesis():
"""
Třída pro syntézu prostorových polí pomocí generování kotevních bodů
a jejich následného míchání.
"""
"""

Assumption: The set of points where the input and output fields live is the same.

Inicializuje parametry syntézy pole.

Args:
Expand All @@ -21,13 +22,31 @@ class FieldSynthesis():
num_source (int): Počet dostupných zdrojů (typů polí).
dimension (int): Dimenze prostoru (2D, 3D).
seed (int): Seed pro generátor náhodných čísel.

TODO: add parameter point_coords shape=(K, dimension)

"""

area_size: int
# TODO: compute from point_coords as volume of the axis aligned bounding box.
# must be float.
# next, minimum distance is vol ** (1/dim) / safety_factor , e.g. saftey_factor = 2.0
# should be in interval [1, 2].

count_points: int
# Suggested number of anchor points, could be refined due to minimal distance.

num_source: int
# Number of field samples we will mix;
# TODO: remove this, we will know that only
# after passing the field samples to the mixing function.

dimension: int = 2
# TODO: remove, diven by the shape of point_coords

free_space_ratio: float = 0.4
# TODO: Remove, given indirectly through `count_points`

seed: int = 42

# self.min_distance = self.calc_distance()
Expand Down Expand Up @@ -108,6 +127,9 @@ def spatial_points(self, target_points, k_neighbors=5) -> list:
Poté aplikujeme vektorizovanou masku pro filtrování sousedů, kteří jsou dále než 2 * D.
Args:
target_points (np.ndarray): Pole cílových bodů, pro které chceme naj

TODO: could be refactored into cached_property since target_points == point_coords are given
in constructor.
"""

tree = cKDTree(self.anchor_points)
Expand Down Expand Up @@ -135,19 +157,25 @@ def mix_fields(self, target_points) -> np.ndarray:
Pokud má sousedů více, provedeme průměr jejich hodnot. Pokud žádného
Args:
target_points (np.ndarray): Pole cílových bodů, pro které chceme získat smíšené hodnoty.
TODO: pass in field samples, field_samples shape = (N, K)
N .. number of samples
K ... number of points == len(point_coords)
"""

neighbor_indices_list = self.spatial_points(target_points)
# List of lists of anchor neighbours of each point.

mixed_results = []
for neighbors in neighbor_indices_list:
for i_point, neighbors in enumerate(neighbor_indices_list):
if len(neighbors) == 0:
mixed_results.append(np.nan)
elif len(neighbors) == 1:
mixed_results.append(self.fields_indices[neighbors[0]])
else:
# "Provedeme průměr, pokud jich zbyde více
# TODO: use the field samples and compute mean of their values
# mixed_samples = field_samples[self.fields_indices[neighbors], i_point] # shape (len(neighbours), )
# value = np.mean( mixed_samples)
values = self.fields_indices[neighbors]
mixed_results.append(np.mean(values))

return np.array(mixed_results)
return np.array(mixed_results)
54 changes: 28 additions & 26 deletions field_synthesis/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,67 +6,69 @@
from field_synthesis import FieldSynthesis

def load_data(zip_path):
"""Витягує координати та всі сампли з одного ZIP-архіву."""
"""Extrahuje souřadnice a všechny vzorky z jednoho ZIP archivu."""
coords = None
all_values = []

with zipfile.ZipFile(zip_path, 'r') as z:
file_list = z.namelist()

# Načtení souřadnic
coords_file = [f for f in file_list if 'coords' in f and f.endswith('.npz')][0]
with z.open(coords_file) as f:
data = np.load(io.BytesIO(f.read()))
coords = data[data.files[0]] if isinstance(data, np.lib.npyio.NpzFile) else data

# Vybereme první pole z npz souboru
coords = data[data.files[0]]

# Načtení hodnot polí
value_files = [f for f in file_list if 'values' in f and f.endswith('.npz')]
print(f"Found {len(value_files)} source samples in ZIP.")
print(f"Nalezeno {len(value_files)} zdrojových vzorků v ZIPu.")

for name in sorted(value_files):
with z.open(name) as f:
data = np.load(io.BytesIO(f.read()))
val = data[data.files[0]] if isinstance(data, np.lib.npyio.NpzFile) else data
val = data[data.files[0]]
all_values.append(val)

return coords, np.array(all_values)

def main():
parser = argparse.ArgumentParser()
parser.add_argument("archive", help="Path to cond_tensors.zip")
parser.add_argument("-o", "--output", default="synthesis_results.zip")
parser.add_argument("--anchors", type=int, default=100)
parser.add_argument("--count", type=int, default=200)
parser.add_argument("--seed", type=int, default=42)
parser.add_argument("archive", help="Cesta k vstupnímu ZIP archivu (cond_tensors.zip)")
parser.add_argument("-o", "--output", default="synthesis_results.zip", help="Název výstupního souboru")
parser.add_argument("--anchors", type=int, default=100, help="Počet kotevních bodů (anchor points)")
parser.add_argument("--count", type=int, default=200, help="Počet polí k vygenerování")
parser.add_argument("--seed", type=int, default=42, help="Seed pro generátor náhodných čísel")
args = parser.parse_args()

# 1. Načtení dat
coords, samples_raw = load_data(args.archive)

samples = samples_raw[:, :, 0]
# Předpokládáme, že samples_raw má tvar (N, K, 1), převedeme na (N, K)
if samples_raw.ndim == 3:
samples = samples_raw[:, :, 0]
else:
samples = samples_raw

max_range = np.max(coords)
# 2. Inicializace FieldSynthesis
# Souřadnice bodů předáváme přímo do konstruktoru
fs = FieldSynthesis(
area_size=max_range,
point_coords=coords,
count_points=args.anchors,
num_source=len(samples),
dimension=3,
seed=args.seed
)

print(f"Synthesizing {args.count} fields...")
print(f"Syntetizuji {args.count} polí...")

# 3. Generování nových polí
with zipfile.ZipFile(args.output, 'w', compression=zipfile.ZIP_DEFLATED) as out_z:
for i in range(args.count):
res = fs.mix_fields(coords)
# Každé volání mix_fields vytvoří unikátní pole díky vnitřnímu stavu RNG
res = fs.mix_fields(samples)

# Uložení výsledku do paměti (io.BytesIO)
buffer = io.BytesIO()
np.save(buffer, res)

filename = f"field_{i:04d}.npy"
out_z.writestr(filename, buffer.getvalue())

if (i + 1) % 50 == 0:
print(f"Progress: {i + 1}/{args.count}")

print(f"Done! Results in {args.output}")

if __name__ == "__main__":
main()
out_z.writestr(filename, buffer.getvalue())