1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
|
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MediaBrowser.Model.Logging;
using MediaBrowser.Model.Net;
namespace Emby.Server.Implementations.LiveTv.TunerHosts
{
/// <summary>
/// Reads from a single source stream and hands every chunk that was read to each
/// currently registered <see cref="QueueStream"/> output.
/// </summary>
public class MulticastStream
{
    /// <summary>Size, in bytes, of the buffer used for each read from the source.</summary>
    private const int BufferSize = 81920;

    /// <summary>Registered outputs, keyed by <c>QueueStream.Id</c>.</summary>
    private readonly ConcurrentDictionary<Guid, QueueStream> _outputStreams = new ConcurrentDictionary<Guid, QueueStream>();

    private readonly ILogger _logger;

    /// <summary>
    /// Initializes a new instance of the <see cref="MulticastStream"/> class.
    /// </summary>
    /// <param name="logger">Logger handed to every <see cref="QueueStream"/> created by <see cref="CopyToAsync"/>.</param>
    public MulticastStream(ILogger logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// Repeatedly reads from <paramref name="source"/> and queues each chunk on every registered output.
    /// The loop has no normal exit: it only ends when cancellation is requested or an exception is thrown.
    /// </summary>
    /// <param name="source">The stream to read from.</param>
    /// <param name="onStarted">
    /// Optional callback started once, via <see cref="Task.Run(Action)"/>, after the first non-empty read.
    /// It is not awaited.
    /// </param>
    /// <param name="cancellationToken">Token that stops the copy loop.</param>
    /// <returns>A task that completes (faulted or cancelled) when the loop is left.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> is null. Because the method is async, this surfaces through the returned task.
    /// </exception>
    /// <exception cref="OperationCanceledException">Cancellation was requested.</exception>
    public async Task CopyUntilCancelled(Stream source, Action onStarted, CancellationToken cancellationToken)
    {
        // Validate before allocating the read buffer.
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        byte[] buffer = new byte[BufferSize];

        while (true)
        {
            cancellationToken.ThrowIfCancellationRequested();

            // Asynchronous read: a synchronous Read here would keep the calling thread inside this
            // method for as long as data keeps arriving, because nothing else on this path awaits.
            var bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false);

            if (bytesRead > 0)
            {
                // Snapshot so outputs may be added or removed while this chunk is distributed.
                var allStreams = _outputStreams.ToList();

                if (allStreams.Count > 0)
                {
                    // The read buffer is overwritten by the next read, so outputs receive a private,
                    // exactly-sized copy (presumably Queue keeps the array reference - confirm in QueueStream).
                    byte[] copy = new byte[bytesRead];
                    Buffer.BlockCopy(buffer, 0, copy, 0, bytesRead);

                    foreach (var stream in allStreams)
                    {
                        stream.Value.Queue(copy, 0, copy.Length);
                    }
                }

                if (onStarted != null)
                {
                    // Clear the callback first so it is started only once.
                    var onStartedCopy = onStarted;
                    onStarted = null;
                    Task.Run(onStartedCopy);
                }
            }
            else
            {
                // Nothing was read: wait briefly before polling the source again,
                // waking up immediately if cancellation is requested.
                await Task.Delay(100, cancellationToken).ConfigureAwait(false);
            }
        }
    }

    /// <summary>
    /// Registers <paramref name="stream"/> as an output and starts it.
    /// </summary>
    /// <param name="stream">The destination stream wrapped by the new <see cref="QueueStream"/>.</param>
    /// <param name="cancellationToken">Token passed to <c>QueueStream.Start</c>.</param>
    /// <returns>The task exposed by the new output's <c>TaskCompletion</c>.</returns>
    public Task CopyToAsync(Stream stream, CancellationToken cancellationToken)
    {
        var result = new QueueStream(stream, _logger)
        {
            // Unregister the output when it reports that it has finished.
            OnFinished = OnFinished
        };

        _outputStreams.TryAdd(result.Id, result);

        result.Start(cancellationToken);

        return result.TaskCompletion.Task;
    }

    /// <summary>
    /// Removes an output so it no longer receives data. Does nothing if it is not registered.
    /// </summary>
    /// <param name="stream">The output to remove; looked up by its <c>Id</c>.</param>
    public void RemoveOutputStream(QueueStream stream)
    {
        QueueStream removed;
        _outputStreams.TryRemove(stream.Id, out removed);
    }

    /// <summary>
    /// Callback assigned to <c>QueueStream.OnFinished</c>; unregisters the finished output.
    /// </summary>
    /// <param name="queueStream">The output that finished.</param>
    private void OnFinished(QueueStream queueStream)
    {
        RemoveOutputStream(queueStream);
    }
}
}
|