Source code for ymp.util

import functools
import os
import re
import textwrap

from snakemake.io import Namedlist
from snakemake.utils import format as snake_format


[docs]def make_local_path(icfg, url: str): url_match = re.match("^(http|https|ftp|ftps)://", url) if url_match: return os.path.join( icfg.dir.downloads, url_match.group(1), url[url_match.end():] ) return url
[docs]def is_fq(path): return isinstance(path, str) and ( path.endswith(".fq.gz") or path.endswith(".fastq.gz") or path.endswith(".fq") or path.endswith(".fastq") )
[docs]def file_not_empty(fn): "Checks is a file is not empty, accounting for gz mininum size 20" if fn.endswith('gz'): return os.path.getsize(fn) > 20 return os.path.getsize(fn) > 0
[docs]def filter_out_empty(*args): """ Removes empty sets of files from input file lists. Takes a variable number of file lists of equal length and removes indices where any of the files is empty. Strings are converted to lists of length 1. Returns a generator tuple. Example: r1, r2 = filter_out_empty(input.r1, input.r2) """ args = ([arg] if isinstance(arg, str) else arg for arg in args) return zip(*(t for t in zip(*args) if all(map(file_not_empty, t))))
[docs]@functools.lru_cache() def fasta_names(fasta_file): print("Calling fasta_names on {}".format(fasta_file)) res = [] with open(fasta_file, "r") as f: for line in f: if line[0] != ">": continue res += [line[1:].split(" ", 1)[0]] return res
[docs]def read_propfiles(files): if isinstance(files, str): files = [files] props = {} for file in files: with open(file, "r") as f: props.update({ key: int(float(value)) for line in f for key, value in [line.strip().split(maxsplit=1)] }) return props
[docs]def glob_wildcards(pattern, files=None): """ Glob the values of the wildcards by matching the given pattern to the filesystem. Returns a named tuple with a list of values for each wildcard. """ from snakemake.io import _wildcard_regex, namedtuple, regex import regex as re pattern = os.path.normpath(pattern) first_wildcard = re.search("{[^{]", pattern) dirname = os.path.dirname(pattern[:first_wildcard.start( )]) if first_wildcard else os.path.dirname(pattern) if not dirname: dirname = "." names = [match.group('name') for match in _wildcard_regex.finditer(pattern)] Wildcards = namedtuple("Wildcards", names) wildcards = Wildcards(*[list() for name in names]) pattern = regex(pattern) # work around partial matching bug in python regex module # by replacing matches for "\" with "[/\0]" (0x0 can't occur in filenames) pattern = re.sub('\\\\/', '[/\0]', pattern) cpattern = re.compile(pattern) def walker(dirname, pattern): """finds files/dirs matching `pattern` in `dirname`""" for dirpath, dirnames, filenames in os.walk(dirname): dirpath = os.path.normpath(dirpath) for f in filenames: if dirpath != ".": f = os.path.join(dirpath, f) match = pattern.match(f) if match: yield match for i in range(len(dirnames)-1, -1, -1): d = dirnames[i] if dirpath != ".": d = os.path.join(dirpath, d) match = pattern.match(os.path.join(d, ""), partial=True) if not match: del dirnames[i] continue if match.partial: continue yield match print("searching {}".format(pattern)) if files is None: for match in walker(dirname, cpattern): for name, value in match.groupdict().items(): getattr(wildcards, name).append(value) else: for f in files: match = re.match(cpattern, os.normpath(f)) if match: for name, value in match.groupdict().items(): getattr(wildcards, name).append(value) print("searching {}: done".format(pattern)) return wildcards
[docs]def activate_R(): from rpy2.robjects import default_converter, conversion from rpy2 import robjects, rinterface @default_converter.py2ri.register(dict) def _1(obj): keys = list(obj.keys()) res = rinterface.ListSexpVector([ conversion.py2ri(obj[x]) for x in keys ]) res.do_slot_assign('names', rinterface.StrSexpVector(keys)) return res @default_converter.py2ri.register(tuple) def _2(obj): return conversion.py2ri(list(obj)) @default_converter.py2ri.register(list) def _3(obj): # return sequence_to_vector(obj) obj = rinterface.ListSexpVector([conversion.py2ri(x) for x in obj]) return robjects.r.unlist(obj, recursive=False)
[docs]def R(code="", **kwargs): """Execute R code This function executes the R code given as a string. Additional arguments are injected into the R environment. The value of the last R statement is returned. The function requires rpy2 to be installed. Args: code (str): R code to be executed **kwargs (dict): variables to inject into R globalenv Yields: value of last R statement >>> R("1*1", input=input) """ try: import rpy2.robjects as robjects from rpy2.rlike.container import TaggedList from rpy2.rinterface import RNULLType except ImportError: raise ValueError( "Python 3 package rpy2 needs to be installed to use" "the R function.") activate_R() # translate Namedlists into rpy2's TaggedList to have named lists in R for key in kwargs: value = kwargs[key] if isinstance(value, Namedlist): kwargs[key] = TaggedList([y for x, y in value.allitems()], [x for x, y in value.allitems()]) code = snake_format(textwrap.dedent(code), stepout=2) # wrap code in function to preserve clean global env and execute rval = robjects.r("function({}){{ {} }}" "".format(",".join(kwargs), code))(**kwargs) # Reduce vectors of length 1 to scalar, as implicit in R. if isinstance(rval, RNULLType): rval = None if rval and len(rval) == 1: return rval[0] return rval
[docs]def Rmd(rmd, out, **kwargs): R(""" library(rmarkdown) print(out) rmarkdown::render(rmd, params=paramx, output_file=out) """, rmd=rmd, out=os.path.abspath(out[0]), paramx=kwargs)