diff --git a/benchmarks/tpc/.gitignore b/benchmarks/tpc/.gitignore index 477aaef0c3..09681b3169 100644 --- a/benchmarks/tpc/.gitignore +++ b/benchmarks/tpc/.gitignore @@ -1,2 +1,3 @@ +data *.json -*.png \ No newline at end of file +*.png diff --git a/benchmarks/tpc/README.md b/benchmarks/tpc/README.md index fac54a7894..e9e9f36925 100644 --- a/benchmarks/tpc/README.md +++ b/benchmarks/tpc/README.md @@ -26,6 +26,14 @@ For full instructions on running these benchmarks on an EC2 instance, see the [C [Comet Benchmarking on EC2 Guide]: https://datafusion.apache.org/comet/contributor-guide/benchmarking_aws_ec2.html +## Deployment options + +| Method | Guide | Use case | +| ------ | ----- | -------- | +| Standalone | This page | Direct Spark standalone cluster | +| Docker Compose | [docs/docker.md](docs/docker.md) | Isolated local cluster via containers | +| Kubernetes | [docs/kubernetes.md](docs/kubernetes.md) | Spark on K8s with S3 results | + ## Setup TPC queries are bundled in `benchmarks/tpc/queries/` (derived from TPC-H/DS under the TPC Fair Use Policy). @@ -193,172 +201,6 @@ physical plan output. | `--catalog` | No | `local` | Iceberg catalog name | | `--database` | No | benchmark name | Database name for the tables | -## Running with Docker - -A Docker Compose setup is provided in `infra/docker/` for running benchmarks in an isolated -Spark standalone cluster. The Docker image supports both **Linux (amd64)** and **macOS (arm64)** -via architecture-agnostic Java symlinks created at build time. - -### Build the image - -The image must be built for the correct platform to match the native libraries in the -engine JARs (e.g. Comet bundles `libcomet.so` for a specific OS/arch). - -```shell -docker build -t comet-bench -f benchmarks/tpc/infra/docker/Dockerfile . -``` - -### Building a compatible Comet JAR - -The Comet JAR contains platform-specific native libraries (`libcomet.so` / `libcomet.dylib`). -A JAR built on the host may not work inside the Docker container due to OS, architecture, -or glibc version mismatches. Use `Dockerfile.build-comet` to build a JAR with compatible -native libraries: - -- **macOS (Apple Silicon):** The host JAR contains `darwin/aarch64` libraries which - won't work in Linux containers. You **must** use the build Dockerfile. -- **Linux:** If your host glibc version differs from the container's, the native library - will fail to load with a `GLIBC_x.xx not found` error. The build Dockerfile uses - Ubuntu 20.04 (glibc 2.31) for broad compatibility. Use it if you see - `UnsatisfiedLinkError` mentioning glibc when running benchmarks. - -```shell -mkdir -p output -docker build -t comet-builder \ - -f benchmarks/tpc/infra/docker/Dockerfile.build-comet . -docker run --rm -v $(pwd)/output:/output comet-builder -export COMET_JAR=$(pwd)/output/comet-spark-spark3.5_2.12-*.jar -``` - -### Platform notes - -**macOS (Apple Silicon):** Docker Desktop is required. - -- **Memory:** Docker Desktop defaults to a small memory allocation (often 8 GB) which - is not enough for Spark benchmarks. Go to **Docker Desktop > Settings > Resources > - Memory** and increase it to at least 48 GB (each worker requests 16 GB for its executor - plus overhead, and the driver needs 8 GB). Without enough memory, executors will be - OOM-killed (exit code 137). -- **File Sharing:** You may need to add your data directory (e.g. `/opt`) to - **Docker Desktop > Settings > Resources > File Sharing** before mounting host volumes. - -**Linux (amd64):** Docker uses cgroup memory limits directly without a VM layer. No -special Docker configuration is needed, but you may still need to build the Comet JAR -using `Dockerfile.build-comet` (see above) if your host glibc version doesn't match -the container's. - -The Docker image auto-detects the container architecture (amd64/arm64) and sets up -arch-agnostic Java symlinks. The compose file uses `BENCH_JAVA_HOME` (not `JAVA_HOME`) -to avoid inheriting the host's Java path into the container. - -### Start the cluster - -Set environment variables pointing to your host paths, then start the Spark master and -two workers: - -```shell -export DATA_DIR=/mnt/bigdata/tpch/sf100 -export RESULTS_DIR=/tmp/bench-results -export COMET_JAR=/opt/comet/comet-spark-spark3.5_2.12-0.10.0.jar - -mkdir -p $RESULTS_DIR/spark-events -docker compose -f benchmarks/tpc/infra/docker/docker-compose.yml up -d -``` - -Set `COMET_JAR`, `GLUTEN_JAR`, or `ICEBERG_JAR` to the host path of the engine JAR you -want to use. Each JAR is mounted individually into the container, so you can easily switch -between versions by changing the path and restarting. - -### Run benchmarks - -Use `docker compose run --rm` to execute benchmarks. The `--rm` flag removes the -container when it exits, preventing port conflicts on subsequent runs. Pass -`--no-restart` since the cluster is already managed by Compose, and `--output /results` -so that output files land in the mounted results directory: - -```shell -docker compose -f benchmarks/tpc/infra/docker/docker-compose.yml \ - run --rm -p 4040:4040 bench \ - python3 /opt/benchmarks/run.py \ - --engine comet --benchmark tpch --output /results --no-restart -``` - -The `-p 4040:4040` flag exposes the Spark Application UI on the host. The following -UIs are available during a benchmark run: - -| UI | URL | -| ----------------- | ---------------------- | -| Spark Master | http://localhost:8080 | -| Worker 1 | http://localhost:8081 | -| Worker 2 | http://localhost:8082 | -| Spark Application | http://localhost:4040 | -| History Server | http://localhost:18080 | - -> **Note:** The Master UI links to the Application UI using the container's internal -> hostname, which is not reachable from the host. Use `http://localhost:4040` directly -> to access the Application UI. - -The Spark Application UI is only available while a benchmark is running. To inspect -completed runs, uncomment the `history-server` service in `docker-compose.yml` and -restart the cluster. The History Server reads event logs from `$RESULTS_DIR/spark-events`. - -For Gluten (requires Java 8), you must restart the **entire cluster** with `JAVA_HOME` -set so that all services (master, workers, and bench) use Java 8: - -```shell -export BENCH_JAVA_HOME=/usr/lib/jvm/java-8-openjdk -docker compose -f benchmarks/tpc/infra/docker/docker-compose.yml down -docker compose -f benchmarks/tpc/infra/docker/docker-compose.yml up -d - -docker compose -f benchmarks/tpc/infra/docker/docker-compose.yml \ - run --rm bench \ - python3 /opt/benchmarks/run.py \ - --engine gluten --benchmark tpch --output /results --no-restart -``` - -> **Important:** Only passing `-e JAVA_HOME=...` to the `bench` container is not -> sufficient -- the workers also need Java 8 or Gluten will fail at runtime with -> `sun.misc.Unsafe` errors. Unset `BENCH_JAVA_HOME` (or switch it back to Java 17) -> and restart the cluster before running Comet or Spark benchmarks. - -### Memory limits - -Two compose files are provided for different hardware profiles: - -| File | Workers | Total memory | Use case | -| --------------------------- | ------- | ------------ | ------------------------------ | -| `docker-compose.yml` | 2 | ~74 GB | SF100+ on a workstation/server | -| `docker-compose-laptop.yml` | 1 | ~12 GB | SF1–SF10 on a laptop | - -**`docker-compose.yml`** (workstation default): - -| Container | Container limit (`mem_limit`) | Spark JVM allocation | -| -------------- | ----------------------------- | ------------------------- | -| spark-worker-1 | 32 GB | 16 GB executor + overhead | -| spark-worker-2 | 32 GB | 16 GB executor + overhead | -| bench (driver) | 10 GB | 8 GB driver | -| **Total** | **74 GB** | | - -Configure via environment variables: `WORKER_MEM_LIMIT` (default: 32g per worker), -`BENCH_MEM_LIMIT` (default: 10g), `WORKER_MEMORY` (default: 16g, Spark executor memory), -`WORKER_CORES` (default: 8). - -### Running on a laptop with small scale factors - -For local development or testing with small scale factors (e.g. SF1 or SF10), use the -laptop compose file which runs a single worker with reduced memory: - -```shell -docker compose -f benchmarks/tpc/infra/docker/docker-compose-laptop.yml up -d -``` - -This starts one worker (4 GB executor inside an 8 GB container) and a 4 GB bench -container, totaling approximately **12 GB** of memory. - -The benchmark scripts request 2 executor instances and 16 max cores by default -(`run.py`). Spark will simply use whatever resources are available on the single worker, -so no script changes are needed. - ### Comparing Parquet vs Iceberg performance Run both benchmarks and compare: diff --git a/benchmarks/tpc/docs/docker.md b/benchmarks/tpc/docs/docker.md new file mode 100644 index 0000000000..0c3937e68e --- /dev/null +++ b/benchmarks/tpc/docs/docker.md @@ -0,0 +1,186 @@ + + +# Running Benchmarks with Docker Compose + +A Docker Compose setup is provided in `infra/docker/` for running benchmarks in an isolated +Spark standalone cluster. The Docker image supports both **Linux (amd64)** and **macOS (arm64)** +via architecture-agnostic Java symlinks created at build time. + +See the [main README](../README.md) for general benchmark usage and options. + +## Build the image + +The image must be built for the correct platform to match the native libraries in the +engine JARs (e.g. Comet bundles `libcomet.so` for a specific OS/arch). + +```shell +docker build -t comet-bench -f benchmarks/tpc/infra/docker/Dockerfile . +``` + +## Building a compatible Comet JAR + +The Comet JAR contains platform-specific native libraries (`libcomet.so` / `libcomet.dylib`). +A JAR built on the host may not work inside the Docker container due to OS, architecture, +or glibc version mismatches. Use `Dockerfile.build-comet` to build a JAR with compatible +native libraries: + +- **macOS (Apple Silicon):** The host JAR contains `darwin/aarch64` libraries which + won't work in Linux containers. You **must** use the build Dockerfile. +- **Linux:** If your host glibc version differs from the container's, the native library + will fail to load with a `GLIBC_x.xx not found` error. The build Dockerfile uses + Ubuntu 20.04 (glibc 2.31) for broad compatibility. Use it if you see + `UnsatisfiedLinkError` mentioning glibc when running benchmarks. + +```shell +mkdir -p output +docker build -t comet-builder \ + -f benchmarks/tpc/infra/docker/Dockerfile.build-comet . +docker run --rm -v $(pwd)/output:/output comet-builder +export COMET_JAR=$(pwd)/output/comet-spark-spark3.5_2.12-*.jar +``` + +## Platform notes + +**macOS (Apple Silicon):** Docker Desktop is required. + +- **Memory:** Docker Desktop defaults to a small memory allocation (often 8 GB) which + is not enough for Spark benchmarks. Go to **Docker Desktop > Settings > Resources > + Memory** and increase it to at least 48 GB (each worker requests 16 GB for its executor + plus overhead, and the driver needs 8 GB). Without enough memory, executors will be + OOM-killed (exit code 137). +- **File Sharing:** You may need to add your data directory (e.g. `/opt`) to + **Docker Desktop > Settings > Resources > File Sharing** before mounting host volumes. + +**Linux (amd64):** Docker uses cgroup memory limits directly without a VM layer. No +special Docker configuration is needed, but you may still need to build the Comet JAR +using `Dockerfile.build-comet` (see above) if your host glibc version doesn't match +the container's. + +The Docker image auto-detects the container architecture (amd64/arm64) and sets up +arch-agnostic Java symlinks. The compose file uses `BENCH_JAVA_HOME` (not `JAVA_HOME`) +to avoid inheriting the host's Java path into the container. + +## Start the cluster + +Set environment variables pointing to your host paths, then start the Spark master and +two workers: + +```shell +export DATA_DIR=/mnt/bigdata/tpch/sf100 +export RESULTS_DIR=/tmp/bench-results +export COMET_JAR=/opt/comet/comet-spark-spark3.5_2.12-0.10.0.jar + +mkdir -p $RESULTS_DIR/spark-events +docker compose -f benchmarks/tpc/infra/docker/docker-compose.yml up -d +``` + +Set `COMET_JAR`, `GLUTEN_JAR`, or `ICEBERG_JAR` to the host path of the engine JAR you +want to use. Each JAR is mounted individually into the container, so you can easily switch +between versions by changing the path and restarting. + +## Run benchmarks + +Use `docker compose run --rm` to execute benchmarks. The `--rm` flag removes the +container when it exits, preventing port conflicts on subsequent runs. Pass +`--no-restart` since the cluster is already managed by Compose, and `--output /results` +so that output files land in the mounted results directory: + +```shell +docker compose -f benchmarks/tpc/infra/docker/docker-compose.yml \ + run --rm -p 4040:4040 bench \ + python3 /opt/benchmarks/run.py \ + --engine comet --benchmark tpch --output /results --no-restart +``` + +The `-p 4040:4040` flag exposes the Spark Application UI on the host. The following +UIs are available during a benchmark run: + +| UI | URL | +| ----------------- | ---------------------- | +| Spark Master | http://localhost:8080 | +| Worker 1 | http://localhost:8081 | +| Worker 2 | http://localhost:8082 | +| Spark Application | http://localhost:4040 | +| History Server | http://localhost:18080 | + +> **Note:** The Master UI links to the Application UI using the container's internal +> hostname, which is not reachable from the host. Use `http://localhost:4040` directly +> to access the Application UI. + +The Spark Application UI is only available while a benchmark is running. To inspect +completed runs, uncomment the `history-server` service in `docker-compose.yml` and +restart the cluster. The History Server reads event logs from `$RESULTS_DIR/spark-events`. + +For Gluten (requires Java 8), you must restart the **entire cluster** with `JAVA_HOME` +set so that all services (master, workers, and bench) use Java 8: + +```shell +export BENCH_JAVA_HOME=/usr/lib/jvm/java-8-openjdk +docker compose -f benchmarks/tpc/infra/docker/docker-compose.yml down +docker compose -f benchmarks/tpc/infra/docker/docker-compose.yml up -d + +docker compose -f benchmarks/tpc/infra/docker/docker-compose.yml \ + run --rm bench \ + python3 /opt/benchmarks/run.py \ + --engine gluten --benchmark tpch --output /results --no-restart +``` + +> **Important:** Only passing `-e JAVA_HOME=...` to the `bench` container is not +> sufficient -- the workers also need Java 8 or Gluten will fail at runtime with +> `sun.misc.Unsafe` errors. Unset `BENCH_JAVA_HOME` (or switch it back to Java 17) +> and restart the cluster before running Comet or Spark benchmarks. + +## Memory limits + +Two compose files are provided for different hardware profiles: + +| File | Workers | Total memory | Use case | +| --------------------------- | ------- | ------------ | ------------------------------ | +| `docker-compose.yml` | 2 | ~74 GB | SF100+ on a workstation/server | +| `docker-compose-laptop.yml` | 1 | ~12 GB | SF1–SF10 on a laptop | + +**`docker-compose.yml`** (workstation default): + +| Container | Container limit (`mem_limit`) | Spark JVM allocation | +| -------------- | ----------------------------- | ------------------------- | +| spark-worker-1 | 32 GB | 16 GB executor + overhead | +| spark-worker-2 | 32 GB | 16 GB executor + overhead | +| bench (driver) | 10 GB | 8 GB driver | +| **Total** | **74 GB** | | + +Configure via environment variables: `WORKER_MEM_LIMIT` (default: 32g per worker), +`BENCH_MEM_LIMIT` (default: 10g), `WORKER_MEMORY` (default: 16g, Spark executor memory), +`WORKER_CORES` (default: 8). + +## Running on a laptop with small scale factors + +For local development or testing with small scale factors (e.g. SF1 or SF10), use the +laptop compose file which runs a single worker with reduced memory: + +```shell +docker compose -f benchmarks/tpc/infra/docker/docker-compose-laptop.yml up -d +``` + +This starts one worker (4 GB executor inside an 8 GB container) and a 4 GB bench +container, totaling approximately **12 GB** of memory. + +The benchmark scripts request 2 executor instances and 16 max cores by default +(`run.py`). Spark will simply use whatever resources are available on the single worker, +so no script changes are needed. diff --git a/benchmarks/tpc/docs/kubernetes.md b/benchmarks/tpc/docs/kubernetes.md new file mode 100644 index 0000000000..57bf4eb1e7 --- /dev/null +++ b/benchmarks/tpc/docs/kubernetes.md @@ -0,0 +1,119 @@ + + +# Running Benchmarks on Kubernetes + +The benchmark infrastructure supports Spark's native Kubernetes integration. Instead of +a standalone Spark cluster, the driver and executors run as K8s pods. Results are written +to S3 (or S3-compatible storage) since the driver pod has no shared local filesystem with +the submitter. + +See the [main README](../README.md) for general benchmark usage and options. + +## Prerequisites + +- A Kubernetes cluster with `kubectl` configured +- The benchmark Docker image pushed to a registry accessible from the cluster +- A service account with permissions to create pods (see RBAC setup below) +- An S3 bucket (or S3-compatible storage) for benchmark results +- `SPARK_HOME` pointing to a local Spark installation (for `spark-submit`) + +## Setup + +**1. Create RBAC resources:** + +```shell +kubectl apply -f benchmarks/tpc/infra/k8s/rbac.yaml -n +``` + +**2. Build and push the Docker image:** + +Build the Comet JAR on Linux (native libraries must match the container OS) and bake it +into the image: + +```shell +# Build the Comet JAR (must be built on Linux for K8s pods) +make release + +docker build \ + --build-arg COMET_JAR=spark/target/comet-spark-spark3.5_2.12-*-SNAPSHOT.jar \ + -t /comet-bench \ + -f benchmarks/tpc/infra/docker/Dockerfile . +docker push /comet-bench +``` + +The JAR is installed at `/jars/comet.jar` inside the image. The `comet.toml` engine config +defaults `COMET_JAR` to this path, so no extra environment variable is needed. + +**3. Set environment variables:** + +```shell +export SPARK_HOME=/opt/spark-3.5.3-bin-hadoop3/ +export SPARK_MASTER=k8s://https://:6443 +export BENCH_IMAGE=/comet-bench +export TPCH_DATA=s3a://my-bucket/tpch/sf100/ +``` + +## Running benchmarks + +```shell +python3 run.py --engine comet --benchmark tpch \ + --output s3a://my-bucket/bench-results --no-restart +``` + +The runner auto-detects K8s mode from the `SPARK_MASTER` prefix (`k8s://`) and: +- Uses `--deploy-mode cluster` (driver runs as a K8s pod) +- Sets `spark.kubernetes.container.image` from `BENCH_IMAGE` +- References `tpcbench.py` at its container path (`/opt/benchmarks/tpcbench.py`) +- Skips local Spark master/worker restart + +Preview the generated command: + +```shell +python3 run.py --engine comet --benchmark tpch \ + --output s3a://my-bucket/bench-results --dry-run +``` + +## Retrieving results + +Results are written as JSON to the S3 output path: + +```shell +aws s3 ls s3://my-bucket/bench-results/ +aws s3 cp s3://my-bucket/bench-results/comet-tpch-*.json . +``` + +## Optional: PVC for data + +If you prefer to serve TPC data from a PersistentVolumeClaim instead of S3, use the +provided pod template: + +```shell +# Create a PVC named "tpc-data" with your dataset, then: +python3 run.py --engine comet --benchmark tpch \ + --output s3a://my-bucket/bench-results --no-restart +``` + +Add these Spark configs to your engine TOML or pass them via `--conf`: + +``` +spark.kubernetes.driver.podTemplateFile=benchmarks/tpc/infra/k8s/pod-template.yaml +spark.kubernetes.executor.podTemplateFile=benchmarks/tpc/infra/k8s/pod-template.yaml +``` + diff --git a/benchmarks/tpc/engines/comet-hashjoin.toml b/benchmarks/tpc/engines/comet-hashjoin.toml index 1aa4957241..2574cc83c8 100644 --- a/benchmarks/tpc/engines/comet-hashjoin.toml +++ b/benchmarks/tpc/engines/comet-hashjoin.toml @@ -21,6 +21,9 @@ name = "comet-hashjoin" [env] required = ["COMET_JAR"] +[env.defaults] +COMET_JAR = "/jars/comet.jar" + [spark_submit] jars = ["$COMET_JAR"] driver_class_path = ["$COMET_JAR"] diff --git a/benchmarks/tpc/engines/comet-iceberg-hashjoin.toml b/benchmarks/tpc/engines/comet-iceberg-hashjoin.toml index 84a5333728..b1bb3bc50e 100644 --- a/benchmarks/tpc/engines/comet-iceberg-hashjoin.toml +++ b/benchmarks/tpc/engines/comet-iceberg-hashjoin.toml @@ -22,6 +22,7 @@ name = "comet-iceberg-hashjoin" required = ["COMET_JAR", "ICEBERG_JAR", "ICEBERG_WAREHOUSE"] [env.defaults] +COMET_JAR = "/jars/comet.jar" ICEBERG_CATALOG = "local" [spark_submit] diff --git a/benchmarks/tpc/engines/comet-iceberg.toml b/benchmarks/tpc/engines/comet-iceberg.toml index 3654f359e3..d58acc60bc 100644 --- a/benchmarks/tpc/engines/comet-iceberg.toml +++ b/benchmarks/tpc/engines/comet-iceberg.toml @@ -22,6 +22,7 @@ name = "comet-iceberg" required = ["COMET_JAR", "ICEBERG_JAR", "ICEBERG_WAREHOUSE"] [env.defaults] +COMET_JAR = "/jars/comet.jar" ICEBERG_CATALOG = "local" [spark_submit] diff --git a/benchmarks/tpc/engines/comet.toml b/benchmarks/tpc/engines/comet.toml index 05b2cb22ba..17bd42ec79 100644 --- a/benchmarks/tpc/engines/comet.toml +++ b/benchmarks/tpc/engines/comet.toml @@ -21,6 +21,9 @@ name = "comet" [env] required = ["COMET_JAR"] +[env.defaults] +COMET_JAR = "/jars/comet.jar" + [spark_submit] jars = ["$COMET_JAR"] driver_class_path = ["$COMET_JAR"] diff --git a/benchmarks/tpc/infra/docker/Dockerfile b/benchmarks/tpc/infra/docker/Dockerfile index 9bf5ae3935..7d19dae986 100644 --- a/benchmarks/tpc/infra/docker/Dockerfile +++ b/benchmarks/tpc/infra/docker/Dockerfile @@ -61,8 +61,14 @@ COPY benchmarks/tpc/queries /opt/benchmarks/queries COPY benchmarks/tpc/create-iceberg-tables.py /opt/benchmarks/create-iceberg-tables.py COPY benchmarks/tpc/generate-comparison.py /opt/benchmarks/generate-comparison.py -# Engine JARs are bind-mounted or copied in at runtime via --jars. -# Data and query paths are also bind-mounted. +# Optionally bake engine JARs into the image for Kubernetes mode. +# Usage: docker build --build-arg COMET_JAR=path/to/comet.jar ... +RUN mkdir -p /jars +ARG COMET_JAR="" +COPY benchmarks/tpc/infra/docker/Dockerfile ${COMET_JAR} /tmp/_stage/ +RUN if ls /tmp/_stage/*.jar 1>/dev/null 2>&1; then \ + mv /tmp/_stage/*.jar /jars/comet.jar; \ + fi && rm -rf /tmp/_stage WORKDIR /opt/benchmarks diff --git a/benchmarks/tpc/infra/k8s/pod-template.yaml b/benchmarks/tpc/infra/k8s/pod-template.yaml new file mode 100644 index 0000000000..3478f3788b --- /dev/null +++ b/benchmarks/tpc/infra/k8s/pod-template.yaml @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Optional pod template for mounting TPC data from a PersistentVolumeClaim. +# +# Usage: set these Spark configs to use this template: +# spark.kubernetes.driver.podTemplateFile=infra/k8s/pod-template.yaml +# spark.kubernetes.executor.podTemplateFile=infra/k8s/pod-template.yaml +# +# Before using, create a PVC named "tpc-data" containing the TPC dataset. + +apiVersion: v1 +kind: Pod +spec: + containers: + - name: spark + volumeMounts: + - name: tpc-data + mountPath: /data + readOnly: true + volumes: + - name: tpc-data + persistentVolumeClaim: + claimName: tpc-data diff --git a/benchmarks/tpc/infra/k8s/rbac.yaml b/benchmarks/tpc/infra/k8s/rbac.yaml new file mode 100644 index 0000000000..1b731d0112 --- /dev/null +++ b/benchmarks/tpc/infra/k8s/rbac.yaml @@ -0,0 +1,48 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# ServiceAccount and RBAC for Spark on Kubernetes. +# Spark needs permissions to create and manage executor pods. +# +# Apply to your namespace: +# kubectl apply -f rbac.yaml -n + +apiVersion: v1 +kind: ServiceAccount +metadata: + name: spark +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: spark-role +rules: + - apiGroups: [""] + resources: ["pods", "services", "configmaps"] + verbs: ["create", "get", "list", "watch", "delete"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: spark-role-binding +subjects: + - kind: ServiceAccount + name: spark +roleRef: + kind: Role + name: spark-role + apiGroup: rbac.authorization.k8s.io diff --git a/benchmarks/tpc/run.py b/benchmarks/tpc/run.py index 5a89166cda..9af7dd5766 100755 --- a/benchmarks/tpc/run.py +++ b/benchmarks/tpc/run.py @@ -199,12 +199,23 @@ def check_required_env(config): sys.exit(1) +def is_k8s_master(): + """Check if SPARK_MASTER points to a Kubernetes cluster.""" + return os.environ.get("SPARK_MASTER", "").startswith("k8s://") + + def check_common_env(): """Validate SPARK_HOME and SPARK_MASTER are set.""" for var in ("SPARK_HOME", "SPARK_MASTER"): if not os.environ.get(var): print(f"Error: {var} is not set", file=sys.stderr) sys.exit(1) + if is_k8s_master() and not os.environ.get("BENCH_IMAGE"): + print( + "Error: BENCH_IMAGE must be set for Kubernetes mode", + file=sys.stderr, + ) + sys.exit(1) def check_benchmark_env(config, benchmark): @@ -236,13 +247,26 @@ def build_spark_submit_cmd(config, benchmark, args): spark_master = os.environ["SPARK_MASTER"] profile = BENCHMARK_PROFILES[benchmark] + k8s_mode = is_k8s_master() + cmd = [os.path.join(spark_home, "bin", "spark-submit")] cmd += ["--master", spark_master] + if k8s_mode: + cmd += ["--deploy-mode", "cluster"] + # --jars jars = config.get("spark_submit", {}).get("jars", []) if jars: - cmd += ["--jars", ",".join(resolve_env_in_list(jars))] + resolved_jars = resolve_env_in_list(jars) + if k8s_mode: + # Prefix absolute local paths with local:// so Spark uses the + # JAR baked into the container image instead of uploading it. + resolved_jars = [ + f"local://{j}" if j.startswith("/") and "://" not in j else j + for j in resolved_jars + ] + cmd += ["--jars", ",".join(resolved_jars)] # --driver-class-path driver_cp = config.get("spark_submit", {}).get("driver_class_path", []) @@ -261,6 +285,12 @@ def build_spark_submit_cmd(config, benchmark, args): val = "true" if val else "false" conf[resolve_env(key)] = resolve_env(str(val)) + # K8s-specific Spark configuration + if k8s_mode: + bench_image = os.environ["BENCH_IMAGE"] + conf["spark.kubernetes.container.image"] = bench_image + conf["spark.kubernetes.authenticate.driver.serviceAccountName"] = "spark" + # JFR profiling: append to extraJavaOptions (preserving any existing values) if args.jfr: jfr_dir = args.jfr_dir @@ -314,8 +344,12 @@ def build_spark_submit_cmd(config, benchmark, args): for key, val in sorted(conf.items()): cmd += ["--conf", f"{key}={val}"] - # tpcbench.py path - cmd.append("tpcbench.py") + # tpcbench.py path — in K8s cluster mode, reference the path baked into + # the Docker image since there's no shared filesystem with the submitter + if k8s_mode: + cmd.append("/opt/benchmarks/tpcbench.py") + else: + cmd.append("tpcbench.py") # tpcbench args engine_name = config.get("engine", {}).get("name", args.engine) @@ -340,6 +374,14 @@ def build_spark_submit_cmd(config, benchmark, args): if profile["format"] and not use_iceberg: cmd += ["--format", profile["format"]] + # Pass profiling dirs and upload path through to tpcbench.py + if args.jfr: + cmd += ["--jfr-dir", args.jfr_dir] + if args.async_profiler: + cmd += ["--async-profiler-dir", args.async_profiler_dir] + if args.profile_upload: + cmd += ["--profile-upload", args.profile_upload] + return cmd @@ -438,6 +480,10 @@ def main(): choices=["flamegraph", "jfr", "collapsed", "text"], help="async-profiler output format (default: flamegraph)", ) + parser.add_argument( + "--profile-upload", + help="Remote path (e.g. s3a://bucket/profiles) to upload profiling artifacts", + ) args = parser.parse_args() config = load_engine_config(args.engine) @@ -450,12 +496,13 @@ def main(): check_required_env(config) check_benchmark_env(config, args.benchmark) - # Restart Spark unless --no-restart or --dry-run - if not args.no_restart and not args.dry_run: + # Restart Spark unless --no-restart, --dry-run, or K8s mode + if not args.no_restart and not args.dry_run and not is_k8s_master(): restart_spark() - # Create profiling output directories (skip for dry-run) - if not args.dry_run: + # Create profiling output directories (skip for dry-run and K8s mode + # where dirs are inside the pod, not on the submitter) + if not args.dry_run and not is_k8s_master(): if args.jfr: os.makedirs(args.jfr_dir, exist_ok=True) if args.async_profiler: diff --git a/benchmarks/tpc/tpcbench.py b/benchmarks/tpc/tpcbench.py index 036d7b0e9a..2385cb1a33 100644 --- a/benchmarks/tpc/tpcbench.py +++ b/benchmarks/tpc/tpcbench.py @@ -56,6 +56,39 @@ def result_hash(rows): return h.hexdigest() +def is_remote_path(path): + """Check if a path is a remote filesystem path (S3, GCS, etc.).""" + return path and any( + path.startswith(prefix) for prefix in ("s3a://", "s3://", "gs://") + ) + + +def upload_local_dir_to_remote(spark, local_dir, remote_dir): + """Upload all files in a local directory to a remote path using Hadoop FS.""" + hadoop_conf = spark._jsc.hadoopConfiguration() + local_path = spark._jvm.org.apache.hadoop.fs.Path(local_dir) + local_fs = local_path.getFileSystem(hadoop_conf) + + if not local_fs.exists(local_path): + print(f"Skipping upload: {local_dir} does not exist") + return + + remote_path = spark._jvm.org.apache.hadoop.fs.Path(remote_dir) + remote_fs = remote_path.getFileSystem(hadoop_conf) + + statuses = local_fs.listStatus(local_path) + if not statuses: + print(f"Skipping upload: {local_dir} is empty") + return + + for status in statuses: + src = status.getPath() + dst = spark._jvm.org.apache.hadoop.fs.Path(remote_dir, src.getName()) + print(f"Uploading {src} -> {dst}") + remote_fs.copyFromLocalFile(False, True, src, dst) + print(f"Uploaded {len(statuses)} file(s) to {remote_dir}") + + def main( benchmark: str, data_path: str, @@ -68,6 +101,9 @@ def main( query_num: int = None, write_path: str = None, options: Dict[str, str] = None, + jfr_dir: str = None, + async_profiler_dir: str = None, + profile_upload: str = None, ): if options is None: options = {} @@ -110,7 +146,7 @@ def main( else: # Support both "customer/" and "customer.parquet/" layouts source = f"{data_path}/{table}.{format}" - if not os.path.exists(source): + if not is_remote_path(source) and not os.path.exists(source): source = f"{data_path}/{table}" print(f"Registering table {table} from {source}") df = spark.read.format(format).options(**options).load(source) @@ -194,8 +230,28 @@ def main( current_time_millis = int(datetime.now().timestamp() * 1000) results_path = f"{output}/{name}-{benchmark}-{current_time_millis}.json" print(f"\nWriting results to {results_path}") - with open(results_path, "w") as f: - f.write(result_str) + if is_remote_path(output): + # Use Hadoop FileSystem API for remote paths (S3, GCS, etc.) + hadoop_conf = spark._jsc.hadoopConfiguration() + path = spark._jvm.org.apache.hadoop.fs.Path(results_path) + fs = path.getFileSystem(hadoop_conf) + out = fs.create(path) + out.write(bytearray(result_str, "utf-8")) + out.close() + else: + with open(results_path, "w") as f: + f.write(result_str) + + # Upload profiling artifacts to remote storage before the pod terminates + if profile_upload: + if jfr_dir: + upload_local_dir_to_remote( + spark, jfr_dir, f"{profile_upload}/jfr/" + ) + if async_profiler_dir: + upload_local_dir_to_remote( + spark, async_profiler_dir, f"{profile_upload}/async-profiler/" + ) spark.stop() @@ -256,6 +312,18 @@ def main( "--write", help="Path to save query results as Parquet" ) + parser.add_argument( + "--jfr-dir", + help="Local directory containing JFR profiling output" + ) + parser.add_argument( + "--async-profiler-dir", + help="Local directory containing async-profiler output" + ) + parser.add_argument( + "--profile-upload", + help="Remote path (e.g. s3a://bucket/path) to upload profiling artifacts" + ) args = parser.parse_args() main( @@ -270,4 +338,7 @@ def main( args.query, args.write, args.options, + args.jfr_dir, + args.async_profiler_dir, + args.profile_upload, )