From d09db75364b6a27423bfa91eb3e23937870590c8 Mon Sep 17 00:00:00 2001 From: Dunstan Becht Date: Thu, 20 Apr 2023 21:52:18 -0400 Subject: [PATCH 01/11] export job variables --- pipeline-example.conf | 2 +- run.py | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/pipeline-example.conf b/pipeline-example.conf index cdf58de..0a2b14b 100644 --- a/pipeline-example.conf +++ b/pipeline-example.conf @@ -34,4 +34,4 @@ command = ./executable-example.sh post-processing-data mail-type = END, FAIL stage = 4 nodes = 1 -command = tar -cvf results.tar output-* +command = tar -cvf $date-$pipeline.tar output-* diff --git a/run.py b/run.py index fd853ce..d837516 100755 --- a/run.py +++ b/run.py @@ -129,6 +129,7 @@ class Pipeline: logger.debug("retrieving %s job settings", job) section = self.config[job] logfile = self.substitute(section['logfile'], job) + variables = self.variables(job) logger.debug("generating %s job script", job) lines = [] @@ -147,6 +148,10 @@ class Pipeline: lines.append(f"#SBATCH --output {logfile}") lines.append("#SBATCH --open-mode append") lines.append("") + if len(variables) > 0: + for key, value in variables.items(): + lines.append(f"export {key}={value}") + lines.append("") if 'modules' in section: for module in section.get('modules', '').split(): lines.append(f"module load {module}") -- GitLab From 9e4109a5d67f512f6b0254b2549001c1f5908f2e Mon Sep 17 00:00:00 2001 From: Dunstan Becht Date: Thu, 20 Apr 2023 22:48:29 -0400 Subject: [PATCH 02/11] copy data with a single node --- pipeline-example.conf | 1 + 1 file changed, 1 insertion(+) diff --git a/pipeline-example.conf b/pipeline-example.conf index 0a2b14b..fedc98f 100644 --- a/pipeline-example.conf +++ b/pipeline-example.conf @@ -10,6 +10,7 @@ logfile = $pipeline-$stage-$job.log [data-copy] stage = 0 +nodes = 1 command = cp -r ../../material-example/* . [input-generation] -- GitLab From a2aabae7f1e7ed046d3b5f835c8b49d234b4d139 Mon Sep 17 00:00:00 2001 From: Dunstan Becht Date: Thu, 20 Apr 2023 23:58:50 -0400 Subject: [PATCH 03/11] fix mail type --- pipeline-example.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipeline-example.conf b/pipeline-example.conf index fedc98f..7d52813 100644 --- a/pipeline-example.conf +++ b/pipeline-example.conf @@ -32,7 +32,7 @@ nodes = 2 command = ./executable-example.sh post-processing-data [archive-data] -mail-type = END, FAIL +mail-type = END,FAIL stage = 4 nodes = 1 command = tar -cvf $date-$pipeline.tar output-* -- GitLab From de042a61568d5a243992f8127580021b32b70ec5 Mon Sep 17 00:00:00 2001 From: Dunstan Becht Date: Fri, 21 Apr 2023 22:31:11 -0400 Subject: [PATCH 04/11] enable local execution --- README.md | 52 ++++--- ...utable-example.sh => executable-example.sh | 0 pipeline-example.conf | 2 +- run.py | 134 +++++++++++++++--- 4 files changed, 148 insertions(+), 40 deletions(-) rename material-example/executable-example.sh => executable-example.sh (100%) diff --git a/README.md b/README.md index 1e17a3d..3d3af35 100644 --- a/README.md +++ b/README.md @@ -2,11 +2,11 @@ > What does it mean? -The name Piping has been chosen to suggest a set of (cluster job) pipelines. +The name Piping has been chosen to suggest a set of job pipelines. > What is it for? -Piping makes it easy to set up and run job pipelines. +Piping makes it easy to set up and run job pipelines in different environments. > Is it hard to use? @@ -14,53 +14,61 @@ Just edit a configuration file and run the program. 
## Background -The Slurm workload manager offers a pipeline mechanism in the sense that jobs can be configured to start when one or more other jobs have finished. -But providing the dependency job ids for each submission command can quickly become tedious. -Moreover, it is sometimes quite difficult to estimate the duration of a job. +When working with a workload manager like Slurm, one can spend a lot of time writing and submitting job scripts. +Suppose you want to submit a series of dependent jobs that require different numbers of nodes, or on different partitions. +Creating a job script then retrieving and providing the dependency job ids for each submission can quickly become tedious. +A certain degree of automation would then be appreciated. + +Moreover, you may want to alternate the execution of your pipelines on a cluster and on your computer. This is sometimes the case during debugging phases. In this case, you would probably have to rewrite a script adapted to your computer. But it could be more advantageous to have a single file containing the essence of the jobs that could be converted into scripts for Slurm or for your computer depending on the need. + +Finally, it is sometimes quite difficult to estimate the duration of a job. Besides, some clusters limit the maximum duration to a few hours or days. An automatic requeuing mechanism is often the solution in this case, but can be annoying to implement. In short, a simple way to configure and submit pipelines with an auto-requeue feature could save valuable design and calculation time. ## Features -Piping proposes a script that allows to submit a predefined series of jobs to the Slurm workload manager. +Piping proposes a script that allows to run ordered series of jobs. The preparation of a pipeline is done in a configuration file. -At runtime, this script reads the configuration file associated with the pipeline and translates its contents into directives for Slurm. -The generated job scripts implement an automatic requeue mechanism. -So if your job is not finished by the end of the time limit, a SIGTERM signal is sent a few seconds before the job is finished and placed back in the queue. +At runtime, this script reads the configuration file associated with the pipeline and translates its contents into directives for Slurm or for your computer. +When using the launcher for Slurm, the generated job scripts implement an automatic requeue mechanism. +So if one of your jobs is not finished by the end of the time limit, a SIGTERM signal is sent a few seconds before the job is finished and placed back in the queue. ## Installation You can simply clone or download the project. -But since the main part of the project is in the [run.py](/run.py) script you can also simply copy and paste its content. +But since the essence of the project is contained in the [run.py](/run.py) script, you can also simply copy and paste its content. ## Usage -You can submit the pipeline configured in the `pipeline-example.conf` file with the command: +Remember to make the script `run.py` executable so that you can run it without typing "python3" in the command line. +Then, you can submit the pipeline configured in the [pipeline-example.conf](/pipeline-example.conf) file with the command: ```bash -./run.py pipeline-example.conf +./run.py pipeline-example.conf --local ``` +The `--local` option allows you to run the pipeline on your computer. +To submit to Slurm remove the option or use `--slurm` instead. 
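For a quick sketch of both launch modes (assuming the commands are run from the repository root with the example configuration shipped in the project):

```bash
# make the launcher executable (only needed once)
chmod +x run.py

# run every job of the example pipeline directly on this machine
./run.py pipeline-example.conf --local

# or submit it to Slurm (also the default when neither flag is given)
./run.py pipeline-example.conf --slurm
```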
You can store your pipeline configuration files anywhere, as long as you pass the correct path when calling the script. + All that remains is to configure your own pipeline. -To do this, generate a file in the same format as [pipeline-example.conf](/pipeline-example.conf). -Each job is configured in a section of the file. -Note that the `DEFAULT` section defines the default values for all jobs in the pipeline. +To do this, create a file in the same format as [pipeline-example.conf](/pipeline-example.conf). +Except for the `DEFAULT` section (which defines the default values for all jobs), each section of the file corresponds to the configuration of a job of the pipeline. For each job you can configure the following fields: * `partition`: Name of the cluster partition on which to run the job. * `nodes`: Number of nodes reserved for the job. * `mail-user`: Mail for sending job related notifications (optional). -* `mail-type`: Type (optional). -* `time`: Maximum duration for the job. -* `workdir`: Path to the working directory for the job. -* `jobfile`: Name of the log file. -* `logfile`: Name of the job file. +* `mail-type`: Events for which an email is sent (optional). +* `time`: Maximum duration of the job. +* `workdir`: Path to the working directory of the job. +* `jobfile`: Name of the job log file. +* `logfile`: Name of the job script file. * `stage`: String defining at which stage the job must be executed. * `command`: Bash command to run. -* `modules`: List of modules to be loaded. -* `venv`: Python virtual environment to be used. +* `modules`: List of modules to be loaded (optional). +* `venv`: Path to the Python virtual environment to be used (optional). To define the fields (such as `workdir` or `jobfile`) you can use the following environment variables: diff --git a/material-example/executable-example.sh b/executable-example.sh similarity index 100% rename from material-example/executable-example.sh rename to executable-example.sh diff --git a/pipeline-example.conf b/pipeline-example.conf index 7d52813..6821f10 100644 --- a/pipeline-example.conf +++ b/pipeline-example.conf @@ -11,7 +11,7 @@ logfile = $pipeline-$stage-$job.log [data-copy] stage = 0 nodes = 1 -command = cp -r ../../material-example/* . +command = cp -r ../../executable-example.sh . [input-generation] stage = 1 diff --git a/run.py b/run.py index d837516..ad1da58 100755 --- a/run.py +++ b/run.py @@ -1,13 +1,13 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -"""Script for pipeline submission. +"""Script to run job pipelines. -This script allows to submit a predefined series of jobs to the Slurm -workload manager. The preparation of a pipeline is done in a -configuration file. At runtime, this script reads the configuration -file associated with the pipeline and translates its contents into -directives for Slurm. +This script allows to run an ordered series of jobs in different +environments. After being defined in a configuration file, jobs can +be run locally or submitted to a workload manager such as Slurm. At +runtime, this script reads the configuration file associated with the +pipeline to generate and execute the appropriate shell scripts. 
""" @@ -15,10 +15,11 @@ from argparse import ArgumentParser from configparser import ConfigParser from datetime import datetime, timezone from logging import getLogger -from os import makedirs, path +from os import makedirs, path, system, waitstatus_to_exitcode from pathlib import Path from string import Template from subprocess import CalledProcessError, check_output, PIPE +from sys import exit as sys_exit from time import sleep @@ -26,7 +27,7 @@ logger = getLogger(__name__) class Pipeline: - """Class representing an ordered sequence of jobs to execute. + """This class represents an ordered series of jobs. Attributes ---------- @@ -44,7 +45,7 @@ class Pipeline: """ def __init__(self, config_path): - """Instantiate a pipeline launcher. + """Instantiate a pipeline representation. Parameters ---------- @@ -54,11 +55,11 @@ class Pipeline: """ logger.debug("defining pipeline variables") self.name = Path(config_path).stem - self.date = datetime.now().strftime("%Y%m%dT%H%M%S") + self.date = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") logger.debug("loading pipeline configuration file %s", config_path) self.config = ConfigParser() - with open(config_path, encoding='utf-8') as file: + with open(config_path, 'r', encoding='utf-8') as file: self.config.read_file(file) logger.debug("defining job stages and order") @@ -69,7 +70,7 @@ class Pipeline: self.order = sorted(self.stages.keys()) def variables(self, job): - """Return the job environment variable values. + """Return the environment variables of the job. Parameters ---------- @@ -79,7 +80,7 @@ class Pipeline: Returns ------- dict of str - Job environment variable values. + Job environment variables. """ variables = { @@ -91,7 +92,7 @@ class Pipeline: return variables def substitute(self, template, job): - """Return the template with environment variables expanded. + """Return the template with expanded environment variables. Parameters ---------- @@ -112,8 +113,14 @@ class Pipeline: result = path.expandvars(result) return result + +class SlurmPipeline(Pipeline): + """This class allows to submit a pipeline to Slurm. + + """ + def script(self, job): - """Return the Job script content. + """Return the Slurm job script of the job. Parameters ---------- @@ -177,7 +184,7 @@ class Pipeline: return "\n".join(lines) def submit(self, job, dependencies): - """Submit a job and return its job id. + """Submit a job to Slurm and return its job id. Parameters ---------- @@ -235,8 +242,91 @@ class Pipeline: dependencies = ids +class LocalPipeline(Pipeline): + """This class allows to execute a pipeline locally. + + """ + + def script(self, job): + """Return the job script of the job. + + Parameters + ---------- + job : str + Name of the job in the pipeline. + + Returns + ------- + str + Job script content. + + """ + logger.debug("retrieving %s job settings", job) + section = self.config[job] + variables = self.variables(job) + + logger.debug("generating %s job script", job) + lines = [] + lines.append("#!/bin/bash") + lines.append("") + if len(variables) > 0: + for key, value in variables.items(): + lines.append(f"export {key}={value}") + lines.append("") + if 'venv' in section: + source = path.join(section.get('venv'), 'bin', 'activate') + lines.append(f"source {source}") + lines.append("") + lines.append(section.get('command')) + return "\n".join(lines) + + def submit(self, job): + """Execute a job locally . + + Parameters + ---------- + job : str + Name of the job to be executed. 
+ + """ + logger.debug("retrieving %s job settings", job) + section = self.config[job] + workdir = self.substitute(section['workdir'], job) + jobfile = self.substitute(section['jobfile'], job) + logfile = self.substitute(section['logfile'], job) + + logger.debug("initializing work directory") + makedirs(workdir, exist_ok=True) + with open(path.join(workdir, jobfile), 'w', encoding='utf-8') as file: + file.write(self.script(job)) + + logger.debug("executing job") + print(f"running {job}") + status = system(f"cd {workdir}; bash {jobfile} > {logfile} 2>&1") + code = waitstatus_to_exitcode(status) + if code > 0: + print(f"exited with code: {code}, see {logfile}") + sys_exit(code) + elif code < 0: + print(f"terminated by signum: {-code}") + sys_exit(1) + + def run(self): + """Run the pipeline. + + """ + for rank in self.order: + for job in self.stages[rank]: + self.submit(job) + + if __name__ == '__main__': + launchers = { + 'local': LocalPipeline, + 'slurm': SlurmPipeline, + } + parser = ArgumentParser( prog='./run.py', description="pipeline launcher", @@ -246,7 +336,17 @@ if __name__ == '__main__': help="path to the pipeline configuration file", metavar='path', ) + group = parser.add_mutually_exclusive_group() + for key in launchers: + group.add_argument( + f'--{key}', + action='store_const', + const=key, + dest='launcher', + help=f"launch with {launchers[key].__name__}", + ) args = parser.parse_args() - pipeline = Pipeline(args.pipeline) + launcher = launchers.get(args.launcher, SlurmPipeline) + pipeline = launcher(args.pipeline) pipeline.run() -- GitLab From 78731137129fd368a435c624978ff81bf457305b Mon Sep 17 00:00:00 2001 From: Dunstan Becht Date: Sat, 22 Apr 2023 21:31:12 -0400 Subject: [PATCH 05/11] use only subprocess --- run.py | 39 ++++++++++++++++++++------------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/run.py b/run.py index ad1da58..9bf9918 100755 --- a/run.py +++ b/run.py @@ -15,10 +15,10 @@ from argparse import ArgumentParser from configparser import ConfigParser from datetime import datetime, timezone from logging import getLogger -from os import makedirs, path, system, waitstatus_to_exitcode +from os import makedirs, path from pathlib import Path from string import Template -from subprocess import CalledProcessError, check_output, PIPE +from subprocess import CalledProcessError, check_output, PIPE, run from sys import exit as sys_exit from time import sleep @@ -156,8 +156,8 @@ class SlurmPipeline(Pipeline): lines.append("#SBATCH --open-mode append") lines.append("") if len(variables) > 0: - for key, value in variables.items(): - lines.append(f"export {key}={value}") + for name, value in variables.items(): + lines.append(f"export {name}={value}") lines.append("") if 'modules' in section: for module in section.get('modules', '').split(): @@ -270,8 +270,8 @@ class LocalPipeline(Pipeline): lines.append("#!/bin/bash") lines.append("") if len(variables) > 0: - for key, value in variables.items(): - lines.append(f"export {key}={value}") + for name, value in variables.items(): + lines.append(f"export {name}={value}") lines.append("") if 'venv' in section: source = path.join(section.get('venv'), 'bin', 'activate') @@ -302,14 +302,11 @@ class LocalPipeline(Pipeline): logger.debug("executing job") print(f"running {job}") - status = system(f"cd {workdir}; bash {jobfile} > {logfile} 2>&1") - code = waitstatus_to_exitcode(status) - if code > 0: - print(f"exited with code: {code}, see {logfile}") - sys_exit(code) - elif code < 0: - print(f"terminated by signum: 
{-code}") - sys_exit(1) + command = f"cd {workdir}; bash {jobfile} > {logfile} 2>&1" + result = run(command, shell=True, check=False) + if result.returncode: + print(f"exited with code {result.returncode}, see {logfile}") + sys_exit(result.returncode) def run(self): """Run the pipeline. @@ -337,16 +334,20 @@ if __name__ == '__main__': metavar='path', ) group = parser.add_mutually_exclusive_group() - for key in launchers: + for key, lancher in launchers.items(): group.add_argument( f'--{key}', action='store_const', const=key, dest='launcher', - help=f"launch with {launchers[key].__name__}", + help=f"launch with {lancher.__name__}", ) args = parser.parse_args() - launcher = launchers.get(args.launcher, SlurmPipeline) - pipeline = launcher(args.pipeline) - pipeline.run() + try: + launcher = launchers.get(args.launcher, SlurmPipeline) + pipeline = launcher(args.pipeline) + pipeline.run() + except KeyboardInterrupt: + print("program interrupted by Control-C") + sys_exit(130) -- GitLab From 2cda71c4c8c137b77ab59e16d28cded3dbd34b60 Mon Sep 17 00:00:00 2001 From: Dunstan Becht Date: Tue, 25 Apr 2023 00:58:44 -0400 Subject: [PATCH 06/11] add arrays --- README.md | 49 ++++++------ executable-example.sh | 7 -- material/executable.sh | 20 +++++ pipeline-example.conf | 38 ---------- pipelines/customized.conf | 32 ++++++++ pipelines/vanilla.conf | 7 ++ run.py | 155 +++++++++++++++++++++++--------------- 7 files changed, 179 insertions(+), 129 deletions(-) delete mode 100755 executable-example.sh create mode 100755 material/executable.sh delete mode 100644 pipeline-example.conf create mode 100644 pipelines/customized.conf create mode 100644 pipelines/vanilla.conf diff --git a/README.md b/README.md index 3d3af35..62aebd4 100644 --- a/README.md +++ b/README.md @@ -24,14 +24,14 @@ Moreover, you may want to alternate the execution of your pipelines on a cluster Finally, it is sometimes quite difficult to estimate the duration of a job. Besides, some clusters limit the maximum duration to a few hours or days. An automatic requeuing mechanism is often the solution in this case, but can be annoying to implement. -In short, a simple way to configure and submit pipelines with an auto-requeue feature could save valuable design and calculation time. +In short, a simple way to configure and submit pipelines with an auto-requeuing feature could save valuable design and calculation time. ## Features Piping proposes a script that allows to run ordered series of jobs. The preparation of a pipeline is done in a configuration file. At runtime, this script reads the configuration file associated with the pipeline and translates its contents into directives for Slurm or for your computer. -When using the launcher for Slurm, the generated job scripts implement an automatic requeue mechanism. +When using the launcher for Slurm, the generated job scripts implement an automatic requeuing mechanism. So if one of your jobs is not finished by the end of the time limit, a SIGTERM signal is sent a few seconds before the job is finished and placed back in the queue. ## Installation @@ -41,41 +41,42 @@ But since the essence of the project is contained in the [run.py](/run.py) scrip ## Usage -Remember to make the script `run.py` executable so that you can run it without typing "python3" in the command line. 
-Then, you can submit the pipeline configured in the [pipeline-example.conf](/pipeline-example.conf) file with the command: +Check that the [`run.py`](/run.py) script is executable so that you can run it without typing "python3" in the command line. +Then, you can run the vanilla example pipeline configured in the [`pipelines/vanilla.conf`](/pipelines/vanilla.conf) file with the command: ```bash -./run.py pipeline-example.conf --local +./run.py pipelines/vanilla.conf --local ``` The `--local` option allows you to run the pipeline on your computer. -To submit to Slurm remove the option or use `--slurm` instead. -You can store your pipeline configuration files anywhere, as long as you pass the correct path when calling the script. +To submit to Slurm, remove this option or use `--slurm` instead. +You can store your pipeline configuration files anywhere as long as you pass the correct path as an argument when calling the script. All that remains is to configure your own pipeline. -To do this, create a file in the same format as [pipeline-example.conf](/pipeline-example.conf). +To do this, create a file in the same format as [`pipelines/customized.conf`](/pipelines/customized.conf). Except for the `DEFAULT` section (which defines the default values for all jobs), each section of the file corresponds to the configuration of a job of the pipeline. For each job you can configure the following fields: -* `partition`: Name of the cluster partition on which to run the job. -* `nodes`: Number of nodes reserved for the job. -* `mail-user`: Mail for sending job related notifications (optional). -* `mail-type`: Events for which an email is sent (optional). -* `time`: Maximum duration of the job. -* `workdir`: Path to the working directory of the job. -* `jobfile`: Name of the job log file. -* `logfile`: Name of the job script file. -* `stage`: String defining at which stage the job must be executed. -* `command`: Bash command to run. -* `modules`: List of modules to be loaded (optional). -* `venv`: Path to the Python virtual environment to be used (optional). +* `stage`: String defining at which stage the job must be executed. The execution follows the lexicographic order and jobs that have the same stage value can be executed in parallel. Stages are launched in lexicographical order, and all jobs in a stage must be completed before moving to the next stage. +* `command`: Main shell command of the job. This entry contains the essence of what the job does. As the SIGTERM signal is sent only once, it is strongly recommended to write only one command per job (i.e. not to put several commands separated by semicolons in this field). +* `workdir` (optional): The job script and the log file will be placed in this folder. The job will be executed from this folder. The relative paths will be resolved from this folder. By default the folder name contains the creation date of the pipeline so that each launch of a pipeline corresponds to a different folder. +* `modules` (optional): List of modules to load with the `module load ...` command. Using this field assumes that the [modules package](https://github.com/cea-hpc/modules) is installed, but this is usually the case on most clusters. +* `venv` (optional): Path to the Python virtual environment to be used. Using this field will place the `source /bin/activate` command in the job initialization. +* `array` (optional): Allows to execute the job for different values of a variable named `$var`. 
Specify the values taken by `$var` separated by a space and use `$var` in the command line of the job. +* `jobfile` (optional): Path to the job log file. +* `logfile` (optional): Path to the job script file. +* `partition` (optional): Value passed to Slurm option [--patition](https://slurm.schedmd.com/sbatch.html#OPT_partition). +* `nodes` (optional): Value passed to Slurm option [--nodes](https://slurm.schedmd.com/sbatch.html#OPT_nodes). +* `mail-user` (optional): Value passed to Slurm option [--mail-user](https://slurm.schedmd.com/sbatch.html#OPT_mail-user). +* `mail-type` (optional): Value passed to Slurm option [--mail-type](https://slurm.schedmd.com/sbatch.html#OPT_mail-type). +* `time` (optional): Value passed to Slurm option [--time](https://slurm.schedmd.com/sbatch.html#OPT_time). To define the fields (such as `workdir` or `jobfile`) you can use the following environment variables: -* `pipeline`: Name of the pipeline. -* `date`: Date of submission of the pipeline. -* `stage`: Stage of the job. -* `job`: Name of the job. +* `$pipeline`: Name of the pipeline (corresponds to the stem of the pipeline configuration file name). +* `$date`: UTC date of submission of the pipeline in `YYYYMMDDTHHMMSSZ` format. +* `$stage`: Stage of the job. +* `$job`: Name of the job (corresponds to the name of the section in the configuration file). ## Credits diff --git a/executable-example.sh b/executable-example.sh deleted file mode 100755 index 87ad774..0000000 --- a/executable-example.sh +++ /dev/null @@ -1,7 +0,0 @@ -#!/bin/bash - -date -echo $@ -sleep 10 -touch output-$@ -echo done diff --git a/material/executable.sh b/material/executable.sh new file mode 100755 index 0000000..c732a04 --- /dev/null +++ b/material/executable.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +date +echo "(re)starting" + +if ! test -f "output-$1-t1"; +then + echo "generating first output file..." + sleep 40 + date > output-$1-t1 + echo "done" +fi + +if ! test -f "output-$1-t2"; +then + echo "generating second output file..." + sleep 40 + date > output-$1-t2 + echo "done" +fi diff --git a/pipeline-example.conf b/pipeline-example.conf deleted file mode 100644 index 6821f10..0000000 --- a/pipeline-example.conf +++ /dev/null @@ -1,38 +0,0 @@ -[DEFAULT] -partition = all.q -nodes = 1 -mail-user = your@email.com -mail-type = FAIL -time = 24:00:00 -workdir = scratch/$date-$pipeline -jobfile = $pipeline-$stage-$job.job -logfile = $pipeline-$stage-$job.log - -[data-copy] -stage = 0 -nodes = 1 -command = cp -r ../../executable-example.sh . - -[input-generation] -stage = 1 -command = ./executable-example.sh creating-data - -[output-generation-a] -stage = 2 -nodes = 2 -command = ./executable-example.sh processing-data-a - -[output-generation-b] -stage = 2 -command = ./executable-example.sh processing-data-b - -[post-treatment] -stage = 3 -nodes = 2 -command = ./executable-example.sh post-processing-data - -[archive-data] -mail-type = END,FAIL -stage = 4 -nodes = 1 -command = tar -cvf $date-$pipeline.tar output-* diff --git a/pipelines/customized.conf b/pipelines/customized.conf new file mode 100644 index 0000000..9eb3ffd --- /dev/null +++ b/pipelines/customized.conf @@ -0,0 +1,32 @@ +[DEFAULT] +partition = all.q +workdir = scratch/$date-$pipeline +jobfile = $pipeline-$stage-$job.job +logfile = $pipeline-$stage-$job.log +mail-user = your@email.com +mail-type = FAIL +time = 00:01:00 + +[copy] +stage = 0 +command = cp ../../material/executable.sh . 
--verbose + +[step-1] +stage = 1 +modules = gcc/9.3.0 mpi/openmpi-4.1.1-gcc9-3 +command = ./executable.sh step-1 + +[step-2] +stage = 2 +array = a b +command = ./executable.sh step-2-${var} + +[step-3] +stage = 3 +nodes = 2 +command = ./executable.sh step-3 + +[archive] +mail-type = END,FAIL +stage = 4 +command = tar -cvf $date-$pipeline.tar output-* diff --git a/pipelines/vanilla.conf b/pipelines/vanilla.conf new file mode 100644 index 0000000..54f7d17 --- /dev/null +++ b/pipelines/vanilla.conf @@ -0,0 +1,7 @@ +[print-hello] +stage = 0 +command = echo Hello + +[print-world] +stage = 1 +command = echo world! diff --git a/run.py b/run.py index 9bf9918..dbc149f 100755 --- a/run.py +++ b/run.py @@ -31,6 +31,8 @@ class Pipeline: Attributes ---------- + defaults : dict of str + Default value for job configuration. name : str Pipeline name. date : str @@ -44,6 +46,12 @@ class Pipeline: """ + defaults = { + 'workdir': 'scratch/$date-$pipeline', + 'jobfile': '$pipeline-$stage-$job.job', + 'logfile': '$pipeline-$stage-$job.log', + } + def __init__(self, config_path): """Instantiate a pipeline representation. @@ -58,7 +66,7 @@ class Pipeline: self.date = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") logger.debug("loading pipeline configuration file %s", config_path) - self.config = ConfigParser() + self.config = ConfigParser(defaults=self.defaults) with open(config_path, 'r', encoding='utf-8') as file: self.config.read_file(file) @@ -120,7 +128,7 @@ class SlurmPipeline(Pipeline): """ def script(self, job): - """Return the Slurm job script of the job. + """Return the Slurm job script content. Parameters ---------- @@ -135,53 +143,75 @@ class SlurmPipeline(Pipeline): """ logger.debug("retrieving %s job settings", job) section = self.config[job] - logfile = self.substitute(section['logfile'], job) + logfile = self.substitute(section.get('logfile', ''), job) variables = self.variables(job) - - logger.debug("generating %s job script", job) - lines = [] - lines.append("#!/bin/bash") - lines.append(f"#SBATCH --nodes {section.get('nodes')}") - if 'mail-user' in section: - lines.append(f"#SBATCH --mail-user {section.get('mail-user')}") - if 'mail-type' in section: - lines.append(f"#SBATCH --mail-type {section.get('mail-type')}") - lines.append(f"#SBATCH --job-name {job}") - lines.append(f"#SBATCH --time {section.get('time')}") - lines.append(f"#SBATCH --partition {section.get('partition')}") - lines.append("#SBATCH --exclusive") - lines.append("#SBATCH --mem 0") - lines.append(f"#SBATCH --error {logfile}") - lines.append(f"#SBATCH --output {logfile}") - lines.append("#SBATCH --open-mode append") - lines.append("") - if len(variables) > 0: - for name, value in variables.items(): - lines.append(f"export {name}={value}") - lines.append("") - if 'modules' in section: - for module in section.get('modules', '').split(): - lines.append(f"module load {module}") - lines.append("") - if 'venv' in section: + content = "\n" + options = {} + + logger.debug("mapping options") + options['job-name'] = job + options['nodes'] = section.get('nodes', '1') + options['mail-user'] = section.get('mail-user', '') + options['mail-type'] = section.get('mail-type', '') + options['time'] = section.get('time', '24:00:00') + options['partition'] = section.get('parition', '') + options['mem'] = section.get('mem', '0') + options['error'] = logfile + options['output'] = logfile + options['open-mode'] = section.get('open-mode', 'append') + options['exclusive'] = section.getboolean('exclusive', True) + + logger.debug("export 
environment variables") + for name, val in variables.items(): + content += f"export {name}={val}\n" + + logger.debug("define array variables") + if section.get('array', ''): + options['error'] += '-%a' + options['output'] += '-%a' + values = section['array'].split() + options['array'] = f"0-{len(values)-1}" + if section.get('array-max', '') != '': + options['array'] += f"%{int(section['array-max'])}" + content += f"\narray=({' '.join(values)})\n" + content += "export var=${array[$SLURM_ARRAY_TASK_ID]}\n" + + logger.debug("load modules") + if section.get('modules', ''): + content += "\nmodule purge\n" + for module in section.get('modules').split(): + content += f"module load {module}\n" + + logger.debug("source virtual environment") + if section.get('venv', ''): source = path.join(section.get('venv'), 'bin', 'activate') - lines.append(f"source {source}") - lines.append("") - lines.append("terminate () {") - lines.append(" echo caught SIGTERM") - lines.append(' echo terminating "$pid"') - lines.append(' kill -TERM "$pid"') - lines.append(' wait "$pid"') - lines.append(" echo requeuing $SLURM_JOB_ID") - lines.append(" scontrol requeue $SLURM_JOB_ID") - lines.append("}") - lines.append("") - lines.append("trap terminate SIGTERM") - lines.append("") - lines.append(f"srun {section.get('command')} &") - lines.append("pid=$!") - lines.append('wait "$pid"') - return "\n".join(lines) + content += f"\nsource {source}\n" + + logger.debug("implement requeuing mechanism") + content += ( + "\nterminate () {\n" + " echo caught SIGTERM\n" + ' echo terminating "$pid"\n' + ' kill -TERM "$pid"\n' + ' wait "$pid"\n' + " echo requeuing $SLURM_JOB_ID\n" + " scontrol requeue $SLURM_JOB_ID\n" + "}\n" + "\ntrap terminate SIGTERM\n" + f"\nsrun {section['command']} &\n" + "pid=$!\n" + 'wait "$pid"\n' + ) + + logger.debug("pass options") + header = "#!/bin/bash\n" + for name, value in options.items(): + if isinstance(value, bool) and value: + header += f"#SBATCH --{name}\n" + elif isinstance(value, str) and value: + header += f"#SBATCH --{name} {value}\n" + + return header + content def submit(self, job, dependencies): """Submit a job to Slurm and return its job id. @@ -248,7 +278,7 @@ class LocalPipeline(Pipeline): """ def script(self, job): - """Return the job script of the job. + """Return the job shell script. Parameters ---------- @@ -264,21 +294,26 @@ class LocalPipeline(Pipeline): logger.debug("retrieving %s job settings", job) section = self.config[job] variables = self.variables(job) + content = "#!/bin/bash\n\n" logger.debug("generating %s job script", job) - lines = [] - lines.append("#!/bin/bash") - lines.append("") - if len(variables) > 0: - for name, value in variables.items(): - lines.append(f"export {name}={value}") - lines.append("") - if 'venv' in section: + + for name, value in variables.items(): + content += f"export {name}={value}\n" + if section.get('venv', ''): source = path.join(section.get('venv'), 'bin', 'activate') - lines.append(f"source {source}") - lines.append("") - lines.append(section.get('command')) - return "\n".join(lines) + content += f"\nsource {source}\n" + if section.get('array', ''): + values = section['array'].split() + content += ( + f"\nfor var in {' '.join(values)};\n" + f"do\n" + f" {section.get('command')}\n" + f"done" + ) + else: + content += "\n" + section.get('command') + return content def submit(self, job): """Execute a job locally . 
-- GitLab From 08ba571cdf0e29ef6379124d70cd8b8b1a91893b Mon Sep 17 00:00:00 2001 From: Dunstan Becht Date: Tue, 25 Apr 2023 20:33:15 -0400 Subject: [PATCH 07/11] fix requeuing for array job tasks --- material/executable.sh | 4 ++-- pipelines/customized.conf | 2 +- run.py | 10 +++++++--- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/material/executable.sh b/material/executable.sh index c732a04..d23f7f4 100755 --- a/material/executable.sh +++ b/material/executable.sh @@ -6,7 +6,7 @@ echo "(re)starting" if ! test -f "output-$1-t1"; then echo "generating first output file..." - sleep 40 + sleep 90 date > output-$1-t1 echo "done" fi @@ -14,7 +14,7 @@ fi if ! test -f "output-$1-t2"; then echo "generating second output file..." - sleep 40 + sleep 90 date > output-$1-t2 echo "done" fi diff --git a/pipelines/customized.conf b/pipelines/customized.conf index 9eb3ffd..85b9189 100644 --- a/pipelines/customized.conf +++ b/pipelines/customized.conf @@ -5,7 +5,7 @@ jobfile = $pipeline-$stage-$job.job logfile = $pipeline-$stage-$job.log mail-user = your@email.com mail-type = FAIL -time = 00:01:00 +time = 00:02:00 [copy] stage = 0 diff --git a/run.py b/run.py index dbc149f..835febb 100755 --- a/run.py +++ b/run.py @@ -188,14 +188,17 @@ class SlurmPipeline(Pipeline): content += f"\nsource {source}\n" logger.debug("implement requeuing mechanism") + requeue_id = "$SLURM_JOB_ID" + if section.get('array', ''): + requeue_id = "${SLURM_ARRAY_JOB_ID}_${SLURM_ARRAY_TASK_ID}" content += ( "\nterminate () {\n" " echo caught SIGTERM\n" ' echo terminating "$pid"\n' ' kill -TERM "$pid"\n' ' wait "$pid"\n' - " echo requeuing $SLURM_JOB_ID\n" - " scontrol requeue $SLURM_JOB_ID\n" + f" echo requeuing {requeue_id}\n" + f" scontrol requeue {requeue_id}\n" "}\n" "\ntrap terminate SIGTERM\n" f"\nsrun {section['command']} &\n" @@ -243,7 +246,8 @@ class SlurmPipeline(Pipeline): command = ['cd', workdir, ';', 'sbatch'] if len(dependencies) != 0: command.append('--dependency') - dependencies = [f'afterok:{jobid}' for jobid in dependencies] + dependency = section.get('dependency', 'afterok') + dependencies = [f'{dependency}:{jobid}' for jobid in dependencies] command.append(','.join(dependencies)) command.append('--parsable') command.append(jobfile) -- GitLab From c4676cf3817d4b52e804c3d81f9faf640f8dbed8 Mon Sep 17 00:00:00 2001 From: Dunstan Becht Date: Wed, 26 Apr 2023 10:08:13 -0400 Subject: [PATCH 08/11] add conda option --- run.py | 40 +++++++++++++++++++++++++--------------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/run.py b/run.py index 835febb..48e76e2 100755 --- a/run.py +++ b/run.py @@ -146,20 +146,22 @@ class SlurmPipeline(Pipeline): logfile = self.substitute(section.get('logfile', ''), job) variables = self.variables(job) content = "\n" - options = {} logger.debug("mapping options") - options['job-name'] = job - options['nodes'] = section.get('nodes', '1') - options['mail-user'] = section.get('mail-user', '') - options['mail-type'] = section.get('mail-type', '') - options['time'] = section.get('time', '24:00:00') - options['partition'] = section.get('parition', '') - options['mem'] = section.get('mem', '0') - options['error'] = logfile - options['output'] = logfile - options['open-mode'] = section.get('open-mode', 'append') - options['exclusive'] = section.getboolean('exclusive', True) + options = { + 'job-name': job, + 'nodes': section.get('nodes', '1'), + 'mail-user': section.get('mail-user', ''), + 'mail-type': section.get('mail-type', ''), + 'time': section.get('time', 
'24:00:00'), + 'partition': section.get('parition', ''), + 'mem': section.get('mem', '0'), + 'error': logfile, + 'output': logfile, + 'open-mode': section.get('open-mode', 'append'), + 'exclusive': section.getboolean('exclusive', True), + } + logger.debug("export environment variables") for name, val in variables.items(): @@ -182,10 +184,12 @@ class SlurmPipeline(Pipeline): for module in section.get('modules').split(): content += f"module load {module}\n" - logger.debug("source virtual environment") + logger.debug("activate virtual environment") if section.get('venv', ''): source = path.join(section.get('venv'), 'bin', 'activate') content += f"\nsource {source}\n" + elif section.get('conda', ''): + content += f"\nconda activate {section.get('conda')}\n" logger.debug("implement requeuing mechanism") requeue_id = "$SLURM_JOB_ID" @@ -300,13 +304,18 @@ class LocalPipeline(Pipeline): variables = self.variables(job) content = "#!/bin/bash\n\n" - logger.debug("generating %s job script", job) - + logger.debug("export environment variables") for name, value in variables.items(): content += f"export {name}={value}\n" + + logger.debug("activate virtual environment") if section.get('venv', ''): source = path.join(section.get('venv'), 'bin', 'activate') content += f"\nsource {source}\n" + elif section.get('conda', ''): + content += f"\nconda activate {section.get('conda')}\n" + + logger.debug("execute command") if section.get('array', ''): values = section['array'].split() content += ( @@ -317,6 +326,7 @@ class LocalPipeline(Pipeline): ) else: content += "\n" + section.get('command') + return content def submit(self, job): -- GitLab From e1c61eb11e2608f7f0573d85ce6e477ed0ff95ff Mon Sep 17 00:00:00 2001 From: Dunstan Becht Date: Wed, 26 Apr 2023 10:51:44 -0400 Subject: [PATCH 09/11] add pipeline dependency option --- run.py | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/run.py b/run.py index 48e76e2..3d71e74 100755 --- a/run.py +++ b/run.py @@ -267,11 +267,15 @@ class SlurmPipeline(Pipeline): except CalledProcessError as exc: raise RuntimeError(exc.returncode, exc.stderr) from exc - def run(self): + def run(self, *dependencies): """Submit the pipeline to Slurm. + Parameters + ---------- + *dependencies : list of str + Id of the jobs to be completed before. + """ - dependencies = [] for rank in self.order: ids = [] for job in self.stages[rank]: @@ -330,7 +334,7 @@ class LocalPipeline(Pipeline): return content def submit(self, job): - """Execute a job locally . + """Execute a job locally. Parameters ---------- @@ -357,10 +361,11 @@ class LocalPipeline(Pipeline): print(f"exited with code {result.returncode}, see {logfile}") sys_exit(result.returncode) - def run(self): + def run(self, *args): """Run the pipeline. 
""" + _ = args for rank in self.order: for job in self.stages[rank]: self.submit(job) @@ -382,6 +387,13 @@ if __name__ == '__main__': help="path to the pipeline configuration file", metavar='path', ) + parser.add_argument( + '-d', '--dependency', + help="id of the jobs to be completed before", + nargs='+', + metavar='id', + default=[], + ) group = parser.add_mutually_exclusive_group() for key, lancher in launchers.items(): group.add_argument( @@ -391,12 +403,12 @@ if __name__ == '__main__': dest='launcher', help=f"launch with {lancher.__name__}", ) - args = parser.parse_args() + arguments = parser.parse_args() try: - launcher = launchers.get(args.launcher, SlurmPipeline) - pipeline = launcher(args.pipeline) - pipeline.run() + launcher = launchers.get(arguments.launcher, SlurmPipeline) + pipeline = launcher(arguments.pipeline) + pipeline.run(*arguments.dependency) except KeyboardInterrupt: print("program interrupted by Control-C") sys_exit(130) -- GitLab From ef9325af98f32ab81eea57eda336a92eb7546846 Mon Sep 17 00:00:00 2001 From: Dunstan Becht Date: Wed, 26 Apr 2023 16:18:50 -0400 Subject: [PATCH 10/11] enable multi-variable arrays --- run.py | 71 +++++++++++++++++++++++++++++++++++----------------------- 1 file changed, 43 insertions(+), 28 deletions(-) diff --git a/run.py b/run.py index 3d71e74..464d38e 100755 --- a/run.py +++ b/run.py @@ -146,6 +146,7 @@ class SlurmPipeline(Pipeline): logfile = self.substitute(section.get('logfile', ''), job) variables = self.variables(job) content = "\n" + requeue = "$SLURM_JOB_ID" logger.debug("mapping options") options = { @@ -162,21 +163,26 @@ class SlurmPipeline(Pipeline): 'exclusive': section.getboolean('exclusive', True), } - logger.debug("export environment variables") - for name, val in variables.items(): - content += f"export {name}={val}\n" - - logger.debug("define array variables") - if section.get('array', ''): + content += "".join(f"export {k}={v}\n" for k, v in variables.items()) + + logger.debug("define array") + array = {k[5:]: v for k, v in section.items() if k.startswith('array')} + lengths = [len(v.split()) for v in array.values()] + if not all(length == lengths[0] for length in lengths[1:]): + raise ValueError("arrays should be the same length") + if array: options['error'] += '-%a' options['output'] += '-%a' - values = section['array'].split() - options['array'] = f"0-{len(values)-1}" - if section.get('array-max', '') != '': - options['array'] += f"%{int(section['array-max'])}" - content += f"\narray=({' '.join(values)})\n" - content += "export var=${array[$SLURM_ARRAY_TASK_ID]}\n" + options['array'] = f"0-{lengths[0]-1}" + if section.get('tasks', ''): + options['array'] += f"%{int(section['tasks'])}" + requeue = "${SLURM_ARRAY_JOB_ID}_${SLURM_ARRAY_TASK_ID}" + content += "\n" + for key, value in array.items(): + content += f"arr{key}=({value})\n" + for key in array: + content += f"var{key}=${{arr{key}[$SLURM_ARRAY_TASK_ID]}}\n" logger.debug("load modules") if section.get('modules', ''): @@ -192,17 +198,14 @@ class SlurmPipeline(Pipeline): content += f"\nconda activate {section.get('conda')}\n" logger.debug("implement requeuing mechanism") - requeue_id = "$SLURM_JOB_ID" - if section.get('array', ''): - requeue_id = "${SLURM_ARRAY_JOB_ID}_${SLURM_ARRAY_TASK_ID}" content += ( "\nterminate () {\n" " echo caught SIGTERM\n" ' echo terminating "$pid"\n' ' kill -TERM "$pid"\n' ' wait "$pid"\n' - f" echo requeuing {requeue_id}\n" - f" scontrol requeue {requeue_id}\n" + f" echo requeuing {requeue}\n" + f" scontrol requeue {requeue}\n" "}\n" 
"\ntrap terminate SIGTERM\n" f"\nsrun {section['command']} &\n" @@ -212,11 +215,11 @@ class SlurmPipeline(Pipeline): logger.debug("pass options") header = "#!/bin/bash\n" - for name, value in options.items(): + for key, value in options.items(): if isinstance(value, bool) and value: - header += f"#SBATCH --{name}\n" + header += f"#SBATCH --{key}\n" elif isinstance(value, str) and value: - header += f"#SBATCH --{name} {value}\n" + header += f"#SBATCH --{key} {value}\n" return header + content @@ -309,8 +312,17 @@ class LocalPipeline(Pipeline): content = "#!/bin/bash\n\n" logger.debug("export environment variables") - for name, value in variables.items(): - content += f"export {name}={value}\n" + content += "".join(f"export {k}={v}\n" for k, v in variables.items()) + + logger.debug("define array") + array = {k[5:]: v for k, v in section.items() if k.startswith('array')} + lengths = [len(v.split()) for v in array.values()] + if not all(length == lengths[0] for length in lengths[1:]): + raise ValueError("arrays should be the same length") + if array: + content += "\n" + for key, value in array.items(): + content += f"arr{key}=({value})\n" logger.debug("activate virtual environment") if section.get('venv', ''): @@ -320,11 +332,14 @@ class LocalPipeline(Pipeline): content += f"\nconda activate {section.get('conda')}\n" logger.debug("execute command") - if section.get('array', ''): - values = section['array'].split() + if array: content += ( - f"\nfor var in {' '.join(values)};\n" + f"\nfor i in {{0..{lengths[0]-1}}}\n" f"do\n" + ) + for key in array: + content += f" var{key}=${{arr{key}[$i]}}\n" + content += ( f" {section.get('command')}\n" f"done" ) @@ -395,11 +410,11 @@ if __name__ == '__main__': default=[], ) group = parser.add_mutually_exclusive_group() - for key, lancher in launchers.items(): + for name, lancher in launchers.items(): group.add_argument( - f'--{key}', + f'--{name}', action='store_const', - const=key, + const=name, dest='launcher', help=f"launch with {lancher.__name__}", ) -- GitLab From d5ab8b11fb8c157467d54df5a009aa5a086ce914 Mon Sep 17 00:00:00 2001 From: Dunstan Becht Date: Thu, 27 Apr 2023 11:15:41 -0400 Subject: [PATCH 11/11] update variable names --- README.md | 56 +++++++++++++++++++------------------ pipelines/customized.conf | 58 +++++++++++++++++++++++++++++---------- pipelines/vanilla.conf | 7 +++++ run.py | 32 ++++++++++----------- 4 files changed, 96 insertions(+), 57 deletions(-) diff --git a/README.md b/README.md index 62aebd4..c6a396c 100644 --- a/README.md +++ b/README.md @@ -15,11 +15,11 @@ Just edit a configuration file and run the program. ## Background When working with a workload manager like Slurm, one can spend a lot of time writing and submitting job scripts. -Suppose you want to submit a series of dependent jobs that require different numbers of nodes, or on different partitions. -Creating a job script then retrieving and providing the dependency job ids for each submission can quickly become tedious. -A certain degree of automation would then be appreciated. +Suppose you want to submit a series of jobs with dependencies and various specifications. +Creating all the job scripts then retrieving and providing the dependency job ids for each submission can quickly become tedious. +In this case, a certain degree of automation would then be appreciated. -Moreover, you may want to alternate the execution of your pipelines on a cluster and on your computer. This is sometimes the case during debugging phases. 
In this case, you would probably have to rewrite a script adapted to your computer. But it could be more advantageous to have a single file containing the essence of the jobs that could be converted into scripts for Slurm or for your computer depending on the need. +When preparing or modifying a pipeline, it can also be very useful to test the execution on your computer. This would allow you to check the progress of the jobs and quickly identify bugs without having to connect to the cluster or wait for resources. Therefore, it could be very advantageous to have a single file containing the essence of the pipeline and its jobs and that could be converted into scripts for Slurm or for your computer depending on the need. Finally, it is sometimes quite difficult to estimate the duration of a job. Besides, some clusters limit the maximum duration to a few hours or days. @@ -41,8 +41,8 @@ But since the essence of the project is contained in the [run.py](/run.py) scrip ## Usage -Check that the [`run.py`](/run.py) script is executable so that you can run it without typing "python3" in the command line. -Then, you can run the vanilla example pipeline configured in the [`pipelines/vanilla.conf`](/pipelines/vanilla.conf) file with the command: +Check that the [run.py](/run.py) script is executable so that you can run it without typing "python3" in the command line. +Then, you can run the vanilla example pipeline configured in the [pipelines/vanilla.conf](/pipelines/vanilla.conf) file with the command: ```bash ./run.py pipelines/vanilla.conf --local @@ -53,30 +53,32 @@ To submit to Slurm, remove this option or use `--slurm` instead. You can store your pipeline configuration files anywhere as long as you pass the correct path as an argument when calling the script. All that remains is to configure your own pipeline. -To do this, create a file in the same format as [`pipelines/customized.conf`](/pipelines/customized.conf). +To do this, create a file in the same format as [pipelines/customized.conf](/pipelines/customized.conf). Except for the `DEFAULT` section (which defines the default values for all jobs), each section of the file corresponds to the configuration of a job of the pipeline. For each job you can configure the following fields: -* `stage`: String defining at which stage the job must be executed. The execution follows the lexicographic order and jobs that have the same stage value can be executed in parallel. Stages are launched in lexicographical order, and all jobs in a stage must be completed before moving to the next stage. -* `command`: Main shell command of the job. This entry contains the essence of what the job does. As the SIGTERM signal is sent only once, it is strongly recommended to write only one command per job (i.e. not to put several commands separated by semicolons in this field). -* `workdir` (optional): The job script and the log file will be placed in this folder. The job will be executed from this folder. The relative paths will be resolved from this folder. By default the folder name contains the creation date of the pipeline so that each launch of a pipeline corresponds to a different folder. -* `modules` (optional): List of modules to load with the `module load ...` command. Using this field assumes that the [modules package](https://github.com/cea-hpc/modules) is installed, but this is usually the case on most clusters. -* `venv` (optional): Path to the Python virtual environment to be used. 
Using this field will place the `source /bin/activate` command in the job initialization. -* `array` (optional): Allows to execute the job for different values of a variable named `$var`. Specify the values taken by `$var` separated by a space and use `$var` in the command line of the job. -* `jobfile` (optional): Path to the job log file. -* `logfile` (optional): Path to the job script file. -* `partition` (optional): Value passed to Slurm option [--patition](https://slurm.schedmd.com/sbatch.html#OPT_partition). -* `nodes` (optional): Value passed to Slurm option [--nodes](https://slurm.schedmd.com/sbatch.html#OPT_nodes). -* `mail-user` (optional): Value passed to Slurm option [--mail-user](https://slurm.schedmd.com/sbatch.html#OPT_mail-user). -* `mail-type` (optional): Value passed to Slurm option [--mail-type](https://slurm.schedmd.com/sbatch.html#OPT_mail-type). -* `time` (optional): Value passed to Slurm option [--time](https://slurm.schedmd.com/sbatch.html#OPT_time). - -To define the fields (such as `workdir` or `jobfile`) you can use the following environment variables: - -* `$pipeline`: Name of the pipeline (corresponds to the stem of the pipeline configuration file name). -* `$date`: UTC date of submission of the pipeline in `YYYYMMDDTHHMMSSZ` format. -* `$stage`: Stage of the job. -* `$job`: Name of the job (corresponds to the name of the section in the configuration file). +* `stage` **(required)**: String defining at which stage the job must be executed. The execution follows the lexicographic order and jobs that have the same stage value can be executed in parallel. By default, all jobs in a stage must be completed without error before moving to the next stage. +* `command` **(required)**: Main shell command of the job. This entry contains the essence of what the job does. As the SIGTERM signal is sent only once, it is strongly recommended to write only one command per job (i.e. not to put several commands separated by semicolons in this field). +* `directory` **(optional)**: Path to the directory in which the job is executed. The directory is created if it does not exist when the pipeline is launched. Other paths specified in the configuration file will be resolved from this directory if they are relative. By default, the name of the directory contains the creation date of the pipeline and the name of the pipeline so that each launch of a pipeline corresponds to a different directory. +* `modules` **(optional)**: The list of modules to load with the `module load ` command. Using this field assumes that the [modules package](https://github.com/cea-hpc/modules) is installed, but this is usually the case on most clusters. To indicate multiple modules, separate module names with spaces. +* `venv` **(optional)**: The path to the Python virtual environment to activate. Using this field will place the `source /bin/activate` command in the first lines of the job script. This command must be used for virtual environments created with the [venv](https://docs.python.org/3/library/venv.html) package. +* `conda` **(optional)**: Name to the Python virtual environment to activate. Using this field will place the `conda activate ` command in the first lines of the job script. This command must be used for virtual environments created with Conda. +* `array` **(optional)**: Allows to execute the job for different values of a variable named `$var`. Specify the values taken by `$var` separated by spaces and use `$var` or `${var}` in the command line of the job. 
The suffix can be replaced by a string of alphanumeric characters and underscore. This allows to define several arrays with their associated variables. To limit the number of simultaneous tasks, add the field `tasks` followed by an integer. +* `dependency` **(optional)**: Choose among `after`, `afterany`, `afterburstbuffer`, `aftercorr`, `afternotok`, `afterok` to select the condition to be respected by dependencies to start the job. This corresponds to what is passed to Slurm option [--dependency](https://slurm.schedmd.com/sbatch.html#OPT_dependency) before the list of job ids. +* `script` **(optional)**: Path to the job script file. If the path is relative, it will be evaluated from the path specified in the `directory` field. +* `logging` **(optional)**: Path to the job log file. If the path is relative, it will be evaluated from the path specified in the `directory` field. +* `partition` **(optional)**: Value passed to Slurm option [--partition](https://slurm.schedmd.com/sbatch.html#OPT_partition). +* `nodes` **(optional)**: Value passed to Slurm option [--nodes](https://slurm.schedmd.com/sbatch.html#OPT_nodes). +* `email` **(optional)**: Value passed to Slurm option [--mail-user](https://slurm.schedmd.com/sbatch.html#OPT_mail-user). +* `notification` **(optional)**: Value passed to Slurm option [--mail-type](https://slurm.schedmd.com/sbatch.html#OPT_mail-type). +* `time` **(optional)**: Value passed to Slurm option [--time](https://slurm.schedmd.com/sbatch.html#OPT_time). + +To define the fields (such as `directory`, `script` or `logging`) you can use the following environment variables: + +* `$PIPING_PIPELINE`: Name of the pipeline (corresponds to the stem of the pipeline configuration file name). +* `$PIPING_DATE`: UTC date of submission of the pipeline in ISO `YYYYMMDDTHHMMSSZ` format. +* `$PIPING_STAGE`: Stage of the job. +* `$PIPING_JOB`: Name of the job (corresponds to the name of the section in the configuration file). ## Credits diff --git a/pipelines/customized.conf b/pipelines/customized.conf index 85b9189..00e57c5 100644 --- a/pipelines/customized.conf +++ b/pipelines/customized.conf @@ -1,32 +1,62 @@ +# The lines below do not correspond to a job but configure the default +# settings. The name "DEFAULT" is reserved for this use. [DEFAULT] partition = all.q -workdir = scratch/$date-$pipeline -jobfile = $pipeline-$stage-$job.job -logfile = $pipeline-$stage-$job.log -mail-user = your@email.com -mail-type = FAIL +directory = scratch/$PIPING_DATE-$PIPING_PIPELINE-test +script = $PIPING_PIPELINE-$PIPING_STAGE-$PIPING_JOB.sh +logging = $PIPING_PIPELINE-$PIPING_STAGE-$PIPING_JOB.txt +email = your@email.com +notification = FAIL time = 00:02:00 +# The "copy" job defined below is an example of how to copy data to the +# working directory created when the pipeline was launched. [copy] stage = 0 command = cp ../../material/executable.sh . --verbose -[step-1] +# The "build" job defined below will use the modules package to load +# two modules. After expansion of the variables, the command executed +# will be: "./executable.sh build" +[build] stage = 1 modules = gcc/9.3.0 mpi/openmpi-4.1.1-gcc9-3 -command = ./executable.sh step-1 +command = ./executable.sh $PIPING_JOB -[step-2] +# The "array-1" job defined below is an example of how to run a command +# for different values of a variable using Slurm arrays. During its +# execution, this job will generate two tasks with the commands: +# "./executable.sh array-1-a" and "./executable.sh array-1-b". 
+[array-1] stage = 2 array = a b -command = ./executable.sh step-2-${var} +command = ./executable.sh $PIPING_JOB-$var -[step-3] +# The "array-2" job defined below follows the example of the previous +# job but adding a variable. During its execution, this job will +# generate two tasks with the commands: +# "./executable.sh array-2-a-c" and "./executable.sh array-2-b-d". +[array-2] +stage = 2 +array_0 = a b +array_1 = c d +command = ./executable.sh $PIPING_JOB-${var_0}-${var_1} + +# The "post-process" job defined bellow shows how to run a job on +# multiple nodes. +[post-process] stage = 3 nodes = 2 -command = ./executable.sh step-3 +command = ./executable.sh $PIPING_JOB +# In Slurm versions prior to 19.05.3, arrays with requeued tasks always +# return an exit code different from 0. The following line allows to +# bypass this bug. Remove it if your version of Slurm is >= 19.05.3. +dependency = afterany +# This last job allows to store the produced data in an archive. Here, +# the type of event notified by email is modified in order to inform +# the user when the pipeline is completed. [archive] -mail-type = END,FAIL -stage = 4 -command = tar -cvf $date-$pipeline.tar output-* +notification = END,FAIL +stage = 5 +command = tar -cvf $PIPING_DATE-$PIPING_PIPELINE.tar output-* diff --git a/pipelines/vanilla.conf b/pipelines/vanilla.conf index 54f7d17..1325c32 100644 --- a/pipelines/vanilla.conf +++ b/pipelines/vanilla.conf @@ -1,7 +1,14 @@ +# The three lines below define a job called "print-hello" that just +# displays "Hello". [print-hello] stage = 0 command = echo Hello +# The three lines below define an other job called "print-world" that +# displays "world!". [print-world] stage = 1 command = echo world! + +# Given the values chosen for field "stage", the "print-hello" job will +# always be executed before "print-world" job. 
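With this patch applied, running `./run.py pipelines/vanilla.conf --local` should generate, for the `print-hello` job above, a shell script along the lines of the sketch below, written to the default `scratch/<date>-vanilla/vanilla-0-print-hello.job` (the date value shown is only a placeholder):

```bash
#!/bin/bash

export PIPING_PIPELINE=vanilla
export PIPING_DATE=20230427T000000Z
export PIPING_STAGE=0
export PIPING_JOB=print-hello

echo Hello
```

The output of the run is then redirected to `vanilla-0-print-hello.log` in the same directory.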
diff --git a/run.py b/run.py index 464d38e..d76b328 100755 --- a/run.py +++ b/run.py @@ -47,9 +47,9 @@ class Pipeline: """ defaults = { - 'workdir': 'scratch/$date-$pipeline', - 'jobfile': '$pipeline-$stage-$job.job', - 'logfile': '$pipeline-$stage-$job.log', + 'directory': 'scratch/$PIPING_DATE-$PIPING_PIPELINE', + 'script': '$PIPING_PIPELINE-$PIPING_STAGE-$PIPING_JOB.job', + 'logging': '$PIPING_PIPELINE-$PIPING_STAGE-$PIPING_JOB.log', } def __init__(self, config_path): @@ -92,10 +92,10 @@ class Pipeline: """ variables = { - 'pipeline': self.name, - 'date': self.date, - 'stage': self.config.get(job, 'stage'), - 'job': job, + 'PIPING_PIPELINE': self.name, + 'PIPING_DATE': self.date, + 'PIPING_STAGE': self.config.get(job, 'stage'), + 'PIPING_JOB': job, } return variables @@ -143,7 +143,7 @@ class SlurmPipeline(Pipeline): """ logger.debug("retrieving %s job settings", job) section = self.config[job] - logfile = self.substitute(section.get('logfile', ''), job) + logfile = self.substitute(section.get('logging', ''), job) variables = self.variables(job) content = "\n" requeue = "$SLURM_JOB_ID" @@ -152,10 +152,10 @@ class SlurmPipeline(Pipeline): options = { 'job-name': job, 'nodes': section.get('nodes', '1'), - 'mail-user': section.get('mail-user', ''), - 'mail-type': section.get('mail-type', ''), + 'mail-user': section.get('email', ''), + 'mail-type': section.get('notification', ''), 'time': section.get('time', '24:00:00'), - 'partition': section.get('parition', ''), + 'partition': section.get('partition', ''), 'mem': section.get('mem', '0'), 'error': logfile, 'output': logfile, @@ -241,8 +241,8 @@ class SlurmPipeline(Pipeline): """ logger.debug("retrieving %s job settings", job) section = self.config[job] - workdir = self.substitute(section['workdir'], job) - jobfile = self.substitute(section['jobfile'], job) + workdir = self.substitute(section['directory'], job) + jobfile = self.substitute(section['script'], job) logger.debug("initializing work directory") makedirs(workdir, exist_ok=True) @@ -359,9 +359,9 @@ class LocalPipeline(Pipeline): """ logger.debug("retrieving %s job settings", job) section = self.config[job] - workdir = self.substitute(section['workdir'], job) - jobfile = self.substitute(section['jobfile'], job) - logfile = self.substitute(section['logfile'], job) + workdir = self.substitute(section['directory'], job) + jobfile = self.substitute(section['script'], job) + logfile = self.substitute(section['logging'], job) logger.debug("initializing work directory") makedirs(workdir, exist_ok=True) -- GitLab
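To make the end result of the series concrete: for the `build` job of `pipelines/customized.conf`, the Slurm launcher should generate a job script roughly like the sketch below (the `PIPING_DATE` value is a placeholder, and minor formatting details such as indentation may differ slightly):

```bash
#!/bin/bash
#SBATCH --job-name build
#SBATCH --nodes 1
#SBATCH --mail-user your@email.com
#SBATCH --mail-type FAIL
#SBATCH --time 00:02:00
#SBATCH --partition all.q
#SBATCH --mem 0
#SBATCH --error customized-1-build.txt
#SBATCH --output customized-1-build.txt
#SBATCH --open-mode append
#SBATCH --exclusive

export PIPING_PIPELINE=customized
export PIPING_DATE=20230427T000000Z
export PIPING_STAGE=1
export PIPING_JOB=build

module purge
module load gcc/9.3.0
module load mpi/openmpi-4.1.1-gcc9-3

terminate () {
    echo caught SIGTERM
    echo terminating "$pid"
    kill -TERM "$pid"
    wait "$pid"
    echo requeuing $SLURM_JOB_ID
    scontrol requeue $SLURM_JOB_ID
}

trap terminate SIGTERM

srun ./executable.sh $PIPING_JOB &
pid=$!
wait "$pid"
```

The `terminate` trap is what implements the auto-requeuing described in the README: when Slurm sends SIGTERM shortly before killing the job at its time limit, the script stops the running `srun` step and places the job back in the queue with `scontrol requeue`, so that the next run of `executable.sh` can skip the output files it already produced.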