#!/bin/bash
### ---
### name: 'DAQ:CONTROL_VARIANT_CALLING:CONTROL_GERMLINE_VC:BAM_VARIANT_CALLING_SENTIEON_HAPLOTYPER_RF:BAM_VARIANT_CALLING_SENTIEON_HAPLOTYPER:SENTIEON_HAPLOTYPER (germline_control_2;chr8_93795409-93795507.bed)'
### container: '292967571998.dkr.ecr.us-west-2.amazonaws.com/community.wave.seqera.io/library/sentieon:202308.03--b599c8ad694846bd'
### outputs:
### - '*.unfiltered.vcf.gz'
### - '*.unfiltered.vcf.gz.tbi'
### - '*.g.vcf.gz'
### - '*.g.vcf.gz.tbi'
### - 'versions.yml'
### ...
#
# Task wrapper: stages the task inputs from S3 into a local scratch directory,
# runs .command.sh with resource tracing, uploads the declared outputs and the
# control files (.command.out/.err/.trace) back to the S3 work directory, and
# writes the exit code there.
#
# Entry point: "$1" names the function to run (default: nxf_main). nxf_launch
# re-runs .command.run (presumably this same file, staged by nxf_stage) with
# "nxf_trace" as the entry point.
set -e
set -u
NXF_DEBUG=${NXF_DEBUG:=0}; [[ $NXF_DEBUG > 1 ]] && set -x
NXF_ENTRY=${1:-nxf_main}


# Collect per-process stats for the process tree rooted at PID $1.
# Builds a parent -> children map from `ps`, then for every process in the tree
# stores "pid %mem vmem rss peak_vmem peak_rss vol_ctxt inv_ctxt" into the
# caller's cpu_stat array (indexed by pid). Reads mem_tot from the caller.
nxf_tree() {
    local pid=$1

    declare -a ALL_CHILDREN
    while read P PP;do
        ALL_CHILDREN[$PP]+=" $P"
    done < <(ps -e -o pid= -o ppid=)

    pstat() {
        local x_pid=$1
        local STATUS
        # Assign inside the `if` so the test sees grep's exit status. With
        # `local STATUS=$(...)` the status tested was that of `local` (always
        # 0), so processes that had already exited were never skipped.
        if STATUS=$(2> /dev/null < /proc/$1/status grep -E 'Vm|ctxt'); then
            local x_vsz=$(echo "$STATUS" | grep VmSize | awk '{print $2}' || echo -n '0')
            local x_rss=$(echo "$STATUS" | grep VmRSS | awk '{print $2}' || echo -n '0')
            local x_peak=$(echo "$STATUS" | grep -E 'VmPeak|VmHWM' | sed 's/^.*:\s*//' | sed 's/[\sa-zA-Z]*$//' | tr '\n' ' ' || echo -n '0 0')
            # rss/mem_tot*100*10: memory share in tenths of a percent
            local x_pmem=$(awk -v rss=$x_rss -v mem_tot=$mem_tot 'BEGIN {printf "%.0f", rss/mem_tot*100*10}' || echo -n '0')
            local vol_ctxt=$(echo "$STATUS" | grep '\bvoluntary_ctxt_switches' | awk '{print $2}' || echo -n '0')
            local inv_ctxt=$(echo "$STATUS" | grep '\bnonvoluntary_ctxt_switches' | awk '{print $2}' || echo -n '0')
            cpu_stat[x_pid]="$x_pid $x_pmem $x_vsz $x_rss $x_peak $vol_ctxt $inv_ctxt"
        fi
    }

    pwalk() {
        pstat $1
        for i in ${ALL_CHILDREN[$1]:=}; do pwalk $i; done
    }

    pwalk $1
}

# Sum the per-process stats of the tree rooted at $1 (columns 1..7) and keep a
# running maximum in the caller's cpu_peak array. The result (each column's
# max-so-far) is left in the global nxf_stat_ret array.
nxf_stat() {
    cpu_stat=()
    nxf_tree $1

    declare -a sum=(0 0 0 0 0 0 0 0)
    local pid
    local i
    for pid in "${!cpu_stat[@]}"; do
        local row=(${cpu_stat[pid]})
        [ $NXF_DEBUG = 1 ] && echo "++ stat mem=${row[*]}"
        for i in "${!row[@]}"; do
            # column 0 is the pid itself; never summed
            if [ $i != 0 ]; then
                sum[i]=$((sum[i]+row[i]))
            fi
        done
    done
    [ $NXF_DEBUG = 1 ] && echo -e "++ stat SUM=${sum[*]}"

    for i in {1..7}; do
        if [ ${sum[i]} -lt ${cpu_peak[i]} ]; then
            sum[i]=${cpu_peak[i]}
        else
            cpu_peak[i]=${sum[i]}
        fi
    done

    [ $NXF_DEBUG = 1 ] && echo -e "++ stat PEAK=${sum[*]}\n"
    nxf_stat_ret=(${sum[*]})
}

