From 864535b01dc790142e21284d24f71335dd116e38 Mon Sep 17 00:00:00 2001 From: Spoked Date: Mon, 1 Jul 2024 18:05:10 -0500 Subject: [PATCH] fix: revert trakt/item modules back to 0.7.4 --- backend/program/indexers/trakt.py | 106 +++++++++++++++++------------- backend/program/media/item.py | 71 +------------------- 2 files changed, 61 insertions(+), 116 deletions(-) diff --git a/backend/program/indexers/trakt.py b/backend/program/indexers/trakt.py index 52221c02..5af9e184 100644 --- a/backend/program/indexers/trakt.py +++ b/backend/program/indexers/trakt.py @@ -1,7 +1,7 @@ """Trakt updater module""" from datetime import datetime, timedelta -from typing import Generator, Optional, Union +from typing import Generator, List, Optional, Union from program.media.item import Episode, MediaItem, Movie, Season, Show from program.settings.manager import settings_manager @@ -17,30 +17,24 @@ class TraktIndexer: def __init__(self): self.key = "traktindexer" + self.ids = [] self.initialized = True self.settings = settings_manager.settings.indexer - def copy_items(self, itema: MediaItem, itemb: MediaItem) -> MediaItem: + def copy_items(self, itema: MediaItem, itemb: MediaItem): if isinstance(itema, Show) and isinstance(itemb, Show): - for seasona, seasonb in zip(itema.seasons, itemb.seasons): - for episodea, episodeb in zip(seasona.episodes, seasonb.episodes): - self._copy_episode_attributes(episodea, episodeb) + for (seasona, seasonb) in zip(itema.seasons, itemb.seasons): + for (episodea, episodeb) in zip(seasona.episodes, seasonb.episodes): + episodeb.set("update_folder", episodea.update_folder) + episodeb.set("symlinked", episodea.symlinked) + seasonb.set("is_anime", itema.is_anime) + episodeb.set("is_anime", itema.is_anime) elif isinstance(itema, Movie) and isinstance(itemb, Movie): - self._copy_movie_attributes(itema, itemb) + itemb.set("update_folder", itema.update_folder) + itemb.set("symlinked", itema.symlinked) + itemb.set("is_anime", itema.is_anime) return itemb - - @staticmethod - def _copy_episode_attributes(source: Episode, target: Episode) -> None: - target.update_folder = source.update_folder - target.symlinked = source.symlinked - target.is_anime = source.is_anime - - @staticmethod - def _copy_movie_attributes(source: Movie, target: Movie) -> None: - target.update_folder = source.update_folder - target.symlinked = source.symlinked - target.is_anime = source.is_anime - + def run(self, in_item: MediaItem) -> Generator[Union[Movie, Show, Season, Episode], None, None]: """Run the Trakt indexer for the given item.""" if not in_item: @@ -49,15 +43,14 @@ def run(self, in_item: MediaItem) -> Generator[Union[Movie, Show, Season, Episod if not (imdb_id := in_item.imdb_id): logger.error(f"Item {in_item.log_string} does not have an imdb_id, cannot index it") return - + item = create_item_from_imdb_id(imdb_id) + if not isinstance(item, MediaItem): logger.error(f"Failed to get item from imdb_id: {imdb_id}") return - if isinstance(item, Show): self._add_seasons_to_show(item, imdb_id) - item = self.copy_items(in_item, item) item.indexed_at = datetime.now() yield item @@ -73,7 +66,7 @@ def should_submit(item: MediaItem) -> bool: interval = timedelta(seconds=settings.update_interval) return datetime.now() - item.indexed_at > interval except Exception: - logger.error(f"Failed to parse date: {item.indexed_at}") + logger.error(f"Failed to parse date: {item.indexed_at} with format: {interval}") return False @staticmethod @@ -86,49 +79,46 @@ def _add_seasons_to_show(show: Show, imdb_id: str): if not imdb_id or not imdb_id.startswith("tt"): logger.error(f"Item {show.log_string} does not have an imdb_id, cannot index it") return - - + seasons = get_show(imdb_id) for season in seasons: if season.number == 0: continue - season_item = _map_item_from_data(season, "season") + season_item = _map_item_from_data(season, "season", show.genres) if season_item: - for episode_data in season.episodes: - episode_item = _map_item_from_data(episode_data, "episode") + for episode in season.episodes: + episode_item = _map_item_from_data(episode, "episode", show.genres) if episode_item: season_item.add_episode(episode_item) show.add_season(season_item) - # Propagate important global attributes to seasons and episodes - show.propagate_attributes_to_childs() -def _map_item_from_data(data, item_type: str) -> Optional[MediaItem]: +def _map_item_from_data(data, item_type: str, show_genres: List[str] = None) -> Optional[MediaItem]: """Map trakt.tv API data to MediaItemContainer.""" if item_type not in ["movie", "show", "season", "episode"]: - logger.debug(f"Unknown item type {item_type} for {data.title}") + logger.debug(f"Unknown item type {item_type} for {data.title} not found in list of acceptable items") return None formatted_aired_at = _get_formatted_date(data, item_type) - year = getattr(data, "year", None) or (formatted_aired_at.year if formatted_aired_at else None) + genres = getattr(data, "genres", None) or show_genres item = { "title": getattr(data, "title", None), - "year": year, + "year": getattr(data, "year", None), "status": getattr(data, "status", None), "aired_at": formatted_aired_at, "imdb_id": getattr(data.ids, "imdb", None), "tvdb_id": getattr(data.ids, "tvdb", None), "tmdb_id": getattr(data.ids, "tmdb", None), - "genres": getattr(data, "genres", None), + "genres": genres, "network": getattr(data, "network", None), "country": getattr(data, "country", None), "language": getattr(data, "language", None), - "requested_at": datetime.now(), + "requested_at": datetime.now(), } - + item["is_anime"] = ( - ("anime" in item['genres'] or "animation" in item['genres']) if item['genres'] + ("anime" in genres or "animation" in genres) if genres and item["country"] in ("jp", "kr") else False ) @@ -145,14 +135,17 @@ def _map_item_from_data(data, item_type: str) -> Optional[MediaItem]: item["number"] = data.number return Episode(item) case _: - logger.error(f"Failed to create item from data: {data}") + logger.error(f"Unknown item type {item_type} for {data.title} not found in list of acceptable items") return None + def _get_formatted_date(data, item_type: str) -> Optional[datetime]: """Get the formatted aired date from the data.""" - date_str = getattr(data, "first_aired" if item_type in ["show", "season", "episode"] else "released", None) - date_format = "%Y-%m-%dT%H:%M:%S.%fZ" if item_type in ["show", "season", "episode"] else "%Y-%m-%d" - return datetime.strptime(date_str, date_format) if date_str else None + if item_type in ["show", "season", "episode"] and (first_aired := getattr(data, "first_aired", None)): + return datetime.strptime(first_aired, "%Y-%m-%dT%H:%M:%S.%fZ") + if item_type == "movie" and (released := getattr(data, "released", None)): + return datetime.strptime(released, "%Y-%m-%d") + return None def get_show(imdb_id: str) -> dict: @@ -167,11 +160,20 @@ def create_item_from_imdb_id(imdb_id: str) -> Optional[MediaItem]: url = f"https://api.trakt.tv/search/imdb/{imdb_id}?extended=full" response = get(url, additional_headers={"trakt-api-version": "2", "trakt-api-key": CLIENT_ID}) if not response.is_ok or not response.data: - logger.error(f"Failed to create item using imdb id: {imdb_id}") + logger.error(f"Failed to create item using imdb id: {imdb_id}") # This returns an empty list for response.data + return None + + def find_first(preferred_types, data): + for type in preferred_types: + for d in data: + if d.type == type: + return d return None - data = next((d for d in response.data if d.type in ["show", "movie", "season", "episode"]), None) - return _map_item_from_data(getattr(data, data.type), data.type) if data else None + data = find_first(["show", "movie", "season", "episode"], response.data) + if data: + return _map_item_from_data(getattr(data, data.type), data.type) + return None def get_imdbid_from_tmdb(tmdb_id: str) -> Optional[str]: """Wrapper for trakt.tv API search method.""" @@ -179,4 +181,16 @@ def get_imdbid_from_tmdb(tmdb_id: str) -> Optional[str]: response = get(url, additional_headers={"trakt-api-version": "2", "trakt-api-key": CLIENT_ID}) if not response.is_ok or not response.data: return None - return next((ns.movie.ids.imdb if ns.type == 'movie' else ns.show.ids.imdb for ns in response.data if ns.type in ['movie', 'show']), None) + imdb_id = get_imdb_id_from_list(response.data) + if imdb_id: + return imdb_id + logger.error(f"Failed to fetch imdb_id for tmdb_id: {tmdb_id}") + return None + +def get_imdb_id_from_list(namespaces): + for ns in namespaces: + if ns.type == 'movie': + return ns.movie.ids.imdb + elif ns.type == 'show': + return ns.show.ids.imdb + return None diff --git a/backend/program/media/item.py b/backend/program/media/item.py index 6fad930c..4ef374ec 100644 --- a/backend/program/media/item.py +++ b/backend/program/media/item.py @@ -1,4 +1,3 @@ -import re from dataclasses import dataclass from datetime import datetime, timedelta from typing import List, Optional, Self @@ -6,7 +5,6 @@ from program.media.state import States from RTN import Torrent from RTN.patterns import extract_episodes -from unidecode import unidecode from utils.logger import logger @@ -53,7 +51,7 @@ def __init__(self, item: dict) -> None: self.parent: Optional[Self] = None # Media related - self.title: Optional[str] = self.clean_title(item.get("title", None)) + self.title: Optional[str] = item.get("title", None) self.imdb_id: Optional[str] = item.get("imdb_id", None) if self.imdb_id: self.imdb_link: Optional[str] = f"https://www.imdb.com/title/{self.imdb_id}/" @@ -65,7 +63,6 @@ def __init__(self, item: dict) -> None: self.country: Optional[str] = item.get("country", None) self.language: Optional[str] = item.get("language", None) self.aired_at: Optional[datetime] = item.get("aired_at", None) - self.year: Optional[int] = item.get("year" , None) self.genres: Optional[List[str]] = item.get("genres", []) # Plex related @@ -119,18 +116,6 @@ def _determine_state(self): return States.Requested return States.Unknown - def clean_title(self, title: Optional[str]) -> Optional[str]: - """Clean the title by removing non-alphanumeric characters and mapping special characters.""" - if title is None: - return None - # Convert special characters to closest ASCII equivalents - title = unidecode(title) - # Replace non-alphanumeric characters with spaces - title = re.sub(r'[^a-zA-Z0-9]', ' ', title) - # Remove extra spaces - title = re.sub(r'\s+', ' ', title).strip() - return title - def copy_other_media_attr(self, other): """Copy attributes from another media item.""" self.title = getattr(other, "title", None) @@ -140,7 +125,6 @@ def copy_other_media_attr(self, other): self.country = getattr(other, "country", None) self.language = getattr(other, "language", None) self.aired_at = getattr(other, "aired_at", None) - self.year = getattr(other, "year", None) self.genres = getattr(other, "genres", []) self.is_anime = getattr(other, "is_anime", False) self.overseerr_id = getattr(other, "overseerr_id", None) @@ -173,7 +157,6 @@ def to_dict(self): "state": self.state.value, "imdb_link": self.imdb_link if hasattr(self, "imdb_link") else None, "aired_at": self.aired_at, - "year": self.year if hasattr(self, "year") else None, "genres": self.genres if hasattr(self, "genres") else None, "is_anime": self.is_anime if hasattr(self, "is_anime") else False, "guid": self.guid, @@ -248,26 +231,6 @@ def get_top_title(self) -> str: case _: return self.title - def get_top_year(self) -> Optional[int]: - """Get the top year of the item.""" - match self.__class__.__name__: - case "Season": - return self.parent.year - case "Episode": - return self.parent.parent.year - case _: - return self.year - - def get_season_year(self) -> Optional[int]: - """Get the season title of the item if show return nothing""" - match self.__class__.__name__: - case "Season": - return self.year - case "Episode": - return self.parent.year - case _: - return None - def __hash__(self): return hash(self.item_id) @@ -304,7 +267,6 @@ def __init__(self, item): self.locations = item.get("locations", []) self.seasons: list[Season] = item.get("seasons", []) self.item_id = ItemId(self.imdb_id) - self.propagate_attributes_to_childs() def get_season_index_by_id(self, item_id): """Find the index of an season by its item_id.""" @@ -359,24 +321,6 @@ def add_season(self, season): season.item_id.parent_id = self.item_id self.seasons = sorted(self.seasons, key=lambda s: s.number) - def propagate_attributes_to_childs(self): - """Propagate show attributes to seasons and episodes if they are empty or do not match.""" - # Important attributes that need to be connected. - attributes = ["genres", "country", "network", "language", "is_anime"] - - def propagate(target, source): - for attr in attributes: - source_value = getattr(source, attr, None) - target_value = getattr(target, attr, None) - # Check if the attribute source is not falsy (none, false, 0, []) - # and if the target is not None we set the source to the target - if (not target_value) and source_value is not None: - setattr(target, attr, source_value) - - for season in self.seasons: - propagate(season, self) - for episode in season.episodes: - propagate(episode, self) class Season(MediaItem): """Season class""" @@ -455,13 +399,6 @@ def log_string(self): def get_top_title(self) -> str: return self.parent.title - def get_top_year(self) -> Optional[int]: - return self.parent.year - - def get_season_year(self) -> Optional[int]: - return self.year - - class Episode(MediaItem): """Episode class""" @@ -499,12 +436,6 @@ def log_string(self): def get_top_title(self) -> str: return self.parent.parent.title - def get_top_year(self) -> Optional[int]: - return self.parent.parent.year - - def get_season_year(self) -> Optional[int]: - return self.parent.year - def _set_nested_attr(obj, key, value): if "." in key: