"""
Extends Snakemake Features
"""
import functools
import logging
import re
import sys
from copy import copy, deepcopy
from inspect import Parameter, signature, stack
from typing import Optional
from snakemake.exceptions import CreateRuleException, RuleException
from snakemake.io import AnnotatedString, apply_wildcards, \
strip_wildcard_constraints
from snakemake.io import Namedlist as _Namedlist
from snakemake.rules import Rule
from snakemake.workflow import RuleInfo, Workflow
import ymp
from ymp.common import ensure_list, flatten, is_container
from ymp.exceptions import YmpRuleError
from ymp.string import ProductFormatter, make_formatter
# Module-level logger, named after this module.
log = logging.getLogger(__name__)  # pylint: disable=invalid-name
# Formatter created with ``partial=True, quoted=True``; its ``format`` and
# ``get_names`` methods are re-exported below as module-level helpers.
partial_formatter = make_formatter(partial=True, quoted=True)
partial_format = partial_formatter.format
get_names = partial_formatter.get_names
def check_snakemake() -> bool:
    """Check whether the installed Snakemake version is verified for YMP.

    The outcome is cached on the function object (attribute ``result``),
    so the version comparison and the log message happen only once.

    Returns:
      True if ``snakemake.__version__`` is contained in
      ``ymp.snakemake_versions``, False otherwise.
    """
    # The cache lives on the function object itself; looking it up on the
    # string "check_snakemake" would never find it.
    prev_result = getattr(check_snakemake, "result", None)
    if prev_result is not None:
        return prev_result

    import snakemake
    check_snakemake.result = snakemake.__version__ in ymp.snakemake_versions
    if not check_snakemake.result:
        log.fatal("YMP-%s was not verified to work with Snakemake-%s",
                  ymp.__version__, snakemake.__version__)
    return check_snakemake.result
def networkx():
    """Import the ``networkx`` package lazily and return the module.

    If the first character of ``networkx.__version__`` is not ``"2"``,
    a fatal message is logged and the process exits with status 1.
    """
    import networkx as nx_module

    found_version = nx_module.__version__
    if found_version[0] != "2":
        log.fatal(
            "Networkx version 2.* required by YMP but {} found".format(
                found_version
            )
        )
        sys.exit(1)
    return nx_module
def print_ruleinfo(rule: Rule, ruleinfo: RuleInfo, func=log.debug):
    """Logs contents of Rule and RuleInfo objects.

    Arguments:
      rule: Rule object to be printed
      ruleinfo: Matching RuleInfo object to be printed
      func: Function used for printing (default is log.debug)
    """
    func("rule {}".format({'n': rule.name,
                           'l': rule.lineno,
                           's': rule.snakefile}))
    # Dump every non-dunder attribute of the ruleinfo object
    for attr in dir(ruleinfo):
        if attr.startswith("__"):
            continue
        func(" {}: {}".format(attr,
                              getattr(ruleinfo, attr, "")))
    # ``func`` may be unset (None) on a ruleinfo; this is a debug helper and
    # must not raise in that case, so fall back to printing None.
    rule_func = getattr(ruleinfo, "func", None)
    func(getattr(rule_func, "__code__", None))
def load_workflow(snakefile):
    """Load new workflow

    Thin module-level shortcut for
    :meth:`ExpandableWorkflow.load_workflow`; returns whatever that returns.
    """
    workflow = ExpandableWorkflow.load_workflow(snakefile)
    return workflow
def get_workflow():
    """Get active workflow, loading one if necessary

    Thin module-level shortcut for
    :meth:`ExpandableWorkflow.ensure_global_workflow`.
    """
    workflow = ExpandableWorkflow.ensure_global_workflow()
    return workflow
class ExpandLateException(Exception):
    """Signals that a string must be expanded later.

    :meth:`BaseExpander.expand_str` catches this exception (together with
    `KeyError` and `TypeError`) and defers expansion to a wrapper function
    that is called with the wildcards.
    """
class CircularReferenceException(YmpRuleError):
    """Exception raised if parameters in rule contain a circular reference"""

    def __init__(self, deps, rule):
        """
        Arguments:
          deps: dependency graph containing the cycle (passed to
                networkx' ``find_cycle``)
          rule: the rule in which the cycle was found
        """
        # first node of each edge making up the cycle
        cycle = [edge[0] for edge in networkx().find_cycle(deps)]
        # repeat the first node at the end to show the closed loop
        path = " => ".join(cycle + [cycle[0]])
        message = "Circular reference in rule {}\n'{}'".format(rule, path)
        # copy the rule's snakefile into the ``filename`` attribute before
        # handing the rule to the parent constructor
        rule.filename = rule.snakefile
        super().__init__(rule, message)
