Fixed: Progressively degrading performance issue in Pending/Delayed releases system.

fixes #2205
This commit is contained in:
Taloth Saldono 2017-10-25 22:58:52 +02:00
parent 1ef08424ff
commit 393996fe88
8 changed files with 160 additions and 61 deletions

View File

@ -1,5 +1,6 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq;
using FizzWare.NBuilder; using FizzWare.NBuilder;
using Marr.Data; using Marr.Data;
using Moq; using Moq;
@ -26,6 +27,7 @@ namespace NzbDrone.Core.Test.Download.Pending.PendingReleaseServiceTests
private ReleaseInfo _release; private ReleaseInfo _release;
private ParsedEpisodeInfo _parsedEpisodeInfo; private ParsedEpisodeInfo _parsedEpisodeInfo;
private RemoteEpisode _remoteEpisode; private RemoteEpisode _remoteEpisode;
private List<PendingRelease> _heldReleases;
[SetUp] [SetUp]
public void Setup() public void Setup()
@ -60,17 +62,27 @@ namespace NzbDrone.Core.Test.Download.Pending.PendingReleaseServiceTests
_remoteEpisode.Series = _series; _remoteEpisode.Series = _series;
_remoteEpisode.ParsedEpisodeInfo = _parsedEpisodeInfo; _remoteEpisode.ParsedEpisodeInfo = _parsedEpisodeInfo;
_remoteEpisode.Release = _release; _remoteEpisode.Release = _release;
_temporarilyRejected = new DownloadDecision(_remoteEpisode, new Rejection("Temp Rejected", RejectionType.Temporary)); _temporarilyRejected = new DownloadDecision(_remoteEpisode, new Rejection("Temp Rejected", RejectionType.Temporary));
_heldReleases = new List<PendingRelease>();
Mocker.GetMock<IPendingReleaseRepository>() Mocker.GetMock<IPendingReleaseRepository>()
.Setup(s => s.All()) .Setup(s => s.All())
.Returns(new List<PendingRelease>()); .Returns(_heldReleases);
Mocker.GetMock<IPendingReleaseRepository>()
.Setup(s => s.AllBySeriesId(It.IsAny<int>()))
.Returns<int>(i => _heldReleases.Where(v => v.SeriesId == i).ToList());
Mocker.GetMock<ISeriesService>() Mocker.GetMock<ISeriesService>()
.Setup(s => s.GetSeries(It.IsAny<int>())) .Setup(s => s.GetSeries(It.IsAny<int>()))
.Returns(_series); .Returns(_series);
Mocker.GetMock<ISeriesService>()
.Setup(s => s.GetSeries(It.IsAny<IEnumerable<int>>()))
.Returns(new List<Series> { _series });
Mocker.GetMock<IParsingService>() Mocker.GetMock<IParsingService>()
.Setup(s => s.GetEpisodes(It.IsAny<ParsedEpisodeInfo>(), _series, true, null)) .Setup(s => s.GetEpisodes(It.IsAny<ParsedEpisodeInfo>(), _series, true, null))
.Returns(new List<Episode> {_episode}); .Returns(new List<Episode> {_episode});
@ -80,7 +92,7 @@ namespace NzbDrone.Core.Test.Download.Pending.PendingReleaseServiceTests
.Returns((List<DownloadDecision> d) => d); .Returns((List<DownloadDecision> d) => d);
} }
private void GivenHeldRelease(string title, string indexer, DateTime publishDate) private void GivenHeldRelease(string title, string indexer, DateTime publishDate, PendingReleaseReason reason = PendingReleaseReason.Delay)
{ {
var release = _release.JsonClone(); var release = _release.JsonClone();
release.Indexer = indexer; release.Indexer = indexer;
@ -92,11 +104,10 @@ namespace NzbDrone.Core.Test.Download.Pending.PendingReleaseServiceTests
.With(h => h.SeriesId = _series.Id) .With(h => h.SeriesId = _series.Id)
.With(h => h.Title = title) .With(h => h.Title = title)
.With(h => h.Release = release) .With(h => h.Release = release)
.With(h => h.Reason = reason)
.Build(); .Build();
Mocker.GetMock<IPendingReleaseRepository>() _heldReleases.AddRange(heldReleases);
.Setup(s => s.All())
.Returns(heldReleases);
} }
[Test] [Test]
@ -117,6 +128,29 @@ namespace NzbDrone.Core.Test.Download.Pending.PendingReleaseServiceTests
VerifyNoInsert(); VerifyNoInsert();
} }
[Test]
public void should_not_add_if_it_is_the_same_release_from_the_same_indexer_twice()
{
GivenHeldRelease(_release.Title, _release.Indexer, _release.PublishDate, PendingReleaseReason.DownloadClientUnavailable);
GivenHeldRelease(_release.Title, _release.Indexer, _release.PublishDate, PendingReleaseReason.Fallback);
Subject.Add(_temporarilyRejected, PendingReleaseReason.Delay);
VerifyNoInsert();
}
[Test]
public void should_remove_duplicate_if_it_is_the_same_release_from_the_same_indexer_twice()
{
GivenHeldRelease(_release.Title, _release.Indexer, _release.PublishDate, PendingReleaseReason.DownloadClientUnavailable);
GivenHeldRelease(_release.Title, _release.Indexer, _release.PublishDate, PendingReleaseReason.Fallback);
Subject.Add(_temporarilyRejected, PendingReleaseReason.Fallback);
Mocker.GetMock<IPendingReleaseRepository>()
.Verify(v => v.Delete(It.IsAny<int>()), Times.Once());
}
[Test] [Test]
public void should_add_if_title_is_different() public void should_add_if_title_is_different()
{ {

View File

@ -1,4 +1,5 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq;
using FizzWare.NBuilder; using FizzWare.NBuilder;
using Marr.Data; using Marr.Data;
using Moq; using Moq;
@ -26,6 +27,7 @@ namespace NzbDrone.Core.Test.Download.Pending.PendingReleaseServiceTests
private ReleaseInfo _release; private ReleaseInfo _release;
private ParsedEpisodeInfo _parsedEpisodeInfo; private ParsedEpisodeInfo _parsedEpisodeInfo;
private RemoteEpisode _remoteEpisode; private RemoteEpisode _remoteEpisode;
private List<PendingRelease> _heldReleases;
[SetUp] [SetUp]
public void Setup() public void Setup()
@ -63,14 +65,24 @@ namespace NzbDrone.Core.Test.Download.Pending.PendingReleaseServiceTests
_temporarilyRejected = new DownloadDecision(_remoteEpisode, new Rejection("Temp Rejected", RejectionType.Temporary)); _temporarilyRejected = new DownloadDecision(_remoteEpisode, new Rejection("Temp Rejected", RejectionType.Temporary));
_heldReleases = new List<PendingRelease>();
Mocker.GetMock<IPendingReleaseRepository>() Mocker.GetMock<IPendingReleaseRepository>()
.Setup(s => s.All()) .Setup(s => s.All())
.Returns(new List<PendingRelease>()); .Returns(_heldReleases);
Mocker.GetMock<IPendingReleaseRepository>()
.Setup(s => s.AllBySeriesId(It.IsAny<int>()))
.Returns<int>(i => _heldReleases.Where(v => v.SeriesId == i).ToList());
Mocker.GetMock<ISeriesService>() Mocker.GetMock<ISeriesService>()
.Setup(s => s.GetSeries(It.IsAny<int>())) .Setup(s => s.GetSeries(It.IsAny<int>()))
.Returns(_series); .Returns(_series);
Mocker.GetMock<ISeriesService>()
.Setup(s => s.GetSeries(It.IsAny<IEnumerable<int>>()))
.Returns(new List<Series> { _series });
Mocker.GetMock<IParsingService>() Mocker.GetMock<IParsingService>()
.Setup(s => s.GetEpisodes(It.IsAny<ParsedEpisodeInfo>(), _series, true, null)) .Setup(s => s.GetEpisodes(It.IsAny<ParsedEpisodeInfo>(), _series, true, null))
.Returns(new List<Episode> {_episode}); .Returns(new List<Episode> {_episode});
@ -92,9 +104,7 @@ namespace NzbDrone.Core.Test.Download.Pending.PendingReleaseServiceTests
.With(h => h.ParsedEpisodeInfo = parsedEpisodeInfo) .With(h => h.ParsedEpisodeInfo = parsedEpisodeInfo)
.Build(); .Build();
Mocker.GetMock<IPendingReleaseRepository>() _heldReleases.AddRange(heldReleases);
.Setup(s => s.All())
.Returns(heldReleases);
} }
[Test] [Test]

