# Node-local scheduling

In some use cases you want to request a full cluster node from the LSF batch system and then run many (e.g. far more than 64) small tasks on it, each taking only a fraction of the total job runtime. You then need some local scheduling on this node to ensure proper utilization of all cores.

To accomplish this, we suggest using the GNU Parallel program. It is installed in /cluster/bin, but you can also load the modulefile software/gnu_parallel, which additionally gives you access to its man page.

For more documentation on how to use GNU Parallel, please read man parallel and man parallel_tutorial, where you'll find a great number of examples and explanations.

Let's say we have a number of input data files that contain differing parameters that are going to be processed independently by our program:

$ ls data_*.in
data_001.in
data_002.in
[...]
data_149.in
data_150.in
$ cat data_001.in
1 2 3 4 5
6 7 8 9 0

Of course we could submit 150 jobs to LSF, or a single job that processes the files one after another. The most elegant way, however, is to submit one job for 64 cores (i.e. a whole node) and process the files in parallel. This is especially convenient because we can then use the nodelong queue, which has better scheduling characteristics than long.

Let's further assume that our program is able to work in parallel itself using OpenMP. We determined that OMP_NUM_THREADS=4 is the best amount of parallel work for one set of input data. This means we can launch 64/4=16 processes using GNU Parallel on the one node we have.
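The division above can also be done by the shell, so that the number of simultaneous tasks follows the allocation instead of being hard-coded. The following is a minimal sketch; the helper name tasks_per_node and the fallback value of 64 cores (for running outside an LSF job) are assumptions made for illustration.

```shell
#!/bin/bash
# tasks_per_node is a hypothetical helper: cores divided by threads per task.
tasks_per_node() {
    local cores="$1"
    local threads="$2"
    # Integer division: leftover cores stay idle rather than being oversubscribed
    echo $(( cores / threads ))
}

export OMP_NUM_THREADS=4
# LSB_DJOB_NUMPROC is set by LSF inside a job; 64 is an assumed fallback
JOBS=$(tasks_per_node "${LSB_DJOB_NUMPROC:-64}" "$OMP_NUM_THREADS")
echo "GNU Parallel could be started with -j ${JOBS}"
```

The resulting value would be passed to the -j option of GNU Parallel.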

parallel_job
#!/bin/bash
# LSF Job parameters (could also be given on the bsub command line)
# Job name
#BSUB -J parallel_job
# Queue
#BSUB -q nodelong
# Number of cores
#BSUB -n 64
# Memory reservation
#BSUB -app Reserve1800M
# Allowed job runtime (maximum)
#BSUB -W 7200

# Store working directory to be safe
SAVEDPWD=$(pwd)

# First, we copy the input data files and the program to the local filesystem of our node
cp "${SAVEDPWD}"/data_*.in "/jobdir/${LSB_JOBID}/"
cp "${SAVEDPWD}"/program "/jobdir/${LSB_JOBID}/"

# Change directory to jobdir
cd "/jobdir/${LSB_JOBID}/"

export OMP_NUM_THREADS=4

# -t enables verbose output to stderr
# We could also set -j$((LSB_DJOB_NUMPROC/OMP_NUM_THREADS)) to be more dynamic
# The --delay parameter is used to distribute I/O load at the beginning of program execution by
#   introducing a delay of 1 second before starting the next task
# --progress will output the current progress of the parallel task execution
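# {} will be replaced by each filename
# {/} will be replaced by the filename without its directory part,
#   {/.} additionally drops the extension
# {#} will be replaced by the consecutive job number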
# Both variants will have equal results:
#parallel -t -j 16 --delay 1 --progress "./program {/} > {/.}.out" ::: data_*.in
find . -name 'data_*.in' | parallel -t -j 16 --delay 1 --progress "./program {/} > {/.}.out"
# See the GNU Parallel documentation for more examples and explanation

# Now capture exit status code, parallel will have set it to the number of failed tasks
STATUS=$?

# Copy output data back to the previous working directory
cp "/jobdir/${LSB_JOBID}"/data_*.out "${SAVEDPWD}/"

exit $STATUS
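
To see what the {/} and {/.} replacement strings in the job script expand to, the same transformations can be mimicked with plain shell parameter expansion. This is only an illustrative sketch: the variable names are made up here, and GNU Parallel itself is not called.

```shell
#!/bin/bash
# One path as emitted by: find . -name 'data_*.in'
path="./data_001.in"

# {/}  : the path without its directory part
base="${path##*/}"
# {/.} : additionally without the last extension
stem="${base%.*}"

# The command line that "./program {/} > {/.}.out" would correspond to
echo "./program ${base} > ${stem}.out"
```

This is why both the ::: variant (which passes data_001.in) and the find variant (which passes ./data_001.in) write to the same output file names.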

In this example, the program only sleeps for a few seconds and then counts the words in the input data file using wc.
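
A stand-in for such a program might look like the sketch below. It is not the original program: the sleep duration and the PROGRAM_SLEEP variable are assumptions added so the delay can be adjusted, and the logic is wrapped in a function so it can be tried out interactively.

```shell
#!/bin/bash
# Hypothetical stand-in for ./program: wait a little, then count
# lines, words and bytes of the given input file.
program() {
    local infile="$1"
    # PROGRAM_SLEEP is an assumed knob; it defaults to 2 seconds
    sleep "${PROGRAM_SLEEP:-2}"
    wc "$infile"
}

# When saved as an executable script, process the file given as first argument
if [ "$#" -gt 0 ]; then
    program "$1"
fi
```

Saved as an executable file named program, it can be called exactly as in the job script: ./program data_001.in > data_001.out.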

$ bsub < parallel_job

After this job has run, we should have the results/output data (in this case, it's just the output of wc, for demonstration):

$ ls data_*.out
data_001.out
data_002.out
[...]
data_149.out
data_150.out
$ cat data_001.out
2 10 20 data_001.in

The following example shows how to use user-defined functions, variables and anonymous pipes in bash. It uses bwa, a bioinformatics tool that maps unknown DNA sequences to a given reference. It was chosen because it requires different kinds of input: a reference file or directory and at least two input files. These input files must not be compressed, but, as the script shows, decompressing them on the fly by means of gzip / zcat and redirection is a working solution.

• Note that this example uses set -x to print command traces. This is intended merely to ease comprehension. In a production script, it should be commented out.
• Also note that the function definition is carried to the sub-shells with the export -f statement.
• Variables which are not passed in the call of GNU parallel can be made available to a function with additional export statements (here: $OUTPUTDIR).
• Positional parameters (here, just $1 in our example function) are supplied by the call through GNU parallel. This is useful for parameters which should change for every iteration, here: the input file name.

#!/bin/bash

#BSUB -n 64
#BSUB -R 'span[ptile=64]'
#BSUB -q nodelong
#BSUB -o %J.log
#BSUB -e %J.log
#BSUB -N
#BSUB -W 3600
#BSUB -J 'bwa template'
#BSUB -R 'rusage[ramdisk=10000,mem=1600, local=1000]'

# This script is written by Christian Meesters (HPC-team, ZDV, Mainz)
#
# Please note: It is valid for Mogon I. The following restrictions apply:
# - if your fastq-files in the defined inputdirectory are big, the given
#   memory might not be sufficient. In this case restrict yourself to
#   a data subset.

# in order to see the output of all commands, we set this:
set -x

#1 we purge all possible modules to avoid a mangled setup
module purge

#2 we load our GNU parallel module
module load software/gnu_parallel

#3 in order to perform our alignment (here: bwa sampe) and subsequent sorting we
#  load bwa and samtools
module load software/bioinf/bwa/0.7.13
module load software/bioinf/samtools/1.3

#4 make the return value of the last pipe command which fails the return value
set -o pipefail

#5 set a path to the reference genome, extract its directory path
export REFERENCEGENOME="reference/hg19.fasta"
REFERENCEDIR=$(dirname $REFERENCEGENOME)

#6 select a base directory for all input and traverse through it:
INPUTBASEDIR=./input
#6b now we gather all input files:
#   - a first fastq file (ending on _1.fastq)
#   - its mate (ending on _2.fastq)
#   If your file names use a different scheme, adjust this script
FORWARD_READS=$(find -L $INPUTBASEDIR -type f -name '*_1.fastq*')

#7 create an output directory, here: according to bwa and samtools versions
BWA_VERSION=$(bwa |& grep Version | cut -d ' ' -f2 | cut -d '-' -f1 )
export OUTPUTDIR="bwa${BWA_VERSION}_samtools1.1_${LSB_JOBID}"

if [ ! -d "$OUTPUTDIR" ]; then
    mkdir -p "/jobdir/${LSB_JOBID}/$OUTPUTDIR"
    mkdir -p "$OUTPUTDIR"
fi

#8 copy the reference to the ramdisk
#df -h /jobdir/${LSB_JOBID}/ramdisk
mkdir -p $REFERENCEDIR /jobdir/${LSB_JOBID}/ramdisk/reference
cp -r $REFERENCEDIR /jobdir/${LSB_JOBID}/ramdisk/
REFERENCEGENOME=/jobdir/${LSB_JOBID}/ramdisk/reference/$(basename $REFERENCEGENOME)
REFERENCEDIR=/jobdir/${LSB_JOBID}/ramdisk/reference

#9 create an alignment function with the appropriate calls for bwa and samtools
function bwa_aln {
    TEMPOUT=$(basename $1)
    # check file ending: is the file ending on gz?
    # ([[ ]] with an unquoted pattern is required for the glob match to work)
    if [[ "$1" == *.gz ]]; then
        #bwa sampe $REFERENCEGENOME <( bwa aln -t 4 $REFERENCEGENOME <(zcat $1) ) \
        #                           <( bwa aln -t 4 $REFERENCEGENOME <(zcat ${1/_1/_2} ) ) \
        #                           <(zcat $1) <(zcat ${1/_1/_2}) | \
        #samtools view -Shb /dev/stdin > "$OUTPUTDIR/${TEMPOUT%_1.fastq.gz}_aligned.bam"
        bwa mem -M -t 8 $REFERENCEGENOME <(zcat $1) <(zcat ${1/_1/_2}) |
        samtools view -Shb /dev/stdin > "$OUTPUTDIR/${TEMPOUT%_1.fastq.gz}_aligned.bam"
    else
        #bwa sampe $REFERENCEGENOME <( bwa aln -t 4 $REFERENCEGENOME $1 ) \
        #                           <( bwa aln -t 4 $REFERENCEGENOME ${1/_1/_2} ) \
        #                           $1 ${1/_1/_2} | \
        bwa mem -M -t 8 $REFERENCEGENOME $1 ${1/_1/_2} |
        samtools view -Shb /dev/stdin > "$OUTPUTDIR/${TEMPOUT%_1.fastq}_aligned.bam"
    fi
}

#9b we need to export this function, such that all subprocesses will see it (only works in bash)
export -f bwa_aln

# finally we start processing
#  we consider taking 4 threads for each call of bwa aln, hence 8 threads
#  and 64 / 8 is 8. This results in a little oversubscription, as bwa sampe
#  and samtools are run, too.
parallel -v --env bwa_aln --no-notice -j 16 bwa_aln ::: $FORWARD_READS

# copy all files to the actual output directory
# (the glob must stay outside the quotes, otherwise it is not expanded)
cp -r "/jobdir/${LSB_JOBID}/$OUTPUTDIR"/* "$OUTPUTDIR/."
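
The interplay of export -f, exported variables and the positional parameter can be tried out without bwa or GNU Parallel. In the sketch below a child bash process stands in for the sub-shells that GNU Parallel starts. The names OUTPUT_TAG and describe_input as well as the sample path are hypothetical and only serve the illustration.

```shell
#!/bin/bash
# OUTPUT_TAG plays the role of $OUTPUTDIR: exported, so child shells see it
export OUTPUT_TAG="demo"

describe_input() {
    # $1 is the per-call argument, here the forward read file
    local forward="$1"
    # Replace the first "_1" by "_2" to obtain the name of the mate file
    local mate="${forward/_1/_2}"
    local name
    name=$(basename "$forward")
    # Strip the "_1.fastq" suffix to build an output name
    echo "${OUTPUT_TAG}: ${name%_1.fastq} <- ${forward} + ${mate}"
}
# Without this line the child shell would not know the function
export -f describe_input

# The child bash receives the file name as its positional parameter $1
result=$(bash -c 'describe_input "$1"' _ "input/sample_1.fastq")
echo "$result"
```

Note that export -f is specific to bash, so the child process has to be a bash as well.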

LSF offers $LSB_HOSTS within a job to identify the hosts assigned to a particular job. This can be used to execute a command distributed over those hosts:

parallel --no-notice --onall -S $(echo $LSB_HOSTS | tr ' ' ',') echo ::: foo bar

will print the host and 'D', 'E' and 'F' (not necessarily in order and wrapped around, if more than 3 hosts are requested via bsub).
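
The conversion of the host list can be checked on its own before handing it to GNU Parallel. The sketch below assumes that $LSB_HOSTS may name a host several times (once per slot) and therefore also removes duplicates, which the one-liner above does not do. The helper name hosts_to_sshlogin and the fallback host names are hypothetical.

```shell
#!/bin/bash
# hosts_to_sshlogin is a hypothetical helper: it turns a space separated
# host list into the comma separated form expected by the -S option.
hosts_to_sshlogin() {
    # $1 is left unquoted on purpose so that it is split into one host per line;
    # sort -u drops duplicates, paste joins the lines with commas
    printf '%s\n' $1 | sort -u | paste -sd, -
}

# Outside of an LSF job LSB_HOSTS is unset, so made-up names are used instead
hosts=$(hosts_to_sshlogin "${LSB_HOSTS:-node01 node01 node02}")
echo "parallel would be called with: -S ${hosts}"
```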
