#!/usr/bin/env python3
# import sys
import os
import re
import subprocess
import json
import argparse
import shutil
from snakemake.utils import read_job_properties
class SbatchScheduler:
    '''
    Build an sbatch (SLURM) submission command from job attributes.

    Every attribute holding a truthy value is rendered as a long option
    --<name> "<value>", where underscores in the attribute name become
    dashes. Attribute -> sbatch option:
    ------
    account    - --account (-A)
    dependency - --dependency, e.g. "afterok:<id>,afterok:<id>"
    error      - --error (-e)
    mail_type  - --mail-type
    mail_user  - --mail-user
    ntasks     - --ntasks (-n)
    output     - --output (-o)
    qos        - --qos (-q)
    time       - --time (-t)
    script     - job script path, appended after all options
    '''

    def __init__(self):
        # sbatch options; None (or any other falsy value) means "omit".
        self.account = None
        self.dependency = None
        self.error = None
        self.output = None
        self.mail_type = None
        self.mail_user = None
        self.ntasks = None
        self.qos = None
        self.time = None
        # Positional argument handed to sbatch: the job script.
        self.script = None

    def build_cmd(self):
        '''
        Return the sbatch command line as one string.

        Options are emitted in a fixed order with each value wrapped in
        double quotes; the script path is appended last, unquoted.
        '''
        ordered_fields = ('account', 'dependency', 'error', 'output',
                          'mail_type', 'mail_user', 'ntasks', 'qos', 'time')
        pieces = ['sbatch']
        for field in ordered_fields:
            value = getattr(self, field)
            if not value:
                continue
            pieces.append('--{} "{}"'.format(field.replace('_', '-'), value))
        pieces.append(self.script)
        return ' '.join(pieces)
class QsubScheduler:
    """
    Build a qsub submission command from job attributes.

    Attribute -> qsub option, as emitted by build_cmd:
    ------
    account    - -q <account>  (the account value is used as the queue name)
    error      - -e <path>
    output     - -o <path>
    mail_type  - -m s  (emitted for any truthy value; the value itself is unused)
    mail_user  - -M <user>
    qos        - -p <qos>
    ntasks     - -pe mpi <ntasks>
    dependency - -hold_jid <id>,<id>,...  (an iterable of job ids, or a single
                 id given as a string)
    script     - job script path, appended last
    resources, time - stored but not emitted by build_cmd
    Every command also requests an exclusive node via -l excl=1.
    """

    def __init__(self):
        # qsub options; None (or any other falsy value) means "omit".
        self.account = None
        self.dependency = None
        self.error = None
        self.resources = None
        self.mail_type = None
        self.mail_user = None
        self.ntasks = None
        self.output = None
        self.qos = None
        self.script = None
        self.time = None

    def build_cmd(self):
        """
        Return the qsub command line as one string.

        The command always starts with "qsub -V -S /bin/bash". Each option
        fragment carries its own surrounding spaces and fragments are joined
        with single spaces, so the result contains runs of spaces. Values are
        inserted unquoted.
        """
        qsub_options = []
        # Always request an exclusive node.
        resource_params = " -l excl=1 "
        # NOTE(review): self.time is stored but not translated into a
        # walltime request - confirm this is intended.
        if self.ntasks:
            resource_params += " -pe mpi {} ".format(str(self.ntasks))
        if self.account:
            qsub_options.append(" -q " + str(self.account))
        if self.error:
            qsub_options.append(" -e " + str(self.error))
        if self.output:
            qsub_options.append(" -o " + str(self.output))
        if self.mail_type:
            # The configured mail type is not passed through; "-m s" is
            # always used (presumably because SLURM-style values are not
            # valid qsub -m letters - confirm).
            qsub_options.append(" -m s ")
        if self.mail_user:
            qsub_options.append(" -M " + str(self.mail_user))
        if self.qos:
            qsub_options.append(" -p " + str(self.qos))
        # resource_params is never empty: excl=1 is always requested.
        qsub_options.append(resource_params)
        if self.dependency:
            # A bare string is one job id; joining it directly would split
            # it into single characters.
            if isinstance(self.dependency, str):
                job_ids = [self.dependency]
            else:
                job_ids = self.dependency
            qsub_options.append(
                " -hold_jid " + ",".join(str(job_id) for job_id in job_ids))
        if self.script:
            qsub_options.append(" {} ".format(self.script))
        return "qsub -V -S /bin/bash " + " ".join(qsub_options)
