File Info

Filename
wrapper.sh
Full Path
s3://natera-rnd-pltf-dev-miniwdl-scratch-01/work/c7/c7c0c0e9b29eb1be07b015d979f5aa41/wrapper.sh
Size
13.7 KB
Attempt
#!/bin/bash
set -euo pipefail

# Detect pod CPU. cgroup v2 (cpu.max) then v1 fallback; "max" or "-1" means
# no limit set, fall back to nproc.
if read -r _q _p < /sys/fs/cgroup/cpu.max 2>/dev/null && [ "$_q" != "max" ]; then
    POD_CPUS=$((_q / _p))
elif _q=$(cat /sys/fs/cgroup/cpu/cpu.cfs_quota_us 2>/dev/null) && [ -n "$_q" ] && [ "$_q" -gt 0 ]; then
    POD_CPUS=$((_q / $(cat /sys/fs/cgroup/cpu/cpu.cfs_period_us)))
else
    POD_CPUS=$(nproc)
fi
[ "$POD_CPUS" -lt 1 ] && POD_CPUS=1

# S3 transfer parallelism: numworkers = parallel files in flight,
# concurrency = parallel multipart parts per file. Cap small pods at -c 5
# to stay gentle on S3 prefix request rate when many pods stage at once.
_s5_numworkers=256
[ "$POD_CPUS" -le 4 ] && _s5_concurrency=5 || _s5_concurrency=10

S5CMD="/opt/s5cmd/bin/s5cmd --log info --numworkers $_s5_numworkers"
S5_CP_FLAGS="-c $_s5_concurrency"
S3_TASK_DIR="s3://natera-rnd-pltf-dev-miniwdl-scratch-01/work/c7/c7c0c0e9b29eb1be07b015d979f5aa41"
TASK_DIR="/tmp/miniwdl_task"
EXIT_CODE=1  # Default to failure
UNSTAGE_RET=0
pid=""
STAGE_MS=0
TASK_MS=0
TRACE_START=0
METRICS_ENABLED=0

_ms() { date +%s%3N; }
log() { echo "[$(date '+%Y-%m-%d %H:%M:%S')] [miniwdl-eks.task] $1" >&2; }


miniwdl_date() { local p=${1:-%s%3N}; date +"$p" 2>/dev/null || date +"$p"; }

miniwdl_tree() {
    local pid=$1
    declare -a ALL_CHILDREN
    while read P PP; do
        ALL_CHILDREN[$PP]+=" $P"
    done < <(ps -e -o pid= -o ppid=)

    pstat() {
        local STATUS=$(2>/dev/null < /proc/$1/status grep -E 'Vm|ctxt')
        if [ $? = 0 ]; then
            local x_vsz=$(echo "$STATUS" | grep VmSize | awk '{print $2}' || echo -n '0')
            local x_rss=$(echo "$STATUS" | grep VmRSS | awk '{print $2}' || echo -n '0')
            local x_peak=$(echo "$STATUS" | grep -E 'VmPeak|VmHWM' | sed 's/^.*:\s*//' | sed 's/[\sa-zA-Z]*$//' | tr '\n' ' ' || echo -n '0 0')
            local x_pmem=$(awk -v rss=$x_rss -v mem_tot=$mem_tot 'BEGIN {printf "%.0f", rss/mem_tot*100*10}' || echo -n '0')
            local vol_ctxt=$(echo "$STATUS" | grep '\bvoluntary_ctxt_switches' | awk '{print $2}' || echo -n '0')
            local inv_ctxt=$(echo "$STATUS" | grep '\bnonvoluntary_ctxt_switches' | awk '{print $2}' || echo -n '0')
            cpu_stat[$1]="$1 $x_pmem $x_vsz $x_rss $x_peak $vol_ctxt $inv_ctxt"
        fi
    }

    pwalk() {
        pstat $1
        for i in ${ALL_CHILDREN[$1]:=}; do pwalk $i; done
    }
    pwalk $1
}

