#!/bin/bash
set -euo pipefail

# Detect pod CPU. cgroup v2 (cpu.max) then v1 fallback; "max" or "-1" means
# no limit set, fall back to nproc.
# Quota and period are validated as integers (period non-zero) before the
# division so a malformed or missing value falls through to the next source
# instead of aborting on an arithmetic error. 2>/dev/null precedes the input
# redirection because redirections are applied left to right: placed after,
# a missing cgroup file would still print an error.
if read -r _q _p 2>/dev/null < /sys/fs/cgroup/cpu.max \
  && [[ "$_q" =~ ^[0-9]+$ && "$_p" =~ ^[1-9][0-9]*$ ]]; then
  POD_CPUS=$((_q / _p))
elif _q=$(cat /sys/fs/cgroup/cpu/cpu.cfs_quota_us 2>/dev/null) \
  && _p=$(cat /sys/fs/cgroup/cpu/cpu.cfs_period_us 2>/dev/null) \
  && [[ "$_q" =~ ^[1-9][0-9]*$ && "$_p" =~ ^[1-9][0-9]*$ ]]; then
  POD_CPUS=$((_q / _p))
else
  POD_CPUS=$(nproc)
fi
# Integer division rounds fractional quotas (e.g. 500m) down to 0; clamp to 1.
if ((POD_CPUS < 1)); then
  POD_CPUS=1
fi

# S3 transfer parallelism: numworkers = parallel files in flight,
# concurrency = parallel multipart parts per file. Cap small pods at -c 5
# to stay gentle on S3 prefix request rate when many pods stage at once.
_s5_numworkers=256
if ((POD_CPUS <= 4)); then
  _s5_concurrency=5
else
  _s5_concurrency=10
fi
# S5CMD and S5_CP_FLAGS are expanded unquoted by their users so that they
# word-split into a command plus arguments.
S5CMD="/opt/s5cmd/bin/s5cmd --log info --numworkers $_s5_numworkers"
S5_CP_FLAGS="-c $_s5_concurrency"

S3_TASK_DIR="s3://natera-rnd-pltf-dev-miniwdl-scratch-01/work/fe/fe19f8fec9b7334d385de635cbcd1995"
TASK_DIR="/tmp/miniwdl_task"
EXIT_CODE=1 # Default to failure
UNSTAGE_RET=0
pid=""
STAGE_MS=0
TASK_MS=0
TRACE_START=0
METRICS_ENABLED=0

# Print the current time as epoch milliseconds.
_ms() { date +%s%3N; }

# Write a timestamped, tagged message ($1) to stderr.
log() { echo "[$(date '+%Y-%m-%d %H:%M:%S')] [miniwdl-eks.task] $1" >&2; }

# Print the current time using date format $1 (default: epoch milliseconds).
# The first attempt is silent; on failure the call is repeated with stderr
# visible.
miniwdl_date() {
  local p=${1:-%s%3N}
  date +"$p" 2>/dev/null || date +"$p"
}