class InheritanceException(RuleException):
    """Exception raised for errors during rule inheritance"""

    def __init__(self, msg, rule, parent,
                 include=None, lineno=None, snakefile=None):
        """
        Arguments:
          msg: description of the problem
          rule: the deriving rule (its ``name`` is used in the message)
          parent: the parent rule as it should appear in the message
          include, lineno, snakefile: passed through to the parent class
        """
        details = {
            "message": "'{}' when deriving {} from {}".format(
                msg, rule.name, parent),
            "include": include,
            "lineno": lineno,
            "snakefile": snakefile,
            "rule": rule,
        }
        super().__init__(**details)
class NamedList(_Namedlist):
    """Extended version of Snakemake's :class:`~snakemake.io.Namedlist`

    - Fixes array assignment operator:
      Writing a field via ``[]`` operator updates the value accessed
      via ``.`` operator.
    - Adds ``fromtuple`` to constructor:
      Builds from Snakemake's typical ``(args, kwargs)`` tuples as
      present in ruleinfo structures.
    - Adds `update_tuple` method:
      Updates values in ``(args,kwargs)`` tuples as present in
      :class:`ruleinfo` structures.
    """

    def get_names(self, *args, **kwargs):
        """Export ``get_names`` as public func

        All positional and keyword arguments are forwarded unchanged to
        the parent's ``_get_names``.
        """
        # keyword arguments must be forwarded with ``**``; a single ``*``
        # would pass the keyword *names* as positional arguments
        return self._get_names(*args, **kwargs)

    def __init__(self, fromtuple=None, **kwargs):
        """
        Arguments:
          fromtuple: optional ``(args, kwargs)`` tuple; ``args`` are
            appended as unnamed items, each entry of ``kwargs`` is appended
            and registered under its key (containers become a named range).
          **kwargs: passed on to the parent constructor
        """
        super().__init__(**kwargs)
        self._fromtuple = fromtuple
        if fromtuple:
            for value in fromtuple[0]:
                self.append(value)
            for key, value in fromtuple[1].items():
                if is_container(value):
                    # name refers to the range [start, len(self))
                    start = len(self)
                    for subvalue in value:
                        self.append(subvalue)
                    self._set_name(key, start, len(self))
                else:
                    self.append(value)
                    self._add_name(key)

    def __setitem__(self, idx, value):
        # set value in list
        super().__setitem__(idx, value)
        # set value in attributes (copied references)
        for name, (i, j) in self.get_names():
            if idx >= i and (j is None or idx < j):
                self._set_name(name, i, j)

    def update_tuple(self, totuple):
        """Update values in ``(args, kwargs)`` tuple.

        The tuple must be the same as used in the constructor and
        must not have been modified.
        """
        args, kwargs = totuple
        # unnamed values occupy the first len(args) slots
        for n, value in enumerate(args):
            if args[n] != self[n]:
                args[n] = self[n]
        for key, value in kwargs.items():
            start, end = self._names[key]
            if end:
                # named range: write back element by element
                assert is_container(value)
                for k, j in enumerate(range(start, end)):
                    if kwargs[key][k] != self[j]:
                        kwargs[key][k] = self[j]
            else:
                # single named value
                if kwargs[key] != self[start]:
                    kwargs[key] = self[start]