miniwdl_pstat() {
    cpu_stat=()
    miniwdl_tree $1
    declare -a sum=(0 0 0 0 0 0 0 0)
    local pid; local i
    for pid in "${!cpu_stat[@]}"; do
        local row=(${cpu_stat[pid]})
        for i in "${!row[@]}"; do
            [ $i != 0 ] && sum[i]=$((sum[i]+row[i]))
        done
    done
    for i in {1..7}; do
        if [ ${sum[i]} -lt ${cpu_peak[i]} ]; then
            sum[i]=${cpu_peak[i]}
        else
            cpu_peak[i]=${sum[i]}
        fi
    done
    miniwdl_pstat_ret=(${sum[*]})
}

# Background memory/context-switch poller. Adaptive interval: 1s -> 5s -> 30s.
# Writes peak values to a temp file when signaled to stop.
# RSS uses cgroup memory stats (no CoW double-counting across forks).
# Context switches and vmem still use per-process /proc tree walk.
miniwdl_mem_watch() {
    local pid=$1
    local mem_file=$2
    local count=0
    declare -a cpu_stat=(0 0 0 0 0 0 0 0)
    declare -a cpu_peak=(0 0 0 0 0 0 0 0)
    local mem_tot=$(< /proc/meminfo grep MemTotal | awk '{print $2}')
    local timeout; local DONE; local STOP=''
    local cg_peak_kb=0
    local cg_last_kb=0

    while true; do
        miniwdl_pstat $pid
        # Read cgroup RSS (anon memory, bytes) — no CoW double-counting
        local cg_kb=$(( $(awk '/^anon / {print $2; exit}' /sys/fs/cgroup/memory.stat 2>/dev/null || echo 0) / 1024 ))
        cg_last_kb=$cg_kb
        [ $cg_kb -gt $cg_peak_kb ] && cg_peak_kb=$cg_kb
        if [ $count -lt 10 ]; then timeout=1
        elif [ $count -lt 120 ]; then timeout=5
        else timeout=30
        fi
        read -t $timeout -r DONE || true
        [[ $DONE ]] && break
        if [ ! -e /proc/$pid ]; then
            [ ! $STOP ] && STOP=$(miniwdl_date)
            [ $(($(miniwdl_date)-STOP)) -gt 10000 ] && break
        fi
        count=$((count+1))
    done

    # Write peak values to temp file (KB units)
    # rss: last-sampled cgroup anon memory; peak_rss: highest observed across all polls
    # %mem: peak-based (matches peak_rss, not rss)
    # vmem and peak_vmem use per-process tree walk (no cgroup equivalent)
    # Struct: %mem vmem rss peak_vmem peak_rss vol_ctxt inv_ctxt
    local cg_pmem=$(awk -v rss=$cg_peak_kb -v mem_tot=$mem_tot 'BEGIN {printf "%.0f", rss/mem_tot*100*10}' || echo 0)
    echo "$cg_pmem ${miniwdl_pstat_ret[2]} $cg_last_kb ${miniwdl_pstat_ret[4]} $cg_peak_kb ${miniwdl_pstat_ret[6]} ${miniwdl_pstat_ret[7]}" > "$mem_file"
}

miniwdl_fd() {
    local FD=11
    while [ -e /proc/$$/fd/$FD ]; do FD=$((FD+1)); done
    echo $FD
}

# Snapshot /proc before task, launch mem_watch background poller.
miniwdl_trace_start() {
    local pid=$$
    if ! command -v ps &>/dev/null; then
        log "Warning: 'ps' not found, skipping metrics collection"
        METRICS_ENABLED=0
        return
    fi
    METRICS_ENABLED=1
    MEM_FILE="$TASK_DIR/.mem_stats"
    NUM_CPUS=$(< /proc/cpuinfo grep '^processor' -c)
    CPU_MODEL=$(< /proc/cpuinfo grep '^model name' | head -n 1 | awk 'BEGIN{FS="\t: "} { print $2 }')
    TOT_TIME0=$(grep '^cpu ' /proc/stat | awk '{sum=$2+$3+$4+$5+$6+$7+$8+$9; printf "%.0f",sum}')
    CPU_TIME0=$(2>/dev/null < /proc/$pid/stat awk '{printf "%.0f", $16+$17 }' || echo -n 'X')
    IO_STAT0=($(2>/dev/null < /proc/$pid/io sed 's/^.*:\s*//' | head -n 6 | tr '\n' ' ' || echo -n '0 0 0 0 0 0'))
    TRACE_START=$(miniwdl_date)
}

