This repository has been archived by the owner on Nov 6, 2023. It is now read-only.

feat(s3): use bucket name for data_source_oddrn (#45)
Vixtir authored Jul 20, 2023
1 parent a541284 commit 9635d71
Showing 8 changed files with 494 additions and 509 deletions.
22 changes: 15 additions & 7 deletions collector_config.yaml
@@ -1,11 +1,19 @@
default_pulling_interval: 10
token:
# Description: Configuration file for the collector. Find more examples in the config_examples folder.

default_pulling_interval: 60
token: <token>
platform_host_url: "http://localhost:8080"
plugins:
- type: s3
name: s3_adapter
aws_access_key_id:
aws_secret_access_key:
datasets:
- bucket: my_bucket
prefix: prefix
# aws_access_key_id: <aws_access_key_id>
# aws_secret_access_key: <aws_secret_access_key>
# aws_region: <aws_region>
# aws_session_token: <aws_session_token>
# aws_role_arn: <aws_role_arn>
# aws_role_session_name: <aws_role_session_name>
# profile_name: <profile_name>
# endpoint_url: <endpoint_url>
dataset_config:
bucket: <bucket_name>
prefix: <optional_prefix>
64 changes: 30 additions & 34 deletions config_examples/s3.yaml
@@ -1,53 +1,49 @@
# S3 collector-config.yaml example
# Note: The following example is for AWS S3. For S3-compatible storage, see the example below.
# All AWS S3 parameters are optional, following boto3's default behavior.
# If not provided, boto3 will look for credentials in environment variables, ~/.aws/credentials and ~/.aws/config.

platform_host_url: http://localhost:8080
default_pulling_interval: 10 # Can be omitted to run collector once
default_pulling_interval: 60 # Pulling interval in minutes. Can be omitted to run collector once
token: "" # Token that must be retrieved from the platform
plugins:
- type: s3
name: s3_adapter
aws_secret_access_key: <aws_secret_access_key> # Optional.
aws_access_key_id: <aws_access_key_id> # Optional.
aws_session_token: <aws_session_token> # Optional.
aws_session_token: <aws_session_token> # Optional. Required if using temporary credentials.
aws_region: <aws_region> # Optional.
aws_role_arn: <aws_role_arn> # Optional. Required for assuming role with temporary credentials.
aws_role_session_name: <aws_role_session_name> # Optional. Required for assuming role with temporary credentials.
profile_name: <profile_name> # Optional.
filename_filter: # Optional. The default filter allows every file to be ingested into the platform.
include: [ '.*.parquet' ]
exclude: [ 'dev_.*' ]
datasets:
# Recursive fetch for all objects in the bucket.
- bucket: my_bucket
# Explicitly specify the prefix to file.
- bucket: my_bucket
prefix: folder/subfolder/file.csv
# When we want to use a folder as a dataset. Very useful for partitioned datasets,
# e.g. a Hive-partitioned dataset with a structure like this:
# s3://my_bucket/partitioned_data/year=2019/month=01/...
- bucket: my_bucket
prefix: partitioned_data/
folder_as_dataset:
file_format: parquet
flavor: hive

# field_names must be provided if a partition flavor was not used, i.e. for a structure like this:
# s3://my_bucket/partitioned_data/year/...
- bucket: my_bucket
prefix: partitioned_data/
folder_as_dataset:
file_format: csv
field_names: ['year']

# S3 compatible collector-config.yaml example, for example for Minio we need to specify endpoint_url
platform_host_url: "http://localhost:8080"
default_pulling_interval: 10
token: ""
plugins:
dataset_config:
bucket: my_bucket
prefix: folder/subfolder/file.csv # Optional. Default is empty string.
# When we want to use a folder as a dataset. Very useful for partitioned datasets.
- type: s3
name: s3_partitioned_adapter
aws_secret_access_key: <aws_secret_access_key> # Optional.
aws_access_key_id: <aws_access_key_id> # Optional.
aws_session_token: <aws_session_token> # Optional. Required if using temporary credentials.
aws_region: <aws_region> # Optional.
aws_role_arn: <aws_role_arn> # Optional. Required for assuming role with temporary credentials.
aws_role_session_name: <aws_role_session_name> # Optional. Required for assuming role with temporary credentials.
profile_name: <profile_name> # Optional.
dataset_config:
bucket: my_bucket
prefix: partitioned_data/
folder_as_dataset:
file_format: parquet # Format of the files in the folder. Can be parquet, csv or tsv.
flavor: hive # Optional. Default is hive. Can be hive or presto.
field_names: ['year', 'month'] # Optional. Must be provided if the flavor is other than hive, i.e. for a structure like s3://my_bucket/partitioned_data/year/...
# When the S3 storage is compatible with the AWS S3 API, for example MinIO.
- type: s3
name: s3_minio_adapter
endpoint_url: http://localhost:9000
aws_secret_access_key: minioadmin
aws_access_key_id: minioadmin
datasets:
- bucket: my_bucket
prefix: partitioned_data
dataset_config:
bucket: my_bucket

36 changes: 12 additions & 24 deletions odd_collector_aws/adapters/s3/adapter.py
@@ -1,44 +1,32 @@
import traceback as tb
from typing import Iterable, Union

from odd_collector_sdk.domain.adapter import AbstractAdapter
from odd_collector_sdk.domain.adapter import BaseAdapter
from odd_models.models import DataEntityList
from oddrn_generator.generators import Generator, S3Generator

from odd_collector_aws.domain.plugin import S3Plugin
from odd_collector_aws.utils.create_generator import create_generator

from .file_system import FileSystem
from .logger import logger
from .mapper.bucket import map_bucket


class Adapter(AbstractAdapter):
class Adapter(BaseAdapter):
config: S3Plugin
generator: Union[Generator, S3Generator]

def __init__(self, config: S3Plugin) -> None:
self.config = config
self.generator = create_generator(S3Generator, config)
super().__init__(config)
self.fs = FileSystem(config)

def get_data_source_oddrn(self) -> str:
return self.generator.get_data_source_oddrn()
def create_generator(self) -> Generator:
return create_generator(S3Generator, self.config)

def get_data_entity_list(self) -> Iterable[DataEntityList]:
for dataset_config in self.config.datasets:
try:
bucket = self.fs.get_bucket(dataset_config)
data_entities = map_bucket(bucket, self.generator)

yield DataEntityList(
data_source_oddrn=self.get_data_source_oddrn(),
items=list(data_entities),
)
except Exception as e:
logger.error(
f"Error while processing bucket {dataset_config.bucket}: {e}."
" SKIPPING."
)
logger.debug(tb.format_exc())
continue
bucket = self.fs.get_bucket(self.config.dataset_config)
data_entities = map_bucket(bucket, self.generator)

yield DataEntityList(
data_source_oddrn=self.get_data_source_oddrn(),
items=list(data_entities),
)
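
For reference, a minimal sketch of driving the reworked Adapter directly (not part of the commit). It assumes the remaining plugin fields are optional, that DatasetConfig exposes the bucket/prefix fields used in the YAML examples above, and that valid S3 access is available when entities are actually fetched.

from odd_collector_aws.adapters.s3.adapter import Adapter
from odd_collector_aws.domain.dataset_config import DatasetConfig
from odd_collector_aws.domain.plugin import S3Plugin

# One dataset per adapter instance, configured via dataset_config.
config = S3Plugin(
    type="s3",
    name="s3_adapter",
    dataset_config=DatasetConfig(bucket="my_bucket", prefix="partitioned_data/"),
)

adapter = Adapter(config)

# The data source oddrn is now derived from the configured bucket name.
print(adapter.get_data_source_oddrn())

# Yields a single DataEntityList for the configured bucket/prefix
# (previously the adapter looped over a list of datasets).
for entity_list in adapter.get_data_entity_list():
    print(len(entity_list.items))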
11 changes: 9 additions & 2 deletions odd_collector_aws/domain/plugin.py
@@ -3,7 +3,7 @@
from odd_collector_sdk.domain.filter import Filter
from odd_collector_sdk.domain.plugin import Plugin
from odd_collector_sdk.types import PluginFactory
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, validator

from odd_collector_aws.domain.dataset_config import DatasetConfig

@@ -73,9 +73,16 @@ class S3DeltaPlugin(AwsPlugin):
class S3Plugin(AwsPlugin):
type: Literal["s3"]
endpoint_url: Optional[str] = None
datasets: list[DatasetConfig]
datasets: Optional[list[DatasetConfig]] = None
dataset_config: DatasetConfig
filename_filter: Optional[Filter] = Filter()

@validator("datasets", pre=True)
def validate_datasets(cls, v):
if v:
raise ValueError("datasets field is deprecated, use dataset_config instead")



class QuicksightPlugin(AwsPlugin):
type: Literal["quicksight"]
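A short sketch of the validation change to S3Plugin above, assuming the remaining base-plugin fields are optional; values are illustrative. The deprecated datasets key is rejected, while dataset_config is accepted.

from pydantic import ValidationError

from odd_collector_aws.domain.plugin import S3Plugin

# New style: a single dataset_config mapping is accepted and coerced to DatasetConfig.
plugin = S3Plugin(
    type="s3",
    name="s3_adapter",
    dataset_config={"bucket": "my_bucket", "prefix": "folder/"},
)

# Old style: a non-empty datasets list now fails validation.
try:
    S3Plugin(
        type="s3",
        name="s3_adapter",
        datasets=[{"bucket": "my_bucket"}],
        dataset_config={"bucket": "my_bucket"},
    )
except ValidationError as err:
    print(err)  # datasets field is deprecated, use dataset_config instead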
2 changes: 2 additions & 0 deletions odd_collector_aws/logger.py
@@ -1 +1,3 @@
from odd_collector_sdk.logger import logger

logger = logger
13 changes: 8 additions & 5 deletions odd_collector_aws/utils/create_generator.py
@@ -1,12 +1,12 @@
import logging
from typing import Type, TypeVar
from typing import Type, TypeVar, cast

from botocore.exceptions import ClientError
from oddrn_generator import Generator
from oddrn_generator.generators import S3CustomGenerator, S3Generator

from odd_collector_aws.aws.aws_client import Aws
from odd_collector_aws.domain.plugin import AwsPlugin
from odd_collector_aws.domain.plugin import AwsPlugin, S3Plugin
from odd_collector_aws.errors import AccountIdError

T = TypeVar("T", bound=Generator)
@@ -16,10 +16,13 @@ def create_generator(generator_cls: Type[T], aws_plugin: AwsPlugin) -> T:
aws_client = Aws(aws_plugin)

if generator_cls == S3Generator:
if aws_plugin.endpoint_url:
return S3CustomGenerator(endpoint=aws_plugin.endpoint_url)
config = cast(S3Plugin, aws_plugin)
bucket = config.dataset_config.bucket

return generator_cls()
if config.endpoint_url:
return S3CustomGenerator(endpoint=config.endpoint_url, buckets=bucket)

return generator_cls(buckets=bucket)

account_id = aws_plugin.aws_account_id

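To illustrate the intent of the commit, a rough sketch of the effect on data_source_oddrn (not part of the commit; field names follow the diff, and the exact oddrn layout comes from the oddrn_generator package). The generator is now seeded with dataset_config.bucket, so the resulting data source oddrn identifies the concrete bucket rather than only the S3 service.

from oddrn_generator.generators import S3Generator

from odd_collector_aws.domain.dataset_config import DatasetConfig
from odd_collector_aws.domain.plugin import S3Plugin
from odd_collector_aws.utils.create_generator import create_generator

s3_plugin = S3Plugin(
    type="s3",
    name="s3_adapter",
    dataset_config=DatasetConfig(bucket="my_bucket"),
)

# create_generator now passes buckets=<dataset_config.bucket> to S3Generator,
# so the oddrn below is bucket-specific.
generator = create_generator(S3Generator, s3_plugin)
print(generator.get_data_source_oddrn())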
