Cleaned up JobController.

This commit is contained in:
Keivan Beigi 2013-03-04 18:34:38 -08:00
parent 8ebcf0eb8b
commit 5e77d51510
3 changed files with 59 additions and 102 deletions

View File

@ -79,7 +79,7 @@ namespace NzbDrone.Core.Test.JobTests
{ {
GivenPendingJob(new List<JobDefinition> { new JobDefinition { TypeName = _fakeJob.GetType().FullName } }); GivenPendingJob(new List<JobDefinition> { new JobDefinition { TypeName = _fakeJob.GetType().FullName } });
Subject.QueueScheduled(); Subject.EnqueueScheduled();
WaitForQueue(); WaitForQueue();
_updatedJob.LastExecution.Should().BeWithin(TimeSpan.FromSeconds(10)); _updatedJob.LastExecution.Should().BeWithin(TimeSpan.FromSeconds(10));
@ -92,7 +92,7 @@ namespace NzbDrone.Core.Test.JobTests
{ {
GivenPendingJob(new List<JobDefinition> { new JobDefinition { TypeName = _brokenJob.GetType().FullName } }); GivenPendingJob(new List<JobDefinition> { new JobDefinition { TypeName = _brokenJob.GetType().FullName } });
Subject.QueueScheduled(); Subject.EnqueueScheduled();
WaitForQueue(); WaitForQueue();
_updatedJob.LastExecution.Should().BeWithin(TimeSpan.FromSeconds(10)); _updatedJob.LastExecution.Should().BeWithin(TimeSpan.FromSeconds(10));
@ -102,11 +102,11 @@ namespace NzbDrone.Core.Test.JobTests
} }
[Test] [Test]
public void can_run_async_job_again() public void can_run_job_again()
{ {
Subject.QueueJob(typeof(FakeJob)); Subject.Enqueue(typeof(FakeJob));
WaitForQueue(); WaitForQueue();
Subject.QueueJob(typeof(FakeJob)); Subject.Enqueue(typeof(FakeJob));
WaitForQueue(); WaitForQueue();
Subject.Queue.Should().BeEmpty(); Subject.Queue.Should().BeEmpty();
@ -116,9 +116,9 @@ namespace NzbDrone.Core.Test.JobTests
[Test] [Test]
public void should_ignore_job_with_same_arg() public void should_ignore_job_with_same_arg()
{ {
Subject.QueueJob(typeof(SlowJob2), 1); Subject.Enqueue(typeof(SlowJob2), 1);
Subject.QueueJob(typeof(FakeJob), 1); Subject.Enqueue(typeof(FakeJob), 1);
Subject.QueueJob(typeof(FakeJob), 1); Subject.Enqueue(typeof(FakeJob), 1);
WaitForQueue(); WaitForQueue();
@ -131,10 +131,10 @@ namespace NzbDrone.Core.Test.JobTests
[Test] [Test]
public void can_run_broken_job_again() public void can_run_broken_job_again()
{ {
Subject.QueueJob(typeof(BrokenJob)); Subject.Enqueue(typeof(BrokenJob));
WaitForQueue(); WaitForQueue();
Subject.QueueJob(typeof(BrokenJob)); Subject.Enqueue(typeof(BrokenJob));
WaitForQueue(); WaitForQueue();
@ -145,8 +145,8 @@ namespace NzbDrone.Core.Test.JobTests
[Test] [Test]
public void schedule_hit_should_be_ignored_if_queue_is_running() public void schedule_hit_should_be_ignored_if_queue_is_running()
{ {
Subject.QueueJob(typeof(SlowJob)); Subject.Enqueue(typeof(SlowJob));
Subject.QueueScheduled(); Subject.EnqueueScheduled();
WaitForQueue(); WaitForQueue();
_slowJob.ExecutionCount.Should().Be(1); _slowJob.ExecutionCount.Should().Be(1);
@ -158,9 +158,9 @@ namespace NzbDrone.Core.Test.JobTests
public void can_queue_jobs_at_the_same_time() public void can_queue_jobs_at_the_same_time()
{ {
Subject.QueueJob(typeof(SlowJob)); Subject.Enqueue(typeof(SlowJob));
var thread1 = new Thread(() => Subject.QueueJob(typeof(FakeJob))); var thread1 = new Thread(() => Subject.Enqueue(typeof(FakeJob)));
var thread2 = new Thread(() => Subject.QueueJob(typeof(FakeJob))); var thread2 = new Thread(() => Subject.Enqueue(typeof(FakeJob)));
thread1.Start(); thread1.Start();
thread2.Start(); thread2.Start();
@ -179,7 +179,7 @@ namespace NzbDrone.Core.Test.JobTests
[Test] [Test]
public void job_with_specific_target_should_not_update_status() public void job_with_specific_target_should_not_update_status()
{ {
Subject.QueueJob(typeof(FakeJob), 10); Subject.Enqueue(typeof(FakeJob), 10);
WaitForQueue(); WaitForQueue();
@ -194,12 +194,12 @@ namespace NzbDrone.Core.Test.JobTests
{ {
GivenPendingJob(new List<JobDefinition> { new JobDefinition { TypeName = _slowJob.GetType().FullName } }); GivenPendingJob(new List<JobDefinition> { new JobDefinition { TypeName = _slowJob.GetType().FullName } });
var jobThread = new Thread(Subject.QueueScheduled); var jobThread = new Thread(Subject.EnqueueScheduled);
jobThread.Start(); jobThread.Start();
Thread.Sleep(200); Thread.Sleep(200);
Subject.QueueJob(typeof(DisabledJob), 12); Subject.Enqueue(typeof(DisabledJob), 12);
WaitForQueue(); WaitForQueue();
@ -210,7 +210,7 @@ namespace NzbDrone.Core.Test.JobTests
[Test] [Test]
public void trygin_to_queue_unregistered_job_should_fail() public void trygin_to_queue_unregistered_job_should_fail()
{ {
Subject.QueueJob(typeof(UpdateInfoJob)); Subject.Enqueue(typeof(UpdateInfoJob));
WaitForQueue(); WaitForQueue();
ExceptionVerification.ExpectedErrors(1); ExceptionVerification.ExpectedErrors(1);
} }
@ -219,7 +219,7 @@ namespace NzbDrone.Core.Test.JobTests
public void scheduled_job_should_have_scheduler_as_source() public void scheduled_job_should_have_scheduler_as_source()
{ {
GivenPendingJob(new List<JobDefinition> { new JobDefinition { TypeName = _slowJob.GetType().FullName }, new JobDefinition { TypeName = _slowJob2.GetType().FullName } }); GivenPendingJob(new List<JobDefinition> { new JobDefinition { TypeName = _slowJob.GetType().FullName }, new JobDefinition { TypeName = _slowJob2.GetType().FullName } });
Subject.QueueScheduled(); Subject.EnqueueScheduled();
Subject.Queue.Should().OnlyContain(c => c.Source == JobQueueItem.JobSourceType.Scheduler); Subject.Queue.Should().OnlyContain(c => c.Source == JobQueueItem.JobSourceType.Scheduler);

View File

@ -546,6 +546,9 @@
<RegexTestSelector> <RegexTestSelector>
<RegularExpression>NzbDrone\.Core\.Test\.TvTests\.QualityModelFixture\..*</RegularExpression> <RegularExpression>NzbDrone\.Core\.Test\.TvTests\.QualityModelFixture\..*</RegularExpression>
</RegexTestSelector> </RegexTestSelector>
<RegexTestSelector>
<RegularExpression>NzbDrone\.Core\.Test\.JobTests\.JobControllerFixture\..*</RegularExpression>
</RegexTestSelector>
</IgnoredTests> </IgnoredTests>
<HiddenWarnings>PostBuildEventDisabled;PreBuildEventDisabled</HiddenWarnings> <HiddenWarnings>PostBuildEventDisabled;PreBuildEventDisabled</HiddenWarnings>
</ProjectConfiguration> </ProjectConfiguration>

View File

@ -4,6 +4,7 @@ using System.Collections.Generic;
using System.Diagnostics; using System.Diagnostics;
using System.Linq; using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks;
using NLog; using NLog;
using NzbDrone.Core.Model; using NzbDrone.Core.Model;
using NzbDrone.Core.Model.Notification; using NzbDrone.Core.Model.Notification;
@ -15,9 +16,9 @@ namespace NzbDrone.Core.Jobs
{ {
bool IsProcessing { get; } bool IsProcessing { get; }
IEnumerable<JobQueueItem> Queue { get; } IEnumerable<JobQueueItem> Queue { get; }
void QueueScheduled(); void EnqueueScheduled();
void QueueJob(Type jobType, dynamic options = null, JobQueueItem.JobSourceType source = JobQueueItem.JobSourceType.User); void Enqueue(Type jobType, dynamic options = null, JobQueueItem.JobSourceType source = JobQueueItem.JobSourceType.User);
bool QueueJob(string jobTypeString); bool Enqueue(string jobTypeString);
} }
public class JobController : IJobController public class JobController : IJobController
@ -27,14 +28,10 @@ namespace NzbDrone.Core.Jobs
private readonly IJobRepository _jobRepository; private readonly IJobRepository _jobRepository;
private readonly Logger _logger; private readonly Logger _logger;
private Thread _jobThread;
private readonly object _executionLock = new object();
private readonly BlockingCollection<JobQueueItem> _queue = new BlockingCollection<JobQueueItem>(); private readonly BlockingCollection<JobQueueItem> _queue = new BlockingCollection<JobQueueItem>();
private ProgressNotification _notification; private ProgressNotification _notification;
private readonly CancellationTokenSource _cancellationTokenSource;
public JobController(NotificationProvider notificationProvider, IEnumerable<IJob> jobs, IJobRepository jobRepository, Logger logger) public JobController(NotificationProvider notificationProvider, IEnumerable<IJob> jobs, IJobRepository jobRepository, Logger logger)
{ {
@ -42,12 +39,14 @@ namespace NzbDrone.Core.Jobs
_jobs = jobs; _jobs = jobs;
_jobRepository = jobRepository; _jobRepository = jobRepository;
_logger = logger; _logger = logger;
ResetThread(); _cancellationTokenSource = new CancellationTokenSource();
Task.Factory.StartNew(ProcessQueue, _cancellationTokenSource.Token);
} }
public bool IsProcessing { get; private set; } public bool IsProcessing { get; private set; }
public IEnumerable<JobQueueItem> Queue public IEnumerable<JobQueueItem> Queue
{ {
get get
@ -56,15 +55,12 @@ namespace NzbDrone.Core.Jobs
} }
} }
public virtual void QueueScheduled() public void EnqueueScheduled()
{ {
lock (_executionLock) if (IsProcessing)
{ {
if (_jobThread.IsAlive) _logger.Trace("Queue is already running. Ignoring scheduler's request.");
{ return;
_logger.Trace("Queue is already running. Ignoring scheduler's request.");
return;
}
} }
var pendingJobs = _jobRepository.GetPendingJobs() var pendingJobs = _jobRepository.GetPendingJobs()
@ -72,11 +68,11 @@ namespace NzbDrone.Core.Jobs
.GetType()).ToList(); .GetType()).ToList();
pendingJobs.ForEach(jobType => QueueJob(jobType, source: JobQueueItem.JobSourceType.Scheduler)); pendingJobs.ForEach(jobType => Enqueue(jobType, source: JobQueueItem.JobSourceType.Scheduler));
_logger.Trace("{0} Scheduled tasks have been added to the queue", pendingJobs.Count); _logger.Trace("{0} Scheduled tasks have been added to the queue", pendingJobs.Count);
} }
public virtual void QueueJob(Type jobType, dynamic options = null, JobQueueItem.JobSourceType source = JobQueueItem.JobSourceType.User) public void Enqueue(Type jobType, dynamic options = null, JobQueueItem.JobSourceType source = JobQueueItem.JobSourceType.User)
{ {
IsProcessing = true; IsProcessing = true;
@ -89,84 +85,58 @@ namespace NzbDrone.Core.Jobs
_logger.Debug("Attempting to queue {0}", queueItem); _logger.Debug("Attempting to queue {0}", queueItem);
lock (_executionLock) lock (_queue)
{ {
lock (_queue) if (!_queue.Contains(queueItem))
{ {
if (!_queue.Contains(queueItem)) _queue.Add(queueItem);
{ _logger.Trace("Job {0} added to the queue. current items in queue: {1}", queueItem, _queue.Count);
_queue.Add(queueItem);
_logger.Trace("Job {0} added to the queue. current items in queue: {1}", queueItem, _queue.Count);
}
else
{
_logger.Info("{0} already exists in the queue. Skipping. current items in queue: {1}", queueItem, _queue.Count);
}
} }
else
if (_jobThread.IsAlive)
{ {
_logger.Trace("Queue is already running. No need to start it up."); _logger.Info("{0} already exists in the queue. Skipping. current items in queue: {1}", queueItem, _queue.Count);
return;
} }
ResetThread();
_jobThread.Start();
} }
} }
public virtual bool QueueJob(string jobTypeString)
public bool Enqueue(string jobTypeString)
{ {
var type = Type.GetType(jobTypeString); var type = Type.GetType(jobTypeString);
if (type == null) if (type == null)
return false; return false;
QueueJob(type); Enqueue(type);
return true; return true;
} }
private void ProcessQueue() private void ProcessQueue()
{ {
try while (true)
{ {
while (true) try
{ {
IsProcessing = false; IsProcessing = false;
var item = _queue.Take(); var item = _queue.Take();
IsProcessing = true; Execute(item);
}
try catch (ThreadAbortException e)
{ {
Execute(item); _logger.Warn(e.Message);
} }
catch (ThreadAbortException) catch (Exception e)
{ {
throw; _logger.ErrorException("Error has occurred in queue processor thread", e);
}
catch (Exception e)
{
_logger.FatalException("An error has occurred while executing job.", e);
}
} }
}
catch (ThreadAbortException e)
{
_logger.Warn(e.Message);
}
catch (Exception e)
{
_logger.ErrorException("Error has occurred in queue processor thread", e);
}
finally
{
_logger.Trace("Finished processing jobs in the queue.");
} }
} }
private void Execute(JobQueueItem queueItem) private void Execute(JobQueueItem queueItem)
{ {
IsProcessing = true;
var jobImplementation = _jobs.SingleOrDefault(t => t.GetType() == queueItem.JobType); var jobImplementation = _jobs.SingleOrDefault(t => t.GetType() == queueItem.JobType);
if (jobImplementation == null) if (jobImplementation == null)
{ {
@ -194,10 +164,6 @@ namespace NzbDrone.Core.Jobs
_logger.Debug("Job {0} successfully completed in {1:0}.{2} seconds.", queueItem, sw.Elapsed.TotalSeconds, sw.Elapsed.Milliseconds / 100, _logger.Debug("Job {0} successfully completed in {1:0}.{2} seconds.", queueItem, sw.Elapsed.TotalSeconds, sw.Elapsed.Milliseconds / 100,
sw.Elapsed.Seconds); sw.Elapsed.Seconds);
} }
catch (ThreadAbortException)
{
throw;
}
catch (Exception e) catch (Exception e)
{ {
_logger.ErrorException("An error has occurred while executing job [" + jobImplementation.Name + "].", e); _logger.ErrorException("An error has occurred while executing job [" + jobImplementation.Name + "].", e);
@ -215,17 +181,5 @@ namespace NzbDrone.Core.Jobs
_jobRepository.Update(jobDefinition); _jobRepository.Update(jobDefinition);
} }
} }
private void ResetThread()
{
if (_jobThread != null)
{
_jobThread.Abort();
}
_logger.Trace("resetting queue processor thread");
_jobThread = new Thread(ProcessQueue) { Name = "JobQueueThread" };
}
} }
} }