added commands.

they can be triggered using the api

api/command/
This commit is contained in:
Keivan Beigi 2013-04-26 19:03:34 -07:00
parent 68ee71ea81
commit 182192e0ba
31 changed files with 265 additions and 74 deletions

View File

@ -0,0 +1,35 @@
using System;
using System.Collections.Generic;
using System.Linq;
using NzbDrone.Api.Extensions;
using NzbDrone.Common.Messaging;
namespace NzbDrone.Api.Commands
{
public class CommandModule : NzbDroneRestModule<CommandResource>
{
private readonly IMessageAggregator _messageAggregator;
private readonly IEnumerable<ICommand> _commands;
public CommandModule(IMessageAggregator messageAggregator, IEnumerable<ICommand> commands)
{
_messageAggregator = messageAggregator;
_commands = commands;
CreateResource = RunCommand;
}
private CommandResource RunCommand(CommandResource resource)
{
var commandType = _commands.Single(c => c.GetType().Name.Replace("Command", "").Equals(resource.Command, StringComparison.InvariantCultureIgnoreCase))
.GetType();
var command = (object)Request.Body.FromJson<ICommand>(commandType);
var method = typeof(IMessageAggregator).GetMethod("PublishCommand");
var genericMethod = method.MakeGenericMethod(commandType);
genericMethod.Invoke(_messageAggregator, new[] { command });
return resource;
}
}
}

View File

@ -0,0 +1,9 @@
using NzbDrone.Api.REST;
namespace NzbDrone.Api.Commands
{
public class CommandResource : RestResource
{
public string Command { get; set; }
}
}

View File

@ -1,4 +1,5 @@
using System.IO;
using System;
using System.IO;
using Nancy;
using Nancy.Responses;
using NzbDrone.Common;
@ -11,11 +12,16 @@ namespace NzbDrone.Api.Extensions
private static readonly NancyJsonSerializer NancySerializer = new NancyJsonSerializer(Serializer);
public static T FromJson<T>(this Stream body) where T : class, new()
{
return FromJson<T>(body, typeof(T));
}
public static T FromJson<T>(this Stream body, Type type)
{
var reader = new StreamReader(body, true);
body.Position = 0;
var value = reader.ReadToEnd();
return Serializer.Deserialize<T>(value);
return (T)Serializer.Deserialize(value, type);
}
public static JsonResponse<TModel> AsResponse<TModel>(this TModel model, HttpStatusCode statusCode = HttpStatusCode.OK)

View File

@ -30,7 +30,7 @@ namespace NzbDrone.Api
AutomapperBootstraper.InitializeAutomapper();
RegisterReporting(container);
container.Resolve<IMessageAggregator>().Publish(new ApplicationStartedEvent());
container.Resolve<IMessageAggregator>().PublishEvent(new ApplicationStartedEvent());
ApplicationPipelines.OnError.AddItemToEndOfPipeline(container.Resolve<ErrorPipeline>().HandleException);
}
@ -73,7 +73,7 @@ namespace NzbDrone.Api
public void Shutdown()
{
ApplicationContainer.Resolve<IMessageAggregator>().Publish(new ApplicationShutdownRequested());
ApplicationContainer.Resolve<IMessageAggregator>().PublishEvent(new ApplicationShutdownRequested());
}
}
}

View File

@ -85,6 +85,8 @@
<Compile Include="AutomapperBootstraper.cs" />
<Compile Include="Calendar\CalendarModule.cs" />
<Compile Include="Calendar\CalendarResource.cs" />
<Compile Include="Commands\CommandModule.cs" />
<Compile Include="Commands\CommandResource.cs" />
<Compile Include="Directories\DirectoryModule.cs" />
<Compile Include="Episodes\EpisodeModule.cs" />
<Compile Include="Episodes\EpisodeResource.cs" />

View File

@ -1,6 +1,4 @@
using System.Linq;
using Nancy;
using Nancy.Responses;
using Nancy;
namespace NzbDrone.Api
{
@ -11,8 +9,5 @@ namespace NzbDrone.Api
{
Options["/"] = x => new Response();
}
}
}

