Spark over YT in Jupyter
This notebook demonstrates how to work with Spark over YTsaurus in a Jupyter notebook. We explore the following steps:
- Setting up a new standalone Spyt cluster.
- Connecting to the Spyt cluster from PySpark and running queries.
Prepare Spyt cluster
If you already have a running Spyt cluster, specify its directory in the variable user_spark_discovery_path below.
user_spark_discovery_path = None
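For example, if you already have a cluster, point the notebook at its discovery directory like this (the path below is hypothetical):
# Hypothetical example: reuse an existing Spyt cluster instead of launching a new one
# user_spark_discovery_path = "//home/my_user/spark/my_cluster"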
import uuid
import subprocess
import sys
import random
import os
import yt.wrapper as yt
working_dir = f"//tmp/examples/spark-over-yt-in-jupyter_{uuid.uuid4()}"
yt.create("map_node", working_dir, recursive=True)
print(working_dir)
If user_spark_discovery_path is not specified, we will create a new small Spyt cluster and shut it down at the end of the notebook.
spark_discovery_path = user_spark_discovery_path
if spark_discovery_path is None:
    spark_discovery_path = working_dir
    port = random.randint(27100, 27200)
    proxy = os.environ["YT_PROXY"]
    spark_launch_command = [
        "spark-launch-yt",
        "--proxy", proxy,
        "--operation-alias", f"spark-over-yt-in-jupyter-example-{uuid.uuid4()}",
        "--worker-cores", "2",
        "--worker-num", "1",
        "--worker-memory", "64G",
        "--discovery-path", spark_discovery_path,
        "--params", f'{{spark_conf={{spark.shuffle.service.port={port}}}}}',
        "--abort-existing"
    ]
    print(" ".join(spark_launch_command))
    process = subprocess.Popen(spark_launch_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=os.environ)
    stdout, stderr = process.communicate()
    print("Exit code: ", process.poll())
    print("Stdout: ", stdout.decode())
    print("Stderr: ", stderr.decode())
Run a query
Let's run a query with the pyspark library, using the spyt helper.
spyt.spark_session is a helper function to connect to the Spyt cluster. The session object behaves the same way as the original one from pyspark.
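Because the returned session is an ordinary PySpark SparkSession, any standard PySpark API can be used inside the with block. A minimal sketch that does not touch YT tables at all:
# Minimal sketch: plain PySpark operations work through the Spyt-backed session
from spyt import spark_session

with spark_session(discovery_path=spark_discovery_path) as spark:
    spark.range(10).selectExpr("id * 2 AS doubled").show()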
Let's find all the running squirrels in a dataset about squirrels from New York's Central Park.
from spyt import spark_session
with spark_session(discovery_path=spark_discovery_path) as spark:
    df = spark.read.yt("//home/samples/squirrels")
    filtered = df.filter(df["running"] == True)
    filtered.show()
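If you want to keep the filtered result, it can be written back to the working directory. This is a sketch that assumes the write-side counterpart of spark.read.yt (df.write.yt) provided by SPYT; the target path under working_dir is arbitrary:
# Sketch: persist the filtered rows back to YTsaurus (assumes SPYT's df.write.yt writer)
with spark_session(discovery_path=spark_discovery_path) as spark:
    df = spark.read.yt("//home/samples/squirrels")
    running = df.filter(df["running"] == True)
    running.write.mode("overwrite").yt(f"{working_dir}/running_squirrels")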
Run a query with a UDF
You can also run Spark queries that use a user-defined function (UDF).
from pyspark.sql.functions import udf
@udf
def to_upper(s):
    if s is not None:
        return s.upper()

with spark_session(discovery_path=spark_discovery_path) as spark:
    display(spark.read.yt("//home/samples/squirrels").select(to_upper("age")).limit(20).toPandas())
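By default @udf returns strings; the return type can also be declared explicitly with standard PySpark types. A sketch of the same UDF with an explicit return type (the function name is illustrative):
# Sketch: the same UDF with an explicitly declared return type (plain PySpark API)
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def to_upper_typed(s):
    return s.upper() if s is not None else None

with spark_session(discovery_path=spark_discovery_path) as spark:
    spark.read.yt("//home/samples/squirrels").select(to_upper_typed("age")).show(20)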
Stop the temporary Spyt cluster
if not user_spark_discovery_path:
    operations = yt.list(f"{spark_discovery_path}/discovery/operation")
    assert len(operations) == 1
    yt.abort_operation(operation=operations[0])
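Finally, the temporary working directory created at the beginning of the notebook can be removed as well:
# Clean up the temporary working directory created at the start of the notebook
yt.remove(working_dir, recursive=True, force=True)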