From 03864b2a9a901cb9211c10396058c2132681f43a Mon Sep 17 00:00:00 2001 From: Gauvain Date: Sun, 14 Jun 2026 17:20:27 +0200 Subject: [PATCH] feat(home): refresh Continue Watching instantly via WebSocket (#1439) --- providers/WebSocketProvider.tsx | 230 +++++++++++++++++++++++++++----- 1 file changed, 194 insertions(+), 36 deletions(-) diff --git a/providers/WebSocketProvider.tsx b/providers/WebSocketProvider.tsx index ed9db754..c704d373 100644 --- a/providers/WebSocketProvider.tsx +++ b/providers/WebSocketProvider.tsx @@ -1,4 +1,5 @@ import { getSessionApi } from "@jellyfin/sdk/lib/utils/api"; +import { router } from "expo-router"; import { useAtomValue } from "jotai"; import { createContext, @@ -11,7 +12,6 @@ import { useState, } from "react"; import { AppState, type AppStateStatus } from "react-native"; -import useRouter from "@/hooks/useAppRouter"; import { useNetworkAwareQueryClient } from "@/hooks/useNetworkAwareQueryClient"; import { apiAtom, getOrSetDeviceId } from "@/providers/JellyfinProvider"; import { useNetworkStatus } from "@/providers/NetworkStatusProvider"; @@ -28,6 +28,20 @@ const LIBRARY_CHANGE_QUERY_KEYS = [ ["episodes"], ] as const; +// Query keys that depend on per-user playback state (resume position, played +// status, favorites) and should be refreshed when the server reports a +// `UserDataChanged`. Scoped to the progression-based sections so finishing an +// episode does not pointlessly refetch "recently added" or suggestions. +const USER_DATA_CHANGE_QUERY_KEYS = [ + ["home", "continueAndNextUp"], + ["home", "resumeItems"], + ["home", "nextUp-all"], + ["home", "heroItems"], + ["resumeItems"], + ["nextUp-all"], + ["nextUp"], +] as const; + interface WebSocketMessage { MessageType: string; Data: any; @@ -38,10 +52,30 @@ interface WebSocketProviderProps { children: ReactNode; } +/** + * Handler invoked for every message of a given `MessageType`. Receives the + * message `Data` payload and the full message. + */ +type WebSocketMessageHandler = (data: any, message: WebSocketMessage) => void; + interface WebSocketContextType { ws: WebSocket | null; isConnected: boolean; + /** + * @deprecated Prefer `subscribe`. `lastMessage` only keeps the most recent + * message, so bursts arriving in the same tick are coalesced and lost. Kept + * for `useWebsockets` (GeneralCommand handling) until it is migrated. + */ lastMessage: WebSocketMessage | null; + /** + * Subscribe to a given message type. The handler is called synchronously for + * every matching message (no coalescing, unlike `lastMessage`). Returns an + * unsubscribe function to call on cleanup. + */ + subscribe: ( + messageType: string, + handler: WebSocketMessageHandler, + ) => () => void; sendMessage: (message: any) => void; clearLastMessage: () => void; } @@ -54,7 +88,6 @@ export const WebSocketProvider = ({ children }: WebSocketProviderProps) => { const [ws, setWs] = useState(null); const [isConnected, setIsConnected] = useState(false); const [lastMessage, setLastMessage] = useState(null); - const router = useRouter(); const queryClient = useNetworkAwareQueryClient(); const deviceId = useMemo(() => { return getOrSetDeviceId(); @@ -63,8 +96,76 @@ export const WebSocketProvider = ({ children }: WebSocketProviderProps) => { const libraryChangeDebounceRef = useRef | null>( null, ); + const userDataChangeDebounceRef = useRef | null>(null); + // Handle for the onerror backoff timer. Tracked so a reconnect triggered by + // another path (foreground, network reconnect, effect re-run) can cancel a + // pending one — an untracked timer would later open a second socket. + const reconnectTimeoutRef = useRef | null>( + null, + ); + + // Pub/sub registry: messageType -> set of handlers. Stored in a ref so + // subscribing/dispatching never triggers a re-render. + const listenersRef = useRef>>( + new Map(), + ); + + const subscribe = useCallback( + (messageType: string, handler: WebSocketMessageHandler) => { + const listeners = listenersRef.current; + let handlers = listeners.get(messageType); + if (!handlers) { + handlers = new Set(); + listeners.set(messageType, handlers); + } + handlers.add(handler); + return () => { + handlers?.delete(handler); + // Only drop the map entry if it still points at THIS set. After an + // unsubscribe + re-subscribe for the same type, a stale second call to + // this cleanup would otherwise delete the new subscribers' set and + // silently stop delivering their messages. + if ( + handlers && + handlers.size === 0 && + listeners.get(messageType) === handlers + ) { + listeners.delete(messageType); + } + }; + }, + [], + ); + + const dispatchMessage = useCallback((message: WebSocketMessage) => { + const handlers = listenersRef.current.get(message.MessageType); + if (!handlers || handlers.size === 0) return; + // Copy to tolerate handlers that unsubscribe during dispatch. + for (const handler of [...handlers]) { + // Isolate each handler so one throwing subscriber can't abort the rest + // (and isn't misreported as a parse failure by the outer onmessage catch). + try { + handler(message.Data, message); + } catch (error) { + console.error( + `Error handling WebSocket message type "${message.MessageType}":`, + error, + ); + } + } + }, []); const connectWebSocket = useCallback(() => { + // Cancel any reconnect queued by a previous onerror before opening a new + // socket, so we never end up with two live sockets — each would double the + // message fan-out and double-invalidate queries. + if (reconnectTimeoutRef.current) { + clearTimeout(reconnectTimeoutRef.current); + reconnectTimeoutRef.current = null; + } + if (!deviceId || !api?.accessToken || !isNetworkConnected) { return; } @@ -85,6 +186,10 @@ export const WebSocketProvider = ({ children }: WebSocketProviderProps) => { newWebSocket.onopen = () => { setIsConnected(true); reconnectAttemptsRef.current = 0; + if (reconnectTimeoutRef.current) { + clearTimeout(reconnectTimeoutRef.current); + reconnectTimeoutRef.current = null; + } keepAliveInterval = setInterval(() => { if (newWebSocket.readyState === WebSocket.OPEN) { newWebSocket.send(JSON.stringify({ MessageType: "KeepAlive" })); @@ -96,9 +201,15 @@ export const WebSocketProvider = ({ children }: WebSocketProviderProps) => { // Don't log errors - this is expected when offline or server unreachable setIsConnected(false); + // Replace any still-pending reconnect so only one is ever queued; the + // previously untracked handle could leak and open a second socket. + if (reconnectTimeoutRef.current) { + clearTimeout(reconnectTimeoutRef.current); + } if (reconnectAttemptsRef.current < maxReconnectAttempts) { reconnectAttemptsRef.current++; - setTimeout(() => { + reconnectTimeoutRef.current = setTimeout(() => { + reconnectTimeoutRef.current = null; connectWebSocket(); }, reconnectDelay); } @@ -113,7 +224,10 @@ export const WebSocketProvider = ({ children }: WebSocketProviderProps) => { newWebSocket.onmessage = (e) => { try { const message = JSON.parse(e.data); - setLastMessage(message); // Store the last message in context + // Legacy single-slot state, still consumed by useWebsockets. + setLastMessage(message); + // Pub/sub: deliver to every subscriber without coalescing. + dispatchMessage(message); } catch (error) { console.error("Error parsing WebSocket message:", error); } @@ -124,9 +238,13 @@ export const WebSocketProvider = ({ children }: WebSocketProviderProps) => { if (keepAliveInterval) { clearInterval(keepAliveInterval); } + if (reconnectTimeoutRef.current) { + clearTimeout(reconnectTimeoutRef.current); + reconnectTimeoutRef.current = null; + } newWebSocket.close(); }; - }, [api, deviceId, isNetworkConnected]); + }, [api, deviceId, isNetworkConnected, dispatchMessage]); const handleLibraryChanged = useCallback( (data: any) => { @@ -157,47 +275,80 @@ export const WebSocketProvider = ({ children }: WebSocketProviderProps) => { [queryClient], ); - useEffect(() => { - if (!lastMessage) { - return; - } - if (lastMessage.MessageType === "Play") { - handlePlayCommand(lastMessage.Data); - } else if (lastMessage.MessageType === "LibraryChanged") { - handleLibraryChanged(lastMessage.Data); - } - }, [lastMessage, router, handleLibraryChanged]); + const handleUserDataChanged = useCallback( + (data: any) => { + // Jellyfin sends UserDataChanged when playback position, played status + // or favorites change (e.g. finishing an episode). Only the + // progression-based home sections care about it. + if (!((data?.UserDataList?.length ?? 0) > 0)) { + return; + } + + // Finishing an item can emit several UserDataChanged messages, so + // debounce to invalidate the affected sections only once. + if (userDataChangeDebounceRef.current) { + clearTimeout(userDataChangeDebounceRef.current); + } + userDataChangeDebounceRef.current = setTimeout(() => { + for (const queryKey of USER_DATA_CHANGE_QUERY_KEYS) { + queryClient.invalidateQueries({ queryKey: [...queryKey] }); + } + }, 800); + }, + [queryClient], + ); + + // Refresh library-dependent queries when the server reports a change. + useEffect( + () => subscribe("LibraryChanged", handleLibraryChanged), + [subscribe, handleLibraryChanged], + ); + + // Refresh "Continue Watching" / "Next Up" when playback state changes. + useEffect( + () => subscribe("UserDataChanged", handleUserDataChanged), + [subscribe, handleUserDataChanged], + ); useEffect(() => { return () => { if (libraryChangeDebounceRef.current) { clearTimeout(libraryChangeDebounceRef.current); } + if (userDataChangeDebounceRef.current) { + clearTimeout(userDataChangeDebounceRef.current); + } + if (reconnectTimeoutRef.current) { + clearTimeout(reconnectTimeoutRef.current); + } }; }, []); - const handlePlayCommand = useCallback( - (data: any) => { - if (!data?.ItemIds?.length) { - return; - } + const handlePlayCommand = useCallback((data: any) => { + if (!data?.ItemIds?.length) { + return; + } - const itemId = data.ItemIds[0]; + const itemId = data.ItemIds[0]; - router.push({ - pathname: "/(auth)/player/direct-player", - params: { - itemId: itemId, - playCommand: data.PlayCommand || "PlayNow", - audioIndex: data.AudioStreamIndex?.toString(), - subtitleIndex: data.SubtitleStreamIndex?.toString(), - mediaSourceId: data.MediaSourceId || "", - bitrateValue: "", - offline: "false", - }, - }); - }, - [router], + router.push({ + pathname: "/(auth)/player/direct-player", + params: { + itemId: itemId, + playCommand: data.PlayCommand || "PlayNow", + audioIndex: data.AudioStreamIndex?.toString(), + subtitleIndex: data.SubtitleStreamIndex?.toString(), + mediaSourceId: data.MediaSourceId || "", + bitrateValue: "", + offline: "false", + }, + }); + }, []); + + // Server-initiated "Play me this item" remote command. + useEffect( + () => subscribe("Play", handlePlayCommand), + [subscribe, handlePlayCommand], ); useEffect(() => { @@ -267,7 +418,14 @@ export const WebSocketProvider = ({ children }: WebSocketProviderProps) => { }, []); return ( {children}