"""Pipelines Module
Contains classes for pre-configured pipelines comprising multiple
stages.
"""
import logging
import os
from collections import OrderedDict
from collections.abc import Mapping
from typing import Dict, List, Set, Optional
from ymp.stage import StageStack, find_stage
from ymp.stage.base import ConfigStage
from ymp.stage.params import Parametrizable
from ymp.exceptions import YmpConfigError, YmpRuleError
#: Logger for this module, named after the module itself.
log = logging.getLogger(name=__name__)  # pylint: disable=invalid-name
class Pipeline(Parametrizable, ConfigStage):
    """
    A virtual stage aggregating a sequence of stages, i.e. a pipeline
    or sub-workflow.

    Pipelines are configured via ``ymp.yml``.

    Example::

        pipelines:
          my_pipeline:
            hide: false
            params:
              length:
                key: L
                type: int
                default: 20
            stages:
              - stage_1{length}:
                  hide: true
              - stage_2
              - stage_3
    """

    def __init__(self, name: str, cfg) -> None:
        """Build the pipeline from its configuration section.

        Args:
          name: Name of the pipeline.
          cfg: The pipeline's configuration mapping. It must contain a
            ``stages`` sequence and may contain ``hide`` and ``params``.

        Raises:
          YmpConfigError: if ``params`` is not a mapping, ``stages`` is
            missing, or a stage entry is empty or malformed.
        """
        super().__init__(name, cfg)
        # Stage path -> names of params inherited from that stage.
        # Filled lazily by the ``params`` property.
        self._params: Optional[Dict[str, List[str]]] = None
        # Cache for the ``outputs`` property.
        self._outputs: Optional[Dict[str, str]] = None
        #: If true, outputs of stages are hidden by default
        self.hide_outputs = getattr(cfg, "hide", False)
        if 'params' in cfg and cfg.params is not None:
            if not isinstance(cfg.params, Mapping):
                raise YmpConfigError(cfg, "Params must contain a mapping", key="params")
            self._init_params(cfg.params)
        #: Dictionary of stages with configuration options for each.
        #: Keys are cumulative dotted paths (".stage_1", ".stage_1.stage_2", ...).
        self.stages = OrderedDict()
        path = ""
        if "stages" not in cfg:
            raise YmpConfigError(cfg, "Pipeline must have stages entry")
        for stage in cfg.stages:
            if stage is None or (isinstance(stage, str) and not stage):
                raise YmpConfigError(
                    cfg, f"Empty stage name in pipeline '{name}'", key="stages"
                )
            if isinstance(stage, str):
                stage_name = stage
                stage_cfg = {}
            else:
                # Mapping entry of the form ``- stage_name: {options}``.
                try:
                    stage_name = next(iter(stage))
                except (TypeError, StopIteration) as exc:
                    raise YmpConfigError(
                        cfg, f"Malformed stage entry in pipeline '{name}'", key="stages"
                    ) from exc
                # ``- stage_name:`` with no value parses as None; treat it as
                # "no options" so later ``.get()`` lookups work.
                stage_cfg = stage[stage_name]
                if stage_cfg is None:
                    stage_cfg = {}
            path = ".".join((path, stage_name))
            self.stages[path] = stage_cfg
        #: Path fragment describing this pipeline
        self.pipeline = path

    def _init_params(self, params):
        """Register the parameters declared in the pipeline's ``params`` section.

        Each entry maps a parameter name to a mapping that must define
        ``key`` and ``type``. The optional ``value`` and ``default`` entries
        are passed on to ``add_param``.

        Raises:
          YmpConfigError: if an entry is not a mapping or lacks ``key``/``type``.
        """
        for param, data in params.items():
            if not isinstance(data, Mapping):
                # Anchor the error on the enclosing mapping: ``param`` is a key
                # of ``params``, and ``data`` may be a bare scalar.
                raise YmpConfigError(params, "Param must contain a mapping", key=param)
            try:
                key = data['key']
                typ = data['type']
            except KeyError as exc:
                raise YmpConfigError(data, "Param must have at least key and type defined") from exc
            self.add_param(
                key, typ, param, data.get("value"), data.get("default")
            )

    @property
    def params(self):
        """Parameters of this pipeline, including those inherited from stages.

        On first access, the parameters of each component stage are
        registered on the pipeline as well. The following are skipped:

        * stages whose name contains a ``{`` wildcard;
        * stages that are not ``Parametrizable``;
        * individual params whose ``add_param`` call raises ``YmpRuleError``.

        A param's default is the value already fixed in the stage name, if
        any, otherwise the param's own default. The stage path -> inherited
        param names mapping is stored in ``self._params`` for ``get_path``.
        """
        if self._params is None:
            inherited: Dict[str, List[str]] = {}
            for stage_path in self.stages:
                stage_name = stage_path.rsplit(".", 1)[-1]
                if "{" in stage_name:
                    # Cannot inherit params from stages with param wildcard in name
                    continue
                stage = find_stage(stage_name)
                if not isinstance(stage, Parametrizable):
                    continue
                stage_params = stage.parse(stage_name)
                for param in stage.params:
                    default = stage_params.get(param.name, param.default)
                    try:
                        self.add_param(param.key, param.type_name, param.name, param.value, default)
                    except YmpRuleError:
                        # Best effort: a param that cannot be registered
                        # (presumably a clash with an existing one -- TODO
                        # confirm in add_param) is simply not inherited.
                        continue
                    inherited.setdefault(stage_path, []).append(param.name)
            self._params = inherited
        return super().params

    def get_path(self, stack, typ=None):
        """Return the stack path this pipeline expands to for ``stack``.

        Args:
          stack: The stack whose last stage is this pipeline.
          typ: If None, expand to the full pipeline. Otherwise expand to the
            stage path providing output ``typ`` (from ``self.outputs``).

        Parameters parsed from the pipeline's stage name are formatted into
        the path. They are also forwarded into each stage name they were
        inherited from. The result is prefixed with ``stack.name`` up to its
        last dot.
        """
        # Accessing ``params`` populates ``self._params`` if it has not been
        # computed yet, so ``get_path`` does not depend on call order.
        _ = self.params
        pipeline_parameters = self.parse(stack.stage_name)
        param_map = {
            stage_path.format(**pipeline_parameters): names
            for stage_path, names in self._params.items()
        }
        if typ is None:
            pipeline = self.pipeline
        else:
            pipeline = self.outputs[typ]
        pipeline = pipeline.format(**pipeline_parameters)
        stages = []
        path = ""
        for stage_name in pipeline.lstrip(".").split("."):
            # Rebuild the cumulative path to match the keys of ``param_map``.
            path = ".".join((path, stage_name))
            takes_params = param_map.get(path)
            if not takes_params:
                stages.append(stage_name)
                continue
            stage = find_stage(stage_name)
            stage_parameters = stage.parse(stage_name)
            for param in takes_params:
                if param in pipeline_parameters:
                    stage_parameters[param] = pipeline_parameters[param]
            stages.append(stage.format(stage_parameters))
        prefix = stack.name.rsplit(".", 1)[0]
        return ".".join([prefix] + stages)

    def _make_outputs(self) -> Dict[str, str]:
        """Collect output type -> stage path for all non-hidden stages.

        A stage is hidden if its config sets ``hide``; otherwise the
        pipeline-wide ``hide_outputs`` applies. Later stages overwrite
        entries from earlier ones.
        """
        outputs = {}
        for stage_path, cfg in self.stages.items():
            if cfg.get("hide", self.hide_outputs):
                continue
            stage_name = stage_path.rsplit(".", 1)[-1]
            stage = find_stage(stage_name)
            outputs.update(stage.get_outputs(stage_path))
        return outputs

    @property
    def outputs(self) -> Dict[str, str]:
        """The outputs of a pipeline are the sum of the outputs
        of each component stage. Outputs of stages further down
        the pipeline override those generated earlier.

        The result is computed once and cached.
        """
        if self._outputs is None:
            self._outputs = self._make_outputs()
        return self._outputs

    def can_provide(self, inputs: Set[str]) -> Dict[str, str]:
        """Determines which of ``inputs`` this stage can provide.

        The result dictionary values will point to the "real" output,
        i.e. the (unformatted) path of the stage providing it.
        """
        return {
            output: path
            for output, path in self.outputs.items()
            if output in inputs
        }

    def get_all_targets(self, stack):
        """Return the targets to build for ``stack``.

        The targets are ``stack.name`` (only if it does not exist yet)
        followed by the targets of the stack the pipeline expands to.
        """
        targets = []
        # First add the symlink for ourselves, but only if it
        # does not exist yet, due to a bug in Snakemake 5.20.1.
        if not os.path.exists(stack.name):
            targets.append(stack.name)
        # Now add the targets of the stack the pipeline expands to.
        realstack = stack.instance(self.get_path(stack))
        targets.extend(realstack.stage.get_all_targets(realstack))
        return targets

    def get_group(
        self,
        stack: "StageStack",
        default_groups: List[str]
    ) -> List[str]:
        """Delegate grouping to the stack the pipeline expands to."""
        realstack = stack.instance(self.get_path(stack))
        return realstack.stage.get_group(realstack, default_groups)

    def get_ids(self, stack, groups, mygroups=None, target=None):
        """Delegate ID lookup to the stack the pipeline expands to."""
        realstack = stack.instance(self.get_path(stack))
        return realstack.stage.get_ids(realstack, groups, mygroups, target)