# Background memory sampler for the task PID $1.
# Polls nxf_stat (every 1s for the first 10 samples, then 5s up to 120, then
# 30s) until a line is read from stdin or the PID has been gone for >10s,
# then appends the memory / context-switch fields to .command.trace.
nxf_mem_watch() {
    set -o pipefail
    local pid=$1
    local trace_file=.command.trace
    local count=0;
    declare -a cpu_stat=(0 0 0 0 0 0 0 0)
    declare -a cpu_peak=(0 0 0 0 0 0 0 0)
    local mem_tot=$(< /proc/meminfo grep MemTotal | awk '{print $2}')
    local timeout
    local DONE
    local STOP=''

    [ $NXF_DEBUG = 1 ] && nxf_sleep 0.2 && ps fx

    while true; do
        nxf_stat $pid
        if [ $count -lt 10 ]; then timeout=1;
        elif [ $count -lt 120 ]; then timeout=5;
        else timeout=30;
        fi
        read -t $timeout -r DONE || true
        [[ $DONE ]] && break
        if [ ! -e /proc/$pid ]; then
            [ ! $STOP ] && STOP=$(nxf_date)
            [ $(($(nxf_date)-STOP)) -gt 10000 ] && break
        fi
        count=$((count+1))
    done

    printf "%s\n" \
        "%mem=${nxf_stat_ret[1]}" \
        "vmem=${nxf_stat_ret[2]}" \
        "rss=${nxf_stat_ret[3]}" \
        "peak_vmem=${nxf_stat_ret[4]}" \
        "peak_rss=${nxf_stat_ret[5]}" \
        "vol_ctxt=${nxf_stat_ret[6]}" \
        "inv_ctxt=${nxf_stat_ret[7]}" >> "$trace_file" || >&2 echo "Error: Failed to append to file: $trace_file"
}

# Overwrite $trace_file with the timing / CPU / I/O header section.
# Reads wall_time, ucpu, cpu_model, io_stat1 and trace_file from the caller.
nxf_write_trace() {
    printf "%s\n" \
        "nextflow.trace/v2" \
        "realtime=$wall_time" \
        "%cpu=$ucpu" \
        "cpu_model=$cpu_model" \
        "rchar=${io_stat1[0]}" \
        "wchar=${io_stat1[1]}" \
        "syscr=${io_stat1[2]}" \
        "syscw=${io_stat1[3]}" \
        "read_bytes=${io_stat1[4]}" \
        "write_bytes=${io_stat1[5]}" >| "$trace_file" || >&2 echo "Error: Failed to write to file: $trace_file"
}

# macOS variant: only wall time is measured (no /proc).
nxf_trace_mac() {
    local start_millis=$(nxf_date)

    /bin/bash -Ceuo pipefail .command.sh

    local end_millis=$(nxf_date)
    local wall_time=$((end_millis-start_millis))
    local ucpu=''
    local cpu_model=''
    local io_stat1=('' '' '' '' '' '')
    nxf_write_trace
}

# Print the lowest file descriptor >= 11 not currently open in this shell.
nxf_fd() {
    local FD=11
    while [ -e /proc/$$/fd/$FD ]; do FD=$((FD+1)); done
    echo $FD
}

