Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion spades_compile.sh
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ if [ $RUN_TESTS = "y" ]; then
"$WORK_DIR/bin/include_test" ; set -e
"$WORK_DIR/bin/debruijn_test" ; set -e
fi
SPADES="$BASEDIR"/spades.py
SPADES="$BASEDIR"/bin/spades.py
"$SPADES" -t $AMOUNT_OF_THREADS --test ; set -e
"$SPADES" -t $AMOUNT_OF_THREADS --test --isolate ; set -e
"$SPADES" -t $AMOUNT_OF_THREADS --test --sc ; set -e
Expand Down
2 changes: 1 addition & 1 deletion src/projects/mts/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
data: "path/to/samples/directory"
soft: "path/to/software/directory" #Directory with kmc, binners, CheckM, etc.
#bin: "/path/to/mts/binaries" #Derived from the current dir by default
#scripts: "/path/to/scripts/directory" #Derived from the current dir by default
#supplemetary: "/path/to/supplemetary/directory" #Derived from the current dir by default
#threads: 8 #Maximum number of threads for a task to use
#assembly:
#assembler: spades
Expand Down
2 changes: 1 addition & 1 deletion src/projects/mts/scripts/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def fill_default_values(config):
local_dir = config.get("LOCAL_DIR")
if local_dir:
default_values["bin"] = os.path.join(local_dir, "build/release/bin")
default_values["scripts"] = os.path.join(local_dir, "src/projects/mts/scripts")
default_values["supplemetary"] = os.path.join(local_dir, "src/projects/mts/supplemetary")
default_values["assembly"]["dir"] = os.path.join(local_dir, "bin")
setdefault_recursively(config)
config["reassembly"].setdefault("dir", config["assembly"].get("dir"))
Expand Down
Empty file.
186 changes: 97 additions & 89 deletions src/projects/spades/pipeline/spades.py

Large diffs are not rendered by default.

26 changes: 16 additions & 10 deletions src/projects/spades/pipeline/spades_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,35 +12,41 @@
import sys
from os.path import abspath, dirname, realpath, join, isfile

source_dirs = ["", "truspades", "common", "executors", "scripts"]

# developers configuration
spades_home = abspath(dirname(realpath(__file__)))
bin_home = join(spades_home, "bin")
python_modules_home = join(spades_home, "src")
ext_python_modules_home = join(spades_home, "ext", "src", "python_libs")
spades_root = abspath(join(spades_home, "../../../../"))
config_dirs = [(join(spades_root, "src", "projects", "spades", "configs"), "debruijn"),
(join(spades_root, "src", "projects", "ionhammer", "configs"), "ionhammer"),
(join(spades_root, "src", "projects", "hammer", "configs"), "hammer"),
(join(spades_root, "src", "projects", "corrector", "configs"), "corrector")]
bin_home = join(spades_root, "bin")
python_modules_home = join(spades_root, "src", "projects", "spades", "pipeline")
ext_python_modules_home = join(spades_root, "ext", "src", "python_libs")
spades_version = ""


def init():
global spades_home
global bin_home
global config_dirs
global python_modules_home
global spades_version
global ext_python_modules_home
global spades_root

# users configuration (spades_init.py and spades binary are in the same directory)
if isfile(os.path.join(spades_home, "spades-core")):
# users configuration (spades_init.py and spades binary are in the same directory)
install_prefix = dirname(spades_home)
bin_home = join(install_prefix, "bin")
spades_home = join(install_prefix, "share", "spades")
config_dirs = [(join(spades_home, "configs"), "./")]
python_modules_home = spades_home
ext_python_modules_home = spades_home
spades_version = open(join(spades_home, "VERSION"), 'r').readline().strip()
else:
spades_version = open(join(spades_root, "VERSION"), 'r').readline().strip()

for dir in source_dirs:
sys.path.append(join(python_modules_home, "spades_pipeline", dir))

spades_version = open(join(spades_home, "VERSION"), 'r').readline().strip()
sys.path.append(join(python_modules_home))


