From bfcd1b520fd79b893e721ba916ae5e1656407d2f Mon Sep 17 00:00:00 2001 From: Luke Pulverenti Date: Wed, 16 Aug 2017 02:43:41 -0400 Subject: merge common implementations and server implementations --- .../ScheduledTasks/TaskManager.cs | 334 +++++++++++++++++++++ 1 file changed, 334 insertions(+) create mode 100644 Emby.Server.Implementations/ScheduledTasks/TaskManager.cs (limited to 'Emby.Server.Implementations/ScheduledTasks/TaskManager.cs') diff --git a/Emby.Server.Implementations/ScheduledTasks/TaskManager.cs b/Emby.Server.Implementations/ScheduledTasks/TaskManager.cs new file mode 100644 index 000000000..5f9bf3731 --- /dev/null +++ b/Emby.Server.Implementations/ScheduledTasks/TaskManager.cs @@ -0,0 +1,334 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using MediaBrowser.Common.Configuration; +using MediaBrowser.Common.Events; +using MediaBrowser.Model.Events; +using MediaBrowser.Model.IO; +using MediaBrowser.Model.Logging; +using MediaBrowser.Model.Serialization; +using MediaBrowser.Model.System; +using MediaBrowser.Model.Tasks; + +namespace Emby.Server.Implementations.ScheduledTasks +{ + /// + /// Class TaskManager + /// + public class TaskManager : ITaskManager + { + public event EventHandler> TaskExecuting; + public event EventHandler TaskCompleted; + + /// + /// Gets the list of Scheduled Tasks + /// + /// The scheduled tasks. + public IScheduledTaskWorker[] ScheduledTasks { get; private set; } + + /// + /// The _task queue + /// + private readonly ConcurrentQueue> _taskQueue = + new ConcurrentQueue>(); + + /// + /// Gets or sets the json serializer. + /// + /// The json serializer. + private IJsonSerializer JsonSerializer { get; set; } + + /// + /// Gets or sets the application paths. + /// + /// The application paths. + private IApplicationPaths ApplicationPaths { get; set; } + + private readonly ISystemEvents _systemEvents; + + /// + /// Gets the logger. + /// + /// The logger. + private ILogger Logger { get; set; } + private readonly IFileSystem _fileSystem; + + /// + /// Initializes a new instance of the class. + /// + /// The application paths. + /// The json serializer. + /// The logger. + /// kernel + public TaskManager(IApplicationPaths applicationPaths, IJsonSerializer jsonSerializer, ILogger logger, IFileSystem fileSystem, ISystemEvents systemEvents) + { + ApplicationPaths = applicationPaths; + JsonSerializer = jsonSerializer; + Logger = logger; + _fileSystem = fileSystem; + _systemEvents = systemEvents; + + ScheduledTasks = new IScheduledTaskWorker[] { }; + } + + private void BindToSystemEvent() + { + _systemEvents.Resume += _systemEvents_Resume; + } + + private void _systemEvents_Resume(object sender, EventArgs e) + { + foreach (var task in ScheduledTasks) + { + task.ReloadTriggerEvents(); + } + } + + /// + /// Cancels if running and queue. + /// + /// + /// Task options. + public void CancelIfRunningAndQueue(TaskExecutionOptions options) + where T : IScheduledTask + { + var task = ScheduledTasks.First(t => t.ScheduledTask.GetType() == typeof(T)); + ((ScheduledTaskWorker)task).CancelIfRunning(); + + QueueScheduledTask(options); + } + + public void CancelIfRunningAndQueue() + where T : IScheduledTask + { + CancelIfRunningAndQueue(new TaskExecutionOptions()); + } + + /// + /// Cancels if running + /// + /// + public void CancelIfRunning() + where T : IScheduledTask + { + var task = ScheduledTasks.First(t => t.ScheduledTask.GetType() == typeof(T)); + ((ScheduledTaskWorker)task).CancelIfRunning(); + } + + /// + /// Queues the scheduled task. + /// + /// + /// Task options + public void QueueScheduledTask(TaskExecutionOptions options) + where T : IScheduledTask + { + var scheduledTask = ScheduledTasks.FirstOrDefault(t => t.ScheduledTask.GetType() == typeof(T)); + + if (scheduledTask == null) + { + Logger.Error("Unable to find scheduled task of type {0} in QueueScheduledTask.", typeof(T).Name); + } + else + { + QueueScheduledTask(scheduledTask, options); + } + } + + public void QueueScheduledTask() + where T : IScheduledTask + { + QueueScheduledTask(new TaskExecutionOptions()); + } + + public void QueueIfNotRunning() + where T : IScheduledTask + { + var task = ScheduledTasks.First(t => t.ScheduledTask.GetType() == typeof(T)); + + if (task.State != TaskState.Running) + { + QueueScheduledTask(new TaskExecutionOptions()); + } + } + + public void Execute() + where T : IScheduledTask + { + var scheduledTask = ScheduledTasks.FirstOrDefault(t => t.ScheduledTask.GetType() == typeof(T)); + + if (scheduledTask == null) + { + Logger.Error("Unable to find scheduled task of type {0} in Execute.", typeof(T).Name); + } + else + { + var type = scheduledTask.ScheduledTask.GetType(); + + Logger.Info("Queueing task {0}", type.Name); + + lock (_taskQueue) + { + if (scheduledTask.State == TaskState.Idle) + { + Execute(scheduledTask, new TaskExecutionOptions()); + } + } + } + } + + /// + /// Queues the scheduled task. + /// + /// The task. + /// The task options. + public void QueueScheduledTask(IScheduledTask task, TaskExecutionOptions options) + { + var scheduledTask = ScheduledTasks.FirstOrDefault(t => t.ScheduledTask.GetType() == task.GetType()); + + if (scheduledTask == null) + { + Logger.Error("Unable to find scheduled task of type {0} in QueueScheduledTask.", task.GetType().Name); + } + else + { + QueueScheduledTask(scheduledTask, options); + } + } + + /// + /// Queues the scheduled task. + /// + /// The task. + /// The task options. + private void QueueScheduledTask(IScheduledTaskWorker task, TaskExecutionOptions options) + { + var type = task.ScheduledTask.GetType(); + + Logger.Info("Queueing task {0}", type.Name); + + lock (_taskQueue) + { + if (task.State == TaskState.Idle) + { + Execute(task, options); + return; + } + + _taskQueue.Enqueue(new Tuple(type, options)); + } + } + + /// + /// Adds the tasks. + /// + /// The tasks. + public void AddTasks(IEnumerable tasks) + { + var myTasks = ScheduledTasks.ToList(); + + var list = tasks.ToList(); + myTasks.AddRange(list.Select(t => new ScheduledTaskWorker(t, ApplicationPaths, this, JsonSerializer, Logger, _fileSystem, _systemEvents))); + + ScheduledTasks = myTasks.ToArray(); + + BindToSystemEvent(); + } + + /// + /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. + /// + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + /// + /// Releases unmanaged and - optionally - managed resources. + /// + /// true to release both managed and unmanaged resources; false to release only unmanaged resources. + protected virtual void Dispose(bool dispose) + { + foreach (var task in ScheduledTasks) + { + task.Dispose(); + } + } + + public void Cancel(IScheduledTaskWorker task) + { + ((ScheduledTaskWorker)task).Cancel(); + } + + public Task Execute(IScheduledTaskWorker task, TaskExecutionOptions options) + { + return ((ScheduledTaskWorker)task).Execute(options); + } + + /// + /// Called when [task executing]. + /// + /// The task. + internal void OnTaskExecuting(IScheduledTaskWorker task) + { + EventHelper.FireEventIfNotNull(TaskExecuting, this, new GenericEventArgs + { + Argument = task + + }, Logger); + } + + /// + /// Called when [task completed]. + /// + /// The task. + /// The result. + internal void OnTaskCompleted(IScheduledTaskWorker task, TaskResult result) + { + EventHelper.FireEventIfNotNull(TaskCompleted, task, new TaskCompletionEventArgs + { + Result = result, + Task = task + + }, Logger); + + ExecuteQueuedTasks(); + } + + /// + /// Executes the queued tasks. + /// + private void ExecuteQueuedTasks() + { + Logger.Info("ExecuteQueuedTasks"); + + // Execute queued tasks + lock (_taskQueue) + { + var list = new List>(); + + Tuple item; + while (_taskQueue.TryDequeue(out item)) + { + if (list.All(i => i.Item1 != item.Item1)) + { + list.Add(item); + } + } + + foreach (var enqueuedType in list) + { + var scheduledTask = ScheduledTasks.First(t => t.ScheduledTask.GetType() == enqueuedType.Item1); + + if (scheduledTask.State == TaskState.Idle) + { + Execute(scheduledTask, enqueuedType.Item2); + } + } + } + } + } +} -- cgit v1.2.3