"""Implements subcommands for ``ymp make`` and ``ymp submit``"""
import functools
import logging
import os
import shutil
import sys
import click
import ymp
from ymp.cli.shared_options import command, nohup_option, Log
from ymp.exceptions import YmpException, YmpStageError
from ymp.stage import StageStack
log = logging.getLogger(__name__)  # pylint: disable=invalid-name

# Debug tracing is opt-in: YMP_DEBUG_EXPAND holds either the literal
# "stderr" or the name of a file that trace lines are appended to.
DEBUG_LOGFILE_NAME = os.environ.get("YMP_DEBUG_EXPAND")
if DEBUG_LOGFILE_NAME:
    import time
    # Reference point for the elapsed-seconds prefix of each trace line.
    start_time = time.time()
    DEBUG_LOGFILE = (
        sys.stderr if DEBUG_LOGFILE_NAME == "stderr"
        else open(DEBUG_LOGFILE_NAME, "a")
    )
def debug(msg, *args, **kwargs):
    """Write one time-stamped line to the debug log, if enabled

    Does nothing unless ``DEBUG_LOGFILE_NAME`` is set.

    Args:
      msg: ``str.format`` template for the message
      *args: positional values substituted into ``msg``
      **kwargs: named values substituted into ``msg``
    """
    if not DEBUG_LOGFILE_NAME:
        return
    elapsed = time.time() - start_time
    # The elapsed-seconds field is prepended to the template, so it takes
    # the first positional slot ahead of the caller's arguments.
    template = "{:4.4f}: " + msg
    DEBUG_LOGFILE.write(template.format(elapsed, *args, **kwargs) + '\n')


debug("started")
class TargetParam(click.ParamType):
    """Handles tab expansion for build targets"""

    @classmethod
    def complete(cls, ctx, incomplete):
        """Try to complete incomplete command

        This is executed on tab or tab-tab from the shell

        Args:
          ctx: click context object
          incomplete: last word in command line up until cursor

        Returns:
          list of words incomplete can be completed to
        """
        # Turn off logging!
        log = ctx.ensure_object(Log)
        log.mod_level(10)

        result: list = []

        # Everything before the last "." is the stage stack typed so far;
        # only the part after it is being completed.
        stack, _, tocomplete = incomplete.rpartition(".")
        debug("complete(stack={},incomplete={})", stack, tocomplete)

        if not stack:
            # No "." typed yet: offer matching project names, each both
            # as-is and with a trailing "." so typing can continue.
            cfg = ymp.get_config()
            matches = [name for name in cfg.projects.keys()
                       if name.startswith(tocomplete)]
            result += matches
            result += (name + "." for name in matches)
        else:
            try:
                stackobj = StageStack.get(stack)
            except YmpStageError as e:
                # debug() treats its first argument as a str.format
                # template, so literal braces must be escaped.
                debug(e.format_message().replace("{", "{{").replace("}", "}}"))
                return []
            debug("stacko = {}", repr(stack))
            # Materialize once: the options are iterated several times
            # below, which would exhaust a one-shot iterable.
            options = list(stackobj.complete(tocomplete))
            debug("options = {}", options)

            # reduce items sharing prefix before "_"
            prefixes = {}
            for option in options:
                prefixes.setdefault(option.split("_", 1)[0], []).append(option)
            if len(prefixes) == 1:
                # Only one prefix in play: show every option in full.
                extensions = options
            else:
                extensions = [
                    prefix + "_" if len(group) > 1 else group[0]
                    for prefix, group in prefixes.items()
                ]

            result += ('.'.join((stack, ext)) for ext in extensions)
            # Collapsed "prefix_" entries are incomplete names, so they get
            # no "."-terminated variant. endswith() also copes with an
            # empty extension, where indexing [-1] would raise IndexError.
            result += ('.'.join((stack, ext)) + "." for ext in extensions
                       if not ext.endswith("_"))

        debug("res={}", result)
        return result
def snake_params(func):
    """Default parameters for subcommands launching Snakemake"""
    # Written in the order they would appear as a decorator stack (top
    # first); they are applied bottom-up in the loop below.
    decorators = (
        click.argument(
            "targets", nargs=-1, metavar="TARGET_FILES",
            type=TargetParam()
        ),
        click.option(
            "--dryrun", "-n", default=False, is_flag=True,
            help="Only show what would be done"
        ),
        click.option(
            "--printshellcmds", "-p", default=False, is_flag=True,
            help="Print shell commands to be executed on shell"
        ),
        click.option(
            "--keepgoing", "-k", default=False, is_flag=True,
            help="Don't stop after failed job"
        ),
        click.option(
            "--lock/--no-lock",
            help="Use/don't use locking to prevent clobbering of files"
            " by parallel instances of YMP running"
        ),
        click.option(
            "--rerun-incomplete", "--ri", 'force_incomplete', is_flag=True,
            help="Re-run jobs left incomplete in last run"
        ),
        click.option(
            "--forceall", "-F", is_flag=True, default=False,
            help="Force rebuilding of all stages leading to target"
        ),
        click.option(
            "--force", "-f", "forcetargets", is_flag=True, default=False,
            help="Force rebuilding of target"
        ),
        click.option(
            "--notemp", is_flag=True, default=False,
            help="Do not remove temporary files"
        ),
        click.option(
            "--touch", "-t", is_flag=True, default=False,
            help="Only touch files, faking update"
        ),
        click.option(
            "--shadow-prefix", default=None,
            help="Directory to place data for shadowed rules"
        ),
        click.option(
            "--reason", "-r", "printreason", is_flag=True, default=False,
            help="Print reason for executing rule"
        ),
        nohup_option,
        functools.wraps(func),
    )

    def decorated(*args, **kwargs):  # pylint: disable=missing-docstring
        return func(*args, **kwargs)

    for decorate in reversed(decorators):
        decorated = decorate(decorated)
    return decorated