#: describes attributes of :py:class:`snakemake.workflow.RuleInfo`
#:
#: Maps attribute name to a dict of properties. Keys used in this table:
#: ``format`` (kind of value stored in the attribute), ``funcparams``
#: (parameter names; used by :class:`RecursiveExpander` to decide which
#: wildcards require wrapping a value in a function) and the flags
#: ``apply_wildcards`` / ``format_wildcards``.
ruleinfo_fields = {
    'wildcard_constraints': {
        'format': 'argstuple',  # len(t[0]) must be == 0
    },
    'input': {
        'format': 'argstuple',
        'funcparams': ('wildcards',),
        'apply_wildcards': True,
    },
    'output': {
        'format': 'argstuple',
        'apply_wildcards': True,
    },
    'threads': {
        'format': 'int',
        'funcparams': ('wildcards', 'input', 'attempt', 'threads')
        # stored as resources._cores
    },
    'resources': {
        'format': 'argstuple',  # len(t[0]) must be == 0, t[1] must be ints
        'funcparams': ('wildcards', 'input', 'attempt', 'threads'),
    },
    'params': {
        'format': 'argstuple',
        'funcparams': ('wildcards', 'input', 'resources', 'output', 'threads'),
        'apply_wildcards': True,
    },
    'shadow_depth': {
        'format': 'string_or_true',
    },
    'priority': {
        'format': 'numeric',
    },
    'version': {
        'format': 'object',
    },
    'log': {
        'format': 'argstuple',
        'apply_wildcards': True,
    },
    'message': {
        'format': 'string',
        'format_wildcards': True,
    },
    'benchmark': {
        'format': 'string',
        'apply_wildcards': True,
    },
    'wrapper': {
        'format': 'string',
        # sets conda_env
    },
    'conda_env': {
        'format': 'string',  # path, relative to cwd or abs
        'apply_wildcards': True,
        # works only with shell/script/wrapper, not run
    },
    'container_img': {
        'format': 'string',
        # works only with shell/script/wrapper, not run
    },
    'shellcmd': {
        'format': 'string',
        'format_wildcards': True
    },
    'docstring': {
        'format': 'string',
    },
    'norun': {  # does the rule have executable data?
        'format': 'bool',
    },
    'func': {
        'format': 'callable',
    },
    'script': {
        'format': 'string',
    }
    # NOTE(review): the names below look like RuleInfo attributes that have
    # no entry in this table yet (``shadow_depth`` is listed although it
    # has an entry above) -- confirm.
    # restart_times
    # env_modules
    # shadow_depth
    # group
    # notebook
    # cwl
    # cache
}
class ExpandableWorkflow(Workflow):
    """Adds hook for additional rule expansion methods to Snakemake"""

    #: The one active workflow object (or None)
    global_workflow = None
    # Registered expander objects (name-mangled, shared via the class)
    __expanders = []

    @classmethod
    def activate(cls):
        """Installs the ExpandableWorkflow

        Replaces the Workflow object in the snakemake.workflow module
        with an instance of this class and initializes default expanders
        (the snakemake syntax).
        """
        check_snakemake()

        try:
            from snakemake.workflow import workflow
        except ImportError:
            workflow = None

        if workflow and workflow.__class__ != ExpandableWorkflow:
            # Monkey patch the existing, initialized workflow class
            workflow.__class__ = ExpandableWorkflow
            # ExpandableWorkflow.__init__ understands it may be
            # called on already initialized superclass, so this works:
            workflow.__init__()
            cls.global_workflow = workflow

        # Remove log stream handler installed by Snakemake
        from snakemake.logging import logger, ColorizingStreamHandler
        # Iterate over a copy: removing from the list being iterated
        # would skip the handler following each removed one.
        for handler in list(logger.logger.handlers):
            if isinstance(handler, ColorizingStreamHandler):
                logger.logger.removeHandler(handler)

    @classmethod
    def load_workflow(cls, snakefile=ymp._snakefile):
        """Create a new workflow from *snakefile* and make it global

        Arguments:
          snakefile: file to load (defaults to ``ymp._snakefile``)
        Returns:
          The newly created workflow object
        """
        workflow = cls(snakefile=snakefile)
        cls.global_workflow = workflow
        workflow.include(snakefile)
        return workflow

    @classmethod
    def ensure_global_workflow(cls):
        """Return the global workflow, creating it if necessary

        First tries :meth:`activate`; if that does not yield a workflow,
        :meth:`load_workflow` is called with its default snakefile.
        """
        if cls.global_workflow is None:
            log.debug("Trying to activate %s...", cls.__name__)
            cls.activate()
            if cls.global_workflow is None:
                log.debug("Failed; loading default workflow")
                cls.load_workflow()
        return cls.global_workflow

    def __init__(self, *args, **kwargs):
        """Constructor for ExpandableWorkflow overlay attributes

        This may be called on an already initialized Workflow object.
        """
        # Only call super().__init__ if that hasn't happened yet
        # (as indicated by no instance attributes written to __dict__)
        if not self.__dict__:
            # only call constructor if this object hasn't been initialized yet
            super().__init__(*args, **kwargs)

        # There can only be one
        ExpandableWorkflow.global_workflow = self

        for expander in self.__expanders:
            expander.link_workflow(self)

        self._ruleinfos = {}
        self._last_rule_name = None

    @classmethod
    def register_expanders(cls, *expanders):
        """
        Register an object the expand() function of which will be called
        on each RuleInfo object before it is passed on to snakemake.
        """
        cls.__expanders = expanders
        if cls.global_workflow:
            for expander in cls.__expanders:
                expander.link_workflow(cls.global_workflow)

    @classmethod
    def clear(cls):
        """Forget the global workflow, here and in ``snakemake.workflow``"""
        if cls.global_workflow:
            cls.global_workflow = None
        # make sure there is no workflow in snakemake either
        # (we try to load that in activate())
        import snakemake.workflow
        snakemake.workflow.workflow = None

    def add_rule(self, name=None, lineno=None, snakefile=None,
                 checkpoint=False):
        """Add a rule.

        Arguments:
          name: name of the rule
          lineno: line number within the snakefile where the rule was defined
          snakefile: name of file in which rule was defined
          checkpoint: passed on to the parent class's ``add_rule``
        Returns:
          The name of the rule as returned by the parent class
        """
        # super().add_rule() dynamically creates a name if `name` is None
        # stash the name so we can access it from `get_rule`
        # `checkpoint` must be forwarded, otherwise it is silently lost.
        self._last_rule_name = super().add_rule(name, lineno, snakefile,
                                                checkpoint=checkpoint)
        return self._last_rule_name

    def get_rule(self, name=None):
        """
        Get rule by name. If name is none, the last created rule is returned.

        Arguments:
          name: the name of the rule
        """
        if name is None:
            name = self._last_rule_name
        return super().get_rule(name)

    def rule(self, name=None, lineno=None, snakefile=None, checkpoint=None):
        """Intercepts "rule:"

        Here we have the entire ruleinfo object

        Returns:
          A decorator taking the ruleinfo object. It runs all registered
          expanders (in reverse registration order) on the ruleinfo and
          then hands it to the decorator obtained from the parent class.
        """
        decorator = super().rule(name, lineno, snakefile, checkpoint)
        rule = self.get_rule(name)

        def decorate(ruleinfo):
            if ymp.print_rule == 1:
                log.error("#### BEGIN expansion")
                print_ruleinfo(rule, ruleinfo, log.error)
                rule._ymp_print_rule = True

            for expander in reversed(self.__expanders):
                expander.expand(rule, ruleinfo)
                if ymp.print_rule == 1:
                    log.error("### expanded with " + type(expander).__name__)
                    print_ruleinfo(rule, ruleinfo, log.error)
            if ymp.print_rule:
                log.error("#### END expansion")

            # Conditionaly dump rule after YMP formatting
            if ymp.print_rule == 1:
                ymp.print_rule = 0

            # register rule with snakemake
            try:
                decorator(ruleinfo)  # does not return anything
            except AttributeError:
                # dump the offending ruleinfo before re-raising
                print_ruleinfo(rule, ruleinfo, log.error)
                raise

        return decorate
