File Info

Filename
.command.run
Full Path
s3://natera-rnd-pltf-dev-nextflow-scratch-01/work/1b/84712208870eebd0f906dda7d88b46/.command.run
Size
14.4 KB
Attempt
#!/bin/bash
### ---
### name: 'NFCORE_RNAFUSION:RNAFUSION:FUSIONINSPECTOR_WORKFLOW:FUSIONINSPECTOR (A673_FFPE_RNA_0001_B23LG7FLT4_1)'
### container: '292967571998.dkr.ecr.us-west-2.amazonaws.com/trinityctat/starfusion:1.12.0'
### outputs:
### - '*FusionInspector.fusions.tsv'
### - '*.coding_effect'
### - 'fi_workdir/*.gtf'
### - '*FusionInspector.log'
### - '*html'
### - '*abridged.tsv'
### - 'IGV_inputs'
### - 'fi_workdir'
### - 'chckpts_dir'
### - 'versions.yml'
### ...
# Strict mode: abort on unhandled errors and on use of unset variables.
set -eu
# NXF_DEBUG defaults to 0; values that compare (as strings) greater than "1"
# additionally enable command tracing.
: "${NXF_DEBUG:=0}"
if [[ $NXF_DEBUG > 1 ]]; then set -x; fi
# Function to run at the bottom of the script; the first CLI argument
# overrides the default entry point.
NXF_ENTRY=${1:-nxf_main}

# Collect memory and context-switch figures for a process and all of its
# descendants.
#
# Arguments: $1 - PID of the root process
# Globals:   mem_tot  (read)    divisor used for the %mem figure
#            cpu_stat (written) one row per live PID, indexed by PID:
#                               "pid pmem vsz rss peak_vmem peak_rss vol_ctxt inv_ctxt"
# Side effect: calling this function (re)defines the helpers pstat and pwalk.
nxf_tree() {
    local root_pid=$1
    local P PP

    # Map each parent PID to the space-separated list of its children.
    declare -a ALL_CHILDREN
    while read P PP;do
        ALL_CHILDREN[$PP]+=" $P"
    done < <(ps -e -o pid= -o ppid=)

    # Record one cpu_stat row for PID $1, read from /proc/$1/status.
    # PIDs whose status file cannot be read (process already gone) are skipped.
    pstat() {
        local x_pid=$1
        local STATUS

        # Declaration and assignment are kept apart on purpose: with
        # `local STATUS=$(...)` the status seen afterwards is that of `local`
        # (always 0), so a vanished process was never detected.
        if STATUS=$(2> /dev/null < /proc/$1/status grep -E 'Vm|ctxt'); then
        # The `|| echo` fallbacks only fire when the pipeline reports failure;
        # for a failing grep stage that requires pipefail (set by nxf_mem_watch).
        local  x_vsz=$(echo "$STATUS" | grep VmSize | awk '{print $2}' || echo -n '0')
        local  x_rss=$(echo "$STATUS" | grep VmRSS | awk '{print $2}' || echo -n '0')
        # Two values: VmPeak and VmHWM, in the order they appear in the file.
        local x_peak=$(echo "$STATUS" | grep -E 'VmPeak|VmHWM' | sed 's/^.*:\s*//' | sed 's/[\sa-zA-Z]*$//' | tr '\n' ' ' || echo -n '0 0')
        # rss as a share of mem_tot, expressed in tenths of a percent.
        local x_pmem=$(awk -v rss=$x_rss -v mem_tot=$mem_tot 'BEGIN {printf "%.0f", rss/mem_tot*100*10}' || echo -n '0')
        local vol_ctxt=$(echo "$STATUS" | grep '\bvoluntary_ctxt_switches' | awk '{print $2}' || echo -n '0')
        local inv_ctxt=$(echo "$STATUS" | grep '\bnonvoluntary_ctxt_switches' | awk '{print $2}' || echo -n '0')
        cpu_stat[x_pid]="$x_pid $x_pmem $x_vsz $x_rss $x_peak $vol_ctxt $inv_ctxt"
        fi
    }

    # Depth-first walk: stat PID $1, then recurse into each of its children.
    pwalk() {
        local i
        pstat $1
        for i in ${ALL_CHILDREN[$1]:=}; do pwalk $i; done
    }

    pwalk $root_pid
}

