Snakefile "ymp/rules/megahit.rules"

from ymp.util import filter_input, check_input

Env(name="megahit", base="bioconda", packages=[
    "megahit",
    "pigz",
    "coreutils",  # for tac
    "sed"
])

with Stage("assemble_megahit") as S:
    S.doc("""
    Assemble metagenome using MegaHit.

    >>> ymp make toy.assemble_megahit.map_bbmap
    >>> ymp make toy.group_ALL.assemble_megahit.map_bbmap
    >>> ymp make toy.group_Subject.assemble_megahit.map_bbmap
    """)
    rule megahit:
        """
        Runs MegaHit. 
        """
        message:
            "(Co-)Assembling {wildcards.target} with megahit"
        input:
            r1 = "{:prev:}/{:target:}.{:pairnames[0]:}.fq.gz",
            r2 = "{:prev:}/{:target:}.{:pairnames[1]:}.fq.gz",
        output:
            fasta = "{:this:}/{target}.fasta.gz",
            fastg = "{:this:}/{target}.fastg.gz"
        log:
            log     = "{:this:}/{target}.log",
            megahit = "{:this:}/{target}.megahit.log.gz",
        params:
            r1 = filter_input("r1", also=["r2"], minsize=1, join=","),
            r2 = filter_input("r2", also=["r1"], minsize=1, join=","),
            # Allow failure if we have <50 sequences
            sufficient_input = check_input(["r1", "r2"], minlines=50*4*2),
            workdir = "{:this:}/{target}.tmp",
            preset  = "meta-sensitive",
            tmpdir = "{:ensuredir.tmp:}",
        resources:
            mem = "500g",
        threads:
            32
        conda:
            "megahit"
        shell: r"""
        # iff there is an existing options.json with an existing megahit temp dir,
        # then we can continue an existing run
        MHTMP=$(sed -n '/MEGAHIT_TEMP_DIR:/ s/.*:\s*//p' "{params.workdir}/options.json" 2>/dev/null||true)
        CONTINUE=""
        if [ -e "$MHTMP" ]; then
            echo "YMP: Trying to restart aborted assembly"
            CONTINUE="--continue"
        else
            rm -rf "{params.workdir}"
        fi
        make_empty_result() {{
          echo "YMP: skipping assembly - $*" >> {log.log}
          mkdir -p {params.workdir}
          touch {params.workdir}/options.json
          touch {params.workdir}/log
          touch {params.workdir}/final.contigs.fa
          mkdir -p {params.workdir}/intermediate_contigs
          touch {params.workdir}/intermediate_contigs/k21.contigs.fa
        }}
        if [ -z "{params.r1}" ]; then
          make_empty_result "all input empty"
        else
          ARGS=""
          ARGS="$ARGS -1 {params.r1} -2 {params.r2}"
          ARGS="$ARGS --presets {params.preset}"
          ARGS="$ARGS --num-cpu-threads {threads}"
          ARGS="$ARGS --out-dir {params.workdir}"
          ARGS="$ARGS --tmp-dir {params.tmpdir}"
          ARGS="$ARGS $CONTINUE"
          echo "YMP: Running megahit $ARGS" >>{log.log}
          res=0
          megahit $ARGS >>{log.log} 2>&1 || res=$?
          [ -e {params.workdir}/done ] || res=1
          if [ $res -ne 0 ]; then
            if [ "{params.sufficient_input}" = "False" ]; then
              make_empty_result "Assuming failure due to low read count"
            else
              exit $res
            fi
          fi
        fi

        # output zipped contigs
        pigz -p {threads} -9 -c {params.workdir}/final.contigs.fa > {output.fasta}

        # output the zipped log
        cat {params.workdir}/{{options.json,log}} |\
          pigz -p {threads} -9 -c > {log.megahit}

        # determine largest K used
        MAXK=$(ls {params.workdir}/intermediate_contigs -1 |\
               sed -n 's%^k\([0-9]*\).*%\1%p' | sort -nr | head -n1)

        # output the zipped fastg
        megahit_toolkit contig2fastg $MAXK \
          {params.workdir}/intermediate_contigs/k${{MAXK}}.contigs.fa |\
          pigz -p {threads} -9 -c > {output.fastg}

        # remove intermediate contigs
        rm -rf {params.workdir}
        """