def make_rule(name: str=None, lineno: int=None, snakefile: str=None,
              **kwargs):
    """Synthesize a rule without executable part and register it.

    Arguments:
      name: name of the rule
      lineno: line number to report for the rule
      snakefile: snakefile to report for the rule
      **kwargs: each keyword is set as attribute on the new RuleInfo
    Returns:
      The result of the workflow's rule decorator, or None if creating the
      rule raised `CreateRuleException`.
    """
    log.debug("Synthesizing rule {}".format(name))

    ruleinfo = RuleInfo(lambda: None)
    for attr_name, attr_value in kwargs.items():
        setattr(ruleinfo, attr_name, attr_value)
    # always flagged as not runnable, whatever was passed in kwargs
    ruleinfo.norun = True

    active_workflow = get_workflow()
    try:
        decorate = active_workflow.rule(name, lineno, snakefile)
        return decorate(ruleinfo)
    except CreateRuleException:
        log.debug(" failed. Rule already exists?")
        return None
class BaseExpander(object):
    """
    Base class for Snakemake expansion modules.

    Subclasses should override the :meth:expand method if they need to
    work on the entire RuleInfo object or the :meth:format and
    :meth:expands_field methods if they intend to modify specific fields.
    """
    def __init__(self):
        self.workflow = None

    def link_workflow(self, workflow):
        """Called when the Expander is associated with a workflow

        May be called multiple times if a new workflow object is created.
        """
        log.debug("Linking %s with %s",
                  self.__class__.__name__, workflow.__class__.__name__)
        self.workflow = workflow

    def expands_field(self, field):
        """Checks if this expander should expand a Rule field type

        Arguments:
           field: the field to check
        Returns:
           True if *field* should be expanded.
        """
        return False

    def expand(self, rule, item, expand_args=None, rec=-1, cb=False):
        """Expands RuleInfo object and children recursively.

        Will call :meth:format (via :meth:format_annotated) on `str` items
        encountered in the tree and wrap encountered functions to be called
        once the wildcards object is available.

        Set ``ymp.print_rule = 1`` before a ``rule:`` statement in snakefiles
        to enable debug logging of recursion.

        Arguments:
          rule: The :class:snakemake.rules.Rule object to be populated with
            the data from the RuleInfo object passed from *item*
          item: The item to be expanded. Initially a
            :class:snakemake.workflow.RuleInfo object into which is
            recursively descended. May ultimately be `None`, `str`,
            `function`, `int`, `float`, `dict`, `list` or `tuple`.
          expand_args: Parameters passed on late expansion (when the ``dag``
            tries to instantiate the `rule` into a ``job``.
          rec: Recursion level
          cb: True if called from a deferred (late) expansion; passed on to
            :meth:`expand_str`, which then re-raises instead of deferring.
        Returns:
          The expanded item
        """
        rec = rec + 1
        if expand_args is None:
            expand_args = {}
        debug = ymp.print_rule or getattr(rule, "_ymp_print_rule", False)
        if debug:
            log.debug("{}{}: ({}) {} in rule {} with args {}"
                      "".format(" "*rec*4, type(self).__name__,
                                type(item).__name__, item, rule, expand_args))
        if item is None:
            # nothing to expand; keeps None out of the fallback branch
            pass
        elif isinstance(item, RuleInfo):
            item = self.expand_ruleinfo(rule, item, expand_args, rec)
        elif isinstance(item, str):
            item = self.expand_str(rule, item, expand_args, rec, cb)
        elif hasattr(item, '__call__'):
            item = self.expand_func(rule, item, expand_args, rec, debug)
        elif isinstance(item, (int, float)):
            pass
        elif isinstance(item, dict):
            item = self.expand_dict(rule, item, expand_args, rec)
        elif isinstance(item, list):
            item = self.expand_list(rule, item, expand_args, rec, cb)
        elif isinstance(item, tuple):
            item = self.expand_tuple(rule, item, expand_args, rec, cb)
        else:
            log.debug("Not expanding item '{}' of type {}".format(
                repr(item), type(item)))

        if debug:
            log.debug("{}=> {} {}"
                      "".format(" "*(rec*4), type(item).__name__, item))
        return item

    def expand_ruleinfo(self, rule, item, expand_args, rec):
        """Expand every field of RuleInfo *item* accepted by
        :meth:`expands_field`, writing the results back into *item*."""
        self.current_rule = rule
        for field in filter(self.expands_field, ruleinfo_fields):
            expand_args['field'] = field
            attr = getattr(item, field)
            value = self.expand(rule, attr, expand_args=expand_args, rec=rec)
            setattr(item, field, value)
        self.current_rule = None
        return item

    def expand_str(self, rule, item, expand_args, rec, cb):
        """Format string *item*; defer to a function if that fails.

        NOTE(review): ``format_annotated`` is not defined in the visible
        part of this class -- presumably provided elsewhere; confirm.
        """
        expand_args['rule'] = rule
        try:
            return self.format_annotated(item, expand_args)
        except (KeyError, TypeError, ExpandLateException):
            # avoid recursion:
            if cb:
                raise

            # formatting failed now; retry once wildcards are available
            def item_wrapped(wc):
                return self.expand(rule, item,
                                   expand_args={'wc': wc, 'rule': rule},
                                   cb=True)
            return item_wrapped

    def expand_func(self, rule, item, expand_args, rec, debug):
        """Wrap function *item* so that its result is expanded when the
        function is eventually called (first positional argument is used
        as ``wc``)."""
        @functools.wraps(item)
        def late_expand(*args, **kwargs):
            if debug:
                log.debug("{}{} late {} {} ".format(
                    " "*rec*4, type(self).__name__, args, kwargs))
            res = self.expand(rule, item(*args, **kwargs),
                              expand_args={'wc': args[0]}, rec=rec, cb=True)
            if debug:
                log.debug("{}=> '{}'".format(" "*rec*4, res))
            return res
        return late_expand

    def _make_list_wrapper(self, value):
        """Turn list *value* containing callables into a single function.

        The returned function calls each callable in *value* with the
        positional arguments it received plus those keyword arguments the
        callable's signature names, and returns the list of results
        (non-callable items are passed through unchanged). Its advertised
        signature is the union of the callables' parameters.
        """
        def wrapper(*args, **kwargs):
            res = []
            for subitem in value:
                if callable(subitem):
                    subparms = signature(subitem).parameters
                    extra_args = {
                        k: v
                        for k, v in kwargs.items()
                        if k in subparms
                    }
                    res.append(subitem(*args, **extra_args))
                else:
                    res.append(subitem)
            return res

        # Gather the parameters of all callables. Keep the first
        # occurrence of each name, in order of appearance: collecting them
        # in a set gives an arbitrary order and fails with duplicate names
        # if same-named parameters differ in default or annotation.
        parms = {}
        for subitem in value:
            if callable(subitem):
                for parm in signature(subitem).parameters.values():
                    parms.setdefault(parm.name, parm)

        # Rewrite signature
        wrapper.__signature__ = signature(wrapper).replace(
            parameters=tuple(parms.values()))
        return wrapper

    def expand_dict(self, rule, item, expand_args, rec):
        """Expand all values of dict *item* in place and return it."""
        path = expand_args.get('path', list())
        for key, value in item.items():
            expand_args['path'] = path + [key]
            value = self.expand(rule, value, expand_args=expand_args, rec=rec)

            # Snakemake can't have functions in lists in dictionaries.
            # Let's fix that, even if we have to jump a lot of hoops here.
            if isinstance(value, list) and any(callable(x) for x in value):
                item[key] = self._make_list_wrapper(value)
            else:
                item[key] = value
        return item

    def expand_list(self, rule, item, expand_args, rec, cb):
        """Expand each element of *item*; returns a new list."""
        path = expand_args.get('path', list())
        res = list()
        for n, subitem in enumerate(item):
            expand_args['path'] = path + [str(n)]
            res.append(self.expand(rule, subitem, expand_args=expand_args,
                                   rec=rec, cb=cb))
        return res

    def expand_tuple(self, rule, item, expand_args, rec, cb):
        """Expand each element of *item*; returns a new tuple."""
        return tuple(self.expand_list(rule, item, expand_args, rec, cb))
