import atexit
import glob
import logging
import os
from xdg import XDG_CACHE_HOME
import ymp.yaml
from ymp.common import AttrDict, Cache, MkdirDict, parse_number
from ymp.env import CondaPathExpander
from ymp.exceptions import YmpSystemError
from ymp.stage import Pipeline, Project, Reference
from ymp.snakemake import \
BaseExpander, \
ColonExpander, \
DefaultExpander, \
ExpandableWorkflow, \
InheritanceExpander, \
RecursiveExpander, \
SnakemakeExpander
from ymp.stage import StageExpander
from ymp.string import PartialFormatter
log = logging.getLogger(__name__) # pylint: disable=invalid-name
[docs]class ConfigExpander(ColonExpander):
def __init__(self, config_mgr):
super().__init__()
self.config_mgr = config_mgr
[docs] def expands_field(self, field):
return field not in 'func'
[docs]class OverrideExpander(BaseExpander):
"""
Apply rule attribute overrides from ymp.yml config
Example:
Set the ``wordsize`` parameter in the `bmtagger_bitmask` rule to
12:
.. code-block:: yaml
:caption: ymp.yml
overrides:
rules:
bmtagger_bitmask:
params:
wordsize: 12
"""
def __init__(self, cfgmgr):
if 'overrides' not in cfgmgr._config:
return
self.rule_overrides = cfgmgr._config['overrides'].get('rules', {})
super().__init__()
[docs] def expand(self, rule, ruleinfo, **kwargs):
overrides = self.rule_overrides.get(rule.name, {})
for attr_name, values in overrides.items():
attr = getattr(ruleinfo, attr_name)[1]
for val_name, value in values.items():
log.debug("Overriding {}.{}={} in {} with {}".format(
attr_name, val_name, attr[val_name], rule.name, value))
attr[val_name] = value
[docs]class ConfigMgr(object):
"""Manages workflow configuration
This is a singleton object of which only one instance should be around
at a given time. It is available in the rules files as ``icfg`` and
via `ymp.get_config()` elsewhere.
ConfigMgr loads and maintains the workflow configuration as given
in the ``ymp.yml`` files located in the workflow root directory,
the user config folder (``~/.ymp``) and the installation ``etc``
folder.
"""
KEY_PROJECTS = 'projects'
KEY_REFERENCES = 'references'
KEY_PIPELINES = 'pipelines'
CONF_FNAME = 'ymp.yml'
CONF_DEFAULT_FNAME = ymp._defaults_file
CONF_USER_FNAME = os.path.expanduser("~/.ymp/ymp.yml")
RULE_MAIN_FNAME = ymp._snakefile
__instance = None
[docs] @classmethod
def find_config(cls):
"""Locates ymp config files and ymp root
The root ymp work dir is determined as the first (parent)
directory containing a file named ``ConfigMgr.CONF_FNAME``
(default ``ymp.yml``).
The stack of config files comprises 1. the default config
``ConfigMgr.CONF_DEFAULT_FNAME`` (``etc/defaults.yml`` in the
ymp package directory), 2. the user config
``ConfigMgr.CONF_USER_FNAME`` (``~/.ymp/ymp.yml``) and 3. the
``yml.yml`` in the ymp root.
Returns:
root: Root working directory
conffiles: list of active configuration files
"""
# always include defaults
conffiles = [cls.CONF_DEFAULT_FNAME]
# include user config if present
if os.path.exists(cls.CONF_USER_FNAME):
conffiles.append(cls.CONF_USER_FNAME)
# try to find an ymp.yml in CWD and upwards
filename = cls.CONF_FNAME
log.debug("Locating '%s'", filename)
try:
curpath = os.path.abspath(os.getcwd())
except FileNotFoundError:
raise YmpSystemError("The current work directory has been deleted?!")
while not os.path.exists(os.path.join(curpath, filename)):
log.debug(" not in '%s'", curpath)
curpath, removed = os.path.split(curpath)
if not removed:
break
if os.path.exists(os.path.join(curpath, filename)):
root = curpath
log.debug(" Found '%s' in '%s'", filename, curpath)
conffiles.append(os.path.join(root, cls.CONF_FNAME))
else:
root = os.path.abspath(os.getcwd())
log.debug(" No '%s' found; using %s as root", filename, root)
return root, conffiles
[docs] @classmethod
def instance(cls):
"""Returns the active Ymp ConfigMgr instance
"""
if cls.__instance is None:
cls.__instance = cls(*cls.find_config())
return cls.__instance
[docs] @classmethod
def activate(cls):
ExpandableWorkflow.activate()
[docs] @classmethod
def unload(cls):
log.debug("Unloading ConfigMgr")
ExpandableWorkflow.clear()
if cls.__instance:
cls.__instance.cache.close()
cls.__instance = None
from ymp.stage import Stage, StageStack
StageStack.stacks = {}
Stage.active = None
def __init__(self, root, conffiles):
log.debug("Inizializing ConfigMgr")
self.root = root
self.conffiles = conffiles
if os.path.dirname(conffiles[-1]) == root:
self.cachedir = os.path.join(self.root, ".ymp")
else:
self.cachedir = os.path.join(XDG_CACHE_HOME, "ymp")
self._config = ymp.yaml.load(conffiles)
self.cache = cache = Cache(self.cachedir)
# lazy filled by accessors
self._snakefiles = None
self.projects = cache.get_cache(
"projects",
itemloadfunc=Project,
itemdata=self._config.get(self.KEY_PROJECTS) or {},
dependfiles=conffiles
)
self.references = cache.get_cache(
"references",
itemloadfunc=Reference,
itemdata=self._config.get(self.KEY_REFERENCES) or {},
dependfiles=conffiles
)
self.pipelines = cache.get_cache(
"pipelines",
itemloadfunc=Pipeline,
itemdata=self._config.get(self.KEY_PIPELINES) or {},
dependfiles=conffiles
)
self._workflow = ExpandableWorkflow.register_expanders(
SnakemakeExpander(),
RecursiveExpander(),
CondaPathExpander(self),
StageExpander(),
ConfigExpander(self),
OverrideExpander(self),
DefaultExpander(params=([], {
'mem': self.mem(),
'walltime': self.limits.default_walltime
})),
InheritanceExpander(),
)
@property
def ref(self):
"""
Configure references
"""
return self.references
@property
def pipeline(self):
"""
Configure pipelines
"""
return self.pipelines
@property
def pairnames(self):
return self._config.pairnames
@property
def conda(self):
return self._config.conda
@property
def dir(self):
"""
Dictionary of relative paths of named YMP directories
The directory paths are relative to the YMP root workdir.
"""
return self._config.directories
@property
def absdir(self):
"""
Dictionary of absolute paths of named YMP directories
"""
return AttrDict({
name: os.path.normpath(os.path.join(self.root, os.path.expanduser(value)))
for name, value in self.dir.items()
})
@property
def ensuredir(self):
"""
Dictionary of absolute paths of named YMP directories
Directories will be created on the fly as they are requested.
"""
return MkdirDict(self.absdir)
@property
def cluster(self):
"""
The YMP cluster configuration.
"""
return self._config.cluster
@property
def limits(self):
"""
The YMP limits configuration.
"""
return self._config.limits
@property
def snakefiles(self):
"""
Snakefiles used under this config in parsing order
"""
if not self._snakefiles:
self._snakefiles = [
fn
for dn in (os.path.dirname(self.RULE_MAIN_FNAME),
self.absdir.rules)
for fn in sorted(glob.glob(os.path.join(dn, "**", "*.rules"),
recursive=True),
key=lambda v: v.lower())
if os.path.basename(fn)[0] != "."
]
return self._snakefiles
[docs] def expand(self, item, **kwargs):
expander = ConfigExpander(self)
res = expander.expand(None, item, kwargs)
return res
[docs] def mem(self, base="0", per_thread=None, unit="m"):
"""Clamp memory to configuration limits
Params:
base: base memory requested
per_thread: additional mem required per allocated thread
unit: output unit (b, k, m, g, t)
"""
mem = parse_number(base)
max_mem = parse_number(self.limits.max_mem)
if mem > max_mem:
mem = max_mem
min_mem = parse_number(self.limits.min_mem)
if mem < min_mem:
mem = min_mem
div = parse_number("1"+unit)
return int(mem / div)
@property
def shell(self):
"""The shell used by YMP
Change by adding e.g. ``shell: /path/to/shell`` to ``ymp.yml``.
"""
return self._config.shell
@property
def platform(self):
"""Name of current platform (macos or linux)"""
if not (hasattr(self, '_platform') and self._platform):
import platform
system = platform.system()
if system == "Darwin":
self._platform = "macos"
elif system == "Linux":
self._platform = "linux"
else:
raise YmpSystemError(f"YMP does not support system '{system}'")
return self._platform
atexit.register(ConfigMgr.unload)