Source code for ymp.env

"""
This module manages the conda environments.
"""

import io
import logging
import os
import os.path as op
import shutil
import subprocess
from typing import Optional, Union

from ruamel.yaml import YAML

import snakemake.deployment.conda as snakemake_conda

from snakemake.rules import Rule

import ymp
from ymp.common import AttrDict, ensure_list
from ymp.exceptions import YmpRuleError, YmpWorkflowError
from ymp.snakemake import BaseExpander, WorkflowObject


log = logging.getLogger(__name__)  # pylint: disable=invalid-name


class Env(WorkflowObject, snakemake_conda.Env):
    """Represents YMP conda environment

    Snakemake expects the conda environments in a per-workflow
    directory configured by ``conda_prefix``. YMP sets this value by
    default to ``~/.ymp/conda``, which has a greater chance of being
    on the same file system as the conda cache, allowing for hard
    linking of environment files.

    Within the folder ``conda_prefix``, each environment is created in
    a folder named by the hash of the environment definition file's
    contents and the ``conda_prefix`` path. This class inherits from
    ``snakemake.deployment.conda.Env`` to ensure that the hash we use
    is identical to the one Snakemake will use during workflow
    execution.

    The class provides additional features for updating environments,
    creating environments dynamically and executing commands within
    those environments.

    Note:
      This is not called from within the execution. Snakemake
      instantiates its own Env object purely based on the filename.
    """

    @staticmethod
    def get_installed_env_hashes():
        """Return the names of all directories inside ``conda_prefix``."""
        cfg = ymp.get_config()
        # Use scandir as a context manager so the directory handle is
        # released deterministically.
        with os.scandir(cfg.absdir.conda_prefix) as entries:
            return [dentry.name for dentry in entries if dentry.is_dir()]

    def __new__(cls, *args, **kwargs):
        """Return the registered Env for an env file, if there is one.

        When called with a positional argument and without ``name``,
        the registry is searched for an object whose ``file`` equals
        the first positional argument. Only if none is found is a new
        object allocated.
        """
        if args and "name" not in kwargs:
            for env in cls.get_registry().values():
                if env.file == args[0]:
                    return env
        return super().__new__(cls)

    def __init__(
            self,
            # Snakemake Params:
            env_file: Optional[str] = None,
            workflow=None,
            env_dir=None,
            container_img=None,
            cleanup=None,
            # YMP Params:
            name: Optional[str] = None,
            packages: Optional[Union[list, str]] = None,
            base: str = "none",
            channels: Optional[Union[list, str]] = None
    ) -> None:
        """Creates an inline defined conda environment

        Args:
          env_file: Path of an existing environment definition file.
            Mutually exclusive with ``name``.
          workflow: Workflow object handed to the Snakemake base class.
            A minimal stand-in is built from the YMP config if omitted.
          env_dir: Directory for environments; defaults to the
            configured ``conda_prefix``.
          container_img: Passed through to the Snakemake base class.
          cleanup: Passed through to the Snakemake base class.
          name: Name of conda environment (and basename of file)
          packages: package(s) to be installed into environment. Version
            constraints can be specified in each package string
            separated from the package name by whitespace.
            E.g. ``"blast =2.6*"``
          channels: channel(s) to be selected for the environment
          base: Select a set of default channels and packages to be
            added to the newly created environment. Sets are defined
            in conda.defaults in ``yml.yml``

        Raises:
          YmpRuleError: if both or neither of ``name`` and ``env_file``
            are given.
        """
        if 'name' in self.__dict__:
            # __new__ handed back an already initialized object
            return
        cfg = ymp.get_config()

        if env_file:
            if name:
                raise YmpRuleError(
                    self,
                    "Env must not have both 'name' and 'env_file' parameters'"
                )
            self.dynamic = False
            self.name, _ = op.splitext(op.basename(env_file))
            self.packages = None
            self.base = None
            self.channels = None
            # Override location for exceptions:
            self.filename = env_file
            self.lineno = 1
        elif name:
            self.dynamic = True
            self.name = name
            self.packages = (
                ensure_list(packages) + cfg.conda.defaults[base].dependencies
            )
            self.channels = (
                ensure_list(channels) + cfg.conda.defaults[base].channels
            )
            env_file = op.join(cfg.ensuredir.dynamic_envs, f"{name}.yml")
            contents = self._get_dynamic_contents()
            self._update_file(env_file, contents)
        else:
            raise YmpRuleError(
                self,
                "Env must have either 'name' or 'env_file' parameter"
            )

        # Unlike within snakemake, we create these objects before the
        # workflow is fully initialized, which means we need to create a
        # fake one:
        if not workflow:
            workflow = AttrDict({
                'persistence': {
                    'conda_env_path': cfg.ensuredir.conda_prefix,
                    'conda_env_archive_path':
                        cfg.ensuredir.conda_archive_prefix,
                },
                'conda_frontend': cfg.conda.frontend,
                'singularity_args': '',
            })
        super().__init__(
            env_file,
            workflow,
            env_dir if env_dir else cfg.ensuredir.conda_prefix,
            container_img,
            cleanup)
        self.register()

    def _get_dynamic_contents(self):
        """Render name, dependencies and channels as a YAML document."""
        defaults = {
            'name': self.name,
            'dependencies': self.packages,
            'channels': self.channels,
        }
        yaml = YAML(typ='rt')
        yaml.default_flow_style = False
        buf = io.StringIO()
        yaml.dump(defaults, buf)
        return buf.getvalue()

    @staticmethod
    def _update_file(env_file, contents):
        """Write ``contents`` to ``env_file`` unless it already matches.

        The file is left untouched when its content is identical, so
        its modification time only changes with the definition.
        """
        disk_contents = ""
        if op.exists(env_file):
            with open(env_file, "r") as inf:
                disk_contents = inf.read()
        if contents != disk_contents:
            with open(env_file, "w") as out:
                out.write(contents)

    def __getstate__(self):
        """Return the instance dict without the ``workflow`` entry."""
        state = self.__dict__.copy()
        del state['workflow']
        return state

    def __setstate__(self, state):
        """Restore the instance dict and re-attach the config's workflow.

        Counterpart to ``__getstate__``, which drops ``workflow``.
        """
        self.__dict__.update(state)
        self.workflow = ymp.get_config().workflow

    @property
    def _env_archive_dir(self):
        """Archive directory taken from ``conda_archive_prefix``."""
        cfg = ymp.get_config()
        return cfg.ensuredir.conda_archive_prefix

    def _get_content(self):
        """Return the environment definition as bytes.

        Dynamic environments are rendered from their attributes; for
        file based environments the Snakemake implementation is used,
        after switching to the real workflow object if the config has
        one by now.
        """
        if self.dynamic:
            return self._get_dynamic_contents().encode("utf-8")
        cfg = ymp.get_config()
        if cfg.workflow:
            self.workflow = cfg.workflow
        return super()._get_content()

    def set_prefix(self, prefix):
        """Set the environment directory to the absolute ``prefix``."""
        self._env_dir = op.abspath(prefix)

    def create(self, dryrun=False, reinstall=False, nospec=False,
               noarchive=False):
        """Ensure the conda environment has been created

        Inherits from snakemake.deployment.conda.Env.create

        Behavior of super class
            - Resolve remote file
            - If containerized, check environment path exists and
              return if true
            - Check for interrupted env create, delete if so
            - Return if environment exists
            - Install from archive if env_archive exists
            - Install using self.frontend if not_careful

        Handling pre-computed environment specs
            In addition to freezing environments by maintaining a copy
            of the package binaries, we allow maintaining a copy of the
            package binary URLs, from which the archive folder is
            populated on demand. We just download those to self.archive
            and pass on.

        Args:
          dryrun: Only log what would be done; nothing is removed or
            downloaded. Also passed on to the super class.
          reinstall: Remove an already installed environment first.
          nospec: Do not look for an environment spec file.
          noarchive: Remove the archived packages before proceeding.

        Returns:
          ``self.path`` if the environment is already installed and
          ``reinstall`` is not set, otherwise the result of the super
          class ``create``.
        """
        if self.installed:
            if reinstall:
                log.info("Environment '%s' already exists. Removing...",
                         self.name)
                if not dryrun:
                    shutil.rmtree(self.path, ignore_errors=True)
            else:
                log.info("Environment '%s' already exists", self.name)
                return self.path

        log.warning("Creating environment '%s'", self.name)
        log.debug("Target dir is '%s'", self.path)

        if noarchive and self.archive_file:
            log.warning("Removing archived environment packages...")
            if not dryrun:
                shutil.rmtree(self.archive_file, ignore_errors=True)

        if not self._have_archive() and not nospec:
            urls, files, md5s = self._get_env_from_spec()
            if files:
                if dryrun:
                    log.info("Would download %i files", len(urls))
                else:
                    self._download_files(urls, md5s)
                    packages = op.join(self.archive_file, "packages.txt")
                    with open(packages, "w") as f:
                        f.write("\n".join(files) + "\n")
            else:
                log.warning(
                    "Neither spec file nor package archive found for '%s',"
                    " falling back to native resolver", self.name)

        res = super().create(dryrun)
        log.info("Created env %s", self.name)
        return res

    def _have_archive(self):
        """Check for a complete package archive.

        Returns:
          True if ``packages.txt`` exists in the archive folder and
          every file listed in it exists there as well.
        """
        packages_txt = op.join(self.archive_file, "packages.txt")
        log.info("Checking for archive in %s", self.archive_file)
        if not op.exists(packages_txt):
            return False
        log.info("... found. Checking archive.")
        with open(packages_txt) as f:
            packages = [package.strip() for package in f]
        missing_packages = [
            package for package in packages
            if not op.exists(op.join(self.archive_file, package))
        ]
        if missing_packages:
            log.warning(
                "Ignoring incomplete package archive for environment %s",
                self.name)
            log.debug(
                "Missing packages: %s", missing_packages)
            return False
        return True

    def _get_env_from_spec(self):
        """Parses conda spec file

        Conda spec files contain a list of URLs pointing to the
        packages comprising the environment. Each URL may have an md5
        sum appended as "anchor" using "#". Comments are placed at the
        top in lines beginning with "#" and a single line "@EXPLICIT"
        indicates the type of the file.

        Returns:
          urls: list of URLs
          files: list of file names
          md5s: list of md5 sums
        """
        cfg = ymp.get_config()
        # Search each configured spec path, preferring the platform
        # specific sub folder. The for/else chain leaves both loops as
        # soon as a spec file is found.
        for spec_path in cfg.conda.env_specs.get_paths():
            if spec_path.startswith("BUILTIN:"):
                spec_path = spec_path.replace("BUILTIN:", "")
                spec_path = op.join(ymp._env_dir, spec_path)
            for path in (op.join(spec_path, cfg.platform), spec_path):
                spec_file = op.join(path, self.name + ".txt")
                log.debug("Trying %s", spec_file)
                if op.exists(spec_file):
                    log.info("Using %s", spec_file)
                    break
            else:
                continue
            break
        else:
            return [], [], []

        log.debug("Using env spec '%s'", spec_file)
        urls, files, md5s = [], [], []
        with open(spec_file) as sf:
            for line in sf:
                # Strip the line terminator so that blank lines are
                # skipped and md5 sums carry no trailing newline.
                line = line.strip()
                if not line or line[0] in "@#":
                    continue
                parts = line.split("#")
                urls.append(line)
                files.append(parts[0].split("/")[-1])
                # The md5 anchor is optional.
                # NOTE(review): confirm FileDownloader.get accepts None
                # for entries without a checksum.
                md5s.append(parts[1] if len(parts) > 1 else None)
        return urls, files, md5s

    def _download_files(self, urls, md5s):
        """Download package files into the archive folder.

        Raises:
          YmpWorkflowError: if the downloader reports failure.
        """
        from ymp.download import FileDownloader
        os.makedirs(self.archive_file, exist_ok=True)
        cfg = ymp.get_config()
        fd = FileDownloader(alturls=cfg.conda.alturls)
        if not fd.get(urls, self.archive_file, md5s):
            # NOTE: a partially downloaded archive folder is left in
            # place here.
            raise YmpWorkflowError(
                f"Unable to create environment {self.name}, "
                f"because downloads failed. See log for details.")

    @property
    def installed(self):
        """Whether the environment appears to be completely set up.

        Containerized environments are reported as installed without
        checking. Otherwise the environment folder must exist and must
        not carry a start stamp without the matching finish stamp.
        """
        if self.is_containerized:
            return True  # Not checking
        if not op.exists(self.path):
            return False
        start_stamp = op.join(self.path, "env_setup_start")
        finish_stamp = op.join(self.path, "env_setup_done")
        if op.exists(start_stamp) and not op.exists(finish_stamp):
            return False
        return True

    def update(self):
        """Update conda environment

        Returns:
          Exit code of the frontend's ``env update`` call.
        """
        self.create()  # call create to make sure environment exists
        log.warning("Updating environment '%s'", self.name)
        return subprocess.run([
            self.frontend, "env", "update", "--prune",
            "-p", self.path,
            "-f", self.file,
            "-v"
        ]).returncode

    def run(self, command):
        """Execute command in environment

        Args:
          command: Iterable of strings. The items are joined with
            single spaces, without quoting, and run through the
            configured shell.

        Returns exit code of command run.
        """
        command = " ".join(command)
        command = snakemake_conda.Conda().shellcmd(self.path, command)
        cfg = ymp.get_config()
        log.debug("Running: %s", command)
        return subprocess.run(
            command,
            shell=True, executable=cfg.shell
        ).returncode

    def export(self, stream, typ='yml'):
        """Freeze environment

        Args:
          stream: Writable stream receiving the export.
          typ: ``'yml'`` for a YAML environment definition or ``'txt'``
            for an explicit package list with md5 sums.

        Returns:
          Exit code of the conda call.

        Raises:
          ValueError: if ``typ`` is neither ``'yml'`` nor ``'txt'``.
        """
        if typ not in ('yml', 'txt'):
            raise ValueError(
                f"Unknown export type '{typ}' (expected 'yml' or 'txt')")
        log.warning("Exporting environment '%s'", self.name)
        if typ == 'yml':
            res = subprocess.run([
                "conda", "env", "export",
                "-p", self.path,
            ], stdout=subprocess.PIPE)
            if res.returncode != 0:
                # Nothing sensible to parse; report the failure instead
                # of tripping over empty output below.
                return res.returncode

            yaml = YAML(typ='rt')
            yaml.default_flow_style = False
            env = yaml.load(res.stdout)
            env['name'] = self.name
            if 'prefix' in env:
                del env['prefix']
            yaml.dump(env, stream)
        else:
            res = subprocess.run([
                "conda", "list", "--explicit", "--md5",
                "-p", self.path,
            ], stdout=stream)
        return res.returncode

    def __lt__(self, other):
        "Comparator for sorting"
        return self.name < other.name

    def __repr__(self):
        return f"{self.__class__.__name__}({self.__dict__!r})"

    # NOTE(review): defining __eq__ without __hash__ makes instances of
    # this class unhashable; confirm that this is intended.
    def __eq__(self, other):
        """Environments are equal if their hashes are equal."""
        if isinstance(other, Env):
            return self.hash == other.hash
        return NotImplemented
