Pipelines
=========

Derivatives are generated within FrameTree by modular "pipelines". Pipeline
outputs are connected to sink columns (see :ref:`columns`). Pipeline inputs can
draw data from either source columns or sink columns containing derivatives
generated by prerequisite pipelines. By connecting pipeline inputs to the
outputs of other pipelines, complex processing chains/webs can be created
(reminiscent of a makefile), in which intermediate products will be stored in
the dataset for subsequent analysis.

FrameTree uses the Pydra_ dataflow engine under the hood, and Pydra_ tasks or
workflows can be "applied" to a dataset, where they will be wrapped by a
pipeline. Shell commands can also be wrapped using the generic
:func:`frametree.common.shell` task.

Pipelines can be applied to the dataset when it is created, and then run
incrementally as the data is acquired, ensuring the same parameters are used
consistently.

Additional management features that FrameTree pipelines provide are

* iteration logic over the dataset
* storage and retrieval of data to and from the data store
* conversion between mismatching file formats
* provenance
* consistent parameterisations and software versions

To connect a workflow via the CLI, map the inputs and outputs of the Pydra_
workflow/task (``in_file``, ``peel`` and ``out_file`` in the example below) to
the appropriate columns in the dataset (``T1w``, ``T2w`` and
``freesurfer/recon-all`` respectively):

.. code-block:: console

    $ frametree add-source 'myuni-xnat//myproject@training' T1w \
      medimage/dicom-series --regex '.*mprage.*'
    $ frametree add-source 'myuni-xnat//myproject@training' T2w \
      medimage/dicom-series --regex '.*t2spc.*'
    $ frametree add-sink 'myuni-xnat//myproject@training' freesurfer/recon-all \
      application/zip
    $ frametree apply 'myuni-xnat//myproject@training' freesurfer \
      pydra.tasks.freesurfer:Freesurfer \
      --input T1w in_file medimage/nifti-gz \
      --input T2w peel medimage/nifti-gz \
      --output freesurfer/recon-all out_file generic/directory \
      --parameter param1 10 \
      --parameter param2 20

If there is a mismatch in the datatype between the workflow inputs/outputs and
the columns they are connected to, a datatype conversion task will be inserted
into the pipeline, provided a converter method between the two formats exists
(see FileFormats_).

Alternatively via the Python API:

.. toggle:: Show/Hide Python Code Example

    .. code-block:: python

        from pydra.tasks.freesurfer import Freesurfer
        from frametree.core import FrameSet
        from fileformats import generic, medimage

        frameset = FrameSet.load('myuni-xnat//myproject@training')

        # Source columns are selected by regular expressions on their paths
        frameset.add_source('T1w', datatype=medimage.Dicom,
                            path='.*mprage.*', is_regex=True)
        frameset.add_source('T2w', datatype=medimage.Dicom,
                            path='.*t2spc.*', is_regex=True)

        frameset.add_sink('freesurfer/recon-all', generic.Directory)

        # Map dataset columns to the workflow's input and output fields
        frameset.apply(
            workflow=Freesurfer(
                name='freesurfer',
                param1=10.0,
                param2=20.0),
            inputs=[('T1w', 'in_file', medimage.NiftiGz),
                    ('T2w', 'peel', medimage.NiftiGz)],
            outputs=[('freesurfer/recon-all', 'out_file', generic.Directory)])

        frameset.save()

If the source can be referenced by its path alone and the formats of the source
and sink columns match those expected and produced by the workflow, then you
can add all the sources and sinks in one step:

.. code-block:: console

    $ frametree apply pipeline '/data/enigma/alzheimers@test' segmentation \
      pydra.tasks.fsl.preprocess.fast:FAST \
      --source T1w in_file medimage/nifti-gz \
      --sink fast/gm gm medimage/nifti-gz \
      --parameter method a-method

By default, pipelines will iterate over all "leaf rows" of the data tree (e.g.
``session`` for datasets in the :class:`.Clinical` space). However, pipelines
can be run at any row frequency of the dataset (see :ref:`axes`), e.g. per
subject, per visit, or on the dataset as a whole (to create single
templates/statistics).

Pipeline outputs must be connected to sinks of the same row frequency. However,
inputs can be drawn from columns of any row frequency. In this case, inputs
from more frequent rows will be provided to the pipeline as a list sorted by
their ID.

For example, when the pipeline in the following code block runs, it will
receive a list of T1w filenames, run the workflow once, and then sink a single
template back to the dataset.

.. code-block:: console

    $ # Add sink column with "constant" row frequency
    $ frametree add-sink bids///data/openneuro/ds00014 vbm_template medimage/nifti-gz \
      --row-frequency constant

    $ # NB: we don't need to add the T1w source as it is auto-detected when using BIDS

    $ # Connect pipeline to a "constant" row-frequency sink column. Needs to be
    $ # of `constant` row frequency itself or FrameTree will raise an error
    $ frametree apply bids///data/openneuro/ds00014 vbm_template \
      --input T1w in_file \
      --output vbm_template out_file \
      --row-frequency constant

Alternatively via the Python API:

.. toggle:: Show/Hide Python Code Example

    .. code-block:: python

        from myworkflows import vbm_template
        from fileformats import medimage
        from frametree.core import FrameSet
        from frametree.common import Clinical

        frameset = FrameSet.load('bids///data/openneuro/ds00014')

        # Add sink column with "constant" row frequency
        frameset.add_sink(
            name='vbm_template',
            datatype=medimage.NiftiGz,
            row_frequency='constant')

        # NB: we don't need to add the T1w source as it is automatically detected
        #     when using BIDS

        # Connect pipeline to a "constant" row-frequency sink column. Needs to be
        # of `constant` row frequency itself or FrameTree will raise an error
        frameset.apply(
            name='vbm_template',
            workflow=vbm_template,
            inputs=[('T1w', 'in_file')],
            outputs=[('vbm_template', 'out_file')],
            row_frequency='constant')

.. _derivatives:

Generating derivatives
----------------------

After workflows and/or analysis classes have been connected to a dataset,
derivatives can be generated using :meth:`.FrameSet.derive` or alternatively
:meth:`.FrameSet.derive` for single columns. These methods check the data store
to see whether the source data is present and execute the pipelines over all
rows of the dataset with available source data. If pipeline inputs are sink
columns to be derived by prerequisite pipelines, then the prerequisite
pipelines will be prepended onto the execution stack.

To generate derivatives via the CLI:

.. code-block:: console

    $ frametree derive 'myuni-xnat//myproject@training' freesurfer/recon-all

Alternatively via the API:

.. toggle:: Show/Hide Python Code Example

    .. code-block:: python

        from frametree.core import FrameSet

        frameset = FrameSet.load('/data/openneuro/ds00014@test')
        frameset.derive('fast/gm', cache_dir='/work/temp-dir')

        # Print the URI of the generated derivative
        print(frameset['fast/gm']['sub11'].uri)

By default Pydra_ uses the "concurrent-futures" (``'cf'``) plugin, which splits
workflows over multiple processes. You can specify which plugin to use, and
thereby how the workflow is executed, via the ``pydra_plugin`` option, and pass
options to it with ``pydra_option``.

.. code-block:: console

    $ frametree derive 'myuni-xnat//myproject@training' freesurfer/recon-all \
      --plugin slurm --pydra-option poll_delay 5 --pydra-option max_jobs 10

To list the derivatives that can be derived from a dataset after workflows have
been applied, you can use the ``menu`` command:

.. code-block:: console

    $ frametree menu '/data/a-dataset'

    Derivatives
    -----------
    recorded_datafile (application/zip)
    recorded_metadata (application/json)
    preprocessed (application/zip)
    derived_image (image/png)
    summary_metric (field/float)

    Parameters
    ----------
    contrast (field/float): 0.6 (default=0.5)
    kernel_fwhms (field/float+array): [0.2, 0.2, 0.6] (default=[0.5, 0.3, 0.1])

Provenance
----------

Provenance metadata is saved alongside derivatives in the data store. The
metadata includes:

* MD5 checksums of all pipeline inputs and outputs
* Full workflow graph with connections between, and parameterisations of, Pydra tasks
* Container image tags for tasks that ran inside containers
* Python dependencies and versions used.

How the provenance metadata is stored will depend on the type of data store,
but often it will be stored in a JSON file. For example, a provenance JSON file
would look like:

.. code-block:: javascript

    {
      "store": {
        "class": "",
        "server": "https://central.xnat.org"
      },
      "dataset": {
        "id": "MYPROJECT",
        "name": "passed-dwi-qc",
        "exclude": ["015", "101"],
        "id_composition": {
          "subject": "(?PTEST|CONT)(?P\d+3)"
        }
      },
      "pipelines": [
        {
          "name": "anatomically_constrained_tractography",
          "inputs": {
            // MD5 checksums for all files in the file group. "." refers to the
            // "primary file" in the file group.
            "T1w_reg_dwi": {
              "datatype": "",
              "checksums": {
                ".": "4838470888DBBEADEAD91089DD4DFC55",
                "json": "7500099D8BE29EF9057D6DE5D515DFFE"
              }
            },
            "T2w_reg_dwi": {
              "datatype": "",
              "checksums": {
                ".": "4838470888DBBEADEAD91089DD4DFC55",
                "json": "5625E881E32AE6415E7E9AF9AEC59FD6"
              }
            },
            "dwi_fod": {
              "datatype": "",
              "checksums": {
                ".": "92EF19B942DD019BF8D32A2CE2A3652F"
              }
            }
          },
          "outputs": {
            "wm_tracks": {
              "task": "tckgen",
              "field": "out_file",
              "datatype": "",
              "checksums": {
                ".": "D30073044A7B1239EFF753C85BC1C5B3"
              }
            }
          },
          "workflow": {
            "name": "workflow",
            "class": "",
            "tasks": {
              "5ttgen": {
                "class": "",
                "package": "pydra-mrtrix",
                "version": "0.1.1",
                "inputs": {
                  "in_file": {
                    "field": "T1w_reg_dwi"
                  },
                  "t2": {
                    "field": "T2w_reg_dwi"
                  },
                  "sgm_amyg_hipp": true
                },
                "container": {
                  "type": "docker",
                  "image": "mrtrix3/mrtrix3:3.0.3"
                }
              },
              "tckgen": {
                "class": "",
                "package": "pydra-mrtrix",
                "version": "0.1.1",
                "inputs": {
                  "in_file": {
                    "field": "dwi_fod"
                  },
                  "act": {
                    "task": "5ttgen",
                    "field": "out_file"
                  },
                  "select": 100000000
                },
                "container": {
                  "type": "docker",
                  "image": "mrtrix3/mrtrix3:3.0.3"
                }
              }
            }
          },
          "execution": {
            "machine": "hpc.myuni.edu",
            "processor": "intel9999",
            "python-packages": {
              "pydra-mrtrix3": "0.1.0",
              "fileformats-medimage": "0.8.1",
              "frametree-xnat": "0.5.0"
            }
          }
        }
      ]
    }

Before derivatives are generated, provenance metadata of prerequisite
derivatives (i.e. inputs of the pipeline and prerequisite pipelines, etc.) are
checked to see if there have been any alterations to the configuration of the
pipelines that generated them. If so, any affected rows will not be processed,
and a warning will be generated by default.

To override this behaviour and reprocess the derivatives, set the ``reprocess``
flag when calling :meth:`.FrameSet.derive`:

.. code-block:: console

    $ frametree derive 'myuni-xnat//myproject@training' freesurfer/recon-all --reprocess

or via the API:

.. toggle:: Show/Hide Python Code Example

    .. code-block:: python

        frameset.derive('fast/gm', reprocess=True)

To ignore differences between pipeline configurations you can use the
:meth:`.FrameSet.ignore_diff` method:

.. code-block:: console

    $ frametree ignore-diff 'myuni-xnat//myproject@training' freesurfer --param freesurfer_task num_iterations 3

or via the API:

.. toggle:: Show/Hide Python Code Example

    .. code-block:: python

        frameset.ignore_diff('freesurfer_pipeline', ('freesurfer_task', 'num_iterations', 3))

.. _Pydra: https://pydra.readthedocs.io
.. _FileFormats: https://arcanaframework.github.io/fileformats
.. _attrs: https://www.attrs.org/en/stable/
.. _dataclasses: https://docs.python.org/3/library/dataclasses.html
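
The checksum comparison described in the Provenance section can be illustrated with a minimal, standalone sketch. This is not FrameTree's implementation: the helper names ``md5_checksum`` and ``changed_inputs`` are hypothetical, and the sketch only assumes that recorded checksums are upper-case hexadecimal MD5 digests, as in the example JSON.

```python
import hashlib
import tempfile
from pathlib import Path


def md5_checksum(path, chunk_size=8192):
    """Return the upper-case hex MD5 digest of the file at `path`.

    Hypothetical helper: reads the file in chunks so that large
    images do not need to be loaded into memory at once.
    """
    digest = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest().upper()


def changed_inputs(recorded, paths):
    """Return the sorted names of inputs whose current checksum differs
    from the recorded one (hypothetical helper, illustration only).

    `recorded` maps input names to previously stored checksums and
    `paths` maps the same names to the files currently on disk.
    """
    return sorted(
        name for name, path in paths.items()
        if md5_checksum(path) != recorded.get(name)
    )


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        t1w = Path(tmp) / "t1w.nii.gz"
        t1w.write_bytes(b"original image data")
        # Checksums as they might be stored alongside a derivative
        recorded = {"T1w": md5_checksum(t1w)}
        print(changed_inputs(recorded, {"T1w": t1w}))  # nothing has changed yet
        t1w.write_bytes(b"modified image data")
        print(changed_inputs(recorded, {"T1w": t1w}))  # T1w is now flagged
```

A check of this kind only detects that an input file has changed. Detecting altered pipeline configurations, as described above, would additionally require comparing the stored workflow parameters.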