Skip to content
3 changes: 3 additions & 0 deletions bin/dm_link.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import glob
import logging
import os
import re
import sys

from docopt import docopt
Expand Down Expand Up @@ -245,6 +246,8 @@ def get_scanid_from_lookup_table(archive_path):
"""
global lookup
basename = os.path.basename(os.path.normpath(archive_path))
# Strip whitespace, because the scans.csv entries can't contain them
basename = re.sub(r'\s+', '', basename)
source_name = basename[:-len(datman.utils.get_extension(basename))]
lookupinfo = lookup[lookup["source_name"] == source_name]

Expand Down
11 changes: 8 additions & 3 deletions bin/dm_qc_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
--log-to-server If set, all log messages will also be sent to the
configured logging server. This is useful when the
script is run on the queue, since it swallows logging.
--max-mem STR The maximum memory required if submitted to the
queue. Should include the units (e.g. GB) [default: 5GB]
-q --quiet Only report errors
-v --verbose Be chatty
-d --debug Be extra chatty
Expand Down Expand Up @@ -64,6 +66,7 @@ def main():
REMAKE = arguments["--remake"]
REFRESH = arguments["--refresh"]
use_server = arguments["--log-to-server"]
max_mem = arguments["--max-mem"]
verbose = arguments["--verbose"]
debug = arguments["--debug"]
quiet = arguments["--quiet"]
Expand All @@ -81,7 +84,7 @@ def main():
logger.setLevel(logging.DEBUG)

if not session:
return submit_subjects(config)
return submit_subjects(config, max_mem=max_mem)

if not datman.dashboard.dash_found:
logger.error("Dashboard database not found, can't run.")
Expand Down Expand Up @@ -131,11 +134,13 @@ def add_server_handler(config):
logger.addHandler(server_handler)


def submit_subjects(config):
def submit_subjects(config, max_mem="5GB"):
"""Submit a job for each subject in the study that still needs metrics.

Args:
config (:obj:`datman.config.config`): A config object for the study.
max_mem (int, optional): The maximum amount of memory needed for jobs.
Default = 5GB.
"""
missing_cmds = check_prerequisites()
if missing_cmds:
Expand All @@ -157,7 +162,7 @@ def submit_subjects(config):
logger.info(f"Submitting QC job for {subject}.")
datman.utils.submit_job(
command, job_name, "/tmp", system=config.system,
argslist="--mem=5G"
argslist=f"--mem={max_mem}"
)


Expand Down
79 changes: 68 additions & 11 deletions bin/dm_sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,25 @@
<study>: Short name of the study to process

Options:
--before <date> Only download scans added to the server before
the given date. Date format is expected to be
YYYY-MM-DD. Can be combined with --after.
--after <date> Only download scans added to the server after
the given date. Date format is expected to be
YYYY-MM-DD. Can be combined with --before.
-h --help Show this screen.
-q --quiet Suppress output.
-v --verbose Show more output.
-d --debug Show lots of output.
--dry-run

"""
import fnmatch
import logging
import os
import shutil
import fnmatch
import time
from datetime import datetime

import pysftp

Expand All @@ -39,6 +47,10 @@ def main():
quiet = arguments["--quiet"]
study = arguments["<study>"]

start_date, end_date = get_date_range(
arguments["--after"], arguments["--before"]
)

# setup logging
log_level = logging.WARN

Expand Down Expand Up @@ -108,7 +120,40 @@ def main():
# process each folder in turn
logger.debug("Copying from:{} to:{}"
.format(valid_dir, zips_path))
process_dir(sftp, valid_dir, zips_path)
process_dir(sftp, valid_dir, zips_path,
start=start_date, end=end_date)


def get_date_range(after_str=None, before_str=None):
    """Convert optional date strings into a (start, end) timestamp pair.

    Args:
        after_str (str, optional): A 'YYYY-MM-DD' date. Files modified
            before this date should be excluded. None disables the bound.
        before_str (str, optional): A 'YYYY-MM-DD' date. Files modified
            after this date should be excluded. None disables the bound.

    Returns:
        tuple: (start, end), where each entry is a unix timestamp (float)
            or None if the corresponding bound was not given.

    Raises:
        ValueError: If both bounds are given and start falls after end.
    """
    start_date = parse_date(after_str) if after_str else None
    end_date = parse_date(before_str) if before_str else None

    if start_date and end_date and start_date > end_date:
        raise ValueError(
            "Start of date range is after the cut off date. When both "
            "--after and --before are in use, --after date must precede "
            "--before date"
        )

    return start_date, end_date


def parse_date(date_str):
    """Parse a 'YYYY-MM-DD' date string into a unix timestamp.

    Args:
        date_str (str): A date in YYYY-MM-DD format, interpreted as
            midnight local time.

    Returns:
        float: The unix timestamp for the given date.

    Raises:
        ValueError: If the string does not match the expected format.
    """
    try:
        parsed = datetime.strptime(date_str, "%Y-%m-%d")
    except ValueError as e:
        raise ValueError(
            "Invalid date format given, expected format YYYY-MM-DD"
        ) from e
    # mktime converts the (naive, local-time) struct_time to a timestamp.
    return time.mktime(parsed.timetuple())