# Patch Snakemake's Env class with our own
snakemake_conda.Env = Env
class CondaPathExpander(BaseExpander):
    """Resolves conda environment names given in rules to file paths

    The value of a rule's ``conda_env`` field is looked up in this
    order:

    1. A value starting with ``/`` that exists on disk is used as is.
    2. A name present in the `Env` registry is replaced by the file of
       the registered environment.
    3. For each Snakefile on the workflow's include stack, innermost
       first, every configured search path (in sorted key order) is
       joined to the Snakefile's directory and probed for the name
       with no extension, ``.yml`` and ``.yaml``. The first hit is
       registered as `Env` and returned.

    If nothing matches, the original value is returned.
    """

    _EXTENSIONS = ("", ".yml", ".yaml")

    def __init__(self, config, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._search_paths = config.conda.env_path
        self._envs = None

    def expands_field(self, field):
        """Only the ``conda_env`` field is handled by this expander."""
        return field == 'conda_env'

    def _candidates(self, conda_env):
        """Yield the paths to probe for ``conda_env`` in search order."""
        for snakefile in reversed(self.workflow.included_stack):
            snakefile_dir = op.dirname(snakefile)
            for _, relpath in sorted(self._search_paths.items()):
                stem = op.abspath(
                    op.join(op.join(snakefile_dir, relpath), conda_env)
                )
                for suffix in self._EXTENSIONS:
                    yield stem + suffix

    def format(self, conda_env, *args, **kwargs):
        """Return the environment file path for ``conda_env``."""
        is_rooted = conda_env[0] == "/"
        if is_rooted and op.exists(conda_env):
            return conda_env

        if not self._envs:
            self._envs = Env.get_registry()
        known = self._envs
        if conda_env in known:
            return known[conda_env].file

        for candidate in self._candidates(conda_env):
            if op.exists(candidate):
                # Instantiate for the side effect of registering the
                # environment.
                Env(candidate)
                return candidate
        return conda_env