Source code for ymp.stage.groupby
"""Implements forward grouping
Grouping allows processing multiple input datasets at once, such as in
a co-assembly. It is initiated by adding the virtual stage
"group_<COL>" directly before the stage that should be grouping its
output. "<COL>" may be a project data column, in which case all data
for which column COL shares a value will be combined, or "ALL", which
combines all samples. The output filename prefix will be either the
column value or "ALL".
>>> ymp make mock.group_sample.assemble_megahit
>>> ymp make mock.group_ALL.assemble_megahit
Subsequent stages will use the most fine-grained grouping required by
their input data.
# FIXME: How to avoid re-specifying groupby?
"""
import logging
from typing import List
from ymp.stage.base import BaseStage
from ymp.exceptions import YmpStageError
# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__) # pylint: disable=invalid-name
class GroupBy(BaseStage):
    """Virtual stage for grouping.

    Handles stage names of the form ``group_<COL>``. The part after the
    prefix is interpreted by `modify_next_group` to build the list of
    grouping columns.
    """

    # Stage names handled by this class start with this prefix.
    PREFIX = "group_"

    def __init__(self, name: str) -> None:
        super().__init__(name)

    def modify_next_group(
            self,
            stack: "StageStack",
    ) -> List[str]:
        """Compute the grouping columns requested by this group stage.

        The grouping of the directly preceding stack (if any) is fetched
        first and then extended according to the name of the last stage
        in ``stack``:

        - ``group_ALL`` adds nothing; it is only accepted if no previous
          grouping is active.
        - ``group_BIN`` appends ``"__bin__"``.
        - ``group_<COL>`` appends ``<COL>``.

        Args:
          stack: The stage stack whose last stage name is this group
            stage.

        Returns:
          A new list with the grouping column names.

        Raises:
          YmpStageError: If the last stage name in ``stack`` does not
            start with `PREFIX`, or if ``group_ALL`` follows a non-empty
            previous grouping.
        """
        name = stack.stage_names[-1]
        if not self.match(name):
            raise YmpStageError(f"Internal Error: {name} not a group?")

        # Fetch the grouping of the directly preceding stack. The result
        # is copied into a fresh list: appending with ``+=`` below would
        # otherwise modify the previous stage's list in place, which
        # corrupts that stage's state if it retains the list.
        prev_stack = stack.prev_stack
        if prev_stack is not None:
            group = list(prev_stack.stage.modify_next_group(prev_stack) or [])
        else:
            group = []

        group_name = name[len(self.PREFIX):]
        if group_name == "ALL":
            if group:
                raise YmpStageError("Regrouping to ALL means previous group statement has no effect")
        elif group_name == "BIN":
            group.append("__bin__")
        else:
            group.append(group_name)
        return group

    def get_group(
            self,
            stack: "StageStack",
            default_groups: List[str],
    ) -> List[str]:
        """Return the grouping for this stage itself.

        Always returns an empty list; both arguments are ignored.
        NOTE(review): presumably because a virtual stage has no output
        of its own to group -- confirm against ``BaseStage``.
        """
        return []

    def match(self, name: str) -> bool:
        """Check whether ``name`` starts with `PREFIX`."""
        return name.startswith(self.PREFIX)