fix: tweak logging for db init from symlinks.
dreulavelle committed Sep 17, 2024
1 parent 17a579e commit 2f15fbd
Showing 5 changed files with 63 additions and 100 deletions.
10 changes: 7 additions & 3 deletions src/program/content/mdblist.py
@@ -21,6 +21,7 @@ def __init__(self):
return
self.requests_per_2_minutes = self._calculate_request_time()
self.rate_limiter = RateLimiter(self.requests_per_2_minutes, 120, True)
self.recurring_items = set()
logger.success("mdblist initialized")

def validate(self):
@@ -62,9 +63,12 @@ def run(self) -> Generator[MediaItem, None, None]:
pass

non_existing_items = _filter_existing_items(items_to_yield)
if len(non_existing_items) > 0:
logger.info(f"Found {len(non_existing_items)} new items to fetch")
yield non_existing_items
new_non_recurring_items = [item for item in non_existing_items if item.imdb_id not in self.recurring_items and isinstance(item, MediaItem)]
self.recurring_items.update([item.imdb_id for item in new_non_recurring_items])

if new_non_recurring_items:
logger.info(f"Found {len(new_non_recurring_items)} new items to fetch")
yield new_non_recurring_items

def _calculate_request_time(self):
limits = my_limits(self.settings.api_key).limits
4 changes: 2 additions & 2 deletions src/program/content/overseerr.py
@@ -58,11 +58,11 @@ def run(self):

overseerr_items: list[MediaItem] = self.get_media_requests()
non_existing_items = _filter_existing_items(overseerr_items)
new_non_recurring_items = [item for item in non_existing_items if item.imdb_id not in self.recurring_items]
new_non_recurring_items = [item for item in non_existing_items if item.imdb_id not in self.recurring_items and isinstance(item, MediaItem)]
self.recurring_items.update([item.imdb_id for item in new_non_recurring_items])

if self.settings.use_webhook:
logger.debug("Webhook is enabled. Running Overseerr once before switching to webhook.")
logger.debug("Webhook is enabled. Running Overseerr once before switching to webhook only mode")
self.run_once = True

if new_non_recurring_items:
2 changes: 1 addition & 1 deletion src/program/content/plex_watchlist.py
@@ -70,7 +70,7 @@ def run(self) -> Generator[MediaItem, None, None]:
plex_items: set[str] = set(watchlist_items) | set(rss_items)
items_to_yield: list[MediaItem] = [MediaItem({"imdb_id": imdb_id, "requested_by": self.key}) for imdb_id in plex_items if imdb_id.startswith("tt")]
non_existing_items = _filter_existing_items(items_to_yield)
new_non_recurring_items = [item for item in non_existing_items if item.imdb_id not in self.recurring_items]
new_non_recurring_items = [item for item in non_existing_items if item.imdb_id not in self.recurring_items and isinstance(item, MediaItem)]
self.recurring_items.update([item.imdb_id for item in new_non_recurring_items])

if new_non_recurring_items:
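The change above is the same filter now shared by mdblist.py, overseerr.py, and plex_watchlist.py: each content service keeps a recurring_items set of IMDb ids it has already yielded and only passes on items that are both absent from the database and unseen in earlier runs. A minimal sketch of that pattern, where MediaItem and _filter_existing_items are simplified stand-ins for the project's real class and helper:

# Minimal sketch of the recurring-items filter; MediaItem and
# _filter_existing_items are simplified stand-ins, not the project's real code.
from dataclasses import dataclass


@dataclass
class MediaItem:
    imdb_id: str


def _filter_existing_items(items):
    # Placeholder: the real helper drops items already present in the database.
    return items


class ContentService:
    def __init__(self):
        self.recurring_items: set[str] = set()  # IMDb ids yielded on earlier runs

    def run(self, fetched: list[MediaItem]):
        non_existing = _filter_existing_items(fetched)
        # Keep only genuine MediaItem instances not seen in a previous run.
        new_items = [
            item for item in non_existing
            if item.imdb_id not in self.recurring_items and isinstance(item, MediaItem)
        ]
        self.recurring_items.update(item.imdb_id for item in new_items)
        if new_items:
            yield new_items


service = ContentService()
print(list(service.run([MediaItem("tt0111161")])))  # [[MediaItem(imdb_id='tt0111161')]]
print(list(service.run([MediaItem("tt0111161")])))  # [] -- id was recorded on the first pass

The second call yields nothing because the id was recorded on the first pass, which is what keeps these services from re-submitting the same requests on every update interval.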
69 changes: 28 additions & 41 deletions src/program/content/trakt.py
@@ -29,6 +29,7 @@ def __init__(self):
if not self.initialized:
return
self.next_run_time = 0
self.recurring_items = set()
self.items_already_seen = set()
self.missing()
logger.success("Trakt initialized!")
@@ -74,53 +75,39 @@ def missing(self):

def run(self):
"""Fetch media from Trakt and yield Movie, Show, or MediaItem instances."""
current_time = time.time()
items_to_yield = []
if current_time < self.next_run_time:
return

self.next_run_time = current_time + self.settings.update_interval
watchlist_ids = self._get_watchlist(self.settings.watchlist)
collection_ids = self._get_collection(self.settings.collection)
user_list_ids = self._get_list(self.settings.user_lists)
trending_ids = self._get_trending_items() if self.settings.fetch_trending else []
popular_ids = self._get_popular_items() if self.settings.fetch_popular else []

# Combine all IMDb IDs and types
all_items = {
"Watchlist": watchlist_ids,
"Collection": collection_ids,
"User Lists": user_list_ids,
"Trending": trending_ids,
"Popular": popular_ids
}

total_new_items = 0
def fetch_items(fetch_function, *args):
"""Helper function to fetch items using the provided function and arguments."""
return fetch_function(*args) if args else []

for source, items in all_items.items():
new_items_count = 0
for imdb_id, item_type in items:
if imdb_id in self.items_already_seen or not imdb_id:
continue
self.items_already_seen.add(imdb_id)
new_items_count += 1
watchlist_ids = fetch_items(self._get_watchlist, self.settings.watchlist)
collection_ids = fetch_items(self._get_collection, self.settings.collection)
user_list_ids = fetch_items(self._get_list, self.settings.user_lists)
trending_ids = fetch_items(self._get_trending_items) if self.settings.fetch_trending else []
popular_ids = fetch_items(self._get_popular_items) if self.settings.fetch_popular else []

items_to_yield.append(MediaItem({
"imdb_id": imdb_id,
"requested_by": self.key
}))
# Combine all IMDb IDs and types into a set to avoid duplicates
all_ids = set(watchlist_ids + collection_ids + user_list_ids + trending_ids + popular_ids)

if new_items_count > 0:
logger.log("TRAKT", f"New items fetched from {source}: {new_items_count}")
total_new_items += new_items_count
if total_new_items > 0:
logger.log("TRAKT", f"Total new items fetched: {total_new_items}")
items_to_yield = [
MediaItem({"imdb_id": imdb_id, "requested_by": self.key})
for imdb_id in all_ids
if imdb_id.startswith("tt")
]

non_existing_items = _filter_existing_items(items_to_yield)
if len(non_existing_items) > 0:
logger.info(f"Found {len(non_existing_items)} new items to fetch")

yield non_existing_items
new_non_recurring_items = [
item
for item in non_existing_items
if item.imdb_id not in self.recurring_items
and isinstance(item, MediaItem)
]
self.recurring_items.update(item.imdb_id for item in new_non_recurring_items)

if new_non_recurring_items:
logger.log("TRAKT", f"Found {len(new_non_recurring_items)} new items to fetch")

yield new_non_recurring_items

def _get_watchlist(self, watchlist_users: list) -> list:
"""Get IMDb IDs from Trakt watchlist"""
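The rewritten Trakt run() replaces the per-source loop and counters with a single set union over every fetched IMDb id, so an id that appears in several lists is only turned into one MediaItem. A rough sketch of that aggregation step, using hard-coded lists as hypothetical stand-ins for _get_watchlist, _get_collection, and the other fetchers:

# Sketch of the id-aggregation step in the new run(); the source lists are
# hypothetical stand-ins for the real Trakt fetchers.
watchlist_ids = ["tt0068646", "tt0111161"]
collection_ids = ["tt0111161", "tt0468569"]   # overlaps with the watchlist
trending_ids = ["tt1375666", "nm0000138"]     # a non-title id that should be dropped

# Union into a set so an id requested by several sources is only processed once.
all_ids = set(watchlist_ids + collection_ids + trending_ids)

# Keep only IMDb title ids ("tt..."); other id types are ignored.
items_to_yield = [
    {"imdb_id": imdb_id, "requested_by": "trakt"}
    for imdb_id in all_ids
    if imdb_id.startswith("tt")
]
print(len(items_to_yield))  # 4 unique title ids

The per-source counts from the old loop are gone; the new code logs a single total once recurring and already-stored items have been filtered out.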
78 changes: 25 additions & 53 deletions src/program/program.py
@@ -6,10 +6,11 @@
import time
from datetime import datetime
from queue import Empty
import traceback

from apscheduler.schedulers.background import BackgroundScheduler
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TimeRemainingColumn
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TimeRemainingColumn, MofNCompleteColumn, TimeElapsedColumn
from rich.live import Live

from program.content import Listrr, Mdblist, Overseerr, PlexWatchlist, TraktContent
@@ -335,45 +336,6 @@ def stop(self):
self.scheduler.shutdown(wait=False)
logger.log("PROGRAM", "Riven has been stopped.")

# def _init_db_from_symlinks(self):
# with db.Session() as session:
# res = session.execute(select(func.count(MediaItem._id))).scalar_one()
# added = []
# if res == 0:
# logger.log("PROGRAM", "Collecting items from symlinks")
# items = self.services[SymlinkLibrary].run()
# logger.log("PROGRAM", f"Found {len(items)} symlinks to add to database")
# if settings_manager.settings.map_metadata:
# console = Console()
# progress = Progress(
# SpinnerColumn(),
# TextColumn("[progress.description]{task.description}"),
# BarColumn(),
# TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
# TimeRemainingColumn(),
# console=console,
# transient=True,
# )

# task = progress.add_task("Enriching items with metadata", total=len(items))
# with Live(progress, console=console, refresh_per_second=10):
# for item in items:
# if isinstance(item, (Movie, Show)):
# try:
# enhanced_item = next(self.services[TraktIndexer].run(item, log_msg=False))
# except StopIteration as e:
# logger.error(f"Failed to enhance metadata for {item.title} ({item.item_id}): {e}")
# continue
# if enhanced_item.item_id in added:
# logger.error(f"Cannot enhance metadata, {item.title} ({item.item_id}) contains multiple folders. Manual resolution required. Skipping.")
# continue
# added.append(enhanced_item.item_id)
# enhanced_item.store_state()
# session.add(enhanced_item)
# progress.update(task, advance=1)
# session.commit()
# logger.log("PROGRAM", "Database initialized")

def _enhance_item(self, item: MediaItem) -> MediaItem | None:
try:
enhanced_item = next(self.services[TraktIndexer].run(item, log_msg=False))
@@ -383,14 +345,15 @@ def _enhance_item(self, item: MediaItem) -> MediaItem | None:
return None

def _init_db_from_symlinks(self):
"""Initialize the database from symlinks."""
start_time = datetime.now()
with db.Session() as session:
res = session.execute(select(func.count(MediaItem._id))).scalar_one()
added = []
errors = []
if res == 0:
logger.log("PROGRAM", "Collecting items from symlinks")
logger.log("PROGRAM", "Collecting items from symlinks, this may take a while depending on library size")
items = self.services[SymlinkLibrary].run()
logger.log("PROGRAM", f"Found {len(items)} Movie and Show symlinks to add to database")
if settings_manager.settings.map_metadata:
console = Console()
progress = Progress(
@@ -399,11 +362,13 @@ def _init_db_from_symlinks(self):
BarColumn(),
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
TimeRemainingColumn(),
TextColumn("[progress.completed]{task.completed}/{task.total}", justify="right"),
TextColumn("[progress.log]{task.fields[log]}", justify="right"),
console=console,
transient=True,
transient=True
)

task = progress.add_task("Enriching items with metadata", total=len(items))
task = progress.add_task("Enriching items with metadata", total=len(items), log="")
with Live(progress, console=console, refresh_per_second=10):
with ThreadPoolExecutor(max_workers=8) as executor: # testing between 4 and 8
future_to_item = {executor.submit(self._enhance_item, item): item for item in items if isinstance(item, (Movie, Show))}
@@ -413,21 +378,28 @@ def _init_db_from_symlinks(self):
enhanced_item = future.result()
if enhanced_item:
if enhanced_item.item_id in added:
errors.append(f"Duplicate symlink found for {item.title} ({item.item_id}), skipping...")
errors.append(f"Duplicate Symlink found: {enhanced_item.log_string}")
continue
else:
added.append(enhanced_item.item_id)
enhanced_item.store_state()
session.add(enhanced_item)
log_message = f"Indexed IMDb Id: {enhanced_item.imdb_id} as {enhanced_item.type.title()}: {enhanced_item.log_string}"
except Exception as e:
errors.append(f"Error processing {item.title} ({item.item_id}): {e}")
logger.exception(f"Error processing {item.log_string}: {e}")
finally:
progress.update(task, advance=1)
progress.update(task, advance=1, log=log_message)
progress.update(task, log="Finished Indexing Symlinks!")
session.commit()

if errors:
logger.error("Errors encountered during initialization")
for error in errors:
logger.error(error)

logger.log("PROGRAM", "Database initialized")
# lets log the errors at the end in case we need user intervention
if errors:
logger.error("Errors encountered during initialization")
for error in errors:
logger.error(error)

elapsed_time = datetime.now() - start_time
total_seconds = elapsed_time.total_seconds()
hours, remainder = divmod(total_seconds, 3600)
minutes, seconds = divmod(remainder, 60)
logger.success(f"Database initialized, time taken: h{int(hours):02d}:m{int(minutes):02d}:s{int(seconds):02d}")
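The reworked _init_db_from_symlinks fans the metadata lookups out over a thread pool, streams per-item status through an extra TextColumn bound to a custom log task field, collects errors for a single report at the end, and logs total wall-clock time. A stripped-down sketch of that structure, assuming a fake enhance() in place of the TraktIndexer call and plain strings instead of Movie/Show items:

# Stripped-down sketch of the parallel enrichment loop; enhance() is a fake
# stand-in for the TraktIndexer call made by _enhance_item.
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

from rich.console import Console
from rich.live import Live
from rich.progress import BarColumn, Progress, SpinnerColumn, TextColumn, TimeRemainingColumn


def enhance(item: str) -> str:
    time.sleep(0.1)  # pretend to hit an external metadata API
    return item.upper()


def init_from_items(items: list[str]) -> None:
    start_time = datetime.now()
    console = Console()
    progress = Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
        TimeRemainingColumn(),
        TextColumn("{task.completed}/{task.total}", justify="right"),
        TextColumn("{task.fields[log]}", justify="right"),  # per-item status column
        console=console,
        transient=True,
    )
    task = progress.add_task("Enriching items with metadata", total=len(items), log="")

    seen, errors = [], []
    with Live(progress, console=console, refresh_per_second=10):
        with ThreadPoolExecutor(max_workers=8) as executor:
            future_to_item = {executor.submit(enhance, item): item for item in items}
            for future in as_completed(future_to_item):
                item = future_to_item[future]
                log_message = ""
                try:
                    enhanced = future.result()
                    if enhanced in seen:
                        errors.append(f"Duplicate found: {enhanced}")
                        continue
                    seen.append(enhanced)
                    log_message = f"Indexed {item} as {enhanced}"
                except Exception as e:  # collect errors, keep the pool running
                    errors.append(f"Error processing {item}: {e}")
                finally:
                    progress.update(task, advance=1, log=log_message)
        progress.update(task, log="Finished indexing!")

    # Report errors after the live display closes so they are not lost
    # behind the transient progress bar.
    for error in errors:
        console.print(f"[red]{error}[/red]")

    elapsed = (datetime.now() - start_time).total_seconds()
    hours, remainder = divmod(elapsed, 3600)
    minutes, seconds = divmod(remainder, 60)
    console.print(f"Done in h{int(hours):02d}:m{int(minutes):02d}:s{int(seconds):02d}")


init_from_items([f"item-{i}" for i in range(20)])

Deferring the error report until after the transient progress display has been torn down mirrors the commit's intent of surfacing problems that need user intervention in one place at the end of initialization.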