class SnakemakeExpander(BaseExpander):
    """Expand wildcards in strings returned from functions.

    Snakemake does not do this by default, leaving wildcard expansion to
    the functions provided themselves. Since we never want ``{input}`` to be
    in a string returned as a file, we expand those always.
    """

    def expands_field(self, field):
        """Return True only for the ``input`` and ``output`` fields."""
        return field == 'input' or field == 'output'
class ColonExpander(FormatExpander):
    """
    Expander using ``{:xyz:}`` formatted variables.

    NOTE(review): the base class ``FormatExpander`` is not defined in the
    visible part of this file -- how ``regex`` and ``spec`` are consumed
    cannot be confirmed from here.
    """
    # Verbose-mode pattern matching ``{:`` ... ``:}``. The lookahead
    # captures group 1, which is then consumed via the ``\1``
    # backreference. The named group ``name`` holds the text between the
    # delimiters without surrounding whitespace.
    regex = re.compile(
        r"""
\{:
(?=(
\s*
(?P<name>(?:.(?!\s*\:\}))*.)
\s*
))\1
:\}
""", re.VERBOSE)

    # ``str.format`` template rendering a name as ``{:name:}``
    spec = "{{:{}:}}"

    def __init__(self):
        super().__init__()
class RecursiveExpander(BaseExpander):
    """Recursively expands ``{xyz}`` wildcards in Snakemake rules."""

    def expands_field(self, field):
        """
        Returns true for all fields but ``shell:``, ``message:`` and
        ``wildcard_constraints``.

        We don't want to mess with the regular expressions in the fields
        in ``wildcard_constraints:``, and there is little use in expanding
        ``message:`` or ``shell:`` as these already have all wildcards applied
        just before job execution (by :meth:`format_wildcards`).
        """
        return field not in (
            'shellcmd',
            'message',
            'wildcard_constraints'
        )

    def expand(self, rule, ruleinfo):
        """Recursively expand wildcards within :class:`RuleInfo` object

        Values of the expandable fields may reference other field values
        (e.g. ``{input.xyz}``). The references are resolved in dependency
        order; wildcards that name one of the field's ``funcparams`` cause
        the value to be replaced by a function formatting it later.

        Arguments:
          rule: the rule being built (used for messages)
          ruleinfo: RuleInfo object, modified in place
        Raises:
          CircularReferenceException: if the values reference each other
            in a cycle
        """
        # Field names are never None, so no None-filter is needed here.
        # (Filtering with ``None.__ne__`` relied on the truthiness of
        # ``NotImplemented``, which is deprecated.)
        fields = [field for field in ruleinfo_fields
                  if self.expands_field(field)]
        # normalize field values and create namedlist dictionary
        args = {}
        for field in fields:
            attr = getattr(ruleinfo, field)
            if isinstance(attr, tuple):
                if len(attr) != 2:
                    raise Exception("Internal Error")
                # flatten named lists
                for key in attr[1]:
                    if is_container(attr[1][key]):
                        attr[1][key] = list(flatten(attr[1][key]))
                # flatten unnamed and overwrite tuples
                # also turn attr[0] into a list, making it mutable
                attr = (list(flatten(attr[0])), attr[1])

                setattr(ruleinfo, field, attr)
                args[field] = NamedList(fromtuple=attr)
            else:
                args[field] = NamedList()
                args[field].append(attr)

        # build graph of expansion dependencies
        deps = networkx().DiGraph()
        for field, nlist in args.items():
            for n, value in enumerate(nlist):
                if not isinstance(value, str):  # only strings can be expanded
                    continue
                s = "{}[{}]".format(field, n)
                # create node for value itself
                deps.add_node(s, core=True, name=field, idx=n)
                # node depends on wildcards contained in value
                deps.add_edges_from((s, t)
                                    for t in get_names(value)
                                    if t.split(".")[0].split("[")[0] in fields)
                # field node depends on all it's value nodes
                deps.add_edge(field, s)
            # create edges field.name -> field[n]
            for name, (i, j) in nlist.get_names():
                s = "{}.{}".format(field, name)
                if j is None:
                    j = i + 1
                deps.add_edges_from((s, "{}[{}]".format(field, n))
                                    for n in range(i, j))

        # sort variables so that they can be expanded in order
        try:
            nodes = list(reversed([
                node
                for node in networkx().algorithms.dag.topological_sort(deps)
                if deps.out_degree(node) > 0 and 'core' in deps.nodes[node]
            ]))
        except networkx().NetworkXUnfeasible:
            raise CircularReferenceException(deps, rule) from None

        # expand variables
        for node in nodes:
            var_name = deps.nodes[node]['name']
            var_idx = deps.nodes[node]['idx']
            value = args[var_name][var_idx]
            if not isinstance(value, str):
                continue

            # format what we can
            valnew = partial_format(value, **args)

            # check if any remaining wilcards refer to rule fields
            names = [re.split(r'\.|\[', name, maxsplit=1)[0]
                     for name in get_names(valnew)]
            field_names = ruleinfo_fields[var_name].get('funcparams', [])
            parm_names = [name for name in field_names if name in names]

            if parm_names:
                # Snakemake won't expand wildcards in output of functions,
                # so we need to format everything here
                def late_recursion(val, fparms):
                    def wrapper(wildcards, **kwargs):
                        # no partial here, fail if anything left
                        return strip_wildcard_constraints(val).format(
                            **kwargs, **wildcards)

                    # adjust the signature so that snakemake will pass us
                    # everything we need
                    parms = (Parameter(pname, Parameter.POSITIONAL_OR_KEYWORD)
                             for pname in fparms)
                    newsig = signature(wrapper).replace(parameters=parms)
                    wrapper.__signature__ = newsig
                    return wrapper
                valnew = late_recursion(valnew, parm_names)

            args[var_name][var_idx] = valnew

            if ymp.print_rule == 1:
                log.debug("{}::{}: {} => {}".format(rule.name,
                                                    node, value, valnew))

        # update ruleinfo
        for name in fields:
            attr = getattr(ruleinfo, name)
            if isinstance(attr, tuple):
                if len(attr) != 2:
                    raise Exception("Internal Error")
                args[name].update_tuple(attr)
            else:
                setattr(ruleinfo, name, args[name][0])
