diff options
| author | Luke <luke.pulverenti@gmail.com> | 2016-10-05 11:20:48 -0400 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2016-10-05 11:20:48 -0400 |
| commit | a2fa7c475404347aaf675bdd708a69c451f5af2e (patch) | |
| tree | e9aa273f5d6d77473d3d1f345e2890c864a0e481 /MediaBrowser.Server.Implementations/LiveTv | |
| parent | 0a87046736843dfa983213ae1e661d499a6d1df9 (diff) | |
| parent | 83606d82d57604f9796455640d2e93367783f69e (diff) | |
Merge pull request #2210 from MediaBrowser/dev
Dev
Diffstat (limited to 'MediaBrowser.Server.Implementations/LiveTv')
4 files changed, 128 insertions, 19 deletions
diff --git a/MediaBrowser.Server.Implementations/LiveTv/EmbyTV/EmbyTV.cs b/MediaBrowser.Server.Implementations/LiveTv/EmbyTV/EmbyTV.cs index a3ba2c6f86..5b99849cd6 100644 --- a/MediaBrowser.Server.Implementations/LiveTv/EmbyTV/EmbyTV.cs +++ b/MediaBrowser.Server.Implementations/LiveTv/EmbyTV/EmbyTV.cs @@ -36,7 +36,7 @@ using Microsoft.Win32; namespace MediaBrowser.Server.Implementations.LiveTv.EmbyTV { - public class EmbyTV : ILiveTvService, ISupportsNewTimerIds, IDisposable + public class EmbyTV : ILiveTvService, ISupportsDirectStreamProvider, ISupportsNewTimerIds, IDisposable { private readonly IApplicationHost _appHpst; private readonly ILogger _logger; @@ -829,9 +829,16 @@ namespace MediaBrowser.Server.Implementations.LiveTv.EmbyTV public async Task<MediaSourceInfo> GetChannelStream(string channelId, string streamId, CancellationToken cancellationToken) { + var result = await GetChannelStreamWithDirectStreamProvider(channelId, streamId, cancellationToken).ConfigureAwait(false); + + return result.Item1; + } + + public async Task<Tuple<MediaSourceInfo, IDirectStreamProvider>> GetChannelStreamWithDirectStreamProvider(string channelId, string streamId, CancellationToken cancellationToken) + { var result = await GetChannelStreamInternal(channelId, streamId, cancellationToken).ConfigureAwait(false); - return result.Item2; + return new Tuple<MediaSourceInfo, IDirectStreamProvider>(result.Item2, result.Item1 as IDirectStreamProvider); } private MediaSourceInfo CloneMediaSource(MediaSourceInfo mediaSource, int consumerId, bool enableStreamSharing) diff --git a/MediaBrowser.Server.Implementations/LiveTv/LiveTvManager.cs b/MediaBrowser.Server.Implementations/LiveTv/LiveTvManager.cs index 84b1622861..546525fd1a 100644 --- a/MediaBrowser.Server.Implementations/LiveTv/LiveTvManager.cs +++ b/MediaBrowser.Server.Implementations/LiveTv/LiveTvManager.cs @@ -227,10 +227,12 @@ namespace MediaBrowser.Server.Implementations.LiveTv public async Task<MediaSourceInfo> GetRecordingStream(string id, CancellationToken cancellationToken) { - return await GetLiveStream(id, null, false, cancellationToken).ConfigureAwait(false); + var info = await GetLiveStream(id, null, false, cancellationToken).ConfigureAwait(false); + + return info.Item1; } - public async Task<MediaSourceInfo> GetChannelStream(string id, string mediaSourceId, CancellationToken cancellationToken) + public async Task<Tuple<MediaSourceInfo, IDirectStreamProvider>> GetChannelStream(string id, string mediaSourceId, CancellationToken cancellationToken) { return await GetLiveStream(id, mediaSourceId, true, cancellationToken).ConfigureAwait(false); } @@ -280,7 +282,7 @@ namespace MediaBrowser.Server.Implementations.LiveTv return _services.FirstOrDefault(i => string.Equals(i.Name, name, StringComparison.OrdinalIgnoreCase)); } - private async Task<MediaSourceInfo> GetLiveStream(string id, string mediaSourceId, bool isChannel, CancellationToken cancellationToken) + private async Task<Tuple<MediaSourceInfo, IDirectStreamProvider>> GetLiveStream(string id, string mediaSourceId, bool isChannel, CancellationToken cancellationToken) { await _liveStreamSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); @@ -294,6 +296,7 @@ namespace MediaBrowser.Server.Implementations.LiveTv MediaSourceInfo info; bool isVideo; ILiveTvService service; + IDirectStreamProvider directStreamProvider = null; if (isChannel) { @@ -301,7 +304,18 @@ namespace MediaBrowser.Server.Implementations.LiveTv isVideo = channel.ChannelType == ChannelType.TV; service = GetService(channel); _logger.Info("Opening channel stream from {0}, external channel Id: {1}", service.Name, channel.ExternalId); - info = await service.GetChannelStream(channel.ExternalId, mediaSourceId, cancellationToken).ConfigureAwait(false); + + var supportsManagedStream = service as ISupportsDirectStreamProvider; + if (supportsManagedStream != null) + { + var streamInfo = await supportsManagedStream.GetChannelStreamWithDirectStreamProvider(channel.ExternalId, mediaSourceId, cancellationToken).ConfigureAwait(false); + info = streamInfo.Item1; + directStreamProvider = streamInfo.Item2; + } + else + { + info = await service.GetChannelStream(channel.ExternalId, mediaSourceId, cancellationToken).ConfigureAwait(false); + } info.RequiresClosing = true; if (info.RequiresClosing) @@ -332,7 +346,7 @@ namespace MediaBrowser.Server.Implementations.LiveTv _logger.Info("Live stream info: {0}", _jsonSerializer.SerializeToString(info)); Normalize(info, service, isVideo); - return info; + return new Tuple<MediaSourceInfo, IDirectStreamProvider>(info, directStreamProvider); } catch (Exception ex) { @@ -544,11 +558,13 @@ namespace MediaBrowser.Server.Implementations.LiveTv return item; } - private async Task<LiveTvProgram> GetProgram(ProgramInfo info, LiveTvChannel channel, ChannelType channelType, string serviceName, CancellationToken cancellationToken) + private async Task<LiveTvProgram> GetProgram(ProgramInfo info, Dictionary<Guid, LiveTvProgram> allExistingPrograms, LiveTvChannel channel, ChannelType channelType, string serviceName, CancellationToken cancellationToken) { var id = _tvDtoService.GetInternalProgramId(serviceName, info.Id); - var item = _libraryManager.GetItemById(id) as LiveTvProgram; + LiveTvProgram item = null; + allExistingPrograms.TryGetValue(id, out item); + var isNew = false; var forceUpdate = false; @@ -877,7 +893,7 @@ namespace MediaBrowser.Server.Implementations.LiveTv if (!string.IsNullOrWhiteSpace(query.SeriesTimerId)) { - var seriesTimers = await GetSeriesTimersInternal(new SeriesTimerQuery {}, cancellationToken).ConfigureAwait(false); + var seriesTimers = await GetSeriesTimersInternal(new SeriesTimerQuery { }, cancellationToken).ConfigureAwait(false); var seriesTimer = seriesTimers.Items.FirstOrDefault(i => string.Equals(_tvDtoService.GetInternalSeriesTimerId(i.ServiceName, i.Id).ToString("N"), query.SeriesTimerId, StringComparison.OrdinalIgnoreCase)); if (seriesTimer != null) { @@ -1265,9 +1281,17 @@ namespace MediaBrowser.Server.Implementations.LiveTv var channelPrograms = await service.GetProgramsAsync(currentChannel.ExternalId, start, end, cancellationToken).ConfigureAwait(false); + var existingPrograms = _libraryManager.GetItemList(new InternalItemsQuery + { + + IncludeItemTypes = new string[] { typeof(LiveTvProgram).Name }, + ChannelIds = new string[] { currentChannel.Id.ToString("N") } + + }).Cast<LiveTvProgram>().ToDictionary(i => i.Id); + foreach (var program in channelPrograms) { - var programItem = await GetProgram(program, currentChannel, currentChannel.ChannelType, service.Name, cancellationToken).ConfigureAwait(false); + var programItem = await GetProgram(program, existingPrograms, currentChannel, currentChannel.ChannelType, service.Name, cancellationToken).ConfigureAwait(false); programs.Add(programItem.Id); @@ -1881,11 +1905,11 @@ namespace MediaBrowser.Server.Implementations.LiveTv { if (query.IsScheduled.Value) { - timers = timers.Where(i => i.Item1.Status == RecordingStatus.New || i.Item1.Status == RecordingStatus.Scheduled); + timers = timers.Where(i => i.Item1.Status == RecordingStatus.New); } else { - timers = timers.Where(i => !(i.Item1.Status == RecordingStatus.New || i.Item1.Status == RecordingStatus.Scheduled)); + timers = timers.Where(i => !(i.Item1.Status == RecordingStatus.New)); } } diff --git a/MediaBrowser.Server.Implementations/LiveTv/LiveTvMediaSourceProvider.cs b/MediaBrowser.Server.Implementations/LiveTv/LiveTvMediaSourceProvider.cs index aacc0c22bc..1861b79eb5 100644 --- a/MediaBrowser.Server.Implementations/LiveTv/LiveTvMediaSourceProvider.cs +++ b/MediaBrowser.Server.Implementations/LiveTv/LiveTvMediaSourceProvider.cs @@ -116,17 +116,20 @@ namespace MediaBrowser.Server.Implementations.LiveTv return list; } - public async Task<MediaSourceInfo> OpenMediaSource(string openToken, CancellationToken cancellationToken) + public async Task<Tuple<MediaSourceInfo, IDirectStreamProvider>> OpenMediaSource(string openToken, CancellationToken cancellationToken) { - MediaSourceInfo stream; + MediaSourceInfo stream = null; const bool isAudio = false; var keys = openToken.Split(new[] { StreamIdDelimeter }, 3); var mediaSourceId = keys.Length >= 3 ? keys[2] : null; + IDirectStreamProvider directStreamProvider = null; if (string.Equals(keys[0], typeof(LiveTvChannel).Name, StringComparison.OrdinalIgnoreCase)) { - stream = await _liveTvManager.GetChannelStream(keys[1], mediaSourceId, cancellationToken).ConfigureAwait(false); + var info = await _liveTvManager.GetChannelStream(keys[1], mediaSourceId, cancellationToken).ConfigureAwait(false); + stream = info.Item1; + directStreamProvider = info.Item2; } else { @@ -142,7 +145,7 @@ namespace MediaBrowser.Server.Implementations.LiveTv _logger.ErrorException("Error probing live tv stream", ex); } - return stream; + return new Tuple<MediaSourceInfo, IDirectStreamProvider>(stream, directStreamProvider); } private async Task AddMediaInfo(MediaSourceInfo mediaSource, bool isAudio, CancellationToken cancellationToken) diff --git a/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunLiveStream.cs b/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunLiveStream.cs index d49e3f2587..dd635bd55e 100644 --- a/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunLiveStream.cs +++ b/MediaBrowser.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunLiveStream.cs @@ -6,14 +6,16 @@ using CommonIO; using MediaBrowser.Common.Net; using MediaBrowser.Controller; using MediaBrowser.Controller.LiveTv; +using MediaBrowser.Controller.Library; using MediaBrowser.Model.Dto; using MediaBrowser.Model.Logging; using MediaBrowser.Model.MediaInfo; using MediaBrowser.Server.Implementations.LiveTv.EmbyTV; +using System.Collections.Generic; namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun { - public class HdHomerunLiveStream : LiveStream + public class HdHomerunLiveStream : LiveStream, IDirectStreamProvider { private readonly ILogger _logger; private readonly IHttpClient _httpClient; @@ -106,7 +108,7 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun { onStarted = () => ResolveWhenExists(openTaskCompletionSource, tempFilePath, cancellationToken); } - await DirectRecorder.CopyUntilCancelled(response.Content, outputStream, onStarted, cancellationToken).ConfigureAwait(false); + await CopyUntilCancelled(response.Content, outputStream, onStarted, cancellationToken).ConfigureAwait(false); } } } @@ -137,6 +139,79 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun }).ConfigureAwait(false); } + private List<Tuple<Stream, CancellationToken, TaskCompletionSource<bool>>> _additionalStreams = new List<Tuple<Stream, CancellationToken, TaskCompletionSource<bool>>>(); + + public Task CopyToAsync(Stream stream, CancellationToken cancellationToken) + { + var taskCompletionSource = new TaskCompletionSource<bool>(); + _additionalStreams.Add(new Tuple<Stream, CancellationToken, TaskCompletionSource<bool>>(stream, cancellationToken, taskCompletionSource)); + + return taskCompletionSource.Task; + } + + private void PopAdditionalStream(Tuple<Stream, CancellationToken, TaskCompletionSource<bool>> stream, Exception exception) + { + if (_additionalStreams.Remove(stream)) + { + stream.Item3.TrySetException(exception); + } + } + + private const int BufferSize = 81920; + private async Task CopyUntilCancelled(Stream source, Stream target, Action onStarted, CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested) + { + var bytesRead = await CopyToAsyncInternal(source, target, BufferSize, onStarted, cancellationToken).ConfigureAwait(false); + + onStarted = null; + + //var position = fs.Position; + //_logger.Debug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path); + + if (bytesRead == 0) + { + await Task.Delay(100).ConfigureAwait(false); + } + } + } + + private async Task<int> CopyToAsyncInternal(Stream source, Stream destination, Int32 bufferSize, Action onStarted, CancellationToken cancellationToken) + { + byte[] buffer = new byte[bufferSize]; + int bytesRead; + int totalBytesRead = 0; + + while ((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false)) != 0) + { + await destination.WriteAsync(buffer, 0, bytesRead, cancellationToken).ConfigureAwait(false); + + foreach (var additionalStream in _additionalStreams) + { + cancellationToken.ThrowIfCancellationRequested(); + + try + { + await additionalStream.Item1.WriteAsync(buffer, 0, bytesRead, additionalStream.Item2).ConfigureAwait(false); + } + catch (Exception ex) + { + PopAdditionalStream(additionalStream, ex); + } + } + + totalBytesRead += bytesRead; + + if (onStarted != null) + { + onStarted(); + } + onStarted = null; + } + + return totalBytesRead; + } + private async void ResolveWhenExists(TaskCompletionSource<bool> taskCompletionSource, string file, CancellationToken cancellationToken) { while (!File.Exists(file) && !cancellationToken.IsCancellationRequested) |
