Source code for dagster.core.execution.context.compute
from abc import ABC, abstractmethod, abstractproperty
from typing import Any, Optional
from dagster import check
from dagster.core.definitions.dependency import Solid
from dagster.core.definitions.pipeline import PipelineDefinition
from dagster.core.definitions.solid import SolidDefinition
from dagster.core.instance import DagsterInstance
from dagster.core.log_manager import DagsterLogManager
from dagster.core.storage.pipeline_run import PipelineRun
from dagster.utils.forked_pdb import ForkedPdb
from .step import StepExecutionContext
from .system import SystemComputeExecutionContext
class AbstractComputeExecutionContext(ABC): # pylint: disable=no-init
"""Base class for solid context implemented by SolidExecutionContext and DagstermillExecutionContext"""
@abstractmethod
def has_tag(self, key) -> bool:
"""Implement this method to check if a logging tag is set."""
@abstractmethod
def get_tag(self, key: str) -> str:
"""Implement this method to get a logging tag."""
@abstractproperty
def run_id(self) -> str:
"""The run id for the context."""
@abstractproperty
def solid_def(self) -> SolidDefinition:
"""The solid definition corresponding to the execution step being executed."""
@abstractproperty
def solid(self) -> Solid:
"""The solid corresponding to the execution step being executed."""
@abstractproperty
def pipeline_def(self) -> PipelineDefinition:
"""The pipeline being executed."""
@abstractproperty
def pipeline_run(self) -> PipelineRun:
"""The PipelineRun object corresponding to the execution."""
@abstractproperty
def resources(self) -> Any:
"""Resources available in the execution context."""
@abstractproperty
def log(self) -> DagsterLogManager:
"""The log manager available in the execution context."""
@abstractproperty
def solid_config(self) -> Any:
"""The parsed config specific to this solid."""
[docs]class SolidExecutionContext(StepExecutionContext, AbstractComputeExecutionContext):
"""The ``context`` object available as the first argument to every solid's compute function.
Users should not instantiate this object directly.
Example:
.. code-block:: python
@solid
def hello_world(context: SolidExecutionContext):
context.log.info("Hello, world!")
"""
__slots__ = ["_system_compute_execution_context"]
def __init__(self, system_compute_execution_context: SystemComputeExecutionContext):
self._system_compute_execution_context = check.inst_param(
system_compute_execution_context,
"system_compute_execution_context",
SystemComputeExecutionContext,
)
self._pdb: Optional[ForkedPdb] = None
super(SolidExecutionContext, self).__init__(system_compute_execution_context)
@property
def solid_config(self) -> Any:
"""The parsed config specific to this solid."""
return self._system_compute_execution_context.solid_config
@property
def pipeline_run(self) -> PipelineRun:
"""PipelineRun: The current pipeline run"""
return self._system_compute_execution_context.pipeline_run
@property
def instance(self) -> DagsterInstance:
"""DagsterInstance: The current Dagster instance"""
return self._system_compute_execution_context.instance
@property
def pdb(self) -> ForkedPdb:
"""dagster.utils.forked_pdb.ForkedPdb: Gives access to pdb debugging from within the solid.
Example:
.. code-block:: python
@solid
def debug_solid(context):
context.pdb.set_trace()
"""
if self._pdb is None:
self._pdb = ForkedPdb()
return self._pdb