
from dagster import Field, StringSource, intermediate_storage
from dagster.core.storage.system_storage import (
    build_intermediate_storage_from_object_store,
    fs_intermediate_storage,
    mem_intermediate_storage,
)
from .object_store import GCSObjectStore
[docs]@intermediate_storage(
    name="gcs",
    is_persistent=True,
    config_schema={
        "gcs_bucket": Field(StringSource),
        "gcs_prefix": Field(StringSource, is_required=False, default_value="dagster"),
    },
    required_resource_keys={"gcs"},
)
def gcs_intermediate_storage(init_context):
    client = init_context.resources.gcs
    gcs_bucket = init_context.intermediate_storage_config["gcs_bucket"]
    gcs_prefix = init_context.intermediate_storage_config["gcs_prefix"]
    object_store = GCSObjectStore(gcs_bucket, client=client)
    def root_for_run_id(r_id):
        return object_store.key_for_paths([gcs_prefix, "storage", r_id])
    return build_intermediate_storage_from_object_store(
        object_store, init_context, root_for_run_id=root_for_run_id
    )
gcs_plus_default_intermediate_storage_defs = [
    mem_intermediate_storage,
    fs_intermediate_storage,
    gcs_intermediate_storage,
]