#######################################
# Walk the process tree rooted at a PID and record one stats row per process.
# Globals:
#   cpu_stat (written): cpu_stat[PID]="PID %mem vsz rss peak_vmem peak_rss
#                       vol_ctxt inv_ctxt" (sizes as reported in
#                       /proc/PID/status, %mem in tenths of a percent)
#   mem_tot  (read):    total memory used as the %mem denominator; expected
#                       to be provided by the caller's scope
# Arguments:
#   $1 - root PID
# Notes:
#   Defines the helper functions pstat and pwalk as a side effect.
#######################################
miniwdl_tree() {
  local pid=$1
  local P PP
  declare -a ALL_CHILDREN
  # Build a parent -> children map from one ps snapshot.
  while read -r P PP; do
    # Skip anything that is not a "pid ppid" pair of integers.
    [[ "$P" =~ ^[0-9]+$ && "$PP" =~ ^[0-9]+$ ]] || continue
    ALL_CHILDREN[$PP]+=" $P"
  done < <(ps -e -o pid= -o ppid=)

  # Record the stats row for PID $1; processes whose status file cannot be
  # read (already exited) are skipped.
  pstat() {
    local status fields
    # Assign separately from `local` so the tested status is grep's, not the
    # always-zero status of the `local` builtin.
    if status=$(2>/dev/null < "/proc/$1/status" grep -E 'Vm|ctxt'); then
      # Single awk pass: every column defaults to 0, so a missing line can
      # never shift the remaining columns.
      fields=$(awk -v mem_tot="${mem_tot:-0}" '
        BEGIN { vsz = rss = peak = hwm = vol = inv = 0 }
        /^VmSize:/ { vsz = $2 }
        /^VmRSS:/ { rss = $2 }
        /^VmPeak:/ { peak = $2 }
        /^VmHWM:/ { hwm = $2 }
        /^voluntary_ctxt_switches:/ { vol = $2 }
        /^nonvoluntary_ctxt_switches:/ { inv = $2 }
        END {
          pmem = (mem_tot > 0) ? rss / mem_tot * 100 * 10 : 0
          printf "%.0f %s %s %s %s %s %s", pmem, vsz, rss, peak, hwm, vol, inv
        }' <<< "$status") || fields='0 0 0 0 0 0 0'
      cpu_stat[$1]="$1 $fields"
    fi
  }

  # Depth-first walk: record PID $1, then each of its children.
  pwalk() {
    local i
    pstat "$1"
    for i in ${ALL_CHILDREN[$1]:=}; do
      pwalk "$i"
    done
  }

  pwalk "$pid"
}

#######################################
# Sum the per-process stats of a process tree and fold them into running peaks.
# Globals:
#   cpu_stat          (written): reset, then filled by miniwdl_tree
#   cpu_peak          (read/written): per-column maxima seen so far; must be
#                     initialised by the caller (8 entries)
#   miniwdl_pstat_ret (written): 8 entries; column 0 is unused (0), columns
#                     1..7 are max(current sum, previous peak)
# Arguments:
#   $1 - root PID
#######################################
miniwdl_pstat() {
  cpu_stat=()
  miniwdl_tree "$1"
  declare -a sum=(0 0 0 0 0 0 0 0)
  local pid i
  local -a row
  for pid in "${!cpu_stat[@]}"; do
    row=(${cpu_stat[pid]})
    for i in "${!row[@]}"; do
      # Column 0 holds the PID itself and is not summed.
      if ((i != 0)); then
        sum[i]=$((sum[i] + row[i]))
      fi
    done
  done
  for i in {1..7}; do
    if ((sum[i] < cpu_peak[i])); then
      sum[i]=${cpu_peak[i]}
    else
      cpu_peak[i]=${sum[i]}
    fi
  done
  miniwdl_pstat_ret=("${sum[@]}")
}

# Background memory/context-switch poller. Adaptive interval: 1s -> 5s -> 30s.
# Writes peak values to a temp file when signaled to stop.
# RSS uses cgroup memory stats (no CoW double-counting across forks).
# Context switches and vmem still use per-process /proc tree walk.
#######################################
# Poll memory and context-switch statistics for a process tree until told to
# stop, then write the collected values to a file.
# Stops when a non-empty line arrives on stdin, or roughly 10 seconds after
# the watched PID has disappeared from /proc.
# Globals:
#   miniwdl_pstat_ret (read): filled by miniwdl_pstat on every poll
# Arguments:
#   $1 - PID at the root of the process tree to watch
#   $2 - output file; receives one line:
#        "%mem vmem rss peak_vmem peak_rss vol_ctxt inv_ctxt" (KB units)
#######################################
miniwdl_mem_watch() {
  local pid=$1
  local mem_file=$2
  local count=0
  # cpu_stat, cpu_peak and mem_tot are read by miniwdl_pstat and its helpers
  # through dynamic scoping.
  declare -a cpu_stat=(0 0 0 0 0 0 0 0)
  declare -a cpu_peak=(0 0 0 0 0 0 0 0)
  local mem_tot
  mem_tot=$(< /proc/meminfo grep MemTotal | awk '{print $2}')
  local timeout
  local DONE=''
  local STOP=''
  local cg_bytes cg_kb
  local cg_peak_kb=0
  local cg_last_kb=0

  while true; do
    miniwdl_pstat "$pid"

    # Read cgroup RSS (anon memory, bytes) — no CoW double-counting.
    # A missing file or a file without an "anon" line yields 0 instead of
    # an arithmetic syntax error.
    cg_bytes=$(awk '/^anon / {print $2; exit}' /sys/fs/cgroup/memory.stat 2>/dev/null) || cg_bytes=0
    [[ "$cg_bytes" =~ ^[0-9]+$ ]] || cg_bytes=0
    cg_kb=$((cg_bytes / 1024))
    cg_last_kb=$cg_kb
    if ((cg_kb > cg_peak_kb)); then
      cg_peak_kb=$cg_kb
    fi

    # Adaptive poll interval: 1s for the first 10 polls, 5s up to 120, then 30s.
    if ((count < 10)); then
      timeout=1
    elif ((count < 120)); then
      timeout=5
    else
      timeout=30
    fi

    # The read doubles as the sleep; any non-empty line is the stop signal.
    read -t "$timeout" -r DONE || true
    if [[ -n "$DONE" ]]; then
      break
    fi

    # Watched process gone: keep polling for up to 10s, then give up.
    if [ ! -e "/proc/$pid" ]; then
      if [[ -z "$STOP" ]]; then
        STOP=$(miniwdl_date)
      fi
      if (($(miniwdl_date) - STOP > 10000)); then
        break
      fi
    fi
    count=$((count + 1))
  done

  # Write peak values to temp file (KB units)
  # rss: last-sampled cgroup anon memory; peak_rss: highest observed across all polls
  # %mem: peak-based (matches peak_rss, not rss)
  # vmem and peak_vmem use per-process tree walk (no cgroup equivalent)
  # Struct: %mem vmem rss peak_vmem peak_rss vol_ctxt inv_ctxt
  local cg_pmem
  cg_pmem=$(awk -v rss="$cg_peak_kb" -v mem_tot="${mem_tot:-0}" \
    'BEGIN { pct = (mem_tot > 0) ? rss / mem_tot * 100 * 10 : 0; printf "%.0f", pct }') || cg_pmem=0
  echo "${cg_pmem:-0} ${miniwdl_pstat_ret[2]:-0} $cg_last_kb ${miniwdl_pstat_ret[4]:-0} $cg_peak_kb ${miniwdl_pstat_ret[6]:-0} ${miniwdl_pstat_ret[7]:-0}" > "$mem_file"
}

# Print the lowest file descriptor number >= 11 that is not open in this shell.
miniwdl_fd() {
  local FD=11
  while [ -e "/proc/$$/fd/$FD" ]; do
    FD=$((FD + 1))
  done
  echo "$FD"
}

#######################################
# Snapshot /proc before the task runs.
# Globals (written): METRICS_ENABLED, MEM_FILE, NUM_CPUS, CPU_MODEL,
#                    TOT_TIME0, CPU_TIME0, IO_STAT0, TRACE_START
# Globals (read):    TASK_DIR
# Notes:
#   Disables metrics (METRICS_ENABLED=0) when 'ps' is not installed.
#######################################
miniwdl_trace_start() {
  local pid=$$
  if ! command -v ps &>/dev/null; then
    log "Warning: 'ps' not found, skipping metrics collection"
    METRICS_ENABLED=0
    return
  fi
  METRICS_ENABLED=1
  MEM_FILE="$TASK_DIR/.mem_stats"
  NUM_CPUS=$(< /proc/cpuinfo grep '^processor' -c)
  CPU_MODEL=$(< /proc/cpuinfo grep '^model name' | head -n 1 | awk 'BEGIN{FS="\t: "} { print $2 }')
  # Sum of the first eight counters of the aggregate "cpu" line.
  TOT_TIME0=$(grep '^cpu ' /proc/stat | awk '{sum=$2+$3+$4+$5+$6+$7+$8+$9; printf "%.0f",sum}')
  # Fields 16+17 of /proc/PID/stat for this shell; 'X' when unreadable.
  CPU_TIME0=$(2>/dev/null < "/proc/$pid/stat" awk '{printf "%.0f", $16+$17 }' || echo -n 'X')
  # First six counters of /proc/PID/io.
  IO_STAT0=($(2>/dev/null < "/proc/$pid/io" sed 's/^.*:\s*//' | head -n 6 | tr '\n' ' ' || echo -n '0 0 0 0 0 0'))
  TRACE_START=$(miniwdl_date)
}

#######################################
# Snapshot /proc after the task, stop the memory poller, compute deltas and
# write $TASK_DIR/metrics.json.
# Globals (read): METRICS_ENABLED, TRACE_START, TOT_TIME0, CPU_TIME0,
#                 NUM_CPUS, CPU_MODEL, IO_STAT0, MEM_FILE, TASK_DIR,
#                 mem_proc, mem_fd
# Notes:
#   Returns immediately when metrics are disabled or miniwdl_trace_start
#   never ran. Every numeric JSON field falls back to 0 so the file stays
#   parseable even when a /proc read or the poller output is missing.
#######################################
miniwdl_trace_end() {
  [ "${METRICS_ENABLED:-0}" = "0" ] && return
  [ "${TRACE_START:-0}" = "0" ] && return # trace_start never ran

  local pid=$$
  local end_millis tot_time1 cpu_time1 cpu_pct
  end_millis=$(miniwdl_date)
  tot_time1=$(grep '^cpu ' /proc/stat | awk '{sum=$2+$3+$4+$5+$6+$7+$8+$9; printf "%.0f",sum}')
  cpu_time1=$(2>/dev/null < "/proc/$pid/stat" awk '{printf "%.0f", $16+$17 }' || echo -n 'X')
  # Guard the division: two /proc/stat samples taken close together can be
  # equal, which would otherwise abort awk and leave the JSON field empty.
  cpu_pct=$(awk -v p1="$cpu_time1" -v p0="${CPU_TIME0:-0}" -v t1="$tot_time1" \
    -v t0="${TOT_TIME0:-0}" -v n="${NUM_CPUS:-1}" \
    'BEGIN {
       dt = t1 - t0
       pct = (dt > 0) ? (p1 - p0) / dt * 100 * n : 0
       if (pct < 0) pct = 0
       printf("%.1f", pct)
     }') || cpu_pct=0
  [[ -n "$cpu_pct" ]] || cpu_pct=0

  local io_stat1=($(2>/dev/null < "/proc/$pid/io" sed 's/^.*:\s*//' | head -n 6 | tr '\n' ' ' || echo -n '0 0 0 0 0 0'))
  local i
  for i in {0..5}; do
    io_stat1[i]=$((${io_stat1[i]:-0} - ${IO_STAT0[i]:-0}))
  done
  local wall_time=$((end_millis - TRACE_START))

  # Signal mem_watch to stop and wait for it
  if [ "${mem_proc:-}" ]; then
    if [ -e "/proc/$mem_proc" ] && [[ "${mem_fd:-}" =~ ^[0-9]+$ ]]; then
      # The poller treats any non-empty line on its stdin as "stop".
      echo 'DONE' >&"$mem_fd" || true
    fi
    wait "$mem_proc" 2>/dev/null || true
    while [ -e "/proc/$mem_proc" ]; do
      sleep 0.1
    done
  fi

  # Read mem_watch results (KB units) and convert to bytes
  local mem_pct=0 vmem=0 rss=0 peak_vmem=0 peak_rss=0 vol_ctxt=0 inv_ctxt=0
  if [ -f "$MEM_FILE" ]; then
    read -r mem_pct vmem rss peak_vmem peak_rss vol_ctxt inv_ctxt < "$MEM_FILE" || true
    # Convert KB to bytes
    vmem=$((${vmem:-0} * 1024))
    rss=$((${rss:-0} * 1024))
    peak_vmem=$((${peak_vmem:-0} * 1024))
    peak_rss=$((${peak_rss:-0} * 1024))
  fi
  mem_pct=${mem_pct:-0}
  vol_ctxt=${vol_ctxt:-0}
  inv_ctxt=${inv_ctxt:-0}

  # Escape backslashes and double quotes so the model name is a valid JSON string.
  local cpu_model=${CPU_MODEL:-}
  cpu_model=${cpu_model//\\/\\\\}
  cpu_model=${cpu_model//\"/\\\"}

  cat > "$TASK_DIR/metrics.json" << METRICS_EOF
{"realtime":$wall_time,"cpuPercent":$cpu_pct,"cpuModel":"$cpu_model","memPercent":$mem_pct,"vmem":$vmem,"rss":$rss,"peakVmem":$peak_vmem,"peakRss":$peak_rss,"rchar":${io_stat1[0]},"wchar":${io_stat1[1]},"readBytes":${io_stat1[4]},"writeBytes":${io_stat1[5]},"volCtxt":$vol_ctxt,"invCtxt":$inv_ctxt}
METRICS_EOF
  log "Metrics: cpu=${cpu_pct}% rss=$((rss / 1048576))MB peak_rss=$((peak_rss / 1048576))MB wall=${wall_time}ms"
}

#######################################
# Recursively kill a process and all its descendants.
# Arguments:
#   $1 - root PID; this shell's own PID is never signalled, but its
#        descendants still are
# Notes:
#   Sends the default signal of `kill`; failures are ignored. Defines the
#   helper function _kill as a side effect.
#######################################
miniwdl_kill() {
  local P PP
  declare -a children
  while read -r P PP; do
    [[ "$P" =~ ^[0-9]+$ && "$PP" =~ ^[0-9]+$ ]] || continue
    children[$PP]+=" $P"
  done < <(ps -e -o pid= -o ppid=)

  _kill() {
    local i
    if [[ "$1" != "$$" ]]; then
      kill "$1" 2>/dev/null || true
    fi
    for i in ${children[$1]:=}; do
      _kill "$i"
    done
  }

  _kill "$1"
}

# On SIGTERM/SIGINT (K8s pod termination, scale-down), kill the task process tree.
# The EXIT trap (cleanup) still fires after this to upload logs and work directory.
# Signal handler: kill the task process tree if a task has been started.
# Disables errexit so the handler itself cannot abort the script.
miniwdl_on_term() {
  set +e
  if [[ -n "${pid:-}" ]]; then
    miniwdl_kill "$pid"
  fi
}

#######################################
# Trap to ensure we upload logs and work directory on ANY exit (success or
# failure).
# Globals (read):    EXIT_CODE, S5CMD, S5_CP_FLAGS, TASK_DIR, S3_TASK_DIR,
#                    _START_MS, STAGE_MS, TASK_MS
# Globals (written): UNSTAGE_RET, UNSTAGE_MS, TOTAL_MS, _unstage_attempt
# Outputs:
#   Uploads stdout.txt, stderr.txt, work/, empty_dirs.txt (when non-empty),
#   timing.json, metrics.json (when present) and exit_code under S3_TASK_DIR.
#   Every upload other than the work/ sync is best-effort.
#######################################
cleanup() {
  set +eu
  local exit_code=$EXIT_CODE

  # Collect metrics (best-effort, before unstage so we capture task-only values)
  # NOTE(review): discarding stderr here also hides the "Metrics:" line that
  # miniwdl_trace_end emits through log — confirm that is intended.
  miniwdl_trace_end 2>/dev/null || true

  # $S5CMD and $S5_CP_FLAGS are intentionally unquoted: they hold a command
  # plus arguments and must word-split.
  $S5CMD cp "$TASK_DIR/stdout.txt" "$S3_TASK_DIR/stdout.txt" || true
  $S5CMD cp "$TASK_DIR/stderr.txt" "$S3_TASK_DIR/stderr.txt" || true

  # The sync and find below use paths relative to TASK_DIR. If the directory
  # cannot be entered, skip them rather than operating on an unrelated cwd
  # (and sleeping through pointless retries).
  if cd "$TASK_DIR"; then
    for _unstage_attempt in 1 2 3; do
      $S5CMD sync $S5_CP_FLAGS --exclude "_miniwdl_inputs/*" work/ "$S3_TASK_DIR/work/"
      UNSTAGE_RET=$?
      if [[ $UNSTAGE_RET -eq 0 ]]; then
        break
      fi
      if [[ $_unstage_attempt -lt 3 ]]; then
        log "Unstage attempt $_unstage_attempt/3 failed (ret=$UNSTAGE_RET), retrying in ~30s..."
        sleep $((30 + RANDOM % 10))
      fi
    done

    # Record empty directories so the runner can preserve them as S3 directory
    # markers. s5cmd sync only uploads objects — empty dirs vanish without this.
    # Runs after sync intentionally: (1) empty_dirs.txt is not part of the synced
    # work artifacts, and (2) we capture the true post-upload filesystem state.
    find work/ -type d -empty ! -path "work/_miniwdl_inputs*" > "$TASK_DIR/empty_dirs.txt" 2>/dev/null || true
    if [ -s "$TASK_DIR/empty_dirs.txt" ]; then
      $S5CMD cp "$TASK_DIR/empty_dirs.txt" "$S3_TASK_DIR/empty_dirs.txt" || true
    fi
  else
    log "Cannot cd to $TASK_DIR, skipping unstage of work directory"
    UNSTAGE_RET=1
  fi

  # Sample the clock once so stage + task + unstage adds up to total exactly.
  local now_ms
  now_ms=$(_ms)
  UNSTAGE_MS=$((now_ms - _START_MS - STAGE_MS - TASK_MS))
  TOTAL_MS=$((now_ms - _START_MS))
  cat > "$TASK_DIR/timing.json" << TIMING
{"stage_ms":$STAGE_MS,"task_ms":$TASK_MS,"unstage_ms":$UNSTAGE_MS,"total_ms":$TOTAL_MS,"exit_code":$exit_code,"unstage_ret":$UNSTAGE_RET}
TIMING
  $S5CMD cp "$TASK_DIR/timing.json" "$S3_TASK_DIR/timing.json" || true
  if [ -f "$TASK_DIR/metrics.json" ]; then
    $S5CMD cp "$TASK_DIR/metrics.json" "$S3_TASK_DIR/metrics.json" || true
  fi

  # Task failure takes priority; if task succeeded but unstage failed, use unstage code
  local final_code=$exit_code
  if [[ $final_code -eq 0 && $UNSTAGE_RET -ne 0 ]]; then
    final_code=$UNSTAGE_RET
  fi
  printf '%s\n' "$final_code" > "$TASK_DIR/exit_code"
  $S5CMD cp "$TASK_DIR/exit_code" "$S3_TASK_DIR/exit_code" || true
  log "Done exit_code=$exit_code (${UNSTAGE_MS}ms unstage, ${TOTAL_MS}ms total)"
}

trap cleanup EXIT
trap miniwdl_on_term TERM INT USR2

_START_MS=$(_ms)
log "Staging: downloading command and 5 inputs"
$S5CMD cp "$S3_TASK_DIR/command" "$TASK_DIR/command"
mkdir -p "/tmp/miniwdl_task/work/_miniwdl_inputs/0"
mkdir -p "$TASK_DIR/work"

# Stage inputs with outer retry on S3 throttling (503 SlowDown on same-prefix
# reads). Fails the pod after 3 attempts — WDL task-level maxRetries provides
# the final backstop.
# Download every task input in a single `s5cmd run` batch.
# Returns the exit status of s5cmd (0 when all copies succeeded).
_do_stage() {
  local batch_status=0
  $S5CMD run << STAGE_FILES || batch_status=$?
cp ${S5_CP_FLAGS} "s3://natera-platform-sandbox/pipeline-resources/germline_pipeline/allele_specific_expression_beds/hg38/cyp21_hg38.bed" "/tmp/miniwdl_task/work/_miniwdl_inputs/0/cyp21_hg38.bed"
cp ${S5_CP_FLAGS} "s3://natera-rnd-pltf-dev-miniwdl-scratch-01/work/51/51da5025fc287e6f74ae5945dedcbe5b/work/downloaded.bam.bai" "/tmp/miniwdl_task/work/_miniwdl_inputs/0/downloaded.bam.bai"
cp ${S5_CP_FLAGS} "s3://natera-platform-sandbox/pipeline-resources/germline_pipeline/allele_specific_expression_beds/hg38/smn_hg38.bed" "/tmp/miniwdl_task/work/_miniwdl_inputs/0/smn_hg38.bed"
cp ${S5_CP_FLAGS} "s3://natera-platform-sandbox/pipeline-resources/germline_pipeline/allele_specific_expression_beds/hg38/gba_hg38.bed" "/tmp/miniwdl_task/work/_miniwdl_inputs/0/gba_hg38.bed"
cp ${S5_CP_FLAGS} "s3://natera-rnd-pltf-dev-miniwdl-scratch-01/work/51/51da5025fc287e6f74ae5945dedcbe5b/work/downloaded.bam" "/tmp/miniwdl_task/work/_miniwdl_inputs/0/downloaded.bam"
STAGE_FILES
  # no directories to sync
  return "$batch_status"
}

# Up to three staging attempts, pausing 30-39 seconds between them.
STAGE_RET=1
for _stage_attempt in 1 2 3; do
  # `|| STAGE_RET=$?` suspends set -e inside _do_stage and captures the code
  STAGE_RET=0
  _do_stage || STAGE_RET=$?
  if ((STAGE_RET == 0)); then
    break
  fi
  if ((_stage_attempt < 3)); then
    log "Stage attempt $_stage_attempt/3 failed (ret=$STAGE_RET), retrying in ~30s..."
    sleep $((30 + RANDOM % 10))
  fi
done

if ((STAGE_RET != 0)); then
  log "Stage failed after 3 attempts, aborting"
  exit "$STAGE_RET"
fi

STAGE_MS=$(($(_ms) - _START_MS))
log "Staging completed (5 inputs, ${STAGE_MS}ms) s5cmd=workers:$_s5_numworkers,concurrency:$_s5_concurrency,cpus:$POD_CPUS"

# Everything after staging is best-effort — metrics collection and task command
# failures must be captured, not abort the script.
set +e

# Start metrics collection (snapshots /proc, launches background memory poller)
miniwdl_trace_start

# errexit is off from here on, so check cd explicitly: running the task
# command from any other directory would produce outputs outside work/.
if ! cd "$TASK_DIR/work"; then
  log "Cannot cd to $TASK_DIR/work, aborting"
  exit 1
fi

# fd-swap tee: stdout/stderr captured to files AND passed to outer layer for
# kubectl logs. Background + wait allows SIGTERM to interrupt immediately.
OUT="$TASK_DIR/stdout.txt"
ERR="$TASK_DIR/stderr.txt"
(set -o pipefail; (/bin/bash "$TASK_DIR/command" | tee "$OUT") 3>&1 1>&2 2>&3 | tee "$ERR") &
pid=$!

# Launch mem_watch background poller on the task subshell, not the wrapper PID.
# Must start AFTER task so $pid is available (matches Nextflow's nxf_trace_linux pattern).
if [ "${METRICS_ENABLED:-0}" = "1" ]; then
  mem_fd=$(miniwdl_fd)
  # eval is needed only because the fd number on the left of `>` cannot be a
  # plain expansion; the arguments are escaped so they are expanded, quoted,
  # when the eval'd command runs.
  eval "exec $mem_fd> >(miniwdl_mem_watch \"\$pid\" \"\$MEM_FILE\")"
  mem_proc=$!
  # No ERR trap is installed here: it would fire when `wait` returns the
  # task's non-zero status and kill the poller before it writes its results,
  # so failed tasks would report zero memory. The poller is stopped by
  # miniwdl_trace_end, and it also exits by itself once the task PID is gone.
fi

wait "$pid"
EXIT_CODE=$?
TASK_MS=$(($(_ms) - _START_MS - STAGE_MS))
log "Task finished exit_code=$EXIT_CODE (${TASK_MS}ms)"
exit "$EXIT_CODE"