Skip to content

Commit

Permalink
fix: add extra logging to track issue. added mutex to add_to_running
Browse files Browse the repository at this point in the history
  • Loading branch information
dreulavelle committed Jul 26, 2024
1 parent cbe9012 commit 87c3241
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 24 deletions.
2 changes: 1 addition & 1 deletion src/program/db/db_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,4 @@ def run_delete(_type):
run_delete(MediaItem)

logger.log("PROGRAM", "Database reset. Turning off HARD_RESET Env Var.")
os.environ["HARD_RESET"] = "False"
os.environ.pop('HARD_RESET', None)
67 changes: 44 additions & 23 deletions src/program/program.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,30 +235,47 @@ def _schedule_services(self) -> None:
logger.log("PROGRAM", f"Scheduled {service_cls.__name__} to run every {update_interval} seconds.")

def _id_in_queue(self, id):
if id is None:
return False
return any(i._id == id for i in self.queued_items)

def _id_in_running_items(self, id):
if id is None:
return False
return any(i._id == id for i in self.running_items)

def _push_event_queue(self, event):
def _push_event_queue(self, event) -> bool:
with self.mutex:
if( event.item not in self.queued_items and event.item not in self.running_items):
if hasattr(event.item, "_id") and event.item._id is not None:
if isinstance(event.item, Show):
for s in event.item.seasons:
if s._id and (self._id_in_queue(s._id) or self._id_in_running_items(s._id)):
return None
for e in s.episodes:
if e._id and (self._id_in_queue(e._id) or self._id_in_running_items(e._id)):
return None
if isinstance(event.item, Season):
for e in event.item.episodes:
if self._id_in_queue(e._id) or self._id_in_running_items(e._id):
return None
if hasattr(event.item, "parent") and ( self._id_in_queue(event.item.parent._id) or self._id_in_running_items(event.item.parent._id) ):
return None
if hasattr(event.item, "parent") and hasattr(event.item.parent, "parent") and event.item.parent.parent and ( self._id_in_queue(event.item.parent.parent._id) or self._id_in_running_items(event.item.parent.parent._id)):
return None
if event.item in self.queued_items or event.item in self.running_items:
logger.debug(f"Item {event.item.log_string} is already in the queue or running, skipping.")
return False

if hasattr(event.item, "_id") and event.item._id is not None:
if isinstance(event.item, Show):
for season in event.item.seasons:
if season._id and (self._id_in_queue(season._id) or self._id_in_running_items(season._id)):
logger.debug(f"Season {season.log_string} of show {event.item.log_string} is already in the queue or running, skipping.")
return False
for episode in season.episodes:
if episode._id and (self._id_in_queue(episode._id) or self._id_in_running_items(episode._id)):
logger.debug(f"Episode {episode.log_string} of season {season.log_string} is already in the queue or running, skipping.")
return False
elif isinstance(event.item, Season):
for episode in event.item.episodes:
if self._id_in_queue(episode._id) or self._id_in_running_items(episode._id):
logger.debug(f"Episode {episode.log_string} of season {event.item.log_string} is already in the queue or running, skipping.")
return False
elif hasattr(event.item, "parent"):
parent = event.item.parent
if self._id_in_queue(parent._id) or self._id_in_running_items(parent._id):
logger.debug(f"Parent {parent.log_string} of item {event.item.log_string} is already in the queue or running, skipping.")
return False
if hasattr(parent, "parent") and parent.parent:
grandparent = parent.parent
if self._id_in_queue(grandparent._id) or self._id_in_running_items(grandparent._id):
logger.debug(f"Grandparent {grandparent.log_string} of item {event.item.log_string} is already in the queue or running, skipping.")
return False

self.queued_items.append(event.item)
self.event_queue.put(event)
if not isinstance(event.item, (Show, Movie, Episode, Season)):
Expand All @@ -278,15 +295,15 @@ def _remove_from_running_items(self, item, service_name=""):
with self.mutex:
if item in self.running_items:
self.running_items.remove(item)
logger.log("PROGRAM", f"Item {item.log_string} finished running section {service_name}" )
logger.log("PROGRAM", f"Item {item.log_string} finished running section {service_name} with state {item.state.value}" )

def add_to_running(self, item, service_name):
if item is None:
return
if item not in self.running_items:
if isinstance(item, MediaItem) and not self._id_in_running_items(item._id) or not isinstance(item, MediaItem):
with self.mutex:
if item not in self.running_items:
self.running_items.append(item)
logger.log("PROGRAM", f"Item {item.log_string} started running section {service_name}" )
logger.log("PROGRAM", f"Item {item.log_string} started running section {service_name} with state {item.state.value}")

def _process_future_item(self, future: Future, service: Service, orig_item: MediaItem) -> None:
"""Callback to add the results from a future emitted by a service to the event queue."""
Expand Down Expand Up @@ -378,8 +395,9 @@ def run(self):
except Empty:
self.dump_tracemalloc()
continue

with db.Session() as session:
existing_item = DB._get_item_from_db(session, event.item)
existing_item: MediaItem | None = DB._get_item_from_db(session, event.item)
updated_item, next_service, items_to_submit = process_event(
existing_item, event.emitted_by, existing_item if existing_item is not None else event.item
)
Expand All @@ -391,10 +409,13 @@ def run(self):

if items_to_submit:
for item_to_submit in items_to_submit:
logger.debug(f"Submitting {item_to_submit.log_string} to {next_service.__name__}")
self.add_to_running(item_to_submit, next_service.__name__)
self._submit_job(next_service, item_to_submit)
if isinstance(existing_item, MediaItem):
logger.debug(f"Storing state of {existing_item.log_string}")
existing_item.store_state()
logger.debug(f"Committing changes to the database")
session.commit()

def stop(self):
Expand Down

0 comments on commit 87c3241

Please sign in to comment.