Designing Analyses¶
A great way to contribute to the development of Arcana is to implement new analysis classes or extend existing ones. The architecture of analysis classes is intended to facilitate the implementation of generic analysis suites for wide-spread use, which can then be tailored to meet the specific requirements of particular research studies via class inheritance (see Inheritance).
This page builds upon the description of analysis-class design introduced in Analysis classes. The basic building blocks of the design are described in detail in the Basics section, while more advanced concepts involved in extending existing classes are covered in the Advanced section.
Basics¶
There are two main components of analysis classes: column specifications (DataColumn and parameter specification), which define the data to be provided to and derived by the class, and pipeline builder methods (Pipeline builders), which construct the Pydra workflows used to generate the derivatives. Parameter attributes (DataColumn and parameter specification) expose key parameters used in workflow construction, and output methods (Output methods) provide a convenient way to include the final steps of analyses (e.g. plotting figures) all in one place.
DataColumn and parameter specification¶
While columns in an Analysis class can be specified using the dataclass-like syntax of column_name: Format, in most cases you will want to explicitly use the arcana.core.mark.column function to include some basic metadata for the column, such as a description of what the column represents in the desc keyword arg.
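For example, a minimal sketch using only the bare syntax (re-using the hypothetical ExampleDataSpace and file formats from the fuller example that follows; the MinimalExampleAnalysis name is only for illustration) could look like:

@analysis(ExampleDataSpace)
class MinimalExampleAnalysis():

    # Bare dataclass-like syntax: just the column name and its file format,
    # with no description or other metadata attached
    recorded_datafile: DatFile
    recorded_metadata: Json

The same columns specified explicitly with column() and descriptions look like: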
@analysis(ExampleDataSpace)
class ExampleAnalysis():

    recorded_datafile: DatFile = column(
        desc=("Datafile acquired from an example scanner. Contains key "
              "data to analyse"))
    recorded_metadata: Json = column(
        desc="Metadata accompanying the recorded data")
    preprocessed: Zip[Text] = column(
        desc="Preprocessed data file, corrected for distortions")
    derived_image: Png = column(
        desc="Map of the processed data")
    summary_metric: field.Decimal = column(
        desc="A summary metric extracted from the derived image",
        row_frequency='dataset')
The column spec descriptions will be shown to the user when they use the Dataset.menu() method or the arcana menu CLI command. The row frequency of the column (e.g. per-session, per-subject, per-group, once per-dataset, etc., see Spaces and Frames: Rows and Columns) is specified by the row_frequency keyword argument. The row_frequency should be a member of the data space (see Spaces) provided to the arcana.core.mark.analysis() class decorator.
Not all column specifications are created equal. Some refer to key inputs (e.g. the primary MRI image) or outputs (e.g. lesion load), while others only need to be sanity-checked or are useful for debugging. Therefore, to avoid the menu being cluttered with non-salient specifications, the “salience” of a column can be specified in addition to its description via the salience keyword arg. Values for salience must be drawn from the arcana.core.enum.ColumnSalience enum:
primary - Primary input data, e.g. raw data or data reconstructed on the scanner
output - Results that would typically be used as main outputs in publications
supplementary - Derivatives that would typically only be provided in supplementary material
qa - Derivatives that would typically only be kept for quality assurance of analysis workflows
debug - Derivatives that would typically only need to be checked when debugging analysis workflows
temp - Data only temporarily stored to pass between pipelines
Descriptions and saliences can also be set for parameter attributes, where the saliences are drawn from the arcana.core.enum.ParameterSalience enum:
debug - typically only needs to be altered for debugging
recommended - recommended to keep default value
dependent - can be dependent on the context of the analysis but default should work for most cases
check - the default should be checked for validity for the particular use case
arbitrary - a default is provided, but it is not clear which value is best
required - no sensible default value, the parameter should be set manually
With the exception of required parameters, default values should be provided to the parameter specification via the default keyword. The default value should match the type of the parameter specification. Parameters can be any of the following types:
float
int
bool
str
list[float]
list[int]
list[bool]
list[str]
See comprehensive_example L4-29 for examples of these attributes of column and parameter specifications.
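As a sketch of how these attributes fit together, parameter specifications with descriptions, saliences and defaults might look like the following (the class, data space and parameter names here are hypothetical, and the ParameterSalience import path follows the enum location given above):

from arcana.core.enum import ParameterSalience as ps

@analysis(ExampleDataSpace)
class ParameterisedAnalysis():

    ...

    # A default is provided and is recommended to be kept
    smoothing_fwhm: float = parameter(
        "full-width-half-maximum of the smoothing kernel in mm",
        default=4.0,
        salience=ps.recommended)

    # A list-valued parameter with an arbitrary default
    band_limits: list[float] = parameter(
        "frequency band limits used in the filtering step",
        default=[0.01, 0.1],
        salience=ps.arbitrary)

    # No sensible default exists, so the parameter must be set manually
    acquisition_site: str = parameter(
        "the site at which the data were acquired",
        salience=ps.required)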
Pipeline builders¶
“Pipeline builders” are called by Arcana to construct the Pydra workflows that derive data columns. The arcana.core.mark.pipeline() decorator is used to mark a method as a pipeline builder and to specify the columns that the workflow it builds derives.
The first argument to a builder method is the Pipeline object that is being constructed. The initialisation of the pipeline and the nodes that interact with the data store are handled by Arcana; the builder method just needs to add the nodes that actually perform the analysis. Pipeline nodes are added using Pydra’s workflow syntax (the only exception being that the newly added node is returned from Pipeline.add() for convenience).
The remaining arguments to the builder should be named after any columns and parameters that are required by the nodes to be added. Arcana will automagically provide LazyField pointers to the arguments named after column specs, and values to the arguments named after parameter specs. For file formats with side cars, lazy-field pointers to the side-car files can be accessed as attributes of the primary LazyField, e.g.
from fileformats.field import Decimal
from fileformats.medimage import DicomCollection, NiftiGzX
from arcana.common import Clinical
from arcana.core import mark
from arcana.core.tasks.misc import ExtractFromJson
from arcana.core.data.salience import ColumnSalience as ds


@mark.analysis(Clinical)
class AnotherExampleAnalysis():

    primary_image: DicomCollection = mark.column(
        desc="The primary image set to be analysed",
        salience=ds.primary)
    repetition_time: Decimal = mark.column(
        "The repetition time of the MR sequence used",
        salience=ds.debug)
    slice_timing_interval: Decimal = mark.column(
        "The time interval between slices",
        salience=ds.debug)

    @mark.pipeline(repetition_time, slice_timing_interval)
    def preprocess_pipeline(self, wf, primary_image: NiftiGzX):

        wf.add(
            ExtractFromJson(
                name='extract_tr',
                # JSON side car is accessed by an attribute of the primary image
                in_file=primary_image.json,
                field='tr'))

        wf.add(
            ExtractFromJson(
                name='extract_st',
                # JSON side car is accessed by an attribute of the primary image
                in_file=primary_image.json,
                field='SliceTiming'))

        return wf.extract_tr.lzout.out, wf.extract_st.lzout.out
The “row_frequency” (see Spaces and Frames: Rows and Columns) of a pipeline (i.e. whether it is run per-session, per-subject, per-timepoint, etc.) is determined by the row_frequency of its output columns. Therefore, all columns derived from a single pipeline need to have the same row_frequency. If the row_frequency of an input column provided to the builder method is higher than that of the pipeline, then the lazy field provided will point to a list (sorted by the axis IDs they are combined over) rather than a single value. If the row_frequency of an input is lower than that of the pipeline, then that value is simply repeated. For example, an analysis of flood levels using datasets in the Weather data space (see weather_example) to calculate the average rainfall per station could look like:
from datetime import datetime

import numpy
import pydra.mark

from arcana.core.mark import analysis, column, pipeline
from arcana.weather.data import Weather  # See example in Data spaces section


# A basic Pydra function task used in the analysis
@pydra.mark.task
def average(measurements: list[float]) -> float:
    "A simple function task to average a list of measurements"
    return numpy.average(measurements)


# Another basic Pydra function task used in the analysis
@pydra.mark.task
def delta(measurements: list[float], average: float) -> list[float]:
    "A simple function task to compute deviations from the average"
    return list(numpy.asarray(measurements) - average)


@analysis(Weather)
class FloodAnalysis():

    record_time: datetime = column(
        desc="The time/date the recording was taken",
        row_frequency='recording')
    rain: float = column(
        desc="Daily rain measurements at different locations",
        row_frequency='recording')
    avg_rainfall: float = column(
        desc="Average rainfall for a given location",
        row_frequency='station')
    delta_rain: float = column(
        desc="Deviation from average rainfall for a given month",
        row_frequency='recording')

    # Pipeline is of 'per-station' row_frequency due to row_frequency of output column
    # 'avg_rainfall'
    @pipeline(avg_rainfall)
    # 'rain' arg is a lazy-field to a list[float] over all dates since the
    # row_frequency of the 'rain' column ('recording') is higher than
    # the pipeline's row_frequency ('station')
    def average_rainfall_pipeline(self, wf: pydra.Workflow, rain: list[float]):

        wf.add(
            average(
                name='average_rain',
                measurements=rain))

        return wf.average_rain.lzout.out

    # Pipeline is of 'per-recording' row_frequency due to delta_rain
    # output column
    @pipeline(delta_rain)
    def delta_pipeline(self, wf: pydra.Workflow, rain: float, avg_rainfall: float):

        wf.add(
            delta(
                name="delta_rain",
                measurements=rain,
                average=avg_rainfall))

        return wf.delta_rain.lzout.out
Output methods¶
“Output methods” take derivatives and produce the visualisations or tables to be included in publications or reports. Since these methods typically rely on graphical libraries, they are executed on the local workstation/node and therefore should not contain any heavy computations. The feature that differentiates them from regular methods is that they are accessible from the CLI:
$ arcana derive output '/data/my-dataset' connectivity_matrix_plot \
--save '~/Documents/papers/my-connectivity-paper/' \
--option figsize 10,10
The arcana.core.mark.output decorator is used to specify an output method and the outputs that are generated by it. Output methods should take the directory to save the outputs in as their first argument, followed by keyword arguments for the “options” of the method. The save directory should have a default of None, and the method should display the results if it isn’t provided.
import matplotlib.pyplot as plt
from arcana.medimage.data import Clinical


@analysis(Clinical)
class ExampleAnalysis2():

    ...

    @output
    def connectivity_matrix_plot(self, save_dir: str=None, figsize: tuple[float, float]=(5, 5)):
        """Plots the connectivity matrix as an image
        """
        plt.figure(figsize=figsize)
        plt.imshow(self['connectivity_matrix'].data)
        if save_dir:
            plt.savefig(save_dir)
        else:
            plt.show()
Advanced¶
In every software framework, there are always corner cases that are more complicated than the basic logic can handle. In designing informatics frameworks, these challenges often arise when attempting to write portable workflows, due to slight differences in the data and end goals of the application. This is particularly true in academia, where novelty is a key criterion. To address these requirements, this section introduces some more complex concepts, which can be used to customise and combine analysis methods into powerful new classes: class inheritance (Inheritance), conditional pipelines (Conditionals and switches), quality-control checks (Quality-control checks) and sub-analyses (Sub-analyses).
Inheritance¶
Consider a toy example analysis class that has two text-file source columns, file1 and file2. The concat_pipeline method builds a workflow that generates data for the sink column concatenated, and can be modified by the duplicates parameter.
@analysis(Samples)
class Concat:

    # Source columns
    file1: Text = column("an arbitrary text file")
    file2: Text = column("another arbitrary text file")

    # Sink columns
    concatenated: Text = column("the output of concatenating file1 and file2")

    # Parameters
    duplicates: int = parameter(
        "the number of times to duplicate the concatenation", default=1
    )

    @pipeline(concatenated)
    def concat_pipeline(self, wf, file1: Text, file2: Text, duplicates: int):
        """Concatenates the contents of `file1` with the contents of `file2` to produce
        a new text file. The concatenation can be repeated multiple times within
        the produced text file by specifying the number of repeats to the `duplicates`
        parameter
        """
        wf.add(
            concatenate(
                name="concat", in_file1=file1, in_file2=file2, duplicates=duplicates
            )
        )
        return wf.concat.lzout.out  # Output Pydra LazyField for concatenated file
The Concat class can be subclassed to create the ExtendedConcat class, which adds one additional source column, file3, and another sink column, doubly_concatenated. Data for doubly_concatenated is generated by the doubly_concat_pipeline.
@analysis(Samples)
class ExtendedConcat(Concat):

    # Source columns
    file3: Text = column("Another file to concatenate")

    # Sink columns
    concatenated = inherit()
    doubly_concatenated: Text = column("The doubly concatenated file")

    # Parameters
    duplicates = inherit(default=3)

    @pipeline(doubly_concatenated)
    def doubly_concat_pipeline(
        self, wf, concatenated: Text, file3: Text, duplicates: int
    ):
        wf.add(
            concatenate(
                name="concat",
                in_file1=concatenated,
                in_file2=file3,
                duplicates=duplicates,
            )
        )
        return wf.concat.lzout.out
Because the concatenated column and duplicates parameter are used in the doubly_concat_pipeline, they are explicitly referenced in the subclass using the inherit function. Note that this is enforced due to a design decision to make it clear where columns and parameters are defined when reading the code. Columns that aren’t explicitly referenced in the class (e.g. file1 and file2) can be omitted from the subclass definition (but will still be present in the subclass). When explicitly inheriting columns and parameters it is possible to override their attributes, such as the default value of a given parameter (see duplicates in the above example).
Conditionals and switches¶
There are cases where different analysis methods need to be applied depending on the requirements of a particular study or to deal with idiosyncrasies of a particular dataset. There are two mechanisms for handling such cases in Arcana: “condition expressions” and “switches”.
Both condition expressions and switches are referenced within the @pipeline decorator. When a condition expression or switch is set on a pipeline builder, that pipeline will be used to generate data for a sink column only when certain criteria are met. If the criteria aren’t met, then either the default pipeline builder (one without a switch or condition expression) will be used, if present, or a “not produced” error will be raised.
The difference between a condition expression and a switch is that a condition expression is true or false over a whole dataset given a specific parameterisation, whereas a switch can be true or false for different rows of the dataset depending on the nature of the input data.
Condition expressions are specified using the functions value_of(parameter) and is_provided(column), which act as placeholders for parameter values or for whether a column specification in the analysis is linked to a column in the dataset. In the following example, a condition is used to let the user choose whether concatenated should be generated by the concat_pipeline method (the default) or by the reverse_concat_pipeline, by setting the value of the order parameter.
@analysis(Samples)
class OverridenConcat(Concat):

    # Source columns
    file1: Zip[Text] = inherit()
    file2: Text = inherit()

    # Sink columns
    concatenated: Text = inherit()

    # Parameters
    duplicates = inherit(default=2)  # default value changed because we can
    order: str = parameter(
        "perform the concatenation in reverse order, i.e. file2 and then file1",
        choices=["forward", "reversed"],
        default="forward",
    )

    @pipeline(
        concatenated,
        condition=(value_of(order) == "reversed"),
    )
    def reverse_concat_pipeline(
        self, wf, file1: Text, file2: Text, duplicates: int
    ):
        wf.add(
            concatenate_reverse(
                name="concat", in_file1=file1, in_file2=file2, duplicates=duplicates
            )
        )
        return wf.concat.lzout.out
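Conditions can also test whether an optional input column has been linked to the dataset via is_provided. The following sketch is purely illustrative (the mask column, the masked_concat_pipeline builder and the concatenate_with_mask task are not part of the classes above):

@analysis(Samples)
class ConcatWithOptionalMask(Concat):

    # Source columns
    mask: Text = column("optional mask file used to filter the concatenation")

    # Sink columns
    concatenated: Text = inherit()

    # Only used to generate 'concatenated' when a 'mask' column has been linked
    # to the dataset; otherwise the default concat_pipeline builder is used
    @pipeline(
        concatenated,
        condition=is_provided(mask),
    )
    def masked_concat_pipeline(self, wf, file1: Text, file2: Text, mask: Text):
        wf.add(
            concatenate_with_mask(  # hypothetical task, for illustration only
                name="concat", in_file1=file1, in_file2=file2, mask=mask
            )
        )
        return wf.concat.lzout.out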
Switches are defined in methods of the analysis class using the @switch decorator and are similar to pipeline builders in that they add nodes to a Pydra workflow passed as the first argument. The sole output field of a switch must be either a boolean or a string, which specifies which branch of processing is to be performed. The switch method is then passed to the @pipeline decorator via the switch keyword. If the switch returns a string, then the value passed to the switch keyword must be a tuple, with the first element being the switch method and the second the string value that will activate that branch of the pipeline.
In the following example, the contents of the files in the concatenated column are multiplied by the value passed to the arbitrary multiplier parameter if the contents of the input files file1 and file2 are numeric for the corresponding row, as determined by the inputs_are_numeric switch.
@analysis(Samples)
class ConcatWithSwitch(Concat):

    # Source columns
    file1: Zip[Text] = inherit()
    file2: Text = inherit()

    # Sink columns
    concatenated: Text = inherit()
    multiplied: Text = column("contents of the concatenated files are multiplied")

    # Parameters
    multiplier: int = parameter(
        "the multiplier to apply", salience=ps.arbitrary
    )

    @switch
    def inputs_are_numeric(self, wf, file1: Text, file2: Text):
        wf.add(contents_are_numeric(in_file=file1, name="check_file1"))
        wf.add(contents_are_numeric(in_file=file2, name="check_file2"))

        @pydra.mark.task
        def boolean_and(val1, val2) -> bool:
            return val1 and val2

        wf.add(
            boolean_and(
                val1=wf.check_file1.lzout.out, val2=wf.check_file2.lzout.out, name="bool_and"
            )
        )
        return wf.bool_and.lzout.out

    @pipeline(multiplied, switch=inputs_are_numeric)
    def multiply_pipeline(self, wf, concatenated, multiplier):
        wf.add(
            multiply_contents(
                name="concat", in_file=concatenated, multiplier=multiplier
            )
        )
        return wf.concat.lzout.out
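If a switch returns a string instead of a boolean, the value passed to the switch keyword is a tuple of the switch method and the string that activates the builder. The following sketch is hypothetical (the file_quality switch, the cleaned column and the tasks used are not defined above) and only illustrates the form:

@analysis(Samples)
class ConcatWithStringSwitch(Concat):

    # Sink columns
    concatenated: Text = inherit()
    cleaned: Text = column("concatenated file with noisy lines removed")

    @switch
    def file_quality(self, wf, file1: Text, file2: Text):
        # Hypothetical task that classifies the inputs as 'clean' or 'noisy'
        wf.add(classify_quality(in_file1=file1, in_file2=file2, name="classify"))
        return wf.classify.lzout.out  # a string, e.g. 'clean' or 'noisy'

    # This builder is only used for rows where the switch returns 'noisy'
    @pipeline(cleaned, switch=(file_quality, "noisy"))
    def denoise_pipeline(self, wf, concatenated: Text):
        # Hypothetical task that strips noisy lines from the concatenated file
        wf.add(remove_noisy_lines(in_file=concatenated, name="denoise"))
        return wf.denoise.lzout.out

Since this sketch defines no default builder for cleaned, requesting it for rows where the switch returns 'clean' would raise the “not produced” error described above.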
Quality-control checks¶
When running complex analyses it is important to inspect generated derivatives to make sure the workflows completed properly. In Arcana, it is possible to semi-automate this process by adding quality-control “checks” to an analysis class.
In the following example, the number of lines produced by the concatenation step is checked to see if it matches the number expected given the value of the duplicates parameter.
@analysis(Samples)
class ConcatWithCheck(Concat):

    # Sink columns
    concatenated = inherit()

    # Parameters
    duplicates = inherit()

    @check(concatenated, salience=CheckSalience.recommended)
    def check_concatenated(self, wf, concatenated: Text, duplicates: int):
        """Checks the number of lines in the concatenated file to see whether they
        match what is expected for the number of duplicates specified"""

        @pydra.mark.task
        def num_lines_equals(in_file, num_lines):
            with open(in_file) as f:
                contents = f.read()
            if len(contents.splitlines()) == num_lines:
                status = CheckStatus.probable_pass
            else:
                status = CheckStatus.failed
            return status

        wf.add(
            num_lines_equals(
                in_file=concatenated, num_lines=2 * duplicates, name="num_lines_check"
            )
        )
        return wf.num_lines_check.lzout.out
Sub-analyses¶
When dealing with separate data streams that can be largely analysed in parallel (e.g. multiple MRI contrasts), it can be convenient to combine multiple analyses tailored to each stream into a single conglomerate analysis. This pattern can be implemented in Arcana using subanalysis attributes.
The type annotation of a subanalysis attribute specifies the analysis to be performed, and its keyword arguments specify mappings from the column specs and parameters in the global namespace of the outer class to the namespace of the subanalysis. With these mappings, source columns linked to specs in the global namespace can be passed to the subanalysis, and sink columns generated by pipelines in the global namespace can be linked to any column within the subanalysis.
The map_from function is used to map columns and parameters from subanalyses into the global namespace, and takes two arguments: the name of the subanalysis and the name of the column/parameter to map. By mapping a column/parameter into the global namespace from one subanalysis and then mapping it back into another subanalysis, the designer can stitch the subanalyses together. For example, the cortical surface reconstruction column from a subanalysis for analysing anatomical MRI images could be mapped to a source column in another subanalysis for analysing white matter tracts in diffusion-weighted contrast MRI images, in order to constrain the potential endpoints of the tracts.
In the following example, two of the classes defined above, ExtendedConcat and ConcatWithSwitch, are stitched together so that the multiplied output column of ConcatWithSwitch is passed to the file3 input column of ExtendedConcat. The duplicates parameters in the two subanalyses are linked together, so that they are always consistent, by mapping the parameter from the ExtendedConcat subanalysis to the global namespace and then back into the ConcatWithSwitch subanalysis.
@analysis(Samples)
class _ConcatWithSubanalyses:

    # Source columns mapped from the "sub1" subanalysis so they can be shared across
    # both sub-analyses. Note that they could just as easily have been mapped from
    # "sub2" or recreated from scratch and mapped into both
    file1 = map_from("sub1", "file1")
    file2 = map_from("sub1", "file2")

    # Sink columns generated within the subanalyses mapped back out to the global
    # namespace so they can be mapped into the other subanalysis
    concat_and_multiplied = map_from("sub2", "multiplied")

    # Link the duplicates parameter across both subanalyses so it is always the same
    # by mapping a global parameter into both subanalyses
    common_duplicates = map_from(
        "sub1", "duplicates", default=5, salience=ps.check
    )

    # Additional parameters such as "multiplier" can be accessed within the subanalysis
    # class after the analysis class has been initialised via the 'sub2.multiplier' attribute
    sub1: ExtendedConcat = subanalysis(
        "sub-analysis to add the 'doubly_concat' pipeline",
        # Feed the multiplied sink column from sub2 into the source column file3 of
        # the extended class
        file3=concat_and_multiplied,
    )
    sub2: ConcatWithSwitch = subanalysis(
        "sub-analysis to add the 'multiply' pipeline",
        file1=file1,
        file2=file2,
        # Map the shared duplicates parameter back in so it stays consistent with sub1
        duplicates=common_duplicates,
    )