# Sum the per-process rows produced by nxf_tree for the process tree rooted
# at $1 and fold them into the running maxima.
#
# Arguments: $1 - root PID, passed through to nxf_tree
# Globals:   cpu_stat     (reset, then filled by nxf_tree)
#            cpu_peak     (read/written) highest total seen so far, per column
#            nxf_stat_ret (written) per-column totals, never below cpu_peak
#            NXF_DEBUG    (read) 1 prints the intermediate values
nxf_stat() {
    cpu_stat=()
    nxf_tree $1

    declare -a totals=(0 0 0 0 0 0 0 0)
    local proc
    local col
    for proc in "${!cpu_stat[@]}"; do
        local fields=(${cpu_stat[proc]})
        if [ $NXF_DEBUG = 1 ]; then
            echo "++ stat mem=${fields[*]}"
        fi
        for col in "${!fields[@]}"; do
            # column 0 carries the PID itself and is not accumulated
            [ $col = 0 ] && continue
            totals[col]=$((totals[col]+fields[col]))
        done
    done

    if [ $NXF_DEBUG = 1 ]; then
        echo -e "++ stat SUM=${totals[*]}"
    fi

    # Keep whichever is larger, the fresh total or the recorded peak, in both.
    for col in 1 2 3 4 5 6 7; do
        if [ ${totals[col]} -lt ${cpu_peak[col]} ]; then
            totals[col]=${cpu_peak[col]}
        else
            cpu_peak[col]=${totals[col]}
        fi
    done

    if [ $NXF_DEBUG = 1 ]; then
        echo -e "++ stat PEAK=${totals[*]}\n"
    fi
    nxf_stat_ret=(${totals[*]})
}

# Poll memory statistics of the process tree rooted at $1 until told to stop,
# then append the collected figures to .command.trace.
#
# Arguments: $1 - PID of the task to watch
# Input:     any non-empty line on stdin ends the polling loop
# Globals:   NXF_DEBUG (read); nxf_stat_ret (read, filled by nxf_stat)
# The arrays cpu_stat/cpu_peak and mem_tot are declared here and picked up by
# nxf_stat and its helpers through dynamic scoping.
nxf_mem_watch() {
    set -o pipefail
    local pid=$1
    local trace_file=.command.trace
    local polls=0
    declare -a cpu_stat=(0 0 0 0 0 0 0 0)
    declare -a cpu_peak=(0 0 0 0 0 0 0 0)
    local mem_tot=$(< /proc/meminfo grep MemTotal | awk '{print $2}')
    local wait_secs
    local msg
    local gone_at=''

    if [ $NXF_DEBUG = 1 ]; then
        nxf_sleep 0.2 && ps fx
    fi

    while true; do
        nxf_stat $pid
        # Back off: 1s for the first 10 polls, 5s up to poll 120, then 30s.
        if (( polls < 10 )); then
            wait_secs=1
        elif (( polls < 120 )); then
            wait_secs=5
        else
            wait_secs=30
        fi
        # The timed read doubles as the sleep between two polls.
        read -t $wait_secs -r msg || true
        if [[ $msg ]]; then
            break
        fi
        if [ ! -e /proc/$pid ]; then
            # Remember when the task was first seen missing and give up once
            # more than 10000 ms have passed without a stop message.
            if [ -z "$gone_at" ]; then
                gone_at=$(nxf_date)
            fi
            if [ $(($(nxf_date)-gone_at)) -gt 10000 ]; then
                break
            fi
        fi
        polls=$((polls+1))
    done

    local report=(
        "%mem=${nxf_stat_ret[1]}"
        "vmem=${nxf_stat_ret[2]}"
        "rss=${nxf_stat_ret[3]}"
        "peak_vmem=${nxf_stat_ret[4]}"
        "peak_rss=${nxf_stat_ret[5]}"
        "vol_ctxt=${nxf_stat_ret[6]}"
        "inv_ctxt=${nxf_stat_ret[7]}"
    )
    printf "%s\n" "${report[@]}" >> "$trace_file" || >&2 echo "Error: Failed to append to file: $trace_file"
}

