Skip to content
This repository has been archived by the owner on Nov 6, 2023. It is now read-only.

Handle nested folders as data entity. #34

Merged
merged 7 commits into from
May 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions odd_collector_aws/adapters/s3/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
from odd_collector_aws.use_cases.s3_dataset_use_case import S3DatasetUseCase
from odd_collector_aws.use_cases.s3_use_case import S3UseCase
from odd_collector_aws.utils.create_generator import create_generator
from odd_collector_aws.utils.handle_nested_structure import HandleNestedStructure
from odd_collector_sdk.domain.adapter import AbstractAdapter
from odd_models.models import DataEntityList
from oddrn_generator.generators import S3Generator
from funcy import lpluck_attr, lconcat

from .logger import logger

Expand All @@ -17,21 +19,31 @@ def __init__(self, config: S3Plugin) -> None:
self.__datasets = config.datasets

self._oddrn_generator = create_generator(S3Generator, config)

dataset_client = S3DatasetService(S3Client(config))
self.s3_client = S3Client(config)
dataset_client = S3DatasetService(self.s3_client)
dataset_use_case = S3DatasetUseCase(dataset_client)

self.s3_use_case = S3UseCase(dataset_use_case, self._oddrn_generator)
self.handle_obj = HandleNestedStructure()
except Exception:
logger.debug("Error during initialization adapter", exc_info=True)

def get_data_source_oddrn(self) -> str:
return self._oddrn_generator.get_data_source_oddrn()

def get_data_entity_list(self) -> DataEntityList:
entities = lconcat(self._get_entities())
list_of_oddrns = lpluck_attr("oddrn", entities)
folder_data_entities = self.handle_obj.get_all_data_entities(
list(reversed(list_of_oddrns)),
self.__datasets,
self._oddrn_generator,
self.s3_client.s3_folders
)

return DataEntityList(
data_source_oddrn=self.get_data_source_oddrn(),
items=list(self._get_entities()),
items=entities + folder_data_entities,
)

def _get_entities(self):
Expand Down
2 changes: 2 additions & 0 deletions odd_collector_aws/adapters/s3/clients/s3_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def __init__(self, config: AwsPlugin):

self.s3 = AwsClient(config).get_client("s3")
self.fs = FileSystem(config)
self.s3_folders = {}