View File

@ -0,0 +1,78 @@
using System;
using System.Collections.Generic;
using Moq;
using NUnit.Framework;
using NzbDrone.Common.Messaging;
using NzbDrone.Test.Common;
namespace NzbDrone.Common.Test.EventingTests
{
[TestFixture]
public class MessageAggregatorCommandTests : TestBase
{
[Test]
public void should_publish_command_to_executor()
{
var commandA = new CommandA();
var executor = new Mock<IExecute<CommandA>>();
var aggregator = new MessageAggregator(TestLogger, () => new List<IProcessMessage> { executor.Object });
aggregator.PublishCommand(commandA);
executor.Verify(c => c.Execute(commandA), Times.Once());
}
[Test]
public void should_throw_if_more_than_one_handler()
{
var commandA = new CommandA();
var executor1 = new Mock<IExecute<CommandA>>();
var executor2 = new Mock<IExecute<CommandA>>();
var aggregator = new MessageAggregator(TestLogger, () => new List<IProcessMessage> { executor1.Object, executor2.Object });
Assert.Throws<InvalidOperationException>(() => aggregator.PublishCommand(commandA));
}
[Test]
public void should_not_publish_to_incompatible_executor()
{
var commandA = new CommandA();
var executor1 = new Mock<IExecute<CommandA>>();
var executor2 = new Mock<IExecute<CommandB>>();
var aggregator = new MessageAggregator(TestLogger, () => new List<IProcessMessage> { executor1.Object, executor2.Object });
aggregator.PublishCommand(commandA);
executor1.Verify(c => c.Execute(commandA), Times.Once());
executor2.Verify(c => c.Execute(It.IsAny<CommandB>()), Times.Never());
}
[Test]
public void broken_executor_should_throw_the_exception()
{
var commandA = new CommandA();
var executor = new Mock<IExecute<CommandA>>();
executor.Setup(c => c.Execute(It.IsAny<CommandA>()))
.Throws(new NotImplementedException());
var aggregator = new MessageAggregator(TestLogger, () => new List<IProcessMessage> { executor.Object });
Assert.Throws<NotImplementedException>(() => aggregator.PublishCommand(commandA));
}
}
public class CommandA : ICommand
{
}
public class CommandB : ICommand
{
}
}

View File

@ -1,5 +1,5 @@
using System.Collections.Generic;
using System.Linq;
using System;
using System.Collections.Generic;
using Moq;
using NUnit.Framework;
using NzbDrone.Common.Messaging;
@ -8,17 +8,17 @@ using NzbDrone.Test.Common;
namespace NzbDrone.Common.Test.EventingTests
{
[TestFixture]
public class ServiceNameFixture : TestBase
public class MessageAggregatorEventTests : TestBase
{
[Test]
public void should_publish_event_to_handlers()
{
var eventA = new EventA();
var intHandler = new Mock<IHandle<EventA>>();
var aggregator = new MessageAggregator(TestLogger, () => new List<IProcessMessage> { intHandler.Object });
aggregator.Publish(eventA);
aggregator.PublishEvent(eventA);
intHandler.Verify(c => c.Handle(eventA), Times.Once());
}
@ -31,7 +31,7 @@ namespace NzbDrone.Common.Test.EventingTests
var intHandler1 = new Mock<IHandle<EventA>>();
var intHandler2 = new Mock<IHandle<EventA>>();
var aggregator = new MessageAggregator(TestLogger, () => new List<IProcessMessage> { intHandler1.Object, intHandler2.Object });
aggregator.Publish(eventA);
aggregator.PublishEvent(eventA);
intHandler1.Verify(c => c.Handle(eventA), Times.Once());
intHandler2.Verify(c => c.Handle(eventA), Times.Once());
@ -46,18 +46,41 @@ namespace NzbDrone.Common.Test.EventingTests
var bHandler = new Mock<IHandle<EventB>>();
var aggregator = new MessageAggregator(TestLogger, () => new List<IProcessMessage> { aHandler.Object, bHandler.Object });
aggregator.Publish(eventA);
aggregator.PublishEvent(eventA);
aHandler.Verify(c => c.Handle(eventA), Times.Once());
bHandler.Verify(c => c.Handle(It.IsAny<EventB>()), Times.Never());
}
[Test]
public void broken_handler_should_not_effect_others_handler()
{
var eventA = new EventA();
var intHandler1 = new Mock<IHandle<EventA>>();
var intHandler2 = new Mock<IHandle<EventA>>();
var intHandler3 = new Mock<IHandle<EventA>>();
intHandler2.Setup(c => c.Handle(It.IsAny<EventA>()))
.Throws(new NotImplementedException());
var aggregator = new MessageAggregator(TestLogger, () => new List<IProcessMessage> { intHandler1.Object, intHandler2.Object , intHandler3.Object});
aggregator.PublishEvent(eventA);
intHandler1.Verify(c => c.Handle(eventA), Times.Once());
intHandler3.Verify(c => c.Handle(eventA), Times.Once());
ExceptionVerification.ExpectedErrors(1);
}
}
public class EventA:IEvent
public class EventA : IEvent
{
}
public class EventB : IEvent

