2013-09-14 06:36:07 +00:00
using System ;
2019-06-14 03:54:25 +00:00
using System.Collections.Generic ;
using System.Linq ;
2013-09-14 06:36:07 +00:00
using System.Threading.Tasks ;
using NLog ;
using NzbDrone.Common ;
using NzbDrone.Common.EnsureThat ;
using NzbDrone.Common.Messaging ;
using NzbDrone.Common.TPL ;
namespace NzbDrone.Core.Messaging.Events
{
/// <summary>
/// Dispatches published events to the handlers supplied by the service factory.
/// Synchronous handlers run inline on the publishing thread; global and asynchronous
/// handlers are each started as a separate task.
/// </summary>
public class EventAggregator : IEventAggregator
{
    private readonly Logger _logger;
    private readonly IServiceFactory _serviceFactory;
    private readonly TaskFactory _taskFactory;

    // Handler sets keyed by the static event type they were built for. Each value is an
    // EventSubscribers<TKey>; it is stored as object because the generic argument differs
    // per entry. The dictionary instance doubles as the lock guarding its own access.
    private readonly Dictionary<Type, object> _eventSubscribers;

    /// <summary>
    /// Snapshot of every handler the service factory builds for <typeparamref name="TEvent"/>,
    /// resolved once at construction.
    /// </summary>
    private class EventSubscribers<TEvent>
        where TEvent : class, IEvent
    {
        public readonly IHandle<TEvent>[] SyncHandlers;
        public readonly IHandleAsync<TEvent>[] AsyncHandlers;
        public readonly IHandleAsync<IEvent>[] GlobalHandlers;

        public EventSubscribers(IServiceFactory serviceFactory)
        {
            // Synchronous handlers are sorted by their declared handle order.
            SyncHandlers = serviceFactory.BuildAll<IHandle<TEvent>>()
                                         .OrderBy(GetEventHandleOrder)
                                         .ToArray();

            GlobalHandlers = serviceFactory.BuildAll<IHandleAsync<IEvent>>()
                                           .ToArray();

            AsyncHandlers = serviceFactory.BuildAll<IHandleAsync<TEvent>>()
                                          .ToArray();
        }
    }

    public EventAggregator(Logger logger, IServiceFactory serviceFactory)
    {
        _logger = logger;
        _serviceFactory = serviceFactory;
        _taskFactory = new TaskFactory();
        _eventSubscribers = new Dictionary<Type, object>();
    }

    /// <summary>
    /// Publishes <paramref name="event"/> to all handlers of <typeparamref name="TEvent"/>.
    /// </summary>
    /// <remarks>
    /// Order of dispatch: synchronous handlers (inline, one after another; an exception in
    /// one is logged and does not stop the others), then global handlers, then asynchronous
    /// handlers (both started as tasks and not awaited).
    /// </remarks>
    /// <param name="event">The event to publish; validated as non-null via Ensure.</param>
    public void PublishEvent<TEvent>(TEvent @event)
        where TEvent : class, IEvent
    {
        Ensure.That(@event, () => @event).IsNotNull();

        // The display name comes from the runtime type and is used for logging only.
        var eventName = GetEventName(@event.GetType());

        _logger.Trace("Publishing {0}", eventName);

        var subscribers = GetSubscribers<TEvent>();

        // Call synchronous handlers first.
        foreach (var handler in subscribers.SyncHandlers)
        {
            try
            {
                _logger.Trace("{0} -> {1}", eventName, handler.GetType().Name);
                handler.Handle(@event);
                _logger.Trace("{0} <- {1}", eventName, handler.GetType().Name);
            }
            catch (Exception e)
            {
                _logger.Error(e, "{0} failed while processing [{1}]", handler.GetType().Name, eventName);
            }
        }

        foreach (var handler in subscribers.GlobalHandlers)
        {
            var handlerLocal = handler;

            // LogExceptions is declared outside this file; presumably it logs task faults.
            _taskFactory.StartNew(() =>
            {
                handlerLocal.HandleAsync(@event);
            }, TaskCreationOptions.PreferFairness)
            .LogExceptions();
        }

        foreach (var handler in subscribers.AsyncHandlers)
        {
            var handlerLocal = handler;

            _taskFactory.StartNew(() =>
            {
                _logger.Trace("{0} ~> {1}", eventName, handlerLocal.GetType().Name);
                handlerLocal.HandleAsync(@event);
                _logger.Trace("{0} <~ {1}", eventName, handlerLocal.GetType().Name);
            }, TaskCreationOptions.PreferFairness)
            .LogExceptions();
        }
    }

    /// <summary>
    /// Returns the cached handler set for <typeparamref name="TEvent"/>, building and
    /// caching it on first use.
    /// </summary>
    /// <remarks>
    /// The cache is keyed by the static type argument rather than by the runtime type's
    /// display name, so the stored value is always an EventSubscribers of exactly
    /// <typeparamref name="TEvent"/> and the cast below cannot fail, even when one event
    /// object is published under different static types or two event types share a name.
    /// </remarks>
    private EventSubscribers<TEvent> GetSubscribers<TEvent>()
        where TEvent : class, IEvent
    {
        var key = typeof(TEvent);

        lock (_eventSubscribers)
        {
            object target;

            if (!_eventSubscribers.TryGetValue(key, out target))
            {
                target = new EventSubscribers<TEvent>(_serviceFactory);
                _eventSubscribers[key] = target;
            }

            return (EventSubscribers<TEvent>)target;
        }
    }

    /// <summary>
    /// Builds the display name used in log messages: the plain type name for non-generic
    /// types, or "Name&lt;FirstArgument&gt;" for generic types.
    /// </summary>
    private static string GetEventName(Type eventType)
    {
        if (!eventType.IsGenericType)
        {
            return eventType.Name;
        }

        // Strip the arity suffix (e.g. "`1"). A type nested inside a generic type reports
        // IsGenericType without having a backtick in its own name, so only strip when present.
        var baseName = eventType.Name;
        var arityIndex = baseName.IndexOf('`');

        if (arityIndex >= 0)
        {
            baseName = baseName.Remove(arityIndex);
        }

        return string.Format("{0}<{1}>", baseName, eventType.GetGenericArguments()[0].Name);
    }

    /// <summary>
    /// Determines the sort key of a synchronous handler from the EventHandleOrderAttribute
    /// on its public Handle(TEvent) method.
    /// </summary>
    /// <returns>
    /// The attribute's EventHandleOrder as an int, or EventHandleOrder.Any when the method
    /// cannot be found by reflection or carries no such attribute.
    /// </returns>
    internal static int GetEventHandleOrder<TEvent>(IHandle<TEvent> eventHandler)
        where TEvent : class, IEvent
    {
        // TODO: Convert "Handle" to nameof(eventHandler.Handle) after .net 4.5
        var method = eventHandler.GetType().GetMethod("Handle", new Type[] { typeof(TEvent) });

        if (method == null)
        {
            return (int)EventHandleOrder.Any;
        }

        var attribute = method.GetCustomAttributes(typeof(EventHandleOrderAttribute), true).FirstOrDefault() as EventHandleOrderAttribute;

        if (attribute == null)
        {
            return (int)EventHandleOrder.Any;
        }

        return (int)attribute.EventHandleOrder;
    }
}
}