# Linux variant: runs .command.sh in the background with a memory sampler
# attached via a spare fd, then records wall time, %CPU and I/O deltas
# (from /proc) into the trace file and signals the sampler to finish.
nxf_trace_linux() {
    local pid=$$
    command -v ps &>/dev/null || { >&2 echo "Command 'ps' required by nextflow to collect task metrics cannot be found"; exit 1; }
    local num_cpus=$(< /proc/cpuinfo grep '^processor' -c)
    local cpu_model=$(< /proc/cpuinfo grep '^model name' | head -n 1 | awk 'BEGIN{FS="\t: "} { print $2 }')
    local tot_time0=$(grep '^cpu ' /proc/stat | awk '{sum=$2+$3+$4+$5+$6+$7+$8+$9; printf "%.0f",sum}')
    local cpu_time0=$(2> /dev/null < /proc/$pid/stat awk '{printf "%.0f", ($16+$17)*10 }' || echo -n 'X')
    local io_stat0=($(2> /dev/null < /proc/$pid/io sed 's/^.*:\s*//' | head -n 6 | tr '\n' ' ' || echo -n '0 0 0 0 0 0'))
    local start_millis=$(nxf_date)
    trap 'kill $mem_proc' ERR

    /bin/bash -Ceuo pipefail .command.sh &
    local task=$!

    # open a spare fd onto the sampler's stdin so 'DONE' can be sent later
    mem_fd=$(nxf_fd)
    eval "exec $mem_fd> >(nxf_mem_watch $task)"
    local mem_proc=$!

    wait $task

    local end_millis=$(nxf_date)
    local tot_time1=$(grep '^cpu ' /proc/stat | awk '{sum=$2+$3+$4+$5+$6+$7+$8+$9; printf "%.0f",sum}')
    local cpu_time1=$(2> /dev/null < /proc/$pid/stat awk '{printf "%.0f", ($16+$17)*10 }' || echo -n 'X')
    local ucpu=$(awk -v p1=$cpu_time1 -v p0=$cpu_time0 -v t1=$tot_time1 -v t0=$tot_time0 -v n=$num_cpus 'BEGIN { pct=(p1-p0)/(t1-t0)*100*n; printf("%.0f", pct>0 ? pct : 0) }' )

    local io_stat1=($(2> /dev/null < /proc/$pid/io sed 's/^.*:\s*//' | head -n 6 | tr '\n' ' ' || echo -n '0 0 0 0 0 0'))
    local i
    for i in {0..5}; do
        io_stat1[i]=$((io_stat1[i]-io_stat0[i]))
    done

    local wall_time=$((end_millis-start_millis))
    [ $NXF_DEBUG = 1 ] && echo "+++ STATS %CPU=$ucpu TIME=$wall_time I/O=${io_stat1[*]}"

    # same header section the mac variant writes (reads the locals above)
    nxf_write_trace

    [ -e /proc/$mem_proc ] && eval "echo 'DONE' >&$mem_fd" || true
    wait $mem_proc 2>/dev/null || true
    while [ -e /proc/$mem_proc ]; do nxf_sleep 0.1; done
}

# Run .command.sh with resource tracing into .command.trace.
nxf_trace() {
    local trace_file=.command.trace
    touch $trace_file
    if [[ $(uname) = Darwin ]]; then
        nxf_trace_mac
    else
        nxf_trace_linux
    fi
}
# bash helper functions
# Run "$@" up to max_attempts times, sleeping (with doubling timeout) between
# attempts. Returns 0 on the first success, otherwise the last failing status.
nxf_cp_retry() {
    local max_attempts=1
    local timeout=10
    local attempt=0
    local exitCode=0
    while (( attempt < max_attempts )); do
        if "$@"; then
            return 0
        else
            exitCode=$?
        fi
        attempt=$(( attempt + 1 ))
        # no point sleeping once the last attempt has failed
        if (( attempt < max_attempts )); then
            nxf_sleep $timeout
            timeout=$(( timeout * 2 ))
        fi
    done
    # previously fell off the loop with status 0, hiding the failure
    return $exitCode
}