View File

@ -38,6 +38,10 @@ namespace NzbDrone.Core.Test.Download.Pending.PendingReleaseServiceTests
.Setup(s => s.GetSeries(It.IsAny<int>())) .Setup(s => s.GetSeries(It.IsAny<int>()))
.Returns(new Series()); .Returns(new Series());
Mocker.GetMock<ISeriesService>()
.Setup(s => s.GetSeries(It.IsAny<IEnumerable<int>>()))
.Returns(new List<Series> { new Series() });
Mocker.GetMock<IParsingService>() Mocker.GetMock<IParsingService>()
.Setup(s => s.GetEpisodes(It.IsAny<ParsedEpisodeInfo>(), It.IsAny<Series>(), It.IsAny<bool>(), null)) .Setup(s => s.GetEpisodes(It.IsAny<ParsedEpisodeInfo>(), It.IsAny<Series>(), It.IsAny<bool>(), null))
.Returns(new List<Episode>{ _episode }); .Returns(new List<Episode>{ _episode });
@ -63,7 +67,7 @@ namespace NzbDrone.Core.Test.Download.Pending.PendingReleaseServiceTests
AssertRemoved(1); AssertRemoved(1);
} }
[Test] [Test]
public void should_remove_multiple_releases_release() public void should_remove_multiple_releases_release()
{ {
@ -134,7 +138,7 @@ namespace NzbDrone.Core.Test.Download.Pending.PendingReleaseServiceTests
AssertRemoved(2); AssertRemoved(2);
} }
private void AssertRemoved(params int[] ids) private void AssertRemoved(params int[] ids)
{ {
Mocker.GetMock<IPendingReleaseRepository>().Verify(c => c.DeleteMany(It.Is<IEnumerable<int>>(s => s.SequenceEqual(ids)))); Mocker.GetMock<IPendingReleaseRepository>().Verify(c => c.DeleteMany(It.Is<IEnumerable<int>>(s => s.SequenceEqual(ids))));

View File

@ -62,7 +62,7 @@ namespace NzbDrone.Core.Test.Download.Pending.PendingReleaseServiceTests
_remoteEpisode.Series = _series; _remoteEpisode.Series = _series;
_remoteEpisode.ParsedEpisodeInfo = _parsedEpisodeInfo; _remoteEpisode.ParsedEpisodeInfo = _parsedEpisodeInfo;
_remoteEpisode.Release = _release; _remoteEpisode.Release = _release;
_temporarilyRejected = new DownloadDecision(_remoteEpisode, new Rejection("Temp Rejected", RejectionType.Temporary)); _temporarilyRejected = new DownloadDecision(_remoteEpisode, new Rejection("Temp Rejected", RejectionType.Temporary));
Mocker.GetMock<IPendingReleaseRepository>() Mocker.GetMock<IPendingReleaseRepository>()
@ -73,6 +73,10 @@ namespace NzbDrone.Core.Test.Download.Pending.PendingReleaseServiceTests
.Setup(s => s.GetSeries(It.IsAny<int>())) .Setup(s => s.GetSeries(It.IsAny<int>()))
.Returns(_series); .Returns(_series);
Mocker.GetMock<ISeriesService>()
.Setup(s => s.GetSeries(It.IsAny<IEnumerable<int>>()))
.Returns(new List<Series> { _series });
Mocker.GetMock<IParsingService>() Mocker.GetMock<IParsingService>()
.Setup(s => s.GetEpisodes(It.IsAny<ParsedEpisodeInfo>(), _series, true, null)) .Setup(s => s.GetEpisodes(It.IsAny<ParsedEpisodeInfo>(), _series, true, null))
.Returns(new List<Episode> {_episode}); .Returns(new List<Episode> {_episode});

View File

@ -81,7 +81,7 @@ namespace NzbDrone.Core.DecisionEngine.Specifications.RssSync
var episodeIds = subject.Episodes.Select(e => e.Id); var episodeIds = subject.Episodes.Select(e => e.Id);
var oldest = _pendingReleaseService.OldestPendingRelease(subject.Series.Id, episodeIds); var oldest = _pendingReleaseService.OldestPendingRelease(subject.Series.Id, episodeIds.ToArray());
if (oldest != null && oldest.Release.AgeMinutes > delay) if (oldest != null && oldest.Release.AgeMinutes > delay)
{ {

View File

@ -8,6 +8,7 @@ namespace NzbDrone.Core.Download.Pending
{ {
void DeleteBySeriesId(int seriesId); void DeleteBySeriesId(int seriesId);
List<PendingRelease> AllBySeriesId(int seriesId); List<PendingRelease> AllBySeriesId(int seriesId);
List<PendingRelease> WithoutFallback();
} }
public class PendingReleaseRepository : BasicRepository<PendingRelease>, IPendingReleaseRepository public class PendingReleaseRepository : BasicRepository<PendingRelease>, IPendingReleaseRepository
@ -26,5 +27,10 @@ namespace NzbDrone.Core.Download.Pending
{ {
return Query.Where(p => p.SeriesId == seriesId); return Query.Where(p => p.SeriesId == seriesId);
} }
public List<PendingRelease> WithoutFallback()
{
return Query.Where(p => p.Reason != PendingReleaseReason.Fallback);
}
} }
} }

