diff --git a/README.md b/README.md
index 1e17a3d04a9214d5b50e03a221e4e89072a1f7fe..c6a396ca150464f7c06980a473dbd7160b701e1b 100644
--- a/README.md
+++ b/README.md
@@ -2,11 +2,11 @@
 
 > What does it mean?
 
-The name Piping has been chosen to suggest a set of (cluster job) pipelines.
+The name Piping has been chosen to suggest a set of job pipelines.
 
 > What is it for?
 
-Piping makes it easy to set up and run job pipelines.
+Piping makes it easy to set up and run job pipelines in different environments.
 
 > Is it hard to use?
 
@@ -14,60 +14,71 @@ Just edit a configuration file and run the program.
 
 ## Background
 
-The Slurm workload manager offers a pipeline mechanism in the sense that jobs can be configured to start when one or more other jobs have finished.
-But providing the dependency job ids for each submission command can quickly become tedious.
-Moreover, it is sometimes quite difficult to estimate the duration of a job.
+When working with a workload manager like Slurm, one can spend a lot of time writing and submitting job scripts.
+Suppose you want to submit a series of jobs with dependencies and various specifications.
+Creating all the job scripts, then retrieving and providing the dependency job ids for each submission, can quickly become tedious.
+A certain degree of automation is therefore welcome.
+
+When preparing or modifying a pipeline, it can also be very useful to test its execution on your computer. This lets you check the progress of the jobs and identify bugs quickly, without having to connect to the cluster or wait for resources. It is therefore very convenient to have a single file that captures the essence of the pipeline and its jobs and that can be converted into scripts either for Slurm or for your computer, depending on the need.
+
+Finally, it is sometimes quite difficult to estimate the duration of a job.
 Besides, some clusters limit the maximum duration to a few hours or days.
 An automatic requeuing mechanism is often the solution in this case, but can be annoying to implement.
-In short, a simple way to configure and submit pipelines with an auto-requeue feature could save valuable design and calculation time.
+In short, a simple way to configure and submit pipelines with an auto-requeuing feature could save valuable design and calculation time.
 
 ## Features
 
-Piping proposes a script that allows to submit a predefined series of jobs to the Slurm workload manager.
+Piping provides a script that runs an ordered series of jobs.
 The preparation of a pipeline is done in a configuration file.
-At runtime, this script reads the configuration file associated with the pipeline and translates its contents into directives for Slurm.
-The generated job scripts implement an automatic requeue mechanism.
-So if your job is not finished by the end of the time limit, a SIGTERM signal is sent a few seconds before the job is finished and placed back in the queue.
+At runtime, this script reads the configuration file associated with the pipeline and translates its contents into job scripts for Slurm or for your computer.
+When using the launcher for Slurm, the generated job scripts implement an automatic requeuing mechanism.
+So if one of your jobs is not finished by the end of the time limit, a SIGTERM signal is sent to it a few seconds before the time limit expires and the job is then placed back in the queue.
 
 ## Installation
 
 You can simply clone or download the project.
-But since the main part of the project is in the [run.py](/run.py) script you can also simply copy and paste its content.
+But since the essence of the project is contained in the [run.py](/run.py) script, you can also simply copy and paste its content.
 
 ## Usage
 
-You can submit the pipeline configured in the `pipeline-example.conf` file with the command:
+Check that the [run.py](/run.py) script is executable so that you can run it without typing `python3` on the command line.
+Then, you can run the vanilla example pipeline configured in the [pipelines/vanilla.conf](/pipelines/vanilla.conf) file with the command:
 
 ```bash
-./run.py pipeline-example.conf
+./run.py pipelines/vanilla.conf --local
 ```
 
-You can store your pipeline configuration files anywhere, as long as you pass the correct path when calling the script.
+The `--local` option allows you to run the pipeline on your computer.
+To submit to Slurm, remove this option or use `--slurm` instead.
+You can store your pipeline configuration files anywhere, as long as you pass the correct path as an argument when calling the script.
+
 All that remains is to configure your own pipeline.
-To do this, generate a file in the same format as [pipeline-example.conf](/pipeline-example.conf).
-Each job is configured in a section of the file.
-Note that the `DEFAULT` section defines the default values for all jobs in the pipeline.
+To do this, create a file in the same format as [pipelines/customized.conf](/pipelines/customized.conf).
+Except for the `DEFAULT` section (which defines the default values for all jobs), each section of the file configures one job of the pipeline.
 For each job you can configure the following fields:
 
-* `partition`: Name of the cluster partition on which to run the job.
-* `nodes`: Number of nodes reserved for the job.
-* `mail-user`: Mail for sending job related notifications (optional).
-* `mail-type`: Type (optional).
-* `time`: Maximum duration for the job.
-* `workdir`: Path to the working directory for the job.
-* `jobfile`: Name of the log file.
-* `logfile`: Name of the job file.
-* `stage`: String defining at which stage the job must be executed.
-* `command`: Bash command to run.
-* `modules`: List of modules to be loaded.
-* `venv`: Python virtual environment to be used.
-
-To define the fields (such as `workdir` or `jobfile`) you can use the following environment variables:
-
-* `pipeline`: Name of the pipeline.
-* `date`: Date of submission of the pipeline.
-* `stage`: Stage of the job.
-* `job`: Name of the job.
+* `stage` **(required)**: String defining the stage at which the job must be executed. Stages are executed in lexicographic order, and jobs that share the same stage value can run in parallel. By default, all jobs in a stage must complete without error before the next stage starts.
+* `command` **(required)**: Main shell command of the job. This entry contains the essence of what the job does. Since the SIGTERM signal is sent only once, it is strongly recommended to write a single command per job (i.e. not to chain several commands with semicolons in this field).
+* `directory` **(optional)**: Path to the directory in which the job is executed. The directory is created when the pipeline is launched if it does not exist. Other relative paths specified in the configuration file are resolved from this directory. By default, the directory name contains the creation date and the name of the pipeline, so that each launch of a pipeline uses a different directory.
+* `modules` **(optional)**: The list of modules to load with the `module load` command. Using this field assumes that the [modules package](https://github.com/cea-hpc/modules) is installed, which is usually the case on clusters. To indicate multiple modules, separate module names with spaces.
+* `venv` **(optional)**: The path to the Python virtual environment to activate. Using this field will place a `source <venv>/bin/activate` command in the first lines of the job script. Use it for virtual environments created with the [venv](https://docs.python.org/3/library/venv.html) package.
+* `conda` **(optional)**: Name of the Python virtual environment to activate. Using this field will place a `conda activate <name>` command in the first lines of the job script. Use it for virtual environments created with Conda.
+* `array` **(optional)**: Executes the job for different values of a variable named `$var`. Specify the values taken by `$var` separated by spaces and use `$var` or `${var}` in the command line of the job. The field name can also carry a suffix made of alphanumeric characters and underscores (e.g. `array_0`), in which case the variable carries the same suffix (e.g. `$var_0`). This makes it possible to define several arrays with their associated variables. To limit the number of simultaneous tasks, add a `tasks` field set to an integer.
+* `dependency` **(optional)**: Choose among `after`, `afterany`, `afterburstbuffer`, `aftercorr`, `afternotok`, `afterok` to select the condition that the job's dependencies must satisfy before the job starts. This corresponds to what is passed to the Slurm option [--dependency](https://slurm.schedmd.com/sbatch.html#OPT_dependency) before the list of job ids.
+* `script` **(optional)**: Path to the job script file. If the path is relative, it is resolved from the path specified in the `directory` field.
+* `logging` **(optional)**: Path to the job log file. If the path is relative, it is resolved from the path specified in the `directory` field.
+* `partition` **(optional)**: Value passed to the Slurm option [--partition](https://slurm.schedmd.com/sbatch.html#OPT_partition).
+* `nodes` **(optional)**: Value passed to the Slurm option [--nodes](https://slurm.schedmd.com/sbatch.html#OPT_nodes).
+* `email` **(optional)**: Value passed to the Slurm option [--mail-user](https://slurm.schedmd.com/sbatch.html#OPT_mail-user).
+* `notification` **(optional)**: Value passed to the Slurm option [--mail-type](https://slurm.schedmd.com/sbatch.html#OPT_mail-type).
+* `time` **(optional)**: Value passed to the Slurm option [--time](https://slurm.schedmd.com/sbatch.html#OPT_time).
+
+To define the fields (such as `directory`, `script` or `logging`), you can use the following environment variables:
+
+* `$PIPING_PIPELINE`: Name of the pipeline (corresponds to the stem of the pipeline configuration file name).
+* `$PIPING_DATE`: UTC date of submission of the pipeline in ISO `YYYYMMDDTHHMMSSZ` format.
+* `$PIPING_STAGE`: Stage of the job.
+* `$PIPING_JOB`: Name of the job (corresponds to the name of the section in the configuration file).
## Credits diff --git a/material-example/executable-example.sh b/material-example/executable-example.sh deleted file mode 100755 index 87ad77458e8d757ba268c919e79ed9b0e93b0ecd..0000000000000000000000000000000000000000 --- a/material-example/executable-example.sh +++ /dev/null @@ -1,7 +0,0 @@ -#!/bin/bash - -date -echo $@ -sleep 10 -touch output-$@ -echo done diff --git a/material/executable.sh b/material/executable.sh new file mode 100755 index 0000000000000000000000000000000000000000..d23f7f449b59ed2546bd4150238af6cfb63dc26d --- /dev/null +++ b/material/executable.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +date +echo "(re)starting" + +if ! test -f "output-$1-t1"; +then + echo "generating first output file..." + sleep 90 + date > output-$1-t1 + echo "done" +fi + +if ! test -f "output-$1-t2"; +then + echo "generating second output file..." + sleep 90 + date > output-$1-t2 + echo "done" +fi diff --git a/pipeline-example.conf b/pipeline-example.conf deleted file mode 100644 index cdf58de72bf2b7a946093c401766681ab3926083..0000000000000000000000000000000000000000 --- a/pipeline-example.conf +++ /dev/null @@ -1,37 +0,0 @@ -[DEFAULT] -partition = all.q -nodes = 1 -mail-user = your@email.com -mail-type = FAIL -time = 24:00:00 -workdir = scratch/$date-$pipeline -jobfile = $pipeline-$stage-$job.job -logfile = $pipeline-$stage-$job.log - -[data-copy] -stage = 0 -command = cp -r ../../material-example/* . - -[input-generation] -stage = 1 -command = ./executable-example.sh creating-data - -[output-generation-a] -stage = 2 -nodes = 2 -command = ./executable-example.sh processing-data-a - -[output-generation-b] -stage = 2 -command = ./executable-example.sh processing-data-b - -[post-treatment] -stage = 3 -nodes = 2 -command = ./executable-example.sh post-processing-data - -[archive-data] -mail-type = END, FAIL -stage = 4 -nodes = 1 -command = tar -cvf results.tar output-* diff --git a/pipelines/customized.conf b/pipelines/customized.conf new file mode 100644 index 0000000000000000000000000000000000000000..00e57c5cb67fc2551218a5b98afd44ab4f0ec00f --- /dev/null +++ b/pipelines/customized.conf @@ -0,0 +1,62 @@ +# The lines below do not correspond to a job but configure the default +# settings. The name "DEFAULT" is reserved for this use. +[DEFAULT] +partition = all.q +directory = scratch/$PIPING_DATE-$PIPING_PIPELINE-test +script = $PIPING_PIPELINE-$PIPING_STAGE-$PIPING_JOB.sh +logging = $PIPING_PIPELINE-$PIPING_STAGE-$PIPING_JOB.txt +email = your@email.com +notification = FAIL +time = 00:02:00 + +# The "copy" job defined below is an example of how to copy data to the +# working directory created when the pipeline was launched. +[copy] +stage = 0 +command = cp ../../material/executable.sh . --verbose + +# The "build" job defined below will use the modules package to load +# two modules. After expansion of the variables, the command executed +# will be: "./executable.sh build" +[build] +stage = 1 +modules = gcc/9.3.0 mpi/openmpi-4.1.1-gcc9-3 +command = ./executable.sh $PIPING_JOB + +# The "array-1" job defined below is an example of how to run a command +# for different values of a variable using Slurm arrays. During its +# execution, this job will generate two tasks with the commands: +# "./executable.sh array-1-a" and "./executable.sh array-1-b". +[array-1] +stage = 2 +array = a b +command = ./executable.sh $PIPING_JOB-$var + +# The "array-2" job defined below follows the example of the previous +# job but adding a variable. 
During its execution, this job will
+# generate two tasks with the commands:
+# "./executable.sh array-2-a-c" and "./executable.sh array-2-b-d".
+[array-2]
+stage = 2
+array_0 = a b
+array_1 = c d
+command = ./executable.sh $PIPING_JOB-${var_0}-${var_1}
+
+# The "post-process" job defined below shows how to run a job on
+# multiple nodes.
+[post-process]
+stage = 3
+nodes = 2
+command = ./executable.sh $PIPING_JOB
+# In Slurm versions prior to 19.05.3, arrays with requeued tasks always
+# return an exit code different from 0. The following line works around
+# this bug. Remove it if your version of Slurm is >= 19.05.3.
+dependency = afterany
+
+# This last job stores the produced data in an archive. Here,
+# the type of event notified by email is modified in order to inform
+# the user when the pipeline is completed.
+[archive]
+notification = END,FAIL
+stage = 5
+command = tar -cvf $PIPING_DATE-$PIPING_PIPELINE.tar output-*
diff --git a/pipelines/vanilla.conf b/pipelines/vanilla.conf
new file mode 100644
index 0000000000000000000000000000000000000000..1325c321bfdfadf7c75b67b6add5780f4f6af17d
--- /dev/null
+++ b/pipelines/vanilla.conf
@@ -0,0 +1,14 @@
+# The three lines below define a job called "print-hello" that just
+# displays "Hello".
+[print-hello]
+stage = 0
+command = echo Hello
+
+# The three lines below define another job called "print-world" that
+# displays "world!".
+[print-world]
+stage = 1
+command = echo world!
+
+# Given the values chosen for field "stage", the "print-hello" job will
+# always be executed before the "print-world" job.
diff --git a/run.py b/run.py
index fd853ceb80aab9e728ac0076b913776be9679c90..d76b328a30ec605f0c656f31f662a8114a3fef49 100755
--- a/run.py
+++ b/run.py
@@ -1,13 +1,13 @@
 #!/usr/bin/env python3
 # -*- coding: utf-8 -*-
 
-"""Script for pipeline submission.
+"""Script to run job pipelines.
 
-This script allows to submit a predefined series of jobs to the Slurm
-workload manager. The preparation of a pipeline is done in a
-configuration file. At runtime, this script reads the configuration
-file associated with the pipeline and translates its contents into
-directives for Slurm.
+This script runs an ordered series of jobs in different
+environments. After being defined in a configuration file, jobs can
+be run locally or submitted to a workload manager such as Slurm. At
+runtime, this script reads the configuration file associated with the
+pipeline to generate and execute the appropriate shell scripts.
 
 """
 
@@ -18,7 +18,9 @@ from logging import getLogger
 from os import makedirs, path
 from pathlib import Path
 from string import Template
-from subprocess import CalledProcessError, check_output, PIPE
+from subprocess import CalledProcessError, check_output, PIPE, run
+from sys import exit as sys_exit
+from datetime import timezone
 from time import sleep
 
 
@@ -26,10 +27,12 @@ logger = getLogger(__name__)
 
 
 class Pipeline:
-    """Class representing an ordered sequence of jobs to execute.
+    """This class represents an ordered series of jobs.
 
     Attributes
     ----------
+    defaults : dict of str
+        Default values for job configuration.
     name : str
         Pipeline name.
     date : str
@@ -43,8 +46,14 @@
 
     """
 
+    defaults = {
+        'directory': 'scratch/$PIPING_DATE-$PIPING_PIPELINE',
+        'script': '$PIPING_PIPELINE-$PIPING_STAGE-$PIPING_JOB.job',
+        'logging': '$PIPING_PIPELINE-$PIPING_STAGE-$PIPING_JOB.log',
+    }
+
     def __init__(self, config_path):
-        """Instantiate a pipeline launcher.
+        """Instantiate a pipeline representation.
 
Parameters ---------- @@ -54,11 +63,11 @@ class Pipeline: """ logger.debug("defining pipeline variables") self.name = Path(config_path).stem - self.date = datetime.now().strftime("%Y%m%dT%H%M%S") + self.date = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") logger.debug("loading pipeline configuration file %s", config_path) - self.config = ConfigParser() - with open(config_path, encoding='utf-8') as file: + self.config = ConfigParser(defaults=self.defaults) + with open(config_path, 'r', encoding='utf-8') as file: self.config.read_file(file) logger.debug("defining job stages and order") @@ -69,7 +78,7 @@ class Pipeline: self.order = sorted(self.stages.keys()) def variables(self, job): - """Return the job environment variable values. + """Return the environment variables of the job. Parameters ---------- @@ -79,19 +88,19 @@ class Pipeline: Returns ------- dict of str - Job environment variable values. + Job environment variables. """ variables = { - 'pipeline': self.name, - 'date': self.date, - 'stage': self.config.get(job, 'stage'), - 'job': job, + 'PIPING_PIPELINE': self.name, + 'PIPING_DATE': self.date, + 'PIPING_STAGE': self.config.get(job, 'stage'), + 'PIPING_JOB': job, } return variables def substitute(self, template, job): - """Return the template with environment variables expanded. + """Return the template with expanded environment variables. Parameters ---------- @@ -112,8 +121,14 @@ class Pipeline: result = path.expandvars(result) return result + +class SlurmPipeline(Pipeline): + """This class allows to submit a pipeline to Slurm. + + """ + def script(self, job): - """Return the Job script content. + """Return the Slurm job script content. Parameters ---------- @@ -128,51 +143,88 @@ class Pipeline: """ logger.debug("retrieving %s job settings", job) section = self.config[job] - logfile = self.substitute(section['logfile'], job) - - logger.debug("generating %s job script", job) - lines = [] - lines.append("#!/bin/bash") - lines.append(f"#SBATCH --nodes {section.get('nodes')}") - if 'mail-user' in section: - lines.append(f"#SBATCH --mail-user {section.get('mail-user')}") - if 'mail-type' in section: - lines.append(f"#SBATCH --mail-type {section.get('mail-type')}") - lines.append(f"#SBATCH --job-name {job}") - lines.append(f"#SBATCH --time {section.get('time')}") - lines.append(f"#SBATCH --partition {section.get('partition')}") - lines.append("#SBATCH --exclusive") - lines.append("#SBATCH --mem 0") - lines.append(f"#SBATCH --error {logfile}") - lines.append(f"#SBATCH --output {logfile}") - lines.append("#SBATCH --open-mode append") - lines.append("") - if 'modules' in section: - for module in section.get('modules', '').split(): - lines.append(f"module load {module}") - lines.append("") - if 'venv' in section: + logfile = self.substitute(section.get('logging', ''), job) + variables = self.variables(job) + content = "\n" + requeue = "$SLURM_JOB_ID" + + logger.debug("mapping options") + options = { + 'job-name': job, + 'nodes': section.get('nodes', '1'), + 'mail-user': section.get('email', ''), + 'mail-type': section.get('notification', ''), + 'time': section.get('time', '24:00:00'), + 'partition': section.get('partition', ''), + 'mem': section.get('mem', '0'), + 'error': logfile, + 'output': logfile, + 'open-mode': section.get('open-mode', 'append'), + 'exclusive': section.getboolean('exclusive', True), + } + + logger.debug("export environment variables") + content += "".join(f"export {k}={v}\n" for k, v in variables.items()) + + logger.debug("define array") + array = {k[5:]: v 
for k, v in section.items() if k.startswith('array')} + lengths = [len(v.split()) for v in array.values()] + if not all(length == lengths[0] for length in lengths[1:]): + raise ValueError("arrays should be the same length") + if array: + options['error'] += '-%a' + options['output'] += '-%a' + options['array'] = f"0-{lengths[0]-1}" + if section.get('tasks', ''): + options['array'] += f"%{int(section['tasks'])}" + requeue = "${SLURM_ARRAY_JOB_ID}_${SLURM_ARRAY_TASK_ID}" + content += "\n" + for key, value in array.items(): + content += f"arr{key}=({value})\n" + for key in array: + content += f"var{key}=${{arr{key}[$SLURM_ARRAY_TASK_ID]}}\n" + + logger.debug("load modules") + if section.get('modules', ''): + content += "\nmodule purge\n" + for module in section.get('modules').split(): + content += f"module load {module}\n" + + logger.debug("activate virtual environment") + if section.get('venv', ''): source = path.join(section.get('venv'), 'bin', 'activate') - lines.append(f"source {source}") - lines.append("") - lines.append("terminate () {") - lines.append(" echo caught SIGTERM") - lines.append(' echo terminating "$pid"') - lines.append(' kill -TERM "$pid"') - lines.append(' wait "$pid"') - lines.append(" echo requeuing $SLURM_JOB_ID") - lines.append(" scontrol requeue $SLURM_JOB_ID") - lines.append("}") - lines.append("") - lines.append("trap terminate SIGTERM") - lines.append("") - lines.append(f"srun {section.get('command')} &") - lines.append("pid=$!") - lines.append('wait "$pid"') - return "\n".join(lines) + content += f"\nsource {source}\n" + elif section.get('conda', ''): + content += f"\nconda activate {section.get('conda')}\n" + + logger.debug("implement requeuing mechanism") + content += ( + "\nterminate () {\n" + " echo caught SIGTERM\n" + ' echo terminating "$pid"\n' + ' kill -TERM "$pid"\n' + ' wait "$pid"\n' + f" echo requeuing {requeue}\n" + f" scontrol requeue {requeue}\n" + "}\n" + "\ntrap terminate SIGTERM\n" + f"\nsrun {section['command']} &\n" + "pid=$!\n" + 'wait "$pid"\n' + ) + + logger.debug("pass options") + header = "#!/bin/bash\n" + for key, value in options.items(): + if isinstance(value, bool) and value: + header += f"#SBATCH --{key}\n" + elif isinstance(value, str) and value: + header += f"#SBATCH --{key} {value}\n" + + return header + content def submit(self, job, dependencies): - """Submit a job and return its job id. + """Submit a job to Slurm and return its job id. Parameters ---------- @@ -189,8 +241,8 @@ class Pipeline: """ logger.debug("retrieving %s job settings", job) section = self.config[job] - workdir = self.substitute(section['workdir'], job) - jobfile = self.substitute(section['jobfile'], job) + workdir = self.substitute(section['directory'], job) + jobfile = self.substitute(section['script'], job) logger.debug("initializing work directory") makedirs(workdir, exist_ok=True) @@ -201,7 +253,8 @@ class Pipeline: command = ['cd', workdir, ';', 'sbatch'] if len(dependencies) != 0: command.append('--dependency') - dependencies = [f'afterok:{jobid}' for jobid in dependencies] + dependency = section.get('dependency', 'afterok') + dependencies = [f'{dependency}:{jobid}' for jobid in dependencies] command.append(','.join(dependencies)) command.append('--parsable') command.append(jobfile) @@ -217,11 +270,15 @@ class Pipeline: except CalledProcessError as exc: raise RuntimeError(exc.returncode, exc.stderr) from exc - def run(self): + def run(self, *dependencies): """Submit the pipeline to Slurm. 
+ Parameters + ---------- + *dependencies : list of str + Id of the jobs to be completed before. + """ - dependencies = [] for rank in self.order: ids = [] for job in self.stages[rank]: @@ -230,8 +287,112 @@ class Pipeline: dependencies = ids +class LocalPipeline(Pipeline): + """This class allows to execute a pipeline locally. + + """ + + def script(self, job): + """Return the job shell script. + + Parameters + ---------- + job : str + Name of the job in the pipeline. + + Returns + ------- + str + Job script content. + + """ + logger.debug("retrieving %s job settings", job) + section = self.config[job] + variables = self.variables(job) + content = "#!/bin/bash\n\n" + + logger.debug("export environment variables") + content += "".join(f"export {k}={v}\n" for k, v in variables.items()) + + logger.debug("define array") + array = {k[5:]: v for k, v in section.items() if k.startswith('array')} + lengths = [len(v.split()) for v in array.values()] + if not all(length == lengths[0] for length in lengths[1:]): + raise ValueError("arrays should be the same length") + if array: + content += "\n" + for key, value in array.items(): + content += f"arr{key}=({value})\n" + + logger.debug("activate virtual environment") + if section.get('venv', ''): + source = path.join(section.get('venv'), 'bin', 'activate') + content += f"\nsource {source}\n" + elif section.get('conda', ''): + content += f"\nconda activate {section.get('conda')}\n" + + logger.debug("execute command") + if array: + content += ( + f"\nfor i in {{0..{lengths[0]-1}}}\n" + f"do\n" + ) + for key in array: + content += f" var{key}=${{arr{key}[$i]}}\n" + content += ( + f" {section.get('command')}\n" + f"done" + ) + else: + content += "\n" + section.get('command') + + return content + + def submit(self, job): + """Execute a job locally. + + Parameters + ---------- + job : str + Name of the job to be executed. + + """ + logger.debug("retrieving %s job settings", job) + section = self.config[job] + workdir = self.substitute(section['directory'], job) + jobfile = self.substitute(section['script'], job) + logfile = self.substitute(section['logging'], job) + + logger.debug("initializing work directory") + makedirs(workdir, exist_ok=True) + with open(path.join(workdir, jobfile), 'w', encoding='utf-8') as file: + file.write(self.script(job)) + + logger.debug("executing job") + print(f"running {job}") + command = f"cd {workdir}; bash {jobfile} > {logfile} 2>&1" + result = run(command, shell=True, check=False) + if result.returncode: + print(f"exited with code {result.returncode}, see {logfile}") + sys_exit(result.returncode) + + def run(self, *args): + """Run the pipeline. 
+
+        """
+        _ = args
+        for rank in self.order:
+            for job in self.stages[rank]:
+                self.submit(job)
+
+
 if __name__ == '__main__':
+    launchers = {
+        'local': LocalPipeline,
+        'slurm': SlurmPipeline,
+    }
+
     parser = ArgumentParser(
         prog='./run.py',
         description="pipeline launcher",
@@ -241,7 +402,28 @@
         help="path to the pipeline configuration file",
         metavar='path',
     )
-    args = parser.parse_args()
-
-    pipeline = Pipeline(args.pipeline)
-    pipeline.run()
+    parser.add_argument(
+        '-d', '--dependency',
+        help="ids of the jobs to be completed before the first stage",
+        nargs='+',
+        metavar='id',
+        default=[],
+    )
+    group = parser.add_mutually_exclusive_group()
+    for name, launcher_class in launchers.items():
+        group.add_argument(
+            f'--{name}',
+            action='store_const',
+            const=name,
+            dest='launcher',
+            help=f"launch with {launcher_class.__name__}",
+        )
+    arguments = parser.parse_args()
+
+    try:
+        launcher = launchers.get(arguments.launcher, SlurmPipeline)
+        pipeline = launcher(arguments.pipeline)
+        pipeline.run(*arguments.dependency)
+    except KeyboardInterrupt:
+        print("program interrupted by Control-C")
+        sys_exit(130)
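For illustration, here is roughly the kind of job script that the `SlurmPipeline.script` method above would generate for the `build` job of `pipelines/customized.conf`. This is a sketch, not verbatim output: the date is a placeholder, the exact spacing may differ, and the comments are added here. What matters is the structure: the `#SBATCH` header built from the configuration, the exported `PIPING_*` variables, the module loads, and the requeuing trap described in the README.

```bash
#!/bin/bash
#SBATCH --job-name build
#SBATCH --nodes 1
#SBATCH --mail-user your@email.com
#SBATCH --mail-type FAIL
#SBATCH --time 00:02:00
#SBATCH --partition all.q
#SBATCH --mem 0
#SBATCH --error customized-1-build.txt
#SBATCH --output customized-1-build.txt
#SBATCH --open-mode append
#SBATCH --exclusive

# Variables usable in the configured command and paths.
export PIPING_PIPELINE=customized
export PIPING_DATE=20240101T000000Z
export PIPING_STAGE=1
export PIPING_JOB=build

module purge
module load gcc/9.3.0
module load mpi/openmpi-4.1.1-gcc9-3

# Automatic requeuing: when Slurm sends SIGTERM shortly before the time
# limit, the running step is stopped and the job is put back in the queue.
terminate () {
    echo caught SIGTERM
    echo terminating "$pid"
    kill -TERM "$pid"
    wait "$pid"
    echo requeuing $SLURM_JOB_ID
    scontrol requeue $SLURM_JOB_ID
}

trap terminate SIGTERM

# The command is run in the background so that the trap can fire while
# waiting on it.
srun ./executable.sh $PIPING_JOB &
pid=$!
wait "$pid"
```

Because `time` is set to two minutes in the `DEFAULT` section while `executable.sh` sleeps for 90 seconds before each of its two output files, this job is expected to be requeued once before it completes.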
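In the same spirit, here is a sketch of the local script that `LocalPipeline.script` would produce for the `array-1` job, again with a placeholder date and added comments. Instead of a Slurm array, the values listed in the `array` field are expanded into a plain bash loop that runs the tasks one after the other.

```bash
#!/bin/bash

export PIPING_PIPELINE=customized
export PIPING_DATE=20240101T000000Z
export PIPING_STAGE=2
export PIPING_JOB=array-1

# The values of the "array" field become a bash array.
arr=(a b)

# Slurm array tasks are emulated by a sequential loop; $var takes the
# values "a" and "b" in turn.
for i in {0..1}
do
    var=${arr[$i]}
    ./executable.sh $PIPING_JOB-$var
done
```

The local launcher writes this file into the working directory, runs it with `bash`, and redirects its output to the configured log file.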