# Eval each argument as a command, running at most min(nproc, 4) at a time.
# Runs in a subshell; a failing job makes the subshell (and thus the call)
# fail when it is reaped with `wait` under the inherited set -e.
nxf_parallel() {
    IFS=$'\n'
    local cmd=("$@")
    local cpus=$(nproc 2>/dev/null || < /proc/cpuinfo grep '^process' -c)
    local max=$(if (( cpus>4 )); then echo 4; else echo $cpus; fi)
    local i=0
    local pid=()
    (
    set +u
    while ((i<${#cmd[@]})); do
        local copy=()
        for x in "${pid[@]}"; do
            # if the process exist, keep in the 'copy' array, otherwise wait on it to capture the exit code
            # see https://github.com/nextflow-io/nextflow/pull/4050
            [[ -e /proc/$x ]] && copy+=($x) || wait $x
        done
        pid=("${copy[@]}")

        if ((${#pid[@]}>=$max)); then
            nxf_sleep 0.2
        else
            eval "${cmd[$i]}" &
            pid+=($!)
            ((i+=1))
        fi
    done
    for p in "${pid[@]}"; do
        wait $p
    done
    )
    unset IFS
}

# aws helper for s5cmd
# Upload file/dir $1 under S3 prefix $2 (as "$2/$1"). With $1 == "-", stdin is
# buffered to a temp file and uploaded to exactly "$2".
nxf_s3_upload() {
    local name=$1
    local s3path=$2
    if [[ "$name" == - ]]; then
        local tmp
        tmp=$(nxf_mktemp) || return
        local rc=0
        cp /dev/stdin "$tmp/$name" \
            && /opt/s5cmd/bin/s5cmd --log error cp --storage-class STANDARD "$tmp/$name" "$s3path" \
            || rc=$?
        # remove the buffer dir (previously left behind in /tmp)
        rm -rf -- "$tmp"
        return $rc
    elif [[ -d "$name" ]]; then
        /opt/s5cmd/bin/s5cmd --log error cp --storage-class STANDARD "$name/" "$s3path/$name/"
    else
        /opt/s5cmd/bin/s5cmd --log error cp --storage-class STANDARD "$name" "$s3path/$name"
    fi
}

# Download S3 object or prefix $1 to local path $2. When $1 lists as a
# directory, its contents ("$1/*") are copied into $2.
nxf_s3_download() {
    local source=$1
    local target=$2
    echo " Downloading: $source"
    local file_name=$(basename "$source")
    # s5cmd lists directories as "DIR" followed by "<name>/" with column
    # padding; compare fields so the check does not depend on the spacing.
    local is_dir=$(/opt/s5cmd/bin/s5cmd ls "$source" | awk -v d="${file_name}/" '$1 == "DIR" && $2 == d { c++ } END { print c + 0 }')
    if [[ $is_dir == 1 ]]; then
        /opt/s5cmd/bin/s5cmd --log error cp "$source/*" "$target"
    else
        /opt/s5cmd/bin/s5cmd --log error cp "$source" "$target"
    fi
}

# sleep $1 seconds, falling back to 1s if `sleep` rejects the value
nxf_sleep() {
    sleep $1 2>/dev/null || sleep 1;
}

# Print the current epoch time in milliseconds, normalising `date`
# implementations that do not support %3N.
nxf_date() {
    local ts=$(date +%s%3N);
    if [[ ${#ts} == 10 ]]; then echo ${ts}000
    elif [[ $ts == *%3N ]]; then echo ${ts/\%3N/000}
    elif [[ $ts == *3N ]]; then echo ${ts/3N/000}
    elif [[ ${#ts} == 13 ]]; then echo $ts
    # stderr, so the message is not captured by $(nxf_date) callers
    else >&2 echo "Unexpected timestamp value: $ts"; exit 1
    fi
}

# Dump the environment (values of AWS* vars truncated to 6 chars) for debugging.
nxf_env() {
    echo '============= task environment ============='
    env | sort | sed "s/\(.*\)AWS\(.*\)=\(.\{6\}\).*/\1AWS\2=\3xxxxxxxxxxxxx/"
    echo '============= task output =================='
}

# Send SIGTERM to PID $1 and all its descendants (never to this shell).
nxf_kill() {
    declare -a children
    while read P PP;do
        children[$PP]+=" $P"
    done < <(ps -e -o pid= -o ppid=)

    kill_all() {
        [[ $1 != $$ ]] && kill $1 2>/dev/null || true
        for i in ${children[$1]:=}; do kill_all $i; done
    }

    kill_all $1
}

# Create a fresh temp directory under ${1:-/tmp} and print its path.
nxf_mktemp() {
    local base=${1:-/tmp}
    mkdir -p "$base"
    if [[ $(uname) = Darwin ]]; then mktemp -d $base/nxf.XXXXXXXXXX
    else TMPDIR="$base" mktemp -d -t nxf.XXXXXXXXXX
    fi
}

nxf_fs_copy() {
    local source=$1
    local target=$2
    local basedir=$(dirname $1)
    mkdir -p $target/$basedir
    cp -fRL $source $target/$basedir
}

nxf_fs_move() {
    local source=$1
    local target=$2
    local basedir=$(dirname $1)
    mkdir -p $target/$basedir
    mv -f $source $target/$basedir
}

nxf_fs_rsync() {
    rsync -rRl $1 $2
}

nxf_fs_rclone() {
    rclone copyto $1 $2/$1
}

nxf_fs_fcp() {
    fcp $1 $2/$1
}

# EXIT trap: pick the first non-zero of main / unstage / last status, upload it
# as .exitcode, remove the scratch directory and exit with that status.
on_exit() {
    local last_err=$?
    local exit_status=${nxf_main_ret:=0}
    [[ ${exit_status} -eq 0 && ${nxf_unstage_ret:=0} -ne 0 ]] && exit_status=${nxf_unstage_ret:=0}
    [[ ${exit_status} -eq 0 && ${last_err} -ne 0 ]] && exit_status=${last_err}
    printf '%s' "$exit_status" | nxf_s3_upload - s3://natera-rnd-pltf-dev-nextflow-scratch-01/work/ef/3f0f828b6d851080910468fd105025/.exitcode || true
    set +u
    if [[ -n "${NXF_SCRATCH:-}" ]]; then
        rm -rf -- "$NXF_SCRATCH" || true
    fi
    exit $exit_status
}

# TERM/INT/USR2 trap: kill the launcher started by nxf_main and its children.
# NOTE(review): pid is only assigned late in nxf_main; with set -u active, a
# signal arriving earlier makes this expansion fail and the shell exit.
on_term() {
    set +e
    [[ "$pid" ]] && nxf_kill $pid
}

nxf_launch() {
    /bin/bash -Ceuo pipefail .command.run nxf_trace
}

nxf_stage() {
    true
    # stage input files
    downloads=(true)
    rm -f Homo_sapiens_assembly38.fasta
    rm -f dbsnp_146.hg38.vcf.gz.tbi
    rm -f .command.sh
    rm -f Homo_sapiens_assembly38.fasta.fai
    rm -f .command.run
    rm -f Sig_18_Blood.recalibrated.bam.bai
    rm -f chr8_93795409-93795507.bed.gz
    rm -f Sig_18_Blood.recalibrated.bam
    rm -f dbsnp_146.hg38.vcf.gz
    downloads+=("nxf_s3_download s3://natera-platform-sandbox/pipeline-resources/ngi-igenomes/igenomes/Homo_sapiens/GATK/GRCh38/Sequence/WholeGenomeFasta/Homo_sapiens_assembly38.fasta Homo_sapiens_assembly38.fasta")
    downloads+=("nxf_s3_download s3://natera-platform-sandbox/pipeline-resources/ngi-igenomes/igenomes/Homo_sapiens/GATK/GRCh38/Annotation/GATKBundle/dbsnp_146.hg38.vcf.gz.tbi dbsnp_146.hg38.vcf.gz.tbi")
    downloads+=("nxf_s3_download s3://natera-rnd-pltf-dev-nextflow-scratch-01/work/ef/3f0f828b6d851080910468fd105025/.command.sh .command.sh")
    downloads+=("nxf_s3_download s3://natera-platform-sandbox/pipeline-resources/ngi-igenomes/igenomes/Homo_sapiens/GATK/GRCh38/Sequence/WholeGenomeFasta/Homo_sapiens_assembly38.fasta.fai Homo_sapiens_assembly38.fasta.fai")
    downloads+=("nxf_s3_download s3://natera-rnd-pltf-dev-nextflow-scratch-01/work/ef/3f0f828b6d851080910468fd105025/.command.run .command.run")
    downloads+=("nxf_s3_download s3://natera-rnd-pltf-dev-nextflow-scratch-01/work/03/b9327ccd07f95c93afdb2eed727086/Sig_18_Blood.recalibrated.bam.bai Sig_18_Blood.recalibrated.bam.bai")
    downloads+=("nxf_s3_download s3://natera-platform-sandbox/pipeline-resources/ngi-igenomes/igenomes/Homo_sapiens/GATK/GRCh38/Annotation/intervals/altera_intervals/scatter/chr8_93795409-93795507.bed.gz chr8_93795409-93795507.bed.gz")
    downloads+=("nxf_s3_download s3://natera-rnd-pltf-dev-nextflow-scratch-01/work/03/b9327ccd07f95c93afdb2eed727086/Sig_18_Blood.recalibrated.bam Sig_18_Blood.recalibrated.bam")
    downloads+=("nxf_s3_download s3://natera-platform-sandbox/pipeline-resources/ngi-igenomes/igenomes/Homo_sapiens/GATK/GRCh38/Annotation/GATKBundle/dbsnp_146.hg38.vcf.gz dbsnp_146.hg38.vcf.gz")
    nxf_parallel "${downloads[@]}"
    echo "==> STAGING COMPLETE (9 inputs)"
    echo ""
}

# Upload every file matching the declared output patterns to the work dir.
nxf_unstage_outputs() {
    true
    uploads=()
    local name
    local quoted
    IFS=$'\n'
    for name in $(eval "ls -1d *.unfiltered.vcf.gz *.unfiltered.vcf.gz.tbi *.g.vcf.gz *.g.vcf.gz.tbi versions.yml" | sort | uniq); do
        # %q-quote so the command string survives nxf_parallel's eval even if
        # the name contains quotes or other shell metacharacters
        printf -v quoted '%q' "$name"
        uploads+=("nxf_s3_upload $quoted s3://natera-rnd-pltf-dev-nextflow-scratch-01/work/ef/3f0f828b6d851080910468fd105025")
    done
    unset IFS
    # guard: "${uploads[@]}" on an empty array trips set -u on bash < 4.4
    if (( ${#uploads[@]} > 0 )); then
        nxf_parallel "${uploads[@]}"
    fi
}

nxf_unstage_controls() {
    true
    nxf_s3_upload .command.out s3://natera-rnd-pltf-dev-nextflow-scratch-01/work/ef/3f0f828b6d851080910468fd105025 || true
    nxf_s3_upload .command.err s3://natera-rnd-pltf-dev-nextflow-scratch-01/work/ef/3f0f828b6d851080910468fd105025 || true
    nxf_s3_upload .command.trace s3://natera-rnd-pltf-dev-nextflow-scratch-01/work/ef/3f0f828b6d851080910468fd105025 || true
}

# Upload outputs (only if the task succeeded; status kept in nxf_unstage_ret)
# and, unconditionally, the control files.
nxf_unstage() {
    if [[ ${nxf_main_ret:=0} == 0 ]]; then
        (set -e -o pipefail; (nxf_unstage_outputs | tee -a .command.out) 3>&1 1>&2 2>&3 | tee -a .command.err)
        nxf_unstage_ret=$?
    fi
    nxf_unstage_controls
}

# Default entry point: set traps, create the scratch dir, run beforeScript,
# fetch helper binaries, stage inputs, launch the traced task and unstage.
nxf_main() {
    trap on_exit EXIT
    trap on_term TERM INT USR2
    trap '' USR1

    [[ "${NXF_CHDIR:-}" ]] && cd "$NXF_CHDIR"
    NXF_SCRATCH="$(set +u; nxf_mktemp /tmp)"
    [[ $NXF_DEBUG > 0 ]] && nxf_env
    echo start | nxf_s3_upload - s3://natera-rnd-pltf-dev-nextflow-scratch-01/work/ef/3f0f828b6d851080910468fd105025/.command.begin
    set +u
    # beforeScript directive
    export SENTIEON_LICENSE=$(grep -o '"SENTIEON_LICENSE":"[^"]*"' /mnt/secrets/sandbox_batch_runner_sentieon | cut -d'"' -f4)
    set -u
    /opt/s5cmd/bin/s5cmd --log error cp "s3://natera-rnd-pltf-dev-nextflow-scratch-01/work/tmp/06/8b45f0609c781577d728c2eeee7c80/bin/*" "$NXF_SCRATCH/nextflow-bin/"
    chmod +x "$NXF_SCRATCH"/nextflow-bin/* || true
    export PATH="$NXF_SCRATCH/nextflow-bin:$PATH"
    [[ -n $NXF_SCRATCH ]] && cd "$NXF_SCRATCH"
    export NXF_TASK_WORKDIR="$PWD"
    nxf_stage

    set +e
    # swap stdout/stderr around the inner tee so each stream lands in its own file
    (set -o pipefail; (nxf_launch | tee .command.out) 3>&1 1>&2 2>&3 | tee .command.err) &
    pid=$!
    wait $pid || nxf_main_ret=$?
    nxf_unstage
}

$NXF_ENTRY