diff --git a/frontend-nuxt/composables/useMapUpdates.ts b/frontend-nuxt/composables/useMapUpdates.ts index 89480b9..de179d4 100644 --- a/frontend-nuxt/composables/useMapUpdates.ts +++ b/frontend-nuxt/composables/useMapUpdates.ts @@ -38,6 +38,9 @@ export interface UseMapUpdatesReturn { const RECONNECT_INITIAL_MS = 1000 const RECONNECT_MAX_MS = 30000 +/** If no SSE message received for this long, treat connection as stale and reconnect. */ +const STALE_CONNECTION_MS = 65000 +const STALE_CHECK_INTERVAL_MS = 30000 export function startMapUpdates(options: UseMapUpdatesOptions): UseMapUpdatesReturn { const { backendBase, layer, overlayLayer, map, getCurrentMapId, onMerge, connectionStateRef } = options @@ -50,6 +53,8 @@ export function startMapUpdates(options: UseMapUpdatesOptions): UseMapUpdatesRet let batchScheduled = false let source: EventSource | null = null let reconnectTimeoutId: ReturnType | null = null + let staleCheckIntervalId: ReturnType | null = null + let lastMessageTime = 0 let reconnectDelayMs = RECONNECT_INITIAL_MS let destroyed = false @@ -79,15 +84,22 @@ export function startMapUpdates(options: UseMapUpdatesOptions): UseMapUpdatesRet overlayLayer.cache[key] = u.T } const visible = getVisibleTileBounds() + // u.Z is backend storage zoom (0..5); visible.zoom is map zoom (1..6). With zoomReverse, current backend Z = maxZoom - mapZoom. + const currentBackendZ = visible ? layer.options.maxZoom - visible.zoom : null + let needRedraw = false for (const u of updates) { - if (visible && u.Z !== visible.zoom) continue + if (visible && currentBackendZ != null && u.Z !== currentBackendZ) continue if ( visible && (u.X < visible.minX || u.X > visible.maxX || u.Y < visible.minY || u.Y > visible.maxY) ) continue - if (layer.map === u.M) layer.refresh(u.X, u.Y, u.Z) - if (overlayLayer.map === u.M) overlayLayer.refresh(u.X, u.Y, u.Z) + if (layer.map === u.M && !layer.refresh(u.X, u.Y, u.Z)) needRedraw = true + if (overlayLayer.map === u.M && !overlayLayer.refresh(u.X, u.Y, u.Z)) needRedraw = true + } + if (needRedraw) { + layer.redraw() + overlayLayer.redraw() } } @@ -99,12 +111,30 @@ export function startMapUpdates(options: UseMapUpdatesOptions): UseMapUpdatesRet function connect() { if (destroyed || !import.meta.client) return + if (staleCheckIntervalId != null) { + clearInterval(staleCheckIntervalId) + staleCheckIntervalId = null + } source = new EventSource(updatesUrl) if (connectionStateRef) connectionStateRef.value = 'connecting' source.onopen = () => { if (connectionStateRef) connectionStateRef.value = 'open' + lastMessageTime = Date.now() reconnectDelayMs = RECONNECT_INITIAL_MS + staleCheckIntervalId = setInterval(() => { + if (destroyed || !source) return + if (Date.now() - lastMessageTime > STALE_CONNECTION_MS) { + if (staleCheckIntervalId != null) { + clearInterval(staleCheckIntervalId) + staleCheckIntervalId = null + } + source.close() + source = null + if (connectionStateRef) connectionStateRef.value = 'error' + connect() + } + }, STALE_CHECK_INTERVAL_MS) } source.onerror = () => { @@ -121,6 +151,7 @@ export function startMapUpdates(options: UseMapUpdatesOptions): UseMapUpdatesRet } source.onmessage = (event: MessageEvent) => { + lastMessageTime = Date.now() if (connectionStateRef) connectionStateRef.value = 'open' try { const raw: unknown = event?.data @@ -157,6 +188,10 @@ export function startMapUpdates(options: UseMapUpdatesOptions): UseMapUpdatesRet function cleanup() { destroyed = true + if (staleCheckIntervalId != null) { + clearInterval(staleCheckIntervalId) + staleCheckIntervalId = null + } if (reconnectTimeoutId != null) { clearTimeout(reconnectTimeoutId) reconnectTimeoutId = null diff --git a/frontend-nuxt/lib/SmartTileLayer.ts b/frontend-nuxt/lib/SmartTileLayer.ts index cdaaf15..e2f7c5c 100644 --- a/frontend-nuxt/lib/SmartTileLayer.ts +++ b/frontend-nuxt/lib/SmartTileLayer.ts @@ -58,7 +58,7 @@ export const SmartTileLayer = L.TileLayer.extend({ return Util.template(this._url, Util.extend(data, this.options)) }, - refresh(x: number, y: number, z: number) { + refresh(x: number, y: number, z: number): boolean { let zoom = z const maxZoom = this.options.maxZoom const zoomReverse = this.options.zoomReverse @@ -71,19 +71,20 @@ export const SmartTileLayer = L.TileLayer.extend({ const key = `${x}:${y}:${zoom}` const tile = this._tiles[key] - if (!tile?.el) return + if (!tile?.el) return false const newUrl = this.getTrueTileUrl({ x, y }, z) - if (tile.el.dataset.tileUrl === newUrl) return + if (tile.el.dataset.tileUrl === newUrl) return true tile.el.dataset.tileUrl = newUrl tile.el.src = newUrl tile.el.classList.add('tile-fresh') const el = tile.el setTimeout(() => el.classList.remove('tile-fresh'), 400) + return true }, }) as unknown as new (urlTemplate: string, options?: L.TileLayerOptions) => L.TileLayer & { cache: SmartTileLayerCache invalidTile: string map: number getTrueTileUrl: (coords: { x: number; y: number }, zoom: number) => string - refresh: (x: number, y: number, z: number) => void + refresh: (x: number, y: number, z: number) => boolean } diff --git a/internal/app/app.go b/internal/app/app.go index 998c06c..1868dcb 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -26,9 +26,10 @@ const ( MultipartMaxMemory = 100 << 20 // 100 MB MergeMaxMemory = 500 << 20 // 500 MB ClientVersion = "4" - SSETickInterval = 5 * time.Second - SSETileChannelSize = 1000 - SSEMergeChannelSize = 5 + SSETickInterval = 1 * time.Second + SSEKeepaliveInterval = 30 * time.Second + SSETileChannelSize = 2000 + SSEMergeChannelSize = 5 ) // App is the main application (map server) state. diff --git a/internal/app/app_test.go b/internal/app/app_test.go index cea99e1..632d91a 100644 --- a/internal/app/app_test.go +++ b/internal/app/app_test.go @@ -93,16 +93,41 @@ func TestTopicClose(t *testing.T) { } } -func TestTopicDropsSlowSubscriber(t *testing.T) { +func TestTopicSkipsFullChannel(t *testing.T) { topic := &app.Topic[int]{} - slow := make(chan *int) // unbuffered, will block + slow := make(chan *int) // unbuffered, so Send will skip this subscriber + fast := make(chan *int, 10) topic.Watch(slow) + topic.Watch(fast) val := 42 - topic.Send(&val) // should drop the slow subscriber + topic.Send(&val) // slow is full (unbuffered), message dropped for slow only; fast receives + topic.Send(&val) + // Fast subscriber got both messages + for i := 0; i < 2; i++ { + select { + case got := <-fast: + if *got != 42 { + t.Fatalf("fast got %d", *got) + } + default: + t.Fatalf("expected fast to have message %d", i+1) + } + } + // Slow subscriber was skipped (channel full), not closed - channel still open and empty + select { + case _, ok := <-slow: + if !ok { + t.Fatal("slow channel should not be closed when subscriber is skipped") + } + t.Fatal("slow should have received no message") + default: + // slow is open and empty, which is correct + } + topic.Close() _, ok := <-slow if ok { - t.Fatal("expected slow subscriber channel to be closed") + t.Fatal("expected slow channel closed after topic.Close()") } } diff --git a/internal/app/handlers/tile.go b/internal/app/handlers/tile.go index da7b1a4..cd555cf 100644 --- a/internal/app/handlers/tile.go +++ b/internal/app/handlers/tile.go @@ -61,8 +61,17 @@ func (h *Handlers) WatchGridUpdates(rw http.ResponseWriter, req *http.Request) { ticker := time.NewTicker(app.SSETickInterval) defer ticker.Stop() + keepaliveTicker := time.NewTicker(app.SSEKeepaliveInterval) + defer keepaliveTicker.Stop() for { select { + case <-ctx.Done(): + return + case <-keepaliveTicker.C: + if _, err := fmt.Fprint(rw, ": keepalive\n\n"); err != nil { + return + } + flusher.Flush() case e, ok := <-c: if !ok { return @@ -99,7 +108,9 @@ func (h *Handlers) WatchGridUpdates(rw http.ResponseWriter, req *http.Request) { case <-ticker.C: raw, _ := json.Marshal(tileCache) fmt.Fprint(rw, "data: ") - _, _ = rw.Write(raw) + if _, err := rw.Write(raw); err != nil { + return + } fmt.Fprint(rw, "\n\n") tileCache = tileCache[:0] flusher.Flush() diff --git a/internal/app/topic.go b/internal/app/topic.go index 0a3f42e..8e33e18 100644 --- a/internal/app/topic.go +++ b/internal/app/topic.go @@ -15,7 +15,9 @@ func (t *Topic[T]) Watch(c chan *T) { t.c = append(t.c, c) } -// Send broadcasts to all subscribers. +// Send broadcasts to all subscribers. If a subscriber's channel is full, +// the message is dropped for that subscriber only; the subscriber is not +// removed, so the connection stays alive and later updates are still delivered. func (t *Topic[T]) Send(b *T) { t.mu.Lock() defer t.mu.Unlock() @@ -23,9 +25,7 @@ func (t *Topic[T]) Send(b *T) { select { case t.c[i] <- b: default: - close(t.c[i]) - t.c[i] = t.c[len(t.c)-1] - t.c = t.c[:len(t.c)-1] + // Channel full: drop this message for this subscriber, keep them subscribed } } }