# Write the leading section of the trace file, replacing any previous content.
#
# Reads from the caller's scope: trace_file, wall_time, ucpu, cpu_model and
# the six-element array io_stat1.
nxf_write_trace() {
    local records=(
        "nextflow.trace/v2"
        "realtime=$wall_time"
        "%cpu=$ucpu"
        "cpu_model=$cpu_model"
        "rchar=${io_stat1[0]}"
        "wchar=${io_stat1[1]}"
        "syscr=${io_stat1[2]}"
        "syscw=${io_stat1[3]}"
        "read_bytes=${io_stat1[4]}"
        "write_bytes=${io_stat1[5]}"
    )
    # `>|` overwrites the file even when noclobber is in effect.
    printf "%s\n" "${records[@]}" >| "$trace_file" || >&2 echo "Error: Failed to write to file: $trace_file"
}

# Run the task script and record only its wall-clock time; CPU and I/O
# figures are written as empty values.
#
# wall_time, ucpu, cpu_model and io_stat1 are locals that nxf_write_trace
# reads through dynamic scoping.
nxf_trace_mac() {
    local t_begin=$(nxf_date)

    /bin/bash -euo pipefail .command.sh

    local t_finish=$(nxf_date)
    local ucpu=''
    local cpu_model=''
    local io_stat1=('' '' '' '' '' '')
    local wall_time=$((t_finish-t_begin))
    nxf_write_trace
}

# Print the lowest file-descriptor number, starting at 11, that has no entry
# under /proc/$$/fd.
nxf_fd() {
    local candidate=11
    while :; do
        [ -e /proc/$$/fd/$candidate ] || break
        candidate=$((candidate+1))
    done
    echo $candidate
}

# Run the task script while collecting CPU, I/O and memory metrics from /proc,
# then write them to the trace file.
#
# Reads trace_file from the caller's scope (nxf_trace declares it).
# Memory figures are gathered by nxf_mem_watch, which runs concurrently and
# appends its own section to the trace file when told to stop.
nxf_trace_linux() {
    # All /proc/<pid> readings below refer to this shell, not to the task.
    local pid=$$
    command -v ps &>/dev/null || { >&2 echo "Command 'ps' required by nextflow to collect task metrics cannot be found"; exit 1; }
    local num_cpus=$(< /proc/cpuinfo grep '^processor' -c)
    local cpu_model=$(< /proc/cpuinfo grep '^model name' | head -n 1 | awk 'BEGIN{FS="\t: "} { print $2 }')
    # Baselines taken before the task starts: system-wide CPU counters, ...
    local tot_time0=$(grep '^cpu ' /proc/stat | awk '{sum=$2+$3+$4+$5+$6+$7+$8+$9; printf "%.0f",sum}')
    # ... fields 16 and 17 of /proc/<pid>/stat scaled by 10 (per proc(5) these
    # are the CPU times of waited-for children), ...
    local cpu_time0=$(2> /dev/null < /proc/$pid/stat awk '{printf "%.0f", ($16+$17)*10 }' || echo -n 'X')
    # ... and the first six counters of /proc/<pid>/io.
    local io_stat0=($(2> /dev/null < /proc/$pid/io sed 's/^.*:\s*//' | head -n 6 | tr '\n' ' ' || echo -n '0 0 0 0 0 0'))
    local start_millis=$(nxf_date)
    # If anything below fails, stop the memory watcher as well.
    trap 'kill $mem_proc' ERR
    
    /bin/bash -euo pipefail .command.sh &
    local task=$!

    # Open a free descriptor onto a process substitution running the watcher;
    # writing to that descriptor later is how the watcher is told to finish.
    mem_fd=$(nxf_fd)
    eval "exec $mem_fd> >(nxf_mem_watch $task)"
    local mem_proc=$!

    wait $task

    local end_millis=$(nxf_date)
    local tot_time1=$(grep '^cpu ' /proc/stat | awk '{sum=$2+$3+$4+$5+$6+$7+$8+$9; printf "%.0f",sum}')
    local cpu_time1=$(2> /dev/null < /proc/$pid/stat awk '{printf "%.0f", ($16+$17)*10 }' || echo -n 'X')
    # CPU share consumed between the two samples, scaled by the CPU count and
    # clamped at 0.
    local ucpu=$(awk -v p1=$cpu_time1 -v p0=$cpu_time0 -v t1=$tot_time1 -v t0=$tot_time0 -v n=$num_cpus 'BEGIN { pct=(p1-p0)/(t1-t0)*100*n; printf("%.0f", pct>0 ? pct : 0) }' )

    # Turn the absolute I/O counters into deltas against the baseline.
    local io_stat1=($(2> /dev/null < /proc/$pid/io sed 's/^.*:\s*//' | head -n 6 | tr '\n' ' ' || echo -n '0 0 0 0 0 0'))
    local i
    for i in {0..5}; do
        io_stat1[i]=$((io_stat1[i]-io_stat0[i]))
    done

    local wall_time=$((end_millis-start_millis))
    [ $NXF_DEBUG = 1 ] && echo "+++ STATS %CPU=$ucpu TIME=$wall_time I/O=${io_stat1[*]}"

    # Same record layout as nxf_write_trace; `>|` replaces the file content.
    printf "%s\n" \
        "nextflow.trace/v2" \
        "realtime=$wall_time" \
        "%cpu=$ucpu" \
        "cpu_model=$cpu_model" \
        "rchar=${io_stat1[0]}" \
        "wchar=${io_stat1[1]}" \
        "syscr=${io_stat1[2]}" \
        "syscw=${io_stat1[3]}" \
        "read_bytes=${io_stat1[4]}" \
        "write_bytes=${io_stat1[5]}" >| "$trace_file" || >&2 echo "Error: Failed to write to file: $trace_file"

    # Ask the watcher to stop (if it is still alive) and wait until it is gone,
    # so that its section is in the trace file before this function returns.
    [ -e /proc/$mem_proc ] && eval "echo 'DONE' >&$mem_fd" || true
    wait $mem_proc 2>/dev/null || true
    while [ -e /proc/$mem_proc ]; do nxf_sleep 0.1; done
}

