JobProvider can reset itself.

cleaned up unit test logging
This commit is contained in:
kay.one 2011-11-06 22:26:21 -08:00
parent 82b6ec5ed4
commit 07458529f6
11 changed files with 401 additions and 398 deletions

View File

@ -21,7 +21,6 @@ namespace NzbDrone.Core.Test
readonly IList<Type> indexers = typeof(CentralDispatch).Assembly.GetTypes().Where(t => t.IsSubclassOf(typeof(IndexerBase))).ToList();
readonly IList<Type> jobs = typeof(CentralDispatch).Assembly.GetTypes().Where(t => t.GetInterfaces().Contains(typeof(IJob))).ToList();
[Test]
public void InitAppTest()
{

View File

@ -18,6 +18,7 @@ namespace NzbDrone.Core.Test.Framework
public static IDatabase GetEmptyDatabase(bool enableLogging = false, string fileName = "")
{
Console.WriteLine("====================DataBase====================");
Console.WriteLine("Cloning database from template.");
if (String.IsNullOrWhiteSpace(fileName))
@ -31,6 +32,10 @@ namespace NzbDrone.Core.Test.Framework
var database = Connection.GetPetaPocoDb(connectionString);
Console.WriteLine("====================DataBase====================");
Console.WriteLine();
Console.WriteLine();
return database;
}

View File

@ -5,6 +5,7 @@ using NUnit.Framework;
using Ninject;
using NzbDrone.Common;
using NzbDrone.Test.Common;
using PetaPoco;
namespace NzbDrone.Core.Test.Framework
{
@ -30,6 +31,7 @@ namespace NzbDrone.Core.Test.Framework
protected StandardKernel LiveKernel = null;
protected AutoMoqer Mocker = null;
protected IDatabase Db = null;
protected string VirtualPath
{
@ -60,6 +62,16 @@ namespace NzbDrone.Core.Test.Framework
protected void WithStrictMocker()
{
Mocker = new AutoMoqer(MockBehavior.Strict);
if (Db != null)
{
Mocker.SetConstant(Db);
}
}
protected void WithRealDb()
{
Db = MockLib.GetEmptyDatabase();
Mocker.SetConstant(Db);
}
[TearDown]
@ -67,6 +79,7 @@ namespace NzbDrone.Core.Test.Framework
{
ExceptionVerification.AssertNoUnexcpectedLogs();
Mocker = new AutoMoqer(MockBehavior.Strict);
WebTimer.Stop();
}

View File

@ -86,6 +86,7 @@
<Compile Include="ProviderTests\PostDownloadProviderTests\PostDownloadProviderFixture.cs" />
<Compile Include="JobTests\SearchJobTest.cs" />
<Compile Include="ProviderTests\PostDownloadProviderTests\ProcessDownloadFixture.cs" />
<Compile Include="ProviderTests\JobProviderTests\TestJobs.cs" />
<Compile Include="ProviderTests\UpdateProviderTests\PreformUpdateFixture.cs" />
<Compile Include="ProviderTests\UpdateProviderTests\GetAvilableUpdateFixture.cs" />
<Compile Include="SortHelperTest.cs" />
@ -117,7 +118,7 @@
<Compile Include="JobTests\DiskScanJobTest.cs" />
<Compile Include="IndexerTests.cs" />
<Compile Include="ProviderTests\InventoryProvider_QualityNeededTest.cs" />
<Compile Include="ProviderTests\JobProviderTest.cs" />
<Compile Include="ProviderTests\JobProviderTests\JobProviderTest.cs" />
<Compile Include="QualityTest.cs" />
<Compile Include="ProviderTests\RootDirProviderTest.cs" />
<Compile Include="ProviderTests\IndexerProviderTest.cs" />

View File

@ -1,5 +1,6 @@
// ReSharper disable RedundantUsingDirective
using System.Linq;
using System;
using System.Collections.Generic;
using System.Threading;
@ -7,12 +8,11 @@ using AutoMoq;
using FluentAssertions;
using NUnit.Framework;
using NzbDrone.Core.Model;
using NzbDrone.Core.Model.Notification;
using NzbDrone.Core.Providers.Jobs;
using NzbDrone.Core.Test.Framework;
using NzbDrone.Test.Common;
namespace NzbDrone.Core.Test.ProviderTests
namespace NzbDrone.Core.Test.ProviderTests.JobProviderTests
{
[TestFixture]
// ReSharper disable InconsistentNaming
@ -21,11 +21,11 @@ namespace NzbDrone.Core.Test.ProviderTests
[Test]
public void Run_Jobs_Updates_Last_Execution()
{
IList<IJob> fakeJobs = new List<IJob> { new FakeJob() };
IList<IJob> BaseFakeJobs = new List<IJob> { new FakeJob() };
var mocker = new AutoMoqer();
mocker.SetConstant(MockLib.GetEmptyDatabase());
mocker.SetConstant(fakeJobs);
mocker.SetConstant(BaseFakeJobs);
//Act
var timerProvider = mocker.Resolve<JobProvider>();
@ -42,11 +42,11 @@ namespace NzbDrone.Core.Test.ProviderTests
public void Run_Jobs_Updates_Last_Execution_Mark_as_unsuccesful()
{
IList<IJob> fakeJobs = new List<IJob> { new BrokenJob() };
IList<IJob> BaseFakeJobs = new List<IJob> { new BrokenJob() };
var mocker = new AutoMoqer();
mocker.SetConstant(MockLib.GetEmptyDatabase());
mocker.SetConstant(fakeJobs);
mocker.SetConstant(BaseFakeJobs);
//Act
var timerProvider = mocker.Resolve<JobProvider>();
@ -65,12 +65,12 @@ namespace NzbDrone.Core.Test.ProviderTests
[Test]
public void scheduler_skips_jobs_that_arent_mature_yet()
{
var fakeJob = new FakeJob();
var BaseFakeJob = new FakeJob();
var mocker = new AutoMoqer();
IList<IJob> fakeJobs = new List<IJob> { fakeJob };
IList<IJob> BaseFakeJobs = new List<IJob> { BaseFakeJob };
mocker.SetConstant(MockLib.GetEmptyDatabase());
mocker.SetConstant(fakeJobs);
mocker.SetConstant(BaseFakeJobs);
var timerProvider = mocker.Resolve<JobProvider>();
timerProvider.Initialize();
@ -79,7 +79,7 @@ namespace NzbDrone.Core.Test.ProviderTests
timerProvider.QueueScheduled();
Thread.Sleep(500);
fakeJob.ExecutionCount.Should().Be(1);
BaseFakeJob.ExecutionCount.Should().Be(1);
}
[Test]
@ -87,12 +87,12 @@ namespace NzbDrone.Core.Test.ProviderTests
//after execution so the job can successfully run.
public void can_run_async_job_again()
{
var fakeJob = new FakeJob();
var BaseFakeJob = new FakeJob();
var mocker = new AutoMoqer();
IList<IJob> fakeJobs = new List<IJob> { fakeJob };
IList<IJob> BaseFakeJobs = new List<IJob> { BaseFakeJob };
mocker.SetConstant(MockLib.GetEmptyDatabase());
mocker.SetConstant(fakeJobs);
mocker.SetConstant(BaseFakeJobs);
var jobProvider = mocker.Resolve<JobProvider>();
jobProvider.Initialize();
@ -101,17 +101,17 @@ namespace NzbDrone.Core.Test.ProviderTests
jobProvider.QueueJob(typeof(FakeJob));
Thread.Sleep(2000);
jobProvider.Queue.Should().BeEmpty();
fakeJob.ExecutionCount.Should().Be(2);
BaseFakeJob.ExecutionCount.Should().Be(2);
}
[Test]
public void no_concurent_jobs()
{
IList<IJob> fakeJobs = new List<IJob> { new SlowJob() };
IList<IJob> BaseFakeJobs = new List<IJob> { new SlowJob() };
var mocker = new AutoMoqer();
mocker.SetConstant(MockLib.GetEmptyDatabase());
mocker.SetConstant(fakeJobs);
mocker.SetConstant(BaseFakeJobs);
var jobProvider = mocker.Resolve<JobProvider>();
jobProvider.Initialize();
@ -132,11 +132,11 @@ namespace NzbDrone.Core.Test.ProviderTests
public void can_run_broken_async_job_again()
{
var brokenJob = new BrokenJob();
IList<IJob> fakeJobs = new List<IJob> { brokenJob };
IList<IJob> BaseFakeJobs = new List<IJob> { brokenJob };
var mocker = new AutoMoqer();
mocker.SetConstant(MockLib.GetEmptyDatabase());
mocker.SetConstant(fakeJobs);
mocker.SetConstant(BaseFakeJobs);
var jobProvider = mocker.Resolve<JobProvider>();
jobProvider.Initialize();
@ -152,54 +152,48 @@ namespace NzbDrone.Core.Test.ProviderTests
}
[Test]
//This test will confirm that the concurrency checks are rest
//after execution so the job can successfully run.
public void can_run_two_jobs_at_the_same_time()
{
var slowJob = new SlowJob();
IList<IJob> fakeJobs = new List<IJob> { slowJob };
var mocker = new AutoMoqer();
WithRealDb();
mocker.SetConstant(MockLib.GetEmptyDatabase());
mocker.SetConstant(fakeJobs);
var fakeJob = new FakeJob();
IList<IJob> fakeJobs = new List<IJob> { fakeJob };
var timerProvider = mocker.Resolve<JobProvider>();
timerProvider.Initialize();
Mocker.SetConstant(fakeJobs);
var jobProvider = Mocker.Resolve<JobProvider>();
jobProvider.Initialize();
var thread1 = new Thread(() => timerProvider.QueueScheduled());
thread1.Start();
Thread.Sleep(1000);
var thread2 = new Thread(() => timerProvider.QueueScheduled());
thread2.Start();
thread1.Join();
thread2.Join();
slowJob.ExecutionCount = 2;
jobProvider.QueueScheduled();
jobProvider.QueueScheduled();
Thread.Sleep(2000);
fakeJob.ExecutionCount.Should().Be(1);
}
[Test]
//This test will confirm that the concurrency checks are rest
//after execution so the job can successfully run.
public void can_queue_jobs_at_the_same_time()
{
var slowJob = new SlowJob();
var BaseFakeJob = new FakeJob();
IList<IJob> fakeJobs = new List<IJob> { slowJob };
IList<IJob> BaseFakeJobs = new List<IJob> { slowJob, BaseFakeJob };
var mocker = new AutoMoqer();
mocker.SetConstant(MockLib.GetEmptyDatabase());
mocker.SetConstant(fakeJobs);
mocker.SetConstant(BaseFakeJobs);
var jobProvider = mocker.Resolve<JobProvider>();
jobProvider.Initialize();
var thread1 = new Thread(() => jobProvider.QueueJob(typeof(SlowJob)));
var thread2 = new Thread(() => jobProvider.QueueJob(typeof(SlowJob)));
jobProvider.QueueJob(typeof(SlowJob));
var thread1 = new Thread(() => jobProvider.QueueJob(typeof(FakeJob)));
var thread2 = new Thread(() => jobProvider.QueueJob(typeof(FakeJob)));
thread1.Start();
thread2.Start();
@ -209,20 +203,19 @@ namespace NzbDrone.Core.Test.ProviderTests
Thread.Sleep(5000);
Assert.AreEqual(1, slowJob.ExecutionCount);
BaseFakeJob.ExecutionCount.Should().Be(1);
jobProvider.Queue.Should().BeEmpty();
}
[Test]
public void Init_Jobs()
{
var fakeTimer = new FakeJob();
IList<IJob> fakeJobs = new List<IJob> { fakeTimer };
IList<IJob> BaseFakeJobs = new List<IJob> { fakeTimer };
var mocker = new AutoMoqer();
mocker.SetConstant(MockLib.GetEmptyDatabase());
mocker.SetConstant(fakeJobs);
mocker.SetConstant(BaseFakeJobs);
var timerProvider = mocker.Resolve<JobProvider>();
timerProvider.Initialize();
@ -248,11 +241,11 @@ namespace NzbDrone.Core.Test.ProviderTests
for (int i = 0; i < 2; i++)
{
var fakeTimer = new FakeJob();
IList<IJob> fakeJobs = new List<IJob> { fakeTimer };
IList<IJob> BaseFakeJobs = new List<IJob> { fakeTimer };
var mocker = new AutoMoqer();
mocker.SetConstant(repo);
mocker.SetConstant(fakeJobs);
mocker.SetConstant(BaseFakeJobs);
var timerProvider = mocker.Resolve<JobProvider>();
timerProvider.Initialize();
@ -279,11 +272,11 @@ namespace NzbDrone.Core.Test.ProviderTests
for (int i = 0; i < 2; i++)
{
var disabledJob = new DisabledJob();
IList<IJob> fakeJobs = new List<IJob> { disabledJob };
IList<IJob> BaseFakeJobs = new List<IJob> { disabledJob };
var mocker = new AutoMoqer();
mocker.SetConstant(repo);
mocker.SetConstant(fakeJobs);
mocker.SetConstant(BaseFakeJobs);
var timerProvider = mocker.Resolve<JobProvider>();
timerProvider.Initialize();
@ -305,11 +298,11 @@ namespace NzbDrone.Core.Test.ProviderTests
[Test]
public void Get_Next_Execution_Time()
{
IList<IJob> fakeJobs = new List<IJob> { new FakeJob() };
IList<IJob> BaseFakeJobs = new List<IJob> { new FakeJob() };
var mocker = new AutoMoqer();
mocker.SetConstant(MockLib.GetEmptyDatabase());
mocker.SetConstant(fakeJobs);
mocker.SetConstant(BaseFakeJobs);
//Act
var timerProvider = mocker.Resolve<JobProvider>();
@ -330,11 +323,11 @@ namespace NzbDrone.Core.Test.ProviderTests
var disabledJob = new DisabledJob();
IList<IJob> fakeJobs = new List<IJob> { disabledJob };
IList<IJob> BaseFakeJobs = new List<IJob> { disabledJob };
var mocker = new AutoMoqer();
mocker.SetConstant(repo);
mocker.SetConstant(fakeJobs);
mocker.SetConstant(BaseFakeJobs);
var timerProvider = mocker.Resolve<JobProvider>();
timerProvider.Initialize();
@ -351,11 +344,11 @@ namespace NzbDrone.Core.Test.ProviderTests
[Test]
public void SingleId_do_not_update_last_execution()
{
IList<IJob> fakeJobs = new List<IJob> { new FakeJob() };
IList<IJob> BaseFakeJobs = new List<IJob> { new FakeJob() };
var mocker = new AutoMoqer();
mocker.SetConstant(MockLib.GetEmptyDatabase());
mocker.SetConstant(fakeJobs);
mocker.SetConstant(BaseFakeJobs);
//Act
var jobProvider = mocker.Resolve<JobProvider>();
@ -373,11 +366,11 @@ namespace NzbDrone.Core.Test.ProviderTests
[Test]
public void SingleId_do_not_set_success()
{
IList<IJob> fakeJobs = new List<IJob> { new FakeJob() };
IList<IJob> BaseFakeJobs = new List<IJob> { new FakeJob() };
var mocker = new AutoMoqer();
mocker.SetConstant(MockLib.GetEmptyDatabase());
mocker.SetConstant(fakeJobs);
mocker.SetConstant(BaseFakeJobs);
//Act
var jobProvider = mocker.Resolve<JobProvider>();
@ -397,16 +390,16 @@ namespace NzbDrone.Core.Test.ProviderTests
{
var mocker = new AutoMoqer();
var fakeJob = new FakeJob();
IList<IJob> fakeJobs = new List<IJob> { fakeJob };
var BaseFakeJob = new FakeJob();
IList<IJob> BaseFakeJobs = new List<IJob> { BaseFakeJob };
mocker.SetConstant(MockLib.GetEmptyDatabase());
mocker.SetConstant(fakeJobs);
mocker.SetConstant(BaseFakeJobs);
var fakeQueueItem = new JobQueueItem
{
JobType = fakeJob.GetType(),
JobType = BaseFakeJob.GetType(),
TargetId = 12,
SecondaryTargetId = 0
};
@ -415,11 +408,11 @@ namespace NzbDrone.Core.Test.ProviderTests
var jobProvider = mocker.Resolve<JobProvider>();
jobProvider.Initialize();
jobProvider.Queue.Add(fakeQueueItem);
jobProvider.QueueJob(fakeJob.GetType(), 12);
jobProvider.QueueJob(BaseFakeJob.GetType(), 12);
Thread.Sleep(1000);
//Assert
fakeJob.ExecutionCount.Should().Be(1);
BaseFakeJob.ExecutionCount.Should().Be(1);
}
@ -430,16 +423,16 @@ namespace NzbDrone.Core.Test.ProviderTests
var slowJob = new SlowJob();
var disabledJob = new DisabledJob();
IList<IJob> fakeJobs = new List<IJob> { slowJob, disabledJob };
IList<IJob> BaseFakeJobs = new List<IJob> { slowJob, disabledJob };
mocker.SetConstant(MockLib.GetEmptyDatabase());
mocker.SetConstant(fakeJobs);
mocker.SetConstant(BaseFakeJobs);
var jobProvider = mocker.Resolve<JobProvider>();
jobProvider.Initialize();
var _jobThread = new Thread(() => jobProvider.QueueScheduled());
var _jobThread = new Thread(jobProvider.QueueScheduled);
_jobThread.Start();
Thread.Sleep(200);
@ -453,89 +446,25 @@ namespace NzbDrone.Core.Test.ProviderTests
slowJob.ExecutionCount.Should().Be(1);
disabledJob.ExecutionCount.Should().Be(1);
}
}
public class FakeJob : IJob
{
public string Name
[Test]
public void trygin_to_queue_unregistered_job_should_fail()
{
get { return "FakeJob"; }
}
WithRealDb();
public int DefaultInterval
{
get { return 15; }
}
IList<IJob> BaseFakeJobs = new List<IJob> { new SlowJob(), new DisabledJob() };
public int ExecutionCount { get; set; }
Mocker.SetConstant(BaseFakeJobs);
public void Start(ProgressNotification notification, int targetId, int secondaryTargetId)
{
ExecutionCount++;
}
}
var jobProvider = Mocker.Resolve<JobProvider>();
public class DisabledJob : IJob
{
public string Name
{
get { return "DisabledJob"; }
}
jobProvider.Initialize();
jobProvider.QueueJob(typeof(string));
public int DefaultInterval
{
get { return 0; }
}
public int ExecutionCount { get; set; }
public void Start(ProgressNotification notification, int targetId, int secondaryTargetId)
{
ExecutionCount++;
}
}
public class BrokenJob : IJob
{
public string Name
{
get { return "FakeJob"; }
}
public int DefaultInterval
{
get { return 15; }
}
public int ExecutionCount { get; set; }
public void Start(ProgressNotification notification, int targetId, int secondaryTargetId)
{
ExecutionCount++;
throw new ApplicationException("Broken job is broken");
}
}
public class SlowJob : IJob
{
public string Name
{
get { return "FakeJob"; }
}
public int DefaultInterval
{
get { return 15; }
}
public int ExecutionCount { get; set; }
public void Start(ProgressNotification notification, int targetId, int secondaryTargetId)
{
Console.WriteLine("Starting Job");
Thread.Sleep(1000);
ExecutionCount++;
Console.WriteLine("Finishing Job");
ExceptionVerification.ExcpectedErrors(1);
}
}
}

View File

@ -0,0 +1,60 @@
using System;
using System.Linq;
using System.Threading;
using NzbDrone.Core.Model.Notification;
using NzbDrone.Core.Providers.Jobs;
namespace NzbDrone.Core.Test.ProviderTests.JobProviderTests
{
public class FakeJob : IJob
{
public string Name
{
get { return GetType().Name; }
}
public virtual int DefaultInterval
{
get { return 15; }
}
public int ExecutionCount { get; private set; }
public void Start(ProgressNotification notification, int targetId, int secondaryTargetId)
{
ExecutionCount++;
Console.WriteLine("Begin " + Name);
Start();
Console.WriteLine("End " + Name);
}
protected virtual void Start()
{
}
}
public class DisabledJob : FakeJob
{
public override int DefaultInterval
{
get { return 0; }
}
}
public class BrokenJob : FakeJob
{
protected override void Start()
{
throw new ApplicationException("Broken job is broken");
}
}
public class SlowJob : FakeJob
{
protected override void Start()
{
Thread.Sleep(1000);
}
}
}

View File

@ -147,6 +147,7 @@ namespace NzbDrone.Core
private static void ShutDown()
{
Logger.Info("Shutting down application.");
WebTimer.Stop();
Process.GetCurrentProcess().Kill();
}
}

View File

@ -1,6 +1,4 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace NzbDrone.Core.Model
{
@ -12,13 +10,13 @@ namespace NzbDrone.Core.Model
public bool Equals(JobQueueItem other)
{
if (JobType == other.JobType && TargetId == other.TargetId
&& SecondaryTargetId == other.SecondaryTargetId)
{
return true;
}
return (JobType == other.JobType && TargetId == other.TargetId
&& SecondaryTargetId == other.SecondaryTargetId);
}
return false;
public override string ToString()
{
return string.Format("[{0}({1}, {2})]", JobType.Name, TargetId, SecondaryTargetId);
}
}
}

View File

@ -11,7 +11,6 @@ using NzbDrone.Core.Model;
using NzbDrone.Core.Model.Notification;
using NzbDrone.Core.Repository;
using PetaPoco;
using ThreadState = System.Threading.ThreadState;
namespace NzbDrone.Core.Providers.Jobs
{
@ -21,15 +20,15 @@ namespace NzbDrone.Core.Providers.Jobs
/// </summary>
public class JobProvider
{
private static readonly Logger Logger = LogManager.GetCurrentClassLogger();
private static readonly Logger logger = LogManager.GetCurrentClassLogger();
private readonly IDatabase _database;
private readonly NotificationProvider _notificationProvider;
private readonly IList<IJob> _jobs;
private Thread _jobThread;
private Stopwatch _jobThreadStopWatch;
private readonly object ExecutionLock = new object();
private bool _isRunning;
private readonly object executionLock = new object();
private readonly List<JobQueueItem> _queue = new List<JobQueueItem>();
private ProgressNotification _notification;
@ -40,6 +39,7 @@ namespace NzbDrone.Core.Providers.Jobs
_database = database;
_notificationProvider = notificationProvider;
_jobs = jobs;
ResetThread();
}
/// <summary>
@ -68,243 +68,13 @@ namespace NzbDrone.Core.Providers.Jobs
return _database.Fetch<JobDefinition>().ToList();
}
/// <summary>
/// Adds/Updates definitions for a job
/// </summary>
/// <param name="definitions">Settings to be added/updated</param>
public virtual void SaveDefinition(JobDefinition definitions)
{
if (definitions.Id == 0)
{
Logger.Trace("Adding job definitions for {0}", definitions.Name);
_database.Insert(definitions);
}
else
{
Logger.Trace("Updating job definitions for {0}", definitions.Name);
_database.Update(definitions);
}
}
/// <summary>
/// Iterates through all registered jobs and queues any that are due for an execution.
/// </summary>
/// <remarks>Will ignore request if queue is already running.</remarks>
public virtual void QueueScheduled()
{
lock (ExecutionLock)
{
if (_isRunning)
{
Logger.Trace("Queue is already running. Ignoring scheduler's request.");
return;
}
}
var counter = 0;
var pendingJobs = All().Where(
t => t.Enable &&
(DateTime.Now - t.LastExecution) > TimeSpan.FromMinutes(t.Interval)
).Select(c => _jobs.Where(t => t.GetType().ToString() == c.TypeName).Single());
foreach (var job in pendingJobs)
{
QueueJob(job.GetType());
counter++;
}
Logger.Trace("{0} Scheduled tasks have been added to the queue", counter);
}
/// <summary>
/// Queues the execution of a job asynchronously
/// </summary>
/// <param name="jobType">Type of the job that should be queued.</param>
/// <param name="targetId">The targetId could be any Id parameter eg. SeriesId. it will be passed to the job implementation
/// to allow it to filter it's target of execution.</param>
/// /// <param name="secondaryTargetId">The secondaryTargetId could be any Id parameter eg. SeasonNumber. it will be passed to
/// the timer implementation to further allow it to filter it's target of execution</param>
/// <remarks>Job is only added to the queue if same job with the same targetId doesn't already exist in the queue.</remarks>
public virtual void QueueJob(Type jobType, int targetId = 0, int secondaryTargetId = 0)
{
Logger.Debug("Adding [{0}:{1}] to the queue", jobType.Name, targetId);
lock (ExecutionLock)
{
lock (Queue)
{
var queueItem = new JobQueueItem
{
JobType = jobType,
TargetId = targetId,
SecondaryTargetId = secondaryTargetId
};
if (!Queue.Contains(queueItem))
{
Queue.Add(queueItem);
Logger.Trace("Job [{0}:{1}] added to the queue", jobType.Name, targetId);
}
else
{
Logger.Info("[{0}:{1}] already exists in the queue. Skipping.", jobType.Name, targetId);
}
}
if (_isRunning)
{
Logger.Trace("Queue is already running. No need to start it up.");
return;
}
_isRunning = true;
}
if (_jobThread == null || _jobThread.ThreadState != ThreadState.Running)
{
Logger.Trace("Initializing queue processor thread");
ThreadStart starter = () =>
{
try
{
ProcessQueue();
}
catch (Exception e)
{
Logger.ErrorException("Error has occurred in queue processor thread", e);
}
finally
{
_isRunning = false;
_jobThread.Abort();
}
};
_jobThread = new Thread(starter) { Name = "JobQueueThread" };
_jobThread.Start();
}
else
{
var messge = "Job Thread is null";
if (_jobThread != null)
{
messge = "Job Thread State: " + _jobThread.ThreadState;
}
Logger.Error("Execution lock has fucked up. {0}. Ignoring request.", messge);
}
}
/// <summary>
/// Starts processing of queue synchronously.
/// </summary>
private void ProcessQueue()
{
do
{
using (NestedDiagnosticsContext.Push(Guid.NewGuid().ToString()))
{
try
{
JobQueueItem job = null;
lock (Queue)
{
if (Queue.Count != 0)
{
job = Queue.First();
Queue.Remove(job);
}
}
if (job != null)
{
Execute(job.JobType, job.TargetId, job.SecondaryTargetId);
}
}
catch (Exception e)
{
Logger.FatalException("An error has occurred while processing queued job.", e);
}
}
} while (Queue.Count != 0);
Logger.Trace("Finished processing jobs in the queue.");
return;
}
/// <summary>
/// Executes the job synchronously
/// </summary>
/// <param name="jobType">Type of the job that should be executed</param>
/// <param name="targetId">The targetId could be any Id parameter eg. SeriesId. it will be passed to the timer implementation
/// to allow it to filter it's target of execution</param>
/// /// <param name="secondaryTargetId">The secondaryTargetId could be any Id parameter eg. SeasonNumber. it will be passed to
/// the timer implementation to further allow it to filter it's target of execution</param>
private void Execute(Type jobType, int targetId = 0, int secondaryTargetId = 0)
{
var jobImplementation = _jobs.Where(t => t.GetType() == jobType).Single();
if (jobImplementation == null)
{
Logger.Error("Unable to locate implementation for '{0}'. Make sure it is properly registered.", jobType);
return;
}
var settings = All().Where(j => j.TypeName == jobType.ToString()).Single();
using (_notification = new ProgressNotification(jobImplementation.Name))
{
try
{
Logger.Debug("Starting '{0}' job. Last execution {1}", settings.Name, settings.LastExecution);
var sw = Stopwatch.StartNew();
_notificationProvider.Register(_notification);
jobImplementation.Start(_notification, targetId, secondaryTargetId);
_notification.Status = ProgressNotificationStatus.Completed;
settings.LastExecution = DateTime.Now;
settings.Success = true;
sw.Stop();
Logger.Debug("Job '{0}' successfully completed in {1:0}.{2} seconds.", jobImplementation.Name, sw.Elapsed.TotalSeconds, sw.Elapsed.Milliseconds / 100,
sw.Elapsed.Seconds);
}
catch (Exception e)
{
Logger.ErrorException("An error has occurred while executing job " + jobImplementation.Name, e);
_notification.Status = ProgressNotificationStatus.Failed;
_notification.CurrentMessage = jobImplementation.Name + " Failed.";
settings.LastExecution = DateTime.Now;
settings.Success = false;
}
}
//Only update last execution status if was triggered by the scheduler
if (targetId == 0)
{
SaveDefinition(settings);
}
}
/// <summary>
/// Initializes jobs in the database using the IJob instances that are
/// registered using ninject
/// </summary>
public virtual void Initialize()
{
Logger.Debug("Initializing jobs. Count {0}", _jobs.Count());
logger.Debug("Initializing jobs. Count {0}", _jobs.Count());
var currentTimer = All();
foreach (var timer in _jobs)
@ -326,6 +96,47 @@ namespace NzbDrone.Core.Providers.Jobs
}
}
/// <summary>
/// Adds/Updates definitions for a job
/// </summary>
/// <param name="definitions">Settings to be added/updated</param>
public virtual void SaveDefinition(JobDefinition definitions)
{
if (definitions.Id == 0)
{
logger.Trace("Adding job definitions for {0}", definitions.Name);
_database.Insert(definitions);
}
else
{
logger.Trace("Updating job definitions for {0}", definitions.Name);
_database.Update(definitions);
}
}
public virtual void QueueScheduled()
{
lock (executionLock)
{
VerifyThreadTime();
if (_jobThread.IsAlive)
{
logger.Trace("Queue is already running. Ignoring scheduler's request.");
return;
}
}
var pendingJobTypes = All().Where(
t => t.Enable &&
(DateTime.Now - t.LastExecution) > TimeSpan.FromMinutes(t.Interval)
).Select(c => _jobs.Where(t => t.GetType().ToString() == c.TypeName).Single().GetType()).ToList();
pendingJobTypes.ForEach(jobType => QueueJob(jobType));
logger.Trace("{0} Scheduled tasks have been added to the queue", pendingJobTypes.Count);
}
/// <summary>
/// Gets the next scheduled run time for a specific job
/// (Estimated due to schedule timer)
@ -336,5 +147,179 @@ namespace NzbDrone.Core.Providers.Jobs
var job = All().Where(t => t.TypeName == jobType.ToString()).Single();
return job.LastExecution.AddMinutes(job.Interval);
}
public virtual void QueueJob(Type jobType, int targetId = 0, int secondaryTargetId = 0)
{
var queueItem = new JobQueueItem
{
JobType = jobType,
TargetId = targetId,
SecondaryTargetId = secondaryTargetId
};
logger.Debug("Attempting to queue {0}", queueItem);
lock (executionLock)
{
VerifyThreadTime();
lock (Queue)
{
if (!Queue.Contains(queueItem))
{
Queue.Add(queueItem);
logger.Trace("Job {0} added to the queue. current items in queue: {1}", queueItem, Queue.Count);
}
else
{
logger.Info("{0} already exists in the queue. Skipping. current items in queue: {1}", queueItem, Queue.Count);
}
}
if (_jobThread.IsAlive)
{
logger.Trace("Queue is already running. No need to start it up.");
return;
}
ResetThread();
_jobThreadStopWatch = Stopwatch.StartNew();
_jobThread.Start();
}
}
private void ProcessQueue()
{
try
{
do
{
using (NestedDiagnosticsContext.Push(Guid.NewGuid().ToString()))
{
try
{
JobQueueItem job = null;
lock (Queue)
{
if (Queue.Count != 0)
{
job = Queue.First();
Queue.Remove(job);
logger.Debug("Popping {0} from the queue.", job);
}
}
if (job != null)
{
Execute(job);
}
}
catch (ThreadAbortException)
{
throw;
}
catch (Exception e)
{
logger.FatalException("An error has occurred while executing job.", e);
}
}
} while (Queue.Count != 0);
logger.Trace("Finished processing jobs in the queue.");
return;
}
catch (ThreadAbortException e)
{
logger.Warn(e.Message);
}
catch (Exception e)
{
logger.ErrorException("Error has occurred in queue processor thread", e);
}
finally
{
ResetThread();
}
}
private void Execute(JobQueueItem queueItem)
{
var jobImplementation = _jobs.Where(t => t.GetType() == queueItem.JobType).SingleOrDefault();
if (jobImplementation == null)
{
logger.Error("Unable to locate implementation for '{0}'. Make sure it is properly registered.", queueItem.JobType);
return;
}
var settings = All().Where(j => j.TypeName == queueItem.JobType.ToString()).Single();
using (_notification = new ProgressNotification(jobImplementation.Name))
{
try
{
logger.Debug("Starting {0}. Last execution {1}", queueItem, settings.LastExecution);
var sw = Stopwatch.StartNew();
_notificationProvider.Register(_notification);
jobImplementation.Start(_notification, queueItem.TargetId, queueItem.SecondaryTargetId);
_notification.Status = ProgressNotificationStatus.Completed;
settings.LastExecution = DateTime.Now;
settings.Success = true;
sw.Stop();
logger.Debug("Job '{0}' successfully completed in {1:0}.{2} seconds.", queueItem, sw.Elapsed.TotalSeconds, sw.Elapsed.Milliseconds / 100,
sw.Elapsed.Seconds);
}
catch (ThreadAbortException)
{
throw;
}
catch (Exception e)
{
logger.ErrorException("An error has occurred while executing job [" + jobImplementation.Name + "].", e);
_notification.Status = ProgressNotificationStatus.Failed;
_notification.CurrentMessage = jobImplementation.Name + " Failed.";
settings.LastExecution = DateTime.Now;
settings.Success = false;
}
}
//Only update last execution status if was triggered by the scheduler
if (queueItem.TargetId == 0)
{
SaveDefinition(settings);
}
}
private void VerifyThreadTime()
{
if (_jobThreadStopWatch.Elapsed.TotalHours > 1)
{
logger.Warn("Thread job has been running for more than an hour. fuck it!");
ResetThread();
}
}
private void ResetThread()
{
if (_jobThread != null)
{
_jobThread.Abort();
}
logger.Trace("resetting queue processor thread");
_jobThread = new Thread(ProcessQueue) { Name = "JobQueueThread" };
_jobThreadStopWatch = new Stopwatch();
}
}
}

View File

@ -6,13 +6,17 @@ using NzbDrone.Core.Providers.Jobs;
namespace NzbDrone.Core
{
class WebTimer
public class WebTimer
{
private readonly JobProvider _jobProvider;
private static CacheItemRemovedCallback _onCacheRemove;
private static bool _stop;
private static readonly Logger Logger = LogManager.GetCurrentClassLogger();
public WebTimer(JobProvider jobProvider)
{
_jobProvider = jobProvider;
@ -31,8 +35,17 @@ namespace NzbDrone.Core
public void DoWork(string k, object v, CacheItemRemovedReason r)
{
_jobProvider.QueueScheduled();
StartTimer(Convert.ToInt32(v));
if (!_stop)
{
_jobProvider.QueueScheduled();
StartTimer(Convert.ToInt32(v));
}
}
public static void Stop()
{
Logger.Info("Stopping Web Timer");
_stop = true;
}
}
}

View File

@ -41,9 +41,10 @@ namespace NzbDrone.Test.Common
string exception = "";
if (log.Exception != null)
{
exception = log.Exception.Message;
exception = "[" + log.Exception.Message + "]";
}
errors += Environment.NewLine + String.Format("[{0}] {1}: {2} [{3}]", log.Level, log.LoggerName, log.FormattedMessage, exception);
errors += Environment.NewLine + String.Format("[{0}] {1}: {2} {3}", log.Level, log.LoggerName, log.FormattedMessage, exception);
}
return errors;
}
@ -87,8 +88,6 @@ namespace NzbDrone.Test.Common
private static void Excpected(LogLevel level, int count)
{
var levelLogs = _logs.Where(l => l.Level == level).ToList();
if (levelLogs.Count != count)
@ -97,9 +96,9 @@ namespace NzbDrone.Test.Common
var message = String.Format("{0} {1}(s) were expected but {2} were logged.\n\r{3}",
count, level, levelLogs.Count, GetLogsString(levelLogs));
message = "********************************************************************************************************************************\n\r"
message = "\n\r****************************************************************************************\n\r"
+ message +
"\n\r********************************************************************************************************************************";
"\n\r****************************************************************************************";
Assert.Fail(message);
}