Source code for ymp.cluster

"""
Module handling talking to cluster management systems

>>> python -m ymp.cluster slurm status <jobid>
"""

import subprocess as sp
import sys
import re


def error(*args, **kwargs):
    """Print a message to stderr"""
    print(*args, file=sys.stderr, **kwargs)


class ClusterMS(object):
    """Base class for talking to cluster management systems"""


class Slurm(ClusterMS):
    """Talking to Slurm"""
    states = {
        'BOOT_FAIL': 'failed',      # node launch failure
        'CANCELLED': 'failed',      # killed manually or by resource monitoring
        'COMPLETED': 'success',     # terminated with exit code 0
        'CONFIGURING': 'running',   # resources allocated, waiting for node
        'COMPLETING': 'running',    # some processes still active
        'DEADLINE': 'failed',       # job ran into deadline and was killed
        'FAILED': 'failed',         # job exited with non-zero code or similar
        'NODE_FAIL': 'failed',      # job terminated due to failed node(s)
        'PENDING': 'running',       # job waiting for resources
        'PREEMPTED': 'failed',      # job terminated to make room
        'RUNNING': 'running',       # job has allocation and should be working
        'RESIZING': 'running',      # job is about to change size
        'SUSPENDED': 'running',     # job is paused
        'TIMEOUT': 'failed',        # job reached time limit
        # Questionable states:
        'SPECIAL_EXIT': 'running',  # job failed but was flagged "special_exit"
        'REVOKED': 'running'        # job removed because another cluster started it
        # Both of the above technically mean that the job isn't "done" yet:
        # it is either running on another cluster or on hold for a re-run
        # after an initial failure.
    }
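
    # Note: ``sacct`` may append detail to a state, e.g. "CANCELLED by 12345"
    # (an assumption based on common sacct output); status() below therefore
    # looks up only the first word of the reported state:
    #
    #   Slurm.states["CANCELLED by 12345".split(' ')[0]]   # -> 'failed'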

    @staticmethod
    def status(jobid):
        """Print status of job ``jobid`` to stdout (as needed by snakemake)

        Anecdotal benchmarking shows 200ms per invocation, half used by
        Python startup and half by calling ``sacct``. Using ``scontrol show
        job`` instead of ``sacct -pbj`` is faster by 80ms, but finished jobs
        are purged from it after an unknown time window.
        """
        header = None
        res = sp.run(['sacct', '-pbj', jobid], stdout=sp.PIPE)
        jobs = []
        for line in res.stdout.decode('ascii').splitlines():
            line = line.strip().split("|")
            if header is None:
                header = line
                continue
            try:
                job = {key: line[header.index(key)]
                       for key in ('JobID', 'State', 'ExitCode')}
                job['snakestate'] = Slurm.states[job['State'].split(' ')[0]]
                jobs.append(job)
            except (ValueError, KeyError) as e:
                error(e)
                error(res.stdout)
                sys.exit(1)

        snakestates = [job['snakestate'] for job in jobs]
        if 'running' in snakestates:
            print('running')
        elif 'failed' in snakestates:
            print('failed')
        else:
            # job not listed (anymore); assume success
            print('success')
        sys.exit(0)
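
# The helper below is an illustrative sketch, not part of the original module:
# it applies the same pipe-splitting logic as Slurm.status() to a canned
# ``sacct -pbj`` output (the sample text is an assumption about the typical
# layout, not captured from a real cluster).
def _demo_parse_sacct():
    sample = ("JobID|State|ExitCode|\n"
              "4711|COMPLETED|0:0|\n"
              "4711.batch|COMPLETED|0:0|\n")
    header = None
    snakestates = []
    for line in sample.splitlines():
        fields = line.strip().split("|")
        if header is None:
            header = fields        # first row names the columns
            continue
        state = fields[header.index('State')].split(' ')[0]
        snakestates.append(Slurm.states[state])
    return snakestates             # -> ['success', 'success']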


class Lsf(ClusterMS):
    """Talking to LSF"""
    states = {
        'PEND': 'running',
        'RUN': 'running',
        'DONE': 'success',
        'PSUSP': 'running',
        'USUSP': 'running',
        'SSUSP': 'running',
        'WAIT': 'running',
        'EXIT': 'failed',
        'POST_DONE': 'success',
        'POST_ERR': 'failed',
        'UNKWN': 'running',
    }

    @staticmethod
    def status(jobid):
        """Print status of job ``jobid`` to stdout (as needed by snakemake)"""
        header = None
        res = sp.run(['bjobs', jobid], stdout=sp.PIPE, stderr=sp.PIPE)
        for line in res.stdout.decode('ascii').splitlines():
            if line.startswith(" "):
                continue
            if header is None:
                # Turn the header row of the fixed-width table into
                # (start, end) character offsets for each column.
                colstarts = [(name, line.index(name))
                             for name in line.strip().split()]
                endcol = None
                header = {}
                for name, start in reversed(colstarts):
                    header[name] = (start, endcol)
                    endcol = start
                continue
            try:
                start, end = header['STAT']
                status = line[start:end].strip()
                snakestate = Lsf.states[status]
                print(snakestate)
                sys.exit(0)
            except (KeyError, ValueError) as e:
                error(e)
                for l in res.stdout.splitlines():
                    error(b"STDOUT: " + l)
                for l in res.stderr.splitlines():
                    error(b"ERROUT: " + l)
                raise
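
    # Illustrative example (the header text is an assumption about a typical
    # ``bjobs`` layout, not captured output): given a header row such as
    #
    #   "JOBID   USER    STAT  QUEUE ..."
    #
    # the loop above computes header['STAT'] == (16, 22), and the STAT value
    # is then sliced out of the same character range in every following row.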

    @staticmethod
    def submit(args):
        """Submit a job via ``bsub``, printing the assigned job id"""
        res = sp.run(['bsub'] + args, stdout=sp.PIPE)
        match = re.search(r"Job <(\d+)>", res.stdout.decode('ascii'))
        if match:
            print(match.group(1))
        else:
            error(res.stdout)
            return -1
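
# Illustrative only (the acknowledgement text is an assumption based on common
# LSF output, not captured from a real cluster): ``bsub`` typically reports a
# new job as "Job <4711> is submitted to queue <normal>.", and Lsf.submit()
# extracts the numeric id from that line:
#
#   re.search(r"Job <(\d+)>", "Job <4711> is submitted to queue <normal>.").group(1)
#   # -> '4711'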


if __name__ == "__main__":
    if len(sys.argv) < 3:
        error("""{self} ENGINE COMMAND ...

Talks to cluster engine.

Supported engines:
 - slurm
 - lsf

Supported commands:
 - status <jobid>  Returns running|success|failed status for jobid
                   (to be used by snakemake).
 - submit <args>   Submits a job and prints its job id.
""".format(self=sys.argv[0]))
        sys.exit(0 if len(sys.argv) > 1 and sys.argv[1] == "-h" else 1)

    engine_name = sys.argv[1]
    command = sys.argv[2]

    if engine_name == "slurm":
        engine = Slurm()
    elif engine_name == "lsf":
        engine = Lsf()
    else:
        error("Don't know about cluster engine named '{}'".format(engine_name))
        sys.exit(1)

    if command == "status":
        if len(sys.argv) != 4:
            error("Status command needs exactly one argument, the job id.")
            sys.exit(1)
        engine.status(sys.argv[3])
    elif command == "submit":
        engine.submit(sys.argv[3:])
    else:
        error("Command '{}' not understood.".format(command))
        sys.exit(1)