diff --git a/bazarr/app/signalr_client.py b/bazarr/app/signalr_client.py index e72968551..1f64ac479 100644 --- a/bazarr/app/signalr_client.py +++ b/bazarr/app/signalr_client.py @@ -3,11 +3,14 @@ import logging import json import time +import threading from requests import Session from signalr import Connection from requests.exceptions import ConnectionError from signalrcore.hub_connection_builder import HubConnectionBuilder +from collections import deque +from time import sleep from constants import headers from sonarr.sync.episodes import sync_episodes, sync_one_episode @@ -21,6 +24,12 @@ from .scheduler import scheduler from .get_args import args +sonarr_queue = deque() +radarr_queue = deque() + +last_event_data = None + + class SonarrSignalrClientLegacy: def __init__(self): super(SonarrSignalrClientLegacy, self).__init__() @@ -46,8 +55,9 @@ class SonarrSignalrClientLegacy: except json.decoder.JSONDecodeError: logging.error("BAZARR cannot parse JSON returned by SignalR feed. This is caused by a permissions " "issue when Sonarr try to access its /config/.config directory." - "Typically permissions are too permissive - only the user and group Sonarr runs as should have Read/Write permissions (e.g. files 664 / folders 775)" - "You should fix permissions on that directory and restart Sonarr. Also, if you're a Docker image " + "Typically permissions are too permissive - only the user and group Sonarr runs as " + "should have Read/Write permissions (e.g. files 664 / folders 775). You should fix " + "permissions on that directory and restart Sonarr. Also, if you're a Docker image " "user, you should make sure you properly defined PUID/PGID environment variables. " "Otherwise, please contact Sonarr support.") else: @@ -61,21 +71,19 @@ class SonarrSignalrClientLegacy: try: self.connection.close() except Exception: - pass + self.connection.started = False if log: logging.info('BAZARR SignalR client for Sonarr is now disconnected.') def restart(self): if self.connection: if self.connection.started: - try: - self.stop(log=False) - except Exception: - self.connection.started = False + self.stop(log=False) if settings.general.getboolean('use_sonarr'): self.start() - def exception_handler(self, type, exception, traceback): + def exception_handler(self): + sonarr_queue.clear() logging.error('BAZARR connection to Sonarr SignalR feed has been lost.') self.restart() @@ -87,7 +95,7 @@ class SonarrSignalrClientLegacy: sonarr_method = ['series', 'episode'] for item in sonarr_method: - sonarr_hub.client.on(item, dispatcher) + sonarr_hub.client.on(item, feed_queue) self.connection.exception += self.exception_handler @@ -119,6 +127,7 @@ class SonarrSignalrClient: self.start() def exception_handler(self): + sonarr_queue.clear() logging.error("BAZARR connection to Sonarr SignalR feed has failed. We'll try to reconnect.") self.restart() @@ -148,7 +157,7 @@ class SonarrSignalrClient: 'Trying to reconnect...')) self.connection.on_close(lambda: logging.debug('BAZARR SignalR client for Sonarr is disconnected.')) self.connection.on_error(self.exception_handler) - self.connection.on("receiveMessage", dispatcher) + self.connection.on("receiveMessage", feed_queue) class RadarrSignalrClient: @@ -178,6 +187,7 @@ class RadarrSignalrClient: self.start() def exception_handler(self): + radarr_queue.clear() logging.error("BAZARR connection to Radarr SignalR feed has failed. We'll try to reconnect.") self.restart() @@ -206,38 +216,49 @@ class RadarrSignalrClient: 'Trying to reconnect...')) self.connection.on_close(lambda: logging.debug('BAZARR SignalR client for Radarr is disconnected.')) self.connection.on_error(self.exception_handler) - self.connection.on("receiveMessage", dispatcher) + self.connection.on("receiveMessage", feed_queue) def dispatcher(data): try: - topic = media_id = action = None - episodesChanged = None - if isinstance(data, dict): + series_title = series_year = episode_title = season_number = episode_number = movie_title = movie_year = None + + # + try: + episodesChanged = False topic = data['name'] - try: - media_id = data['body']['resource']['id'] - action = data['body']['action'] + + media_id = data['body']['resource']['id'] + action = data['body']['action'] + if topic == 'series': if 'episodesChanged' in data['body']['resource']: episodesChanged = data['body']['resource']['episodesChanged'] - except KeyError: - return - elif isinstance(data, list): - topic = data[0]['name'] - try: - media_id = data[0]['body']['resource']['id'] - action = data[0]['body']['action'] - except KeyError: - return + series_title = data['body']['resource']['title'] + series_year = data['body']['resource']['year'] + elif topic == 'episode': + series_title = data['body']['resource']['series']['title'] + series_year = data['body']['resource']['series']['year'] + episode_title = data['body']['resource']['title'] + season_number = data['body']['resource']['seasonNumber'] + episode_number = data['body']['resource']['episodeNumber'] + elif topic == 'movie': + movie_title = data['body']['resource']['title'] + movie_year = data['body']['resource']['year'] + except KeyError: + return if topic == 'series': + logging.debug(f'Event received from Sonarr for series: {series_title} ({series_year})') update_one_series(series_id=media_id, action=action) if episodesChanged: # this will happen if a season monitored status is changed. sync_episodes(series_id=media_id, send_event=True) elif topic == 'episode': + logging.debug(f'Event received from Sonarr for episode: {series_title} ({series_year}) - ' + f'S{season_number:0>2}E{episode_number:0>2} - {episode_title}') sync_one_episode(episode_id=media_id, defer_search=settings.sonarr.getboolean('defer_search_signalr')) elif topic == 'movie': + logging.debug(f'Event received from Radarr for movie: {movie_title} ({movie_year})') update_one_movie(movie_id=media_id, action=action, defer_search=settings.radarr.getboolean('defer_search_signalr')) except Exception as e: @@ -246,6 +267,43 @@ def dispatcher(data): return +def feed_queue(data): + # check if event is duplicate from the previous one + global last_event_data + if data == last_event_data: + return + else: + last_event_data = data + + # some sonarr version send event as a list of a single dict, we make it a dict + if isinstance(data, list) and len(data): + data = data[0] + + # if data is a dict and contain an event for series, episode or movie, we add it to the event queue + if isinstance(data, dict) and 'name' in data: + if data['name'] in ['series', 'episode']: + sonarr_queue.append(data) + elif data['name'] == 'movie': + radarr_queue.append(data) + + +def consume_queue(queue): + # get events data from queue one at a time and dispatch it + while True: + try: + data = queue.popleft() + except IndexError: + pass + else: + dispatcher(data) + sleep(0.1) + + +# start both queue consuming threads +threading.Thread(target=consume_queue, args=(sonarr_queue,)).start() +threading.Thread(target=consume_queue, args=(radarr_queue,)).start() + +# instantiate proper SignalR client sonarr_signalr_client = SonarrSignalrClientLegacy() if get_sonarr_info.version().startswith(('0.', '2.', '3.')) else \ SonarrSignalrClient() radarr_signalr_client = RadarrSignalrClient() diff --git a/bazarr/sonarr/sync/utils.py b/bazarr/sonarr/sync/utils.py index d1eb4495f..e03f40ff3 100644 --- a/bazarr/sonarr/sync/utils.py +++ b/bazarr/sonarr/sync/utils.py @@ -91,7 +91,11 @@ def get_series_from_sonarr_api(url, apikey_sonarr, sonarr_series_id=None): logging.exception("BAZARR Error trying to get series from Sonarr.") return else: - return r.json() + result = r.json() + if isinstance(result, dict): + return list(result) + else: + return r.json() def get_episodes_from_sonarr_api(url, apikey_sonarr, series_id=None, episode_id=None):