From 976459d3e8a8b889cebc2cf281e38b0fbc19c9b9 Mon Sep 17 00:00:00 2001 From: Bond_009 Date: Tue, 17 Dec 2019 23:15:02 +0100 Subject: Rewrite WebSocket handling code --- .../HttpServer/HttpListenerHost.cs | 160 ++++++++------- .../HttpServer/IHttpListener.cs | 40 ---- .../HttpServer/WebSocketConnection.cs | 215 ++++++++++----------- 3 files changed, 183 insertions(+), 232 deletions(-) delete mode 100644 Emby.Server.Implementations/HttpServer/IHttpListener.cs (limited to 'Emby.Server.Implementations/HttpServer') diff --git a/Emby.Server.Implementations/HttpServer/HttpListenerHost.cs b/Emby.Server.Implementations/HttpServer/HttpListenerHost.cs index b0126f7fa..4baf96ab5 100644 --- a/Emby.Server.Implementations/HttpServer/HttpListenerHost.cs +++ b/Emby.Server.Implementations/HttpServer/HttpListenerHost.cs @@ -7,11 +7,12 @@ using System.Diagnostics; using System.IO; using System.Linq; using System.Net.Sockets; +using System.Net.WebSockets; using System.Reflection; using System.Threading; using System.Threading.Tasks; -using Emby.Server.Implementations.Net; using Emby.Server.Implementations.Services; +using Emby.Server.Implementations.SocketSharp; using MediaBrowser.Common.Extensions; using MediaBrowser.Common.Net; using MediaBrowser.Controller; @@ -21,9 +22,11 @@ using MediaBrowser.Model.Events; using MediaBrowser.Model.Serialization; using MediaBrowser.Model.Services; using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Http.Extensions; using Microsoft.AspNetCore.WebUtilities; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; +using Microsoft.Net.Http.Headers; using ServiceStack.Text.Jsv; namespace Emby.Server.Implementations.HttpServer @@ -31,18 +34,17 @@ namespace Emby.Server.Implementations.HttpServer public class HttpListenerHost : IHttpServer, IDisposable { private readonly ILogger _logger; + private readonly ILoggerFactory _loggerFactory; private readonly IServerConfigurationManager _config; private readonly INetworkManager _networkManager; private readonly IServerApplicationHost _appHost; private readonly IJsonSerializer _jsonSerializer; private readonly IXmlSerializer _xmlSerializer; - private readonly IHttpListener _socketListener; private readonly Func> _funcParseFn; private readonly string _defaultRedirectPath; private readonly string _baseUrlPrefix; private readonly Dictionary ServiceOperationsMap = new Dictionary(); private IWebSocketListener[] _webSocketListeners = Array.Empty(); - private readonly List _webSocketConnections = new List(); private bool _disposed = false; public HttpListenerHost( @@ -53,7 +55,7 @@ namespace Emby.Server.Implementations.HttpServer INetworkManager networkManager, IJsonSerializer jsonSerializer, IXmlSerializer xmlSerializer, - IHttpListener socketListener) + ILoggerFactory loggerFactory) { _appHost = applicationHost; _logger = logger; @@ -63,8 +65,7 @@ namespace Emby.Server.Implementations.HttpServer _networkManager = networkManager; _jsonSerializer = jsonSerializer; _xmlSerializer = xmlSerializer; - _socketListener = socketListener; - _socketListener.WebSocketConnected = OnWebSocketConnected; + _loggerFactory = loggerFactory; _funcParseFn = t => s => JsvReader.GetParseFn(t)(s); @@ -155,38 +156,6 @@ namespace Emby.Server.Implementations.HttpServer return attributes; } - private void OnWebSocketConnected(WebSocketConnectEventArgs e) - { - if (_disposed) - { - return; - } - - var connection = new WebSocketConnection(e.WebSocket, e.Endpoint, _jsonSerializer, _logger) - { - OnReceive = ProcessWebSocketMessageReceived, - Url = e.Url, - QueryString = e.QueryString - }; - - connection.Closed += OnConnectionClosed; - - lock (_webSocketConnections) - { - _webSocketConnections.Add(connection); - } - - WebSocketConnected?.Invoke(this, new GenericEventArgs(connection)); - } - - private void OnConnectionClosed(object sender, EventArgs e) - { - lock (_webSocketConnections) - { - _webSocketConnections.Remove((IWebSocketConnection)sender); - } - } - private static Exception GetActualException(Exception ex) { if (ex is AggregateException agg) @@ -274,32 +243,6 @@ namespace Emby.Server.Implementations.HttpServer return msg; } - /// - /// Shut down the Web Service - /// - public void Stop() - { - List connections; - - lock (_webSocketConnections) - { - connections = _webSocketConnections.ToList(); - _webSocketConnections.Clear(); - } - - foreach (var connection in connections) - { - try - { - connection.Dispose(); - } - catch (Exception ex) - { - _logger.LogError(ex, "Error disposing connection"); - } - } - } - public static string RemoveQueryStringByKey(string url, string key) { var uri = new Uri(url); @@ -411,31 +354,47 @@ namespace Emby.Server.Implementations.HttpServer private bool ValidateSsl(string remoteIp, string urlString) { - if (_config.Configuration.RequireHttps && _appHost.EnableHttps && !_config.Configuration.IsBehindProxy) + if (_config.Configuration.RequireHttps + && _appHost.EnableHttps + && !_config.Configuration.IsBehindProxy + && !urlString.Contains("https://", StringComparison.OrdinalIgnoreCase)) { - if (urlString.IndexOf("https://", StringComparison.OrdinalIgnoreCase) == -1) + // These are hacks, but if these ever occur on ipv6 in the local network they could be incorrectly redirected + if (urlString.IndexOf("system/ping", StringComparison.OrdinalIgnoreCase) != -1 + || urlString.IndexOf("dlna/", StringComparison.OrdinalIgnoreCase) != -1) { - // These are hacks, but if these ever occur on ipv6 in the local network they could be incorrectly redirected - if (urlString.IndexOf("system/ping", StringComparison.OrdinalIgnoreCase) != -1 - || urlString.IndexOf("dlna/", StringComparison.OrdinalIgnoreCase) != -1) - { - return true; - } + return true; + } - if (!_networkManager.IsInLocalNetwork(remoteIp)) - { - return false; - } + if (!_networkManager.IsInLocalNetwork(remoteIp)) + { + return false; } } return true; } + /// + public Task RequestHandler(HttpContext context) + { + if (context.WebSockets.IsWebSocketRequest) + { + return WebSocketRequestHandler(context); + } + + var request = context.Request; + var response = context.Response; + var localPath = context.Request.Path.ToString(); + + var req = new WebSocketSharpRequest(request, response, request.Path, _logger); + return RequestHandler(req, request.GetDisplayUrl(), request.Host.ToString(), localPath, context.RequestAborted); + } + /// - /// Overridable method that can be used to implement a custom hnandler + /// Overridable method that can be used to implement a custom handler /// - public async Task RequestHandler(IHttpRequest httpReq, string urlString, string host, string localPath, CancellationToken cancellationToken) + private async Task RequestHandler(IHttpRequest httpReq, string urlString, string host, string localPath, CancellationToken cancellationToken) { var stopWatch = new Stopwatch(); stopWatch.Start(); @@ -552,6 +511,44 @@ namespace Emby.Server.Implementations.HttpServer } } + private async Task WebSocketRequestHandler(HttpContext context) + { + if (_disposed) + { + return; + } + + var url = context.Request.GetDisplayUrl(); + _logger.LogInformation("WS {Url}. UserAgent: {UserAgent}", url, context.Request.Headers[HeaderNames.UserAgent].ToString()); + + try + { + var webSocket = await context.WebSockets.AcceptWebSocketAsync(null).ConfigureAwait(false); + + var connection = new WebSocketConnection( + _loggerFactory.CreateLogger(), + webSocket, + context.Connection.RemoteIpAddress) + { + Url = url, + QueryString = context.Request.Query, + OnReceive = ProcessWebSocketMessageReceived + }; + + WebSocketConnected?.Invoke(this, new GenericEventArgs(connection)); + + await connection.ProcessAsync().ConfigureAwait(false); + } + catch (WebSocketException ex) + { + _logger.LogError(ex, "ProcessWebSocketRequest error"); + if (!context.Response.HasStarted) + { + context.Response.StatusCode = 500; + } + } + } + // Entry point for HttpListener public ServiceHandler GetServiceHandler(IHttpRequest httpReq) { @@ -661,11 +658,6 @@ namespace Emby.Server.Implementations.HttpServer return _jsonSerializer.DeserializeFromStreamAsync(stream, type); } - public Task ProcessWebSocketRequest(HttpContext context) - { - return _socketListener.ProcessWebSocketRequest(context); - } - private string NormalizeEmbyRoutePath(string path) { _logger.LogDebug("Normalizing /emby route"); @@ -697,7 +689,7 @@ namespace Emby.Server.Implementations.HttpServer if (disposing) { - Stop(); + // TODO: } _disposed = true; diff --git a/Emby.Server.Implementations/HttpServer/IHttpListener.cs b/Emby.Server.Implementations/HttpServer/IHttpListener.cs deleted file mode 100644 index 1c3496e5d..000000000 --- a/Emby.Server.Implementations/HttpServer/IHttpListener.cs +++ /dev/null @@ -1,40 +0,0 @@ -#pragma warning disable CS1591 -#pragma warning disable SA1600 - -using System; -using System.Threading; -using System.Threading.Tasks; -using Emby.Server.Implementations.Net; -using MediaBrowser.Model.Services; -using Microsoft.AspNetCore.Http; - -namespace Emby.Server.Implementations.HttpServer -{ - public interface IHttpListener : IDisposable - { - /// - /// Gets or sets the error handler. - /// - /// The error handler. - Func ErrorHandler { get; set; } - - /// - /// Gets or sets the request handler. - /// - /// The request handler. - Func RequestHandler { get; set; } - - /// - /// Gets or sets the web socket handler. - /// - /// The web socket handler. - Action WebSocketConnected { get; set; } - - /// - /// Stops this instance. - /// - Task Stop(); - - Task ProcessWebSocketRequest(HttpContext ctx); - } -} diff --git a/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs b/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs index 2292d86a4..b4f420e5d 100644 --- a/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs +++ b/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs @@ -1,15 +1,16 @@ using System; +using System.Buffers; +using System.IO.Pipelines; +using System.Net; using System.Net.WebSockets; -using System.Text; +using System.Text.Json; using System.Threading; using System.Threading.Tasks; -using Emby.Server.Implementations.Net; +using MediaBrowser.Common.Json; using MediaBrowser.Controller.Net; using MediaBrowser.Model.Net; -using MediaBrowser.Model.Serialization; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; -using UtfUnknown; namespace Emby.Server.Implementations.HttpServer { @@ -24,54 +25,46 @@ namespace Emby.Server.Implementations.HttpServer private readonly ILogger _logger; /// - /// The json serializer. + /// The json serializer options. /// - private readonly IJsonSerializer _jsonSerializer; + private readonly JsonSerializerOptions _jsonOptions; /// /// The socket. /// - private readonly IWebSocket _socket; + private readonly WebSocket _socket; + + private bool _disposed = false; /// /// Initializes a new instance of the class. /// /// The socket. /// The remote end point. - /// The json serializer. /// The logger. /// socket - public WebSocketConnection(IWebSocket socket, string remoteEndPoint, IJsonSerializer jsonSerializer, ILogger logger) + public WebSocketConnection(ILogger logger, WebSocket socket, IPAddress remoteEndPoint) { if (socket == null) { throw new ArgumentNullException(nameof(socket)); } - if (string.IsNullOrEmpty(remoteEndPoint)) + if (remoteEndPoint != null) { throw new ArgumentNullException(nameof(remoteEndPoint)); } - if (jsonSerializer == null) - { - throw new ArgumentNullException(nameof(jsonSerializer)); - } - if (logger == null) { throw new ArgumentNullException(nameof(logger)); } - Id = Guid.NewGuid(); - _jsonSerializer = jsonSerializer; _socket = socket; - _socket.OnReceiveBytes = OnReceiveInternal; - RemoteEndPoint = remoteEndPoint; _logger = logger; - socket.Closed += OnSocketClosed; + _jsonOptions = JsonDefaults.GetOptions(); } /// @@ -80,7 +73,7 @@ namespace Emby.Server.Implementations.HttpServer /// /// Gets or sets the remote end point. /// - public string RemoteEndPoint { get; private set; } + public IPAddress RemoteEndPoint { get; private set; } /// /// Gets or sets the receive action. @@ -94,12 +87,6 @@ namespace Emby.Server.Implementations.HttpServer /// The last activity date. public DateTime LastActivityDate { get; private set; } - /// - /// Gets the id. - /// - /// The id. - public Guid Id { get; private set; } - /// /// Gets or sets the URL. /// @@ -118,46 +105,78 @@ namespace Emby.Server.Implementations.HttpServer /// The state. public WebSocketState State => _socket.State; - void OnSocketClosed(object sender, EventArgs e) - { - Closed?.Invoke(this, EventArgs.Empty); - } - /// - /// Called when [receive]. + /// Sends a message asynchronously. /// - /// The bytes. - private void OnReceiveInternal(byte[] bytes) + /// + /// The message. + /// The cancellation token. + /// Task. + /// message + public Task SendAsync(WebSocketMessage message, CancellationToken cancellationToken) { - LastActivityDate = DateTime.UtcNow; - - if (OnReceive == null) + if (message == null) { - return; + throw new ArgumentNullException(nameof(message)); } - var charset = CharsetDetector.DetectFromBytes(bytes).Detected?.EncodingName; - if (string.Equals(charset, "utf-8", StringComparison.OrdinalIgnoreCase)) + var json = JsonSerializer.SerializeToUtf8Bytes(message, _jsonOptions); + return _socket.SendAsync(json, WebSocketMessageType.Text, true, cancellationToken); + } + + /// + public async Task ProcessAsync(CancellationToken cancellationToken = default) + { + var pipe = new Pipe(); + var writer = pipe.Writer; + + ValueWebSocketReceiveResult receiveresult; + do { - OnReceiveInternal(Encoding.UTF8.GetString(bytes, 0, bytes.Length)); - } - else + // Allocate at least 512 bytes from the PipeWriter + Memory memory = writer.GetMemory(512); + + receiveresult = await _socket.ReceiveAsync(memory, cancellationToken); + int bytesRead = receiveresult.Count; + if (bytesRead == 0) + { + continue; + } + + // Tell the PipeWriter how much was read from the Socket + writer.Advance(bytesRead); + + // Make the data available to the PipeReader + FlushResult flushResult = await writer.FlushAsync(); + if (flushResult.IsCompleted) + { + // The PipeReader stopped reading + break; + } + + if (receiveresult.EndOfMessage) + { + await ProcessInternal(pipe.Reader).ConfigureAwait(false); + } + } while (_socket.State == WebSocketState.Open && receiveresult.MessageType != WebSocketMessageType.Close); + + if (_socket.State == WebSocketState.Open) { - OnReceiveInternal(Encoding.ASCII.GetString(bytes, 0, bytes.Length)); + await _socket.CloseAsync( + WebSocketCloseStatus.NormalClosure, + string.Empty, // REVIEW: human readable explanation as to why the connection is closed. + cancellationToken).ConfigureAwait(false); } + + Closed?.Invoke(this, EventArgs.Empty); + + _socket.Dispose(); } - private void OnReceiveInternal(string message) + private async Task ProcessInternal(PipeReader reader) { LastActivityDate = DateTime.UtcNow; - if (!message.StartsWith("{", StringComparison.OrdinalIgnoreCase)) - { - // This info is useful sometimes but also clogs up the log - _logger.LogDebug("Received web socket message that is not a json structure: {message}", message); - return; - } - if (OnReceive == null) { return; @@ -165,74 +184,47 @@ namespace Emby.Server.Implementations.HttpServer try { - var stub = (WebSocketMessage)_jsonSerializer.DeserializeFromString(message, typeof(WebSocketMessage)); + var result = await reader.ReadAsync().ConfigureAwait(false); + if (!result.IsCompleted) + { + return; + } + + WebSocketMessage stub; + var buffer = result.Buffer; + if (buffer.IsSingleSegment) + { + stub = JsonSerializer.Deserialize>(buffer.FirstSpan, _jsonOptions); + } + else + { + var buf = ArrayPool.Shared.Rent(Convert.ToInt32(buffer.Length)); + try + { + buffer.CopyTo(buf); + stub = JsonSerializer.Deserialize>(buf, _jsonOptions); + } + finally + { + ArrayPool.Shared.Return(buf); + } + } var info = new WebSocketMessageInfo { MessageType = stub.MessageType, - Data = stub.Data?.ToString(), + Data = stub.Data.ToString(), Connection = this }; - OnReceive(info); + await OnReceive(info).ConfigureAwait(false); } - catch (Exception ex) + catch (JsonException ex) { _logger.LogError(ex, "Error processing web socket message"); } } - /// - /// Sends a message asynchronously. - /// - /// - /// The message. - /// The cancellation token. - /// Task. - /// message - public Task SendAsync(WebSocketMessage message, CancellationToken cancellationToken) - { - if (message == null) - { - throw new ArgumentNullException(nameof(message)); - } - - var json = _jsonSerializer.SerializeToString(message); - - return SendAsync(json, cancellationToken); - } - - /// - /// Sends a message asynchronously. - /// - /// The buffer. - /// The cancellation token. - /// Task. - public Task SendAsync(byte[] buffer, CancellationToken cancellationToken) - { - if (buffer == null) - { - throw new ArgumentNullException(nameof(buffer)); - } - - cancellationToken.ThrowIfCancellationRequested(); - - return _socket.SendAsync(buffer, true, cancellationToken); - } - - /// - public Task SendAsync(string text, CancellationToken cancellationToken) - { - if (string.IsNullOrEmpty(text)) - { - throw new ArgumentNullException(nameof(text)); - } - - cancellationToken.ThrowIfCancellationRequested(); - - return _socket.SendAsync(text, true, cancellationToken); - } - /// public void Dispose() { @@ -246,10 +238,17 @@ namespace Emby.Server.Implementations.HttpServer /// true to release both managed and unmanaged resources; false to release only unmanaged resources. protected virtual void Dispose(bool dispose) { + if (_disposed) + { + return; + } + if (dispose) { _socket.Dispose(); } + + _disposed = true; } } } -- cgit v1.2.3