class YmpConfigNotFound(YmpException):
    """Exception raised by YMP if no config was found in current path"""
def start_snakemake(kwargs):
    """Execute Snakemake with given parameters and targets

    Fixes paths of kwargs['targets'] to be relative to YMP root.

    Args:
      kwargs: mapping of option names to values. Options renamed by the
        YMP command line are translated to the names passed on to
        ``snakemake.snakemake``; options mapped to ``None`` are dropped.

    Returns:
      ``False`` if resolving a target raised a ``YmpException`` that was
      not handled as ``YmpStageError``; otherwise whatever
      ``snakemake.snakemake`` returned.

    Raises:
      YmpException: if the current working directory is not inside the
        YMP root directory.
    """
    cfg = ymp.get_config()
    if not cfg.projects:
        log.warning("No projects configured. Are you in the right folder?")
        log.warning(" Config files loaded:")
        for fname in cfg.conffiles:
            log.warning(" - %s", fname)

    root_path = cfg.root
    cur_path = os.path.abspath(os.getcwd())
    # Work out the CWD relative to the root by path components. A plain
    # string-prefix test would accept "/x/proj2" for root "/x/proj", and
    # slicing the prefix off would leave a leading separator that turns
    # the joined targets into absolute paths.
    try:
        cur_path = os.path.relpath(cur_path, root_path)
    except ValueError:
        # os.path.relpath raises on Windows if the paths are on
        # different drives; that is certainly outside of the root.
        cur_path = os.pardir
    if cur_path == os.pardir or cur_path.startswith(os.pardir + os.sep):
        raise YmpException("internal error - CWD moved out of YMP root?!")
    if cur_path == os.curdir:
        # CWD is the root itself: no prefix needed
        cur_path = ""

    # translate renamed arguments to snakemake synopsis
    arg_map = {
        'immediate': 'immediate_submit',
        'wrapper': 'jobscript',
        'scriptname': 'jobname',
        'cluster_cores': 'nodes',
        'snake_config': 'config',
        'drmaa': None,
        'sync': None,
        'sync_arg': None,
        'command': None,
        'args': None,
        'nohup': None
    }
    kwargs = {arg_map.get(key, key): value
              for key, value in kwargs.items()
              if arg_map.get(key, key) is not None}
    kwargs['workdir'] = root_path

    # our debug flag sets a new excepthook handler, so we use this
    # to decide whether snakemake should run in debug mode
    if sys.excepthook.__module__ != "sys":
        log.warning(
            "Custom excepthook detected. Having Snakemake open stdin "
            "inside of run: blocks")
        kwargs['debug'] = True

    # map our logging level to snakemake logging level
    level = log.getEffectiveLevel()
    if level > logging.WARNING:
        kwargs['quiet'] = True
    if level < logging.WARNING:
        kwargs['verbose'] = True
    kwargs['use_conda'] = True

    stage_stack_failure = None
    if 'targets' in kwargs:
        if cur_path:
            # Called from a subdirectory: targets are taken as paths
            # relative to the CWD and re-anchored at the root.
            kwargs['targets'] = [os.path.join(cur_path, t)
                                 for t in kwargs['targets']]
        else:
            targets = []
            for t in kwargs['targets']:
                try:
                    stack = StageStack.get(t)
                    targets.extend(stack.all_targets())
                except YmpStageError as exc:
                    # Pass the target through unchanged and remember the
                    # error; it is reported below if Snakemake fails.
                    stage_stack_failure = exc
                    targets.append(t)
                except YmpException as exc:
                    log.error("Failure assembling stack:")
                    exc.show()
                    return False
            kwargs['targets'] = targets
            log.info("Making targets:")
            for target in targets:
                log.info(" - %s", target)

    log.debug("Running snakemake.snakemake with args: %s", kwargs)

    # A snakemake workflow was created above to resolve the
    # stage stack. Unload it so things run correctly from within
    # snakemake.
    cfg.unload()

    import snakemake
    res = snakemake.snakemake(ymp._snakefile, **kwargs)
    if not res and stage_stack_failure:
        log.error("Incomplete stage stack: %s", stage_stack_failure)
    return res
