Skip to content

Commit

Permalink
fix: chunk initial symlinks on re-ingest
Browse files Browse the repository at this point in the history
  • Loading branch information
dreulavelle committed Nov 14, 2024
1 parent 0a2c5a9 commit 2b0af33
Showing 1 changed file with 52 additions and 42 deletions.
94 changes: 52 additions & 42 deletions src/program/program.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,51 +397,61 @@ def _init_db_from_symlinks(self):
errors = []
added_items = set()

progress, console = create_progress_bar(len(items))
task = progress.add_task("Enriching items with metadata", total=len(items), log="")
# Convert items to list and get total count
items_list = [item for item in items if isinstance(item, (Movie, Show))]
total_items = len(items_list)

progress, console = create_progress_bar(total_items)
task = progress.add_task("Enriching items with metadata", total=total_items, log="")

# Process in chunks of 100 items
chunk_size = 100
with Live(progress, console=console, refresh_per_second=10):
workers = int(os.getenv("SYMLINK_MAX_WORKERS", 4))
with ThreadPoolExecutor(thread_name_prefix="EnhanceSymlinks", max_workers=workers) as executor:
future_to_item = {
executor.submit(self._enhance_item, item): item
for item in items
if isinstance(item, (Movie, Show))
}

for future in as_completed(future_to_item):
item = future_to_item[future]
log_message = ""

try:
if not item or item.imdb_id in added_items:
errors.append(f"Duplicate symlink directory found for {item.log_string}")
continue

# Check for existing item using your db_functions
if db_functions.get_item_by_id(item.id, session=session):
errors.append(f"Duplicate item found in database for id: {item.id}")
continue

enhanced_item = future.result()
if not enhanced_item:
errors.append(f"Failed to enhance {item.log_string} ({item.imdb_id}) with Trakt Indexer")
continue

enhanced_item.store_state()
session.add(enhanced_item)
added_items.add(item.imdb_id)

log_message = f"Indexed IMDb Id: {enhanced_item.id} as {enhanced_item.type.title()}: {enhanced_item.log_string}"
except NotADirectoryError:
errors.append(f"Skipping {item.log_string} as it is not a valid directory")
except Exception as e:
logger.exception(f"Error processing {item.log_string}: {e}")
finally:
progress.update(task, advance=1, log=log_message)

progress.update(task, log="Finished Indexing Symlinks!")

for i in range(0, total_items, chunk_size):
chunk = items_list[i:i + chunk_size]

with ThreadPoolExecutor(thread_name_prefix="EnhanceSymlinks", max_workers=workers) as executor:
future_to_item = {
executor.submit(self._enhance_item, item): item
for item in chunk
}

for future in as_completed(future_to_item):
item = future_to_item[future]
log_message = ""

try:
if not item or item.imdb_id in added_items:
errors.append(f"Duplicate symlink directory found for {item.log_string}")
continue

if db_functions.get_item_by_id(item.id, session=session):
errors.append(f"Duplicate item found in database for id: {item.id}")
continue

enhanced_item = future.result()
if not enhanced_item:
errors.append(f"Failed to enhance {item.log_string} ({item.imdb_id}) with Trakt Indexer")
continue

enhanced_item.store_state()
session.add(enhanced_item)
added_items.add(item.imdb_id)

log_message = f"Indexed IMDb Id: {enhanced_item.id} as {enhanced_item.type.title()}: {enhanced_item.log_string}"
except NotADirectoryError:
errors.append(f"Skipping {item.log_string} as it is not a valid directory")
except Exception as e:
logger.exception(f"Error processing {item.log_string}: {e}")
finally:
progress.update(task, advance=1, log=log_message)

# Commit after each chunk
session.commit()

progress.update(task, log="Finished Indexing Symlinks!")

if errors:
logger.error("Errors encountered during initialization")
Expand All @@ -452,4 +462,4 @@ def _init_db_from_symlinks(self):
total_seconds = elapsed_time.total_seconds()
hours, remainder = divmod(total_seconds, 3600)
minutes, seconds = divmod(remainder, 60)
logger.success(f"Database initialized, time taken: h{int(hours):02d}:m{int(minutes):02d}:s{int(seconds):02d}")
logger.success(f"Database initialized, time taken: h{int(hours):02d}:m{int(minutes):02d}:s{int(seconds):02d}")

0 comments on commit 2b0af33

Please sign in to comment.