From fb1de5a9213f7da98ed15a6975201d6bca3537d4 Mon Sep 17 00:00:00 2001 From: Claus Vium Date: Wed, 27 Feb 2019 23:22:55 +0100 Subject: Remove more cruft and add the beginnings of a socket middleware --- .../WebSockets/WebSocketManager.cs | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 Emby.Server.Implementations/WebSockets/WebSocketManager.cs (limited to 'Emby.Server.Implementations/WebSockets/WebSocketManager.cs') diff --git a/Emby.Server.Implementations/WebSockets/WebSocketManager.cs b/Emby.Server.Implementations/WebSockets/WebSocketManager.cs new file mode 100644 index 000000000..7e74a4527 --- /dev/null +++ b/Emby.Server.Implementations/WebSockets/WebSocketManager.cs @@ -0,0 +1,22 @@ +using System; +using System.Collections.Concurrent; +using System.Net.WebSockets; + +namespace Emby.Server.Implementations.WebSockets +{ + public class WebSocketManager + { + private readonly ConcurrentDictionary _activeWebSockets; + + public WebSocketManager() + { + _activeWebSockets = new ConcurrentDictionary(); + } + + public void AddSocket(WebSocket webSocket) + { + var guid = Guid.NewGuid(); + _activeWebSockets.TryAdd(guid, webSocket); + } + } +} -- cgit v1.2.3 From 6bdb5debd2492b71d11f9628889b8c29b6321a77 Mon Sep 17 00:00:00 2001 From: Claus Vium Date: Fri, 1 Mar 2019 14:08:51 +0100 Subject: Add some websocket manager boilerplate --- .../Middleware/WebSocketMiddleware.cs | 5 +- .../WebSockets/WebSocketHandler.cs | 10 +++ .../WebSockets/WebSocketManager.cs | 90 ++++++++++++++++++++-- 3 files changed, 98 insertions(+), 7 deletions(-) create mode 100644 Emby.Server.Implementations/WebSockets/WebSocketHandler.cs (limited to 'Emby.Server.Implementations/WebSockets/WebSocketManager.cs') diff --git a/Emby.Server.Implementations/Middleware/WebSocketMiddleware.cs b/Emby.Server.Implementations/Middleware/WebSocketMiddleware.cs index a1d0e77d6..268bf4042 100644 --- a/Emby.Server.Implementations/Middleware/WebSocketMiddleware.cs +++ b/Emby.Server.Implementations/Middleware/WebSocketMiddleware.cs @@ -25,7 +25,10 @@ namespace Emby.Server.Implementations.Middleware if (httpContext.WebSockets.IsWebSocketRequest) { var webSocketContext = await httpContext.WebSockets.AcceptWebSocketAsync(null).ConfigureAwait(false); - _webSocketManager.AddSocket(webSocketContext); + if (webSocketContext != null) + { + await _webSocketManager.OnWebSocketConnected(webSocketContext); + } } else { diff --git a/Emby.Server.Implementations/WebSockets/WebSocketHandler.cs b/Emby.Server.Implementations/WebSockets/WebSocketHandler.cs new file mode 100644 index 000000000..70b9e85aa --- /dev/null +++ b/Emby.Server.Implementations/WebSockets/WebSocketHandler.cs @@ -0,0 +1,10 @@ +using System.Threading.Tasks; +using MediaBrowser.Model.Net; + +namespace Emby.Server.Implementations.WebSockets +{ + public interface IWebSocketHandler + { + Task ProcessMessage(WebSocketMessage message); + } +} diff --git a/Emby.Server.Implementations/WebSockets/WebSocketManager.cs b/Emby.Server.Implementations/WebSockets/WebSocketManager.cs index 7e74a4527..888f2f0fc 100644 --- a/Emby.Server.Implementations/WebSockets/WebSocketManager.cs +++ b/Emby.Server.Implementations/WebSockets/WebSocketManager.cs @@ -1,22 +1,100 @@ using System; using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; using System.Net.WebSockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using MediaBrowser.Controller.Net; +using MediaBrowser.Model.Net; +using MediaBrowser.Model.Serialization; +using Microsoft.Extensions.Logging; +using UtfUnknown; namespace Emby.Server.Implementations.WebSockets { public class WebSocketManager { - private readonly ConcurrentDictionary _activeWebSockets; + private readonly IWebSocketHandler[] _webSocketHandlers; + private readonly IJsonSerializer _jsonSerializer; + private readonly ILogger _logger; + private const int BufferSize = 4096; - public WebSocketManager() + public WebSocketManager(IWebSocketHandler[] webSocketHandlers, IJsonSerializer jsonSerializer, ILogger logger) { - _activeWebSockets = new ConcurrentDictionary(); + _webSocketHandlers = webSocketHandlers; + _jsonSerializer = jsonSerializer; + _logger = logger; } - public void AddSocket(WebSocket webSocket) + public async Task OnWebSocketConnected(WebSocket webSocket) { - var guid = Guid.NewGuid(); - _activeWebSockets.TryAdd(guid, webSocket); + var taskCompletionSource = new TaskCompletionSource(); + var cancellationToken = new CancellationTokenSource().Token; + WebSocketReceiveResult result; + var message = new List(); + + do + { + var buffer = WebSocket.CreateServerBuffer(BufferSize); + result = await webSocket.ReceiveAsync(buffer, cancellationToken); + message.AddRange(buffer.Array.Take(result.Count)); + + if (result.EndOfMessage) + { + await ProcessMessage(message.ToArray(), taskCompletionSource); + message.Clear(); + } + } while (!taskCompletionSource.Task.IsCompleted && + webSocket.State == WebSocketState.Open && + result.MessageType != WebSocketMessageType.Close); + + if (webSocket.State == WebSocketState.Open) + { + await webSocket.CloseAsync(result.CloseStatus ?? WebSocketCloseStatus.NormalClosure, + result.CloseStatusDescription, cancellationToken); + } + } + + public async Task ProcessMessage(byte[] messageBytes, TaskCompletionSource taskCompletionSource) + { + var charset = CharsetDetector.DetectFromBytes(messageBytes).Detected?.EncodingName; + var message = string.Equals(charset, "utf-8", StringComparison.OrdinalIgnoreCase) + ? Encoding.UTF8.GetString(messageBytes, 0, messageBytes.Length) + : Encoding.ASCII.GetString(messageBytes, 0, messageBytes.Length); + + // All messages are expected to be json + if (!message.StartsWith("{", StringComparison.OrdinalIgnoreCase)) + { + _logger.LogDebug("Received web socket message that is not a json structure: {Message}", message); + return; + } + + try + { + var info = _jsonSerializer.DeserializeFromString>(message); + + _logger.LogDebug("Websocket message received: {0}", info.MessageType); + + var tasks = _webSocketHandlers.Select(handler => Task.Run(() => + { + try + { + handler.ProcessMessage(info).ConfigureAwait(false); + } + catch (Exception ex) + { + _logger.LogError(ex, "{0} failed processing WebSocket message {1}", handler.GetType().Name, info.MessageType ?? string.Empty); + } + })); + + await Task.WhenAll(tasks); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error processing web socket message"); + } } } } -- cgit v1.2.3 From e823c11b46ccf0473aa72cb52cd5a3a9f977e61b Mon Sep 17 00:00:00 2001 From: Claus Vium Date: Sun, 3 Mar 2019 08:29:23 +0100 Subject: Add certificate to https and minor cleanup --- Emby.Server.Implementations/ApplicationHost.cs | 26 ++++++++++------------ .../HttpServer/HttpListenerHost.cs | 11 --------- .../WebSockets/WebSocketHandler.cs | 2 +- .../WebSockets/WebSocketManager.cs | 8 ++++--- Jellyfin.Server/CoreAppHost.cs | 2 -- 5 files changed, 18 insertions(+), 31 deletions(-) (limited to 'Emby.Server.Implementations/WebSockets/WebSocketManager.cs') diff --git a/Emby.Server.Implementations/ApplicationHost.cs b/Emby.Server.Implementations/ApplicationHost.cs index e558b4354..05d8ee410 100644 --- a/Emby.Server.Implementations/ApplicationHost.cs +++ b/Emby.Server.Implementations/ApplicationHost.cs @@ -626,9 +626,12 @@ namespace Emby.Server.Implementations { options.Listen(IPAddress.Any, HttpPort); options.Listen(IPAddress.Loopback, HttpPort); - // TODO certs - options.Listen(IPAddress.Any, HttpsPort, listenOptions => { listenOptions.UseHttps(); }); - options.Listen(IPAddress.Loopback, HttpsPort, listenOptions => { listenOptions.UseHttps(); }); + + if (CertificateInfo != null) + { + options.Listen(IPAddress.Any, HttpsPort, listenOptions => { listenOptions.UseHttps(Certificate); }); + options.Listen(IPAddress.Loopback, HttpsPort, listenOptions => { listenOptions.UseHttps(Certificate); }); + } }) .UseContentRoot(Path.Combine(Directory.GetCurrentDirectory(), "jellyfin-web", "src")) .ConfigureServices(services => @@ -927,11 +930,9 @@ namespace Emby.Server.Implementations } } - protected virtual bool SupportsDualModeSockets => true; - - private X509Certificate GetCertificate(CertificateInfo info) + private X509Certificate2 GetCertificate(CertificateInfo info) { - var certificateLocation = info == null ? null : info.Path; + var certificateLocation = info?.Path; if (string.IsNullOrWhiteSpace(certificateLocation)) { @@ -1004,7 +1005,7 @@ namespace Emby.Server.Implementations return info; } - protected virtual FFMpegInfo GetFFMpegInfo() + protected FFMpegInfo GetFFMpegInfo() { return new FFMpegLoader(ApplicationPaths, FileSystemManager, GetFfmpegInstallInfo()) .GetFFMpegInfo(StartupOptions); @@ -1085,7 +1086,7 @@ namespace Emby.Server.Implementations /// private void SetStaticProperties() { - ((SqliteItemRepository)ItemRepository).ImageProcessor = ImageProcessor; + ItemRepository.ImageProcessor = ImageProcessor; // For now there's no real way to inject these properly BaseItem.Logger = LoggerFactory.CreateLogger("BaseItem"); @@ -1211,15 +1212,12 @@ namespace Emby.Server.Implementations AllConcreteTypes = GetComposablePartAssemblies() .SelectMany(x => x.ExportedTypes) - .Where(type => - { - return type.IsClass && !type.IsAbstract && !type.IsInterface && !type.IsGenericType; - }) + .Where(type => type.IsClass && !type.IsAbstract && !type.IsInterface && !type.IsGenericType) .ToArray(); } private CertificateInfo CertificateInfo { get; set; } - protected X509Certificate Certificate { get; private set; } + protected X509Certificate2 Certificate { get; private set; } private IEnumerable GetUrlPrefixes() { diff --git a/Emby.Server.Implementations/HttpServer/HttpListenerHost.cs b/Emby.Server.Implementations/HttpServer/HttpListenerHost.cs index dbfb5e243..263fcdbe9 100644 --- a/Emby.Server.Implementations/HttpServer/HttpListenerHost.cs +++ b/Emby.Server.Implementations/HttpServer/HttpListenerHost.cs @@ -855,16 +855,5 @@ namespace Emby.Server.Implementations.HttpServer { Dispose(true); } - - public void StartServer(string[] urlPrefixes, IHttpListener httpListener) - { - UrlPrefixes = urlPrefixes; - - _listener = httpListener; - - _listener.WebSocketConnected = OnWebSocketConnected; - _listener.ErrorHandler = ErrorHandler; - _listener.RequestHandler = RequestHandler; - } } } diff --git a/Emby.Server.Implementations/WebSockets/WebSocketHandler.cs b/Emby.Server.Implementations/WebSockets/WebSocketHandler.cs index 70b9e85aa..eb1877440 100644 --- a/Emby.Server.Implementations/WebSockets/WebSocketHandler.cs +++ b/Emby.Server.Implementations/WebSockets/WebSocketHandler.cs @@ -5,6 +5,6 @@ namespace Emby.Server.Implementations.WebSockets { public interface IWebSocketHandler { - Task ProcessMessage(WebSocketMessage message); + Task ProcessMessage(WebSocketMessage message, TaskCompletionSource taskCompletionSource); } } diff --git a/Emby.Server.Implementations/WebSockets/WebSocketManager.cs b/Emby.Server.Implementations/WebSockets/WebSocketManager.cs index 888f2f0fc..5db2c8da7 100644 --- a/Emby.Server.Implementations/WebSockets/WebSocketManager.cs +++ b/Emby.Server.Implementations/WebSockets/WebSocketManager.cs @@ -35,6 +35,7 @@ namespace Emby.Server.Implementations.WebSockets WebSocketReceiveResult result; var message = new List(); + // Keep listening for incoming messages, otherwise the socket closes automatically do { var buffer = WebSocket.CreateServerBuffer(BufferSize); @@ -57,7 +58,7 @@ namespace Emby.Server.Implementations.WebSockets } } - public async Task ProcessMessage(byte[] messageBytes, TaskCompletionSource taskCompletionSource) + private async Task ProcessMessage(byte[] messageBytes, TaskCompletionSource taskCompletionSource) { var charset = CharsetDetector.DetectFromBytes(messageBytes).Detected?.EncodingName; var message = string.Equals(charset, "utf-8", StringComparison.OrdinalIgnoreCase) @@ -81,11 +82,12 @@ namespace Emby.Server.Implementations.WebSockets { try { - handler.ProcessMessage(info).ConfigureAwait(false); + handler.ProcessMessage(info, taskCompletionSource).ConfigureAwait(false); } catch (Exception ex) { - _logger.LogError(ex, "{0} failed processing WebSocket message {1}", handler.GetType().Name, info.MessageType ?? string.Empty); + _logger.LogError(ex, "{HandlerType} failed processing WebSocket message {MessageType}", + handler.GetType().Name, info.MessageType ?? string.Empty); } })); diff --git a/Jellyfin.Server/CoreAppHost.cs b/Jellyfin.Server/CoreAppHost.cs index 9e5224790..17259c737 100644 --- a/Jellyfin.Server/CoreAppHost.cs +++ b/Jellyfin.Server/CoreAppHost.cs @@ -34,8 +34,6 @@ namespace Jellyfin.Server public override bool CanSelfRestart => StartupOptions.RestartPath != null; - protected override bool SupportsDualModeSockets => true; - protected override void RestartInternal() => Program.Restart(); protected override IEnumerable GetAssembliesWithPartsInternal() -- cgit v1.2.3 From 394d23a73a530b5ce2609d01575e0ced734aecfe Mon Sep 17 00:00:00 2001 From: Claus Vium Date: Wed, 6 Mar 2019 19:14:03 +0100 Subject: Review comments --- Emby.Server.Implementations/WebSockets/WebSocketManager.cs | 2 +- .../Playback/Progressive/BaseProgressiveStreamingService.cs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) (limited to 'Emby.Server.Implementations/WebSockets/WebSocketManager.cs') diff --git a/Emby.Server.Implementations/WebSockets/WebSocketManager.cs b/Emby.Server.Implementations/WebSockets/WebSocketManager.cs index 5db2c8da7..04c73ecea 100644 --- a/Emby.Server.Implementations/WebSockets/WebSocketManager.cs +++ b/Emby.Server.Implementations/WebSockets/WebSocketManager.cs @@ -65,7 +65,7 @@ namespace Emby.Server.Implementations.WebSockets ? Encoding.UTF8.GetString(messageBytes, 0, messageBytes.Length) : Encoding.ASCII.GetString(messageBytes, 0, messageBytes.Length); - // All messages are expected to be json + // All messages are expected to be valid JSON objects if (!message.StartsWith("{", StringComparison.OrdinalIgnoreCase)) { _logger.LogDebug("Received web socket message that is not a json structure: {Message}", message); diff --git a/MediaBrowser.Api/Playback/Progressive/BaseProgressiveStreamingService.cs b/MediaBrowser.Api/Playback/Progressive/BaseProgressiveStreamingService.cs index fc81e532d..399ac2be0 100644 --- a/MediaBrowser.Api/Playback/Progressive/BaseProgressiveStreamingService.cs +++ b/MediaBrowser.Api/Playback/Progressive/BaseProgressiveStreamingService.cs @@ -362,6 +362,7 @@ namespace MediaBrowser.Api.Playback.Progressive var contentType = state.GetMimeType(outputPath); + // TODO: The isHeadRequest is only here because ServiceStack will add Content-Length=0 to the response // Headers only if (isHeadRequest) { -- cgit v1.2.3