Snakefile "ymp/rules/00_download.rules"
localrules: download_file_ftp
rule download_file_ftp:
"""
Downloads remote file using *wget*
"""
message:
"Downloading {params.url}"
output:
"{:dir.downloads:}/{protocol}/{server}/{path}"
log:
"{:dir.downloads:}/{protocol}/{server}/{path}.log"
wildcard_constraints:
protocol="(ftp|ftps)",
server="[^/]+"
params:
url="{protocol}://{server}/{path}"
conda: "wget.yml"
shell: """
wget "{params.url}" -O "{output[0]}" > {log} 2>&1
"""
ruleorder: download_file_ftp > gunzip
ruleorder: download_file_ftp > fq2fa
rule download_file_http:
"""
Downloads remote file using internal downloader
"""
message:
"Downloading {params.url}"
output:
"{:dir.downloads:}/{protocol}/{server}/{path}"
log:
"{:dir.downloads:}/{protocol}/{server}/{path}.log"
wildcard_constraints:
protocol="(http|https)",
server="[^/]+"
params:
url="{protocol}://{server}/{path}"
run:
from ymp.download import FileDownloader
FileDownloader(loglevel=0).get(params.url, output[0])
ruleorder: download_file_http > gunzip
ruleorder: download_file_http > fq2fa
with Stage("references") as S:
S.doc("""
This is a "virtual" stage. It does not process read data, but comprises
rules used for reference provisioning.
""")
localrules: prepare_reference
rule prepare_reference:
"""
Provisions files in ``<reference_dir>/<reference_name>``
- Creates symlinks to downloaded references
- Compresses references provided uncompressed upstream
- Connects files requested by stages with downloaded files and unpacked archives
"""
message:
"Preparing {output}"
input:
files = lambda wc: icfg.ref[wc.refname].get_file(wc.path)
output:
"{:dir.references:}/{refname}/{path}"
run:
if isinstance(input.files, str):
infile = input.files
else:
infiles = [fn for fn in input.files if fn.endswith(wildcards.path)]
if len(infiles) == 1:
infile = infiles[0]
elif not infiles:
raise RuleException(
"No reference file found matching path '{}' in reference '{}'"
"\nCandidates: {}"
"".format(wildcards.path, wildcards.refname, input.files)
)
else:
raise RuleException(
"Ambiguous reference file in reference '{}' for path '{}': '{}'"
"".format(wildcards.refname, wildcards.path, infiles)
)
if (wildcards.path.endswith(".gz")
and not infile.endswith(".gz")):
shell("gzip -c '{infile}' > '{output}'")
elif (not wildcards.path.endswith(".gz")
and infile.endswith(".gz")):
shell("gunzip -c '{infile}' > '{output}'")
else:
outdir = os.path.dirname(output[0])
input_relpath = os.path.relpath(infile, outdir)
os.symlink(input_relpath, output[0])
localrules: unpack_archive
rule unpack_archive:
"""
Template rule for unpacking references provisioned upstream as archive.
"""
message:
"Unpacking {input.tar} into {params.prefix}"
shell: """
rm -rf {params.prefix}
mkdir -p {params.prefix}
tar -xf '{input.tar}' --strip-components {params.strip} -C '{params.prefix}'
"""
ruleorder: unpack_archive > prepare_reference
ruleorder: unpack_archive > gunzip
for ref in icfg.ref:
for unpack_rule in icfg.ref[ref].make_unpack_rules(workflow._rules['unpack_archive']):
unpack_rule
if (icfg.pipelines):
pipeline_pattern = "{{pipeline_stack_link,[^/]*.({})}}".format(
"|".join(icfg.pipelines)
)
def _get_pipeline_realpath(wc):
from ymp.stage import StageStack, Pipeline
link = wc.pipeline_stack_link
stack = StageStack.get(link)
if not isinstance(stack.stage, Pipeline):
raise IndexError
depends = [tgt for tgt in stack.all_targets()
if tgt != link]
return depends
localrules: pipeline_link
rule pipeline_link:
output: pipeline_pattern
input: _get_pipeline_realpath
run:
from ymp.stage import StageStack
os.symlink(
StageStack.get(output[0]).path,
output[0]
)