Source code for ymp.stage.pipeline

"""Pipelines Module

Contains classes for pre-configured pipelines comprising multiple
stages.
"""

import logging
import os

from typing import Dict, List, Set

from ymp.stage import StageStack, find_stage
from ymp.stage.base import ConfigStage
from ymp.exceptions import YmpConfigError


log = logging.getLogger(__name__)  # pylint: disable=invalid-name


[docs]class Pipeline(ConfigStage): """ A virtual stage aggregating a sequence of stages, i.e. a pipeline or sub-workflow. Pipelines are configured via ``ymp.yml``. Example: pipelines: my_pipeline: - stage_1 - stage_2 - stage_3 """ def __init__(self, name: str, cfg: List[str]) -> None: super().__init__(name, cfg) self.stage_names: List[str] = cfg self._outputs: Dict[str, str] = None @property def outputs(self) -> Dict[str, str]: """The outputs of a pipeline are the sum of the outputs of each component stage. Outputs of stages further down the pipeline override those generated earlier. TODO: Allow hiding the output of intermediary stages. """ outputs = {} path = "" for stage_name in self.stage_names: stage = find_stage(stage_name) path = ".".join((path, stage_name)) stage_outputs = stage.outputs if isinstance(stage_outputs, set): outputs.update({output: path for output in stage_outputs}) else: outputs.update(stage_outputs) return outputs
[docs] def can_provide(self, inputs: Set[str]) -> Dict[str, str]: """Determines which of ``inputs`` this stage can provide. The result dictionary values will point to the "real" output. """ return { output: path for output, path in self.outputs.items() if output in inputs }
@property def pipeline(self): return "." + ".".join(self.stage_names)
[docs] def get_path(self, stack): prefix = stack.name.rsplit('.',1)[0] return prefix + self.pipeline
[docs] def get_all_targets(self, stack): targets = [] # First add the symlink for ourselves, but only if it # does not exist yet, due to a bug in Snakemake 5.20.1. if not os.path.exists(stack.name): targets += [stack.name] # Now add the target the last part of the pipeline # points to. realstack = stack.get(self.get_path(stack)) targets.extend(realstack.stage.get_all_targets(realstack)) return targets