# Create the trace file and run the task through the tracer matching the
# host OS. trace_file is a local that the tracers read via dynamic scoping.
nxf_trace() {
    local trace_file=.command.trace
    touch $trace_file
    case $(uname) in
        Darwin) nxf_trace_mac ;;
        *)      nxf_trace_linux ;;
    esac
}
# bash helper functions
# Run a command, retrying with a doubling delay while attempts remain.
#
# Arguments: "$@" - the command and its arguments
# Returns:   0 as soon as one attempt succeeds, otherwise the exit status of
#            the last failed attempt (previously the failure was lost and the
#            function returned 0).
nxf_cp_retry() {
    local max_attempts=1
    local timeout=10
    local attempt=0
    local exitCode=0
    while (( attempt < max_attempts )); do
      if "$@"; then
        return 0
      else
        exitCode=$?
      fi
      attempt=$(( attempt + 1 ))
      # Only pause when another attempt will actually follow.
      if (( attempt < max_attempts )); then
        nxf_sleep $timeout
        timeout=$(( timeout * 2 ))
      fi
    done
    return $exitCode
}

# Run the given command strings concurrently, at most `max` at a time.
#
# Arguments: "$@" - one shell command per argument; each is run with eval
# Note: IFS is set to newline for the duration and *unset* at the end (it is
#       not restored to a previous custom value).
# Note: the function's own last command is `unset IFS`; a failing job is
#       surfaced through errexit acting on the subshell - NOTE(review): confirm
#       callers always run with `set -e`.
nxf_parallel() {
    IFS=$'\n'
    local cmd=("$@")
    # CPU count from nproc, falling back to counting /proc/cpuinfo entries.
    local cpus=$(nproc 2>/dev/null || < /proc/cpuinfo grep '^process' -c)
    # Concurrency limit: the CPU count, capped at 4.
    local max=$(if (( cpus>4 )); then echo 4; else echo $cpus; fi)
    local i=0
    local pid=()
    # The scheduling loop runs in a subshell with nounset switched off.
    (
    set +u
    while ((i<${#cmd[@]})); do
        # Rebuild the list of still-running jobs.
        local copy=()
        for x in "${pid[@]}"; do
          # if the process exist, keep in the 'copy' array, otherwise wait on it to capture the exit code
          # see https://github.com/nextflow-io/nextflow/pull/4050
          [[ -e /proc/$x ]] && copy+=($x) || wait $x
        done
        pid=("${copy[@]}")

        if ((${#pid[@]}>=$max)); then
          # All slots busy: pause briefly before checking again.
          nxf_sleep 0.2
        else
          # Free slot: start the next command in the background.
          eval "${cmd[$i]}" &
          pid+=($!)
          ((i+=1))
        fi
    done
    # Everything has been started; wait for the jobs still running.
    for p in "${pid[@]}"; do
        wait $p
    done
    )
    unset IFS
}

# aws helper for s5cmd
# Upload a file, a directory, or stdin to S3 with s5cmd.
#
# Arguments: $1 - local file or directory name, or "-" to upload stdin
#            $2 - S3 destination: the full object URL when $1 is "-",
#                 otherwise the prefix under which "$1" is stored
# Returns:   the exit status of the failing step, 0 on success
nxf_s3_upload() {
    local name=$1
    local s3path=$2
    if [[ "$name" == - ]]; then
      # stdin is spooled to a file inside a private temp dir, uploaded, and the
      # temp dir is removed afterwards whatever the outcome.
      local tmp
      tmp=$(nxf_mktemp) || return
      [[ -n "$tmp" ]] || return 1
      local rc=0
      if cp /dev/stdin "$tmp/$name"; then
        /opt/s5cmd/bin/s5cmd --log error cp --storage-class STANDARD "$tmp/$name" "$s3path" || rc=$?
      else
        rc=$?
      fi
      rm -rf -- "$tmp"
      return $rc
    elif [[ -d "$name" ]]; then
      /opt/s5cmd/bin/s5cmd --log error cp --storage-class STANDARD "$name/" "$s3path/$name/"
    else
      /opt/s5cmd/bin/s5cmd --log error cp --storage-class STANDARD "$name" "$s3path/$name"
    fi
}

# Download an S3 object, or every object under an S3 "directory", with s5cmd.
#
# Arguments: $1 - S3 source URL
#            $2 - local target path
# Outputs:   a "Downloading" progress line on stdout
nxf_s3_download() {
    local source=$1
    local target=$2
    echo "  Downloading: $source"
    local file_name=$(basename "$source")
    # The source counts as a directory when the listing has exactly one
    # "DIR  <name>/" entry. The assignment stays on the `local` line on purpose:
    # grep -c exits non-zero for a count of 0 and must not trip errexit.
    local is_dir=$(/opt/s5cmd/bin/s5cmd ls "$source" | grep -F "DIR  ${file_name}/" -c)
    if [[ $is_dir == 1 ]]; then
        /opt/s5cmd/bin/s5cmd --log error cp "$source/*" "$target"
    else
        /opt/s5cmd/bin/s5cmd --log error cp "$source" "$target"
    fi
}

# Sleep for $1 seconds; when that call fails, sleep one second instead.
nxf_sleep() {
  if ! sleep $1 2>/dev/null; then
    sleep 1
  fi
}

# Print the current time as a 13-digit millisecond timestamp.
#
# `date +%s%3N` output is normalised: a 10-character value gets "000"
# appended, a trailing literal "%3N" or "3N" is replaced by "000", and a
# 13-character value is printed as is. Anything else prints a message and
# exits with status 1.
nxf_date() {
    local ts=$(date +%s%3N);
    # Match on "<length>/<value>" so length and suffix checks share one case;
    # arm order mirrors the precedence of the checks.
    case "${#ts}/$ts" in
        10/*)   echo ${ts}000 ;;
        */*%3N) echo ${ts/\%3N/000} ;;
        */*3N)  echo ${ts/3N/000} ;;
        13/*)   echo $ts ;;
        *)      echo "Unexpected timestamp value: $ts"; exit 1 ;;
    esac
}

# Print the sorted environment between two banner lines. For variables whose
# name contains "AWS", only the first six characters of the value are shown,
# followed by a fixed run of "x".
nxf_env() {
    local mask="s/\(.*\)AWS\(.*\)=\(.\{6\}\).*/\1AWS\2=\3xxxxxxxxxxxxx/"
    printf '%s\n' '============= task environment ============='
    env | sort | sed "$mask"
    printf '%s\n' '============= task output =================='
}

# Send the default kill signal to process $1 and to all of its descendants,
# sparing this shell itself.
#
# Side effect: calling this function (re)defines the helper kill_all.
nxf_kill() {
    # parent PID -> space-separated list of child PIDs
    declare -a children
    while read P PP;do
        children[$PP]+=" $P"
    done < <(ps -e -o pid= -o ppid=)

    # Signal PID $1 (unless it is this shell), then recurse into its children.
    kill_all() {
        if [[ $1 != $$ ]]; then
            # a process that is already gone is not an error
            kill $1 2>/dev/null || true
        fi
        for i in ${children[$1]:=}; do
            kill_all $i
        done
    }

    kill_all $1
}

# Create a fresh "nxf.XXXXXXXXXX" directory and print its path.
#
# Arguments: $1 - parent directory (default /tmp); created when missing
nxf_mktemp() {
    local base=${1:-/tmp}
    mkdir -p "$base"
    # Darwin gets the template as a path; elsewhere the parent is handed to
    # mktemp through TMPDIR together with -t.
    case $(uname) in
        Darwin) mktemp -d $base/nxf.XXXXXXXXXX ;;
        *)      TMPDIR="$base" mktemp -d -t nxf.XXXXXXXXXX ;;
    esac
}

# Copy a file or directory into a target root, keeping its relative directory.
#
# Arguments: $1 - source path (symlinks are followed, directories recursed)
#            $2 - target root; the copy lands in "$2/$(dirname "$1")"
# All expansions are quoted so paths containing whitespace or glob characters
# are handled as single arguments.
nxf_fs_copy() {
  local source=$1
  local target=$2
  local basedir
  basedir=$(dirname -- "$source") || return
  mkdir -p -- "$target/$basedir" || return
  cp -fRL -- "$source" "$target/$basedir"
}

# Move a file or directory into a target root, keeping its relative directory.
#
# Arguments: $1 - source path
#            $2 - target root; the source ends up in "$2/$(dirname "$1")"
# All expansions are quoted so paths containing whitespace or glob characters
# are handled as single arguments.
nxf_fs_move() {
  local source=$1
  local target=$2
  local basedir
  basedir=$(dirname -- "$source") || return
  mkdir -p -- "$target/$basedir" || return
  mv -f -- "$source" "$target/$basedir"
}

# Copy $1 to $2 with rsync (-r recursive, -R relative path names, -l keep
# symlinks). Arguments are quoted so each is passed as a single word.
nxf_fs_rsync() {
  rsync -rRl "$1" "$2"
}

# Copy $1 to "$2/$1" with `rclone copyto`. Arguments are quoted so each is
# passed as a single word.
nxf_fs_rclone() {
  rclone copyto "$1" "$2/$1"
}

# Copy $1 to "$2/$1" with fcp. Arguments are quoted so each is passed as a
# single word.
nxf_fs_fcp() {
  fcp "$1" "$2/$1"
}

# EXIT handler: work out the final exit status, publish it as the task's
# .exitcode object, remove the scratch directory and exit.
#
# Status precedence: nxf_main_ret, then nxf_unstage_ret, then the status that
# was current when the handler was entered.
on_exit() {
    # must stay the first statement so $? is still the pre-handler status
    local last_err=$?
    local exit_status=${nxf_main_ret:=0}
    if [[ ${exit_status} -eq 0 && ${nxf_unstage_ret:=0} -ne 0 ]]; then
        exit_status=${nxf_unstage_ret:=0}
    fi
    if [[ ${exit_status} -eq 0 && ${last_err} -ne 0 ]]; then
        exit_status=${last_err}
    fi
    # best effort: a failed upload must not change the exit status
    printf -- $exit_status | nxf_s3_upload - s3://natera-rnd-pltf-dev-nextflow-scratch-01/work/1b/84712208870eebd0f906dda7d88b46/.exitcode || true
    # NXF_SCRATCH may not have been set yet when the handler runs
    set +u
    rm -rf $NXF_SCRATCH || true
    exit $exit_status
}

# Signal handler: switch errexit off and, when a task PID has been recorded
# in $pid, kill that process tree via nxf_kill.
on_term() {
    set +e
    [[ -n "$pid" ]] || return
    nxf_kill $pid
}

# Re-run this wrapper script in a fresh strict-mode bash (errexit, nounset,
# pipefail) with nxf_trace as its entry point.
nxf_launch() {
    /bin/bash -e -u -o pipefail .command.run nxf_trace
}

# Stage the task inputs: remove stale local copies, then fetch every input
# from S3 through nxf_parallel.
#
# Globals: downloads (written) - command list handed to nxf_parallel; its
#          first entry is the no-op "true"
nxf_stage() {
    true
    # stage input files
    # one "<S3 source> <local name>" pair per input
    local staged=(
        "s3://natera-rnd-pltf-dev-nextflow-scratch-01/work/8a/a6f3b4dbb6d600569e509a2ae06412/A673_FFPE_RNA_0001_B23LG7FLT4_1.fusionreport.tsv A673_FFPE_RNA_0001_B23LG7FLT4_1.fusionreport.tsv"
        "s3://natera-platform-sandbox/pipeline-resources/AIH/rna/GRCh38/starfusion/ctat_genome_lib_build_dir ctat_genome_lib_build_dir"
        "s3://natera-rnd-pltf-dev-nextflow-scratch-01/work/1b/84712208870eebd0f906dda7d88b46/.command.sh .command.sh"
        "s3://natera-rnd-pltf-dev-nextflow-scratch-01/work/1b/84712208870eebd0f906dda7d88b46/.command.run .command.run"
        "s3://natera-rnd-pltf-dev-nextflow-scratch-01/work/20/540a955e00c3ce41bcffae47ee2d1b/A673_FFPE_RNA_0001_B23LG7FLT4_1_1.fastp.fastq.gz A673_FFPE_RNA_0001_B23LG7FLT4_1_1.fastp.fastq.gz"
        "s3://natera-rnd-pltf-dev-nextflow-scratch-01/work/20/540a955e00c3ce41bcffae47ee2d1b/A673_FFPE_RNA_0001_B23LG7FLT4_1_2.fastp.fastq.gz A673_FFPE_RNA_0001_B23LG7FLT4_1_2.fastp.fastq.gz"
    )
    local spec
    downloads=(true)
    # first clear every local name ...
    for spec in "${staged[@]}"; do
        rm -f "${spec##* }"
    done
    # ... then queue the matching download commands in the same order
    for spec in "${staged[@]}"; do
        downloads+=("nxf_s3_download $spec")
    done
    nxf_parallel "${downloads[@]}"
    echo "==> STAGING COMPLETE (6 inputs)"
    echo ""
}

# Upload every existing path matching the declared output patterns to the
# task work directory on S3, via nxf_parallel.
#
# Globals: uploads (written) - one nxf_s3_upload command per matched path
#          IFS is set to newline while the list is built and unset afterwards
nxf_unstage_outputs() {
    true
    local dest=s3://natera-rnd-pltf-dev-nextflow-scratch-01/work/1b/84712208870eebd0f906dda7d88b46
    local patterns='*FusionInspector.fusions.tsv *.coding_effect fi_workdir/*.gtf *FusionInspector.log *html *abridged.tsv IGV_inputs fi_workdir chckpts_dir versions.yml'
    uploads=()
    IFS=$'\n'
    # eval lets the shell expand the globs; sort | uniq drops paths matched
    # by more than one pattern
    for name in $(eval "ls -1d $patterns" | sort | uniq); do
        uploads+=("nxf_s3_upload '$name' $dest")
    done
    unset IFS
    nxf_parallel "${uploads[@]}"
}

# Upload the task's control files (.command.out, .command.err and
# .command.trace) to the work directory on S3; upload failures are ignored.
nxf_unstage_controls() {
    true
    local ctl
    for ctl in .command.out .command.err .command.trace; do
        nxf_s3_upload $ctl s3://natera-rnd-pltf-dev-nextflow-scratch-01/work/1b/84712208870eebd0f906dda7d88b46 || true
    done
}

# Publish task results: outputs only when the task itself succeeded, control
# files always.
#
# Globals: nxf_main_ret    (read; defaulted to 0 when unset)
#          nxf_unstage_ret (written) status of the output upload step
nxf_unstage() {
    : "${nxf_main_ret:=0}"
    if [[ $nxf_main_ret == 0 ]]; then
        # stdout of the upload step is appended to .command.out and its stderr
        # to .command.err; the descriptor shuffle swaps the two streams so the
        # second tee receives stderr
        (
            set -e -o pipefail
            (nxf_unstage_outputs | tee -a .command.out) 3>&1 1>&2 2>&3 | tee -a .command.err
        )
        nxf_unstage_ret=$?
    fi
    nxf_unstage_controls
}

# Default entry point: prepare a scratch directory, stage inputs, run the
# task through nxf_launch and publish the results.
#
# Globals: NXF_CHDIR (read, optional), NXF_DEBUG (read),
#          NXF_SCRATCH, pid, nxf_main_ret (written)
nxf_main() {
    # on_exit publishes the exit code and cleans up; on_term kills the task
    # tree on TERM/INT/USR2; USR1 is ignored.
    trap on_exit EXIT
    trap on_term TERM INT USR2
    trap '' USR1

    [[ "${NXF_CHDIR:-}" ]] && cd "$NXF_CHDIR"
    # Fresh scratch directory under /tmp (created with nounset relaxed).
    NXF_SCRATCH="$(set +u; nxf_mktemp /tmp)"
    # String comparison: any NXF_DEBUG sorting after "0" dumps the environment.
    [[ $NXF_DEBUG > 0 ]] && nxf_env
    # Marker object signalling that the task has started.
    echo start | nxf_s3_upload - s3://natera-rnd-pltf-dev-nextflow-scratch-01/work/1b/84712208870eebd0f906dda7d88b46/.command.begin
    # NOTE(review): nothing sits between the next two lines in this file;
    # presumably an empty generated section - confirm against the generator.
    set +u
    set -u
    # Fetch the pipeline's bin/ scripts into the scratch dir and put them
    # first on PATH.
    /opt/s5cmd/bin/s5cmd --log error cp "s3://natera-rnd-pltf-dev-nextflow-scratch-01/work/tmp/a5/ab012ece0a3155b88c657f2f7a80c7/bin/*" $NXF_SCRATCH/nextflow-bin/
    chmod +x $NXF_SCRATCH/nextflow-bin/* || true
    export PATH=$NXF_SCRATCH/nextflow-bin:$PATH
    export PYTHONNOUSERSITE="1"
    export R_PROFILE_USER="/.Rprofile"
    export R_ENVIRON_USER="/.Renviron"
    export JULIA_DEPOT_PATH="/usr/local/share/julia"
    # Everything from here on runs inside the scratch directory.
    [[ $NXF_SCRATCH ]] && cd $NXF_SCRATCH
    export NXF_TASK_WORKDIR="$PWD"
    nxf_stage

    # From here on failures are recorded instead of aborting the script.
    set +e
    # Run the task in the background: its stdout is copied to .command.out and
    # its stderr to .command.err. The descriptor shuffle swaps the two streams
    # so that the second tee receives stderr.
    (set -o pipefail; (nxf_launch | tee .command.out) 3>&1 1>&2 2>&3 | tee .command.err) &
    # Kept in a global so on_term can kill the task tree.
    pid=$!
    # A non-zero task status is stored for nxf_unstage and on_exit.
    wait $pid || nxf_main_ret=$?
    nxf_unstage
}

# Dispatch to the entry point chosen at the top of the script: the first CLI
# argument when given, nxf_main otherwise.
$NXF_ENTRY