View File

@ -79,7 +79,8 @@
</ItemGroup>
<ItemGroup>
<Compile Include="ConfigFileProviderTest.cs" />
<Compile Include="EventingTests\EventAggregatorTests.cs" />
<Compile Include="EventingTests\MessageAggregatorCommandTests.cs" />
<Compile Include="EventingTests\MessageAggregatorEventTests.cs" />
<Compile Include="ReflectionExtensions.cs" />
<Compile Include="ReportingService_ReportParseError_Fixture.cs" />
<Compile Include="PathExtentionFixture.cs" />

View File

@ -51,6 +51,7 @@ namespace NzbDrone.Common
if (implementations.Count == 1)
{
Container.Register(contractType, implementations.Single()).AsMultiInstance();
Container.RegisterMultiple(contractType, implementations).AsMultiInstance();
}
else
{

View File

@ -0,0 +1,7 @@
namespace NzbDrone.Common.Messaging
{
public interface IExecute<TCommand> : IProcessMessage<TCommand> where TCommand : ICommand
{
void Execute(TCommand message);
}
}

View File

@ -1,28 +1,12 @@
namespace NzbDrone.Common.Messaging
{
/// <summary>
/// Denotes a class which can handle a particular type of message.
/// </summary>
/// <typeparam name = "TEvent">The type of message to handle.</typeparam>
public interface IHandle<TEvent> : IProcessMessage where TEvent : IEvent
public interface IHandle<TEvent> : IProcessMessage<TEvent> where TEvent : IEvent
{
/// <summary>
/// Handles the message synchronously.
/// </summary>
/// <param name = "message">The message.</param>
void Handle(TEvent message);
}
/// <summary>
/// Denotes a class which can handle a particular type of message.
/// </summary>
/// <typeparam name = "TEvent">The type of message to handle.</typeparam>
public interface IHandleAsync<TEvent> : IProcessMessageAsync where TEvent : IEvent
public interface IHandleAsync<TEvent> : IProcessMessageAsync<TEvent> where TEvent : IEvent
{
/// <summary>
/// Handles the message asynchronously.
/// </summary>
/// <param name = "message">The message.</param>
void HandleAsync(TEvent message);
}
}

View File

@ -5,7 +5,7 @@
/// </summary>
public interface IMessageAggregator
{
void Publish<TEvent>(TEvent message) where TEvent : IEvent;
void Execute<TCommand>(TCommand message) where TCommand : ICommand;
void PublishEvent<TEvent>(TEvent @event) where TEvent : IEvent;
void PublishCommand<TCommand>(TCommand command) where TCommand : ICommand;
}
}

View File

