DagsterDocs

Source code for dagstermill.solids

import copy
import os
import pickle
import sys
import tempfile
import uuid

import nbformat
import papermill
from dagster import (
    AssetMaterialization,
    EventMetadataEntry,
    InputDefinition,
    Output,
    OutputDefinition,
    SolidDefinition,
    check,
    seven,
)
from dagster.core.definitions.reconstructable import ReconstructablePipeline
from dagster.core.definitions.utils import validate_tags
from dagster.core.execution.context.compute import SolidExecutionContext
from dagster.core.execution.context.system import SystemComputeExecutionContext
from dagster.core.storage.file_manager import FileHandle
from dagster.serdes import pack_value
from dagster.utils import mkdir_p, safe_tempfile_path
from dagster.utils.error import serializable_error_info_from_exc_info
from papermill.engines import papermill_engines
from papermill.iorw import load_notebook_node, write_ipynb
from papermill.parameterize import _find_first_tagged_cell_index

from .engine import DagstermillNBConvertEngine
from .errors import DagstermillError
from .serialize import read_value, write_value
from .translator import RESERVED_INPUT_NAMES, DagsterTranslator


# This is based on papermill.parameterize.parameterize_notebook
# Typically, papermill injects the injected-parameters cell *below* the parameters cell
# but we want to *replace* the parameters cell, which is what this function does.
def replace_parameters(context, nb, parameters):
    """Assigned parameters into the appropiate place in the input notebook

    Args:
        nb (NotebookNode): Executable notebook object
        parameters (dict): Arbitrary keyword arguments to pass to the notebook parameters.
    """
    check.dict_param(parameters, "parameters")

    # Copy the nb object to avoid polluting the input
    nb = copy.deepcopy(nb)

    # papermill method chooses translator based on kernel_name and language, but we just call the
    # DagsterTranslator to generate parameter content based on the kernel_name
    param_content = DagsterTranslator.codify(parameters)

    newcell = nbformat.v4.new_code_cell(source=param_content)
    newcell.metadata["tags"] = ["injected-parameters"]

    param_cell_index = _find_first_tagged_cell_index(nb, "parameters")
    injected_cell_index = _find_first_tagged_cell_index(nb, "injected-parameters")
    if injected_cell_index >= 0:
        # Replace the injected cell with a new version
        before = nb.cells[:injected_cell_index]
        after = nb.cells[injected_cell_index + 1 :]
        check.int_value_param(param_cell_index, -1, "param_cell_index")
        # We should have blown away the parameters cell if there is an injected-parameters cell
    elif param_cell_index >= 0:
        # Replace the parameter cell with the injected-parameters cell
        before = nb.cells[:param_cell_index]
        after = nb.cells[param_cell_index + 1 :]
    else:
        # Inject to the top of the notebook, presumably first cell includes dagstermill import
        context.log.debug(
            (
                "Executing notebook with no tagged parameters cell: injecting boilerplate in first "
                "cell."
            )
        )
        before = []
        after = nb.cells

    nb.cells = before + [newcell] + after
    nb.metadata.papermill["parameters"] = seven.json.dumps(parameters)

    return nb


def get_papermill_parameters(compute_context, inputs, output_log_path):
    check.inst_param(compute_context, "compute_context", SystemComputeExecutionContext)
    check.param_invariant(
        isinstance(compute_context.run_config, dict),
        "compute_context",
        "SystemComputeExecutionContext must have valid run_config",
    )
    check.dict_param(inputs, "inputs", key_type=str)

    run_id = compute_context.run_id

    marshal_dir = "/tmp/dagstermill/{run_id}/marshal".format(run_id=run_id)
    mkdir_p(marshal_dir)

    if not isinstance(compute_context.pipeline, ReconstructablePipeline):
        raise DagstermillError(
            "Can't execute a dagstermill solid from a pipeline that is not reconstructable. "
            "Use the reconstructable() function if executing from python"
        )

    dm_executable_dict = compute_context.pipeline.to_dict()

    dm_context_dict = {
        "output_log_path": output_log_path,
        "marshal_dir": marshal_dir,
        "run_config": compute_context.run_config,
    }

    dm_solid_handle_kwargs = compute_context.solid_handle._asdict()

    parameters = {}

    input_def_dict = compute_context.solid_def.input_dict
    for input_name, input_value in inputs.items():
        assert (
            input_name not in RESERVED_INPUT_NAMES
        ), "Dagstermill solids cannot have inputs named {input_name}".format(input_name=input_name)
        dagster_type = input_def_dict[input_name].dagster_type
        parameter_value = write_value(
            dagster_type, input_value, os.path.join(marshal_dir, "input-{}".format(input_name))
        )
        parameters[input_name] = parameter_value

    parameters["__dm_context"] = dm_context_dict
    parameters["__dm_executable_dict"] = dm_executable_dict
    parameters["__dm_pipeline_run_dict"] = pack_value(compute_context.pipeline_run)
    parameters["__dm_solid_handle_kwargs"] = dm_solid_handle_kwargs
    parameters["__dm_instance_ref_dict"] = pack_value(compute_context.instance.get_ref())

    return parameters


