Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
Genomics Kit integrates the utilities for bioinformatics analysis

1. [igv-report](./igv-report): Generate the IGV report html format
2. [spark-on-slurm](./spark-on-slurm/): Spark on SLURM cluster configuration
6 changes: 0 additions & 6 deletions spark-on-slurm/.bash_history

This file was deleted.

6 changes: 4 additions & 2 deletions spark-on-slurm/.gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
sparkcluster*
sparkcluster-.*log
.spark*
.cache
spark_master.out
job
job
*.log
.bash_history
7 changes: 5 additions & 2 deletions spark-on-slurm/Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
SLURM_IMAGE:= nttg8100/river-slurm:1.1.0
.PHONY: setup example clean
.PHONY: setup example clean install

${HOME}/.pixi/bin/pixi:
curl -sSL https://pixi.sh/install.sh | sh
Expand Down Expand Up @@ -32,11 +32,14 @@ get-slurm:
exit 1; \
fi

install:
${HOME}/.pixi/bin/pixi run install-sparkhpc

# Inside the container, `river` is the standard user (mapped to id 1001); GitHub Actions does not have this user
test: ${HOME}/.pixi/bin/pixi start-slurm get-slurm
docker exec \
-w /tmp/spark \
slurm-dev /root/.pixi/bin/pixi run sparkhpc-example
slurm-dev /root/.pixi/bin/pixi run sparkhpc-hail

test-slurm-local:
${HOME}/.pixi/bin/pixi run sparkhpc-example
Expand Down
5 changes: 3 additions & 2 deletions spark-on-slurm/Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ When you run the example workflow:
User entrypoints (`setup`, `example`, `clean`).
- `pixi.toml`
Environment definition and task `sparkhpc-example`.
- `sparkhpc/run_example.py`
- `examples/*.py`
End-to-end test runner: submit cluster, wait for master, run Spark actions, cleanup.
- `sparkhpc/sparkhpc/sparkjob.py`
Core Spark-on-Slurm orchestration logic.
Expand Down Expand Up @@ -110,6 +110,7 @@ squeue -u $USER
This README describes **Spark-on-Slurm only**. HDFS/Hadoop integration is intentionally deferred for a separate step.

## References