# Snapshot /proc after task, signal mem_watch, compute deltas, write metrics.json.
miniwdl_trace_end() {
    [ "${METRICS_ENABLED:-0}" = "0" ] && return
    [ "${TRACE_START:-0}" = "0" ] && return  # trace_start never ran
    local pid=$$
    local end_millis=$(miniwdl_date)
    local tot_time1=$(grep '^cpu ' /proc/stat | awk '{sum=$2+$3+$4+$5+$6+$7+$8+$9; printf "%.0f",sum}')
    local cpu_time1=$(2>/dev/null < /proc/$pid/stat awk '{printf "%.0f", $16+$17 }' || echo -n 'X')
    local cpu_pct=$(awk -v p1=$cpu_time1 -v p0=$CPU_TIME0 -v t1=$tot_time1 -v t0=$TOT_TIME0 -v n=$NUM_CPUS \
        'BEGIN { pct=(p1-p0)/(t1-t0)*100*n; printf("%.1f", pct>0 ? pct : 0) }')

    local io_stat1=($(2>/dev/null < /proc/$pid/io sed 's/^.*:\s*//' | head -n 6 | tr '\n' ' ' || echo -n '0 0 0 0 0 0'))
    local i
    for i in {0..5}; do io_stat1[i]=$((io_stat1[i]-${IO_STAT0[i]:-0})); done

    local wall_time=$((end_millis-TRACE_START))

    # Signal mem_watch to stop and wait for it
    if [ "${mem_proc:-}" ]; then
        [ -e /proc/$mem_proc ] && eval "echo 'DONE' >&$mem_fd" || true
        wait $mem_proc 2>/dev/null || true
        while [ -e /proc/$mem_proc ]; do sleep 0.1; done
    fi

    # Read mem_watch results (KB units) and convert to bytes
    local mem_pct=0 vmem=0 rss=0 peak_vmem=0 peak_rss=0 vol_ctxt=0 inv_ctxt=0
    if [ -f "$MEM_FILE" ]; then
        read mem_pct vmem rss peak_vmem peak_rss vol_ctxt inv_ctxt < "$MEM_FILE"
        # Convert KB to bytes
        vmem=$((vmem * 1024))
        rss=$((rss * 1024))
        peak_vmem=$((peak_vmem * 1024))
        peak_rss=$((peak_rss * 1024))
    fi

    cat > "$TASK_DIR/metrics.json" << METRICS_EOF
{"realtime":$wall_time,"cpuPercent":$cpu_pct,"cpuModel":"$CPU_MODEL","memPercent":$mem_pct,"vmem":$vmem,"rss":$rss,"peakVmem":$peak_vmem,"peakRss":$peak_rss,"rchar":${io_stat1[0]},"wchar":${io_stat1[1]},"readBytes":${io_stat1[4]},"writeBytes":${io_stat1[5]},"volCtxt":$vol_ctxt,"invCtxt":$inv_ctxt}
METRICS_EOF
    log "Metrics: cpu=${cpu_pct}% rss=$((rss/1048576))MB peak_rss=$((peak_rss/1048576))MB wall=${wall_time}ms"
}


# Recursively kill a process and all its descendants.
miniwdl_kill() {
    declare -a children
    while read P PP; do
        children[$PP]+=" $P"
    done < <(ps -e -o pid= -o ppid=)
    _kill() {
        [[ $1 != $$ ]] && kill $1 2>/dev/null || true
        for i in ${children[$1]:=}; do _kill $i; done
    }
    _kill $1
}