def get_server_config(cfg):
Expand Down Expand Up @@ -204,7 +249,7 @@ def get_valid_remote_dirs(connection, mrfolders):
return valid_dirs


def process_dir(connection, directory, zips_path):
def process_dir(connection, directory, zips_path, start=None, end=None):
"""Process a directory on the ftp server,
copy new files to zips_path
"""
Expand All @@ -218,14 +263,18 @@ def process_dir(connection, directory, zips_path):
return
for file_name in files:
if connection.isfile(file_name):
get_file(connection, file_name, zips_path)
get_file(connection, file_name, zips_path,
start=start, end=end)
else:
get_folder(connection, file_name, zips_path)
get_folder(connection, file_name, zips_path,
start=start, end=end)


def get_folder(connection, folder_name, dst_path):
def get_folder(connection, folder_name, dst_path, start=None, end=None):
expected_file = os.path.join(dst_path, folder_name + ".zip")
if not download_needed(connection, folder_name, expected_file):
if not download_needed(connection, folder_name, expected_file,
start=start, end=end
):
logger.debug("File: {} already exists, skipping".format(folder_name))
return

Expand All @@ -239,26 +288,34 @@ def get_folder(connection, folder_name, dst_path):
new_zip))


def get_file(connection, file_name, zips_path, start=None, end=None):
    """Download a single remote file when no up-to-date local copy exists.

    Args:
        connection: An open pysftp connection.
        file_name (str): The remote file to fetch.
        zips_path (str): The local directory to download into.
        start (float, optional): Unix timestamp. Files modified before
            this are skipped.
        end (float, optional): Unix timestamp. Files modified after
            this are skipped.
    """
    target = os.path.join(zips_path, file_name)
    if not download_needed(connection, file_name, target, start=start, end=end):
        logger.debug("File: {} already exists, skipping".format(file_name))
        return
    logger.info("Copying new remote file: {}".format(file_name))
    # preserve_mtime keeps the remote timestamp so later runs can compare
    # local vs. remote modification times.
    connection.get(file_name, target, preserve_mtime=True)


def download_needed(sftp, filename, target):
def download_needed(sftp, filename, target, start=None, end=None):
"""Check if a local copy of the file exists,
If no local copy exists return True
If local copy exists and is older than remote return True
otherwise return false"""

remote_mtime = sftp.stat(filename).st_mtime

if start and remote_mtime < start:
return False

if end and remote_mtime > end:
return False

if not os.path.isfile(target):
return True

# check the file modification times
local_mtime = os.path.getmtime(target)
remote_mtime = sftp.stat(filename).st_mtime
if local_mtime < remote_mtime:
return True

Expand Down
19 changes: 17 additions & 2 deletions bin/dm_xnat_extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,12 @@ def collect_zips(config, args):
return []

if args.experiment:
ident = get_identifier(config, args.experiment)
try:
ident = get_identifier(config, args.experiment)
except datman.exceptions.ParseException as e:
logger.error(f"Ignoring invalid ID {args.experiment} - {e}")
return []

if not ident:
logger.error(f"Invalid session ID {args.experiment}.")
return []
Expand All @@ -400,12 +405,22 @@ def collect_zips(config, args):

zip_files = []
for zip_path in glob.glob(os.path.join(zip_folder, "*.zip")):
if not os.path.exists(zip_path):
logger.debug(f"Broken symlink found. Ignoring {zip_path}")
continue

sess_name = os.path.basename(zip_path).replace(".zip", "")
ident = get_identifier(config, sess_name)
try:
ident = get_identifier(config, sess_name)
except datman.exceptions.ParseException as e:
logger.error(f"Ignoring invalid ID {sess_name} - {e}")
continue

if not ident:
logger.error(
f"Ignoring invalid zip file name in dicom dir: {sess_name}")
continue

zip_files.append(
(None, datman.importers.ZipImporter(ident, zip_path))
)
Expand Down
30 changes: 22 additions & 8 deletions bin/dm_xnat_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,28 @@ def process_archive(file_name, dicom_dir):
exper_id))
return


# Hold off on resource uploads until scan data exists.
# We upload using xnat's direct-to-archive options, but if any data
# exists for a session (even just resources) during upload then
# everything gets punted to the prearchive and ends up requiring a manual
# merge.
if data_exists and not resource_exists:
logger.debug("Uploading resource from: {}".format(archive_file))
try:
upload_non_dicom_data(archive_file, xnat_subject.project, scanid,
xnat)
except Exception as e:
logger.debug("An exception occurred: {}".format(e))
pass
elif not data_exists and not resource_exists:
logger.info(
f"Skipping resource upload of {archive_file} until scans exist. "
"Resource files will upload on a later run once scan upload "
"complete."
)


