From d9518be3ed3923d3fd2ff4470c9dfbd7c80ad8d9 Mon Sep 17 00:00:00 2001 From: Luke Pulverenti Date: Sat, 7 Mar 2015 23:44:31 -0500 Subject: update server sync --- .../MediaBrowser.Server.Implementations.csproj | 1 + .../Sync/MediaSync.cs | 7 +- .../Sync/MultiProviderSync.cs | 8 +- .../Sync/ServerSyncScheduledTask.cs | 2 +- .../Sync/SyncManager.cs | 14 +- .../Sync/TargetDataProvider.cs | 242 +++++++++++++++++++++ 6 files changed, 267 insertions(+), 7 deletions(-) create mode 100644 MediaBrowser.Server.Implementations/Sync/TargetDataProvider.cs (limited to 'MediaBrowser.Server.Implementations') diff --git a/MediaBrowser.Server.Implementations/MediaBrowser.Server.Implementations.csproj b/MediaBrowser.Server.Implementations/MediaBrowser.Server.Implementations.csproj index 607ba3c41..41f970041 100644 --- a/MediaBrowser.Server.Implementations/MediaBrowser.Server.Implementations.csproj +++ b/MediaBrowser.Server.Implementations/MediaBrowser.Server.Implementations.csproj @@ -316,6 +316,7 @@ + diff --git a/MediaBrowser.Server.Implementations/Sync/MediaSync.cs b/MediaBrowser.Server.Implementations/Sync/MediaSync.cs index 70c366bf5..246a82b20 100644 --- a/MediaBrowser.Server.Implementations/Sync/MediaSync.cs +++ b/MediaBrowser.Server.Implementations/Sync/MediaSync.cs @@ -206,9 +206,12 @@ namespace MediaBrowser.Server.Implementations.Sync await dataProvider.Delete(target, localId).ConfigureAwait(false); } - private Task SendFile(IServerSyncProvider provider, string inputPath, LocalItem item, SyncTarget target, CancellationToken cancellationToken) + private async Task SendFile(IServerSyncProvider provider, string inputPath, LocalItem item, SyncTarget target, CancellationToken cancellationToken) { - return provider.SendFile(inputPath, item.LocalPath, target, new Progress(), cancellationToken); + using (var stream = _fileSystem.GetFileStream(inputPath, FileMode.Open, FileAccess.Read, FileShare.Read, true)) + { + await provider.SendFile(stream, item.LocalPath, target, new Progress(), cancellationToken).ConfigureAwait(false); + } } private string GetLocalId(string serverId, string itemId) diff --git a/MediaBrowser.Server.Implementations/Sync/MultiProviderSync.cs b/MediaBrowser.Server.Implementations/Sync/MultiProviderSync.cs index cbfa82f1d..a8bc24c2a 100644 --- a/MediaBrowser.Server.Implementations/Sync/MultiProviderSync.cs +++ b/MediaBrowser.Server.Implementations/Sync/MultiProviderSync.cs @@ -14,12 +14,12 @@ namespace MediaBrowser.Server.Implementations.Sync { public class MultiProviderSync { - private readonly ISyncManager _syncManager; + private readonly SyncManager _syncManager; private readonly IServerApplicationHost _appHost; private readonly ILogger _logger; private readonly IFileSystem _fileSystem; - public MultiProviderSync(ISyncManager syncManager, IServerApplicationHost appHost, ILogger logger, IFileSystem fileSystem) + public MultiProviderSync(SyncManager syncManager, IServerApplicationHost appHost, ILogger logger, IFileSystem fileSystem) { _syncManager = syncManager; _appHost = appHost; @@ -54,8 +54,10 @@ namespace MediaBrowser.Server.Implementations.Sync progress.Report(totalProgress); }); + var dataProvider = _syncManager.GetDataProvider(target.Item1, target.Item2); + await new MediaSync(_logger, _syncManager, _appHost, _fileSystem) - .Sync(target.Item1, target.Item1.GetDataProvider(), target.Item2, innerProgress, cancellationToken) + .Sync(target.Item1, dataProvider, target.Item2, innerProgress, cancellationToken) .ConfigureAwait(false); numComplete++; diff --git a/MediaBrowser.Server.Implementations/Sync/ServerSyncScheduledTask.cs b/MediaBrowser.Server.Implementations/Sync/ServerSyncScheduledTask.cs index 170860dc2..33b1e13bd 100644 --- a/MediaBrowser.Server.Implementations/Sync/ServerSyncScheduledTask.cs +++ b/MediaBrowser.Server.Implementations/Sync/ServerSyncScheduledTask.cs @@ -46,7 +46,7 @@ namespace MediaBrowser.Server.Implementations.Sync public Task Execute(CancellationToken cancellationToken, IProgress progress) { - return new MultiProviderSync(_syncManager, _appHost, _logger, _fileSystem) + return new MultiProviderSync((SyncManager)_syncManager, _appHost, _logger, _fileSystem) .Sync(ServerSyncProviders, progress, cancellationToken); } diff --git a/MediaBrowser.Server.Implementations/Sync/SyncManager.cs b/MediaBrowser.Server.Implementations/Sync/SyncManager.cs index d0d65d437..8474cc8c5 100644 --- a/MediaBrowser.Server.Implementations/Sync/SyncManager.cs +++ b/MediaBrowser.Server.Implementations/Sync/SyncManager.cs @@ -21,10 +21,12 @@ using MediaBrowser.Model.Entities; using MediaBrowser.Model.Events; using MediaBrowser.Model.Logging; using MediaBrowser.Model.Querying; +using MediaBrowser.Model.Serialization; using MediaBrowser.Model.Sync; using MediaBrowser.Model.Users; using MoreLinq; using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Linq; @@ -49,6 +51,7 @@ namespace MediaBrowser.Server.Implementations.Sync private readonly IConfigurationManager _config; private readonly IUserDataManager _userDataManager; private readonly Func _mediaSourceManager; + private readonly IJsonSerializer _json; private ISyncProvider[] _providers = { }; @@ -58,7 +61,7 @@ namespace MediaBrowser.Server.Implementations.Sync public event EventHandler> SyncJobItemUpdated; public event EventHandler> SyncJobItemCreated; - public SyncManager(ILibraryManager libraryManager, ISyncRepository repo, IImageProcessor imageProcessor, ILogger logger, IUserManager userManager, Func dtoService, IApplicationHost appHost, ITVSeriesManager tvSeriesManager, Func mediaEncoder, IFileSystem fileSystem, Func subtitleEncoder, IConfigurationManager config, IUserDataManager userDataManager, Func mediaSourceManager) + public SyncManager(ILibraryManager libraryManager, ISyncRepository repo, IImageProcessor imageProcessor, ILogger logger, IUserManager userManager, Func dtoService, IApplicationHost appHost, ITVSeriesManager tvSeriesManager, Func mediaEncoder, IFileSystem fileSystem, Func subtitleEncoder, IConfigurationManager config, IUserDataManager userDataManager, Func mediaSourceManager, IJsonSerializer json) { _libraryManager = libraryManager; _repo = repo; @@ -74,6 +77,7 @@ namespace MediaBrowser.Server.Implementations.Sync _config = config; _userDataManager = userDataManager; _mediaSourceManager = mediaSourceManager; + _json = json; } public void AddParts(IEnumerable providers) @@ -86,6 +90,14 @@ namespace MediaBrowser.Server.Implementations.Sync get { return _providers.OfType(); } } + private readonly ConcurrentDictionary _dataProviders = + new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + + public ISyncDataProvider GetDataProvider(IServerSyncProvider provider, SyncTarget target) + { + return _dataProviders.GetOrAdd(target.Id, key => new TargetDataProvider(provider, target, _appHost.SystemId, _logger, _json, _fileSystem, _config.CommonApplicationPaths)); + } + public async Task CreateJob(SyncJobRequest request) { var processor = GetSyncJobProcessor(); diff --git a/MediaBrowser.Server.Implementations/Sync/TargetDataProvider.cs b/MediaBrowser.Server.Implementations/Sync/TargetDataProvider.cs new file mode 100644 index 000000000..d068a9e4a --- /dev/null +++ b/MediaBrowser.Server.Implementations/Sync/TargetDataProvider.cs @@ -0,0 +1,242 @@ +using MediaBrowser.Common.Configuration; +using MediaBrowser.Common.Extensions; +using MediaBrowser.Common.IO; +using MediaBrowser.Controller.Sync; +using MediaBrowser.Model.Logging; +using MediaBrowser.Model.Serialization; +using MediaBrowser.Model.Sync; +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace MediaBrowser.Server.Implementations.Sync +{ + public class TargetDataProvider : ISyncDataProvider + { + private readonly SyncTarget _target; + private readonly IServerSyncProvider _provider; + + private readonly SemaphoreSlim _dataLock = new SemaphoreSlim(1, 1); + private List _items; + + private readonly ILogger _logger; + private readonly IJsonSerializer _json; + private readonly IFileSystem _fileSystem; + private readonly IApplicationPaths _appPaths; + private readonly string _serverId; + + private readonly SemaphoreSlim _cacheFileLock = new SemaphoreSlim(1, 1); + + public TargetDataProvider(IServerSyncProvider provider, SyncTarget target, string serverId, ILogger logger, IJsonSerializer json, IFileSystem fileSystem, IApplicationPaths appPaths) + { + _logger = logger; + _json = json; + _provider = provider; + _target = target; + _fileSystem = fileSystem; + _appPaths = appPaths; + _serverId = serverId; + } + + private string GetCachePath() + { + return Path.Combine(_appPaths.DataPath, "sync", _target.Id.GetMD5().ToString("N") + ".json"); + } + + private string GetRemotePath() + { + var parts = new List + { + _serverId, + "data.json" + }; + + return _provider.GetFullPath(parts, _target); + } + + private async Task CacheData(Stream stream) + { + var cachePath = GetCachePath(); + + await _cacheFileLock.WaitAsync().ConfigureAwait(false); + + try + { + Directory.CreateDirectory(Path.GetDirectoryName(cachePath)); + using (var fileStream = _fileSystem.GetFileStream(cachePath, FileMode.Create, FileAccess.Write, FileShare.Read, true)) + { + await stream.CopyToAsync(fileStream).ConfigureAwait(false); + } + } + catch (Exception ex) + { + _logger.ErrorException("Error saving sync data to {0}", ex, cachePath); + } + finally + { + _cacheFileLock.Release(); + } + } + + private async Task EnsureData(CancellationToken cancellationToken) + { + if (_items == null) + { + try + { + using (var stream = await _provider.GetFile(GetRemotePath(), _target, new Progress(), cancellationToken)) + { + _items = _json.DeserializeFromStream>(stream); + } + } + catch (FileNotFoundException) + { + _items = new List(); + } + catch (DirectoryNotFoundException) + { + _items = new List(); + } + + using (var memoryStream = new MemoryStream()) + { + _json.SerializeToStream(_items, memoryStream); + + // Now cache it + memoryStream.Position = 0; + await CacheData(memoryStream).ConfigureAwait(false); + } + } + } + + private async Task SaveData(CancellationToken cancellationToken) + { + using (var stream = new MemoryStream()) + { + _json.SerializeToStream(_items, stream); + + // Save to sync provider + stream.Position = 0; + await _provider.SendFile(stream, GetRemotePath(), _target, new Progress(), cancellationToken).ConfigureAwait(false); + + // Now cache it + stream.Position = 0; + await CacheData(stream).ConfigureAwait(false); + } + } + + private async Task GetData(Func, T> dataFactory) + { + await _dataLock.WaitAsync().ConfigureAwait(false); + + try + { + await EnsureData(CancellationToken.None).ConfigureAwait(false); + + return dataFactory(_items); + } + finally + { + _dataLock.Release(); + } + } + + private async Task UpdateData(Func, List> action) + { + await _dataLock.WaitAsync().ConfigureAwait(false); + + try + { + await EnsureData(CancellationToken.None).ConfigureAwait(false); + + _items = action(_items); + + await SaveData(CancellationToken.None).ConfigureAwait(false); + } + finally + { + _dataLock.Release(); + } + } + + public Task> GetServerItemIds(SyncTarget target, string serverId) + { + return GetData(items => items.Where(i => string.Equals(i.ServerId, serverId, StringComparison.OrdinalIgnoreCase)).Select(i => i.ItemId).ToList()); + } + + public Task AddOrUpdate(SyncTarget target, LocalItem item) + { + return UpdateData(items => + { + var list = items.Where(i => !string.Equals(i.Id, item.Id, StringComparison.OrdinalIgnoreCase)) + .ToList(); + + list.Add(item); + + return list; + }); + } + + public Task Delete(SyncTarget target, string id) + { + return UpdateData(items => items.Where(i => !string.Equals(i.Id, id, StringComparison.OrdinalIgnoreCase)).ToList()); + } + + public Task Get(SyncTarget target, string id) + { + return GetData(items => items.FirstOrDefault(i => string.Equals(i.Id, id, StringComparison.OrdinalIgnoreCase))); + } + + private async Task> GetCachedData() + { + if (_items == null) + { + await _cacheFileLock.WaitAsync().ConfigureAwait(false); + + try + { + if (_items == null) + { + try + { + _items = _json.DeserializeFromFile>(GetCachePath()); + } + catch (FileNotFoundException) + { + _items = new List(); + } + catch (DirectoryNotFoundException) + { + _items = new List(); + } + } + } + finally + { + _cacheFileLock.Release(); + } + } + + return _items.ToList(); + } + + public async Task> GetCachedServerItemIds(SyncTarget target, string serverId) + { + var items = await GetCachedData().ConfigureAwait(false); + + return items.Where(i => string.Equals(i.ServerId, serverId, StringComparison.OrdinalIgnoreCase)) + .Select(i => i.ItemId) + .ToList(); + } + + public async Task GetCachedItem(SyncTarget target, string id) + { + var items = await GetCachedData().ConfigureAwait(false); + + return items.FirstOrDefault(i => string.Equals(i.Id, id, StringComparison.OrdinalIgnoreCase)); + } + } +} -- cgit v1.2.3