import datetime
import great_expectations as ge
from dagster import (
    EventMetadataEntry,
    ExpectationResult,
    InputDefinition,
    Noneable,
    Output,
    OutputDefinition,
    StringSource,
    check,
    resource,
    solid,
)
from dagster_pandas import DataFrame
from great_expectations.render.page_renderer_util import (
    render_multiple_validation_result_pages_markdown,
)
try:
    # ge < v0.13.0
    from great_expectations.core import convert_to_json_serializable
except ImportError:
    # ge >= v0.13.0
    from great_expectations.core.util import convert_to_json_serializable
@resource(config_schema={"ge_root_dir": Noneable(StringSource)})
def ge_data_context(context):
    """Dagster resource that provides a Great Expectations ``DataContext``.

    Resource config:
        ge_root_dir (Optional[str]): passed to ``DataContext`` as
            ``context_root_dir`` when set. When it is ``None`` the
            ``DataContext`` is constructed with no arguments.

    Yields:
        The constructed ``ge.data_context.DataContext``.
    """
    root_dir = context.resource_config["ge_root_dir"]
    if root_dir is None:
        data_context = ge.data_context.DataContext()
    else:
        data_context = ge.data_context.DataContext(context_root_dir=root_dir)
    yield data_context
def ge_validation_solid_factory(
    name,
    datasource_name,
    suite_name,
    validation_operator_name=None,
    input_dagster_type=DataFrame,
    batch_kwargs=None,
):
    """Generate a solid that validates its input against a Great Expectations suite.

    Args:
        name (str): the name of the solid
        datasource_name (str): the name of your DataSource, see your great_expectations.yml
        suite_name (str): the name of your expectation suite, see your great_expectations.yml
        validation_operator_name (Optional[str]): what validation operator to run -- defaults to
            None, which generates an ephemeral validator.
            If you want to save data docs, use 'action_list_operator'.
            See https://docs.greatexpectations.io/en/latest/reference/core_concepts/validation_operators_and_actions.html
        input_dagster_type (DagsterType): the Dagster type used to type check the input to the
            solid. Defaults to `dagster_pandas.DataFrame`.
        batch_kwargs (Optional[dict]): overrides the `batch_kwargs` parameter when calling the
            `ge_data_context`'s `get_batch` method. Defaults to `{"dataset": dataset}`,
            where `dataset` is the input to the generated solid. Any `datasource` key is
            ignored in favor of `datasource_name`. The dict is never modified by the solid.

    Returns:
        A solid that takes in a set of data and yields both an expectation with relevant metadata
        and an output with all the metadata (for user processing)
    """
    check.str_param(datasource_name, "datasource_name")
    check.str_param(suite_name, "suite_name")
    check.opt_str_param(validation_operator_name, "validation_operator_name")
    batch_kwargs = check.opt_dict_param(batch_kwargs, "batch_kwargs")

    @solid(
        name=name,
        input_defs=[InputDefinition("dataset", input_dagster_type)],
        output_defs=[
            OutputDefinition(
                dagster_type=dict,
                description="""
        This solid yields an expectationResult with a structured dict of metadata from the GE suite,
        as well as the full result in case a user wants to process it differently.
        The structured dict contains both summary stats from the suite as well as expectation by expectation
        results/details.
        """,
            )
        ],
        required_resource_keys={"ge_data_context"},
        tags={"kind": "ge"},
    )
    def ge_validation_solid(context, dataset):
        data_context = context.resources.ge_data_context

        if validation_operator_name is not None:
            validation_operator = validation_operator_name
        else:
            # No operator requested: register a throwaway operator with an empty action list
            # so the suite can still be run through `run_validation_operator`.
            data_context.add_validation_operator(
                "ephemeral_validation",
                {"class_name": "ActionListValidationOperator", "action_list": []},
            )
            validation_operator = "ephemeral_validation"

        suite = data_context.get_expectation_suite(suite_name)

        # Work on a copy: `batch_kwargs` is captured once by this closure and shared by every
        # execution of the solid (and may be the caller's own dict). Writing "datasource" into
        # it directly would leak state between runs and make the warning below fire spuriously
        # from the second execution onwards.
        final_batch_kwargs = dict(batch_kwargs) if batch_kwargs else {"dataset": dataset}

        if "datasource" in batch_kwargs:
            context.log.warning(
                "`datasource` field of `batch_kwargs` will be ignored; use the `datasource_name` "
                "parameter of the solid factory instead."
            )
        final_batch_kwargs["datasource"] = datasource_name

        batch = data_context.get_batch(final_batch_kwargs, suite)
        run_id = {
            "run_name": datasource_name + " run",
            "run_time": datetime.datetime.utcnow(),
        }
        results = data_context.run_validation_operator(
            validation_operator, assets_to_validate=[batch], run_id=run_id
        )

        # Only one batch is validated, so the first (only) validation result is the one reported.
        res = convert_to_json_serializable(results.list_validation_results())[0]

        md_str = render_multiple_validation_result_pages_markdown(
            validation_operator_result=results,
            run_info_at_end=True,
        )
        meta_stats = EventMetadataEntry.md(md_str=md_str, label="Expectation Results")

        yield ExpectationResult(
            success=res["success"],
            metadata_entries=[
                meta_stats,
            ],
        )
        yield Output(res)

    return ge_validation_solid