if not data_exists:
logger.info("Uploading dicoms from {}".format(archive_file))
try:
Expand All @@ -166,14 +188,6 @@ def process_archive(file_name, dicom_dir):
.format(archive_file, xnat_subject.project,
xnat_subject.name, e))

if not resource_exists:
logger.debug("Uploading resource from: {}".format(archive_file))
try:
upload_non_dicom_data(archive_file, xnat_subject.project, scanid,
xnat)
except Exception as e:
logger.debug("An exception occurred: {}".format(e))
pass


def get_xnat_subject(ident, xnat):
Expand Down
8 changes: 6 additions & 2 deletions datman/exporters/bids.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,11 @@ def export(self, raw_data_dir: str, **kwargs):
self.make_output_dir()

try:
self.run_dcm2bids(raw_data_dir, force_dcm2niix=force_dcm2niix)
self.run_dcm2bids(
raw_data_dir,
force_dcm2niix=force_dcm2niix,
refresh=self.opts.refresh
)
except Exception as e:
logger.error(f"Failed to extract to BIDs - {e}")

Expand Down Expand Up @@ -178,7 +182,7 @@ def _get_scan_dir(self, download_dir: str, refresh: bool = False) -> str:
if refresh:
# Use existing tmp_dir instead of raw dcms
return self.bids_tmp
return download_dir
return os.path.join(download_dir, self.dcm_dir)

def make_command(
self, raw_data_dir: str, force_dcm2niix: bool = False
Expand Down
6 changes: 5 additions & 1 deletion datman/exporters/bids_legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,12 @@ def check_contents(self, expected, actual):
misnamed[actual_name] = expected_name
continue

expected_name = expected[scan][0]
actual_name = actual[scan][0]
if "blacklisted" in actual_name:
# Do not 'fix' blacklisted scans by moving them back.
continue

expected_name = expected[scan][0]
if expected_name == actual_name:
continue
misnamed[actual_name] = expected_name
Expand Down
11 changes: 10 additions & 1 deletion datman/importers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1072,7 +1072,7 @@ def __init__(self, ident, zip_path):
self.contents = self.parse_contents()
self.scans = self.get_scans()
self.resource_files = self.contents['resources']
self.dcm_subdir = os.path.split(self.scans[0].series_dir)[0]
self.dcm_subdir = self.get_dcm_subdir()
self.date = self.scans[0].date

@property
Expand Down Expand Up @@ -1138,6 +1138,15 @@ def dcm_subdir(self) -> str:
def dcm_subdir(self, value: str):
self._dcm_subdir = value

def get_dcm_subdir(self) -> str:
    """Find the common parent folder for all scan dicoms.

    Returns:
        str: The deepest folder containing every scan's series directory.
            Falls back to the first scan's parent folder when the series
            directories share no common prefix.
    """
    series_dirs = [scan.series_dir for scan in self.scans]

    # os.path.commonpath() of a single path is the path itself, not its
    # parent, so a one-scan session must be special-cased to return the
    # containing folder (consistent with the multi-scan result).
    if len(series_dirs) == 1:
        return os.path.split(series_dirs[0])[0]

    # NOTE(review): commonpath raises ValueError for a mix of absolute
    # and relative paths; series_dir values appear to be relative zip
    # entries -- confirm.
    prefix = os.path.commonpath(series_dirs)
    if prefix == '':
        return os.path.split(self.scans[0].series_dir)[0]
    return prefix

def is_shared(self) -> bool:
# Can't track shared sessions with zip files.
return False
Expand Down
13 changes: 0 additions & 13 deletions datman/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,23 +229,14 @@ class DTIMetrics(MetricDTI):
"_b0.png": QCOutput(2, "b0 Montage")
},
"qc-dti": {
"_qascripts_dti.csv": None,
"_stats.csv": None,
"_directions.png": QCOutput(3, "bvec Directions")
},
"qc-spikecount": {
"_spikecount.csv": None
}
}

def generate(self, img_gap=2, width=1600):
self.run(f"qc-dti {self.input} {self.bvec} {self.bval} "
f"{self.output_root}", "qc-dti")

self.run(f"qc-spikecount {self.input} "
f"{self.output_root + '_spikecount.csv'} {self.bval}",
"qc-spikecount")

self.make_montage(self.output_root + "_montage.png")
self.make_image(self.output_root + "_b0.png", img_gap, width)

Expand All @@ -267,10 +258,6 @@ class FMRIMetrics(Metric):
"_scanlengths.csv": None
},
"qc-fmri": {
"_fd.csv": None,
"_qascripts_bold.csv": None,
"_spectra.csv": None,
"_stats.csv": None,
"_sfnr.nii.gz": None,
"_corr.nii.gz": None
},
Expand Down
Loading