DagsterDocs

Source code for dagster_aws.s3.file_cache

import boto3
from botocore.exceptions import ClientError
from dagster import Field, check, resource
from dagster.core.storage.file_cache import FileCache

from .file_manager import S3FileHandle


[docs]class S3FileCache(FileCache): def __init__(self, s3_bucket, s3_key, s3_session, overwrite=False): super(S3FileCache, self).__init__(overwrite=overwrite) self.s3_bucket = s3_bucket self.s3_key = s3_key self.s3 = s3_session def has_file_object(self, file_key): check.str_param(file_key, "file_key") try: self.s3.get_object(Bucket=self.s3_bucket, Key=self.get_full_key(file_key)) except ClientError: return False return True def get_full_key(self, file_key): return "{base_key}/{file_key}".format(base_key=self.s3_key, file_key=file_key) def write_file_object(self, file_key, source_file_object): check.str_param(file_key, "file_key") self.s3.put_object( Body=source_file_object, Bucket=self.s3_bucket, Key=self.get_full_key(file_key) ) return self.get_file_handle(file_key) def get_file_handle(self, file_key): check.str_param(file_key, "file_key") return S3FileHandle(self.s3_bucket, self.get_full_key(file_key))
@resource( { "bucket": Field(str), "key": Field(str), "overwrite": Field(bool, is_required=False, default_value=False), } ) def s3_file_cache(init_context): return S3FileCache( s3_bucket=init_context.resource_config["bucket"], s3_key=init_context.resource_config["key"], overwrite=init_context.resource_config["overwrite"], # TODO: resource dependencies s3_session=boto3.resource("s3", use_ssl=True).meta.client, )