diff --git a/src/program/program.py b/src/program/program.py index 6b9f6dcb..171194b2 100644 --- a/src/program/program.py +++ b/src/program/program.py @@ -393,63 +393,86 @@ def _init_db_from_symlinks(self): return logger.log("PROGRAM", "Collecting items from symlinks, this may take a while depending on library size") - items = self.services[SymlinkLibrary].run() - errors = [] - added_items = set() - - progress, console = create_progress_bar(len(items)) - task = progress.add_task("Enriching items with metadata", total=len(items), log="") - - with Live(progress, console=console, refresh_per_second=10): - workers = int(os.getenv("SYMLINK_MAX_WORKERS", 4)) - with ThreadPoolExecutor(thread_name_prefix="EnhanceSymlinks", max_workers=workers) as executor: - future_to_item = { - executor.submit(self._enhance_item, item): item - for item in items - if isinstance(item, (Movie, Show)) - } - - for future in as_completed(future_to_item): - item = future_to_item[future] - log_message = "" - + try: + items = self.services[SymlinkLibrary].run() + errors = [] + added_items = set() + + # Convert items to list and get total count + items_list = [item for item in items if isinstance(item, (Movie, Show))] + total_items = len(items_list) + + progress, console = create_progress_bar(total_items) + task = progress.add_task("Enriching items with metadata", total=total_items, log="") + + # Process in chunks of 100 items + chunk_size = 100 + with Live(progress, console=console, refresh_per_second=10): + workers = int(os.getenv("SYMLINK_MAX_WORKERS", 4)) + + for i in range(0, total_items, chunk_size): + chunk = items_list[i:i + chunk_size] + try: - if not item or item.imdb_id in added_items: - errors.append(f"Duplicate symlink directory found for {item.log_string}") - continue - - # Check for existing item using your db_functions - if db_functions.get_item_by_id(item.id, session=session): - errors.append(f"Duplicate item found in database for id: {item.id}") - continue - - enhanced_item = future.result() - if not enhanced_item: - errors.append(f"Failed to enhance {item.log_string} ({item.imdb_id}) with Trakt Indexer") - continue - - enhanced_item.store_state() - session.add(enhanced_item) - added_items.add(item.imdb_id) - - log_message = f"Indexed IMDb Id: {enhanced_item.id} as {enhanced_item.type.title()}: {enhanced_item.log_string}" - except NotADirectoryError: - errors.append(f"Skipping {item.log_string} as it is not a valid directory") + with ThreadPoolExecutor(thread_name_prefix="EnhanceSymlinks", max_workers=workers) as executor: + future_to_item = { + executor.submit(self._enhance_item, item): item + for item in chunk + } + + for future in as_completed(future_to_item): + item = future_to_item[future] + log_message = "" + + try: + if not item or item.imdb_id in added_items: + errors.append(f"Duplicate symlink directory found for {item.log_string}") + continue + + if db_functions.get_item_by_id(item.id, session=session): + errors.append(f"Duplicate item found in database for id: {item.id}") + continue + + enhanced_item = future.result() + if not enhanced_item: + errors.append(f"Failed to enhance {item.log_string} ({item.imdb_id}) with Trakt Indexer") + continue + + enhanced_item.store_state() + session.add(enhanced_item) + added_items.add(item.imdb_id) + + log_message = f"Indexed IMDb Id: {enhanced_item.id} as {enhanced_item.type.title()}: {enhanced_item.log_string}" + except NotADirectoryError: + errors.append(f"Skipping {item.log_string} as it is not a valid directory") + except Exception as e: + logger.exception(f"Error processing {item.log_string}: {e}") + raise # Re-raise to trigger rollback + finally: + progress.update(task, advance=1, log=log_message) + + # Only commit if the entire chunk was successful + session.commit() + except Exception as e: - logger.exception(f"Error processing {item.log_string}: {e}") - finally: - progress.update(task, advance=1, log=log_message) - + session.rollback() + logger.error(f"Failed to process chunk {i//chunk_size + 1}, rolling back all changes: {str(e)}") + raise # Re-raise to abort the entire process + progress.update(task, log="Finished Indexing Symlinks!") - session.commit() - if errors: - logger.error("Errors encountered during initialization") - for error in errors: - logger.error(error) + if errors: + logger.error("Errors encountered during initialization") + for error in errors: + logger.error(error) + + except Exception as e: + session.rollback() + logger.error(f"Failed to initialize database from symlinks: {str(e)}") + return elapsed_time = datetime.now() - start_time total_seconds = elapsed_time.total_seconds() hours, remainder = divmod(total_seconds, 3600) minutes, seconds = divmod(remainder, 60) - logger.success(f"Database initialized, time taken: h{int(hours):02d}:m{int(minutes):02d}:s{int(seconds):02d}") + logger.success(f"Database initialized, time taken: h{int(hours):02d}:m{int(minutes):02d}:s{int(seconds):02d}") \ No newline at end of file