mirror of
https://github.com/jellyfin/jellyfin.git
synced 2026-07-03 12:52:56 +01:00
Close sessions for lost WebSockets to prevent zombie SyncPlay groups (#17079)
Some checks failed
CodeQL / Analyze (csharp) (push) Has been cancelled
Format / format-check (push) Has been cancelled
Tests / run-tests (macos-latest) (push) Has been cancelled
Tests / run-tests (ubuntu-latest) (push) Has been cancelled
Tests / run-tests (windows-latest) (push) Has been cancelled
OpenAPI Publish / OpenAPI - Publish Artifact (push) Has been cancelled
OpenAPI Publish / OpenAPI - Publish Unstable Spec (push) Has been cancelled
OpenAPI Publish / OpenAPI - Publish Stable Spec (push) Has been cancelled
Project Automation / Project board (push) Has been cancelled
Merge Conflict Labeler / main (push) Has been cancelled
Stale PR Check / Check PRs with merge conflicts (push) Has been cancelled
Some checks failed
CodeQL / Analyze (csharp) (push) Has been cancelled
Format / format-check (push) Has been cancelled
Tests / run-tests (macos-latest) (push) Has been cancelled
Tests / run-tests (ubuntu-latest) (push) Has been cancelled
Tests / run-tests (windows-latest) (push) Has been cancelled
OpenAPI Publish / OpenAPI - Publish Artifact (push) Has been cancelled
OpenAPI Publish / OpenAPI - Publish Unstable Spec (push) Has been cancelled
OpenAPI Publish / OpenAPI - Publish Stable Spec (push) Has been cancelled
Project Automation / Project board (push) Has been cancelled
Merge Conflict Labeler / main (push) Has been cancelled
Stale PR Check / Check PRs with merge conflicts (push) Has been cancelled
Close sessions for lost WebSockets to prevent zombie SyncPlay groups
This commit is contained in:
@@ -127,8 +127,12 @@ namespace Emby.Server.Implementations.HttpServer
|
||||
{
|
||||
receiveResult = await _socket.ReceiveAsync(memory, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
catch (WebSocketException ex)
|
||||
catch (Exception ex) when (ex is WebSocketException or ObjectDisposedException or OperationCanceledException)
|
||||
{
|
||||
// ObjectDisposedException/OperationCanceledException: the socket was torn
|
||||
// down underneath us (e.g. by the keep-alive watchdog after the connection
|
||||
// was declared lost). Fall through so Closed is still raised and the
|
||||
// session can release this connection.
|
||||
_logger.LogWarning("WS {IP} error receiving data: {Message}", RemoteEndPoint, ex.Message);
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -246,8 +246,21 @@ namespace Emby.Server.Implementations.Session
|
||||
_logger.LogInformation("Lost {0} WebSockets.", lost.Count);
|
||||
foreach (var webSocket in lost)
|
||||
{
|
||||
// TODO: handle session relative to the lost webSocket
|
||||
RemoveWebSocket(webSocket);
|
||||
|
||||
// The connection stopped answering keep-alives, so a close frame will
|
||||
// never arrive and the pending receive loop would hang forever, keeping
|
||||
// the session (and e.g. its SyncPlay group membership) alive. Disposing
|
||||
// the connection aborts the receive loop, which raises Closed and lets
|
||||
// the session end normally.
|
||||
try
|
||||
{
|
||||
webSocket.Dispose();
|
||||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
_logger.LogWarning(exception, "Error disposing lost WebSocket from {RemoteEndPoint}.", webSocket.RemoteEndPoint);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,140 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.Linq;
|
||||
using System.Net;
|
||||
using System.Net.Http;
|
||||
using System.Net.Http.Json;
|
||||
using System.Net.WebSockets;
|
||||
using System.Reflection;
|
||||
using System.Text.Json;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Emby.Server.Implementations.Session;
|
||||
using Jellyfin.Api.Models.SyncPlayDtos;
|
||||
using Jellyfin.Extensions.Json;
|
||||
using MediaBrowser.Controller.Net;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Xunit;
|
||||
|
||||
namespace Jellyfin.Server.Integration.Tests;
|
||||
|
||||
/// <summary>
/// Integration test for a WebSocket that stops answering keep-alives while its session
/// is the only member of a SyncPlay group.
/// </summary>
public sealed class SyncPlayLostWebSocketTests : IClassFixture<JellyfinApplicationFactory>
{
    private readonly JellyfinApplicationFactory _factory;

    /// <summary>
    /// Initializes a new instance of the <see cref="SyncPlayLostWebSocketTests"/> class.
    /// </summary>
    /// <param name="factory">The shared application factory hosting the test server.</param>
    public SyncPlayLostWebSocketTests(JellyfinApplicationFactory factory)
    {
        _factory = factory;
    }

    /// <summary>
    /// Opens an authenticated WebSocket, creates a SyncPlay group over HTTP, back-dates the
    /// connection's last keep-alive and then expects the group list to become empty.
    /// </summary>
    /// <returns>A <see cref="Task"/> representing the asynchronous test.</returns>
    [Fact]
    public async Task LostWebSocket_EndsSession_And_RemovesEmptySyncPlayGroup()
    {
        var cancellationToken = TestContext.Current.CancellationToken;
        var client = _factory.CreateClient();
        var accessToken = await AuthHelper.CompleteStartupAsync(client);
        client.DefaultRequestHeaders.AddAuthHeader(accessToken);

        var wsClient = _factory.Server.CreateWebSocketClient();
        wsClient.ConfigureRequest = request =>
            request.Headers.Authorization = AuthHelper.DummyAuthHeader + $", Token={accessToken}";

        // WebSocket is IDisposable: release the client end when the test finishes (pass or
        // fail) instead of leaking it. DrainAsync swallows the resulting receive failure.
        using var webSocket = await wsClient.ConnectAsync(
            new UriBuilder(_factory.Server.BaseAddress)
            {
                Scheme = "ws",
                Path = "websocket"
            }.Uri,
            cancellationToken);

        // Keep reading in the background so incoming frames are consumed; the task is
        // intentionally not awaited.
        _ = DrainAsync(webSocket, cancellationToken);

        var watched = await WaitForWatchedWebSocketsAsync(TimeSpan.FromSeconds(10), cancellationToken);
        var connection = Assert.Single(watched);

        using var createResponse = await client.PostAsync(
            "SyncPlay/New",
            JsonContent.Create(new NewGroupRequestDto { GroupName = "ZombieGroupRepro" }, options: JsonDefaults.Options),
            cancellationToken);
        Assert.Equal(HttpStatusCode.OK, createResponse.StatusCode);
        Assert.Equal(1, await WaitForGroupCountAsync(client, 1, TimeSpan.FromSeconds(10), cancellationToken));

        // Back-date the last keep-alive so the connection looks stale.
        // NOTE(review): 180 s is presumably well past the watchdog's "lost" threshold and
        // the 45 s wait below presumably covers at least one watchdog pass - confirm
        // against SessionWebSocketListener.
        connection.LastKeepAliveDate = DateTime.UtcNow - TimeSpan.FromSeconds(180);

        var groupCount = await WaitForGroupCountAsync(client, 0, TimeSpan.FromSeconds(45), cancellationToken);
        Assert.True(
            groupCount == 0,
            $"SyncPlay group still listed {groupCount} group(s) after the WebSocket was lost: "
            + "the keep-alive watchdog removed the socket from its watchlist without closing "
            + "the session, leaving a zombie participant in the group (SessionWebSocketListener).");
    }

    /// <summary>
    /// Receives from <paramref name="webSocket"/> and discards the data for as long as the
    /// socket reports <see cref="WebSocketState.Open"/>.
    /// </summary>
    /// <param name="webSocket">The client WebSocket to read from.</param>
    /// <param name="cancellationToken">Token that aborts the pending receive.</param>
    /// <returns>A <see cref="Task"/> that completes when the socket leaves the open state or a receive fails.</returns>
    private static async Task DrainAsync(WebSocket webSocket, CancellationToken cancellationToken)
    {
        var buffer = new byte[4096];
        try
        {
            while (webSocket.State == WebSocketState.Open)
            {
                await webSocket.ReceiveAsync(buffer, cancellationToken);
            }
        }
        catch
        {
            // The server tears the connection down once the watchdog gives up on it.
            // Any receive failure is deliberately ignored: this is a best-effort pump.
        }
    }

    /// <summary>
    /// Polls the private <c>_webSockets</c> field of <see cref="SessionWebSocketListener"/>
    /// until it contains at least one connection or <paramref name="timeout"/> has elapsed.
    /// </summary>
    /// <param name="timeout">How long to keep polling for a non-empty watchlist.</param>
    /// <param name="cancellationToken">Token that aborts the delay between polls.</param>
    /// <returns>A snapshot of the watched connections; empty if none appeared before the timeout.</returns>
    private async Task<IReadOnlyList<IWebSocketConnection>> WaitForWatchedWebSocketsAsync(TimeSpan timeout, CancellationToken cancellationToken)
    {
        var listener = _factory.Services.GetRequiredService<IEnumerable<IWebSocketListener>>()
            .OfType<SessionWebSocketListener>()
            .Single();

        // The watchlist is not exposed publicly, so it is read through reflection.
        var watchlistField = typeof(SessionWebSocketListener)
            .GetField("_webSockets", BindingFlags.NonPublic | BindingFlags.Instance);
        Assert.NotNull(watchlistField);
        var watchlist = (IEnumerable<IWebSocketConnection>)watchlistField.GetValue(listener)!;

        var stopwatch = Stopwatch.StartNew();
        while (true)
        {
            try
            {
                // Copy first so the result is a stable snapshot of the live collection.
                var snapshot = watchlist.ToArray();
                if (snapshot.Length > 0 || stopwatch.Elapsed >= timeout)
                {
                    return snapshot;
                }
            }
            catch (InvalidOperationException)
            {
                // The watchdog mutated the set during enumeration; retry.
            }

            await Task.Delay(100, cancellationToken);
        }
    }

    /// <summary>
    /// Polls <c>SyncPlay/List</c> every 500 ms until the returned JSON array has
    /// <paramref name="expected"/> elements or <paramref name="timeout"/> has elapsed.
    /// </summary>
    /// <param name="client">The authenticated HTTP client used for the requests.</param>
    /// <param name="expected">The group count to wait for.</param>
    /// <param name="timeout">How long to keep polling.</param>
    /// <param name="cancellationToken">Token that aborts the requests and delays.</param>
    /// <returns>The last observed group count, or <c>-1</c> if no request was made.</returns>
    private static async Task<int> WaitForGroupCountAsync(HttpClient client, int expected, TimeSpan timeout, CancellationToken cancellationToken)
    {
        var stopwatch = Stopwatch.StartNew();
        var count = -1;
        while (stopwatch.Elapsed < timeout)
        {
            using var response = await client.GetAsync("SyncPlay/List", cancellationToken);
            response.EnsureSuccessStatusCode();
            await using var stream = await response.Content.ReadAsStreamAsync(cancellationToken);
            using var document = await JsonDocument.ParseAsync(stream, cancellationToken: cancellationToken);
            count = document.RootElement.GetArrayLength();
            if (count == expected)
            {
                return count;
            }

            await Task.Delay(500, cancellationToken);
        }

        return count;
    }
}
|
||||
Reference in New Issue
Block a user