def _dm_solid_compute(name, notebook_path, output_notebook=None, asset_key_prefix=None):
    check.str_param(name, "name")
    check.str_param(notebook_path, "notebook_path")
    check.opt_str_param(output_notebook, "output_notebook")
    check.opt_list_param(asset_key_prefix, "asset_key_prefix")

    def _t_fn(compute_context, inputs):
        check.inst_param(compute_context, "compute_context", SolidExecutionContext)
        check.param_invariant(
            isinstance(compute_context.run_config, dict),
            "context",
            "SystemComputeExecutionContext must have valid run_config",
        )

        system_compute_context = compute_context.get_system_context()

        with tempfile.TemporaryDirectory() as output_notebook_dir:
            with safe_tempfile_path() as output_log_path:

                parameterized_notebook_path = os.path.join(
                    output_notebook_dir, "{prefix}-inter.ipynb".format(prefix=str(uuid.uuid4()))
                )

                executed_notebook_path = os.path.join(
                    output_notebook_dir, "{prefix}-out.ipynb".format(prefix=str(uuid.uuid4()))
                )

                # Scaffold the registration here
                nb = load_notebook_node(notebook_path)
                nb_no_parameters = replace_parameters(
                    system_compute_context,
                    nb,
                    get_papermill_parameters(system_compute_context, inputs, output_log_path),
                )
                write_ipynb(nb_no_parameters, parameterized_notebook_path)

                try:
                    papermill_engines.register("dagstermill", DagstermillNBConvertEngine)
                    papermill.execute_notebook(
                        input_path=parameterized_notebook_path,
                        output_path=executed_notebook_path,
                        engine_name="dagstermill",
                        log_output=True,
                    )

                except Exception:  # pylint: disable=broad-except
                    try:
                        with open(executed_notebook_path, "rb") as fd:
                            executed_notebook_file_handle = (
                                compute_context.resources.file_manager.write(
                                    fd, mode="wb", ext="ipynb"
                                )
                            )
                            executed_notebook_materialization_path = (
                                executed_notebook_file_handle.path_desc
                            )
                    except Exception:  # pylint: disable=broad-except
                        compute_context.log.warning(
                            "Error when attempting to materialize executed notebook using file manager (falling back to local): {exc}".format(
                                exc=str(serializable_error_info_from_exc_info(sys.exc_info()))
                            )
                        )
                        executed_notebook_materialization_path = executed_notebook_path

                    yield AssetMaterialization(
                        asset_key=(asset_key_prefix + [f"{name}_output_notebook"]),
                        description="Location of output notebook in file manager",
                        metadata_entries=[
                            EventMetadataEntry.fspath(
                                executed_notebook_materialization_path,
                                label="executed_notebook_path",
                            )
                        ],
                    )
                    raise

            system_compute_context.log.debug(
                "Notebook execution complete for {name} at {executed_notebook_path}.".format(
                    name=name,
                    executed_notebook_path=executed_notebook_path,
                )
            )

            executed_notebook_file_handle = None
            try:
                # use binary mode when when moving the file since certain file_managers such as S3
                # may try to hash the contents
                with open(executed_notebook_path, "rb") as fd:
                    executed_notebook_file_handle = compute_context.resources.file_manager.write(
                        fd, mode="wb", ext="ipynb"
                    )
                    executed_notebook_materialization_path = executed_notebook_file_handle.path_desc
            except Exception:  # pylint: disable=broad-except
                compute_context.log.warning(
                    "Error when attempting to materialize executed notebook using file manager (falling back to local): {exc}".format(
                        exc=str(serializable_error_info_from_exc_info(sys.exc_info()))
                    )
                )
                executed_notebook_materialization_path = executed_notebook_path

            yield AssetMaterialization(
                asset_key=(asset_key_prefix + [f"{name}_output_notebook"]),
                description="Location of output notebook in file manager",
                metadata_entries=[
                    EventMetadataEntry.fspath(executed_notebook_materialization_path)
                ],
            )

            if output_notebook is not None:
                yield Output(executed_notebook_file_handle, output_notebook)

            # deferred import for perf
            import scrapbook

            output_nb = scrapbook.read_notebook(executed_notebook_path)

            for (output_name, output_def) in system_compute_context.solid_def.output_dict.items():
                data_dict = output_nb.scraps.data_dict
                if output_name in data_dict:
                    value = read_value(output_def.dagster_type, data_dict[output_name])

                    yield Output(value, output_name)

            for key, value in output_nb.scraps.items():
                if key.startswith("event-"):
                    with open(value.data, "rb") as fd:
                        yield pickle.loads(fd.read())

    return _t_fn


