From d928500871e3294ee0adcc2d2971a526f5ed29d7 Mon Sep 17 00:00:00 2001 From: Stefano Piani Date: Fri, 19 Jun 2026 17:55:27 +0200 Subject: [PATCH] Improved var_aggregator.py using a dummy communicator Added the possibility to run the var_aggregator.py script in serial mode, without importing and using MPI. This commit also replaces the print calls with a logging mechanism and makes the argparse more robust by specifying types and using path instead of strings. --- var_aggregator.py | 138 ++++++++++++++++++++++++++++++---------------- 1 file changed, 90 insertions(+), 48 deletions(-) diff --git a/var_aggregator.py b/var_aggregator.py index 8b04e79..8390c5a 100644 --- a/var_aggregator.py +++ b/var_aggregator.py @@ -1,32 +1,39 @@ import argparse +from bitsea.utilities.argparse_types import existing_dir_path +from bitsea.utilities.argparse_types import existing_file_path +from bitsea.utilities.argparse_types import path_inside_an_existing_dir + def argument(): parser = argparse.ArgumentParser(description = ''' Creates ave files with aggregated var for aveScan.py. Avescan.py. Creates also chl sup.''') parser.add_argument( '--inputdir', '-i', - type = str, + type = existing_dir_path, required = True, help = '/some/path/MODEL/AVE_FREQ_1/') parser.add_argument( '--tmpdir', '-t', - type = str, + type = path_inside_an_existing_dir, default = None, + required = False, help = """ /some/path/POSTPROC/output/AVE_FREQ_1/TMP/ . Path to put files with aggregated variables for aveScan.py. No generation of aggregate files if this parameter is omitted. """) parser.add_argument( '--archivedir', '-a', - type = str, + type = path_inside_an_existing_dir, default = None, + required=False, help = '''/some/path/POSTPROC/output/AVE_FREQ_1/Archive/ . Path to put native vars as they are, in order to compress them. No generation of archived files if this parameter is omitted. ''') parser.add_argument( '--chlsupdir', '-c', - type = str, + type = path_inside_an_existing_dir, default = None, + required=False, help = """/some/path/POSTPROC/output/AVE_FREQ_1/CHL_SUP. No generation of chl sup if this parameter is omitted. """) @@ -35,11 +42,12 @@ def argument(): default = "ave*N1p.nc", help = 'ave*.N1p.nc, they configure the date list') parser.add_argument( '--descriptor',"-d", - type = str, + type = existing_file_path, required = True, help = 'VarDescriptor_1.xml, or the complete path') parser.add_argument( '--maskfile',"-m", - type = str, + type = existing_file_path, + required = True, help = '''Path of the mask file''') parser.add_argument( '--serial', '-s', action='store_true', @@ -50,75 +58,109 @@ def argument(): args = argument() -import glob -import os -import GB_lib as G + +import logging + import read_descriptor +import GB_lib as G from bitsea.commons.mask import Mask +from bitsea.utilities.mpi_serial_interface import DummyCommunicator +from bitsea.utilities.mpi_serial_interface import get_mpi_communicator + +LOGGER = logging.getLogger() + if args.serial: - rank = 0 - nranks = 1 + comm = DummyCommunicator() else: try: from mpi4py import MPI - comm = MPI.COMM_WORLD - rank = comm.Get_rank() - nranks = comm.size - except: - raise ValueError("mpi4py not found; use the -s flag to force serial execution") + comm = get_mpi_communicator() + except Exception: + raise ValueError( + "mpi4py not found; use the -s flag to force serial execution" + ) + + +def configure_logger(communicator=comm): + if communicator.size == 1: + formatter = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + ) + else: + formatter = logging.Formatter( + f"%(asctime)s - RANK {communicator.Get_rank():0>3} - %(name)s - " + f"%(levelname)s - %(message)s" + ) + LOGGER.setLevel(logging.INFO) + + handler = logging.StreamHandler() + handler.setLevel(logging.DEBUG) + handler.setFormatter(formatter) + + LOGGER.addHandler(handler) -def addsep(string): - if string[-1] != os.sep: - return string + os.sep - else: - return string - - -AVEDIR = addsep(args.inputdir) +configure_logger() +AVEDIR = args.inputdir RD = read_descriptor.read_descriptor(args.descriptor) -PATH_NAME = AVEDIR + args.avelist -if rank==0 : print("INPUT_DIR =", AVEDIR) +LOGGER.info("INPUT_DIR = %s", AVEDIR) if args.archivedir: - ARCHIVEdir = addsep(args.archivedir) - if rank==0 : print("ARCHIVEDIR=", ARCHIVEdir) - os.system("mkdir -p " + ARCHIVEdir) + ARCHIVEdir = args.archivedir + if comm.Get_rank() == 0: + LOGGER.info("ARCHIVEDIR = %s", ARCHIVEdir) + ARCHIVEdir.mkdir(exist_ok=True) if args.tmpdir: - TMPOUTdir = addsep(args.tmpdir) - if rank==0 : print("TMPOUTDIR= ", TMPOUTdir) - os.system("mkdir -p " + TMPOUTdir) + TMPOUTdir = args.tmpdir + if comm.Get_rank() == 0: + LOGGER.info("TMPOUTDIR = %s", TMPOUTdir) + TMPOUTdir.mkdir(exist_ok=True) if args.chlsupdir: - CHLSUPdir = addsep(args.chlsupdir) - if rank==0 : print("CHLSUPDIR =", CHLSUPdir) - os.system("mkdir -p " + CHLSUPdir) + CHLSUPdir = args.chlsupdir + if comm.Get_rank() == 0: + LOGGER.info("CHLSUPDIR = %s", CHLSUPdir) + CHLSUPdir.mkdir(exist_ok=True) -SingleVar_filelist=glob.glob(PATH_NAME) -SingleVar_filelist.sort() +comm.barrier() + +SingleVar_filelist = sorted(AVEDIR.glob(args.avelist)) try: - TheMask=Mask.from_file(args.maskfile) + TheMask = Mask.from_file(args.maskfile) except AttributeError: - TheMask=Mask(args.maskfile) + TheMask = Mask(args.maskfile) + +rank = comm.Get_rank() +nranks = comm.size for N1pfile in SingleVar_filelist[rank::nranks]: - dailyAve = os.path.basename(N1pfile) - print("writing ", dailyAve,flush=True) + LOGGER.info("Writing %s", N1pfile.name) if args.tmpdir: - G.WriteAggregateAvefiles(TheMask, N1pfile, AVEDIR, TMPOUTdir, TMPOUTdir, RD) + G.WriteAggregateAvefiles( + TheMask, + str(N1pfile), + str(AVEDIR) + "/", + str(TMPOUTdir) + "/", + str(TMPOUTdir) + "/", + RD + ) - if args.chlsupdir: - F = G.filename_manager(N1pfile) - chl3dfile = TMPOUTdir + F.prefix + "." + F.datestr + ".P_l.nc" - chl2dfile = CHLSUPdir + F.prefix + "." + F.datestr + ".P_l.nc" - if chl3dfile.find('after')>-1: + if args.chlsupdir: + if not args.tmpdir: + raise ValueError( + "tmpdir must be specified if chlsupdir is specified" + ) + F = G.filename_manager(str(N1pfile)) + chl3dfile = TMPOUTdir / (F.prefix + "." + F.datestr + ".P_l.nc") + chl2dfile = CHLSUPdir / (F.prefix + "." + F.datestr + ".P_l.nc") + if str(chl3dfile).find('after')>-1: chlvar = 'TRNP_l' else: chlvar = 'P_l' - G.writeChlSup(chl3dfile, chl2dfile,chlvar) + G.writeChlSup(str(chl3dfile), str(chl2dfile), chlvar)