# Source code for ymp.stage.reference

import logging
import os
import re
from hashlib import sha1
from typing import Dict, Optional, Union, Set, List
from collections.abc import Mapping, Sequence

from snakemake.rules import Rule

from ymp.snakemake import make_rule
from ymp.util import make_local_path
from ymp.stage import ConfigStage, Activateable, Stage
from ymp.exceptions import YmpConfigError


log = logging.getLogger(__name__)  # pylint: disable=invalid-name


class Archive(object):
    """Archive (tarball) from which a reference's files are unpacked.

    The unpack location (``prefix``) is derived from ``dirname`` and a
    short hash of the ``tar`` path, so different archives unpacked below
    the same ``dirname`` do not collide.
    """
    name = None
    hash = None
    tar = None
    dirname = None
    strip_components = None
    files = None

    def __init__(self, name, dirname, tar, url, strip, files):
        """Set up the archive description.

        Args:
          name: Name used in the generated rule name and docstring.
          dirname: Directory below which the archive gets unpacked.
          tar: Path of the archive file; also the source of ``hash``.
          url: Origin of the archive (only used in the rule docstring).
          strip: Passed through to the unpack rule as ``params.strip``.
          files: Either a sequence of file names or a mapping from the
            ymp-side file name to the file name inside the archive.
        """
        self.name = name
        self.dirname = dirname
        self.tar = tar
        self.url = url
        self.strip = strip
        self.files = files

        # First 8 hex digits of the SHA1 of the tar path identify the archive.
        self.hash = sha1(self.tar.encode('utf-8')).hexdigest()[:8]
        self.prefix = os.path.join(self.dirname, "_unpacked_" + self.hash)

    def get_files(self):
        """Map file names to their paths below the unpack prefix.

        Returns:
          A dict. For a sequence of names, each name maps to
          ``prefix/name``. For a mapping, each key maps to
          ``prefix/value``.

        Raises:
          Exception: If ``files`` is neither a (non-string) sequence nor
            a mapping.
        """
        # A bare string is technically a Sequence, but iterating it would
        # yield one "file" per character; reject it like any other
        # unsupported type.
        if isinstance(self.files, Sequence) and not isinstance(self.files, str):
            return {fn: os.path.join(self.prefix, fn)
                    for fn in self.files}
        if isinstance(self.files, Mapping):
            return {fn_ymp: os.path.join(self.prefix, fn_arch)
                    for fn_ymp, fn_arch in self.files.items()}
        raise Exception("unknown data type for reference.files")

    def make_unpack_rule(self, baserule: 'Rule'):
        """Create the rule that unpacks this archive.

        Args:
          baserule: Rule passed to ``make_rule`` as ``parent``.

        Returns:
          Whatever ``make_rule`` returns for the assembled parameters.
        """
        # NOTE(review): whitespace inside these templates is reproduced as
        # it appears in this file -- confirm against the intended layout.
        docstr_tpl = """ Unpacks {} archive: URL: {} Files: """
        item_tpl = """ - {} """
        docstr = "\n".join([docstr_tpl.format(self.name, self.url)] +
                           [item_tpl.format(fn) for fn in self.files])
        return make_rule(
            name="unpack_{}_{}".format(self.name, self.hash),
            docstring=docstr,
            lineno=0,
            snakefile=__name__,
            parent=baserule,
            input=([], {'tar': self.tar}),
            output=([], {'files': list(self.get_files().values())}),
            params=([], {'strip': self.strip,
                         'prefix': self.prefix})
        )
