diff --git a/src/program/program.py b/src/program/program.py index 4beb8b5e..8e607b3c 100644 --- a/src/program/program.py +++ b/src/program/program.py @@ -235,54 +235,44 @@ def _schedule_services(self) -> None: logger.log("PROGRAM", f"Scheduled {service_cls.__name__} to run every {update_interval} seconds.") def _id_in_queue(self, id): - if id is None: - return False return any(i._id == id for i in self.queued_items) def _id_in_running_items(self, id): - if id is None: - return False return any(i._id == id for i in self.running_items) - def _push_event_queue(self, event) -> bool: + def _push_event_queue(self, event): with self.mutex: - if event.item in self.queued_items or event.item in self.running_items: - logger.debug(f"Item {event.item.log_string} is already in the queue or running, skipping.") - return False - - if hasattr(event.item, "_id") and event.item._id is not None: - if isinstance(event.item, Show): - for season in event.item.seasons: - if season._id and (self._id_in_queue(season._id) or self._id_in_running_items(season._id)): - logger.debug(f"Season {season.log_string} of show {event.item.log_string} is already in the queue or running, skipping.") - return False - for episode in season.episodes: - if episode._id and (self._id_in_queue(episode._id) or self._id_in_running_items(episode._id)): - logger.debug(f"Episode {episode.log_string} of season {season.log_string} is already in the queue or running, skipping.") + if (not event.item in self.queued_items and not event.item in self.running_items): + if hasattr(event.item, "_id"): + if event.item.type == "show": + for s in event.item.seasons: + if self._id_in_queue(s._id) or self._id_in_running_items(s._id): return False - elif isinstance(event.item, Season): - for episode in event.item.episodes: - if self._id_in_queue(episode._id) or self._id_in_running_items(episode._id): - logger.debug(f"Episode {episode.log_string} of season {event.item.log_string} is already in the queue or running, skipping.") + for e in s.episodes: + if self._id_in_queue(e._id) or self._id_in_running_items(e._id): + return False + + elif event.item.type == "season": + for e in event.item.episodes: + if self._id_in_queue(e._id) or self._id_in_running_items(e._id): + return False + + elif hasattr(event.item, "parent"): + parent = event.item.parent + if self._id_in_queue(parent._id) or self._id_in_running_items(parent._id): return False - elif hasattr(event.item, "parent"): - parent = event.item.parent - if self._id_in_queue(parent._id) or self._id_in_running_items(parent._id): - logger.debug(f"Parent {parent.log_string} of item {event.item.log_string} is already in the queue or running, skipping.") - return False - if hasattr(parent, "parent") and parent.parent: - grandparent = parent.parent - if self._id_in_queue(grandparent._id) or self._id_in_running_items(grandparent._id): - logger.debug(f"Grandparent {grandparent.log_string} of item {event.item.log_string} is already in the queue or running, skipping.") + elif hasattr(parent, "parent") and self._id_in_queue(parent.parent._id) or self._id_in_running_items(parent.parent._id): return False self.queued_items.append(event.item) self.event_queue.put(event) + if not isinstance(event.item, (Show, Movie, Episode, Season)): logger.log("NEW", f"Added {event.item.log_string} to the queue") else: logger.log("DISCOVERY", f"Re-added {event.item.log_string} to the queue" ) return True + logger.debug(f"Item {event.item.log_string} is already in the queue or running, skipping.") return False @@ -302,8 +292,11 @@ def add_to_running(self, item, service_name): return with self.mutex: if item not in self.running_items: - self.running_items.append(item) - logger.log("PROGRAM", f"Item {item.log_string} started running section {service_name} with state {item.state.value}") + if isinstance(item, MediaItem) and not self._id_in_running_items(item._id): + self.running_items.append(item) + elif not isinstance(item, MediaItem): + self.running_items.append(item) + logger.log("PROGRAM", f"Item {item.log_string} started running section {service_name} with state {item.state.value}" ) def _process_future_item(self, future: Future, service: Service, orig_item: MediaItem) -> None: """Callback to add the results from a future emitted by a service to the event queue.""" @@ -409,13 +402,10 @@ def run(self): if items_to_submit: for item_to_submit in items_to_submit: - logger.debug(f"Submitting {item_to_submit.log_string} to {next_service.__name__}") self.add_to_running(item_to_submit, next_service.__name__) self._submit_job(next_service, item_to_submit) if isinstance(existing_item, MediaItem): - logger.debug(f"Storing state of {existing_item.log_string}") existing_item.store_state() - logger.debug(f"Committing changes to the database") session.commit() def stop(self):