class InheritanceExpander(BaseExpander):
    """Adds class-like inheritance to Snakemake rules

    To avoid redundancy between closely related rules, e.g. rules for
    single ended and paired end data, YMP allows Snakemake rules
    to inherit from another rule.

    Example:
      .. code-block:: snakemake

        rule count_reads:
          input: "{file}.R1.fq.gz", "{file}.R2.fq.gz"
          output: "{file}.readcount.txt"
          shell: "gunzip -c {input} | wc -l > {output}"

        rule count_reads_SE:  # ymp: extends count_reads
          input: "{file}.fq.gz"

    Derived rules are always created with an implicit ``ruleorder`` statement,
    making Snakemake prefer the parent rule if either parent or child rule
    could be used to generate the requested output file(s).

    Derived rules initially contain the same attributes as the parent rule.
    Each attribute assigned to the child rule overrides the matching attribute
    in the parent. Where attributes may contain named and unnamed values,
    specifying a named value overrides only the value of that name while
    specifying an unnamed value overrides all unnamed values in the parent
    attribute.
    """
    # FIXME: link to http://snakemake.readthedocs.io/en/latest/snakefiles/
    #                rules.html#handling-ambiguous-rules

    #: Comment keyword enabling inheritance
    KEYWORD = "ymp: extends"

    def __init__(self):
        super().__init__()
        # original RuleInfo objects by rule name
        self.ruleinfos = {}
        # lines of already read snakefiles by file name
        self.snakefiles = {}
        # fetched from the global workflow on first use
        self.linemaps = None

    def get_code_line(self, rule: Rule) -> str:
        """Returns the source line defining *rule*"""
        source_file = rule.snakefile
        # Read the snakefile once and keep its lines
        if source_file not in self.snakefiles:
            try:
                with open(source_file, "r") as sf:
                    self.snakefiles[source_file] = sf.readlines()
            except IOError:
                raise Exception("Can't parse ...")

        # `rule.lineno` refers to compiled code. Convert to source line number.
        if self.linemaps is None:
            self.linemaps = ExpandableWorkflow.global_workflow.linemaps

        source_lineno = self.linemaps[rule.snakefile][rule.lineno]
        lines = self.snakefiles[rule.snakefile]
        return lines[source_lineno - 1]

    def get_super(self, rule: Rule, ruleinfo: RuleInfo) -> Optional[RuleInfo]:
        """Find rule parent

        The parent is taken from ``ruleinfo.parent`` if that attribute
        exists, otherwise from a ``# ymp: extends <name>`` comment on the
        source line defining the rule.

        Args:
          rule: Rule object being built
          ruleinfo: RuleInfo object describing rule being built

        Returns:
          2-Tuple: name of parent rule and RuleInfo describing parent rule
          or (None, None).

        Raises:
          InheritanceException: if the named parent rule is not known
        """
        self.ruleinfos[rule.name] = ruleinfo  # stash original ruleinfos

        if hasattr(ruleinfo, 'parent'):
            parent_name = ruleinfo.parent.name
            return parent_name, self.ruleinfos[parent_name]

        line = self.get_code_line(rule)
        if "#" not in line:
            return None, None

        # text following the first "#" (up to the next "#", if any)
        comment = line.split("#")[1].strip()
        if not comment.startswith(self.KEYWORD):
            return None, None

        superrule_name = comment[len(self.KEYWORD):].strip()
        try:
            return superrule_name, self.ruleinfos[superrule_name]
        except KeyError:
            raise InheritanceException("Unable to find parent",
                                       rule, superrule_name)

    def expand(self, rule, ruleinfo):
        """Merge the parent's attributes into *ruleinfo*

        Does nothing if :meth:`get_super` finds no parent. Otherwise
        *ruleinfo* is updated in place and, if the parent is runnable and
        known by name, a ruleorder preferring the parent is registered.
        """
        super_name, super_ruleinfo = self.get_super(rule, ruleinfo)
        if super_ruleinfo is None:
            return

        # work on a copy, the parent may be inherited from again
        base_ruleinfo = deepcopy(super_ruleinfo)

        exec_fields = ("shellcmd", "wrapper", "script", "func")
        if not ruleinfo.norun:
            # deriving rule is runnable, clear out base
            for exec_field in exec_fields:
                setattr(base_ruleinfo, exec_field, None)
        elif not base_ruleinfo.norun:
            # base is runnable, clear our deriving
            ruleinfo.norun = False
            for exec_field in exec_fields:
                setattr(ruleinfo, exec_field, None)

        for field in dir(ruleinfo):
            if field.startswith("__") or field == "parent":
                continue
            merged = getattr(base_ruleinfo, field)
            override = getattr(ruleinfo, field)
            if isinstance(override, tuple):
                if merged is None:
                    merged = ([], {})
                # unnamed values replace all unnamed values of the parent
                if override[0]:
                    merged = (override[0], merged[1])
                # named values replace only values of the same name
                if override[1]:
                    merged[1].update(override[1])
            elif override is not None:
                merged = override
            setattr(ruleinfo, field, merged)

        if not super_ruleinfo.norun and super_name in self.ruleinfos:
            self.workflow.ruleorder(super_name, rule.name)
