
Using Dagster with PySpark

This guide includes two examples:

  • Running PySpark code in solids
  • Submitting PySpark solids on EMR

Running PySpark code in solids

You can find the code for this example on GitHub.

Passing PySpark DataFrames between solids requires a little extra care, for a couple of reasons:

  • Spark has a lazy execution model, which means that PySpark won't process any data until an action like write or collect is called on a DataFrame.
  • PySpark DataFrames cannot be pickled, which means that IO Managers like the fs_io_manager won't work for them.
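
To make these two points concrete, here is a small standalone sketch (not part of the example): transformations like filter stay lazy until an action such as collect or write runs, and attempting to pickle a DataFrame raises an error.

import pickle

from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([Row(name="Thom", age=51), Row(name="Jonny", age=48)])

# Nothing is computed yet: createDataFrame and filter are lazy transformations.
older = df.filter(df["age"] > 50)

# collect() is an action, so Spark actually runs the job here.
print(older.collect())

# DataFrames hold references to JVM-backed objects, so pickling fails. This is why
# pickle-based IO managers such as fs_io_manager can't store them.
try:
    pickle.dumps(df)
except Exception as exc:
    print(f"Could not pickle DataFrame: {exc}")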

In this example, we've defined an IO Manager that knows how to store and retrieve the PySpark DataFrames produced and consumed by solids.

This example assumes that all the outputs within the pipeline will be PySpark DataFrames and will be stored in the same way. To learn how to use different IO managers for different outputs within the same pipeline, take a look at the IO Manager concept page; a rough sketch of that pattern follows.
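
As a sketch of that pattern (not part of this example; make_dataframe, count_rows, and mixed_pipeline are hypothetical, and local_parquet_store refers to the IO manager defined in the code below), a specific output can be routed to a specific IO manager via OutputDefinition's io_manager_key:

from dagster import ModeDefinition, OutputDefinition, fs_io_manager, pipeline, solid
from pyspark.sql import Row, SparkSession


# This output is handled by whichever resource is bound to "parquet_io_manager".
@solid(output_defs=[OutputDefinition(io_manager_key="parquet_io_manager")])
def make_dataframe(_):
    spark = SparkSession.builder.getOrCreate()
    return spark.createDataFrame([Row(name="Thom", age=51)])


# This output is an int, so the default "io_manager" (here fs_io_manager) handles it.
@solid
def count_rows(_, df):
    return df.count()


@pipeline(
    mode_defs=[
        ModeDefinition(
            resource_defs={
                "io_manager": fs_io_manager,
                "parquet_io_manager": local_parquet_store,  # defined in the example below
            }
        )
    ]
)
def mixed_pipeline():
    count_rows(make_dataframe())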

The example below writes DataFrames out to the local file system, but it can be tweaked to write to cloud object stores like S3 by changing the write and read invocations; an S3-backed sketch follows the pipeline code.

import os

from dagster import IOManager, ModeDefinition, io_manager, pipeline, solid
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import IntegerType, StringType, StructField, StructType


class LocalParquetStore(IOManager):
    def _get_path(self, context):
        # Each output gets a path unique to its run, step, and output name
        return os.path.join(context.run_id, context.step_key, context.name)

    def handle_output(self, context, obj):
        # Writing to Parquet is the action that triggers Spark to compute the DataFrame
        obj.write.parquet(self._get_path(context))

    def load_input(self, context):
        # Read back the Parquet files written for the upstream output
        spark = SparkSession.builder.getOrCreate()
        return spark.read.parquet(self._get_path(context.upstream_output))


@io_manager
def local_parquet_store(_):
    return LocalParquetStore()


@solid
def make_people(_):
    schema = StructType([StructField("name", StringType()), StructField("age", IntegerType())])
    rows = [Row(name="Thom", age=51), Row(name="Jonny", age=48), Row(name="Nigel", age=49)]
    spark = SparkSession.builder.getOrCreate()
    return spark.createDataFrame(rows, schema)


@solid
def filter_over_50(_, people):
    return people.filter(people["age"] > 50)


@pipeline(mode_defs=[ModeDefinition(resource_defs={"io_manager": local_parquet_store})])
def my_pipeline():
    filter_over_50(make_people())
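
As an illustration of the S3 tweak mentioned above, here is a hedged sketch of an S3-backed variant (reusing the imports from the example; "my-bucket" is a placeholder, and it assumes Spark is configured with the Hadoop s3a connector and AWS credentials):

class S3ParquetStore(IOManager):
    def _get_path(self, context):
        # Same run/step/output layout as above, but rooted in an S3 bucket
        return "/".join(["s3a://my-bucket", context.run_id, context.step_key, context.name])

    def handle_output(self, context, obj):
        obj.write.parquet(self._get_path(context))

    def load_input(self, context):
        spark = SparkSession.builder.getOrCreate()
        return spark.read.parquet(self._get_path(context.upstream_output))


@io_manager
def s3_parquet_store(_):
    return S3ParquetStore()

To use it, bind s3_parquet_store instead of local_parquet_store in the ModeDefinition's resource_defs.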

Submitting PySpark solids on EMR

You can find the code for this example on GitHub.

This example demonstrates how to run a solid as a Spark step on an EMR cluster. Each of the three solids below is executed as a separate EMR step on the same cluster.

from pathlib import Path

from dagster import (
    ModeDefinition,
    make_python_type_usable_as_dagster_type,
    pipeline,
    solid,
)
from dagster.core.definitions.no_step_launcher import no_step_launcher
from dagster_aws.emr import emr_pyspark_step_launcher
from dagster_aws.s3 import s3_pickle_io_manager, s3_resource
from dagster_pyspark import DataFrame as DagsterPySparkDataFrame
from dagster_pyspark import pyspark_resource
from pyspark.sql import DataFrame, Row
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Make pyspark.sql.DataFrame map to dagster_pyspark.DataFrame
make_python_type_usable_as_dagster_type(python_type=DataFrame, dagster_type=DagsterPySparkDataFrame)


@solid(required_resource_keys={"pyspark", "pyspark_step_launcher"})
def make_people(context) -> DataFrame:
    schema = StructType([StructField("name", StringType()), StructField("age", IntegerType())])
    rows = [Row(name="Thom", age=51), Row(name="Jonny", age=48), Row(name="Nigel", age=49)]
    return context.resources.pyspark.spark_session.createDataFrame(rows, schema)


@solid(required_resource_keys={"pyspark_step_launcher"})
def filter_over_50(_, people: DataFrame) -> DataFrame:
    return people.filter(people["age"] > 50)


@solid(required_resource_keys={"pyspark_step_launcher"})
def count_people(_, people: DataFrame) -> int:
    return people.count()


# In "emr" mode, each solid that requires the step launcher runs as a step on an
# existing EMR cluster, and outputs are stored in S3.
emr_mode = ModeDefinition(
    name="emr",
    resource_defs={
        "pyspark_step_launcher": emr_pyspark_step_launcher.configured(
            {
                "cluster_id": {"env": "EMR_CLUSTER_ID"},
                "local_pipeline_package_path": str(Path(__file__).parent),
                "deploy_local_pipeline_package": True,
                "region_name": "us-west-1",
                "staging_bucket": "my_staging_bucket",
                "wait_for_logs": True,
            }
        ),
        "pyspark": pyspark_resource.configured({"spark_conf": {"spark.executor.memory": "2g"}}),
        "s3": s3_resource,
        "io_manager": s3_pickle_io_manager.configured(
            {"s3_bucket": "my_staging_bucket", "s3_prefix": "simple-pyspark"}
        ),
    },
)

# In "local" mode, solids run in the normal Dagster process with a local SparkSession.
local_mode = ModeDefinition(
    name="local",
    resource_defs={
        "pyspark_step_launcher": no_step_launcher,
        "pyspark": pyspark_resource.configured({"spark_conf": {"spark.default.parallelism": 1}}),
    },
)


@pipeline(mode_defs=[emr_mode, local_mode])
def my_pipeline():
    count_people(filter_over_50(make_people()))

It accomplishes this by using the emr_pyspark_step_launcher, which knows how to launch an EMR step that runs the contents of a solid. The example defines a mode that maps the resource key "pyspark_step_launcher" to the emr_pyspark_step_launcher resource definition, and then requires the "pyspark_step_launcher" resource key on each solid it wants to launch remotely.
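
As a usage sketch (not part of the example), assuming the code above is importable and, for the EMR case, that AWS credentials and the EMR_CLUSTER_ID environment variable are set, a mode can be selected at execution time:

from dagster import execute_pipeline

# Run everything in the local process with a local SparkSession.
execute_pipeline(my_pipeline, mode="local")

# Run each solid as an EMR step on the cluster identified by EMR_CLUSTER_ID.
# Depending on your Dagster version, the EMR step launcher may require a
# reconstructable pipeline, e.g. reconstructable(my_pipeline), so that it can
# ship the code to the cluster.
execute_pipeline(my_pipeline, mode="emr")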

The EMR PySpark step launcher relies on S3 to shuttle config and events to and from EMR.

More generally, a step launcher is any resource that extends the StepLauncher abstract class. Where a solid would otherwise execute in-process, a step launcher's methods are invoked instead to launch a remote process with the solid running inside it. To use a step launcher for a particular solid, set a required resource key on the solid that points to that resource.