2013-09-14 06:36:07 +00:00
using System ;
using System.Threading.Tasks ;
using NLog ;
using NzbDrone.Common ;
using NzbDrone.Common.EnsureThat ;
using NzbDrone.Common.Messaging ;
using NzbDrone.Common.TPL ;
namespace NzbDrone.Core.Messaging.Events
{
public class EventAggregator : IEventAggregator
{
private readonly Logger _logger ;
private readonly IServiceFactory _serviceFactory ;
private readonly TaskFactory _taskFactory ;
public EventAggregator ( Logger logger , IServiceFactory serviceFactory )
{
_logger = logger ;
_serviceFactory = serviceFactory ;
2013-09-17 04:52:33 +00:00
_taskFactory = new TaskFactory ( ) ;
2013-09-14 06:36:07 +00:00
}
public void PublishEvent < TEvent > ( TEvent @event ) where TEvent : class , IEvent
{
2013-11-30 23:53:07 +00:00
Ensure . That ( @event , ( ) = > @event ) . IsNotNull ( ) ;
2013-09-14 06:36:07 +00:00
var eventName = GetEventName ( @event . GetType ( ) ) ;
2013-11-30 23:53:07 +00:00
/ *
int workerThreads ;
int completionPortThreads ;
ThreadPool . GetAvailableThreads ( out workerThreads , out completionPortThreads ) ;
2013-09-17 04:52:33 +00:00
2013-11-30 23:53:07 +00:00
int maxCompletionPortThreads ;
int maxWorkerThreads ;
ThreadPool . GetMaxThreads ( out maxWorkerThreads , out maxCompletionPortThreads ) ;
2013-09-17 04:52:33 +00:00
2013-11-30 23:53:07 +00:00
int minCompletionPortThreads ;
int minWorkerThreads ;
ThreadPool . GetMinThreads ( out minWorkerThreads , out minCompletionPortThreads ) ;
2013-09-17 04:52:33 +00:00
2013-11-30 23:53:07 +00:00
_logger . Warn ( "Thread pool state WT:{0} PT:{1} MAXWT:{2} MAXPT:{3} MINWT:{4} MINPT:{5}" , workerThreads , completionPortThreads , maxWorkerThreads , maxCompletionPortThreads , minWorkerThreads , minCompletionPortThreads ) ;
* /
2013-09-17 04:52:33 +00:00
2013-09-14 06:36:07 +00:00
_logger . Trace ( "Publishing {0}" , eventName ) ;
2013-09-17 04:52:33 +00:00
2013-09-14 06:36:07 +00:00
//call synchronous handlers first.
foreach ( var handler in _serviceFactory . BuildAll < IHandle < TEvent > > ( ) )
{
try
{
_logger . Trace ( "{0} -> {1}" , eventName , handler . GetType ( ) . Name ) ;
handler . Handle ( @event ) ;
_logger . Trace ( "{0} <- {1}" , eventName , handler . GetType ( ) . Name ) ;
}
catch ( Exception e )
{
2016-02-11 21:13:42 +00:00
_logger . Error ( e , string . Format ( "{0} failed while processing [{1}]" , handler . GetType ( ) . Name , eventName ) ) ;
2013-09-14 06:36:07 +00:00
}
}
foreach ( var handler in _serviceFactory . BuildAll < IHandleAsync < TEvent > > ( ) )
{
var handlerLocal = handler ;
_taskFactory . StartNew ( ( ) = >
{
_logger . Trace ( "{0} ~> {1}" , eventName , handlerLocal . GetType ( ) . Name ) ;
handlerLocal . HandleAsync ( @event ) ;
_logger . Trace ( "{0} <~ {1}" , eventName , handlerLocal . GetType ( ) . Name ) ;
} , TaskCreationOptions . PreferFairness )
. LogExceptions ( ) ;
}
}
private static string GetEventName ( Type eventType )
{
if ( ! eventType . IsGenericType )
{
return eventType . Name ;
}
2015-10-03 17:45:26 +00:00
return string . Format ( "{0}<{1}>" , eventType . Name . Remove ( eventType . Name . IndexOf ( '`' ) ) , eventType . GetGenericArguments ( ) [ 0 ] . Name ) ;
2013-09-14 06:36:07 +00:00
}
}
}