def read_sample_config(input_json):
    '''
    Load the sample config file written by `balsamic config sample`.

    Params:
    ------
    input_json - path to the sample config JSON file

    Returns the parsed JSON document. Errors from opening the file
    (e.g. FileNotFoundError) or decoding it (json.JSONDecodeError)
    propagate unchanged to the caller.
    '''
    # JSON text is UTF-8; don't depend on the locale's default encoding.
    with open(input_json, encoding='utf-8') as config_file:
        return json.load(config_file)
def write_sacct_file(sacct_file, job_id):
    '''
    Append a submitted job id to the job-id log file.

    The file is plain text with one job id per line. It is opened in
    append mode, so ids from successive calls accumulate.

    Params:
    ------
    sacct_file - path of the file to append to (created if absent)
    job_id     - job identifier; converted with str() before writing

    Raises FileNotFoundError (or another OSError) if the file cannot be
    opened, e.g. when its directory does not exist.
    '''
    with open(sacct_file, 'a', encoding='utf-8') as sacct:
        sacct.write(str(job_id) + "\n")
# def write_scheduler_dump(scheduler_file, cmd):
# ''' writes sbatch dump for debugging purposes '''
# try:
# with open(scheduler_file, 'a') as f:
# f.write(cmd + "\n")
# f.write(sys.executable + "\n")
# except OSError:
# raise
def submit_job(sbatch_cmd, profile):
    '''
    Run a scheduler submission command and return the job identifier.

    Params:
    ------
    sbatch_cmd - complete submission command line (sbatch or qsub). It is
                 executed through the shell (shell=True), so it must already
                 be quoted appropriately.
    profile    - 'slurm' or 'qsub'; selects how the command's stdout is parsed

    The identifier is printed to stdout and returned as a string
    (presumably the printed value is what the calling workflow manager
    captures - confirm).

    Raises:
    ------
    subprocess.CalledProcessError - the command exited with a non-zero status
    ValueError - unsupported profile, or no identifier found in the output
    '''
    # check=True turns a non-zero exit status into CalledProcessError.
    result = subprocess.run(sbatch_cmd,
                            check=True,
                            shell=True,
                            stdout=subprocess.PIPE)
    output = result.stdout.decode()

    if profile == "slurm":
        match = re.search(r"Submitted batch job (\d+)", output)
        if match is None:
            raise ValueError(
                "Could not find a job id in sbatch output: {!r}".format(output))
        jobid = match.group(1)
    elif profile == "qsub":
        # Use the 4th whitespace-separated token, stripped of quotes and
        # parentheses.
        # NOTE(review): for output shaped like
        # 'Your job <id> ("<name>") has been submitted' this is the job
        # *name*, not the numeric id - confirm this is intended.
        tokens = output.split()
        if len(tokens) < 4:
            raise ValueError(
                "Could not find a job id in qsub output: {!r}".format(output))
        jobid = tokens[3].strip("\"()\"")
    else:
        raise ValueError(
            "Unsupported profile {!r}; expected 'slurm' or 'qsub'".format(profile))

    print(jobid)
    return jobid
# def singularity_param(sample_config, script_dir, jobscript, sbatch_script):
# ''' write a modified sbatch script based on singularity parameters '''
# if 'bind_path' not in sample_config['singularity']:
# raise KeyError("bind_path was not found in sample config.")
# if 'main_env' not in sample_config['singularity']:
# raise KeyError("main_env was not found in sample config.")
# if 'container_path' not in sample_config['singularity']:
# raise KeyError("container_path was not found sample config.")
# try:
# bind_path = sample_config['singularity']['bind_path']
# main_env = sample_config['singularity']['main_env']
# container_path = sample_config['singularity']['container_path']
# with open(sbatch_script, 'a') as f:
# f.write("#!/bin/bash" + "\n")
# f.write(
# f"function balsamic-run {{ singularity exec -B {bind_path} --app {main_env} {container_path} $@; }}"
# + "\n")
# f.write(f"# Snakemake original script {jobscript}" + "\n")
# f.write(f"balsamic-run bash {jobscript}" + "\n")
# sbatch_file = os.path.join(
# script_dir, sample_config["analysis"]["case_id"] + ".sbatch")
# return sbatch_file
# except OSError:
# raise
def get_parser():
    '''
    Build the command-line parser for scheduler.py.

    Positional arguments are zero or more job ids this job depends on,
    followed by the snakemake jobscript path. --profile is restricted to
    the two schedulers main() knows how to drive.
    '''
    parser = argparse.ArgumentParser(description='''
This is an internal script and should not be invoked independently.
This script gets a list of arguments (see --help) and submits a job to the
cluster scheduler (slurm or qsub), depending on the given job ids.
''')
    parser.add_argument("dependencies",
                        nargs="*",
                        help="{{dependencies}} from snakemake")
    parser.add_argument("snakescript", help="Snakemake script")
    parser.add_argument("--sample-config",
                        help='balsamic config sample output')
    parser.add_argument("--profile",
                        choices=("slurm", "qsub"),
                        help="profile to run jobs")
    parser.add_argument("--account",
                        required=True,
                        help='cluster account name')
    parser.add_argument("--qos",
                        default='low',
                        help='cluster job Priority (slurm - QOS)')
    parser.add_argument("--mail-type", help='cluster mail type')
    parser.add_argument("--mail-user", help='mail user')
    parser.add_argument("--log-dir", help="Log directory")
    parser.add_argument("--result-dir", help="Result directory")
    parser.add_argument("--script-dir", help="Script directory")
    return parser
def main(args=None):
    '''
    Entry point for scheduler.py: submit one snakemake jobscript.

    Copies the jobscript into --script-dir, builds an sbatch or qsub command
    for that copy (selected by --profile), submits it, and appends the
    returned job id to <log-dir>/<case_id>.sacct, where case_id is read from
    the sample config's ["analysis"]["case_id"].

    Params:
    ------
    args - optional list of command-line arguments; None means sys.argv
    '''
    parser = get_parser()
    args = parser.parse_args(args)

    # Choose the scheduler first so an unsupported profile fails before any
    # file is copied or any job is submitted.
    if args.profile == 'slurm':
        # Placeholder for the job id in the log file names built below.
        jobid_pattern = '%j'
        scheduler_cmd = SbatchScheduler()
        if args.dependencies:
            scheduler_cmd.dependency = ','.join(
                ["afterok:%s" % d for d in args.dependencies])
    elif args.profile == 'qsub':
        # NOTE(review): -e/-o values are not quoted for qsub and the command
        # is run with shell=True, so the submitting shell may expand
        # ${JOB_ID} before qsub sees it - confirm log names are as intended.
        jobid_pattern = '${JOB_ID}'
        scheduler_cmd = QsubScheduler()
        scheduler_cmd.dependency = args.dependencies
    else:
        # parser.error prints usage and exits with status 2.
        parser.error("unsupported --profile {!r}; expected 'slurm' or 'qsub'"
                     .format(args.profile))

    jobscript = args.snakescript
    job_properties = read_job_properties(jobscript)
    # Submit a copy kept in the script directory rather than the original.
    shutil.copy2(jobscript, args.script_dir)
    jobscript = os.path.join(args.script_dir, os.path.basename(jobscript))

    # An explicit --mail-type wins; otherwise use the rule's cluster setting.
    mail_type = args.mail_type or job_properties["cluster"]["mail_type"]

    sample_config = read_sample_config(input_json=args.sample_config)
    sacct_file = os.path.join(args.log_dir,
                              sample_config["analysis"]["case_id"] + ".sacct")

    log_stem = os.path.join(
        args.log_dir, os.path.basename(jobscript) + "_" + jobid_pattern)
    scheduler_cmd.account = args.account
    scheduler_cmd.mail_type = mail_type
    scheduler_cmd.mail_user = args.mail_user
    scheduler_cmd.error = log_stem + ".err"
    scheduler_cmd.output = log_stem + ".out"
    scheduler_cmd.ntasks = job_properties["cluster"]["n"]
    scheduler_cmd.time = job_properties["cluster"]["time"]
    # NOTE(review): --qos is parsed (default 'low') but never assigned to
    # scheduler_cmd.qos. QsubScheduler emits qos as "-p <qos>", which
    # presumably expects a numeric priority, so forwarding 'low' blindly
    # could break qsub submissions - confirm intended behavior.
    scheduler_cmd.script = jobscript

    submitted_id = submit_job(scheduler_cmd.build_cmd(), args.profile)
    write_sacct_file(sacct_file=sacct_file, job_id=submitted_id)
if __name__ == '__main__':
    # Executed as a script: parse sys.argv and submit the job.
    main(args=None)