class Reference(Activateable, ConfigStage):
    """
    Represents (remote) reference file/database configuration
    """
    def __init__(self, name, cfg):
        """Build the reference stage ``ref_<name>`` from its configuration.

        Args:
          name: Reference name; the stage is registered as ``"ref_" + name``.
          cfg: Either one resource mapping or a list of resource mappings
            (see `add_resource`).

        Raises:
          YmpConfigError: If ``cfg`` is neither a mapping nor a list, or
            if the ``references`` base stage is not registered.
        """
        super().__init__("ref_" + name, cfg)
        #: Files provided by the reference. Keys are the file names
        #: within ymp ("target.extension"), symlinked into dir.ref/ref_name/ and
        #: values are the path to the reference file from workspace root.
        self.files: Dict[str, str] = {}
        #: Archives that need an unpack rule (see `make_unpack_rules`).
        self.archives = []
        #: Ids collected from ``id`` fields and ``path`` regex matches.
        self._ids: Set[str] = set()
        #: Cache for the `outputs` property; ``None`` means "recompute".
        self._outputs = None
        import ymp
        self.dir = os.path.join(ymp.get_config().dir.references, name)

        if isinstance(cfg, Mapping):
            self.add_resource(cfg)
        elif isinstance(cfg, Sequence) and not isinstance(cfg, str):
            for item in cfg:
                self.add_resource(item)
        else:
            raise YmpConfigError(cfg, "Reference config must list or key-value mapping")

        # Copy rules defined in primary references stage
        stage_references = Stage.get_registry().get("references")
        if not stage_references:
            raise YmpConfigError(
                cfg,
                "Reference base stage not found. Main rules not loaded?"
            )
        self.rules = stage_references.rules.copy()

    def get_group(
            self,
            stack: "StageStack",
            default_groups: List[str]
    ) -> List[str]:
        """Delegate to the parent class with groups chosen by this reference.

        ``default_groups`` is ignored: the parent is called with
        ``[self.name]`` when more than one id is known, otherwise with an
        empty list.
        """
        if len(self._ids) > 1:
            groups = [self.name]
        else:
            groups = []
        return super().get_group(stack, groups)

    def get_ids(
            self,
            stack: "StageStack",
            groups: List[str],
            match_groups: Optional[List[str]] = None,
            match_value: Optional[str] = None
    ) -> List[str]:
        """Return this reference's own ids, or defer to the parent if none."""
        if self._ids:
            return list(self._ids)
        return super().get_ids(stack, groups, match_groups, match_value)

    @property
    def outputs(self) -> Union[Set[str], Dict[str, str]]:
        """File name patterns provided by this reference.

        Each key is a file name from ``files`` prefixed with ``/``, in
        which the first occurrence of a known id (or ``ALL`` if there are
        no ids) followed by a dot is replaced with ``{sample}.``.
        The result is cached until ``_outputs`` is reset to ``None``.
        """
        if self._outputs is None:
            keys = self._ids if self._ids else ["ALL"]
            # Ids are literal text, so escape regex metacharacters. Trying
            # longer ids first keeps the result independent of set
            # iteration order when one id is a prefix of another.
            alternatives = "|".join(
                re.escape(key)
                for key in sorted(keys, key=lambda key: (-len(key), key))
            )
            pattern = re.compile(rf"(^|.)({alternatives})\.")
            self._outputs = {
                "/" + pattern.sub(r"\1{sample}.", fname): ""
                for fname in self.files
            }
        return self._outputs

    def add_resource(self, rsc):
        """Register the files described by one resource configuration.

        Args:
          rsc: Mapping with at least a ``url`` field. ``type`` (default
            ``fasta``) selects how the resource is handled; ``id``,
            ``files``, ``strip_components`` and ``match`` are read
            depending on the type.

        Raises:
          YmpConfigError: If ``rsc`` is not a mapping, lacks ``url`` or
            has an unknown ``type``.
        """
        if not isinstance(rsc, Mapping):
            raise YmpConfigError(rsc, "Reference resource config must be a key-value mapping")

        if "url" not in rsc:
            raise YmpConfigError(rsc, "Reference resource must have 'url' field")
        maybeurl = str(rsc["url"])
        import ymp
        local_path = make_local_path(ymp.get_config(), maybeurl)
        # make_local_path returning its input unchanged is taken to mean
        # "not a URL"; the path is then resolved via the config object.
        isurl = local_path != maybeurl
        if not isurl:
            local_path = rsc.get_path("url")

        type_name = rsc.get('type', 'fasta').lower()
        if 'id' in rsc:
            self._ids.add(rsc['id'])

        if type_name in ("fasta", "fastp"):
            self.files[f"ALL.{type_name}.gz"] = local_path
        elif type_name in ("gtf", "snp", "tsv", "csv"):
            self.files[f"ALL.{type_name}"] = local_path
        elif type_name == 'dir':
            archive = Archive(
                name=self.name,
                dirname=self.dir,
                tar=local_path,
                url=maybeurl,
                files=rsc['files'],
                strip=rsc.get('strip_components', 0)
            )
            self.files.update(archive.get_files())
            self.archives.append(archive)
        elif type_name == 'dirx':
            self.files.update({
                key: os.path.join(local_path, val)
                for key, val in rsc.get('files', {}).items()
            })
        elif type_name == 'path':
            self.dir = local_path.rstrip("/")
            try:
                filenames = os.listdir(local_path)
            except FileNotFoundError:
                # Best effort: report the missing directory and carry on
                # with no files rather than aborting configuration.
                log.error("Directory %s required by %s %s does not exist",
                          local_path, self.__class__.__name__, self.name)
                filenames = []
            for filename in filenames:
                for regex in rsc.get('match', []):
                    match = re.fullmatch(regex, filename)
                    if not match:
                        continue
                    # Each regex is expected to define a 'sample' group.
                    self._ids.add(match.group('sample'))
                    self.files[filename] = os.path.join(local_path, filename)
        else:
            raise YmpConfigError(rsc, f"Unknown type {type_name}", key="type")

    def get_path(self, _stack):
        """Return the directory of this reference (the stack is ignored)."""
        return self.dir

    def get_all_targets(self, stack: "StageStack") -> List[str]:
        """Return every known file name joined onto the reference directory."""
        return [os.path.join(self.dir, fname) for fname in self.files]

    def get_file(self, filename):
        """Look up the path registered for ``filename``.

        Returns:
          The registered path, or -- after logging the failure -- a
          placeholder string starting with ``YMP_FILE_NOT_FOUND__`` that
          names the missing file and this reference.
        """
        local_path = self.files.get(filename)
        if local_path:
            return local_path
        log.error(f"{self!r}: Failed to find {filename}")
        log.warning(f"  Available: {self.files}")
        # The adjacent literals are joined before .format()/.replace()
        # apply, so only the message part has its spaces replaced.
        return ("YMP_FILE_NOT_FOUND__" +
                "No file {} in Reference {}"
                "".format(filename, self.name).replace(" ", "_"))

    def make_unpack_rules(self, baserule: 'Rule'):
        """Yield one unpack rule per registered archive."""
        for archive in self.archives:
            yield archive.make_unpack_rule(baserule)

    def __str__(self):
        return os.path.join(self.dir, "ALL")

    def this(self, args=None, kwargs=None):
        """Return the reference directory, registering outputs on the way.

        When ``kwargs['field']`` is ``'output'``, the item is registered
        via ``register_inout`` and added to ``files``; the cached
        `outputs` are invalidated.
        """
        item = kwargs['item']
        if kwargs.get('field') == 'output':
            suffix = self.register_inout("this", set(), item).lstrip('/')
            self.files[suffix] = os.path.join(self.dir, suffix)
            self._outputs = None  # will need refresh
        return self.dir

    def prev(self, args=None, kwargs=None):
        """Return the reference directory (arguments are ignored)."""
        return self.dir