This toolkit is adapted from the original [sparkhpc project](https://github.com/rokroskar/sparkhpc), which provides a framework for running Spark on HPC environments. However, the original repository is no longer actively maintained and requires updates to function with modern Spark and SLURM configurations.

Therefore, this implementation integrates and modifies the original toolkit with several adjustments to ensure compatibility and reliable operation in current environments.
Therefore, this implementation integrates and modifies the original toolkit with several adjustments to ensure compatibility and reliable operation in current environments.
Binary file not shown.
113 changes: 113 additions & 0 deletions spark-on-slurm/examples/pi_counter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#!/usr/bin/env python
"""
Calculate Pi using Monte Carlo method in PySpark
Connects to existing Spark cluster started via sparkcluster CLI

Usage:
1. Start cluster: sparkcluster start 4 --partition main --spark-home $SPARK_HOME
2. Run this script: python examples/pi_counter.py
3. Stop cluster: kill the sparkcluster process or wait for walltime
"""
import sys
import os

# Add sparkhpc to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'sparkhpc'))

from sparkhpc import sparkjob
from pyspark import SparkConf, SparkContext
from random import random
from math import pi as math_pi
import time

def inside(p):
    """Report whether a freshly drawn random point lies inside the unit circle.

    The argument ``p`` is ignored; it only lets the function be used as a
    one-argument predicate. Two samples are drawn with ``random()`` on every
    call, first the x coordinate and then the y coordinate.
    """
    x_coord = random()
    y_coord = random()
    squared_distance = x_coord * x_coord + y_coord * y_coord
    return squared_distance <= 1

def main():
    """Estimate Pi with a Monte Carlo job on an already-running Spark cluster.

    Steps:
      1. Look up cluster 0 through ``sparkjob``; exit with status 1 if the
         lookup raises.
      2. Poll for the master URL for up to 60 seconds; exit with status 1 if
         none is obtained before the deadline.
      3. Create a ``SparkContext``, count how many sampled points fall inside
         the unit circle, and print the estimate, error and timing.

    The ``SparkContext`` is always stopped, whether or not the calculation
    succeeds. The success message is printed only when no exception occurred.
    """
    banner = "=" * 70
    print(f"\n{banner}")
    print("PySpark Pi Calculation - Monte Carlo Method")
    print(banner)

    # Get the running cluster (cluster 0)
    try:
        sj = sparkjob.sparkjob(clusterid=0)
    except Exception as e:
        print(" Error: Could not find running cluster!")
        print(" Make sure to start cluster first:")
        print(" $ sparkcluster start 4 --partition main --spark-home $SPARK_HOME")
        print(f" Error details: {e}")
        sys.exit(1)

    print("\nConnecting to Spark cluster...")
    print(" Cluster ID: 0")
    print(f" Job ID: {sj.jobid}")

    # Wait for master to be available
    master_timeout = 60  # seconds
    poll_interval = 0.5  # seconds
    print(f"\nWaiting for Spark Master (timeout: {master_timeout} seconds)...")
    # A monotonic clock keeps the deadline immune to system clock adjustments.
    deadline = time.monotonic() + master_timeout
    master_url = None

    while time.monotonic() < deadline:
        try:
            master_url = sj.master_url()
        except Exception:
            # The lookup failed; keep polling until the deadline. Catching
            # Exception (not a bare except) lets Ctrl-C and SystemExit
            # interrupt the wait.
            master_url = None
        if master_url:
            print(f" Master URL: {master_url}")
            break
        time.sleep(poll_interval)

    if not master_url:
        print(" Could not connect to Spark master!")
        print(" Verify cluster is running:")
        print(" $ squeue")
        print(" Check logs:")
        print(" $ tail spark_master.out")
        sys.exit(1)

    ui_url = sj.master_ui()
    print(f" Master UI: {ui_url}")

    # Create SparkContext and connect to the master
    print("\nCreating SparkContext...")
    conf = SparkConf().setMaster(master_url).setAppName("PiCalculator")
    sc = SparkContext(conf=conf)

    try:
        num_samples = 100000000  # 100 million samples
        num_partitions = 4

        print(f"\nCalculating Pi with {num_samples:,} samples across {num_partitions} partitions...")
        print("This will take 15-20 seconds...\n")

        # Each element of the range triggers one random draw; the element
        # value itself is ignored by ``inside``.
        start_time = time.time()
        count = (
            sc.parallelize(range(0, num_samples), num_partitions)
            .filter(inside)
            .count()
        )
        elapsed_time = time.time() - start_time

        # Fraction of points inside the quarter circle approximates pi / 4.
        pi_estimate = 4.0 * count / num_samples
        error = abs(pi_estimate - math_pi)
        accuracy = (1 - error / math_pi) * 100

        print(banner)
        print("RESULTS:")
        print(banner)
        print(f"Pi estimated value: {pi_estimate:.10f}")
        print(f"Actual Pi value: {math_pi:.10f}")
        print(f"Error: {error:.10f}")
        print(f"Accuracy: {accuracy:.6f}%")
        print(f"Execution time: {elapsed_time:.2f} seconds")
        print(f"Points inside: {count:,} / {num_samples:,}")
        print(f"{banner}\n")
    finally:
        # Always release the application's resources on the cluster.
        print("Stopping SparkContext...")
        sc.stop()

    # Reached only when the calculation above did not raise.
    print(" Job completed successfully!")


if __name__ == "__main__":
    main()
51 changes: 51 additions & 0 deletions spark-on-slurm/examples/simulate_gwas_by_hail.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from pyspark.sql import SparkSession
import hail as hl
from sparkhpc import sparkjob
import sys
import os

# Connect to existing Spark cluster
sj_list = sparkjob.sparkjob().current_clusters()
if not sj_list:
    print("ERROR: No running Spark cluster found!")
    print("Please start a cluster first with: sparkcluster start <ncores>")
    sys.exit(1)

# Use the first cluster reported.
cluster = sj_list[0]
master_url = cluster.master_url()
if not master_url:
    # Without a master URL there is nothing to connect to; stop here with a
    # clear message rather than handing an empty value to the session builder.
    print("ERROR: Spark master URL is not available for the running cluster!")
    print("Check that the master has started: tail spark_master.out")
    sys.exit(1)
print(f"Connecting to Spark cluster at {master_url}")

# Get Hail JAR path (looked up inside the installed hail package directory)
hail_dir = os.path.dirname(hl.__file__)
hail_jar = os.path.join(hail_dir, 'backend', 'hail-all-spark.jar')
if not os.path.isfile(hail_jar):
    # The JAR is referenced by three Spark settings below; fail early if it
    # is not where this script expects it.
    print(f"ERROR: Hail JAR not found at {hail_jar}")
    sys.exit(1)
print(f"Using Hail JAR: {hail_jar}")

# Connect to the existing cluster with Hail JAR and serialization configuration
spark = (
    SparkSession.builder
    .master(master_url)
    .appName("GWAS-Hail")
    .config("spark.jars", hail_jar)
    .config("spark.driver.extraClassPath", hail_jar)
    .config("spark.executor.extraClassPath", f"./hail-all-spark.jar:{hail_jar}")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryo.registrator", "is.hail.kryo.HailKryoRegistrator")
    .getOrCreate()
)

print(f"Spark version: {spark.version}")
print(f"Spark master: {spark.sparkContext.master}")
print(f"Spark serializer: {spark.sparkContext.getConf().get('spark.serializer')}")

print("\nInitializing Hail...")
# Reuse the SparkContext created above instead of letting Hail start its own.
hl.init(sc=spark.sparkContext)
print("✓ Hail initialized successfully!")

# Submit jobs to simulate the data
mt = hl.balding_nichols_model(
    n_populations=3,
    n_samples=500,
    n_variants=500_000,
    n_partitions=32,
)
# Random boolean phenotype per column, true with probability 0.33.
mt = mt.annotate_cols(drinks_coffee=hl.rand_bool(0.33))
# Regress the phenotype on alternate-allele count; 1.0 is the intercept term.
gwas = hl.linear_regression_rows(
    y=mt.drinks_coffee,
    x=mt.GT.n_alt_alleles(),
    covariates=[1.0],
)
# Show the 25 rows with the smallest p-values.
gwas.order_by(gwas.p_value).show(25)
Loading
Loading