@ -1,12 +1,11 @@
namespace NzbDrone.Common.Messaging
{
/// <summary>
/// A marker interface for classes that subscribe to messages.
/// </summary>
public interface IProcessMessage { }
/// <summary>
/// A marker interface for classes that subscribe to messages.
/// </summary>
public interface IProcessMessageAsync : IProcessMessage { }
public interface IProcessMessage<TMessage> : IProcessMessage { }
public interface IProcessMessageAsync<TMessage> : IProcessMessageAsync { }
}

View File

@ -17,33 +17,45 @@ namespace NzbDrone.Common.Messaging
_handlers = handlers;
}
public void Publish<TEvent>(TEvent message) where TEvent : IEvent
public void PublishEvent<TEvent>(TEvent @event) where TEvent : IEvent
{
_logger.Trace("Publishing {0}", message.GetType().Name);
_logger.Trace("Publishing {0}", @event.GetType().Name);
//call synchronous handlers first.
foreach (var handler in _handlers().OfType<IHandle<TEvent>>())
{
_logger.Debug("{0} -> {1}", message.GetType().Name, handler.GetType().Name);
handler.Handle(message);
_logger.Debug("{0} <- {1}", message.GetType().Name, handler.GetType().Name);
try
{
_logger.Debug("{0} -> {1}", @event.GetType().Name, handler.GetType().Name);
handler.Handle(@event);
_logger.Debug("{0} <- {1}", @event.GetType().Name, handler.GetType().Name);
}
catch (Exception e)
{
_logger.ErrorException(string.Format("{0} failed while processing [{1}]", handler.GetType().Name, @event.GetType().Name), e);
}
}
foreach (var handler in _handlers().OfType<IHandleAsync<TEvent>>())
{
var handlerLocal = handler;
Task.Factory.StartNew(() =>
{
_logger.Debug("{0} ~> {1}", message.GetType().Name, handlerLocal.GetType().Name);
handlerLocal.HandleAsync(message);
_logger.Debug("{0} <~ {1}", message.GetType().Name, handlerLocal.GetType().Name);
});
{
_logger.Debug("{0} ~> {1}", @event.GetType().Name, handlerLocal.GetType().Name);
handlerLocal.HandleAsync(@event);
_logger.Debug("{0} <~ {1}", @event.GetType().Name, handlerLocal.GetType().Name);
});
}
}
public void Execute<TCommand>(TCommand message) where TCommand : ICommand
public void PublishCommand<TCommand>(TCommand command) where TCommand : ICommand
{
throw new NotImplementedException();
_logger.Trace("Publishing {0}", command.GetType().Name);
var handler = _handlers().OfType<IExecute<TCommand>>().Single();
_logger.Debug("{0} -> {1}", command.GetType().Name, handler.GetType().Name);
handler.Execute(command);
_logger.Debug("{0} <- {1}", command.GetType().Name, handler.GetType().Name);
}
}
}

View File

@ -105,6 +105,7 @@
<Compile Include="EnsureThat\Resources\ExceptionMessages.Designer.cs" />
<Compile Include="EnsureThat\StringExtensions.cs" />
<Compile Include="EnsureThat\TypeParam.cs" />
<Compile Include="Messaging\IExecute.cs" />
<Compile Include="Messaging\ICommand.cs" />
<Compile Include="Messaging\IMessage.cs" />
<Compile Include="Messaging\IProcessMessage.cs" />

View File

@ -50,7 +50,7 @@ namespace NzbDrone.Core.Download
if (success)
{
_logger.Info("Report sent to download client. {0}", downloadTitle);
_messageAggregator.Publish(new EpisodeGrabbedEvent(episode));
_messageAggregator.PublishEvent(new EpisodeGrabbedEvent(episode));
}
return success;

View File

@ -0,0 +1,9 @@
using NzbDrone.Common.Messaging;
namespace NzbDrone.Core.Indexers
{
public class RssSyncCommand : ICommand
{
}
}

View File

