import sys
from contextlib import contextmanager
from typing import Any, Dict, FrozenSet, Iterator, List, Optional, Tuple, Union
from dagster import check
from dagster.core.definitions import IPipeline, PipelineDefinition
from dagster.core.definitions.pipeline import PipelineSubsetDefinition
from dagster.core.definitions.pipeline_base import InMemoryPipeline
from dagster.core.errors import DagsterExecutionInterruptedError, DagsterInvariantViolationError
from dagster.core.events import DagsterEvent
from dagster.core.execution.context.system import SystemPipelineExecutionContext
from dagster.core.execution.plan.execute_plan import inner_plan_execution_iterator
from dagster.core.execution.plan.plan import ExecutionPlan
from dagster.core.execution.plan.state import KnownExecutionState
from dagster.core.execution.resolve_versions import resolve_memoized_execution_plan
from dagster.core.execution.retries import RetryMode
from dagster.core.instance import DagsterInstance, is_memoized_run
from dagster.core.selector import parse_step_selection
from dagster.core.storage.mem_io_manager import InMemoryIOManager
from dagster.core.storage.pipeline_run import PipelineRun, PipelineRunStatus
from dagster.core.system_config.objects import EnvironmentConfig
from dagster.core.telemetry import log_repo_stats, telemetry_wrapper
from dagster.core.utils import str_format_set
from dagster.utils import merge_dicts
from dagster.utils.error import serializable_error_info_from_exc_info
from dagster.utils.interrupts import capture_interrupts
from .context_creation_pipeline import (
ExecutionContextManager,
PipelineExecutionContextManager,
PlanExecutionContextManager,
scoped_pipeline_context,
)
from .results import PipelineExecutionResult
## Brief guide to the execution APIs
# | function name | operates over | sync | supports | creates new PipelineRun |
# | | | | reexecution | in instance |
# | --------------------------- | ------------------ | ----- | ----------- | ----------------------- |
# | execute_pipeline_iterator | IPipeline | async | no | yes |
# | execute_pipeline | IPipeline | sync | no | yes |
# | execute_run_iterator | PipelineRun | async | (1) | no |
# | execute_run | PipelineRun | sync | (1) | no |
# | execute_plan_iterator | ExecutionPlan | async | (2) | no |
# | execute_plan | ExecutionPlan | sync | (2) | no |
# | reexecute_pipeline | IPipeline | sync | yes | yes |
# | reexecute_pipeline_iterator | IPipeline | async | yes | yes |
#
# Notes on reexecution support:
# (1) The appropriate bits must be set on the PipelineRun passed to this function. Specifically,
#     parent_run_id and root_run_id must be set and consistent, and if solids_to_execute or
#     step_keys_to_execute are set, they must be consistent with the parent and root runs.
# (2) As for (1), but the ExecutionPlan passed must also agree in all relevant bits.
def execute_run_iterator(
pipeline: IPipeline, pipeline_run: PipelineRun, instance: DagsterInstance
) -> Iterator[DagsterEvent]:
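    """Executes an existing pipeline run and yields the stream of resulting events.

    Asynchronous counterpart of :py:func:`execute_run`. Per note (1) in the guide above, for
    reexecution the appropriate fields (``parent_run_id``, ``root_run_id``, and any
    ``solids_to_execute``/``step_keys_to_execute``) must already be set consistently on
    ``pipeline_run``.

    Args:
        pipeline (IPipeline): The pipeline to execute.
        pipeline_run (PipelineRun): The run to execute.
        instance (DagsterInstance): The instance in which the run has been created.

    Returns:
        Iterator[DagsterEvent]: The stream of events resulting from the run's execution.
    """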
check.inst_param(pipeline, "pipeline", IPipeline)
check.inst_param(pipeline_run, "pipeline_run", PipelineRun)
check.inst_param(instance, "instance", DagsterInstance)
if pipeline_run.status == PipelineRunStatus.CANCELED:
# This can happen if the run was force-terminated while it was starting
def gen_execute_on_cancel():
yield instance.report_engine_event(
"Not starting execution since the run was canceled before execution could start",
pipeline_run,
)
return gen_execute_on_cancel()
check.invariant(
pipeline_run.status == PipelineRunStatus.NOT_STARTED
or pipeline_run.status == PipelineRunStatus.STARTING,
desc="Pipeline run {} ({}) in state {}, expected NOT_STARTED or STARTING".format(
pipeline_run.pipeline_name, pipeline_run.run_id, pipeline_run.status
),
)
if pipeline_run.solids_to_execute:
pipeline_def = pipeline.get_definition()
if isinstance(pipeline_def, PipelineSubsetDefinition):
check.invariant(
pipeline_run.solids_to_execute == pipeline.solids_to_execute,
"Cannot execute PipelineRun with solids_to_execute {solids_to_execute} that conflicts "
"with pipeline subset {pipeline_solids_to_execute}.".format(
pipeline_solids_to_execute=str_format_set(pipeline.solids_to_execute),
solids_to_execute=str_format_set(pipeline_run.solids_to_execute),
),
)
else:
            # When `execute_run_iterator` is called directly, the sub-pipeline hasn't been
            # created yet. Note that solids_to_execute received via the PipelineRun does not
            # support solid selection query syntax.
pipeline = pipeline.subset_for_execution_from_existing_pipeline(
pipeline_run.solids_to_execute
)
known_state = None
if pipeline_run.parent_run_id and pipeline_run.step_keys_to_execute:
plan_snap = instance.get_execution_plan_snapshot(pipeline_run.execution_plan_snapshot_id)
known_state = plan_snap.initial_known_state
execution_plan = create_execution_plan(
pipeline,
run_config=pipeline_run.run_config,
mode=pipeline_run.mode,
step_keys_to_execute=pipeline_run.step_keys_to_execute,
known_state=known_state,
)
return iter(
ExecuteRunWithPlanIterable(
execution_plan=execution_plan,
iterator=pipeline_execution_iterator,
execution_context_manager=PipelineExecutionContextManager(
execution_plan=execution_plan,
pipeline_run=pipeline_run,
instance=instance,
run_config=pipeline_run.run_config,
raise_on_error=False,
),
)
)
def execute_run(
pipeline: IPipeline,
pipeline_run: PipelineRun,
instance: DagsterInstance,
raise_on_error: bool = False,
) -> PipelineExecutionResult:
"""Executes an existing pipeline run synchronously.
Synchronous version of execute_run_iterator.
Args:
pipeline (IPipeline): The pipeline to execute.
pipeline_run (PipelineRun): The run to execute
instance (DagsterInstance): The instance in which the run has been created.
raise_on_error (Optional[bool]): Whether or not to raise exceptions when they occur.
Defaults to ``False``.
Returns:
PipelineExecutionResult: The result of the execution.
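
    Example (a minimal sketch; ``my_pipeline`` is a hypothetical pipeline definition and
    ``instance`` a :py:class:`DagsterInstance`):

        .. code-block:: python

            from dagster.core.definitions.pipeline_base import InMemoryPipeline

            # Create the run in the instance, then execute it synchronously.
            pipeline = InMemoryPipeline(my_pipeline)
            pipeline_run = instance.create_run_for_pipeline(pipeline_def=my_pipeline)
            result = execute_run(pipeline, pipeline_run, instance)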
"""
if isinstance(pipeline, PipelineDefinition):
raise DagsterInvariantViolationError(
"execute_run requires an IPipeline but received a PipelineDefinition "
"directly instead. To support hand-off to other processes provide a "
"ReconstructablePipeline which can be done using reconstructable(). For in "
"process only execution you can use InMemoryPipeline."
)
check.inst_param(pipeline, "pipeline", IPipeline)
check.inst_param(pipeline_run, "pipeline_run", PipelineRun)
check.inst_param(instance, "instance", DagsterInstance)
if pipeline_run.status == PipelineRunStatus.CANCELED:
message = "Not starting execution since the run was canceled before execution could start"
instance.report_engine_event(
message,
pipeline_run,
)
raise DagsterInvariantViolationError(message)
check.invariant(
pipeline_run.status == PipelineRunStatus.NOT_STARTED
or pipeline_run.status == PipelineRunStatus.STARTING,
desc="Pipeline run {} ({}) in state {}, expected NOT_STARTED or STARTING".format(
pipeline_run.pipeline_name, pipeline_run.run_id, pipeline_run.status
),
)
pipeline_def = pipeline.get_definition()
if pipeline_run.solids_to_execute:
if isinstance(pipeline_def, PipelineSubsetDefinition):
check.invariant(
pipeline_run.solids_to_execute == pipeline.solids_to_execute,
"Cannot execute PipelineRun with solids_to_execute {solids_to_execute} that "
"conflicts with pipeline subset {pipeline_solids_to_execute}.".format(
pipeline_solids_to_execute=str_format_set(pipeline.solids_to_execute),
solids_to_execute=str_format_set(pipeline_run.solids_to_execute),
),
)
else:
            # When `execute_run` is called directly, the sub-pipeline hasn't been created yet.
            # Note that solids_to_execute received via the PipelineRun does not support solid
            # selection query syntax.
pipeline = pipeline.subset_for_execution_from_existing_pipeline(
pipeline_run.solids_to_execute
)
known_state = None
if pipeline_run.parent_run_id and pipeline_run.step_keys_to_execute:
plan_snap = instance.get_execution_plan_snapshot(pipeline_run.execution_plan_snapshot_id)
known_state = plan_snap.initial_known_state
execution_plan = create_execution_plan(
pipeline,
run_config=pipeline_run.run_config,
mode=pipeline_run.mode,
step_keys_to_execute=pipeline_run.step_keys_to_execute,
known_state=known_state,
)
if is_memoized_run(pipeline_run.tags):
execution_plan = resolve_memoized_execution_plan(
execution_plan, pipeline_run.run_config, instance
)
_execute_run_iterable = ExecuteRunWithPlanIterable(
execution_plan=execution_plan,
iterator=pipeline_execution_iterator,
execution_context_manager=PipelineExecutionContextManager(
execution_plan=execution_plan,
pipeline_run=pipeline_run,
instance=instance,
run_config=pipeline_run.run_config,
raise_on_error=raise_on_error,
),
)
event_list = list(_execute_run_iterable)
pipeline_context = _execute_run_iterable.pipeline_context
    # Workaround so that mem_io_manager keeps working in reconstruct_context (e.g. for
    # result.result_for_solid): the in-memory values dict would be lost if the resource were
    # re-initiated in reconstruct_context. So instead of re-initiating every single resource,
    # we pass the live resource instances to reconstruct_context directly to avoid rebuilding
    # them from the resource defs.
resource_instances_to_override = {}
if pipeline_context: # None if we have a pipeline failure
for (
key,
resource_instance,
) in pipeline_context.scoped_resources_builder.resource_instance_dict.items():
if isinstance(resource_instance, InMemoryIOManager):
resource_instances_to_override[key] = resource_instance
return PipelineExecutionResult(
pipeline.get_definition(),
pipeline_run.run_id,
event_list,
lambda hardcoded_resources_arg: scoped_pipeline_context(
execution_plan,
pipeline_run.run_config,
pipeline_run,
instance,
intermediate_storage=pipeline_context.intermediate_storage,
resource_instances_to_override=hardcoded_resources_arg,
),
resource_instances_to_override=resource_instances_to_override,
)
def execute_pipeline_iterator(
pipeline: Union[PipelineDefinition, IPipeline],
run_config: Optional[dict] = None,
mode: Optional[str] = None,
preset: Optional[str] = None,
tags: Optional[Dict[str, Any]] = None,
solid_selection: Optional[List[str]] = None,
instance: Optional[DagsterInstance] = None,
) -> Iterator[DagsterEvent]:
"""Execute a pipeline iteratively.
Rather than package up the result of running a pipeline into a single object, like
:py:func:`execute_pipeline`, this function yields the stream of events resulting from pipeline
execution.
This is intended to allow the caller to handle these events on a streaming basis in whatever
way is appropriate.
Parameters:
pipeline (Union[IPipeline, PipelineDefinition]): The pipeline to execute.
run_config (Optional[dict]): The environment configuration that parametrizes this run,
as a dict.
mode (Optional[str]): The name of the pipeline mode to use. You may not set both ``mode``
and ``preset``.
preset (Optional[str]): The name of the pipeline preset to use. You may not set both
``mode`` and ``preset``.
tags (Optional[Dict[str, Any]]): Arbitrary key-value pairs that will be added to pipeline
logs.
solid_selection (Optional[List[str]]): A list of solid selection queries (including single
solid names) to execute. For example:
            - ``['some_solid']``: selects ``some_solid`` itself.
            - ``['*some_solid']``: selects ``some_solid`` and all its ancestors (upstream dependencies).
            - ``['*some_solid+++']``: selects ``some_solid``, all its ancestors, and its descendants
              (downstream dependencies) within 3 levels down.
            - ``['*some_solid', 'other_solid_a', 'other_solid_b+']``: selects ``some_solid`` and all its
              ancestors, ``other_solid_a`` itself, and ``other_solid_b`` and its direct child solids.
instance (Optional[DagsterInstance]): The instance to execute against. If this is ``None``,
an ephemeral instance will be used, and no artifacts will be persisted from the run.
Returns:
Iterator[DagsterEvent]: The stream of events resulting from pipeline execution.
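
    Example (a minimal sketch; ``my_pipeline`` is a hypothetical pipeline definition):

        .. code-block:: python

            # Handle events on a streaming basis; ``my_pipeline`` is assumed to be
            # defined elsewhere with the @pipeline decorator.
            for event in execute_pipeline_iterator(my_pipeline):
                print(event.event_type_value)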
"""
with ephemeral_instance_if_missing(instance) as execute_instance:
(
pipeline,
run_config,
mode,
tags,
solids_to_execute,
solid_selection,
) = _check_execute_pipeline_args(
pipeline=pipeline,
run_config=run_config,
mode=mode,
preset=preset,
tags=tags,
solid_selection=solid_selection,
)
pipeline_run = execute_instance.create_run_for_pipeline(
pipeline_def=pipeline.get_definition(),
run_config=run_config,
mode=mode,
solid_selection=solid_selection,
solids_to_execute=solids_to_execute,
tags=tags,
)
return execute_run_iterator(pipeline, pipeline_run, execute_instance)
@contextmanager
def ephemeral_instance_if_missing(
instance: Optional[DagsterInstance],
) -> Iterator[DagsterInstance]:
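    """Yields the given instance, or an ephemeral instance if ``None`` was passed."""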
if instance:
yield instance
else:
with DagsterInstance.ephemeral() as ephemeral_instance:
yield ephemeral_instance
def execute_pipeline(
pipeline: Union[PipelineDefinition, IPipeline],
run_config: Optional[dict] = None,
mode: Optional[str] = None,
preset: Optional[str] = None,
tags: Optional[Dict[str, Any]] = None,
solid_selection: Optional[List[str]] = None,
instance: Optional[DagsterInstance] = None,
raise_on_error: bool = True,
) -> PipelineExecutionResult:
"""Execute a pipeline synchronously.
Users will typically call this API when testing pipeline execution, or running standalone
scripts.
Parameters:
pipeline (Union[IPipeline, PipelineDefinition]): The pipeline to execute.
run_config (Optional[dict]): The environment configuration that parametrizes this run,
as a dict.
mode (Optional[str]): The name of the pipeline mode to use. You may not set both ``mode``
and ``preset``.
preset (Optional[str]): The name of the pipeline preset to use. You may not set both
``mode`` and ``preset``.
tags (Optional[Dict[str, Any]]): Arbitrary key-value pairs that will be added to pipeline
logs.
        solid_selection (Optional[List[str]]): A list of solid selection queries (including single
            solid names) to execute. For example:
            - ``['some_solid']``: selects ``some_solid`` itself.
            - ``['*some_solid']``: selects ``some_solid`` and all its ancestors (upstream dependencies).
            - ``['*some_solid+++']``: selects ``some_solid``, all its ancestors, and its descendants
              (downstream dependencies) within 3 levels down.
            - ``['*some_solid', 'other_solid_a', 'other_solid_b+']``: selects ``some_solid`` and all its
              ancestors, ``other_solid_a`` itself, and ``other_solid_b`` and its direct child solids.
        instance (Optional[DagsterInstance]): The instance to execute against. If this is ``None``,
            an ephemeral instance will be used, and no artifacts will be persisted from the run.
        raise_on_error (Optional[bool]): Whether or not to raise exceptions when they occur.
            Defaults to ``True``, since this is the most useful behavior in tests.
Returns:
:py:class:`PipelineExecutionResult`: The result of pipeline execution.
For the asynchronous version, see :py:func:`execute_pipeline_iterator`.
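
    Example (a minimal sketch; ``my_pipeline`` is a hypothetical pipeline definition):

        .. code-block:: python

            # ``my_pipeline`` is assumed to be defined elsewhere with the @pipeline decorator.
            result = execute_pipeline(my_pipeline)
            assert result.success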
"""
with ephemeral_instance_if_missing(instance) as execute_instance:
return _logged_execute_pipeline(
pipeline,
instance=execute_instance,
run_config=run_config,
mode=mode,
preset=preset,
tags=tags,
solid_selection=solid_selection,
raise_on_error=raise_on_error,
)
@telemetry_wrapper
def _logged_execute_pipeline(
pipeline: Union[IPipeline, PipelineDefinition],
instance: DagsterInstance,
run_config: Optional[dict] = None,
mode: Optional[str] = None,
preset: Optional[str] = None,
tags: Optional[Dict[str, Any]] = None,
solid_selection: Optional[List[str]] = None,
raise_on_error: bool = True,
) -> PipelineExecutionResult:
check.inst_param(instance, "instance", DagsterInstance)
(
pipeline,
run_config,
mode,
tags,
solids_to_execute,
solid_selection,
) = _check_execute_pipeline_args(
pipeline=pipeline,
run_config=run_config,
mode=mode,
preset=preset,
tags=tags,
solid_selection=solid_selection,
)
log_repo_stats(instance=instance, pipeline=pipeline, source="execute_pipeline")
pipeline_run = instance.create_run_for_pipeline(
pipeline_def=pipeline.get_definition(),
run_config=run_config,
mode=mode,
solid_selection=solid_selection,
solids_to_execute=solids_to_execute,
tags=tags,
)
return execute_run(pipeline, pipeline_run, instance, raise_on_error=raise_on_error)
def reexecute_pipeline(
pipeline: Union[IPipeline, PipelineDefinition],
parent_run_id: str,
run_config: Optional[dict] = None,
step_selection: Optional[List[str]] = None,
mode: Optional[str] = None,
preset: Optional[str] = None,
tags: Optional[Dict[str, Any]] = None,
    instance: Optional[DagsterInstance] = None,
raise_on_error: bool = True,
) -> PipelineExecutionResult:
"""Reexecute an existing pipeline run.
Users will typically call this API when testing pipeline reexecution, or running standalone
scripts.
Parameters:
pipeline (Union[IPipeline, PipelineDefinition]): The pipeline to execute.
parent_run_id (str): The id of the previous run to reexecute. The run must exist in the
instance.
run_config (Optional[dict]): The environment configuration that parametrizes this run,
as a dict.
        step_selection (Optional[List[str]]): A list of step selection queries (including single
            step keys) to execute. For example:
            - ``['some_step']``: selects ``some_step`` itself.
            - ``['*some_step']``: selects ``some_step`` and all its ancestors (upstream dependencies).
            - ``['*some_step+++']``: selects ``some_step``, all its ancestors, and its descendants
              (downstream dependencies) within 3 levels down.
            - ``['*some_step', 'other_step_a', 'other_step_b+']``: selects ``some_step`` and all its
              ancestors, ``other_step_a`` itself, and ``other_step_b`` and its direct child steps.
mode (Optional[str]): The name of the pipeline mode to use. You may not set both ``mode``
and ``preset``.
preset (Optional[str]): The name of the pipeline preset to use. You may not set both
``mode`` and ``preset``.
tags (Optional[Dict[str, Any]]): Arbitrary key-value pairs that will be added to pipeline
logs.
instance (Optional[DagsterInstance]): The instance to execute against. If this is ``None``,
an ephemeral instance will be used, and no artifacts will be persisted from the run.
raise_on_error (Optional[bool]): Whether or not to raise exceptions when they occur.
            Defaults to ``True``, since this is the most useful behavior in tests.
Returns:
:py:class:`PipelineExecutionResult`: The result of pipeline execution.
For the asynchronous version, see :py:func:`reexecute_pipeline_iterator`.
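
    Example (a minimal sketch; ``my_pipeline``, ``instance``, and the step key ``some_step``
    are hypothetical):

        .. code-block:: python

            # Execute once against a persistent instance, then reexecute ``some_step``
            # and its direct descendants.
            result = execute_pipeline(my_pipeline, instance=instance)
            reexec_result = reexecute_pipeline(
                my_pipeline,
                parent_run_id=result.run_id,
                step_selection=["some_step+"],
                instance=instance,
            )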
"""
check.opt_list_param(step_selection, "step_selection", of_type=str)
check.str_param(parent_run_id, "parent_run_id")
with ephemeral_instance_if_missing(instance) as execute_instance:
(pipeline, run_config, mode, tags, _, _) = _check_execute_pipeline_args(
pipeline=pipeline,
run_config=run_config,
mode=mode,
preset=preset,
tags=tags,
)
parent_pipeline_run = execute_instance.get_run_by_id(parent_run_id)
check.invariant(
parent_pipeline_run,
"No parent run with id {parent_run_id} found in instance.".format(
parent_run_id=parent_run_id
),
)
        step_keys_to_execute: Optional[FrozenSet[str]] = None
execution_plan: Optional[ExecutionPlan] = None
# resolve step selection DSL queries using parent execution information
if step_selection:
step_keys_to_execute, execution_plan = _resolve_reexecute_step_selection(
execute_instance,
pipeline,
mode,
run_config,
parent_pipeline_run,
step_selection,
)
pipeline_run = execute_instance.create_run_for_pipeline(
pipeline_def=pipeline.get_definition(),
execution_plan=execution_plan,
run_config=run_config,
mode=mode,
tags=tags,
solid_selection=parent_pipeline_run.solid_selection,
solids_to_execute=parent_pipeline_run.solids_to_execute,
            # step_keys_to_execute is resolved to a frozenset; converting the field itself to a
            # frozenset is tracked in https://github.com/dagster-io/dagster/issues/2914
step_keys_to_execute=list(step_keys_to_execute) if step_keys_to_execute else None,
root_run_id=parent_pipeline_run.root_run_id or parent_pipeline_run.run_id,
parent_run_id=parent_pipeline_run.run_id,
)
return execute_run(
pipeline,
pipeline_run,
execute_instance,
raise_on_error=raise_on_error,
)
def reexecute_pipeline_iterator(
pipeline: Union[IPipeline, PipelineDefinition],
parent_run_id: str,
run_config: Optional[dict] = None,
step_selection: Optional[List[str]] = None,
mode: Optional[str] = None,
preset: Optional[str] = None,
tags: Optional[Dict[str, Any]] = None,
    instance: Optional[DagsterInstance] = None,
) -> Iterator[DagsterEvent]:
"""Reexecute a pipeline iteratively.
Rather than package up the result of running a pipeline into a single object, like
:py:func:`reexecute_pipeline`, this function yields the stream of events resulting from pipeline
reexecution.
This is intended to allow the caller to handle these events on a streaming basis in whatever
way is appropriate.
Parameters:
pipeline (Union[IPipeline, PipelineDefinition]): The pipeline to execute.
parent_run_id (str): The id of the previous run to reexecute. The run must exist in the
instance.
run_config (Optional[dict]): The environment configuration that parametrizes this run,
as a dict.
        step_selection (Optional[List[str]]): A list of step selection queries (including single
            step keys) to execute. For example:
            - ``['some_step']``: selects ``some_step`` itself.
            - ``['*some_step']``: selects ``some_step`` and all its ancestors (upstream dependencies).
            - ``['*some_step+++']``: selects ``some_step``, all its ancestors, and its descendants
              (downstream dependencies) within 3 levels down.
            - ``['*some_step', 'other_step_a', 'other_step_b+']``: selects ``some_step`` and all its
              ancestors, ``other_step_a`` itself, and ``other_step_b`` and its direct child steps.
mode (Optional[str]): The name of the pipeline mode to use. You may not set both ``mode``
and ``preset``.
preset (Optional[str]): The name of the pipeline preset to use. You may not set both
``mode`` and ``preset``.
tags (Optional[Dict[str, Any]]): Arbitrary key-value pairs that will be added to pipeline
logs.
instance (Optional[DagsterInstance]): The instance to execute against. If this is ``None``,
an ephemeral instance will be used, and no artifacts will be persisted from the run.
Returns:
Iterator[DagsterEvent]: The stream of events resulting from pipeline reexecution.
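
    Example (a minimal sketch; ``my_pipeline``, ``instance``, and ``parent_run_id`` are
    hypothetical):

        .. code-block:: python

            # Stream the events produced by reexecuting an existing run.
            for event in reexecute_pipeline_iterator(
                my_pipeline, parent_run_id=parent_run_id, instance=instance
            ):
                print(event.event_type_value)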
"""
check.opt_list_param(step_selection, "step_selection", of_type=str)
check.str_param(parent_run_id, "parent_run_id")
with ephemeral_instance_if_missing(instance) as execute_instance:
(pipeline, run_config, mode, tags, _, _) = _check_execute_pipeline_args(
pipeline=pipeline,
run_config=run_config,
mode=mode,
preset=preset,
tags=tags,
solid_selection=None,
)
parent_pipeline_run = execute_instance.get_run_by_id(parent_run_id)
check.invariant(
parent_pipeline_run,
"No parent run with id {parent_run_id} found in instance.".format(
parent_run_id=parent_run_id
),
)
        step_keys_to_execute: Optional[FrozenSet[str]] = None
execution_plan: Optional[ExecutionPlan] = None
# resolve step selection DSL queries using parent execution information
if step_selection:
step_keys_to_execute, execution_plan = _resolve_reexecute_step_selection(
execute_instance,
pipeline,
mode,
run_config,
parent_pipeline_run,
step_selection,
)
pipeline_run = execute_instance.create_run_for_pipeline(
pipeline_def=pipeline.get_definition(),
run_config=run_config,
execution_plan=execution_plan,
mode=mode,
tags=tags,
solid_selection=parent_pipeline_run.solid_selection,
solids_to_execute=parent_pipeline_run.solids_to_execute,
            # step_keys_to_execute is resolved to a frozenset; converting the field itself to a
            # frozenset is tracked in https://github.com/dagster-io/dagster/issues/2914
step_keys_to_execute=list(step_keys_to_execute) if step_keys_to_execute else None,
root_run_id=parent_pipeline_run.root_run_id or parent_pipeline_run.run_id,
parent_run_id=parent_pipeline_run.run_id,
)
return execute_run_iterator(pipeline, pipeline_run, execute_instance)
def execute_plan_iterator(
execution_plan: ExecutionPlan,
pipeline_run: PipelineRun,
instance: DagsterInstance,
retry_mode: Optional[RetryMode] = None,
run_config: Optional[dict] = None,
) -> Iterator[DagsterEvent]:
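    """Yields the stream of events resulting from executing ``execution_plan`` for
    ``pipeline_run``. Asynchronous counterpart of :py:func:`execute_plan`; retries are
    disabled unless a ``retry_mode`` is provided.
    """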
check.inst_param(execution_plan, "execution_plan", ExecutionPlan)
check.inst_param(pipeline_run, "pipeline_run", PipelineRun)
check.inst_param(instance, "instance", DagsterInstance)
retry_mode = check.opt_inst_param(retry_mode, "retry_mode", RetryMode, RetryMode.DISABLED)
run_config = check.opt_dict_param(run_config, "run_config")
return iter(
ExecuteRunWithPlanIterable(
execution_plan=execution_plan,
iterator=inner_plan_execution_iterator,
execution_context_manager=PlanExecutionContextManager(
retry_mode=retry_mode,
execution_plan=execution_plan,
run_config=run_config,
pipeline_run=pipeline_run,
instance=instance,
raise_on_error=False,
),
)
)
def execute_plan(
execution_plan: ExecutionPlan,
instance: DagsterInstance,
pipeline_run: PipelineRun,
run_config: Optional[Dict] = None,
retry_mode: Optional[RetryMode] = None,
) -> List[DagsterEvent]:
"""This is the entry point of dagster-graphql executions. For the dagster CLI entry point, see
execute_pipeline() above.
"""
check.inst_param(execution_plan, "execution_plan", ExecutionPlan)
check.inst_param(instance, "instance", DagsterInstance)
check.inst_param(pipeline_run, "pipeline_run", PipelineRun)
run_config = check.opt_dict_param(run_config, "run_config")
check.opt_inst_param(retry_mode, "retry_mode", RetryMode)
return list(
execute_plan_iterator(
execution_plan=execution_plan,
run_config=run_config,
pipeline_run=pipeline_run,
instance=instance,
retry_mode=retry_mode,
)
)
def _check_pipeline(pipeline: Union[PipelineDefinition, IPipeline]) -> IPipeline:
# backcompat
if isinstance(pipeline, PipelineDefinition):
pipeline = InMemoryPipeline(pipeline)
check.inst_param(pipeline, "pipeline", IPipeline)
return pipeline
def create_execution_plan(
pipeline: Union[IPipeline, PipelineDefinition],
run_config: Optional[dict] = None,
mode: Optional[str] = None,
step_keys_to_execute: Optional[List[str]] = None,
    known_state: Optional[KnownExecutionState] = None,
) -> ExecutionPlan:
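    """Builds an :py:class:`ExecutionPlan` for the given pipeline.

    The run config is validated against the config schema of the selected mode, and the
    resulting plan may be restricted to ``step_keys_to_execute``.
    """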
pipeline = _check_pipeline(pipeline)
pipeline_def = pipeline.get_definition()
check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition)
run_config = check.opt_dict_param(run_config, "run_config", key_type=str)
mode = check.opt_str_param(mode, "mode", default=pipeline_def.get_default_mode_name())
check.opt_list_param(step_keys_to_execute, "step_keys_to_execute", of_type=str)
environment_config = EnvironmentConfig.build(pipeline_def, run_config, mode=mode)
return ExecutionPlan.build(
pipeline,
environment_config,
mode=mode,
step_keys_to_execute=step_keys_to_execute,
known_state=known_state,
)
def pipeline_execution_iterator(
pipeline_context: SystemPipelineExecutionContext, execution_plan: ExecutionPlan
) -> Iterator[DagsterEvent]:
"""A complete execution of a pipeline. Yields pipeline start, success,
and failure events.
Args:
        pipeline_context (SystemPipelineExecutionContext): The pipeline-scoped execution context.
        execution_plan (ExecutionPlan): The execution plan to run.
"""
check.inst_param(pipeline_context, "pipeline_context", SystemPipelineExecutionContext)
check.inst_param(execution_plan, "execution_plan", ExecutionPlan)
yield DagsterEvent.pipeline_start(pipeline_context)
pipeline_exception_info = None
pipeline_canceled_info = None
failed_steps = []
generator_closed = False
try:
for event in pipeline_context.executor.execute(pipeline_context, execution_plan):
if event.is_step_failure:
failed_steps.append(event.step_key)
yield event
except GeneratorExit:
        # Shouldn't happen, but avoid a runtime exception in case this generator gets GC-ed
# (see https://amir.rachum.com/blog/2017/03/03/generator-cleanup/).
generator_closed = True
pipeline_exception_info = serializable_error_info_from_exc_info(sys.exc_info())
raise
except (KeyboardInterrupt, DagsterExecutionInterruptedError):
pipeline_canceled_info = serializable_error_info_from_exc_info(sys.exc_info())
raise
except Exception: # pylint: disable=broad-except
pipeline_exception_info = serializable_error_info_from_exc_info(sys.exc_info())
raise # finally block will run before this is re-raised
finally:
if pipeline_canceled_info:
reloaded_run = pipeline_context.instance.get_run_by_id(pipeline_context.run_id)
if reloaded_run and reloaded_run.status == PipelineRunStatus.CANCELING:
event = DagsterEvent.pipeline_canceled(pipeline_context, pipeline_canceled_info)
else:
event = DagsterEvent.pipeline_failure(
pipeline_context,
"Execution was interrupted unexpectedly. "
"No user initiated termination request was found, treating as failure.",
pipeline_canceled_info,
)
elif pipeline_exception_info:
event = DagsterEvent.pipeline_failure(
pipeline_context,
"An exception was thrown during execution.",
pipeline_exception_info,
)
elif failed_steps:
event = DagsterEvent.pipeline_failure(
pipeline_context,
"Steps failed: {}.".format(failed_steps),
)
else:
event = DagsterEvent.pipeline_success(pipeline_context)
if not generator_closed:
yield event
class ExecuteRunWithPlanIterable:
"""Utility class to consolidate execution logic.
This is a class and not a function because, e.g., in constructing a `scoped_pipeline_context`
for `PipelineExecutionResult`, we need to pull out the `pipeline_context` after we're done
yielding events. This broadly follows a pattern we make use of in other places,
cf. `dagster.utils.EventGenerationManager`.
"""
def __init__(self, execution_plan, iterator, execution_context_manager):
self.execution_plan = check.inst_param(execution_plan, "execution_plan", ExecutionPlan)
self.iterator = check.callable_param(iterator, "iterator")
self.execution_context_manager = check.inst_param(
execution_context_manager, "execution_context_manager", ExecutionContextManager
)
self.pipeline_context = None
def __iter__(self):
# Since interrupts can't be raised at arbitrary points safely, delay them until designated
# checkpoints during the execution.
# To be maximally certain that interrupts are always caught during an execution process,
# you can safely add an additional `with capture_interrupts()` at the very beginning of the
# process that performs the execution.
with capture_interrupts():
yield from self.execution_context_manager.prepare_context()
self.pipeline_context = self.execution_context_manager.get_context()
generator_closed = False
try:
if self.pipeline_context: # False if we had a pipeline init failure
yield from self.iterator(
execution_plan=self.execution_plan,
pipeline_context=self.pipeline_context,
)
except GeneratorExit:
                # Shouldn't happen, but avoid a runtime exception in case this generator gets GC-ed
# (see https://amir.rachum.com/blog/2017/03/03/generator-cleanup/).
generator_closed = True
raise
finally:
for event in self.execution_context_manager.shutdown_context():
if not generator_closed:
yield event
def _check_execute_pipeline_args(
pipeline: Union[PipelineDefinition, IPipeline],
run_config: Optional[dict],
mode: Optional[str],
preset: Optional[str],
tags: Optional[Dict[str, Any]],
solid_selection: Optional[List[str]] = None,
) -> Tuple[
IPipeline,
Optional[dict],
Optional[str],
Dict[str, Any],
FrozenSet[str],
Optional[List[str]],
]:
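    """Validates and normalizes the shared ``execute_pipeline`` arguments, applying any preset
    and subsetting the pipeline when a solid selection is given.
    """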
pipeline = _check_pipeline(pipeline)
pipeline_def = pipeline.get_definition()
check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition)
run_config = check.opt_dict_param(run_config, "run_config")
check.opt_str_param(mode, "mode")
check.opt_str_param(preset, "preset")
check.invariant(
not (mode is not None and preset is not None),
"You may set only one of `mode` (got {mode}) or `preset` (got {preset}).".format(
mode=mode, preset=preset
),
)
tags = check.opt_dict_param(tags, "tags", key_type=str)
check.opt_list_param(solid_selection, "solid_selection", of_type=str)
if preset is not None:
pipeline_preset = pipeline_def.get_preset(preset)
if pipeline_preset.run_config is not None:
check.invariant(
(not run_config) or (pipeline_preset.run_config == run_config),
"The environment set in preset '{preset}' does not agree with the environment "
"passed in the `run_config` argument.".format(preset=preset),
)
run_config = pipeline_preset.run_config
# load solid_selection from preset
if pipeline_preset.solid_selection is not None:
check.invariant(
solid_selection is None or solid_selection == pipeline_preset.solid_selection,
"The solid_selection set in preset '{preset}', {preset_subset}, does not agree with "
"the `solid_selection` argument: {solid_selection}".format(
preset=preset,
preset_subset=pipeline_preset.solid_selection,
solid_selection=solid_selection,
),
)
solid_selection = pipeline_preset.solid_selection
check.invariant(
mode is None or mode == pipeline_preset.mode,
"Mode {mode} does not agree with the mode set in preset '{preset}': "
"('{preset_mode}')".format(preset=preset, preset_mode=pipeline_preset.mode, mode=mode),
)
mode = pipeline_preset.mode
tags = merge_dicts(pipeline_preset.tags, tags)
if mode is not None:
if not pipeline_def.has_mode_definition(mode):
raise DagsterInvariantViolationError(
(
"You have attempted to execute pipeline {name} with mode {mode}. "
"Available modes: {modes}"
).format(
name=pipeline_def.name,
mode=mode,
modes=pipeline_def.available_modes,
)
)
else:
if pipeline_def.is_multi_mode:
raise DagsterInvariantViolationError(
(
"Pipeline {name} has multiple modes (Available modes: {modes}) and you have "
"attempted to execute it without specifying a mode. Set "
"mode property on the PipelineRun object."
).format(name=pipeline_def.name, modes=pipeline_def.available_modes)
)
mode = pipeline_def.get_default_mode_name()
tags = merge_dicts(pipeline_def.tags, tags)
# generate pipeline subset from the given solid_selection
if solid_selection:
pipeline = pipeline.subset_for_execution(solid_selection)
return (
pipeline,
run_config,
mode,
tags,
pipeline.solids_to_execute,
solid_selection,
)
def _resolve_reexecute_step_selection(
instance: DagsterInstance,
pipeline: IPipeline,
mode: Optional[str],
run_config: Optional[dict],
parent_pipeline_run: PipelineRun,
step_selection: List[str],
) -> Tuple[FrozenSet[str], ExecutionPlan]:
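    """Resolves step selection DSL queries against a parent run.

    Rebuilds the parent execution plan using state derived from the parent run's logs, parses
    the step selection into concrete step keys, and builds the execution plan for reexecuting
    those steps.
    """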
parent_logs = instance.all_logs(parent_pipeline_run.run_id)
parent_plan = create_execution_plan(
pipeline,
parent_pipeline_run.run_config,
mode,
known_state=KnownExecutionState.derive_from_logs(parent_logs),
)
step_keys_to_execute = parse_step_selection(parent_plan.get_all_step_deps(), step_selection)
execution_plan = create_execution_plan(
pipeline,
run_config,
mode,
known_state=KnownExecutionState.for_reexecution(parent_logs, step_keys_to_execute),
)
return step_keys_to_execute, execution_plan