# On SIGTERM/SIGINT (K8s pod termination, scale-down), kill the task process tree.
# The EXIT trap (cleanup) still fires after this to upload logs and work directory.
miniwdl_on_term() {
    set +e
    [[ "$pid" ]] && miniwdl_kill $pid
}

# Trap to ensure we upload logs and work directory on ANY exit (success or failure)
cleanup() {
    set +eu
    local exit_code=$EXIT_CODE

    # Collect metrics (best-effort, before unstage so we capture task-only values)
    miniwdl_trace_end 2>/dev/null || true

    $S5CMD cp "$TASK_DIR/stdout.txt" "$S3_TASK_DIR/stdout.txt" || true
    $S5CMD cp "$TASK_DIR/stderr.txt" "$S3_TASK_DIR/stderr.txt" || true

    cd "$TASK_DIR"
    for _unstage_attempt in 1 2 3; do
        $S5CMD sync $S5_CP_FLAGS --exclude "_miniwdl_inputs/*" work/ "$S3_TASK_DIR/work/"
        UNSTAGE_RET=$?
        [[ $UNSTAGE_RET -eq 0 ]] && break
        [[ $_unstage_attempt -lt 3 ]] && {
            log "Unstage attempt $_unstage_attempt/3 failed (ret=$UNSTAGE_RET), retrying in ~30s..."
            sleep $((30 + RANDOM % 10))
        }
    done

    # Record empty directories so the runner can preserve them as S3 directory
    # markers. s5cmd sync only uploads objects — empty dirs vanish without this.
    # Runs after sync intentionally: (1) empty_dirs.txt is not part of the synced
    # work artifacts, and (2) we capture the true post-upload filesystem state.
    find work/ -type d -empty ! -path "work/_miniwdl_inputs*"         > "$TASK_DIR/empty_dirs.txt" 2>/dev/null || true
    if [ -s "$TASK_DIR/empty_dirs.txt" ]; then
        $S5CMD cp "$TASK_DIR/empty_dirs.txt" "$S3_TASK_DIR/empty_dirs.txt" || true
    fi

    UNSTAGE_MS=$(( $(_ms) - _START_MS - STAGE_MS - TASK_MS ))
    TOTAL_MS=$(( $(_ms) - _START_MS ))
    cat > "$TASK_DIR/timing.json" << TIMING
{"stage_ms":$STAGE_MS,"task_ms":$TASK_MS,"unstage_ms":$UNSTAGE_MS,"total_ms":$TOTAL_MS,"exit_code":$exit_code,"unstage_ret":$UNSTAGE_RET}
TIMING
    $S5CMD cp "$TASK_DIR/timing.json" "$S3_TASK_DIR/timing.json" || true
    [ -f "$TASK_DIR/metrics.json" ] && $S5CMD cp "$TASK_DIR/metrics.json" "$S3_TASK_DIR/metrics.json" || true

    # Task failure takes priority; if task succeeded but unstage failed, use unstage code
    local final_code=$exit_code
    [[ $final_code -eq 0 && $UNSTAGE_RET -ne 0 ]] && final_code=$UNSTAGE_RET
    echo $final_code > "$TASK_DIR/exit_code"
    $S5CMD cp "$TASK_DIR/exit_code" "$S3_TASK_DIR/exit_code" || true
    log "Done exit_code=$exit_code (${UNSTAGE_MS}ms unstage, ${TOTAL_MS}ms total)"
}
trap cleanup EXIT
trap miniwdl_on_term TERM INT USR2

_START_MS=$(_ms)
log "Staging: downloading command and 5 inputs"
$S5CMD cp "$S3_TASK_DIR/command" "$TASK_DIR/command"

mkdir -p "/tmp/miniwdl_task/work/_miniwdl_inputs/0"
mkdir -p "$TASK_DIR/work"