@ -1,6 +1,7 @@
using System;
using System.Linq;
using NLog;
using NzbDrone.Common.Messaging;
using NzbDrone.Core.DecisionEngine;
using NzbDrone.Core.Download;
@ -11,7 +12,7 @@ namespace NzbDrone.Core.Indexers
void Sync();
}
public class RssSyncService : IRssSyncService
public class RssSyncService : IRssSyncService, IExecute<RssSyncCommand>
{
private readonly IFetchAndParseRss _rssFetcherAndParser;
private readonly IMakeDownloadDecision _downloadDecisionMaker;
@ -36,7 +37,7 @@ namespace NzbDrone.Core.Indexers
//TODO: this will download multiple of same episode if they show up in RSS. need to
//proposal: maybe get download decision one by one, that way
var qualifiedReports = decisions
.Where(c => c.Approved)
.Select(c => c.Episode)
@ -60,5 +61,10 @@ namespace NzbDrone.Core.Indexers
_logger.Info("RSS Sync Completed. Reports found: {0}, Fetches attempted: {1}", reports.Count, qualifiedReports.Count());
}
public void Execute(RssSyncCommand message)
{
Sync();
}
}
}

View File

@ -90,7 +90,7 @@ namespace NzbDrone.Core.Jobs.Implementations
}
//Start AfterRename
_messageAggregator.Publish(new SeriesRenamedEvent(series));
_messageAggregator.PublishEvent(new SeriesRenamedEvent(series));
notification.CurrentMessage = String.Format("Rename completed for {0} Season {1}", series.Title, options.SeasonNumber);
}

View File

@ -89,7 +89,7 @@ namespace NzbDrone.Core.Jobs.Implementations
//Start AfterRename
_messageAggregator.Publish(new SeriesRenamedEvent(series));
_messageAggregator.PublishEvent(new SeriesRenamedEvent(series));
notification.CurrentMessage = String.Format("Rename completed for {0}", series.Title);
}

View File

@ -83,7 +83,7 @@ namespace NzbDrone.Core.MediaFiles
if (newDownload)
{
_messageAggregator.Publish(new EpisodeDownloadedEvent(parsedEpisodeInfo, series));
_messageAggregator.PublishEvent(new EpisodeDownloadedEvent(parsedEpisodeInfo, series));
}
return episodeFile;

View File

@ -40,7 +40,7 @@ namespace NzbDrone.Core.MediaFiles
public EpisodeFile Add(EpisodeFile episodeFile)
{
var addedFile = _mediaFileRepository.Insert(episodeFile);
_messageAggregator.Publish(new EpisodeFileAddedEvent(addedFile));
_messageAggregator.PublishEvent(new EpisodeFileAddedEvent(addedFile));
return addedFile;
}
@ -52,7 +52,7 @@ namespace NzbDrone.Core.MediaFiles
public void Delete(EpisodeFile episodeFile)
{
_mediaFileRepository.Delete(episodeFile);
_messageAggregator.Publish(new EpisodeFileDeletedEvent(episodeFile));
_messageAggregator.PublishEvent(new EpisodeFileDeletedEvent(episodeFile));
}
public bool Exists(string path)

View File

@ -238,6 +238,7 @@
<Compile Include="Indexers\IIndexerBase.cs" />
<Compile Include="Indexers\IndexerSettingUpdatedEvent.cs" />
<Compile Include="Indexers\IndexerWithSetting.cs" />
<Compile Include="Indexers\RssSyncCommand.cs" />
<Compile Include="Lifecycle\ApplicationShutdownRequested.cs" />
<Compile Include="MediaFiles\Events\EpisodeDownloadedEvent.cs" />
<Compile Include="Download\EpisodeGrabbedEvent.cs" />

View File

@ -211,12 +211,12 @@ namespace NzbDrone.Core.Tv
if (newList.Any())
{
_messageAggregator.Publish(new EpisodeInfoAddedEvent(newList));
_messageAggregator.PublishEvent(new EpisodeInfoAddedEvent(newList));
}
if (updateList.Any())
{
_messageAggregator.Publish(new EpisodeInfoUpdatedEvent(updateList));
_messageAggregator.PublishEvent(new EpisodeInfoUpdatedEvent(updateList));
}
if (failCount != 0)

