#!/bin/bash
set -euo pipefail
# Detect the pod's CPU allowance as a whole number of cores (minimum 1).
# Order: cgroup v2 (cpu.max: "<quota> <period>"), then cgroup v1
# (cpu.cfs_quota_us / cpu.cfs_period_us), then nproc. A quota of "max" (v2)
# or "-1" (v1) means no limit is set, so we fall back to nproc.
# Any missing, empty or non-numeric value also falls through to the next
# source instead of aborting the script with an arithmetic error.
POD_CPUS=""
_q=""
_p=""
# The braces make 2>/dev/null apply before the input file is opened, so a host
# without cpu.max does not print "No such file or directory".
if { read -r _q _p < /sys/fs/cgroup/cpu.max; } 2>/dev/null \
  && [[ "$_q" =~ ^[0-9]+$ && "$_p" =~ ^[1-9][0-9]*$ ]]; then
  POD_CPUS=$((_q / _p))
elif _q=$(cat /sys/fs/cgroup/cpu/cpu.cfs_quota_us 2>/dev/null) \
  && _p=$(cat /sys/fs/cgroup/cpu/cpu.cfs_period_us 2>/dev/null) \
  && [[ "$_q" =~ ^[1-9][0-9]*$ && "$_p" =~ ^[1-9][0-9]*$ ]]; then
  POD_CPUS=$((_q / _p))
else
  POD_CPUS=$(nproc)
fi
# Fractional allowances (quota < period) truncate to 0; never report less than 1.
if [ "$POD_CPUS" -lt 1 ]; then
  POD_CPUS=1
fi
# s5cmd tuning.
#   --numworkers : how many files may be in flight at once.
#   -c           : how many multipart parts are transferred in parallel per file.
# Pods with 4 CPUs or fewer use -c 5 (otherwise -c 10) to stay gentle on the
# S3 prefix request rate when many pods stage at once.
_s5_numworkers=256
if [ "$POD_CPUS" -le 4 ]; then
  _s5_concurrency=5
else
  _s5_concurrency=10
fi
S5CMD="/opt/s5cmd/bin/s5cmd --log info --numworkers ${_s5_numworkers}"
S5_CP_FLAGS="-c ${_s5_concurrency}"

# Remote and local task locations.
S3_TASK_DIR="s3://natera-rnd-pltf-dev-miniwdl-scratch-01/work/2c/2c973df3596be9d790dcdcd39c517f9c"
TASK_DIR="/tmp/miniwdl_task"