# Stage inputs with outer retry on S3 throttling (503 SlowDown on same-prefix
# reads). Fails the pod after 3 attempts — WDL task-level maxRetries provides
# the final backstop.
_do_stage() {
    $S5CMD run << STAGE_FILES
cp ${S5_CP_FLAGS} "s3://natera-platform-sandbox/pipeline-resources/germline_pipeline/allele_specific_expression_beds/hg38/cyp21_hg38.bed" "/tmp/miniwdl_task/work/_miniwdl_inputs/0/cyp21_hg38.bed"
cp ${S5_CP_FLAGS} "s3://natera-rnd-pltf-dev-miniwdl-scratch-01/work/0f/0f04f993029d3a5b9b27eba6b510c43a/work/downloaded.bam.bai" "/tmp/miniwdl_task/work/_miniwdl_inputs/0/downloaded.bam.bai"
cp ${S5_CP_FLAGS} "s3://natera-platform-sandbox/pipeline-resources/germline_pipeline/allele_specific_expression_beds/hg38/smn_hg38.bed" "/tmp/miniwdl_task/work/_miniwdl_inputs/0/smn_hg38.bed"
cp ${S5_CP_FLAGS} "s3://natera-platform-sandbox/pipeline-resources/germline_pipeline/allele_specific_expression_beds/hg38/gba_hg38.bed" "/tmp/miniwdl_task/work/_miniwdl_inputs/0/gba_hg38.bed"
cp ${S5_CP_FLAGS} "s3://natera-rnd-pltf-dev-miniwdl-scratch-01/work/0f/0f04f993029d3a5b9b27eba6b510c43a/work/downloaded.bam" "/tmp/miniwdl_task/work/_miniwdl_inputs/0/downloaded.bam"
STAGE_FILES
    local rc=$?
    if [[ $rc -ne 0 ]]; then return $rc; fi
# no directories to sync
    return 0
}
STAGE_RET=1
for _stage_attempt in 1 2 3; do
    # `|| STAGE_RET=$?` suspends set -e inside _do_stage and captures the code
    STAGE_RET=0
    _do_stage || STAGE_RET=$?
    [[ $STAGE_RET -eq 0 ]] && break
    [[ $_stage_attempt -lt 3 ]] && {
        log "Stage attempt $_stage_attempt/3 failed (ret=$STAGE_RET), retrying in ~30s..."
        sleep $((30 + RANDOM % 10))
    }
done
[[ $STAGE_RET -ne 0 ]] && { log "Stage failed after 3 attempts, aborting"; exit $STAGE_RET; }

STAGE_MS=$(( $(_ms) - _START_MS ))
log "Staging completed (5 inputs, ${STAGE_MS}ms) s5cmd=workers:$_s5_numworkers,concurrency:$_s5_concurrency,cpus:$POD_CPUS"

# Everything after staging is best-effort — metrics collection and task command
# failures must be captured, not abort the script.
set +e

# Start metrics collection (snapshots /proc, launches background memory poller)
miniwdl_trace_start

cd "$TASK_DIR/work"
# fd-swap tee: stdout/stderr captured to files AND passed to outer layer for
# kubectl logs. Background + wait allows SIGTERM to interrupt immediately.
OUT="$TASK_DIR/stdout.txt"
ERR="$TASK_DIR/stderr.txt"

(set -o pipefail; (/bin/bash "$TASK_DIR/command" | tee "$OUT") 3>&1 1>&2 2>&3 | tee "$ERR") &
pid=$!

# Launch mem_watch background poller on the task subshell, not the wrapper PID.
# Must start AFTER task so $pid is available (matches Nextflow's nxf_trace_linux pattern).
if [ "${METRICS_ENABLED:-0}" = "1" ]; then
    mem_fd=$(miniwdl_fd)
    eval "exec $mem_fd> >(miniwdl_mem_watch $pid $MEM_FILE)"
    mem_proc=$!
    trap 'kill $mem_proc 2>/dev/null' ERR
fi

wait $pid; EXIT_CODE=$?
TASK_MS=$(( $(_ms) - _START_MS - STAGE_MS ))
log "Task finished exit_code=$EXIT_CODE (${TASK_MS}ms)"

exit $EXIT_CODE