View File

@ -83,7 +83,7 @@ namespace NzbDrone.Core.Tv
//Todo: We need to get the UtcOffset from TVRage, since its not available from trakt
_messageAggregator.Publish(new SeriesUpdatedEvent(series));
_messageAggregator.PublishEvent(new SeriesUpdatedEvent(series));
return series;
}
@ -114,7 +114,7 @@ namespace NzbDrone.Core.Tv
newSeries.BacklogSetting = BacklogSettingType.Inherit;
_seriesRepository.Insert(newSeries);
_messageAggregator.Publish(new SeriesAddedEvent(newSeries));
_messageAggregator.PublishEvent(new SeriesAddedEvent(newSeries));
return newSeries;
}
@ -158,7 +158,7 @@ namespace NzbDrone.Core.Tv
{
var series = _seriesRepository.Get(seriesId);
_seriesRepository.Delete(seriesId);
_messageAggregator.Publish(new SeriesDeletedEvent(series, deleteFiles));
_messageAggregator.PublishEvent(new SeriesDeletedEvent(series, deleteFiles));
}
public List<Series> GetAllSeries()

View File

@ -0,0 +1,17 @@
using NUnit.Framework;
using NzbDrone.Api.Commands;
namespace NzbDrone.Integration.Test
{
[TestFixture]
public class CommandIntegrationTest : IntegrationTest
{
[Test]
public void should_be_able_to_run_rss_sync()
{
Commands.Post(new CommandResource {Command = "rsssync"});
}
}
}

View File

@ -6,6 +6,7 @@ using NLog.Targets;
using NUnit.Framework;
using Nancy.Hosting.Self;
using NzbDrone.Api;
using NzbDrone.Api.Commands;
using NzbDrone.Api.RootFolders;
using NzbDrone.Common;
using NzbDrone.Core.Datastore;
@ -29,6 +30,7 @@ namespace NzbDrone.Integration.Test
protected SeriesClient Series;
protected ClientBase<RootFolderResource> RootFolders;
protected ClientBase<CommandResource> Commands;
static IntegrationTest()
{
@ -84,6 +86,7 @@ namespace NzbDrone.Integration.Test
RestClient = new RestClient(url + "/api/");
Series = new SeriesClient(RestClient);
RootFolders = new ClientBase<RootFolderResource>(RestClient);
Commands = new ClientBase<CommandResource>(RestClient);
_host.Start();
}
@ -96,4 +99,5 @@ namespace NzbDrone.Integration.Test
_bootstrapper.Shutdown();
}
}
}

View File

@ -76,6 +76,7 @@
<ItemGroup>
<Compile Include="Client\ClientBase.cs" />
<Compile Include="Client\SeriesClient.cs" />
<Compile Include="CommandIntegerationTests.cs" />
<Compile Include="IntegrationTest.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="RootFolderIntegrationTest.cs" />

View File

@ -122,12 +122,12 @@ namespace NzbDrone.Test.Common
protected void VerifyEventPublished<TEvent>(Times times) where TEvent : IEvent
{
Mocker.GetMock<IMessageAggregator>().Verify(c => c.Publish(It.IsAny<TEvent>()), times);
Mocker.GetMock<IMessageAggregator>().Verify(c => c.PublishEvent(It.IsAny<TEvent>()), times);
}
protected void VerifyEventNotPublished<TEvent>() where TEvent : IEvent
{
Mocker.GetMock<IMessageAggregator>().Verify(c => c.Publish(It.IsAny<TEvent>()), Times.Never());
Mocker.GetMock<IMessageAggregator>().Verify(c => c.PublishEvent(It.IsAny<TEvent>()), Times.Never());
}
}
}

View File

@ -23,7 +23,7 @@
"grunt-contrib-concat": "*",
"grunt-contrib-copy": "*",
"grunt-wrap": "*",
"grunt-curl": "~0.6.0",
"grunt-curl": "*",
"grunt-notify": "*"
}
}