Skip to content

Commit

Permalink
fix: revert trakt/item modules back to 0.7.4
Browse files Browse the repository at this point in the history
  • Loading branch information
Spoked authored and Spoked committed Jul 1, 2024
1 parent a9b1ea9 commit 864535b
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 116 deletions.
106 changes: 60 additions & 46 deletions backend/program/indexers/trakt.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Trakt updater module"""

from datetime import datetime, timedelta
from typing import Generator, Optional, Union
from typing import Generator, List, Optional, Union

from program.media.item import Episode, MediaItem, Movie, Season, Show
from program.settings.manager import settings_manager
Expand All @@ -17,30 +17,24 @@ class TraktIndexer:

def __init__(self):
self.key = "traktindexer"
self.ids = []
self.initialized = True
self.settings = settings_manager.settings.indexer

def copy_items(self, itema: MediaItem, itemb: MediaItem) -> MediaItem:
def copy_items(self, itema: MediaItem, itemb: MediaItem):
if isinstance(itema, Show) and isinstance(itemb, Show):
for seasona, seasonb in zip(itema.seasons, itemb.seasons):
for episodea, episodeb in zip(seasona.episodes, seasonb.episodes):
self._copy_episode_attributes(episodea, episodeb)
for (seasona, seasonb) in zip(itema.seasons, itemb.seasons):
for (episodea, episodeb) in zip(seasona.episodes, seasonb.episodes):
episodeb.set("update_folder", episodea.update_folder)
episodeb.set("symlinked", episodea.symlinked)
seasonb.set("is_anime", itema.is_anime)
episodeb.set("is_anime", itema.is_anime)
elif isinstance(itema, Movie) and isinstance(itemb, Movie):
self._copy_movie_attributes(itema, itemb)
itemb.set("update_folder", itema.update_folder)
itemb.set("symlinked", itema.symlinked)
itemb.set("is_anime", itema.is_anime)
return itemb

@staticmethod
def _copy_episode_attributes(source: Episode, target: Episode) -> None:
target.update_folder = source.update_folder
target.symlinked = source.symlinked
target.is_anime = source.is_anime

@staticmethod
def _copy_movie_attributes(source: Movie, target: Movie) -> None:
target.update_folder = source.update_folder
target.symlinked = source.symlinked
target.is_anime = source.is_anime


def run(self, in_item: MediaItem) -> Generator[Union[Movie, Show, Season, Episode], None, None]:
"""Run the Trakt indexer for the given item."""
if not in_item:
Expand All @@ -49,15 +43,14 @@ def run(self, in_item: MediaItem) -> Generator[Union[Movie, Show, Season, Episod
if not (imdb_id := in_item.imdb_id):
logger.error(f"Item {in_item.log_string} does not have an imdb_id, cannot index it")
return

item = create_item_from_imdb_id(imdb_id)

if not isinstance(item, MediaItem):
logger.error(f"Failed to get item from imdb_id: {imdb_id}")
return

if isinstance(item, Show):
self._add_seasons_to_show(item, imdb_id)

item = self.copy_items(in_item, item)
item.indexed_at = datetime.now()
yield item
Expand All @@ -73,7 +66,7 @@ def should_submit(item: MediaItem) -> bool:
interval = timedelta(seconds=settings.update_interval)
return datetime.now() - item.indexed_at > interval
except Exception:
logger.error(f"Failed to parse date: {item.indexed_at}")
logger.error(f"Failed to parse date: {item.indexed_at} with format: {interval}")
return False

@staticmethod
Expand All @@ -86,49 +79,46 @@ def _add_seasons_to_show(show: Show, imdb_id: str):
if not imdb_id or not imdb_id.startswith("tt"):
logger.error(f"Item {show.log_string} does not have an imdb_id, cannot index it")
return



seasons = get_show(imdb_id)
for season in seasons:
if season.number == 0:
continue
season_item = _map_item_from_data(season, "season")
season_item = _map_item_from_data(season, "season", show.genres)
if season_item:
for episode_data in season.episodes:
episode_item = _map_item_from_data(episode_data, "episode")
for episode in season.episodes:
episode_item = _map_item_from_data(episode, "episode", show.genres)
if episode_item:
season_item.add_episode(episode_item)
show.add_season(season_item)

# Propagate important global attributes to seasons and episodes
show.propagate_attributes_to_childs()

def _map_item_from_data(data, item_type: str) -> Optional[MediaItem]:
def _map_item_from_data(data, item_type: str, show_genres: List[str] = None) -> Optional[MediaItem]:
"""Map trakt.tv API data to MediaItemContainer."""
if item_type not in ["movie", "show", "season", "episode"]:
logger.debug(f"Unknown item type {item_type} for {data.title}")
logger.debug(f"Unknown item type {item_type} for {data.title} not found in list of acceptable items")
return None

formatted_aired_at = _get_formatted_date(data, item_type)
year = getattr(data, "year", None) or (formatted_aired_at.year if formatted_aired_at else None)
genres = getattr(data, "genres", None) or show_genres

item = {
"title": getattr(data, "title", None),
"year": year,
"year": getattr(data, "year", None),
"status": getattr(data, "status", None),
"aired_at": formatted_aired_at,
"imdb_id": getattr(data.ids, "imdb", None),
"tvdb_id": getattr(data.ids, "tvdb", None),
"tmdb_id": getattr(data.ids, "tmdb", None),
"genres": getattr(data, "genres", None),
"genres": genres,
"network": getattr(data, "network", None),
"country": getattr(data, "country", None),
"language": getattr(data, "language", None),
"requested_at": datetime.now(),
"requested_at": datetime.now(),
}

item["is_anime"] = (
("anime" in item['genres'] or "animation" in item['genres']) if item['genres']
("anime" in genres or "animation" in genres) if genres
and item["country"] in ("jp", "kr")
else False
)
Expand All @@ -145,14 +135,17 @@ def _map_item_from_data(data, item_type: str) -> Optional[MediaItem]:
item["number"] = data.number
return Episode(item)
case _:
logger.error(f"Failed to create item from data: {data}")
logger.error(f"Unknown item type {item_type} for {data.title} not found in list of acceptable items")
return None


def _get_formatted_date(data, item_type: str) -> Optional[datetime]:
"""Get the formatted aired date from the data."""
date_str = getattr(data, "first_aired" if item_type in ["show", "season", "episode"] else "released", None)
date_format = "%Y-%m-%dT%H:%M:%S.%fZ" if item_type in ["show", "season", "episode"] else "%Y-%m-%d"
return datetime.strptime(date_str, date_format) if date_str else None
if item_type in ["show", "season", "episode"] and (first_aired := getattr(data, "first_aired", None)):
return datetime.strptime(first_aired, "%Y-%m-%dT%H:%M:%S.%fZ")
if item_type == "movie" and (released := getattr(data, "released", None)):
return datetime.strptime(released, "%Y-%m-%d")
return None


def get_show(imdb_id: str) -> dict:
Expand All @@ -167,16 +160,37 @@ def create_item_from_imdb_id(imdb_id: str) -> Optional[MediaItem]:
url = f"https://api.trakt.tv/search/imdb/{imdb_id}?extended=full"
response = get(url, additional_headers={"trakt-api-version": "2", "trakt-api-key": CLIENT_ID})
if not response.is_ok or not response.data:
logger.error(f"Failed to create item using imdb id: {imdb_id}")
logger.error(f"Failed to create item using imdb id: {imdb_id}") # This returns an empty list for response.data
return None

def find_first(preferred_types, data):
for type in preferred_types:
for d in data:
if d.type == type:
return d
return None

data = next((d for d in response.data if d.type in ["show", "movie", "season", "episode"]), None)
return _map_item_from_data(getattr(data, data.type), data.type) if data else None
data = find_first(["show", "movie", "season", "episode"], response.data)
if data:
return _map_item_from_data(getattr(data, data.type), data.type)
return None

def get_imdbid_from_tmdb(tmdb_id: str) -> Optional[str]:
"""Wrapper for trakt.tv API search method."""
url = f"https://api.trakt.tv/search/tmdb/{tmdb_id}?extended=full"
response = get(url, additional_headers={"trakt-api-version": "2", "trakt-api-key": CLIENT_ID})
if not response.is_ok or not response.data:
return None
return next((ns.movie.ids.imdb if ns.type == 'movie' else ns.show.ids.imdb for ns in response.data if ns.type in ['movie', 'show']), None)
imdb_id = get_imdb_id_from_list(response.data)
if imdb_id:
return imdb_id
logger.error(f"Failed to fetch imdb_id for tmdb_id: {tmdb_id}")
return None

def get_imdb_id_from_list(namespaces):
for ns in namespaces:
if ns.type == 'movie':
return ns.movie.ids.imdb
elif ns.type == 'show':
return ns.show.ids.imdb
return None
71 changes: 1 addition & 70 deletions backend/program/media/item.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import re
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional, Self

from program.media.state import States
from RTN import Torrent
from RTN.patterns import extract_episodes
from unidecode import unidecode
from utils.logger import logger


Expand Down Expand Up @@ -53,7 +51,7 @@ def __init__(self, item: dict) -> None:
self.parent: Optional[Self] = None

# Media related
self.title: Optional[str] = self.clean_title(item.get("title", None))
self.title: Optional[str] = item.get("title", None)
self.imdb_id: Optional[str] = item.get("imdb_id", None)
if self.imdb_id:
self.imdb_link: Optional[str] = f"https://www.imdb.com/title/{self.imdb_id}/"
Expand All @@ -65,7 +63,6 @@ def __init__(self, item: dict) -> None:
self.country: Optional[str] = item.get("country", None)
self.language: Optional[str] = item.get("language", None)
self.aired_at: Optional[datetime] = item.get("aired_at", None)
self.year: Optional[int] = item.get("year" , None)
self.genres: Optional[List[str]] = item.get("genres", [])

# Plex related
Expand Down Expand Up @@ -119,18 +116,6 @@ def _determine_state(self):
return States.Requested
return States.Unknown

def clean_title(self, title: Optional[str]) -> Optional[str]:
"""Clean the title by removing non-alphanumeric characters and mapping special characters."""
if title is None:
return None
# Convert special characters to closest ASCII equivalents
title = unidecode(title)
# Replace non-alphanumeric characters with spaces
title = re.sub(r'[^a-zA-Z0-9]', ' ', title)
# Remove extra spaces
title = re.sub(r'\s+', ' ', title).strip()
return title

def copy_other_media_attr(self, other):
"""Copy attributes from another media item."""
self.title = getattr(other, "title", None)
Expand All @@ -140,7 +125,6 @@ def copy_other_media_attr(self, other):
self.country = getattr(other, "country", None)
self.language = getattr(other, "language", None)
self.aired_at = getattr(other, "aired_at", None)
self.year = getattr(other, "year", None)
self.genres = getattr(other, "genres", [])
self.is_anime = getattr(other, "is_anime", False)
self.overseerr_id = getattr(other, "overseerr_id", None)
Expand Down Expand Up @@ -173,7 +157,6 @@ def to_dict(self):
"state": self.state.value,
"imdb_link": self.imdb_link if hasattr(self, "imdb_link") else None,
"aired_at": self.aired_at,
"year": self.year if hasattr(self, "year") else None,
"genres": self.genres if hasattr(self, "genres") else None,
"is_anime": self.is_anime if hasattr(self, "is_anime") else False,
"guid": self.guid,
Expand Down Expand Up @@ -248,26 +231,6 @@ def get_top_title(self) -> str:
case _:
return self.title

def get_top_year(self) -> Optional[int]:
"""Get the top year of the item."""
match self.__class__.__name__:
case "Season":
return self.parent.year
case "Episode":
return self.parent.parent.year
case _:
return self.year

def get_season_year(self) -> Optional[int]:
"""Get the season title of the item if show return nothing"""
match self.__class__.__name__:
case "Season":
return self.year
case "Episode":
return self.parent.year
case _:
return None

def __hash__(self):
return hash(self.item_id)

Expand Down Expand Up @@ -304,7 +267,6 @@ def __init__(self, item):
self.locations = item.get("locations", [])
self.seasons: list[Season] = item.get("seasons", [])
self.item_id = ItemId(self.imdb_id)
self.propagate_attributes_to_childs()

def get_season_index_by_id(self, item_id):
"""Find the index of an season by its item_id."""
Expand Down Expand Up @@ -359,24 +321,6 @@ def add_season(self, season):
season.item_id.parent_id = self.item_id
self.seasons = sorted(self.seasons, key=lambda s: s.number)

def propagate_attributes_to_childs(self):
"""Propagate show attributes to seasons and episodes if they are empty or do not match."""
# Important attributes that need to be connected.
attributes = ["genres", "country", "network", "language", "is_anime"]

def propagate(target, source):
for attr in attributes:
source_value = getattr(source, attr, None)
target_value = getattr(target, attr, None)
# Check if the attribute source is not falsy (none, false, 0, [])
# and if the target is not None we set the source to the target
if (not target_value) and source_value is not None:
setattr(target, attr, source_value)

for season in self.seasons:
propagate(season, self)
for episode in season.episodes:
propagate(episode, self)

class Season(MediaItem):
"""Season class"""
Expand Down Expand Up @@ -455,13 +399,6 @@ def log_string(self):
def get_top_title(self) -> str:
return self.parent.title

def get_top_year(self) -> Optional[int]:
return self.parent.year

def get_season_year(self) -> Optional[int]:
return self.year


class Episode(MediaItem):
"""Episode class"""

Expand Down Expand Up @@ -499,12 +436,6 @@ def log_string(self):
def get_top_title(self) -> str:
return self.parent.parent.title

def get_top_year(self) -> Optional[int]:
return self.parent.parent.year

def get_season_year(self) -> Optional[int]:
return self.parent.year


def _set_nested_attr(obj, key, value):
if "." in key:
Expand Down

0 comments on commit 864535b

Please sign in to comment.