if __name__ == "__main__":
Expand Down
Empty file.
18 changes: 9 additions & 9 deletions src/projects/spades/pipeline/spades_pipeline/commands_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,18 @@

import random
import string

import support
import sys

from . import support
from .options_storage import OptionStorage
options_storage = OptionStorage()


class Command(object):
def __init__(self, STAGE, path, args, short_name, config_dir="",
def __init__(self, stage, path, args, short_name, config_dir="",
mpi_support=False, job_uuid="",
del_after=None, output_files=None):
self.STAGE = STAGE
self.stage = stage
self.path = path
self.args = args
self.short_name = short_name
Expand Down Expand Up @@ -66,7 +68,7 @@ def run(self, log):
support.sys_call(self.to_list(), log)

def to_dict(self):
return {"STAGE": self.STAGE,
return {"STAGE": self.stage,
"path": self.path,
"args": self.args,
"short_name": self.short_name,
Expand All @@ -77,10 +79,8 @@ def to_dict(self):
"del_after": self.del_after}

def generate_job_uuid(self):
return ('hpcSPAdes_' if self.mpi_support else 'SPAdes_') + \
self.STAGE.replace(' ', '_') + "_" + \
''.join([random.choice(string.ascii_uppercase + string.digits) for k in range(32)])

return (('hpcSPAdes_' if self.mpi_support else 'SPAdes_') + self.stage.replace(' ', '_') + "_" +
''.join([random.choice(string.ascii_uppercase + string.digits) for _ in range(32)]))


def write_commands_to_sh(commands, output_file):
Expand Down
7 changes: 4 additions & 3 deletions src/projects/spades/pipeline/spades_pipeline/common/SeqIO.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def __init__(self, handler):
self.cash = None

def FillCash(self):
if self.cash == None:
if self.cash is None:
self.cash = self.handler.readline()

def TrashCash(self):
Expand Down Expand Up @@ -74,7 +74,7 @@ class SeqRecord:
def __init__(self, seq, id, qual=None):
if qual != None and len(qual) != len(seq):
sys.stdout.write("oppa" + id + "oppa")
assert qual == None or len(qual) == len(seq)
assert qual is None or len(qual) == len(seq)
self.id = id
self.seq = seq
self.qual = qual
Expand All @@ -86,7 +86,7 @@ def __getitem__(self, key):
return self.seq[key]

def QualSubseq(self, l, r):
if self.qual != None:
if self.qual is not None:
return self.qual[l: r]
return None

Expand Down Expand Up @@ -181,6 +181,7 @@ def is_fastq(file_name):
def is_bam(file_name):
return check_extension(file_name, ['.bam'])


def is_sra(file_name):
return check_extension(file_name, ['.sra'])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@

import os
import shutil
import logging

import support
from .. import support


def align_bwa_pe_lib(command, index, reads_file1, reads_file2, work_dir, log, threads=1):
Expand Down Expand Up @@ -58,7 +59,7 @@ def align_bwa_pe_libs(command, index, reads, work_dir, log, threads):


def align_bwa(command, reference, dataset, work_dir, log=None, threads=1):
if log == None:
if log is None:
log = logging.getLogger('')
if os.path.exists(work_dir):
shutil.rmtree(work_dir)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ def GetHandlers(output_file_pattern, err_file_pattern, bid):
output_file_pattern = "/dev/null"
output = open(output_file_pattern.format(bid), "a")
if err_file_pattern == "":
return (output, subprocess.STDOUT)
return output, subprocess.STDOUT
else:
return (output, open(err_file_pattern.format(bid), "a"))
return output, open(err_file_pattern.format(bid), "a")


class ExternalCallTask:
Expand Down
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
import sys
from abc import ABCMeta, abstractmethod

import options_storage
import support
import commands_parser
from .. import support
from .. import commands_parser
from ..options_storage import OptionStorage

options_storage = OptionStorage()


class ExecutorBase(object):
Expand Down Expand Up @@ -56,18 +58,19 @@ class ExecutorCluster(ExecutorBase):
grid_engine_kill_command = None

def join(self, job_name):
support.sys_call(self.grid_engine_wait_command.format(JOB_NAME=job_name), log=self.log)
support.sys_call(self.grid_engine_wait_command.format(JOB_NAME=job_name), logger_instance=self.log)

def kill(self, job_name):
# support.sys_call(self.grid_engine_kill_command.format(JOB_NAME=job_name), log=self.log)
os.system(self.grid_engine_kill_command.format(JOB_NAME=job_name))

def run_cluster_command(self, cmd, uuid):
support.sys_call(cmd, log=self.log)
support.sys_call(cmd, logger_instance=self.log)
return uuid

def execute(self, commands):
jobs = []

def prev_id():
if not jobs:
return ""
Expand All @@ -81,11 +84,11 @@ def prev_id():
if os.path.isfile(stage_checkpoint_path) and \
("_start" not in command.short_name) and \
("_finish" not in command.short_name):
self.log.info("===== Skipping %s (already processed)" % command.STAGE)
self.log.info("===== Skipping %s (already processed)" % command.stage)
continue

if "_finish" not in command.short_name:
self.log.info("\n===== %s started. \n" % command.STAGE)
self.log.info("\n===== %s started. \n" % command.stage)

# `true' command does nothing, it corresponds to an arbitrary stage
# used for cleanup, restart-from, and other such stuff We skip its
Expand All @@ -98,16 +101,17 @@ def prev_id():
cmd = self.get_not_MPI_command(command, prev_id())
jid = self.run_cluster_command(cmd, command.job_uuid)
if "_start" not in command.short_name:
self.log.info("\n===== %s submitted. Job ID: %s \n" % (command.STAGE, jid))
self.log.info("\n===== %s submitted. Job ID: %s \n" % (command.stage, jid))
jobs.append(jid)

touch_command = commands_parser.Command(command.STAGE + "_touch",
"touch",
[stage_checkpoint_path],
"touch",
job_uuid=command.job_uuid + "_touch")
touch_command = commands_parser.Command(command.stage + "_touch",
"touch",
[stage_checkpoint_path],
"touch",
job_uuid=command.job_uuid + "_touch")

touch_jid = self.run_cluster_command(self.get_not_MPI_command(touch_command, prev_id()), touch_command.job_uuid)
touch_jid = self.run_cluster_command(self.get_not_MPI_command(touch_command, prev_id()),
touch_command.job_uuid)
jobs.append(touch_jid)

# FIXME implement
Expand All @@ -116,7 +120,7 @@ def prev_id():

if options_storage.args.stop_after == command.short_name or \
("_finish" in command.short_name and
options_storage.args.stop_after == command.short_name.split('_')[0]):
options_storage.args.stop_after == command.short_name.split('_')[0]):
self.log.info("\n======= Skipping the rest of SPAdes "
"pipeline (--stop-after was set to '%s'). "
"You can continue later with --continue or "
Expand Down Expand Up @@ -144,9 +148,10 @@ def get_MPI_sh_preambula(self):
preambula += "LOG_OUT=\"" + self.grid_engine_output_option.format(OUT=log_file) + "\"\n"
preambula += "ERR_OUT=\"" + self.grid_engine_err_output_option.format(ERR=log_file) + "\"\n"
memory_in_kb = int(options_storage.args.memory * 1024 * 1024)
preambula += "QUEUE=\"" + self.grid_engine_queue.format(QUEUE=options_storage.args.grid_queue) + "\"\n"
preambula += "QUEUE=\"" + self.grid_engine_queue.format(QUEUE=options_storage.args.grid_queue) + "\"\n"
preambula += "CLUSTER_ARGS=\"$QUEUE " + \
self.grid_engine_memory_option.format(MEMORY=memory_in_kb, TOTAL_MEMORY=memory_in_kb * options_storage.args.grid_nnodes) + " " + \
self.grid_engine_memory_option.format(MEMORY=memory_in_kb,
TOTAL_MEMORY=memory_in_kb * options_storage.args.grid_nnodes) + " " + \
self.grid_engine_thread_option.format(NNODES=options_storage.args.grid_nnodes,
NCPUS=options_storage.args.threads,
NPROCESSORS=options_storage.args.grid_nnodes * options_storage.args.threads) + " " + \
Expand All @@ -168,14 +173,14 @@ def get_MPI_sh_command(self, command, prev_job_name=""):
cmd += "$CLUSTER_ARGS "
cmd += self.grid_engine_mpi_runtime + " $MPIRUN_ARGS "
cmd1 = cmd
cmd = "# === STAGE " + command.STAGE + "(MPI) === \n"
cmd = "# === STAGE " + command.stage + "(MPI) === \n"
cmd += "CMD=\"" + command.mpi_sh_str() + "\"\n\n"
cmd += cmd1
cmd += "$CMD\n\n"
return cmd

def get_not_MPI_sh_command(self, command, prev_job_name=""):
cmd = "#=== STAGE " + command.STAGE + " (not MPI) ===\n"
cmd = "#=== STAGE " + command.stage + " (not MPI) ===\n"
cmd += "CMD=\"" + command.sh_str() + "\"\n\n"

cmd += self.grid_engine_submit_command + " "
Expand All @@ -188,7 +193,6 @@ def get_not_MPI_sh_command(self, command, prev_job_name=""):
cmd += "$CMD\n\n"
return cmd


def get_MPI_command(self, command, prev_job_name=""):
cmd = self.grid_engine_submit_command + " "
cmd += self.grid_engine_name_option.format(JOB_NAME=command.job_uuid) + " "
Expand All @@ -199,18 +203,19 @@ def get_MPI_command(self, command, prev_job_name=""):
cmd += self.grid_engine_dependency_option.format(WAIT_TAG=prev_job_name) + " "
cmd += self.grid_engine_queue.format(QUEUE=options_storage.args.grid_queue) + " "
memory_in_kb = int(options_storage.args.memory * 1024 * 1024)
cmd += self.grid_engine_memory_option.format(MEMORY=memory_in_kb, TOTAL_MEMORY=memory_in_kb * options_storage.args.grid_nnodes) + " "
cmd += self.grid_engine_memory_option.format(MEMORY=memory_in_kb,
TOTAL_MEMORY=memory_in_kb * options_storage.args.grid_nnodes) + " "
cmd += self.grid_engine_thread_option.format(NNODES=options_storage.args.grid_nnodes,
NCPUS=options_storage.args.threads,
NPROCESSORS=options_storage.args.grid_nnodes * options_storage.args.threads) + " "
NCPUS=options_storage.args.threads,
NPROCESSORS=options_storage.args.grid_nnodes * options_storage.args.threads) + " "
cmd += self.grid_engine_minimum_node_mem.format(MEMORY=memory_in_kb) + " "

cmd += self.grid_engine_mpi_runtime + " " + self.grid_engine_mpi_runtime_args.format(
NNODES=options_storage.args.grid_nnodes,
NCPUS=options_storage.args.threads) + " "

if options_storage.args.grid_profile:
name = command.STAGE + "_" + command.short_name + "_" + command.job_uuid
name = command.stage + "_" + command.short_name + "_" + command.job_uuid
profile = options_storage.args.output_dir + "/" + name + ".prof"
profile_line = " -x CPUPROFILE={PROFILE} ompi_profile_helper.sh ".format(PROFILE=profile)
else:
Expand All @@ -231,7 +236,6 @@ def get_MPI_command(self, command, prev_job_name=""):
cmd += command.mpi_str()
return cmd


def get_not_MPI_command(self, command, prev_job_name=""):
cmd = self.grid_engine_submit_command + " "
cmd += self.grid_engine_name_option.format(JOB_NAME=command.job_uuid) + " "
Expand Down
Loading