View File

@ -21,12 +21,13 @@ namespace NzbDrone.Core.Download.Pending
public interface IPendingReleaseService public interface IPendingReleaseService
{ {
void Add(DownloadDecision decision, PendingReleaseReason reason); void Add(DownloadDecision decision, PendingReleaseReason reason);
void AddMany(List<Tuple<DownloadDecision, PendingReleaseReason>> decisions);
List<ReleaseInfo> GetPending(); List<ReleaseInfo> GetPending();
List<RemoteEpisode> GetPendingRemoteEpisodes(int seriesId); List<RemoteEpisode> GetPendingRemoteEpisodes(int seriesId);
List<Queue.Queue> GetPendingQueue(); List<Queue.Queue> GetPendingQueue();
Queue.Queue FindPendingQueueItem(int queueId); Queue.Queue FindPendingQueueItem(int queueId);
void RemovePendingQueueItems(int queueId); void RemovePendingQueueItems(int queueId);
RemoteEpisode OldestPendingRelease(int seriesId, IEnumerable<int> episodeIds); RemoteEpisode OldestPendingRelease(int seriesId, int[] episodeIds);
} }
public class PendingReleaseService : IPendingReleaseService, public class PendingReleaseService : IPendingReleaseService,
@ -68,8 +69,27 @@ namespace NzbDrone.Core.Download.Pending
public void Add(DownloadDecision decision, PendingReleaseReason reason) public void Add(DownloadDecision decision, PendingReleaseReason reason)
{ {
var alreadyPending = GetPendingReleases(); var alreadyPending = _repository.AllBySeriesId(decision.RemoteEpisode.Series.Id);
alreadyPending = IncludeRemoteEpisodes(alreadyPending);
Add(alreadyPending, decision, reason);
}
public void AddMany(List<Tuple<DownloadDecision, PendingReleaseReason>> decisions)
{
var alreadyPending = decisions.Select(v => v.Item1.RemoteEpisode.Series.Id).Distinct().SelectMany(_repository.AllBySeriesId).ToList();
alreadyPending = IncludeRemoteEpisodes(alreadyPending);
foreach (var pair in decisions)
{
Add(alreadyPending, pair.Item1, pair.Item2);
}
}
private void Add(List<PendingRelease> alreadyPending, DownloadDecision decision, PendingReleaseReason reason)
{
var episodeIds = decision.RemoteEpisode.Episodes.Select(e => e.Id); var episodeIds = decision.RemoteEpisode.Episodes.Select(e => e.Id);
var existingReports = alreadyPending.Where(r => r.RemoteEpisode.Episodes.Select(e => e.Id) var existingReports = alreadyPending.Where(r => r.RemoteEpisode.Episodes.Select(e => e.Id)
@ -80,24 +100,31 @@ namespace NzbDrone.Core.Download.Pending
if (matchingReports.Any()) if (matchingReports.Any())
{ {
var sameReason = true; var matchingReport = matchingReports.First();
foreach (var matchingReport in matchingReports) if (matchingReport.Reason != reason)
{ {
if (matchingReport.Reason != reason) _logger.Debug("The release {0} is already pending with reason {1}, changing to {2}", decision.RemoteEpisode, matchingReport.Reason, reason);
matchingReport.Reason = reason;
_repository.Update(matchingReport);
}
else
{
_logger.Debug("The release {0} is already pending with reason {1}, not adding again", decision.RemoteEpisode, reason);
}
if (matchingReports.Count() > 1)
{
_logger.Debug("The release {0} had {1} duplicate pending, removing duplicates.", decision.RemoteEpisode, matchingReports.Count() - 1);
foreach (var duplicate in matchingReports.Skip(1))
{ {
_logger.Debug("The release {0} is already pending with reason {1}, changing to {2}", decision.RemoteEpisode, matchingReport.Reason, reason); _repository.Delete(duplicate.Id);
matchingReport.Reason = reason; alreadyPending.Remove(duplicate);
_repository.Update(matchingReport);
sameReason = false;
} }
} }
if (sameReason) return;
{
_logger.Debug("The release {0} is already pending with reason {1}, not adding again", decision.RemoteEpisode, reason);
return;
}
} }
_logger.Debug("Adding release {0} to pending releases with reason {1}", decision.RemoteEpisode, reason); _logger.Debug("Adding release {0} to pending releases with reason {1}", decision.RemoteEpisode, reason);
@ -125,7 +152,7 @@ namespace NzbDrone.Core.Download.Pending
public List<RemoteEpisode> GetPendingRemoteEpisodes(int seriesId) public List<RemoteEpisode> GetPendingRemoteEpisodes(int seriesId)
{ {
return _repository.AllBySeriesId(seriesId).Select(GetRemoteEpisode).ToList(); return IncludeRemoteEpisodes(_repository.AllBySeriesId(seriesId)).Select(v => v.RemoteEpisode).ToList();
} }
public List<Queue.Queue> GetPendingQueue() public List<Queue.Queue> GetPendingQueue()
@ -134,7 +161,8 @@ namespace NzbDrone.Core.Download.Pending
var nextRssSync = new Lazy<DateTime>(() => _taskManager.GetNextExecution(typeof(RssSyncCommand))); var nextRssSync = new Lazy<DateTime>(() => _taskManager.GetNextExecution(typeof(RssSyncCommand)));
foreach (var pendingRelease in GetPendingReleases().Where(p => p.Reason != PendingReleaseReason.Fallback)) var pendingReleases = IncludeRemoteEpisodes(_repository.WithoutFallback());
foreach (var pendingRelease in pendingReleases)
{ {
foreach (var episode in pendingRelease.RemoteEpisode.Episodes) foreach (var episode in pendingRelease.RemoteEpisode.Episodes)
{ {
@ -206,24 +234,48 @@ namespace NzbDrone.Core.Download.Pending
_repository.DeleteMany(releasesToRemove.Select(c => c.Id)); _repository.DeleteMany(releasesToRemove.Select(c => c.Id));
} }
public RemoteEpisode OldestPendingRelease(int seriesId, IEnumerable<int> episodeIds) public RemoteEpisode OldestPendingRelease(int seriesId, int[] episodeIds)
{ {
return GetPendingRemoteEpisodes(seriesId).Where(r => r.Episodes.Select(e => e.Id).Intersect(episodeIds).Any()) var seriesReleases = GetPendingReleases(seriesId);
.OrderByDescending(p => p.Release.AgeHours)
.FirstOrDefault(); return seriesReleases.Select(r => r.RemoteEpisode)
.Where(r => r.Episodes.Select(e => e.Id).Intersect(episodeIds).Any())
.OrderByDescending(p => p.Release.AgeHours)
.FirstOrDefault();
} }
private List<PendingRelease> GetPendingReleases() private List<PendingRelease> GetPendingReleases()
{
return IncludeRemoteEpisodes(_repository.All().ToList());
}
private List<PendingRelease> GetPendingReleases(int seriesId)
{
return IncludeRemoteEpisodes(_repository.AllBySeriesId(seriesId).ToList());
}
private List<PendingRelease> IncludeRemoteEpisodes(List<PendingRelease> releases)
{ {
var result = new List<PendingRelease>(); var result = new List<PendingRelease>();
var seriesMap = _seriesService.GetSeries(releases.Select(v => v.SeriesId).Distinct())
.ToDictionary(v => v.Id);
foreach (var release in _repository.All()) foreach (var release in releases)
{ {
var remoteEpisode = GetRemoteEpisode(release); var series = seriesMap.GetValueOrDefault(release.SeriesId);
if (remoteEpisode == null) continue; // Just in case the series was removed, but wasn't cleaned up yet (housekeeper will clean it up)
if (series == null) return null;
release.RemoteEpisode = remoteEpisode; var episodes = _parsingService.GetEpisodes(release.ParsedEpisodeInfo, series, true);
release.RemoteEpisode = new RemoteEpisode
{
Series = series,
Episodes = episodes,
ParsedEpisodeInfo = release.ParsedEpisodeInfo,
Release = release.Release
};
result.Add(release); result.Add(release);
} }
@ -231,24 +283,6 @@ namespace NzbDrone.Core.Download.Pending
return result; return result;
} }
private RemoteEpisode GetRemoteEpisode(PendingRelease release)
{
var series = _seriesService.GetSeries(release.SeriesId);
//Just in case the series was removed, but wasn't cleaned up yet (housekeeper will clean it up)
if (series == null) return null;
var episodes = _parsingService.GetEpisodes(release.ParsedEpisodeInfo, series, true);
return new RemoteEpisode
{
Series = series,
Episodes = episodes,
ParsedEpisodeInfo = release.ParsedEpisodeInfo,
Release = release.Release
};
}
private void Insert(DownloadDecision decision, PendingReleaseReason reason) private void Insert(DownloadDecision decision, PendingReleaseReason reason)
{ {
_repository.Insert(new PendingRelease _repository.Insert(new PendingRelease
@ -288,7 +322,7 @@ namespace NzbDrone.Core.Download.Pending
private void RemoveGrabbed(RemoteEpisode remoteEpisode) private void RemoveGrabbed(RemoteEpisode remoteEpisode)
{ {
var pendingReleases = GetPendingReleases(); var pendingReleases = GetPendingReleases(remoteEpisode.Series.Id);
var episodeIds = remoteEpisode.Episodes.Select(e => e.Id); var episodeIds = remoteEpisode.Episodes.Select(e => e.Id);
var existingReports = pendingReleases.Where(r => r.RemoteEpisode.Episodes.Select(e => e.Id) var existingReports = pendingReleases.Where(r => r.RemoteEpisode.Episodes.Select(e => e.Id)

View File

@ -131,6 +131,8 @@ namespace NzbDrone.Core.Download
var pending = new List<DownloadDecision>(); var pending = new List<DownloadDecision>();
var stored = new List<DownloadDecision>(); var stored = new List<DownloadDecision>();
var addQueue = new List<Tuple<DownloadDecision, PendingReleaseReason>>();
foreach (var report in failed) foreach (var report in failed)
{ {
// If a release was already grabbed with matching episodes we should store it as a fallback // If a release was already grabbed with matching episodes we should store it as a fallback
@ -141,22 +143,27 @@ namespace NzbDrone.Core.Download
if (IsEpisodeProcessed(grabbed, report)) if (IsEpisodeProcessed(grabbed, report))
{ {
_pendingReleaseService.Add(report, PendingReleaseReason.Fallback); addQueue.Add(Tuple.Create(report, PendingReleaseReason.Fallback));
pending.Add(report); pending.Add(report);
} }
else if (IsEpisodeProcessed(stored, report)) else if (IsEpisodeProcessed(stored, report))
{ {
_pendingReleaseService.Add(report, PendingReleaseReason.Fallback); addQueue.Add(Tuple.Create(report, PendingReleaseReason.Fallback));
pending.Add(report); pending.Add(report);
} }
else else
{ {
_pendingReleaseService.Add(report, PendingReleaseReason.DownloadClientUnavailable); addQueue.Add(Tuple.Create(report, PendingReleaseReason.DownloadClientUnavailable));
pending.Add(report); pending.Add(report);
stored.Add(report); stored.Add(report);
} }
} }
if (addQueue.Any())
{
_pendingReleaseService.AddMany(addQueue);
}
return pending; return pending;
} }
} }