def get_list_files(self, bucket: str, prefix: str) -> List[S3Object]:
"""
Expand All @@ -26,6 +27,7 @@ def get_list_files(self, bucket: str, prefix: str) -> List[S3Object]:
objects = self.s3.list_objects_v2(Bucket=bucket, Prefix=prefix.lstrip("/"))[
"Contents"
]
self.s3_folders[prefix] = [e['Key'] for e in objects if not e.get("Size") and e['Key'].endswith('/')]
except KeyError as e:
raise EmptyFolderError(f"{bucket}/{prefix}") from e
else:
Expand Down
143 changes: 143 additions & 0 deletions odd_collector_aws/utils/handle_nested_structure.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
from collections import defaultdict
from typing import Dict, List
from funcy import concat, lpluck_attr

from odd_models.models import DataEntity, DataEntityGroup, DataEntityType
from oddrn_generator.generators import Generator
from oddrn_generator.utils import unescape, DELIMITER


class HandleNestedStructure:
    """Build folder-level DataEntity groups for nested S3 key structures.

    Files discovered under a dataset path have already been emitted as
    DataEntity objects; this helper creates one DataEntity (of type FILE
    carrying a DataEntityGroup) per folder and wires files and sub-folders
    into their parent folder's ``entities_list``.
    """

    def get_all_data_entities(
        self,
        list_of_oddrns: List[str],
        datasets: List,
        oddrn_generator: Generator,
        s3_folders: Dict[str, List[str]]
    ) -> List[DataEntity]:
        """Collect folder data entities for every configured dataset.

        @param list_of_oddrns: oddrns of all file entities already generated.
        @param datasets: dataset configs; each exposes ``bucket`` and ``path``.
        @param oddrn_generator: generator used to build folder oddrns.
        @param s3_folders: dataset path -> zero-byte "folder" keys found
            while listing the bucket (see S3Client.get_list_files).
        @return: flat list of folder DataEntity objects for all datasets.
        """
        result = []
        for dataset in datasets:
            result += self.get_data_entity_list(
                dataset.bucket,
                dataset.path,
                oddrn_generator,
                list_of_oddrns,
                s3_folders[dataset.path],
            )
        return result

    def get_data_entity_list(
        self,
        bucket: str,
        s3_path: str,
        oddrn_generator: Generator,
        list_of_oddrns: List[str],
        s3_folders: List[str]
    ) -> List[DataEntity]:
        """Build the combined list of folder DataEntity objects for one dataset.

        @param bucket: S3 bucket name of the dataset.
        @param s3_path: dataset root prefix (ends with ``/``).
        @param list_of_oddrns: file-entity oddrns; only those under this
            dataset's root are used.
        @param s3_folders: zero-byte "folder" keys under ``s3_path``.
        """
        oddrn_generator.set_oddrn_paths(
            buckets=bucket,
            keys=s3_path
        )
        oddrn_by_path = oddrn_generator.get_oddrn_by_path('keys')
        parent_oddrn = unescape(oddrn_by_path)
        # Only files that live under this dataset's root are relevant.
        filtered_list_of_oddrns = [
            oddrn for oddrn in list_of_oddrns if oddrn.startswith(oddrn_by_path)
        ]
        folder_files_map = self._parse_oddrns(
            filtered_list_of_oddrns, s3_path, oddrn_by_path, s3_folders
        )
        generated_data_entities = self._generate_data_entity(
            folder_files_map, s3_path, parent_oddrn, oddrn_generator
        )
        return self._combine_data_entities(generated_data_entities, parent_oddrn)

    def _generate_folder_entity(self, path: str, entities: List[str], oddrn_generator: Generator):
        """Create one folder DataEntity grouping the given child oddrns."""
        return DataEntity(
            oddrn=oddrn_generator.get_oddrn_by_path('keys', path),
            name=path,
            type=DataEntityType.FILE,
            data_entity_group=DataEntityGroup(entities_list=entities)
        )

    def _parse_oddrns(
        self,
        oddrns: List[str],
        s3_path: str,
        main_oddrn: str,
        s3_folders: List[str]
    ) -> Dict[str, List[str]]:
        """
        Create dictionary from list of created oddrns from files,
        where `key` is path, and `value` is a list of files
        related to the same path.
        @return {'path_to_folder': ['oddrn_of_file']}
        """
        result = defaultdict(list)
        for oddrn in oddrns:
            # The tail of the oddrn (after the dataset root) mirrors the
            # object's key relative to the configured path.
            # NOTE: iterating oddrn/tail pairs directly fixes the original
            # `list.index(tail)` lookup, which was O(n^2) and mapped every
            # duplicate tail back to the FIRST matching oddrn.
            relative_key = unescape(oddrn.split(main_oddrn)[-1])
            if len(relative_key.split(DELIMITER)) == 1:
                # Object sits directly under the dataset root.
                result[s3_path].append(oddrn)
            else:
                parent_path, _ = relative_key.rsplit(DELIMITER, 1)
                result[f"{s3_path}{parent_path}/"].append(oddrn)

        # Ensure empty folders (zero-byte "directory" keys) are represented
        # even though no file oddrn points into them.
        for folder in s3_folders:
            if folder not in result:
                result[folder] = []

        return result

    def _generate_data_entity(
        self,
        folder_files_map: Dict[str, List[str]],
        s3_path: str,
        parent_oddrn: str,
        oddrn_generator: Generator
    ) -> List[DataEntity]:
        """
        Create the list of DataEntity objects, one per folder.
        @param folder_files_map: {'path_to_folder': ['oddrn_of_file']}
        @return: [DataEntity,...,DataEntity]
        """
        result = []
        previous_path = ''
        for folder_path, file_oddrns in folder_files_map.items():
            oddrn_of_files = file_oddrns.copy()
            if folder_path == s3_path:
                # The dataset root also groups its immediate sub-folders,
                # whose entities were produced in earlier iterations.
                list_of_oddrns = list(reversed(lpluck_attr("oddrn", result)))
                oddrn_of_files += [
                    oddrn for oddrn in list_of_oddrns
                    if self._has_one_delimiter(oddrn, parent_oddrn)
                ]
                previous_path = ''
            if folder_path in previous_path:
                # Current folder is an ancestor of the previously created
                # entity: link that child folder into this group.
                oddrn_of_files.append(previous_path)
            current_entity = self._generate_folder_entity(
                folder_path, oddrn_of_files, oddrn_generator
            )
            result.append(current_entity)
            previous_path = current_entity.oddrn

        return result

    def _has_one_delimiter(self, oddrn: str, parent_oddrn: str) -> bool:
        """True if `oddrn` is an immediate child of `parent_oddrn`
        (its tail contains exactly one path delimiter)."""
        max_counter = 1
        return oddrn.split(parent_oddrn)[1].count(DELIMITER) == max_counter

    def _combine_data_entities(
        self,
        data_entities: List[DataEntity],
        parent_oddrn: str
    ) -> List[DataEntity]:
        """Combine oddrn from nested folder with parent.

        For every non-root folder entity, append its oddrn to the
        ``entities_list`` of its parent folder's entity (if not present).
        """
        oddrns = lpluck_attr("oddrn", data_entities)
        for oddrn in oddrns:
            if oddrn == parent_oddrn:
                continue
            # Folder oddrns end with the delimiter, so stripping the last
            # two segments and re-appending the delimiter yields the
            # parent folder's oddrn.
            parent_folder_oddrn = oddrn.rsplit(DELIMITER, 2)[0] + DELIMITER
            parent_index = oddrns.index(parent_folder_oddrn)
            group = data_entities[parent_index].data_entity_group.entities_list
            if oddrn not in group:
                group.append(oddrn)

        return data_entities
Loading