Radarr/NzbDrone.Core/Jobs/JobProvider.cs

327 lines
11 KiB
C#
Raw Normal View History

using System;
using System.Collections.Generic;
2011-08-03 16:29:03 +00:00
using System.ComponentModel;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using NLog;
using NzbDrone.Core.Model;
using NzbDrone.Core.Model.Notification;
2011-12-02 01:33:17 +00:00
using NzbDrone.Core.Providers;
using NzbDrone.Core.Repository;
2011-06-17 19:50:49 +00:00
using PetaPoco;
2011-12-02 01:33:17 +00:00
namespace NzbDrone.Core.Jobs
{
2011-08-03 16:29:03 +00:00
/// <summary>
/// Provides a background task runner. Tasks can be queued either by the scheduler using QueueScheduled()
/// or by explicitly calling QueueJob(Type, options, source).
/// </summary>
public class JobProvider
{
// Logger shared by every instance of this class.
private static readonly Logger logger = LogManager.GetCurrentClassLogger();
2011-06-17 19:50:49 +00:00
// Database used to fetch, insert, update and delete JobDefinition records.
private readonly IDatabase _database;
// Receives the ProgressNotification registered for each job that Execute runs.
private readonly NotificationProvider _notificationProvider;
2013-01-03 01:09:13 +00:00
// Job implementations supplied to the constructor; looked up by their runtime type.
private readonly IEnumerable<IJob> _jobs;
// Worker thread that runs ProcessQueue; replaced with a fresh instance by ResetThread.
private Thread _jobThread;
// Restarted when QueueJob starts the worker thread and stopped when ProcessQueue exits;
// VerifyThreadTime reads it to detect a run lasting more than an hour.
public Stopwatch StopWatch { get; private set; }
2011-07-11 00:03:01 +00:00
// Held by QueueScheduled and QueueJob while they inspect or start the worker thread.
private readonly object _executionLock = new object();
// Pending work items; QueueJob and ProcessQueue lock on this list while mutating it.
private readonly List<JobQueueItem> _queue = new List<JobQueueItem>();
// Notification for the job currently being executed; assigned inside Execute.
private ProgressNotification _notification;
2013-01-14 16:36:16 +00:00
/// <summary>
/// Creates the provider, prepares an unstarted worker thread and brings the
/// stored job definitions in line with the supplied job implementations.
/// </summary>
/// <param name="database">Database holding the job definitions.</param>
/// <param name="notificationProvider">Provider that job progress notifications are registered with.</param>
/// <param name="jobs">All job implementations that may be queued.</param>
public JobProvider(IDatabase database, NotificationProvider notificationProvider, IEnumerable<IJob> jobs)
{
    _jobs = jobs;
    _notificationProvider = notificationProvider;
    _database = database;
    StopWatch = new Stopwatch();

    // Both calls rely on the fields assigned above.
    ResetThread();
    Initialize();
}
2011-08-03 16:29:03 +00:00
/// <summary>
/// Initializes a new instance of the <see cref="JobProvider"/> class without
/// assigning any dependencies, worker thread or stopwatch.
/// </summary>
/// <remarks>
/// Exists only so that AutoMoq can construct the type; production code should use
/// the constructor that takes the database, notification provider and jobs.
/// </remarks>
[EditorBrowsable(EditorBrowsableState.Never)]
public JobProvider() { }
2011-08-03 16:29:03 +00:00
/// <summary>
/// Gets the list of jobs currently waiting to be executed.
/// </summary>
/// <remarks>The underlying list itself is returned, not a copy.</remarks>
public List<JobQueueItem> Queue
{
    get { return _queue; }
}
2011-07-08 03:27:11 +00:00
/// <summary>
/// Loads every job definition from the database.
/// </summary>
/// <returns>A new list containing all fetched <see cref="JobDefinition"/> records.</returns>
public virtual List<JobDefinition> All()
{
    var fetched = _database.Fetch<JobDefinition>();
    return fetched.ToList();
}
/// <summary>
/// Synchronises the stored job definitions with the registered job implementations:
/// definitions without a matching implementation are deleted, and every implementation
/// gets its definition created or refreshed from the job's name and default interval.
/// </summary>
private void Initialize()
{
    var storedDefinitions = All();
    logger.Debug("Initializing jobs. Available: {0} Existing:{1}", _jobs.Count(), storedDefinitions.Count);

    // Definitions whose type name matches no registered job are stale.
    var staleDefinitions = storedDefinitions.Where(d => _jobs.All(j => j.GetType().ToString() != d.TypeName));
    foreach (var stale in staleDefinitions)
    {
        logger.Debug("Removing job from database '{0}'", stale.Name);
        _database.Delete(stale);
    }

    foreach (var job in _jobs)
    {
        var typeName = job.GetType().ToString();

        // Reuse the stored definition when present; otherwise start a new one stamped with the current time.
        var definition = storedDefinitions.SingleOrDefault(d => d.TypeName == typeName)
                         ?? new JobDefinition { TypeName = typeName, LastExecution = DateTime.Now };

        // A job is enabled only when its default interval is positive.
        definition.Enable = job.DefaultInterval.TotalSeconds > 0;
        definition.Name = job.Name;
        definition.Interval = Convert.ToInt32(job.DefaultInterval.TotalMinutes);

        SaveDefinition(definition);
    }
}
/// <summary>
/// Persists a job definition, inserting it when it has no id yet and updating it otherwise.
/// </summary>
/// <param name="definitions">The job definition to insert or update.</param>
public virtual void SaveDefinition(JobDefinition definitions)
{
    var alreadyStored = definitions.Id != 0;

    if (alreadyStored)
    {
        logger.Trace("Updating job definitions for {0}", definitions.Name);
        _database.Update(definitions);
        return;
    }

    // An id of zero marks a definition that has not been saved before.
    logger.Trace("Adding job definitions for {0}", definitions.Name);
    _database.Insert(definitions);
}
2011-08-03 16:29:03 +00:00
/// <summary>
/// Queues every enabled job whose interval has elapsed since its last execution.
/// Does nothing when the worker thread is already running.
/// </summary>
public virtual void QueueScheduled()
{
    lock (_executionLock)
    {
        VerifyThreadTime();

        if (_jobThread.IsAlive)
        {
            logger.Trace("Queue is already running. Ignoring scheduler's request.");
            return;
        }
    }

    // Collect all due job types first so that nothing is queued if a lookup fails.
    var dueJobTypes = new List<Type>();
    foreach (var definition in All())
    {
        if (!definition.Enable)
        {
            continue;
        }

        var isDue = (DateTime.Now - definition.LastExecution) > TimeSpan.FromMinutes(definition.Interval);
        if (!isDue)
        {
            continue;
        }

        var implementation = _jobs.Single(j => j.GetType().ToString() == definition.TypeName);
        dueJobTypes.Add(implementation.GetType());
    }

    foreach (var jobType in dueJobTypes)
    {
        QueueJob(jobType, source: JobQueueItem.JobSourceType.Scheduler);
    }

    logger.Trace("{0} Scheduled tasks have been added to the queue", dueJobTypes.Count);
}
2012-09-10 19:04:17 +00:00
/// <summary>
/// Adds a job to the queue unless an equal item is already queued, and starts the
/// worker thread when it is not currently running.
/// </summary>
/// <param name="jobType">Type of the job implementation to run.</param>
/// <param name="options">Optional job-specific options handed to the job when it starts.</param>
/// <param name="source">Who requested the job; defaults to the user.</param>
public virtual void QueueJob(Type jobType, dynamic options = null, JobQueueItem.JobSourceType source = JobQueueItem.JobSourceType.User)
{
    var item = new JobQueueItem();
    item.JobType = jobType;
    item.Options = options;
    item.Source = source;

    logger.Debug("Attempting to queue {0}", item);

    lock (_executionLock)
    {
        VerifyThreadTime();

        lock (Queue)
        {
            if (Queue.Contains(item))
            {
                logger.Info("{0} already exists in the queue. Skipping. current items in queue: {1}", item, Queue.Count);
            }
            else
            {
                Queue.Add(item);
                logger.Trace("Job {0} added to the queue. current items in queue: {1}", item, Queue.Count);
            }
        }

        if (!_jobThread.IsAlive)
        {
            // A thread cannot be restarted, so a fresh one is created before starting.
            ResetThread();
            StopWatch = Stopwatch.StartNew();
            _jobThread.Start();
        }
        else
        {
            logger.Trace("Queue is already running. No need to start it up.");
        }
    }
}
/// <summary>
/// Queues a job identified by its type name.
/// </summary>
/// <param name="jobTypeString">Type name of the job, in a form that <see cref="Type.GetType(string)"/> can resolve.</param>
/// <returns>
/// <c>true</c> when the type was resolved and handed to the queue;
/// <c>false</c> when the name is null, blank or cannot be resolved.
/// </returns>
public virtual bool QueueJob(string jobTypeString)
{
    // Type.GetType throws on null; report "not found" instead, like any other unresolvable name.
    if (string.IsNullOrWhiteSpace(jobTypeString))
    {
        return false;
    }

    var type = Type.GetType(jobTypeString);
    if (type == null)
    {
        return false;
    }

    QueueJob(type);
    return true;
}
/// <summary>
/// Loads the single stored definition whose TypeName equals the given type's name.
/// </summary>
/// <param name="type">Type of the job implementation.</param>
/// <returns>The matching <see cref="JobDefinition"/>.</returns>
public virtual JobDefinition GetDefinition(Type type)
{
    var typeName = type.ToString();
    return _database.Single<JobDefinition>("WHERE TypeName = @0", typeName);
}
/// <summary>
/// Worker-thread entry point: repeatedly takes the queued item with the lowest Source
/// value and executes it until the queue is empty.
/// </summary>
private void ProcessQueue()
{
    try
    {
        while (true)
        {
            // Each iteration gets its own diagnostics context id.
            using (NestedDiagnosticsContext.Push(Guid.NewGuid().ToString()))
            {
                try
                {
                    JobQueueItem next = null;

                    lock (Queue)
                    {
                        if (Queue.Count > 0)
                        {
                            next = Queue.OrderBy(c => c.Source).First();
                            logger.Trace("Popping {0} from the queue.", next);
                            Queue.Remove(next);
                        }
                    }

                    // Run the job outside the queue lock so new items can still be added.
                    if (next != null)
                    {
                        Execute(next);
                    }
                }
                catch (ThreadAbortException)
                {
                    // Let aborts reach the outer handler instead of being logged as job failures.
                    throw;
                }
                catch (Exception e)
                {
                    logger.FatalException("An error has occurred while executing job.", e);
                }
            }

            if (Queue.Count == 0)
            {
                break;
            }
        }
    }
    catch (ThreadAbortException e)
    {
        logger.Warn(e.Message);
    }
    catch (Exception e)
    {
        logger.ErrorException("Error has occurred in queue processor thread", e);
    }
    finally
    {
        StopWatch.Stop();
        logger.Trace("Finished processing jobs in the queue.");
    }
}
/// <summary>
/// Runs a single queued job: resolves its implementation, registers a progress
/// notification, starts the job and records the outcome on its definition.
/// </summary>
/// <param name="queueItem">The queue item describing which job to run and with what options.</param>
private void Execute(JobQueueItem queueItem)
{
    var jobImplementation = _jobs.SingleOrDefault(t => t.GetType() == queueItem.JobType);
    if (jobImplementation == null)
    {
        logger.Error("Unable to locate implementation for '{0}'. Make sure it is properly registered.", queueItem.JobType);
        return;
    }

    var typeName = queueItem.JobType.ToString();
    var settings = All().Single(j => j.TypeName == typeName);

    using (_notification = new ProgressNotification(jobImplementation.Name))
    {
        try
        {
            logger.Debug("Starting {0}. Last execution {1}", queueItem, settings.LastExecution);

            var sw = Stopwatch.StartNew();

            _notificationProvider.Register(_notification);
            jobImplementation.Start(_notification, queueItem.Options);
            _notification.Status = ProgressNotificationStatus.Completed;

            settings.LastExecution = DateTime.Now;
            settings.Success = true;

            sw.Stop();

            // Format the total as one number: the previous "{1:0}.{2}" rounded the whole
            // seconds (1.95 s was printed as "2.9") and passed an argument that was never used.
            logger.Debug("Job {0} successfully completed in {1:0.0} seconds.", queueItem, sw.Elapsed.TotalSeconds);
        }
        catch (ThreadAbortException)
        {
            // Do not treat an abort as a job failure; let it propagate.
            throw;
        }
        catch (Exception e)
        {
            logger.ErrorException("An error has occurred while executing job [" + jobImplementation.Name + "].", e);
            _notification.Status = ProgressNotificationStatus.Failed;
            _notification.CurrentMessage = jobImplementation.Name + " Failed.";

            settings.LastExecution = DateTime.Now;
            settings.Success = false;
        }
    }

    // Persist the execution result only for runs queued without options.
    // NOTE(review): the earlier comment said "only if triggered by the scheduler", but the
    // check is on Options rather than Source - confirm which behaviour is intended.
    if (queueItem.Options == null)
    {
        SaveDefinition(settings);
    }
}
/// <summary>
/// Replaces the worker thread when the stopwatch shows more than one hour elapsed.
/// </summary>
private void VerifyThreadTime()
{
    var withinLimit = StopWatch.Elapsed.TotalHours <= 1;
    if (withinLimit)
    {
        return;
    }

    logger.Warn("Thread job has been running for more than an hour. fuck it!");
    ResetThread();
}
/// <summary>
/// Aborts the current worker thread, if there is one, and replaces it with a new,
/// unstarted thread that will run ProcessQueue.
/// </summary>
private void ResetThread()
{
    var previous = _jobThread;
    if (previous != null)
    {
        previous.Abort();
    }

    logger.Trace("resetting queue processor thread");

    var worker = new Thread(ProcessQueue);
    worker.Name = "JobQueueThread";
    _jobThread = worker;
}
}
}