class DefaultExpander(InheritanceExpander):
    """
    Adds default values to rules

    The implementation simply makes all rules inherit from a defaults
    rule.
    """

    def __init__(self, **kwargs):
        """
        Creates DefaultExpander

        Each parameter passed is considered a RuleInfo default value. Where
        applicable, Snakemake's argtuples ``([],{})`` must be passed.
        """
        super().__init__()
        defaults = RuleInfo(None)
        # not runnable unless a ``norun`` keyword says otherwise
        defaults.norun = True
        for attr_name, attr_value in kwargs.items():
            setattr(defaults, attr_name, attr_value)
        self.defaults = defaults

    def get_super(self, rule: Rule, ruleinfo: RuleInfo) -> RuleInfo:
        """Return the defaults as parent of every rule

        Returns:
          2-Tuple of the name ``"__default__"`` and the defaults RuleInfo
        """
        return "__default__", self.defaults
class WorkflowObject(object):
    """
    Base for extension classes defined from snakefiles

    This currently encompasses `ymp.env.Env` and `ymp.stage.Stage`.

    This mixin sets the properties ``filename`` and ``lineno`` according
    to the definition source in the rules file. It also maintains a registry
    within the Snakemake workflow object and provides an accessor method
    to this registry.
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Fill filename and lineno
        # We assume the creating call is the first up the stack
        # that is not a constructor call (i.e. not __init__)
        try:
            # default of None: a bare next() would raise StopIteration,
            # which the IndexError handler below does not catch
            caller = next(
                (fi for fi in stack() if fi.function != "__init__"),
                None
            )
        except IndexError:
            caller = None

        if caller is None:
            log.error("Failed to find source code defining %s", self)
            return

        # values already present (e.g. set by a subclass) are kept
        if not hasattr(self, 'filename'):
            #: str: Name of file in which object was defined
            self.filename = caller.filename
        if not hasattr(self, 'lineno'):
            #: int: Line number of object definition
            self.lineno = caller.lineno

    def register(self):
        """Add self to registry

        The object is registered under all names found in its ``name``
        and ``altname`` attributes.

        Raises:
          YmpRuleError: if one of the names is already registered for a
            different object defined at a different file or line
        """
        cache = self.get_registry()

        names = []
        for attr in 'name', 'altname':
            if hasattr(self, attr):
                names += ensure_list(getattr(self, attr))

        # check all names first so that nothing is registered on failure
        for name in names:
            if (name in cache
                    and self != cache[name]
                    and (self.filename != cache[name].filename
                         or self.lineno != cache[name].lineno)):
                other = cache[name]
                raise YmpRuleError(
                    self,
                    f"Failed to create {self.__class__.__name__} '{names[0]}':"
                    f" already defined in {other.filename}:{other.lineno}"
                )

        for name in names:
            cache[name] = self

    @property
    def defined_in(self):
        """Name of the file in which the object was defined"""
        return self.filename

    @classmethod
    def new_registry(cls):
        """Return the registry, requesting a clean one"""
        return cls.get_registry(clean=True)

    @classmethod
    def get_registry(cls, clean=False):
        """
        Return all objects of this class registered with current workflow

        Arguments:
          clean: passed on to the cache's ``get_cache``
        """
        import ymp
        cfg = ymp.get_config()
        return cfg.cache.get_cache(
            cls.__name__,
            loadfunc=ExpandableWorkflow.ensure_global_workflow,
            clean=clean)