[docs]def define_dagstermill_solid( name, notebook_path, input_defs=None, output_defs=None, config_schema=None, required_resource_keys=None, output_notebook=None, asset_key_prefix=None, description=None, tags=None, ): """Wrap a Jupyter notebook in a solid. Arguments: name (str): The name of the solid. notebook_path (str): Path to the backing notebook. input_defs (Optional[List[InputDefinition]]): The solid's inputs. output_defs (Optional[List[OutputDefinition]]): The solid's outputs. Your notebook should call :py:func:`~dagstermill.yield_result` to yield each of these outputs. required_resource_keys (Optional[Set[str]]): The string names of any required resources. output_notebook (Optional[str]): If set, will be used as the name of an injected output of type :py:class:`~dagster.FileHandle` that will point to the executed notebook (in addition to the :py:class:`~dagster.AssetMaterialization` that is always created). This respects the :py:class:`~dagster.core.storage.file_manager.FileManager` configured on the pipeline resources via the "file_manager" resource key, so, e.g., if :py:class:`~dagster_aws.s3.s3_file_manager` is configured, the output will be a : py:class:`~dagster_aws.s3.S3FileHandle`. asset_key_prefix (Optional[Union[List[str], str]]): If set, will be used to prefix the asset keys for materialized notebooks. description (Optional[str]): If set, description used for solid. tags (Optional[Dict[str, str]]): If set, additional tags used to annotate solid. Dagster uses the tag keys `notebook_path` and `kind`, which cannot be overwritten by the user. Returns: :py:class:`~dagster.SolidDefinition` """ check.str_param(name, "name") check.str_param(notebook_path, "notebook_path") input_defs = check.opt_list_param(input_defs, "input_defs", of_type=InputDefinition) output_defs = check.opt_list_param(output_defs, "output_defs", of_type=OutputDefinition) required_resource_keys = check.opt_set_param( required_resource_keys, "required_resource_keys", of_type=str ) if output_notebook is not None: required_resource_keys.add("file_manager") if isinstance(asset_key_prefix, str): asset_key_prefix = [asset_key_prefix] asset_key_prefix = check.opt_list_param(asset_key_prefix, "asset_key_prefix", of_type=str) default_description = f"This solid is backed by the notebook at {notebook_path}" description = check.opt_str_param(description, "description", default=default_description) user_tags = validate_tags(tags) if tags is not None: check.invariant( "notebook_path" not in tags, "user-defined solid tags contains the `notebook_path` key, but the `notebook_path` key is reserved for use by Dagster", ) check.invariant( "kind" not in tags, "user-defined solid tags contains the `kind` key, but the `kind` key is reserved for use by Dagster", ) default_tags = {"notebook_path": notebook_path, "kind": "ipynb"} return SolidDefinition( name=name, input_defs=input_defs, compute_fn=_dm_solid_compute( name, notebook_path, output_notebook, asset_key_prefix=asset_key_prefix ), output_defs=output_defs + ( [OutputDefinition(dagster_type=FileHandle, name=output_notebook)] if output_notebook else [] ), config_schema=config_schema, required_resource_keys=required_resource_keys, description=description, tags={**user_tags, **default_tags}, )