# Run state, updated as the script progresses and read by the EXIT trap.
EXIT_CODE=1 # Default to failure
UNSTAGE_RET=0
pid=""
STAGE_MS=0
TASK_MS=0
TRACE_START=0
METRICS_ENABLED=0
# Print the current time as milliseconds since the Unix epoch.
_ms() {
  date '+%s%3N'
}
# Write a timestamped, tagged message ($1) to stderr.
log() {
  printf '[%s] [miniwdl-eks.task] %s\n' "$(date '+%Y-%m-%d %H:%M:%S')" "$1" >&2
}
# Print the current time using date(1) format $1 (default: epoch milliseconds).
# The first attempt runs with stderr silenced; if it fails the same command is
# run again with stderr visible.
miniwdl_date() {
  local fmt="+${1:-%s%3N}"
  if ! date "$fmt" 2>/dev/null; then
    date "$fmt"
  fi
}
# Walk the process tree rooted at PID $1 and store one stats row per live
# process in the caller's cpu_stat array, indexed by PID.
#   Row layout: pid %mem*10 vmem rss peak_vmem peak_rss vol_ctxt inv_ctxt
#   (memory values are the kB figures from /proc/<pid>/status).
# Reads mem_tot (total memory in kB) from the calling scope; when it is unset,
# empty or zero the %mem column is reported as 0.
# Processes that vanish between the ps snapshot and the /proc read are skipped.
miniwdl_tree() {
  local child parent
  local -a ALL_CHILDREN=()
  # Build parent PID -> " child child ..." from a single ps snapshot.
  while read -r child parent; do
    [[ -n "$parent" ]] || continue
    ALL_CHILDREN[$parent]+=" $child"
  done < <(ps -e -o pid= -o ppid=)
  # Record the stats row for PID $1 (no-op if its status file is unreadable).
  pstat() {
    local row
    # Assign separately from `local` so the status tested is awk's, not the
    # always-zero status of the `local` builtin. One awk replaces the previous
    # grep/awk/sed pipelines and supplies the zero defaults itself, so the
    # result no longer depends on `pipefail` being enabled.
    row=$(awk -v mem_tot="${mem_tot:-0}" '
      BEGIN { vsz = rss = peak = hwm = vol = inv = "0" }
      $1 == "VmSize:" { vsz = $2 }
      $1 == "VmRSS:" { rss = $2 }
      $1 == "VmPeak:" { peak = $2 }
      $1 == "VmHWM:" { hwm = $2 }
      $1 == "voluntary_ctxt_switches:" { vol = $2 }
      $1 == "nonvoluntary_ctxt_switches:" { inv = $2 }
      END {
        pmem = (mem_tot + 0 > 0) ? rss / mem_tot * 100 * 10 : 0
        printf "%.0f %s %s %s %s %s %s\n", pmem, vsz, rss, peak, hwm, vol, inv
      }
    ' "/proc/$1/status" 2>/dev/null) || return 0
    [[ -n "$row" ]] || return 0
    cpu_stat[$1]="$1 $row"
  }
  # Depth-first walk: record $1, then recurse into each known child.
  pwalk() {
    local kid
    pstat "$1"
    for kid in ${ALL_CHILDREN[$1]:-}; do
      pwalk "$kid"
    done
  }
  pwalk "$1"
  return 0
}
# Sum the per-process rows produced by miniwdl_tree for the tree rooted at $1
# and fold them into the running peaks.
#   - cpu_stat is reset, then refilled by miniwdl_tree.
#   - Columns 1..7 of every row are added up (column 0 is the PID and is skipped).
#   - Each column total is raised to cpu_peak[col] if lower; otherwise
#     cpu_peak[col] is updated to the new total.
# Result is left in the array miniwdl_pstat_ret (index 0 is always 0).
miniwdl_pstat() {
  cpu_stat=()
  miniwdl_tree $1
  local -a totals=(0 0 0 0 0 0 0 0)
  local pid col
  local -a fields
  for pid in "${!cpu_stat[@]}"; do
    # Intentional word-splitting: one row is a space-separated list of numbers.
    fields=(${cpu_stat[pid]})
    for ((col = 1; col < ${#fields[@]}; col++)); do
      totals[col]=$((totals[col] + fields[col]))
    done
  done
  for ((col = 1; col <= 7; col++)); do
    if ((totals[col] >= cpu_peak[col])); then
      cpu_peak[col]=${totals[col]}
    else
      totals[col]=${cpu_peak[col]}
    fi
  done
  miniwdl_pstat_ret=("${totals[@]}")
}
# Background memory/context-switch poller. Adaptive interval: 1s for the first
# 10 polls, 5s up to poll 120, then 30s.
# Arguments: $1 - PID of the task process tree to watch
#            $2 - file that receives the final stats line
# Stdin is the control stream: any non-empty line stops the poller. End-of-file
# (the writer closed or died) also stops it. The poller also stops on its own
# once $1 has been gone for more than 10 seconds.
# RSS comes from the cgroup "anon" counter in /sys/fs/cgroup/memory.stat (one
# value for the whole cgroup, so pages shared across forks are not counted per
# process); vmem and context switches come from the per-process /proc walk
# done by miniwdl_pstat.
# Output (single line, KB units):
#   %mem vmem rss peak_vmem peak_rss vol_ctxt inv_ctxt
#   rss      = last sampled cgroup anon memory
#   peak_rss = highest cgroup anon memory seen across all polls
#   %mem     = peak_rss relative to MemTotal, in tenths of a percent
miniwdl_mem_watch() {
  local pid=$1
  local mem_file=$2
  local count=0
  # cpu_stat, cpu_peak and mem_tot are read by miniwdl_pstat/miniwdl_tree
  # through dynamic scoping, so their names must not change.
  declare -a cpu_stat=(0 0 0 0 0 0 0 0)
  declare -a cpu_peak=(0 0 0 0 0 0 0 0)
  local mem_tot
  mem_tot=$(awk '/^MemTotal:/ {print $2; exit}' /proc/meminfo 2>/dev/null) || mem_tot=0
  [[ "$mem_tot" =~ ^[0-9]+$ ]] || mem_tot=0
  local timeout rc
  local DONE=''
  local STOP=''
  local cg_bytes cg_kb
  local cg_peak_kb=0
  local cg_last_kb=0
  while true; do
    miniwdl_pstat "$pid"
    # A missing file, or a file without an "anon" line, yields an empty
    # string; treat it as 0 instead of feeding it to the arithmetic below.
    cg_bytes=$(awk '/^anon / {print $2; exit}' /sys/fs/cgroup/memory.stat 2>/dev/null) || cg_bytes=0
    [[ "$cg_bytes" =~ ^[0-9]+$ ]] || cg_bytes=0
    cg_kb=$((cg_bytes / 1024))
    cg_last_kb=$cg_kb
    if ((cg_kb > cg_peak_kb)); then
      cg_peak_kb=$cg_kb
    fi
    if ((count < 10)); then
      timeout=1
    elif ((count < 120)); then
      timeout=5
    else
      timeout=30
    fi
    DONE=''
    if read -t "$timeout" -r DONE; then
      [[ -n "$DONE" ]] && break
    else
      rc=$?
      # read -t returns >128 on timeout: keep polling. Any other failure is
      # EOF on the control stream; without this check the loop would spin
      # with no delay because every read returns immediately.
      ((rc > 128)) || break
    fi
    if [ ! -e "/proc/$pid" ]; then
      [[ -n "$STOP" ]] || STOP=$(miniwdl_date)
      if (($(miniwdl_date) - STOP > 10000)); then
        break
      fi
    fi
    count=$((count + 1))
  done
  local cg_pmem
  cg_pmem=$(awk -v rss="$cg_peak_kb" -v mem_tot="$mem_tot" \
    'BEGIN { printf "%.0f", (mem_tot + 0 > 0) ? rss / mem_tot * 100 * 10 : 0 }' 2>/dev/null) || cg_pmem=0
  [[ -n "$cg_pmem" ]] || cg_pmem=0
  echo "$cg_pmem ${miniwdl_pstat_ret[2]} $cg_last_kb ${miniwdl_pstat_ret[4]} $cg_peak_kb ${miniwdl_pstat_ret[6]} ${miniwdl_pstat_ret[7]}" > "$mem_file"
}
# Print the lowest file-descriptor number >= 11 that is not currently open in
# the main shell ($$), judged by the absence of /proc/$$/fd/<n>.
miniwdl_fd() {
  local candidate
  for ((candidate = 11; ; candidate++)); do
    [ -e "/proc/$$/fd/$candidate" ] || break
  done
  printf '%s\n' "$candidate"
}
# Take the "before" snapshot used by miniwdl_trace_end.
# Sets METRICS_ENABLED (0 when `ps` is unavailable, in which case nothing else
# is touched), MEM_FILE, NUM_CPUS, CPU_MODEL, TOT_TIME0 (sum of the first eight
# counters of the aggregate "cpu" line in /proc/stat), CPU_TIME0 (fields 16+17
# of this shell's /proc/<pid>/stat, or "X" if unreadable), IO_STAT0 (first six
# values of this shell's /proc/<pid>/io) and TRACE_START (epoch milliseconds).
# This function only snapshots; it does not start the memory poller.
miniwdl_trace_start() {
  local self=$$
  command -v ps &>/dev/null || {
    log "Warning: 'ps' not found, skipping metrics collection"
    METRICS_ENABLED=0
    return
  }
  METRICS_ENABLED=1
  MEM_FILE="$TASK_DIR/.mem_stats"
  NUM_CPUS=$(grep '^processor' -c < /proc/cpuinfo)
  CPU_MODEL=$(
    grep '^model name' < /proc/cpuinfo \
      | head -n 1 \
      | awk 'BEGIN{FS="\t: "} { print $2 }'
  )
  TOT_TIME0=$(
    grep '^cpu ' /proc/stat \
      | awk '{sum=$2+$3+$4+$5+$6+$7+$8+$9; printf "%.0f",sum}'
  )
  CPU_TIME0=$(2>/dev/null awk '{printf "%.0f", $16+$17 }' < /proc/$self/stat || echo -n 'X')
  IO_STAT0=($(
    2>/dev/null sed 's/^.*:\s*//' < /proc/$self/io \
      | head -n 6 \
      | tr '\n' ' ' \
      || echo -n '0 0 0 0 0 0'
  ))
  TRACE_START=$(miniwdl_date)
}
# Take the "after" snapshot, stop the memory poller, compute deltas against
# the values stored by miniwdl_trace_start and write $TASK_DIR/metrics.json.
# No-op when metrics are disabled or miniwdl_trace_start never ran.
# Globals read: METRICS_ENABLED TRACE_START CPU_TIME0 TOT_TIME0 NUM_CPUS
#               CPU_MODEL IO_STAT0 MEM_FILE TASK_DIR mem_proc mem_fd
# Every numeric field is validated before it is written so the JSON stays
# well-formed even when a /proc read fails, no CPU ticks elapsed between the
# two snapshots, or the poller left an empty stats file.
miniwdl_trace_end() {
  if [ "${METRICS_ENABLED:-0}" = "0" ]; then return 0; fi
  if [ "${TRACE_START:-0}" = "0" ]; then return 0; fi # trace_start never ran
  local pid=$$
  local num_re='^[0-9]+([.][0-9]+)?$'
  local end_millis tot_time1 cpu_time1 cpu_pct
  end_millis=$(miniwdl_date)
  tot_time1=$(grep '^cpu ' /proc/stat 2>/dev/null | awk '{sum=$2+$3+$4+$5+$6+$7+$8+$9; printf "%.0f",sum}')
  cpu_time1=$(2>/dev/null < /proc/$pid/stat awk '{printf "%.0f", $16+$17 }' || echo -n 'X')
  # Guard the division: for very short tasks the aggregate tick counter may
  # not have advanced, which previously produced an empty cpuPercent.
  cpu_pct=$(awk -v p1="$cpu_time1" -v p0="${CPU_TIME0:-0}" -v t1="${tot_time1:-0}" \
    -v t0="${TOT_TIME0:-0}" -v n="${NUM_CPUS:-0}" \
    'BEGIN { dt = t1 - t0; pct = (dt > 0) ? (p1 - p0) / dt * 100 * n : 0; printf("%.1f", pct > 0 ? pct : 0) }' 2>/dev/null)
  [[ "$cpu_pct" =~ $num_re ]] || cpu_pct=0.0
  local io_stat1=($(2>/dev/null < /proc/$pid/io sed 's/^.*:\s*//' | head -n 6 | tr '\n' ' ' || echo -n '0 0 0 0 0 0'))
  local i
  for i in {0..5}; do
    io_stat1[i]=$((${io_stat1[i]:-0} - ${IO_STAT0[i]:-0}))
  done
  local wall_time=$((end_millis - TRACE_START))
  # Signal mem_watch to stop and wait for it
  if [ -n "${mem_proc:-}" ]; then
    if [ -e "/proc/$mem_proc" ] && [ -n "${mem_fd:-}" ]; then
      # >&"$fd" duplicates the descriptor directly; no eval needed.
      echo 'DONE' >&"$mem_fd" || true
    fi
    wait "$mem_proc" 2>/dev/null || true
    while [ -e "/proc/$mem_proc" ]; do sleep 0.1; done
  fi
  # Read mem_watch results (KB units) and convert to bytes
  local mem_pct=0 vmem=0 rss=0 peak_vmem=0 peak_rss=0 vol_ctxt=0 inv_ctxt=0
  if [ -f "$MEM_FILE" ]; then
    read -r mem_pct vmem rss peak_vmem peak_rss vol_ctxt inv_ctxt < "$MEM_FILE" || true
  fi
  local field
  for field in mem_pct vmem rss peak_vmem peak_rss vol_ctxt inv_ctxt; do
    [[ "${!field}" =~ ^[0-9]+$ ]] || printf -v "$field" '%s' 0
  done
  # Convert KB to bytes
  vmem=$((vmem * 1024))
  rss=$((rss * 1024))
  peak_vmem=$((peak_vmem * 1024))
  peak_rss=$((peak_rss * 1024))
  # Escape backslashes and double quotes so the model name is a valid JSON string.
  local bs='\' dq='"'
  local cpu_model=${CPU_MODEL:-}
  cpu_model=${cpu_model//"$bs"/"$bs$bs"}
  cpu_model=${cpu_model//"$dq"/"$bs$dq"}
  cat > "$TASK_DIR/metrics.json" << METRICS_EOF
{"realtime":$wall_time,"cpuPercent":$cpu_pct,"cpuModel":"$cpu_model","memPercent":$mem_pct,"vmem":$vmem,"rss":$rss,"peakVmem":$peak_vmem,"peakRss":$peak_rss,"rchar":${io_stat1[0]},"wchar":${io_stat1[1]},"readBytes":${io_stat1[4]},"writeBytes":${io_stat1[5]},"volCtxt":$vol_ctxt,"invCtxt":$inv_ctxt}
METRICS_EOF
  log "Metrics: cpu=${cpu_pct}% rss=$((rss / 1048576))MB peak_rss=$((peak_rss / 1048576))MB wall=${wall_time}ms"
}
# Send the default kill signal to PID $1 and to every descendant found in a
# single `ps` snapshot, parent before children. This shell's own PID ($$) is
# never signalled, but its descendants still are. Kill failures are ignored.
miniwdl_kill() {
  local -a kids=()
  local child parent
  while read child parent; do
    kids[$parent]+=" $child"
  done < <(ps -e -o pid= -o ppid=)
  _kill() {
    local target=$1 kid
    if [[ $target != $$ ]]; then
      kill $target 2>/dev/null
    fi
    for kid in ${kids[$target]:-}; do
      _kill $kid
    done
  }
  _kill $1
}
# Signal handler (installed for TERM, INT and USR2): turn off errexit and, if
# a task PID has been recorded in $pid, kill that process and its descendants.
# Returns non-zero when no task PID is recorded yet.
# The EXIT trap (cleanup) still runs afterwards to upload logs and the work
# directory.
miniwdl_on_term() {
  set +e
  [[ -n "$pid" ]] || return 1
  miniwdl_kill $pid
}
# EXIT trap: upload logs, the work directory and bookkeeping files on ANY exit
# (success or failure). Every step is best-effort; errexit and nounset are
# turned off so one failed upload cannot stop the remaining ones.
# Globals read:    EXIT_CODE S5CMD S5_CP_FLAGS TASK_DIR S3_TASK_DIR
#                  _START_MS STAGE_MS TASK_MS
# Globals written: UNSTAGE_RET UNSTAGE_MS TOTAL_MS
# Uploaded: stdout.txt, stderr.txt, work/ (minus _miniwdl_inputs), optional
# empty_dirs.txt, timing.json, optional metrics.json, exit_code.
cleanup() {
  set +eu
  local exit_code=$EXIT_CODE
  # Collect metrics (best-effort, before unstage so we capture task-only values).
  # NOTE(review): 2>/dev/null also discards the "Metrics:" line that
  # miniwdl_trace_end sends through log (stderr) — confirm that is intended.
  miniwdl_trace_end 2>/dev/null || true
  $S5CMD cp "$TASK_DIR/stdout.txt" "$S3_TASK_DIR/stdout.txt" || true
  $S5CMD cp "$TASK_DIR/stderr.txt" "$S3_TASK_DIR/stderr.txt" || true
  # Only sync when we are really inside the task directory; otherwise the
  # relative "work/" below would refer to whatever directory we happen to be in.
  if cd "$TASK_DIR"; then
    for _unstage_attempt in 1 2 3; do
      $S5CMD sync $S5_CP_FLAGS --exclude "_miniwdl_inputs/*" work/ "$S3_TASK_DIR/work/"
      UNSTAGE_RET=$?
      [[ $UNSTAGE_RET -eq 0 ]] && break
      if [[ $_unstage_attempt -lt 3 ]]; then
        log "Unstage attempt $_unstage_attempt/3 failed (ret=$UNSTAGE_RET), retrying in ~30s..."
        sleep $((30 + RANDOM % 10))
      fi
    done
    # Record empty directories so the runner can preserve them as S3 directory
    # markers. s5cmd sync only uploads objects — empty dirs vanish without this.
    # Runs after sync intentionally: (1) empty_dirs.txt is not part of the synced
    # work artifacts, and (2) we capture the true post-upload filesystem state.
    find work/ -type d -empty ! -path "work/_miniwdl_inputs*" > "$TASK_DIR/empty_dirs.txt" 2>/dev/null || true
    if [ -s "$TASK_DIR/empty_dirs.txt" ]; then
      $S5CMD cp "$TASK_DIR/empty_dirs.txt" "$S3_TASK_DIR/empty_dirs.txt" || true
    fi
  else
    log "Cannot enter $TASK_DIR, skipping work/ unstage"
    UNSTAGE_RET=1
  fi
  # One timestamp for both figures so they are mutually consistent.
  local now
  now=$(_ms)
  UNSTAGE_MS=$((now - _START_MS - STAGE_MS - TASK_MS))
  TOTAL_MS=$((now - _START_MS))
  cat > "$TASK_DIR/timing.json" << TIMING
{"stage_ms":$STAGE_MS,"task_ms":$TASK_MS,"unstage_ms":$UNSTAGE_MS,"total_ms":$TOTAL_MS,"exit_code":$exit_code,"unstage_ret":$UNSTAGE_RET}
TIMING
  $S5CMD cp "$TASK_DIR/timing.json" "$S3_TASK_DIR/timing.json" || true
  if [ -f "$TASK_DIR/metrics.json" ]; then
    $S5CMD cp "$TASK_DIR/metrics.json" "$S3_TASK_DIR/metrics.json" || true
  fi
  # Task failure takes priority; if task succeeded but unstage failed, use unstage code.
  # This only affects the uploaded exit_code file; the script's own exit
  # status is not changed here.
  local final_code=$exit_code
  if [[ $final_code -eq 0 && $UNSTAGE_RET -ne 0 ]]; then
    final_code=$UNSTAGE_RET
  fi
  echo "$final_code" > "$TASK_DIR/exit_code"
  $S5CMD cp "$TASK_DIR/exit_code" "$S3_TASK_DIR/exit_code" || true
  log "Done exit_code=$exit_code (${UNSTAGE_MS}ms unstage, ${TOTAL_MS}ms total)"
}
# Install handlers before any work starts: cleanup runs on every exit path,
# and TERM/INT/USR2 kill the task process tree via miniwdl_on_term.
trap cleanup EXIT
trap miniwdl_on_term TERM INT USR2
# Reference timestamp (ms) for the stage/task/unstage timings.
_START_MS=$(_ms)
log "Staging: downloading command and 5 inputs"
# Errexit is still active here, so a failed download of the task command
# exits the script (and the EXIT trap then runs cleanup).
$S5CMD cp "$S3_TASK_DIR/command" "$TASK_DIR/command"
# Destination directory for the staged inputs, and the task's working directory.
mkdir -p "/tmp/miniwdl_task/work/_miniwdl_inputs/0"
mkdir -p "$TASK_DIR/work"
# Download all task inputs in one `s5cmd run` batch (one cp command per input,
# fed on stdin). Returns s5cmd's exit status.
# The caller wraps this in an outer retry (3 attempts) to ride out S3
# throttling such as 503 SlowDown on same-prefix reads, and fails the pod if
# all attempts fail — WDL task-level maxRetries provides the final backstop.
_do_stage() {
  local status=0
  $S5CMD run << STAGE_FILES || status=$?
cp ${S5_CP_FLAGS} "s3://natera-platform-sandbox/pipeline-resources/germline_pipeline/ref/Homo_sapiens_assembly38_no_alt_contigs_v1.masked_CBSL/Homo_sapiens_assembly38_no_alt_contigs_v1.masked_CBSL.fasta" "/tmp/miniwdl_task/work/_miniwdl_inputs/0/Homo_sapiens_assembly38_no_alt_contigs_v1.masked_CBSL.fasta"
cp ${S5_CP_FLAGS} "s3://natera-rnd-pltf-dev-miniwdl-scratch-01/work/2b/2b7c0d132c92047c333171bb7ed713cb/work/cov_by_lane_8.tsv" "/tmp/miniwdl_task/work/_miniwdl_inputs/0/cov_by_lane_8.tsv"
cp ${S5_CP_FLAGS} "s3://natera-platform-sandbox/pipeline-resources/germline_pipeline/allele_specific_expression_beds/hg38/cyp21_hg38.bed" "/tmp/miniwdl_task/work/_miniwdl_inputs/0/cyp21_hg38.bed"
cp ${S5_CP_FLAGS} "s3://natera-rnd-pltf-dev-miniwdl-scratch-01/work/2b/2b7c0d132c92047c333171bb7ed713cb/work/mapped_reads_lane_8.tsv" "/tmp/miniwdl_task/work/_miniwdl_inputs/0/mapped_reads_lane_8.tsv"
cp ${S5_CP_FLAGS} "s3://natera-platform-sandbox/pipeline-resources/germline_pipeline/allele_specific_expression_beds/hg38/cyp21_exon_info_hg38.bed" "/tmp/miniwdl_task/work/_miniwdl_inputs/0/cyp21_exon_info_hg38.bed"
STAGE_FILES
  # no directories to sync
  return $status
}
# Try staging up to three times, pausing roughly 30-39s between attempts.
STAGE_RET=1
for _stage_attempt in 1 2 3; do
  # Calling _do_stage as an `if` condition suspends set -e inside it, so a
  # failure is captured in STAGE_RET instead of terminating the script.
  if _do_stage; then
    STAGE_RET=0
    break
  else
    STAGE_RET=$?
  fi
  if [[ $_stage_attempt -lt 3 ]]; then
    log "Stage attempt $_stage_attempt/3 failed (ret=$STAGE_RET), retrying in ~30s..."
    sleep $((30 + RANDOM % 10))
  fi
done
if [[ $STAGE_RET -ne 0 ]]; then
  log "Stage failed after 3 attempts, aborting"
  exit $STAGE_RET
fi
STAGE_MS=$(( $(_ms) - _START_MS ))
log "Staging completed (5 inputs, ${STAGE_MS}ms) s5cmd=workers:$_s5_numworkers,concurrency:$_s5_concurrency,cpus:$POD_CPUS"
# Everything after staging is best-effort — metrics collection and task command
# failures must be captured, not abort the script.
set +e
# Start metrics collection (snapshots /proc; the memory poller is launched
# further down, once the task PID is known).
miniwdl_trace_start
cd "$TASK_DIR/work"
# fd-swap tee: stdout/stderr captured to files AND passed to outer layer for
# kubectl logs. Because of the swap, the task's stdout is passed through on
# this script's stderr and the task's stderr on this script's stdout.
# Background + wait allows SIGTERM to interrupt immediately.
OUT="$TASK_DIR/stdout.txt"
ERR="$TASK_DIR/stderr.txt"
(set -o pipefail; (/bin/bash "$TASK_DIR/command" | tee "$OUT") 3>&1 1>&2 2>&3 | tee "$ERR") &
pid=$!
# Launch mem_watch background poller on the task subshell, not the wrapper PID.
# Must start AFTER task so $pid is available (matches Nextflow's nxf_trace_linux pattern).
if [ "${METRICS_ENABLED:-0}" = "1" ]; then
  mem_fd=$(miniwdl_fd)
  eval "exec $mem_fd> >(miniwdl_mem_watch $pid $MEM_FILE)"
  mem_proc=$!
  trap 'kill $mem_proc 2>/dev/null' ERR
fi
# Collect the task status on the left of `||` so a non-zero task exit does not
# fire the ERR trap above: that trap kills the poller before it can write its
# stats file, which left failed tasks without memory metrics. The poller is
# stopped in an orderly way by miniwdl_trace_end during cleanup.
# EXIT_CODE keeps its failure default until the wait has actually returned.
_task_rc=0
wait $pid || _task_rc=$?
EXIT_CODE=$_task_rc
TASK_MS=$(( $(_ms) - _START_MS - STAGE_MS ))
log "Task finished exit_code=$EXIT_CODE (${TASK_MS}ms)"
exit $EXIT_CODE