
Reestablish code after moving

Wu Jianxiao 5 months ago
parent
commit
167b599238

+ 25 - 0
hcpdiffpy/container.py

@@ -0,0 +1,25 @@
+from pathlib import Path
+from typing import Union
+
+base_dir = Path(__file__).resolve().parent.parent
+
+
+class SimgCmd:
+    def __init__(self, config: dict, simg: Union[str, None]) -> None:
+        if simg is None:
+            self.command = None
+        else:
+            self.command = (f"singularity run -B {config['work_dir']}:{config['work_dir']},"
+                        f"{config['output_dir']}:{config['output_dir']},{base_dir}:{base_dir}")
+            self._simg = simg
+
+    def cmd(self, command: str, options: Union[str, None] = None) -> str:
+        if self.command is None:
+            run_cmd = command
+        else:
+            if options is None:
+                run_cmd = f"{self.command} {self._simg} {command}"
+            else:
+                run_cmd = f"{self.command} {options} {self._simg} {command}"
+
+        return run_cmd
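
For context, a minimal sketch of how the relocated SimgCmd behaves (paths hypothetical): without an image, cmd() passes the command through unchanged; with one, it prepends the singularity prefix and bind mounts.

    config = {"work_dir": "/work", "output_dir": "/out"}

    native = SimgCmd(config, simg=None)
    assert native.cmd("flirt") == "flirt"  # no container: command unchanged

    contained = SimgCmd(config, simg="/containers/fsl.sif")
    print(contained.cmd("flirt"))
    # singularity run -B /work:/work,/out:/out,<base_dir>:<base_dir> /containers/fsl.sif flirt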

+ 58 - 131
hcpdiffpy/interfaces/data.py

@@ -1,166 +1,93 @@
 from pathlib import Path
+from shutil import copyfile
 import logging
 
 from nipype.interfaces.base import BaseInterfaceInputSpec, TraitedSpec, SimpleInterface, traits
 import datalad.api as dl
 
-
 logging.getLogger('datalad').setLevel(logging.WARNING)
 
-dataset_url = {
-    'HCP-YA': 'git@github.com:datalad-datasets/human-connectome-project-openaccess.git',
-    'HCP-A': 'git@jugit.fz-juelich.de:inm7/datasets/datasets_repo.git',
-    'HCP-D': 'git@jugit.fz-juelich.de:inm7/datasets/datasets_repo.git',
-    'ABCD': 'git@jugit.fz-juelich.de:inm7/datasets/datasets_repo.git'}
-
 
-class _InitDiffusionDataInputSpec(BaseInterfaceInputSpec):
-    dataset = traits.Str(
-        mandatory=True, desc='name of dataset to get (HCP-YA, HCP-A, HCP-D, ABCD, UKB)')
-    work_dir = traits.Directory(mandatory=True, desc='absolute path to work directory')
-    subject = traits.Str(mandatory=True, desc='subject ID')
-    int_dir = traits.Directory(desc='directory containing intermediate files from previous runs')
-    output_dir = traits.Directory(mandatory=True, desc='absolute path to output directory')
+class _InitDataInputSpec(BaseInterfaceInputSpec):
+    config = traits.Dict(mandatory=True, desc="Workflow configurations")
 
 
-class _InitDiffusionDataOutputSpec(TraitedSpec):
+class _InitDataOutputSpec(TraitedSpec):
     d_files = traits.Dict(dtype=Path, desc='filenames of diffusion data')
     t1_files = traits.Dict(dtype=Path, desc='filenames of T1w data')
     fs_files = traits.Dict(dtype=Path, desc='filenames of FreeSurfer outputs')
-    dataset_dir = traits.Directory(desc='absolute path to installed root dataset')
-    fs_dir = traits.Directory(desc='FreeSurfer subject directory')
 
 
-class InitDiffusionData(SimpleInterface):
-    """Instal and get subject-specific diffusion data"""
-    input_spec = _InitDiffusionDataInputSpec
-    output_spec = _InitDiffusionDataOutputSpec
+class InitData(SimpleInterface):
+    """Get subject-specific diffusion data"""
+    input_spec = _InitDataInputSpec
+    output_spec = _InitDataOutputSpec
 
     def _run_interface(self, runtime):
-        self._results['dataset_dir'] = Path(self.inputs.work_dir, self.inputs.subject)
-        dataset_dirs = {
-            'HCP-YA': Path(self._results['dataset_dir']),
-            'HCP-A': Path(self._results['dataset_dir'], 'original', 'hcp', 'hcp_aging'),
-            'HCP-D': Path(self._results['dataset_dir'], 'original', 'hcp', 'hcp_development'),
-            'ABCD': Path(self._results['dataset_dir'], 'original', 'abcd', 'abcd-hcp-pipeline')}
-        dataset_dir = dataset_dirs[self.inputs.dataset]
-
-        if self.inputs.dataset == 'HCP-A' or self.inputs.dataset == 'HCP-D':
-            source = 'inm7-storage'
-        else:
-            source = None
-
-        # install datasets
-        dl.install(
-            path=self._results['dataset_dir'], source=dataset_url[self.inputs.dataset],
-            on_failure='stop')
-        dl.get(
-            path=dataset_dir, dataset=self._results['dataset_dir'], get_data=False, source=source,
-            on_failure='stop')
-        if self.inputs.dataset == 'HCP-YA':
-            subject_dir = Path(dataset_dir, 'HCP1200', self.inputs.subject)
-        else:
-            subject_dir = Path(dataset_dir, self.inputs.subject)
-        dl.get(
-            path=subject_dir, dataset=dataset_dir, get_data=False, source=source,
-            on_failure='stop')
-
-        if self.inputs.dataset == 'HCP-A' or self.inputs.dataset == 'HCP-D':
-            if self.inputs.int_dir:
-                d_dir = self.inputs.int_dir
-                d_files = {
-                    'data': Path(d_dir, 'data.nii.gz'), 'bval': Path(d_dir, 'bvals'),
-                    'bvec': Path(d_dir, 'bvecs'), 'mask': Path(d_dir, 'nodif_brain_mask.nii.gz')}
-            else:
-                d_dir = Path(subject_dir, 'unprocessed', 'Diffusion')
+        sub_dir = self.inputs.config["subject_dir"]
+        dataset_dir = self.inputs.config["dataset_dir"]
+        source = self.inputs.config["source"]
+
+        d_dir = Path(sub_dir, "unprocessed", "Diffusion")
+        anat_dir = Path(sub_dir, "T1w")
+        fs_dir = Path(anat_dir, self.inputs.config["subject"])
+        if dataset_dir is not None:
+            for directory in (sub_dir, d_dir.parent, anat_dir):
                 dl.get(
-                    path=d_dir.parent, dataset=dataset_dir, get_data=False, source=source,
-                    on_failure='stop')
-                d_files = {}
-                for dirs in [98, 99]:
-                    for phase in ['AP', 'PA']:
-                        for ftype in ['.nii.gz', '.bval', '.bvec']:
-                            key = f'dir{dirs}_{phase}{ftype}'
-                            d_files[key] = Path(d_dir, f'{self.inputs.subject}_dMRI_{key}')
-        elif self.inputs.dataset == 'HCP-YA':
-            d_dir = Path(subject_dir, 'T1w', 'Diffusion')
-            dl.get(
-                path=d_dir.parent, dataset=dataset_dir, get_data=False, source=source,
-                on_failure='stop')
-            d_files = {
-                'data': Path(d_dir, 'data.nii.gz'), 'bval': Path(d_dir, 'bvals'),
-                'bvec': Path(d_dir, 'bvecs'), 'mask': Path(d_dir, 'nodif_brain_mask.nii.gz')}
-
-        self._results['d_files'] = d_files.copy()
-        for key, val in d_files.items():
-            if val.is_symlink():
+                    path=directory, dataset=dataset_dir, get_data=False, source=source,
+                    on_failure='stop')
+
+        d_files = {}
+        for ndir in self.inputs.config["ndirs"]:
+            for phase in self.inputs.config["phases"]:
+                for file_type in [".nii.gz", ".bval", ".bvec"]:
+                    key = f"dir{ndir}_{phase}{file_type}"
+                    d_files[key] = Path(d_dir, f"{self.inputs.config['subject']}_dMRI_{key}")
+        self._results["d_files"] = d_files.copy()
+        if dataset_dir is not None:
+            for key, val in d_files.items():
                 dl.get(path=val, dataset=d_dir.parent, source=source, on_failure='stop')
-            elif not self.inputs.int_dir:
-                self._results['d_files'][key] = ''
-
-        anat_dir = Path(subject_dir, 'T1w')
-        fs_dir = Path(subject_dir, 'T1w', self.inputs.subject)
-        mni_dir = Path(subject_dir, 'MNINonLinear')
-        dl.get(
-            path=anat_dir, dataset=dataset_dir, get_data=False, source=source, on_failure='stop')
-        dl.get(
-            path=mni_dir, dataset=dataset_dir, get_data=False, source=source, on_failure='stop')
 
         fs_files = {
-            'lh_aparc': Path(fs_dir, 'label', 'lh.aparc.annot'),
-            'rh_aparc': Path(fs_dir, 'label', 'rh.aparc.annot'),
-            'lh_pial': Path(fs_dir, 'surf', 'lh.pial'),
-            'rh_pial': Path(fs_dir, 'surf', 'rh.pial'),
-            'lh_white': Path(fs_dir, 'surf', 'lh.white'),
-            'rh_white': Path(fs_dir, 'surf', 'rh.white'),
             'lh_white_deformed': Path(fs_dir, 'surf', 'lh.white.deformed'),
             'rh_white_deformed': Path(fs_dir, 'surf', 'rh.white.deformed'),
-            'lh_reg': Path(fs_dir, 'surf', 'lh.sphere.reg'),
-            'rh_reg': Path(fs_dir, 'surf', 'rh.sphere.reg'),
-            'lh_cort_label': Path(fs_dir, 'label', 'lh.cortex.label'),
-            'rh_cort_label': Path(fs_dir, 'label', 'rh.cortex.label'),
-            'lh_ribbon': Path(fs_dir, 'mri', 'lh.ribbon.mgz'),
-            'rh_ribbon': Path(fs_dir, 'mri', 'rh.ribbon.mgz'),
-            'ribbon': Path(fs_dir, 'mri', 'ribbon.mgz'),
-            'aseg': Path(fs_dir, 'mri', 'aseg.mgz'),
-            'aparc_aseg': Path(fs_dir, 'mri', 'aparc+aseg.mgz'),
-            'orig': Path(fs_dir, 'mri', 'orig.mgz'),
-            'brain_mask': Path(fs_dir, 'mri', 'brainmask.mgz'),
-            'talaraich_xfm': Path(fs_dir, 'mri', 'transforms', 'talairach.xfm'),
-            'norm': Path(fs_dir, 'mri', 'norm.mgz'),
-            'eye': Path(fs_dir, 'mri', 'transforms', 'eye.dat'),
-            'lh_thickness': Path(fs_dir, 'surf', 'lh.thickness'),
-            'rh_thickness': Path(fs_dir, 'surf', 'rh.thickness')}
-        self._results['fs_dir'] = fs_dir
-
+            'eye': Path(fs_dir, 'mri', 'transforms', 'eye.dat')}
         self._results['fs_files'] = fs_files.copy()
-        for key, val in fs_files.items():
-            if val.is_symlink():
+        if dataset_dir is not None:
+            for key, val in fs_files.items():
                 dl.get(path=val, dataset=anat_dir, source=source, on_failure='stop')
-            else:
-                self._results['fs_files'][key] = ''
 
         t1_files = {
             't1': Path(anat_dir, 'T1w_acpc_dc.nii.gz'),
             't1_restore': Path(anat_dir, 'T1w_acpc_dc_restore.nii.gz'),
             't1_restore_brain': Path(anat_dir, 'T1w_acpc_dc_restore_brain.nii.gz'),
             'bias': Path(anat_dir, 'BiasField_acpc_dc.nii.gz'),
-            'fs_mask': Path(anat_dir, 'brainmask_fs.nii.gz'),
-            't1_to_mni': Path(mni_dir, 'xfms', 'acpc_dc2standard.nii.gz'),
-            'aseg': Path(mni_dir, 'mri', 'aseg.mgz'),
-            'aparc_aseg': Path(mni_dir, 'mri', 'aparc+aseg.mgz'),
-            'brainmask': Path(mni_dir, 'mri', 'brainmask.mgz'),
-            'talairach_xfm': Path(mni_dir, 'mri', 'xfms', 'talairach.xfm'),
-            'norm': Path(mni_dir, 'mri', 'norm.mgz')}
-
+            'fs_mask': Path(anat_dir, 'brainmask_fs.nii.gz')}
         self._results['t1_files'] = t1_files.copy()
-        for key, val in t1_files.items():
-            if val.is_symlink():
-                if key == 't1_to_mni' or key == 'aseg':
-                    dl.get(path=val, dataset=mni_dir, source=source, on_failure='stop')
-                else:
-                    dl.get(path=val, dataset=anat_dir, source=source, on_failure='stop')
-            else:
-                self._results['t1_files'][key] = ''
+        if dataset_dir is not None:
+            for key, val in t1_files.items():
+                dl.get(path=val, dataset=anat_dir, source=source, on_failure='stop')
+
+        return runtime
+
+
+class _SaveDataInputSpec(BaseInterfaceInputSpec):
+    config = traits.Dict(mandatory=True, desc="Workflow configurations")
+    data_file = traits.File(mandatory=True, exists=True, desc="Diffusion data file")
+    bval_file = traits.File(mandatory=True, exists=True, desc="b value file")
+    bvec_file = traits.File(mandatory=True, exists=True, desc="b vector file")
+    mask_file = traits.File(mandatory=True, exists=True, desc="mask file")
+
+
+class SaveData(SimpleInterface):
+    """Save processed diffusion data to output folder"""
+    input_spec = _SaveDataInputSpec
+
+    def _run_interface(self, runtime):
+        out_dir = self.inputs.config["output_dir"]
+        out_dir.mkdir(parents=True, exist_ok=True)
+        copyfile(self.inputs.data_file, Path(out_dir, "data.nii.gz"))
+        copyfile(self.inputs.bval_file, Path(out_dir, "bvals"))
+        copyfile(self.inputs.bvec_file, Path(out_dir, "bvecs"))
+        copyfile(self.inputs.mask_file, Path(out_dir, "nodif_brain_mask.nii.gz"))
 
         return runtime
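
Both interfaces read their settings from the shared config dict built in main.py. A minimal sketch of the keys relied on here, as far as this diff shows (values hypothetical):

    from pathlib import Path

    config = {
        "subject": "HCA1234567",                # hypothetical subject ID
        "subject_dir": Path("/data/HCA1234567"),
        "dataset_dir": None,                    # set to the Datalad root to enable dl.get calls
        "source": None,                         # optional source for datalad get
        "ndirs": ["98", "99"],
        "phases": ["AP", "PA"],
        "output_dir": Path("/out/HCA1234567"),  # used by SaveData
    }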

File diff suppressed because it is too large
+ 175 - 618
hcpdiffpy/interfaces/preproc.py


+ 245 - 0
hcpdiffpy/interfaces/utilities.py

@@ -0,0 +1,245 @@
+import itertools
+import subprocess
+
+from nipype.interfaces.base import BaseInterfaceInputSpec, TraitedSpec, SimpleInterface, traits
+import nibabel as nib
+
+
+class _PickDiffFilesInputSpec(BaseInterfaceInputSpec):
+    d_files = traits.Dict(mandatory=True, desc="collection of diffusion files")
+    ndir = traits.Str(mandatory=True, desc="Number of directions to pick")
+    phase = traits.Str(mandatory=True, desc="Phase encoding direction to pick")
+
+
+class _PickDiffFilesOutputSpec(TraitedSpec):
+    data_file = traits.File(exists=True, desc="Diffusion data files")
+    bval_file = traits.File(exists=True, desc="b value files")
+    bvec_file = traits.File(exists=True, desc="b vector files")
+
+
+class PickDiffFiles(SimpleInterface):
+    """Pick the diffusion files for one number of directions and phase encoding direction"""
+    input_spec = _PickDiffFilesInputSpec
+    output_spec = _PickDiffFilesOutputSpec
+
+    def _run_interface(self, runtime):
+        key = f"dir{self.inputs.ndir}_{self.inputs.phase}"
+        self._results["data_file"] = self.inputs.d_files[f"{key}.nii.gz"]
+        self._results["bval_file"] = self.inputs.d_files[f"{key}.bval"]
+        self._results["bvec_file"] = self.inputs.d_files[f"{key}.bvec"]
+        return runtime
+
+
+class _UpdateDiffFilesInputSpec(BaseInterfaceInputSpec):
+    config = traits.Dict(mandatory=True, desc="Workflow configurations")
+    d_files = traits.Dict(mandatory=True, desc="Collection of diffusion files")
+    data_files = traits.List(mandatory=True, desc="New data files")
+
+
+class _UpdateDiffFilesOutputSpec(TraitedSpec):
+    d_files = traits.Dict(mandatory=True, desc="Collection of diffusion files")
+
+
+class UpdateDiffFiles(SimpleInterface):
+    """Update diffusion files collection with new data files"""
+    input_spec = _UpdateDiffFilesInputSpec
+    output_spec = _UpdateDiffFilesOutputSpec
+
+    def _run_interface(self, runtime):
+        self._results["d_files"] = self.inputs.d_files.copy()
+        for key in self.inputs.config["keys"]:
+            dwi_key = [
+                d_key for d_key in self.inputs.d_files if key in d_key and '.nii.gz' in d_key]
+            dwi_replace = [d_file for d_file in self.inputs.data_files if key in str(d_file)]
+            self._results["d_files"][dwi_key[0]] = dwi_replace[0]
+        return runtime
+
+
+class _SplitDiffFilesInputSpec(BaseInterfaceInputSpec):
+    d_files = traits.Dict(mandatory=True, desc="Collection of diffusion files")
+
+
+class _SplitDiffFilesOutputSpec(TraitedSpec):
+    data_files = traits.List(exists=True, desc="Diffusion data files")
+    bval_files = traits.List(exists=True, desc="b value files")
+    bvec_files = traits.List(exists=True, desc="b vector files")
+
+
+class SplitDiffFiles(SimpleInterface):
+    """Split diffusion files collection by file types"""
+    input_spec = _SplitDiffFilesInputSpec
+    output_spec = _SplitDiffFilesOutputSpec
+
+    def _run_interface(self, runtime):
+        self._results["data_files"] = [
+            self.inputs.d_files[key] for key in self.inputs.d_files if ".nii.gz" in key]
+        self._results["bval_files"] = [
+            self.inputs.d_files[key] for key in self.inputs.d_files if ".bval" in key]
+        self._results["bvec_files"] = [
+            self.inputs.d_files[key] for key in self.inputs.d_files if ".bvec" in key]
+        return runtime
+
+
+class _SplitT1FilesInputSpec(BaseInterfaceInputSpec):
+    t1_files = traits.Dict(mandatory=True, desc="Collection of T1 files")
+
+
+class _SplitT1FilesOutputSpec(TraitedSpec):
+    t1_file = traits.File(exists=True, desc="T1 file")
+    t1_restore_file = traits.File(exists=True, desc="T1 restored file")
+    t1_brain_file = traits.File(exists=True, desc="T1 brain restored file")
+    bias_file = traits.File(exists=True, desc="Bias file")
+    mask_file = traits.File(exists=True, desc="FreeSurfer mask file")
+
+
+class SplitT1Files(SimpleInterface):
+    """Split T1 files collection by file types"""
+    input_spec = _SplitT1FilesInputSpec
+    output_spec = _SplitT1FilesOutputSpec
+
+    def _run_interface(self, runtime):
+        self._results["t1_file"] = self.inputs.t1_files["t1"]
+        self._results["t1_restore_file"] = self.inputs.t1_files["t1_restore"]
+        self._results["t1_brain_file"] = self.inputs.t1_files["t1_restore_brain"]
+        self._results["bias_file"] = self.inputs.t1_files["bias"]
+        self._results["mask_file"] = self.inputs.t1_files["fs_mask"]
+        return runtime
+
+
+class FSEyeInputSpec(BaseInterfaceInputSpec):
+    fs_files = traits.Dict(mandatory=True, desc="Collection of FreeSurfer files")
+
+
+class FSEyeOutputSpec(TraitedSpec):
+    eye_file = traits.File(exists=True, desc="FreeSurfer eye file")
+
+
+class FSEye(SimpleInterface):
+    """Get the eye file from FreeSurfer files collection"""
+    input_spec = FSEyeInputSpec
+    output_spec = FSEyeOutputSpec
+
+    def _run_interface(self, runtime):
+        self._results["eye_file"] = self.inputs.fs_files["eye"]
+        return runtime
+
+
+class _FLIRTSchInputSpec(BaseInterfaceInputSpec):
+    in_file = traits.File(mandatory=True, exists=True, desc="Input file")
+    reference = traits.File(mandatory=True, exists=True, desc="Reference file")
+    wm_seg = traits.File(exists=True, desc="White matter segmentation volume")
+    in_matrix_file = traits.File(exists=True, desc="Input affine matrix")
+    schedule = traits.File(desc="Replaces default schedule")
+    out_matrix_file = traits.File(desc="Output affine matrix")
+    fsl_cmd = traits.Any(mandatory=True, desc="command for using singularity image (or not)")
+
+
+class _FLIRTSchOutputSpec(TraitedSpec):
+    out_matrix_file = traits.File(exists=True, desc="Output affine matrix")
+
+
+class Flirtsch(SimpleInterface):
+    """Run FLIRT with non-default schedule file which may exist in a container"""
+    input_spec = _FLIRTSchInputSpec
+    output_spec = _FLIRTSchOutputSpec
+
+    def _run_interface(self, runtime):
+        subprocess.run(
+            self.inputs.fsl_cmd.cmd("flirt").split() + [
+                "-in", self.inputs.in_file, "-ref", self.inputs.reference,
+                "-wmseg", self.inputs.wm_seg, "-init", self.inputs.in_matrix_file,
+                "-omat", self.inputs.out_matrix_file, "-dof", "6", "-cost", "bbr",
+                "-schedule", self.inputs.schedule],
+            check=True)
+        self._results["out_matrix_file"] = self.inputs.out_matrix_file
+        return runtime
+
+
+class _DiffResInputSpec(BaseInterfaceInputSpec):
+    data_file = traits.File(mandatory=True, exists=True, desc="Diffusion data file")
+
+
+class _DiffResOutputSpec(TraitedSpec):
+    res = traits.Float(desc="Diffusion data resolution in mm (assuming isotropic)")
+    dilate = traits.Int(desc="Dilation range in mm")
+
+
+class DiffRes(SimpleInterface):
+    """Get resolution and dilation range of diffusion data"""
+    input_spec = _DiffResInputSpec
+    output_spec = _DiffResOutputSpec
+
+    def _run_interface(self, runtime):
+        self._results["res"] = nib.load(self.inputs.data_file).header.get_zooms()[0]
+        self._results["dilate"] = int(self._results["res"] * 4)
+        return runtime
+
+
+class _ListInputSpec(BaseInterfaceInputSpec):
+    input = traits.List(mandatory=True, desc="Input list")
+
+
+class _ListOutputSpec(TraitedSpec):
+    output = traits.List(desc="Output list")
+
+
+class FlattenList(SimpleInterface):
+    """Flatten nested lists"""
+    input_spec = _ListInputSpec
+    output_spec = _ListOutputSpec
+
+    def _run_interface(self, runtime):
+        self._results["output"] = list(itertools.chain.from_iterable(self.inputs.input))
+        return runtime
+
+
+class _CreateListInputSpec(BaseInterfaceInputSpec):
+    input1 = traits.Any(mandatory=True, desc="Input item 1")
+    input2 = traits.Any(mandatory=True, desc="Input item 2")
+
+
+class CreateList(SimpleInterface):
+    """Flatten nested lists"""
+    input_spec = _CreateListInputSpec
+    output_spec = _ListOutputSpec
+
+    def _run_interface(self, runtime):
+        self._results["output"] = [self.inputs.input1, self.inputs.input2]
+        return runtime
+
+
+class _CombineStringsInputSpec(BaseInterfaceInputSpec):
+    input1 = traits.Str(mandatory=True, desc="Input string 1")
+    input2 = traits.Str(mandatory=True, desc="Input string 2")
+    input3 = traits.Str("", desc="Input string 3")
+    input4 = traits.Str("", desc="Input string 4")
+
+
+class _StringOutputSpec(TraitedSpec):
+    output = traits.Str(desc="Output string")
+
+
+class CombineStrings(SimpleInterface):
+    """Combine multile strings into one"""
+    input_spec = _CombineStringsInputSpec
+    output_spec = _StringOutputSpec
+
+    def _run_interface(self, runtime):
+        self._results["output"] = f"{self.inputs.input1}{self.inputs.input2}" \
+                                  f"{self.inputs.input3}{self.inputs.input4}"
+        return runtime
+
+
+class _ListItemInputSpec(BaseInterfaceInputSpec):
+    input = traits.List(mandatory=True, desc="Input list")
+    index = traits.Any(mandatory=True, desc="Index to pick from")
+
+
+class ListItem(SimpleInterface):
+    """Pick a string item from a list by index"""
+    input_spec = _ListItemInputSpec
+    output_spec = _StringOutputSpec
+
+    def _run_interface(self, runtime):
+        self._results["output"] = str(self.inputs.input[self.inputs.index])
+        return runtime
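
Each of these SimpleInterface classes can also be run standalone, which is handy for debugging; a minimal sketch with DiffRes (file path hypothetical):

    from hcpdiffpy.interfaces.utilities import DiffRes

    node = DiffRes(data_file="/data/dwi.nii.gz")  # hypothetical 1.5 mm isotropic image
    result = node.run()
    print(result.outputs.res, result.outputs.dilate)  # 1.5 6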

+ 334 - 80
hcpdiffpy/main.py

@@ -1,101 +1,355 @@
+from os import getenv
 from pathlib import Path
 import argparse
-import logging
-from typing import Union
 
-import pandas as pd
+from nipype.interfaces import fsl, freesurfer
+import datalad.api as dl
 import nipype.pipeline as pe
-from nipype import config
 
-from hcpdiffpy.interfaces.data import InitDiffusionData
-from hcpdiffpy.interfaces.preproc import HCPMinProc
-from hcpdiffpy.utilities.utilities import SimgCmd
+from hcpdiffpy.container import SimgCmd
+from hcpdiffpy.interfaces.data import InitData, SaveData
+from hcpdiffpy.interfaces.preproc import (
+    EddyIndex, EddyPostProc, ExtractB0, DilateMask, MergeBFiles, Rescale, RotateBVec2Str,
+    PrepareTopup, WBDilate)
+from hcpdiffpy.interfaces.utilities import (
+    CreateList, CombineStrings, DiffRes, FlattenList, Flirtsch, ListItem, PickDiffFiles,
+    SplitDiffFiles, FSEye, SplitT1Files, UpdateDiffFiles)
 
-logging.getLogger('datalad').setLevel(logging.WARNING)
+base_dir = Path(__file__).resolve().parent.parent
 
 
 def main() -> None:
     parser = argparse.ArgumentParser(
-        description='Multimodal neuroimaging feature extraction',
+        description="HCP Pipeline for diffusion preprocessing",
         formatter_class=lambda prog: argparse.ArgumentDefaultsHelpFormatter(prog, width=100))
-    parser.add_argument('dataset', type=str, help='Dataset (HCP-YA, HCP-A, HCP-D, ABCD)')
-    parser.add_argument('sublist', type=str, help='Absolute path to the subject list (.csv).')
     parser.add_argument(
-        '--diffusion', dest='diffusion', action='store_true',
-        help='Process diffusion data and extract diffusion features')
+        "subject_dir", type=Path,
+        help="Absolute path to the subject's data folder (organised in HCP-like structure)")
+    parser.add_argument("subject", type=str, help="Subject ID")
+    parser.add_argument("ndirs", nargs="+", help="List of numbers of directions")
+    parser.add_argument("phases", nargs="+", help="List of 2 phase encoding directions")
+    parser.add_argument("echo_spacing", type=float, help="Echo spacing used for acquisition in ms")
     parser.add_argument(
-        '--diff_int_dir', type=Path, dest='int_dir', default=None,
-        help='Directory containing diffusion intermediate files from previous runs')
+        "--dataset_dir", type=Path, dest="dataset_dir", default=None,
+        help="Absolute path to the Datalad dataset directory")
     parser.add_argument(
-        '--workdir', type=Path, dest='work_dir', default=Path.cwd(), help='Work directory')
+        "--source", type=str, dest="source", default=None,
+        help="Source to use for Datalad get command")
     parser.add_argument(
-        '--output_dir', type=Path, dest='output_dir', default=Path.cwd(), help='Output directory')
+        "--workdir", type=Path, dest="work_dir", default=Path.cwd(),
+        help="Absolute path to work directory")
     parser.add_argument(
-        '--simg', type=Path, dest='simg', default=None,
-        help='singularity image to use for command line functions from FSL / FreeSurfer / '
-             'Connectome Workbench / MRTrix3.')
+        "--output_dir", type=Path, dest="output_dir", default=Path.cwd(),
+        help="Absolute path to output directory")
     parser.add_argument(
-        '--overwrite', dest='overwrite', action="store_true", help='Overwrite existing results')
+        "--fsl_simg", type=Path, dest="fsl_simg", default=None,
+        help="singularity image to use for command line functions from FSL")
     parser.add_argument(
-        '--condordag', dest='condordag', action='store_true',
-        help='Submit graph workflow to HTCondor')
+        "--fs_simg", type=Path, dest="fs_simg", default=None,
+        help="singularity image to use for command line functions from FreeSurfer")
     parser.add_argument(
-        '--wrapper', type=str, dest='wrapper', default='', help='Wrapper script for HTCondor')
+        "--wb_simg", type=Path, dest="wb_simg", default=None,
+        help="singularity image to use for command line functions from Connectome Workbench")
     parser.add_argument(
-        '--debug', dest='debug', action="store_true", help='Use debug configuration')
-    args = parser.parse_args()
-
-    simg_cmd = SimgCmd(
-        simg=args.simg, work_dir=args.work_dir, out_dir=args.output_dir, int_dir=args.int_dir)
-
-    args.output_dir.mkdir(parents=True, exist_ok=True)
-    sublist = pd.read_csv(args.sublist, header=None).squeeze('columns')
-    for subject in sublist:
-        output_file = Path(args.output_dir, f'{args.dataset}_{subject}.h5')
-        if not output_file.is_file() or args.overwrite:
-            subject_wf = init_d_wf(
-                args.dataset, str(subject), args.work_dir, args.output_dir, simg_cmd,
-                args.overwrite, args.int_dir)
-
-            subject_wf.config['execution']['try_hard_link_datasink'] = 'false'
-            subject_wf.config['execution']['crashfile_format'] = 'txt'
-            subject_wf.config['execution']['stop_on_first_crash'] = 'true'
-            subject_wf.config['monitoring']['enabled'] = 'true'
-            if args.debug:
-                config.enable_debug_mode()
-
-            subject_wf.write_graph()
-            if args.condordag:
-                subject_wf.run(
-                    plugin='CondorDAGMan',
-                    plugin_args={
-                        'dagman_args': f'-outfile_dir {args.work_dir}', 'wrapper_cmd': args.wrapper,
-                        'dagman_args': '-import_env',
-                        'override_specs': 'request_memory = 5 GB\nrequest_cpus = 1'})
-            else:
-                subject_wf.run()
-
-
-def init_d_wf(
-        dataset: str, subject: str, work_dir: Path, output_dir: Path, simg_cmd: SimgCmd,
-        overwrite: bool, int_dir: Union[Path, None]) -> pe.Workflow:
-    d_wf = pe.Workflow(f'subject_{subject}_diffusion_wf', base_dir=work_dir)
-    work_curr = Path(work_dir, f'subject_{subject}_diffusion_wf')
-    init_data = pe.Node(
-        InitDiffusionData(
-            dataset=dataset, work_dir=work_curr, subject=subject, output_dir=output_dir),
-        name='init_data')
-    if (dataset == 'HCP-A' or dataset == 'HCP-D') and int_dir is None:
-        hcp_proc = HCPMinProc(
-            dataset=dataset, work_dir=work_curr, subject=subject, simg_cmd=simg_cmd).run()
-        hcp_proc_wf = hcp_proc.outputs.hcp_proc_wf
-        d_wf.connect([
-            (init_data, hcp_proc_wf, [
-                ('d_files', 'inputnode.d_files'), ('t1_files', 'inputnode.t1_files'),
-                ('fs_files', 'inputnode.fs_files'), ('fs_dir', 'inputnode.fs_dir')])])
-
-    return d_wf
-
-
-if __name__ == '__main__':
+        "--condordag", dest="condordag", action="store_true",
+        help="Submit workflow as DAG to HTCondor")
+    config = vars(parser.parse_args())
+
+    # Set-up
+    config["tmp_dir"] = Path(config["work_dir"], "hcp_proc_tmp")
+    config["tmp_dir"].mkdir(parents=True, exist_ok=True)
+    config["keys"] = [
+        f"dir{ndir}_{phase}" for ndir in sorted(config["ndirs"])
+        for phase in sorted(config["phases"])]
+    d_iterables = [("ndir", config["ndirs"]), ("phase", config["phases"])]
+    fsl_cmd = SimgCmd(config, config["fsl_simg"])
+    fs_cmd = SimgCmd(config, config["fs_simg"])
+    wb_cmd = SimgCmd(config, config["wb_simg"])
+    topup_config = Path(config["tmp_dir"], "HCP_pipeline", "global", "config", "b02b0.cnf")
+    if not topup_config.is_file():
+        dl.clone(
+            "git@github.com:Washington-University/HCPpipelines.git",
+            path=Path(config["tmp_dir"], "HCP_pipeline"))
+    if config["fsl_simg"] is None:
+        sch_file = Path(getenv("FSLDIR"), "etc", "flirtsch", "bbr.sch")
+    else:  # guess path in container
+        sch_file = "/usr/local/fsl/etc/flirtsch/bbr.sch"
+    fs_dir = Path(config["subject_dir"], "T1w", config["subject"])
+
+    # Workflow set-up
+    hcpdiff_wf = pe.Workflow(f"hcpdiff_{config['subject']}_wf", base_dir=config["work_dir"])
+    hcpdiff_wf.config["execution"]["try_hard_link_datasink"] = "false"
+    hcpdiff_wf.config["execution"]["crashfile_format"] = "txt"
+    hcpdiff_wf.config["execution"]["stop_on_first_crash"] = "true"
+
+    # Get data
+    init_data = pe.Node(InitData(config=config), "init_data")
+    d_files = pe.Node(PickDiffFiles(), "split_diff_files", iterables=d_iterables)
+
+    hcpdiff_wf.connect([(init_data, d_files, [("d_files", "d_files")])])
+
+    # 1. PreEddy
+    # 1.1. Normalise intensity
+    mean_dwi = pe.Node(
+        fsl.ImageMaths(command=fsl_cmd.cmd("fslmaths"), args="-Xmean -Ymean -Zmean"), "mean_dwi")
+    extract_b0s = pe.Node(ExtractB0(fsl_cmd=fsl_cmd, config=config), "extract_b0s")
+    merge_b0s = pe.Node(fsl.Merge(command=fsl_cmd.cmd("fslmerge"), dimension="t"), "merge_b0s")
+    mean_b0 = pe.Node(fsl.ImageMaths(command=fsl_cmd.cmd("fslmaths"), args="-Tmean"), "mean_b0")
+    scale = pe.Node(fsl.ImageMeants(command=fsl_cmd.cmd("fslmeants")), "scale")
+    rescale = pe.JoinNode(
+        Rescale(fsl_cmd=fsl_cmd, config=config), "rescale",
+        joinfield=["scale_files"], joinsource="split_diff_files")
+
+    hcpdiff_wf.connect([
+        (d_files, mean_dwi, [("data_file", "in_file")]),
+        (d_files, extract_b0s, [("bval_file", "bval_file")]),
+        (mean_dwi, extract_b0s, [("out_file", "data_file")]),
+        (extract_b0s, merge_b0s, [("roi_files", "in_files")]),
+        (merge_b0s, mean_b0, [("merged_file", "in_file")]),
+        (mean_b0, scale, [("out_file", "in_file")]),
+        (init_data, rescale, [("d_files", "d_files")]),
+        (scale, rescale, [("out_file", "scale_files")])])
+
+    # 1.2. Prepare b0s and index files for topup
+    update_d_files = pe.Node(UpdateDiffFiles(config=config), "update_d_files")
+    rescaled_d_files = pe.Node(PickDiffFiles(), "split_rescaled", iterables=d_iterables)
+    rescaled_b0s = pe.Node(ExtractB0(config=config, rescale=True, fsl_cmd=fsl_cmd), "rescaled_b0s")
+    b0_list = pe.JoinNode(FlattenList(), "b0_list", joinfield="input", joinsource="split_rescaled")
+    pos_b0_list = pe.JoinNode(
+        FlattenList(), "pos_b0_list", joinfield="input", joinsource="split_rescaled")
+    neg_b0_list = pe.JoinNode(
+        FlattenList(), "neg_b0_list", joinfield="input", joinsource="split_rescaled")
+    merge_rescaled_b0s = pe.Node(
+        fsl.Merge(command=fsl_cmd.cmd("fslmerge"), dimension="t"), "merge_rescaled_b0s")
+    merge_pos_b0s = pe.Node(
+        fsl.Merge(command=fsl_cmd.cmd("fslmerge"), dimension="t"), "merge_pos_b0s")
+    merge_neg_b0s = pe.Node(
+        fsl.Merge(command=fsl_cmd.cmd("fslmerge"), dimension="t"), "merge_neg_b0s")
+
+    hcpdiff_wf.connect([
+        (init_data, update_d_files, [("d_files", "d_files")]),
+        (rescale, update_d_files, [("rescaled_files", "data_files")]),
+        (update_d_files, rescaled_d_files, [("d_files", "d_files")]),
+        (rescaled_d_files, rescaled_b0s, [("bval_file", "bval_file"), ("data_file", "data_file")]),
+        (rescaled_b0s, b0_list, [("roi_files", "input")]),
+        (rescaled_b0s, pos_b0_list, [("pos_files", "input")]),
+        (rescaled_b0s, neg_b0_list, [("neg_files", "input")]),
+        (b0_list, merge_rescaled_b0s, [("output", "in_files")]),
+        (pos_b0_list, merge_pos_b0s, [("output", "in_files")]),
+        (neg_b0_list, merge_neg_b0s, [("output", "in_files")])])
+
+    # 1.3. Topup
+    prepare_topup = pe.Node(PrepareTopup(config=config), "prepare_topup")
+    topup = pe.Node(fsl.TOPUP(command=fsl_cmd.cmd("topup"), config=str(topup_config)), "topup")
+    pos_b01 = pe.Node(fsl.ExtractROI(command=fsl_cmd.cmd("fslroi"), t_min=0, t_size=1), "pos_b01")
+    neg_b01 = pe.Node(fsl.ExtractROI(command=fsl_cmd.cmd("fslroi"), t_min=0, t_size=1), "neg_b01")
+    b01_files = pe.Node(CreateList(), "b01_files")
+    apply_topup = pe.Node(
+        fsl.ApplyTOPUP(command=fsl_cmd.cmd("applytopup"), method="jac"), "apply_topup")
+    nodiff_mask = pe.Node(fsl.BET(command=fsl_cmd.cmd("bet"), frac=0.2, mask=True), "nodiff_mask")
+
+    hcpdiff_wf.connect([
+        (update_d_files, prepare_topup, [("d_files", "d_files")]),
+        (b0_list, prepare_topup, [("output", "roi_files")]),
+        (merge_pos_b0s, prepare_topup, [("merged_file", "pos_b0_file")]),
+        (merge_rescaled_b0s, topup, [("merged_file", "in_file")]),
+        (prepare_topup, topup, [("enc_dir", "encoding_direction"), ("ro_time", "readout_times")]),
+        (merge_pos_b0s, pos_b01, [("merged_file", "in_file")]),
+        (merge_neg_b0s, neg_b01, [("merged_file", "in_file")]),
+        (pos_b01, b01_files, [("roi_file", "input1")]),
+        (neg_b01, b01_files, [("roi_file", "input2")]),
+        (prepare_topup, apply_topup, [("indices_t", "in_index")]),
+        (topup, apply_topup, [
+            ("out_enc_file", "encoding_file"), ("out_fieldcoef", "in_topup_fieldcoef"),
+            ("out_movpar", "in_topup_movpar")]),
+        (b01_files, apply_topup, [("output", "in_files")]),
+        (apply_topup, nodiff_mask, [("out_corrected", "in_file")])])
+
+    # 2. Eddy
+    d_filetype = pe.Node(SplitDiffFiles(), "split_d_filetype")
+    merge_bfiles = pe.Node(MergeBFiles(config=config), "merge_bfiles")
+    merge_dwi = pe.Node(fsl.Merge(command=fsl_cmd.cmd("fslmerge"), dimension="t"), "merge_dwi")
+    eddy_index = pe.Node(EddyIndex(config=config), "eddy_index")
+    eddy = pe.Node(fsl.Eddy(command=fsl_cmd.cmd("eddy"), fwhm=0, args="-v"), name="eddy")
+
+    hcpdiff_wf.connect([
+        (update_d_files, d_filetype, [("d_files", "d_files")]),
+        (d_filetype, merge_bfiles, [
+            ("bval_files", "bval_files"), ("bvec_files", "bvec_files")]),
+        (d_filetype, merge_dwi, [("data_files", "in_files")]),
+        (b0_list, eddy_index, [("output", "roi_files")]),
+        (d_filetype, eddy_index, [("data_files", "dwi_files")]),
+        (merge_dwi, eddy, [("merged_file", "in_file")]),
+        (merge_bfiles, eddy, [("bval_merged", "in_bval"), ("bvec_merged", "in_bvec")]),
+        (topup, eddy, [("out_enc_file", "in_acqp")]),
+        (eddy_index, eddy, [("index_file", "in_index")]),
+        (nodiff_mask, eddy, [("mask_file", "in_mask")]),
+        (topup, eddy, [
+            ("out_fieldcoef", "in_topup_fieldcoef"), ("out_movpar", "in_topup_movpar")])])
+
+    # 3. PostEddy
+    # 3.1. Postproc
+    postproc = pe.Node(EddyPostProc(fsl_cmd=fsl_cmd, config=config), "postproc")
+    fov_mask = pe.Node(
+        fsl.ImageMaths(command=fsl_cmd.cmd("fslmaths"), args="-abs -Tmin -bin -fillh"), "fov_mask")
+    mask_args = pe.Node(CombineStrings(input1="-mas "), "mask_args")
+    mask_data = pe.Node(fsl.ImageMaths(command=fsl_cmd.cmd("fslmaths")), "mask_data")
+    thr_data = pe.Node(fsl.ImageMaths(command=fsl_cmd.cmd("fslmaths"), args="-thr 0"), "thr_data")
+
+    hcpdiff_wf.connect([
+        (d_filetype, postproc, [
+            ("bval_files", "bval_files"), ("bvec_files", "bvec_files"),
+            ("data_files", "rescaled_files")]),
+        (postproc, fov_mask, [("combined_dwi_file", "in_file")]),
+        (fov_mask, mask_args, [("out_file", "input2")]),
+        (postproc, mask_data, [("combined_dwi_file", "in_file")]),
+        (mask_args, mask_data, [("output", "args")]),
+        (mask_data, thr_data, [("out_file", "in_file")])])
+
+    # 3.2. DiffusionToStructural
+    # 3.2.1. nodiff-to-T1
+    nodiff_brain = pe.Node(
+        fsl.ExtractROI(command=fsl_cmd.cmd("fslroi"), t_min=0, t_size=1), "nodiff_brain")
+    t1_files = pe.Node(SplitT1Files(), "t1_files")
+    wm_seg = pe.Node(fsl.FAST(command=fsl_cmd.cmd("fast"), output_type="NIFTI_GZ"), "wm_seg")
+    pve_file = pe.Node(ListItem(index=-1), "pve_file")
+    wm_thr = pe.Node(
+        fsl.ImageMaths(command=fsl_cmd.cmd("fslmaths"), args="-thr 0.5 -bin"), "wm_thr")
+    flirt_init = pe.Node(fsl.FLIRT(command=fsl_cmd.cmd("flirt"), dof=6), "flirt_init")
+    flirt_nodiff2t1 = pe.Node(
+        Flirtsch(
+            fsl_cmd=fsl_cmd, schedule=sch_file, out_matrix_file=Path(
+                config["tmp_dir"], "flirt_sch_nodiff2t1.mat")),
+        "flirt_nodiff2t1")
+    nodiff2t1 = pe.Node(
+        fsl.ApplyWarp(command=fsl_cmd.cmd("applywarp"), interp="spline", relwarp=True), "nodiff2t1")
+    bias_args = pe.Node(CombineStrings(input1="-div "), "bias_args")
+    nodiff_bias = pe.Node(fsl.ImageMaths(command=fsl_cmd.cmd("fslmaths")), "nodiff_bias")
+
+    hcpdiff_wf.connect([
+        (thr_data, nodiff_brain, [("out_file", "in_file")]),
+        (init_data, t1_files, [("t1_files", "t1_files")]),
+        (t1_files, wm_seg, [("t1_brain_file", "in_files")]),
+        (wm_seg, pve_file, [("partial_volume_files", "input")]),
+        (pve_file, wm_thr, [("output", "in_file")]),
+        (nodiff_brain, flirt_init, [("roi_file", "in_file")]),
+        (t1_files, flirt_init, [("t1_brain_file", "reference")]),
+        (nodiff_brain, flirt_nodiff2t1, [("roi_file", "in_file")]),
+        (t1_files, flirt_nodiff2t1, [("t1", "reference")]),
+        (wm_thr, flirt_nodiff2t1, [("out_file", "wm_seg")]),
+        (flirt_init, flirt_nodiff2t1, [("out_matrix_file", "in_matrix_file")]),
+        (nodiff_brain, nodiff2t1, [("roi_file", "in_file")]),
+        (t1_files, nodiff2t1, [("t1_file", "ref_file")]),
+        (flirt_nodiff2t1, nodiff2t1, [("out_matrix_file", "premat")]),
+        (t1_files, bias_args, [("bias_file", "input2")]),
+        (nodiff2t1, nodiff_bias, [("out_file", "in_file")]),
+        (bias_args, nodiff_bias, [("output", "args")])])
+
+    # 3.2.2. diff-to-struct
+    fs_eye = pe.Node(FSEye(), "fs_eye")
+    bbr_epi2t1 = pe.Node(
+        freesurfer.BBRegister(
+            command=fs_cmd.cmd("bbregister", options=f"--env SUBJECTS_DIR={fs_dir}"),
+            contrast_type="bold", dof=6, args="--surf white.deformed", subjects_dir=fs_dir,
+            subject_id=config["subject"]),
+        "bbr_epi2t1")
+    tkr_diff2str = pe.Node(
+        freesurfer.Tkregister2(command=fs_cmd.cmd("tkregister2"), noedit=True), "tkr_diff2str")
+    diff2str = pe.Node(
+        fsl.ConvertXFM(command=fsl_cmd.cmd("convert_xfm"), concat_xfm=True), "diff2str")
+
+    hcpdiff_wf.connect([
+        (init_data, fs_eye, [("fs_files", "fs_files")]),
+        (nodiff_bias, bbr_epi2t1, [("out_file", "source_file")]),
+        (fs_eye, bbr_epi2t1, [("eye_file", "init_reg_file")]),
+        (nodiff_bias, tkr_diff2str, [("out_file", "moving_image")]),
+        (bbr_epi2t1, tkr_diff2str, [("out_reg_file", "reg_file")]),
+        (t1_files, tkr_diff2str, [("t1_file", "target_image")]),
+        (flirt_nodiff2t1, diff2str, [("out_matrix_file", "in_file")]),
+        (tkr_diff2str, diff2str, [("fsl_file", "in_file2")])])
+
+    # 3.2.3. resampling
+    res_dil = pe.Node(DiffRes(), "res_dil")
+    flirt_resamp = pe.Node(fsl.FLIRT(command=fsl_cmd.cmd("flirt")), "flirt_resamp")
+    t1_resamp = pe.Node(
+        fsl.ApplyWarp(command=fsl_cmd.cmd("applywarp"), interp="spline", relwarp=True), "t1_resamp")
+    dilate_data = pe.Node(WBDilate(config=config, wb_cmd=wb_cmd), "dilate_data")
+    resamp_data = pe.Node(
+        fsl.FLIRT(command=fsl_cmd.cmd("flirt"), apply_xfm=True, interp="spline"), "resamp_data")
+    resamp_mask = pe.Node(
+        fsl.FLIRT(command=fsl_cmd.cmd("flirt"), interp="nearestneighbour"), "resamp_mask")
+    resamp_fmask = pe.Node(
+        fsl.FLIRT(command=fsl_cmd.cmd("flirt"), apply_xfm=True, interp="trilinear"), "resamp_fmask")
+
+    hcpdiff_wf.connect([
+        (thr_data, res_dil, [("out_file", "data_file")]),
+        (t1_files, flirt_resamp, [
+            ("t1_restore_file", "in_file"), ("t1_resotre_file", "reference")]),
+        (res_dil, flirt_resamp, [("res", "apply_isoxfm")]),
+        (t1_files, t1_resamp, [("t1_restore_file", "in_file")]),
+        (flirt_resamp, t1_resamp, [("out_file", "ref_file")]),
+        (thr_data, dilate_data, [("out_file", "data_file")]),
+        (res_dil, dilate_data, [("dilate", "dilate")]),
+        (dilate_data, resamp_data, [("out_file", "in_file")]),
+        (t1_resamp, resamp_data, [("out_file", "reference")]),
+        (diff2str, resamp_data, [("out_file", "in_matrix_file")]),
+        (t1_files, resamp_mask, [("mask_file", "in_file"), ("mask_file", "reference")]),
+        (res_dil, resamp_mask, [("res", "apply_isoxfm")]),
+        (fov_mask, resamp_fmask, [("out_file", "in_file")]),
+        (t1_resamp, resamp_fmask, [("out_file", "reference")]),
+        (diff2str, resamp_fmask, [("out_file", "in_matrix_file")])])
+
+    # 3.2.4. postprocessing
+    dilate_mask = pe.Node(DilateMask(fsl_cmd=fsl_cmd), "dilate_mask")
+    thr_fmask = pe.Node(
+        fsl.ImageMaths(command=fsl_cmd.cmd("fslmaths"), args="-thr 0.999 -bin"), "thr_fmask")
+    masks_args = pe.Node(CombineStrings(input1="-mas ", input3="-mas "), "masks_args")
+    fmask_data = pe.Node(fsl.ImageMaths(command=fsl_cmd.cmd("fslmaths")), "fmask_data")
+    nonneg_data = pe.Node(
+        fsl.ImageMaths(command=fsl_cmd.cmd("fslmaths"), args="-thr 0"), "nonneg_data")
+    mean_mask = pe.Node(fsl.ImageMaths(command=fsl_cmd.cmd("fslmaths"), args="-Tmean"), "mean_mask")
+    mean_args = pe.Node(CombineStrings(input1="-mas "), "mean_args")
+    mask_mask = pe.Node(fsl.ImageMaths(command=fsl_cmd.cmd("fslmaths")), "mask_mask")
+    rot_matrix = pe.Node(fsl.AvScale(command=fsl_cmd.cmd("avscale")), "rot_matrix")
+    rotate_bvec = pe.Node(RotateBVec2Str(config=config), "rotate_bvec")
+
+    hcpdiff_wf.connect([
+        (resamp_mask, dilate_mask, [("out_file", "mask_file")]),
+        (resamp_fmask, thr_fmask, [("out_file", "in_file")]),
+        (dilate_mask, masks_args, [("out_file", "input2")]),
+        (thr_fmask, masks_args, [("out_file", "input4")]),
+        (resamp_data, fmask_data, [("out_file", "in_file")]),
+        (masks_args, fmask_data, [("output", "args")]),
+        (fmask_data, nonneg_data, [("out_file", "in_file")]),
+        (nonneg_data, mean_mask, [("out_file", "in_file")]),
+        (mean_mask, mean_args, [("out_file", "input2")]),
+        (dilate_mask, mask_mask, [("dil0_file", "in_file")]),
+        (mean_args, mask_mask, [("output", "args")]),
+        (diff2str, rot_matrix, [("out_file", "mat_file")]),
+        (postproc, rotate_bvec, [("rot_bvecs", "bvecs_file")]),
+        (rot_matrix, rotate_bvec, [("rotation_translation_matrix", "rot")])])
+
+    # Save data
+    save_data = pe.Node(SaveData(config=config), "save_data")
+
+    hcpdiff_wf.connect([
+        (postproc, save_data, [("rot_bvals", "bval_file")]),
+        (nonneg_data, save_data, [("out_file", "data_file")]),
+        (mask_mask, save_data, [("out_file", "mask_file")]),
+        (rotate_bvec, save_data, [("rotated_file", "bvec_file")])])
+
+    # Run workflow
+    hcpdiff_wf.write_graph()
+    if config["condordag"]:
+        hcpdiff_wf.run(
+            plugin="CondorDAGMan",
+            plugin_args={
+                "dagman_args": f"-outfile_dir {config['work_dir']} -import_env",
+                "wrapper_cmd": Path(base_dir, "hcpdiffpy", "venv_wrapper.sh"),
+                "override_specs": "request_memory = 5 GB\nrequest_cpus = 1"})
+    else:
+        hcpdiff_wf.run()
+
+
+if __name__ == "__main__":
     main()
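
A sketch of invoking the rewritten entry point from the repository root (subject ID, paths, and echo spacing hypothetical). Since ndirs and phases are consecutive positional lists, phases takes exactly two values so that argparse can split the two lists unambiguously:

    python -m hcpdiffpy.main /data/HCA1234567 HCA1234567 98 99 AP PA 0.69 \
        --fsl_simg /containers/fsl.sif --workdir /work --output_dir /out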

+ 0 - 136
hcpdiffpy/utilities/preproc.py

@@ -1,136 +0,0 @@
-from hcpdiffpy.exceptions import DatasetError
-
-
-def d_files_dirsphase(d_files: dict, dirs: str, phase: str) -> tuple[str, ...]:
-    key = f'dir{dirs}_{phase}'
-    image = d_files[f'{key}.nii.gz']
-    bval = d_files[f'{key}.bval']
-    bvec = d_files[f'{key}.bvec']
-
-    return str(image), str(bval), str(bvec)
-
-
-def d_files_type(d_files: dict) -> tuple[list, ...]:
-    image = [d_files[key] for key in d_files if '.nii.gz' in key]
-    bval = [d_files[key] for key in d_files if '.bval' in key]
-    bvec = [d_files[key] for key in d_files if '.bvec' in key]
-
-    return image, bval, bvec
-
-
-def t1_files_type(t1_files: dict) -> tuple[str, ...]:
-    t1_file = t1_files['t1']
-    t1_restore_file = t1_files['t1_restore']
-    t1_brain_file = t1_files['t1_restore_brain']
-    bias_file = t1_files['bias']
-    mask_file = t1_files['fs_mask']
-    t1_to_mni = t1_files['t1_to_mni']
-
-    return (
-        str(t1_file), str(t1_restore_file), str(t1_brain_file), str(bias_file), str(mask_file),
-        str(t1_to_mni))
-
-
-def fs_files_type(fs_files: dict) -> tuple[str, ...]:
-
-    lh_whitedeform_file = fs_files['lh_white_deformed']
-    rh_whitedeform_file = fs_files['rh_white_deformed']
-    eye_file = fs_files['eye']
-    orig_file = fs_files['orig']
-    lh_thick_file = fs_files['lh_thickness']
-    rh_thick_file = fs_files['rh_thickness']
-
-    return (
-        str(lh_whitedeform_file), str(rh_whitedeform_file), str(eye_file), str(orig_file),
-        str(lh_thick_file), str(rh_thick_file))
-
-
-def fs_files_aparc(fs_files: dict) -> tuple[str, ...]:
-    lh_aparc = fs_files['lh_aparc']
-    rh_aparc = fs_files['rh_aparc']
-    lh_white = fs_files['lh_white']
-    rh_white = fs_files['rh_white']
-    lh_pial = fs_files['lh_pial']
-    rh_pial = fs_files['rh_pial']
-    lh_ribbon = fs_files['lh_ribbon']
-    rh_ribbon = fs_files['rh_ribbon']
-    ribbon = fs_files['ribbon']
-
-    return (
-        str(lh_aparc), str(rh_aparc), str(lh_white), str(rh_white), str(lh_pial), str(rh_pial),
-        str(lh_ribbon), str(rh_ribbon), str(ribbon))
-
-
-def update_d_files(d_files: dict, dataset: str, dwi_replacements: list) -> dict:
-    if dataset == 'HCP-A' or dataset == 'HCP-D':
-        keys = ['dir98_AP', 'dir98_PA', 'dir99_AP', 'dir99_PA']
-    else:
-        raise DatasetError()
-
-    for key in keys:
-        dwi_key = [d_key for d_key in d_files if key in d_key and '.nii.gz' in d_key]
-        dwi_replace = [d_file for d_file in dwi_replacements if key in str(d_file)]
-        d_files[dwi_key[0]] = dwi_replace[0]
-
-    return d_files
-
-
-def flatten_list(in_list: list) -> list:
-    import itertools
-    return list(itertools.chain.from_iterable(in_list))
-
-
-def create_2item_list(item1: str, item2: str) -> list:
-    return [item1, item2]
-
-
-def combine_2strings(str1: str, str2: str) -> str:
-    return f'{str1}{str2}'
-
-
-def combine_4strings(str1: str, str2: str, str3: str, str4: str) -> str:
-    return f'{str1}{str2}{str3}{str4}'
-
-
-def last_list_item(in_list: list) -> str:
-    return str(in_list[-1])
-
-
-def diff_res(data_file: str) -> tuple[float, int]:
-    import nibabel as nib
-    res = nib.load(data_file).header.get_zooms()[0]
-
-    return res, int(res*4)
-
-
-def flirt_bbr_sch(
-        in_file: str, reference: str, wm_seg: str, in_matrix_file: str, run_cmd: str,
-        sch_file: str, out_dir: str, out_prefix: str) -> str:
-    import subprocess
-    from pathlib import Path
-
-    out_matrix_file = Path(out_dir, f'flirt_sch_{out_prefix}.mat')
-    subprocess.run(
-        run_cmd.split() + [
-            '-in', in_file, '-ref', reference, '-wmseg', wm_seg,
-            '-init', in_matrix_file, '-omat', out_matrix_file, '-dof', '6',
-            '-cost', 'bbr', '-schedule', sch_file],
-        check=True
-    )
-
-    return str(out_matrix_file)
-
-
-def bet_nodif_mask(in_file: str, run_cmd: str, work_dir: str) -> str:
-    import subprocess
-    from pathlib import Path
-
-    out_file = Path(work_dir, f'{Path(in_file).stem}.nii.gz')
-    mask_file = Path(work_dir, f'{Path(in_file).stem}_mask.nii.gz')
-    subprocess.run(run_cmd.split() + [in_file, out_file, '-f', '0.20', '-m'], check=True)
-
-    return str(mask_file)
-
-
-def d_files_intermediate(d_files: dict) -> tuple[str, ...]:
-    return d_files['data'], d_files['bval'], d_files['bvec'], d_files['mask']

+ 0 - 29
hcpdiffpy/utilities/utilities.py

@@ -1,29 +0,0 @@
-from typing import Union
-from pathlib import Path
-
-base_dir = Path(__file__).resolve().parent.parent.parent
-
-
-class SimgCmd:
-    def __init__(
-            self, simg: Union[str, None], work_dir: Path, out_dir: Path,
-            int_dir: Union[Path, None]) -> None:
-        if simg is None:
-            self.cmd = None
-        else:
-            self.cmd = (f'singularity run -B {work_dir}:{work_dir},'
-                        f'{out_dir}:{out_dir},{base_dir}:{base_dir}')
-            if int_dir is not None:
-                self.cmd = f'{self.cmd},{int_dir}:{int_dir}'
-            self._simg = simg
-
-    def run_cmd(self, cmd: str, options: Union[str, None] = None) -> str:
-        if self.cmd is None:
-            run_cmd = cmd
-        else:
-            if options is None:
-                run_cmd = f'{self.cmd} {self._simg} {cmd}'
-            else:
-                run_cmd = f'{self.cmd} {options} {self._simg} {cmd}'
-
-        return run_cmd

+ 5 - 0
hcpdiffpy/venv_wrapper.sh

@@ -0,0 +1,5 @@
+#!/usr/bin/env bash
+
+cmd="$1 $2"
+echo "$cmd"
+eval "$cmd"

+ 1 - 1
pyproject.toml

@@ -18,7 +18,7 @@ classifiers = [
 ]
 dependencies = [
     "pandas ~= 1.3.5",
-    "nipype ~= 1.8.5",
+    "nipype @ git+https://github.com/nipy/nipype.git@b4cc4db4152e2ec60ee9c6fe922d90560211a4ed",
     "numpy ~= 1.21.6",
     "datalad ~= 0.17.7",
     "nibabel ~= 4.0.2",