
Spark over YT in Jupyter

This notebook demonstrates how to work with Spark over YTsaurus in a Jupyter notebook. It covers the following steps:

  1. Setting up a new standalone Spyt cluster.
  2. Connecting to the Spyt cluster from PySpark and running queries.

Prepare Spyt cluster

If you already have a running Spyt cluster, specify its directory in the variable user_spark_discovery_path below.

user_spark_discovery_path = None
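# If you already have a running Spyt cluster, point this variable at its discovery directory,
# e.g. user_spark_discovery_path = "//home/spark/my-cluster"  (a hypothetical path).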
import uuid
import subprocess
import sys
import random
import os

import yt.wrapper as yt
working_dir = f"//tmp/examples/spark-over-yt-in-jupyter_{uuid.uuid4()}"
yt.create("map_node", working_dir, recursive=True)
print(working_dir)

If user_spark_discovery_path is not specified, we will create a new small Spyt cluster and shut it down at the end of the notebook.

spark_discovery_path = user_spark_discovery_path

if spark_discovery_path is None:
    spark_discovery_path = working_dir

port = random.randint(27100, 27200)

proxy = os.environ["YT_PROXY"]
spark_launch_command = [
    "spark-launch-yt",
    "--proxy", proxy,
    "--operation-alias", f"spark-over-yt-in-jupyter-example-{uuid.uuid4()}",
    "--worker-cores", "2",      # a small cluster: one worker with 2 cores
    "--worker-num", "1",
    "--worker-memory", "64G",
    "--discovery-path", spark_discovery_path,
    "--params", f'{{spark_conf={{spark.shuffle.service.port={port}}}}}',
    "--abort-existing"          # abort a previously started cluster at this discovery path, if any
]
print(" ".join(spark_launch_command))

process = subprocess.Popen(spark_launch_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=os.environ)
stdout, stderr = process.communicate()
print("Exit code: ", process.poll())
print("Stdout: ", stdout.decode())
print("Stderr: ", stderr.decode())

Run query

Let's run a query with the pyspark library using the spyt helper.

spyt.spark_session is a helper function for connecting to the Spyt cluster. The session object behaves the same way as a regular pyspark session.

Let's find all the running squirrels in a dataset about squirrels from New York's Central Park.

from spyt import spark_session

with spark_session(discovery_path=spark_discovery_path) as spark:
    df = spark.read.yt("//home/samples/squirrels")
    filtered = df.filter(df["running"] == True)
    filtered.show()
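Since the session returned by spark_session is a regular pyspark SparkSession, the standard DataFrame API is available alongside spark.read.yt. A small sketch that does not touch any YTsaurus tables:

with spark_session(discovery_path=spark_discovery_path) as spark:
    numbers = spark.range(10)                      # built-in range source, single column "id"
    numbers.filter(numbers["id"] % 2 == 0).show()  # plain pyspark transformations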

Run query with UDF

You can also run Spark queries that use user-defined functions (UDFs).

from pyspark.sql.functions import udf

@udf
def to_upper(s):
    if s is not None:
        return s.upper()

with spark_session(discovery_path=spark_discovery_path) as spark:
    display(spark.read.yt("//home/samples/squirrels").select(to_upper("age")).limit(20).toPandas())
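pyspark also lets you declare the UDF's return type explicitly when the result is not a string. A sketch with a hypothetical integer-valued UDF applied to the same column:

from pyspark.sql.types import IntegerType

@udf(returnType=IntegerType())
def value_length(s):
    # Length of the string value, or None for missing values.
    return len(s) if s is not None else None

with spark_session(discovery_path=spark_discovery_path) as spark:
    display(spark.read.yt("//home/samples/squirrels").select(value_length("age")).limit(20).toPandas())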

Stop the temporary Spyt cluster

if not user_spark_discovery_path:
    operations = yt.list(f"{spark_discovery_path}/discovery/operation")
    assert len(operations) == 1
    yt.abort_operation(operation=operations[0])
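If the temporary working directory created at the top of the notebook is no longer needed, it can be removed as well. This cleanup is optional:

# Optional: remove the temporary working directory created at the beginning of the notebook.
yt.remove(working_dir, recursive=True, force=True)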