diff --git a/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs b/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs index e9bf3b93a7..dc7f972c13 100644 --- a/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs +++ b/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs @@ -127,8 +127,12 @@ namespace Emby.Server.Implementations.HttpServer { receiveResult = await _socket.ReceiveAsync(memory, cancellationToken).ConfigureAwait(false); } - catch (WebSocketException ex) + catch (Exception ex) when (ex is WebSocketException or ObjectDisposedException or OperationCanceledException) { + // ObjectDisposedException/OperationCanceledException: the socket was torn + // down underneath us (e.g. by the keep-alive watchdog after the connection + // was declared lost). Fall through so Closed is still raised and the + // session can release this connection. _logger.LogWarning("WS {IP} error receiving data: {Message}", RemoteEndPoint, ex.Message); break; } diff --git a/Emby.Server.Implementations/Session/SessionWebSocketListener.cs b/Emby.Server.Implementations/Session/SessionWebSocketListener.cs index 6a26e92e14..2582ed9df0 100644 --- a/Emby.Server.Implementations/Session/SessionWebSocketListener.cs +++ b/Emby.Server.Implementations/Session/SessionWebSocketListener.cs @@ -246,8 +246,21 @@ namespace Emby.Server.Implementations.Session _logger.LogInformation("Lost {0} WebSockets.", lost.Count); foreach (var webSocket in lost) { - // TODO: handle session relative to the lost webSocket RemoveWebSocket(webSocket); + + // The connection stopped answering keep-alives, so a close frame will + // never arrive and the pending receive loop would hang forever, keeping + // the session (and e.g. its SyncPlay group membership) alive. Disposing + // the connection aborts the receive loop, which raises Closed and lets + // the session end normally. + try + { + webSocket.Dispose(); + } + catch (Exception exception) + { + _logger.LogWarning(exception, "Error disposing lost WebSocket from {RemoteEndPoint}.", webSocket.RemoteEndPoint); + } } } } diff --git a/tests/Jellyfin.Server.Integration.Tests/SyncPlayLostWebSocketTests.cs b/tests/Jellyfin.Server.Integration.Tests/SyncPlayLostWebSocketTests.cs new file mode 100644 index 0000000000..fa15b33af6 --- /dev/null +++ b/tests/Jellyfin.Server.Integration.Tests/SyncPlayLostWebSocketTests.cs @@ -0,0 +1,140 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Net; +using System.Net.Http; +using System.Net.Http.Json; +using System.Net.WebSockets; +using System.Reflection; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Emby.Server.Implementations.Session; +using Jellyfin.Api.Models.SyncPlayDtos; +using Jellyfin.Extensions.Json; +using MediaBrowser.Controller.Net; +using Microsoft.Extensions.DependencyInjection; +using Xunit; + +namespace Jellyfin.Server.Integration.Tests; + +public sealed class SyncPlayLostWebSocketTests : IClassFixture +{ + private readonly JellyfinApplicationFactory _factory; + + public SyncPlayLostWebSocketTests(JellyfinApplicationFactory factory) + { + _factory = factory; + } + + [Fact] + public async Task LostWebSocket_EndsSession_And_RemovesEmptySyncPlayGroup() + { + var cancellationToken = TestContext.Current.CancellationToken; + var client = _factory.CreateClient(); + var accessToken = await AuthHelper.CompleteStartupAsync(client); + client.DefaultRequestHeaders.AddAuthHeader(accessToken); + + var wsClient = _factory.Server.CreateWebSocketClient(); + wsClient.ConfigureRequest = request => + request.Headers.Authorization = AuthHelper.DummyAuthHeader + $", Token={accessToken}"; + + var webSocket = await wsClient.ConnectAsync( + new UriBuilder(_factory.Server.BaseAddress) + { + Scheme = "ws", + Path = "websocket" + }.Uri, + cancellationToken); + + _ = DrainAsync(webSocket, cancellationToken); + + var watched = await WaitForWatchedWebSocketsAsync(TimeSpan.FromSeconds(10), cancellationToken); + var connection = Assert.Single(watched); + + using var createResponse = await client.PostAsync( + "SyncPlay/New", + JsonContent.Create(new NewGroupRequestDto { GroupName = "ZombieGroupRepro" }, options: JsonDefaults.Options), + cancellationToken); + Assert.Equal(HttpStatusCode.OK, createResponse.StatusCode); + Assert.Equal(1, await WaitForGroupCountAsync(client, 1, TimeSpan.FromSeconds(10), cancellationToken)); + + connection.LastKeepAliveDate = DateTime.UtcNow - TimeSpan.FromSeconds(180); + + var groupCount = await WaitForGroupCountAsync(client, 0, TimeSpan.FromSeconds(45), cancellationToken); + Assert.True( + groupCount == 0, + $"SyncPlay group still listed {groupCount} group(s) after the WebSocket was lost: " + + "the keep-alive watchdog removed the socket from its watchlist without closing " + + "the session, leaving a zombie participant in the group (SessionWebSocketListener)."); + } + + private static async Task DrainAsync(WebSocket webSocket, CancellationToken cancellationToken) + { + var buffer = new byte[4096]; + try + { + while (webSocket.State == WebSocketState.Open) + { + await webSocket.ReceiveAsync(buffer, cancellationToken); + } + } + catch + { + // The server tears the connection down once the watchdog gives up on it. + } + } + + private async Task> WaitForWatchedWebSocketsAsync(TimeSpan timeout, CancellationToken cancellationToken) + { + var listener = _factory.Services.GetRequiredService>() + .OfType() + .Single(); + var watchlistField = typeof(SessionWebSocketListener) + .GetField("_webSockets", BindingFlags.NonPublic | BindingFlags.Instance); + Assert.NotNull(watchlistField); + var watchlist = (IEnumerable)watchlistField.GetValue(listener)!; + + var stopwatch = Stopwatch.StartNew(); + while (true) + { + try + { + var snapshot = watchlist.ToArray(); + if (snapshot.Length > 0 || stopwatch.Elapsed >= timeout) + { + return snapshot; + } + } + catch (InvalidOperationException) + { + // The watchdog mutated the set during enumeration; retry. + } + + await Task.Delay(100, cancellationToken); + } + } + + private static async Task WaitForGroupCountAsync(HttpClient client, int expected, TimeSpan timeout, CancellationToken cancellationToken) + { + var stopwatch = Stopwatch.StartNew(); + var count = -1; + while (stopwatch.Elapsed < timeout) + { + using var response = await client.GetAsync("SyncPlay/List", cancellationToken); + response.EnsureSuccessStatusCode(); + await using var stream = await response.Content.ReadAsStreamAsync(cancellationToken); + using var document = await JsonDocument.ParseAsync(stream, cancellationToken: cancellationToken); + count = document.RootElement.GetArrayLength(); + if (count == expected) + { + return count; + } + + await Task.Delay(500, cancellationToken); + } + + return count; + } +}