diff --git a/src/NzbDrone.Common/Cache/CacheManager.cs b/src/NzbDrone.Common/Cache/CacheManager.cs index 7c2ef01f3..7b7f0d3e0 100644 --- a/src/NzbDrone.Common/Cache/CacheManager.cs +++ b/src/NzbDrone.Common/Cache/CacheManager.cs @@ -8,6 +8,7 @@ namespace NzbDrone.Common.Cache { ICached GetCache(Type host); ICached GetCache(Type host, string name); + ICached GetRollingCache(Type host, string name, TimeSpan defaultLifeTime); ICachedDictionary GetCacheDictionary(Type host, string name, Func> fetchFunc = null, TimeSpan? lifeTime = null); void Clear(); ICollection Caches { get; } @@ -44,6 +45,14 @@ namespace NzbDrone.Common.Cache return (ICached)_cache.Get(host.FullName + "_" + name, () => new Cached()); } + public ICached GetRollingCache(Type host, string name, TimeSpan defaultLifeTime) + { + Ensure.That(host, () => host).IsNotNull(); + Ensure.That(name, () => name).IsNotNullOrWhiteSpace(); + + return (ICached)_cache.Get(host.FullName + "_" + name, () => new Cached(defaultLifeTime, true)); + } + public ICachedDictionary GetCacheDictionary(Type host, string name, Func> fetchFunc = null, TimeSpan? lifeTime = null) { Ensure.That(host, () => host).IsNotNull(); diff --git a/src/NzbDrone.Common/Cache/Cached.cs b/src/NzbDrone.Common/Cache/Cached.cs index 928809d30..db7f8d5a3 100644 --- a/src/NzbDrone.Common/Cache/Cached.cs +++ b/src/NzbDrone.Common/Cache/Cached.cs @@ -3,6 +3,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using NzbDrone.Common.EnsureThat; +using NzbDrone.Common.Extensions; namespace NzbDrone.Common.Cache { @@ -30,35 +31,49 @@ namespace NzbDrone.Common.Cache } private readonly ConcurrentDictionary _store; + private readonly TimeSpan? _defaultLifeTime; + private readonly bool _rollingExpiry; - public Cached() + public Cached(TimeSpan? defaultLifeTime = null, bool rollingExpiry = false) { _store = new ConcurrentDictionary(); + _defaultLifeTime = defaultLifeTime; + _rollingExpiry = rollingExpiry; } - public void Set(string key, T value, TimeSpan? lifetime = null) + public void Set(string key, T value, TimeSpan? lifeTime = null) { Ensure.That(key, () => key).IsNotNullOrWhiteSpace(); - _store[key] = new CacheItem(value, lifetime); + _store[key] = new CacheItem(value, lifeTime ?? _defaultLifeTime); } public T Find(string key) { - CacheItem value; - _store.TryGetValue(key, out value); - - if (value == null) + CacheItem cacheItem; + if (!_store.TryGetValue(key, out cacheItem)) { return default(T); } - if (value.IsExpired()) + if (cacheItem.IsExpired()) { - _store.TryRemove(key, out value); - return default(T); + if (TryRemove(key, cacheItem)) + { + return default(T); + } + + if (!_store.TryGetValue(key, out cacheItem)) + { + return default(T); + } } - return value.Object; + if (_rollingExpiry && _defaultLifeTime.HasValue) + { + _store.TryUpdate(key, new CacheItem(cacheItem.Object, _defaultLifeTime.Value), cacheItem); + } + + return cacheItem.Object; } public void Remove(string key) @@ -73,20 +88,31 @@ namespace NzbDrone.Common.Cache { Ensure.That(key, () => key).IsNotNullOrWhiteSpace(); - CacheItem cacheItem; - T value; + lifeTime = lifeTime ?? _defaultLifeTime; - if (!_store.TryGetValue(key, out cacheItem) || cacheItem.IsExpired()) + CacheItem cacheItem; + + if (_store.TryGetValue(key, out cacheItem) && !cacheItem.IsExpired()) { - value = function(); - Set(key, value, lifeTime); + if (_rollingExpiry && lifeTime.HasValue) + { + _store.TryUpdate(key, new CacheItem(cacheItem.Object, lifeTime), cacheItem); + } } else { - value = cacheItem.Object; + var newCacheItem = new CacheItem(function(), lifeTime); + if (cacheItem != null && _store.TryUpdate(key, newCacheItem, cacheItem)) + { + cacheItem = newCacheItem; + } + else + { + cacheItem = _store.GetOrAdd(key, newCacheItem); + } } - return value; + return cacheItem.Object; } public void Clear() @@ -96,9 +122,11 @@ namespace NzbDrone.Common.Cache public void ClearExpired() { - foreach (var cached in _store.Where(c => c.Value.IsExpired())) + var collection = (ICollection>)_store; + + foreach (var cached in _store.Where(c => c.Value.IsExpired()).ToList()) { - Remove(cached.Key); + collection.Remove(cached); } } @@ -110,5 +138,12 @@ namespace NzbDrone.Common.Cache } } + private bool TryRemove(string key, CacheItem value) + { + var collection = (ICollection>)_store; + + return collection.Remove(new KeyValuePair(key, value)); + } + } } \ No newline at end of file diff --git a/src/NzbDrone.Core/MediaFiles/Events/RenameCompletedEvent.cs b/src/NzbDrone.Core/MediaFiles/Events/RenameCompletedEvent.cs new file mode 100644 index 000000000..bccd99028 --- /dev/null +++ b/src/NzbDrone.Core/MediaFiles/Events/RenameCompletedEvent.cs @@ -0,0 +1,9 @@ +using NzbDrone.Common.Messaging; + +namespace NzbDrone.Core.MediaFiles.Events +{ + public class RenameCompletedEvent : IEvent + { + + } +} diff --git a/src/NzbDrone.Core/MediaFiles/RenameEpisodeFileService.cs b/src/NzbDrone.Core/MediaFiles/RenameEpisodeFileService.cs index e98598d43..3ccee5b82 100644 --- a/src/NzbDrone.Core/MediaFiles/RenameEpisodeFileService.cs +++ b/src/NzbDrone.Core/MediaFiles/RenameEpisodeFileService.cs @@ -154,6 +154,8 @@ namespace NzbDrone.Core.MediaFiles _logger.ProgressInfo("Renaming {0} files for {1}", episodeFiles.Count, series.Title); RenameFiles(episodeFiles, series); _logger.ProgressInfo("Selected episode files renamed for {0}", series.Title); + + _eventAggregator.PublishEvent(new RenameCompletedEvent()); } public void Execute(RenameSeriesCommand message) @@ -168,6 +170,8 @@ namespace NzbDrone.Core.MediaFiles RenameFiles(episodeFiles, series); _logger.ProgressInfo("All episode files renamed for {0}", series.Title); } + + _eventAggregator.PublishEvent(new RenameCompletedEvent()); } } } diff --git a/src/NzbDrone.Core/Notifications/INotification.cs b/src/NzbDrone.Core/Notifications/INotification.cs index c0b9fe5e4..b808f75f8 100644 --- a/src/NzbDrone.Core/Notifications/INotification.cs +++ b/src/NzbDrone.Core/Notifications/INotification.cs @@ -11,6 +11,7 @@ namespace NzbDrone.Core.Notifications void OnDownload(DownloadMessage message); void OnRename(Series series); void OnHealthIssue(HealthCheck.HealthCheck healthCheck); + void ProcessQueue(); bool SupportsOnGrab { get; } bool SupportsOnDownload { get; } bool SupportsOnUpgrade { get; } diff --git a/src/NzbDrone.Core/Notifications/NotificationBase.cs b/src/NzbDrone.Core/Notifications/NotificationBase.cs index caeb83639..062048e3b 100644 --- a/src/NzbDrone.Core/Notifications/NotificationBase.cs +++ b/src/NzbDrone.Core/Notifications/NotificationBase.cs @@ -49,6 +49,11 @@ namespace NzbDrone.Core.Notifications } + public virtual void ProcessQueue() + { + + } + public bool SupportsOnGrab => HasConcreteImplementation("OnGrab"); public bool SupportsOnRename => HasConcreteImplementation("OnRename"); public bool SupportsOnDownload => HasConcreteImplementation("OnDownload"); @@ -76,6 +81,5 @@ namespace NzbDrone.Core.Notifications return !method.DeclaringType.IsAbstract; } - } } diff --git a/src/NzbDrone.Core/Notifications/NotificationService.cs b/src/NzbDrone.Core/Notifications/NotificationService.cs index 64d44261a..eb1f811a1 100644 --- a/src/NzbDrone.Core/Notifications/NotificationService.cs +++ b/src/NzbDrone.Core/Notifications/NotificationService.cs @@ -17,7 +17,10 @@ namespace NzbDrone.Core.Notifications : IHandle, IHandle, IHandle, - IHandle + IHandle, + IHandleAsync, + IHandleAsync, + IHandleAsync { private readonly INotificationFactory _notificationFactory; private readonly Logger _logger; @@ -204,5 +207,35 @@ namespace NzbDrone.Core.Notifications } } } + + public void HandleAsync(DownloadsProcessedEvent message) + { + ProcessQueue(); + } + + public void HandleAsync(RenameCompletedEvent message) + { + ProcessQueue(); + } + + public void HandleAsync(HealthCheckCompleteEvent message) + { + ProcessQueue(); + } + + private void ProcessQueue() + { + foreach (var notification in _notificationFactory.GetAvailableProviders()) + { + try + { + notification.ProcessQueue(); + } + catch (Exception ex) + { + _logger.Warn(ex, "Unable to process notification queue for " + notification.Definition.Name); + } + } + } } } diff --git a/src/NzbDrone.Core/Notifications/Plex/Server/PlexServer.cs b/src/NzbDrone.Core/Notifications/Plex/Server/PlexServer.cs index bf0ee81c3..d1e6e58c7 100644 --- a/src/NzbDrone.Core/Notifications/Plex/Server/PlexServer.cs +++ b/src/NzbDrone.Core/Notifications/Plex/Server/PlexServer.cs @@ -1,6 +1,9 @@ using System; using System.Collections.Generic; +using System.Linq; using FluentValidation.Results; +using NLog; +using NzbDrone.Common.Cache; using NzbDrone.Common.Extensions; using NzbDrone.Core.Exceptions; using NzbDrone.Core.Notifications.Plex.PlexTv; @@ -13,11 +16,23 @@ namespace NzbDrone.Core.Notifications.Plex.Server { private readonly IPlexServerService _plexServerService; private readonly IPlexTvService _plexTvService; + private readonly Logger _logger; - public PlexServer(IPlexServerService plexServerService, IPlexTvService plexTvService) + class PlexUpdateQueue + { + public Dictionary Pending { get; } = new Dictionary(); + public bool Refreshing { get; set; } + } + + private readonly ICached _pendingSeriesCache; + + public PlexServer(IPlexServerService plexServerService, IPlexTvService plexTvService, ICacheManager cacheManager, Logger logger) { _plexServerService = plexServerService; _plexTvService = plexTvService; + _logger = logger; + + _pendingSeriesCache = cacheManager.GetRollingCache(GetType(), "pendingSeries", TimeSpan.FromDays(1)); } public override string Link => "https://www.plex.tv/"; @@ -37,7 +52,63 @@ namespace NzbDrone.Core.Notifications.Plex.Server { if (Settings.UpdateLibrary) { - _plexServerService.UpdateLibrary(series, Settings); + _logger.Debug("Scheduling library update for series {0} {1}", series.Id, series.Title); + var queue = _pendingSeriesCache.Get(Settings.Host, () => new PlexUpdateQueue()); + lock (queue) + { + queue.Pending[series.Id] = series; + } + } + } + + public override void ProcessQueue() + { + PlexUpdateQueue queue = _pendingSeriesCache.Find(Settings.Host); + if (queue == null) + { + return; + } + + lock (queue) + { + if (queue.Refreshing) + { + return; + } + queue.Refreshing = true; + } + try + { + while (true) + { + List refreshingSeries; + lock (queue) + { + if (queue.Pending.Empty()) + { + queue.Refreshing = false; + return; + } + + refreshingSeries = queue.Pending.Values.ToList(); + queue.Pending.Clear(); + } + + if (Settings.UpdateLibrary) + { + _logger.Debug("Performing library update for {0} series", refreshingSeries.Count); + _plexServerService.UpdateLibrary(refreshingSeries, Settings); + } + } + } + catch + { + lock (queue) + { + queue.Refreshing = false; + } + + throw; } } diff --git a/src/NzbDrone.Core/Notifications/Plex/Server/PlexServerService.cs b/src/NzbDrone.Core/Notifications/Plex/Server/PlexServerService.cs index dfe7de47f..857dff4d3 100644 --- a/src/NzbDrone.Core/Notifications/Plex/Server/PlexServerService.cs +++ b/src/NzbDrone.Core/Notifications/Plex/Server/PlexServerService.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Text.RegularExpressions; using FluentValidation.Results; @@ -13,6 +14,7 @@ namespace NzbDrone.Core.Notifications.Plex.Server public interface IPlexServerService { void UpdateLibrary(Series series, PlexServerSettings settings); + void UpdateLibrary(IEnumerable series, PlexServerSettings settings); ValidationFailure Test(PlexServerSettings settings); } @@ -32,10 +34,16 @@ namespace NzbDrone.Core.Notifications.Plex.Server } public void UpdateLibrary(Series series, PlexServerSettings settings) + { + UpdateLibrary(new[] { series }, settings); + } + + public void UpdateLibrary(IEnumerable multipleSeries, PlexServerSettings settings) { try { _logger.Debug("Sending Update Request to Plex Server"); + var watch = Stopwatch.StartNew(); var version = _versionCache.Get(settings.Host, () => GetVersion(settings), TimeSpan.FromHours(2)); ValidateVersion(version); @@ -45,16 +53,34 @@ namespace NzbDrone.Core.Notifications.Plex.Server if (partialUpdates) { - UpdatePartialSection(series, sections, settings); - } + var partiallyUpdated = true; + foreach (var series in multipleSeries) + { + partiallyUpdated &= UpdatePartialSection(series, sections, settings); + + if (!partiallyUpdated) + { + break; + } + } + + // Only update complete sections if all partial updates failed + if (!partiallyUpdated) + { + _logger.Debug("Unable to update partial section, updating all TV sections"); + sections.ForEach(s => UpdateSection(s.Id, settings)); + } + } else { sections.ForEach(s => UpdateSection(s.Id, settings)); } + + _logger.Debug("Finished sending Update Request to Plex Server (took {0} ms)", watch.ElapsedMilliseconds); } - catch(Exception ex) + catch (Exception ex) { _logger.Warn(ex, "Failed to Update Plex host: " + settings.Host); throw; @@ -125,7 +151,7 @@ namespace NzbDrone.Core.Notifications.Plex.Server _plexServerProxy.Update(sectionId, settings); } - private void UpdatePartialSection(Series series, List sections, PlexServerSettings settings) + private bool UpdatePartialSection(Series series, List sections, PlexServerSettings settings) { var partiallyUpdated = false; @@ -142,12 +168,7 @@ namespace NzbDrone.Core.Notifications.Plex.Server } } - // Only update complete sections if all partial updates failed - if (!partiallyUpdated) - { - _logger.Debug("Unable to update partial section, updating all TV sections"); - sections.ForEach(s => UpdateSection(s.Id, settings)); - } + return partiallyUpdated; } private int? GetMetadataId(int sectionId, Series series, string language, PlexServerSettings settings) @@ -161,6 +182,8 @@ namespace NzbDrone.Core.Notifications.Plex.Server { try { + _versionCache.Remove(settings.Host); + _partialUpdateCache.Remove(settings.Host); var sections = GetSections(settings); if (sections.Empty())