Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 90 additions & 48 deletions var_aggregator.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,39 @@
import argparse
from bitsea.utilities.argparse_types import existing_dir_path
from bitsea.utilities.argparse_types import existing_file_path
from bitsea.utilities.argparse_types import path_inside_an_existing_dir


def argument():
parser = argparse.ArgumentParser(description = '''
Creates ave files with aggregated var for aveScan.py.
Avescan.py. Creates also chl sup.''')
parser.add_argument( '--inputdir', '-i',
type = str,
type = existing_dir_path,
required = True,
help = '/some/path/MODEL/AVE_FREQ_1/')

parser.add_argument( '--tmpdir', '-t',
type = str,
type = path_inside_an_existing_dir,
default = None,
required = False,
help = """ /some/path/POSTPROC/output/AVE_FREQ_1/TMP/ .
Path to put files with aggregated variables for aveScan.py.
No generation of aggregate files if this parameter is omitted.
""")
parser.add_argument( '--archivedir', '-a',
type = str,
type = path_inside_an_existing_dir,
default = None,
required=False,
help = '''/some/path/POSTPROC/output/AVE_FREQ_1/Archive/ .
Path to put native vars as they are, in order to compress them.
No generation of archived files if this parameter is omitted.
''')

parser.add_argument( '--chlsupdir', '-c',
type = str,
type = path_inside_an_existing_dir,
default = None,
required=False,
help = """/some/path/POSTPROC/output/AVE_FREQ_1/CHL_SUP.
No generation of chl sup if this parameter is omitted.
""")
Expand All @@ -35,11 +42,12 @@ def argument():
default = "ave*N1p.nc",
help = 'ave*.N1p.nc, they configure the date list')
parser.add_argument( '--descriptor',"-d",
type = str,
type = existing_file_path,
required = True,
help = 'VarDescriptor_1.xml, or the complete path')
parser.add_argument( '--maskfile',"-m",
type = str,
type = existing_file_path,
required = True,
help = '''Path of the mask file''')
Comment thread
spiani marked this conversation as resolved.
parser.add_argument( '--serial', '-s',
action='store_true',
Expand All @@ -50,75 +58,109 @@ def argument():

args = argument()

import glob
import os
import GB_lib as G

import logging

import read_descriptor
import GB_lib as G
from bitsea.commons.mask import Mask
from bitsea.utilities.mpi_serial_interface import DummyCommunicator
from bitsea.utilities.mpi_serial_interface import get_mpi_communicator

LOGGER = logging.getLogger()


if args.serial:
rank = 0
nranks = 1
comm = DummyCommunicator()
else:
try:
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
nranks = comm.size
except:
raise ValueError("mpi4py not found; use the -s flag to force serial execution")
comm = get_mpi_communicator()
except Exception:
raise ValueError(
"mpi4py not found; use the -s flag to force serial execution"
)


def configure_logger(communicator=comm):
if communicator.size == 1:
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
else:
formatter = logging.Formatter(
f"%(asctime)s - RANK {communicator.Get_rank():0>3} - %(name)s - "
f"%(levelname)s - %(message)s"
)

LOGGER.setLevel(logging.INFO)

handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
handler.setFormatter(formatter)

LOGGER.addHandler(handler)

def addsep(string):
if string[-1] != os.sep:
return string + os.sep
else:
return string



AVEDIR = addsep(args.inputdir)
configure_logger()
AVEDIR = args.inputdir

RD = read_descriptor.read_descriptor(args.descriptor)
PATH_NAME = AVEDIR + args.avelist
if rank==0 : print("INPUT_DIR =", AVEDIR)
LOGGER.info("INPUT_DIR = %s", AVEDIR)
Comment thread
spiani marked this conversation as resolved.


if args.archivedir:
ARCHIVEdir = addsep(args.archivedir)
if rank==0 : print("ARCHIVEDIR=", ARCHIVEdir)
os.system("mkdir -p " + ARCHIVEdir)
ARCHIVEdir = args.archivedir
if comm.Get_rank() == 0:
LOGGER.info("ARCHIVEDIR = %s", ARCHIVEdir)
ARCHIVEdir.mkdir(exist_ok=True)
Comment thread
spiani marked this conversation as resolved.
Comment thread
spiani marked this conversation as resolved.

Comment thread
spiani marked this conversation as resolved.
if args.tmpdir:
TMPOUTdir = addsep(args.tmpdir)
if rank==0 : print("TMPOUTDIR= ", TMPOUTdir)
os.system("mkdir -p " + TMPOUTdir)
TMPOUTdir = args.tmpdir
if comm.Get_rank() == 0:
LOGGER.info("TMPOUTDIR = %s", TMPOUTdir)
TMPOUTdir.mkdir(exist_ok=True)
Comment thread
spiani marked this conversation as resolved.

if args.chlsupdir:
CHLSUPdir = addsep(args.chlsupdir)
if rank==0 : print("CHLSUPDIR =", CHLSUPdir)
os.system("mkdir -p " + CHLSUPdir)
CHLSUPdir = args.chlsupdir
if comm.Get_rank() == 0:
LOGGER.info("CHLSUPDIR = %s", CHLSUPdir)
CHLSUPdir.mkdir(exist_ok=True)
Comment thread
spiani marked this conversation as resolved.

SingleVar_filelist=glob.glob(PATH_NAME)
SingleVar_filelist.sort()
comm.barrier()

SingleVar_filelist = sorted(AVEDIR.glob(args.avelist))
try:
TheMask=Mask.from_file(args.maskfile)
TheMask = Mask.from_file(args.maskfile)
except AttributeError:
TheMask=Mask(args.maskfile)
TheMask = Mask(args.maskfile)


rank = comm.Get_rank()
nranks = comm.size
for N1pfile in SingleVar_filelist[rank::nranks]:
dailyAve = os.path.basename(N1pfile)
print("writing ", dailyAve,flush=True)
LOGGER.info("Writing %s", N1pfile.name)

if args.tmpdir:
G.WriteAggregateAvefiles(TheMask, N1pfile, AVEDIR, TMPOUTdir, TMPOUTdir, RD)
G.WriteAggregateAvefiles(
TheMask,
str(N1pfile),
str(AVEDIR) + "/",
str(TMPOUTdir) + "/",
str(TMPOUTdir) + "/",
RD
)

if args.chlsupdir:
F = G.filename_manager(N1pfile)
chl3dfile = TMPOUTdir + F.prefix + "." + F.datestr + ".P_l.nc"
chl2dfile = CHLSUPdir + F.prefix + "." + F.datestr + ".P_l.nc"
if chl3dfile.find('after')>-1:
if args.chlsupdir:
if not args.tmpdir:
raise ValueError(
"tmpdir must be specified if chlsupdir is specified"
)
F = G.filename_manager(str(N1pfile))
chl3dfile = TMPOUTdir / (F.prefix + "." + F.datestr + ".P_l.nc")
chl2dfile = CHLSUPdir / (F.prefix + "." + F.datestr + ".P_l.nc")
if str(chl3dfile).find('after')>-1:
chlvar = 'TRNP_l'
else:
chlvar = 'P_l'
G.writeChlSup(chl3dfile, chl2dfile,chlvar)
G.writeChlSup(str(chl3dfile), str(chl2dfile), chlvar)