@command()
@snake_params
@click.option(
    "--cores", "-j", default=1, metavar="CORES",
    help="The number of parallel threads used for scheduling jobs"
)
@click.option(
    "--dag", "printdag", default=False, is_flag=True,
    help="Print the Snakemake execution DAG and exit"
)
@click.option(
    "--rulegraph", "printrulegraph", default=False, is_flag=True,
    help="Print the Snakemake rule graph and exit"
)
@click.option(
    "--debug-dag", default=False, is_flag=True,
    help="Show candidates and selections made while the rule execution graph "
    "is being built"
)
@click.option(
    "--debug", default=False, is_flag=True,
    help="Set the Snakemake debug flag"
)
def make(**kwargs):
    """Build target(s) locally"""
    # A falsy result from start_snakemake becomes exit status 1.
    if not start_snakemake(kwargs):
        sys.exit(1)
@command()
@snake_params
@click.option(
    "--profile", "-P", metavar="NAME",
    help="Select cluster config profile to use. Overrides cluster.profile "
    "setting from config."
)
@click.option(
    "--snake-config", "-c", metavar="FILE",
    help="Provide snakemake cluster config file"
)
@click.option(
    "--drmaa", "-d", is_flag=True,
    help="Use DRMAA to submit jobs to cluster. Note: Make sure you have "
    "a working DRMAA library. Set DRMAA_LIBRARY_PATH if necessary."
)
@click.option(
    "--sync", "-s", is_flag=True,
    help="Use synchronous cluster submission, keeping the submit command "
    "running until the job has completed. Adds qsub_sync_arg to cluster "
    "command"
)
@click.option(
    "--immediate", "-i", is_flag=True,
    help="Use immediate submission, submitting all jobs to the cluster "
    "at once."
)
@click.option(
    "--command", metavar="CMD",
    help="Use CMD to submit job script to the cluster"
)
@click.option(
    "--wrapper", metavar="CMD",
    help="Use CMD as script submitted to the cluster. See Snakemake "
    "documentation for more information."
)
@click.option(
    "--max-jobs-per-second", metavar="N",
    help="Limit the number of jobs submitted per second"
)
@click.option(
    "--latency-wait", "-l", metavar="T",
    help="Time in seconds to wait after job completed until files are "
    "expected to have appeared in local file system view. On NFS, this "
    "time is governed by the acdirmax mount option, which defaults to "
    "60 seconds."
)
@click.option(
    "--cluster-cores", "-J", type=int, metavar="N",
    help="Limit the maximum number of cores used by jobs submitted at a time"
)
@click.option(
    "--cores", "-j", default=16, metavar="N",
    help="Number of local threads to use"
)
@click.option(
    "--args", "args", metavar="ARGS",
    help="Additional arguments passed to cluster submission command. "
    "Note: Make sure the first character of the argument is not '-', "
    "prefix with ' ' as necessary."
)
@click.option(
    "--scriptname", metavar="NAME",
    help="Set the name template used for submitted jobs"
)
def submit(profile, **kwargs):
    """Build target(s) on cluster

    The parameters for cluster execution are drawn from layered profiles. YMP
    includes base profiles for the "torque" and "slurm" cluster engines.
    """
    cfg = ymp.get_config()

    # The cluster config uses profiles, which are assembled by layering
    # the default profile, the selected profile and additional command
    # line parameters. The selected profile is either specified via
    # "--profile" on the command line or taken from the "cluster.profile"
    # setting in the config.
    config = cfg.cluster.profiles.default
    profile_name = profile or cfg.cluster.profile
    if profile_name:
        config.add_layer(profile_name,
                         cfg.cluster.profiles[profile_name])
    # Options not given on the command line arrive as None and must not
    # mask values from the profile layers.
    cli_params = {key: value
                  for key, value in kwargs.items()
                  if value is not None}
    config.add_layer("<shell arguments>",
                     cli_params)

    # prepare cluster command
    if config.command is None:
        raise click.UsageError(
            "\n"
            "No cluster submission command configured.\n"
            "Please check the manual on how to configure YMP for your cluster\n"
        )
    # list() because values() may return a view, which cannot be
    # concatenated to a list.
    cmd = config.command.split() + list(config.args.values())
    if config.drmaa:
        param = 'drmaa'
        # Only the arguments are passed on in DRMAA mode; blanking the
        # executable leaves the joined string starting with a space.
        cmd[0] = ''
    elif config.sync:
        param = 'cluster_sync'
        cmd.append(config.sync_arg)
    else:
        param = 'cluster'

    if cmd[0] and not shutil.which(cmd[0]):
        raise click.UsageError(
            "\n"
            f"The configured cluster submission command '{cmd[0]}' does not\n"
            "exist or is not executable. Please check your cluster configuration.\n"
        )

    config.add_layer("<computed>", {param: cfg.expand(" ".join(cmd))})

    rval = start_snakemake(config)
    if not rval:
        sys.exit(1)