Enhance map update handling and connection stability

- Introduced mechanisms to detect stale connections in the map updates, allowing for automatic reconnection if no messages are received within a specified timeframe.
- Updated the `refresh` method in `SmartTileLayer` to return a boolean indicating whether the tile was refreshed, improving the handling of tile updates.
- Enhanced the `Send` method in the `Topic` struct to drop messages for full subscribers while keeping them subscribed, ensuring continuous delivery of future updates.
- Added a keepalive mechanism in the `WatchGridUpdates` handler to maintain the connection and prevent timeouts.
This commit is contained in:
2026-03-04 21:29:53 +03:00
parent 179357bc93
commit dc53b79d84
6 changed files with 92 additions and 19 deletions

View File

@@ -38,6 +38,9 @@ export interface UseMapUpdatesReturn {
const RECONNECT_INITIAL_MS = 1000 const RECONNECT_INITIAL_MS = 1000
const RECONNECT_MAX_MS = 30000 const RECONNECT_MAX_MS = 30000
/** If no SSE message received for this long, treat connection as stale and reconnect. */
const STALE_CONNECTION_MS = 65000
const STALE_CHECK_INTERVAL_MS = 30000
export function startMapUpdates(options: UseMapUpdatesOptions): UseMapUpdatesReturn { export function startMapUpdates(options: UseMapUpdatesOptions): UseMapUpdatesReturn {
const { backendBase, layer, overlayLayer, map, getCurrentMapId, onMerge, connectionStateRef } = options const { backendBase, layer, overlayLayer, map, getCurrentMapId, onMerge, connectionStateRef } = options
@@ -50,6 +53,8 @@ export function startMapUpdates(options: UseMapUpdatesOptions): UseMapUpdatesRet
let batchScheduled = false let batchScheduled = false
let source: EventSource | null = null let source: EventSource | null = null
let reconnectTimeoutId: ReturnType<typeof setTimeout> | null = null let reconnectTimeoutId: ReturnType<typeof setTimeout> | null = null
let staleCheckIntervalId: ReturnType<typeof setInterval> | null = null
let lastMessageTime = 0
let reconnectDelayMs = RECONNECT_INITIAL_MS let reconnectDelayMs = RECONNECT_INITIAL_MS
let destroyed = false let destroyed = false
@@ -79,15 +84,22 @@ export function startMapUpdates(options: UseMapUpdatesOptions): UseMapUpdatesRet
overlayLayer.cache[key] = u.T overlayLayer.cache[key] = u.T
} }
const visible = getVisibleTileBounds() const visible = getVisibleTileBounds()
// u.Z is backend storage zoom (0..5); visible.zoom is map zoom (1..6). With zoomReverse, current backend Z = maxZoom - mapZoom.
const currentBackendZ = visible ? layer.options.maxZoom - visible.zoom : null
let needRedraw = false
for (const u of updates) { for (const u of updates) {
if (visible && u.Z !== visible.zoom) continue if (visible && currentBackendZ != null && u.Z !== currentBackendZ) continue
if ( if (
visible && visible &&
(u.X < visible.minX || u.X > visible.maxX || u.Y < visible.minY || u.Y > visible.maxY) (u.X < visible.minX || u.X > visible.maxX || u.Y < visible.minY || u.Y > visible.maxY)
) )
continue continue
if (layer.map === u.M) layer.refresh(u.X, u.Y, u.Z) if (layer.map === u.M && !layer.refresh(u.X, u.Y, u.Z)) needRedraw = true
if (overlayLayer.map === u.M) overlayLayer.refresh(u.X, u.Y, u.Z) if (overlayLayer.map === u.M && !overlayLayer.refresh(u.X, u.Y, u.Z)) needRedraw = true
}
if (needRedraw) {
layer.redraw()
overlayLayer.redraw()
} }
} }
@@ -99,12 +111,30 @@ export function startMapUpdates(options: UseMapUpdatesOptions): UseMapUpdatesRet
function connect() { function connect() {
if (destroyed || !import.meta.client) return if (destroyed || !import.meta.client) return
if (staleCheckIntervalId != null) {
clearInterval(staleCheckIntervalId)
staleCheckIntervalId = null
}
source = new EventSource(updatesUrl) source = new EventSource(updatesUrl)
if (connectionStateRef) connectionStateRef.value = 'connecting' if (connectionStateRef) connectionStateRef.value = 'connecting'
source.onopen = () => { source.onopen = () => {
if (connectionStateRef) connectionStateRef.value = 'open' if (connectionStateRef) connectionStateRef.value = 'open'
lastMessageTime = Date.now()
reconnectDelayMs = RECONNECT_INITIAL_MS reconnectDelayMs = RECONNECT_INITIAL_MS
staleCheckIntervalId = setInterval(() => {
if (destroyed || !source) return
if (Date.now() - lastMessageTime > STALE_CONNECTION_MS) {
if (staleCheckIntervalId != null) {
clearInterval(staleCheckIntervalId)
staleCheckIntervalId = null
}
source.close()
source = null
if (connectionStateRef) connectionStateRef.value = 'error'
connect()
}
}, STALE_CHECK_INTERVAL_MS)
} }
source.onerror = () => { source.onerror = () => {
@@ -121,6 +151,7 @@ export function startMapUpdates(options: UseMapUpdatesOptions): UseMapUpdatesRet
} }
source.onmessage = (event: MessageEvent) => { source.onmessage = (event: MessageEvent) => {
lastMessageTime = Date.now()
if (connectionStateRef) connectionStateRef.value = 'open' if (connectionStateRef) connectionStateRef.value = 'open'
try { try {
const raw: unknown = event?.data const raw: unknown = event?.data
@@ -157,6 +188,10 @@ export function startMapUpdates(options: UseMapUpdatesOptions): UseMapUpdatesRet
function cleanup() { function cleanup() {
destroyed = true destroyed = true
if (staleCheckIntervalId != null) {
clearInterval(staleCheckIntervalId)
staleCheckIntervalId = null
}
if (reconnectTimeoutId != null) { if (reconnectTimeoutId != null) {
clearTimeout(reconnectTimeoutId) clearTimeout(reconnectTimeoutId)
reconnectTimeoutId = null reconnectTimeoutId = null

View File

@@ -58,7 +58,7 @@ export const SmartTileLayer = L.TileLayer.extend({
return Util.template(this._url, Util.extend(data, this.options)) return Util.template(this._url, Util.extend(data, this.options))
}, },
refresh(x: number, y: number, z: number) { refresh(x: number, y: number, z: number): boolean {
let zoom = z let zoom = z
const maxZoom = this.options.maxZoom const maxZoom = this.options.maxZoom
const zoomReverse = this.options.zoomReverse const zoomReverse = this.options.zoomReverse
@@ -71,19 +71,20 @@ export const SmartTileLayer = L.TileLayer.extend({
const key = `${x}:${y}:${zoom}` const key = `${x}:${y}:${zoom}`
const tile = this._tiles[key] const tile = this._tiles[key]
if (!tile?.el) return if (!tile?.el) return false
const newUrl = this.getTrueTileUrl({ x, y }, z) const newUrl = this.getTrueTileUrl({ x, y }, z)
if (tile.el.dataset.tileUrl === newUrl) return if (tile.el.dataset.tileUrl === newUrl) return true
tile.el.dataset.tileUrl = newUrl tile.el.dataset.tileUrl = newUrl
tile.el.src = newUrl tile.el.src = newUrl
tile.el.classList.add('tile-fresh') tile.el.classList.add('tile-fresh')
const el = tile.el const el = tile.el
setTimeout(() => el.classList.remove('tile-fresh'), 400) setTimeout(() => el.classList.remove('tile-fresh'), 400)
return true
}, },
}) as unknown as new (urlTemplate: string, options?: L.TileLayerOptions) => L.TileLayer & { }) as unknown as new (urlTemplate: string, options?: L.TileLayerOptions) => L.TileLayer & {
cache: SmartTileLayerCache cache: SmartTileLayerCache
invalidTile: string invalidTile: string
map: number map: number
getTrueTileUrl: (coords: { x: number; y: number }, zoom: number) => string getTrueTileUrl: (coords: { x: number; y: number }, zoom: number) => string
refresh: (x: number, y: number, z: number) => void refresh: (x: number, y: number, z: number) => boolean
} }

View File

@@ -26,9 +26,10 @@ const (
MultipartMaxMemory = 100 << 20 // 100 MB MultipartMaxMemory = 100 << 20 // 100 MB
MergeMaxMemory = 500 << 20 // 500 MB MergeMaxMemory = 500 << 20 // 500 MB
ClientVersion = "4" ClientVersion = "4"
SSETickInterval = 5 * time.Second SSETickInterval = 1 * time.Second
SSETileChannelSize = 1000 SSEKeepaliveInterval = 30 * time.Second
SSEMergeChannelSize = 5 SSETileChannelSize = 2000
SSEMergeChannelSize = 5
) )
// App is the main application (map server) state. // App is the main application (map server) state.

View File

@@ -93,16 +93,41 @@ func TestTopicClose(t *testing.T) {
} }
} }
func TestTopicDropsSlowSubscriber(t *testing.T) { func TestTopicSkipsFullChannel(t *testing.T) {
topic := &app.Topic[int]{} topic := &app.Topic[int]{}
slow := make(chan *int) // unbuffered, will block slow := make(chan *int) // unbuffered, so Send will skip this subscriber
fast := make(chan *int, 10)
topic.Watch(slow) topic.Watch(slow)
topic.Watch(fast)
val := 42 val := 42
topic.Send(&val) // should drop the slow subscriber topic.Send(&val) // slow is full (unbuffered), message dropped for slow only; fast receives
topic.Send(&val)
// Fast subscriber got both messages
for i := 0; i < 2; i++ {
select {
case got := <-fast:
if *got != 42 {
t.Fatalf("fast got %d", *got)
}
default:
t.Fatalf("expected fast to have message %d", i+1)
}
}
// Slow subscriber was skipped (channel full), not closed - channel still open and empty
select {
case _, ok := <-slow:
if !ok {
t.Fatal("slow channel should not be closed when subscriber is skipped")
}
t.Fatal("slow should have received no message")
default:
// slow is open and empty, which is correct
}
topic.Close()
_, ok := <-slow _, ok := <-slow
if ok { if ok {
t.Fatal("expected slow subscriber channel to be closed") t.Fatal("expected slow channel closed after topic.Close()")
} }
} }

View File

@@ -61,8 +61,17 @@ func (h *Handlers) WatchGridUpdates(rw http.ResponseWriter, req *http.Request) {
ticker := time.NewTicker(app.SSETickInterval) ticker := time.NewTicker(app.SSETickInterval)
defer ticker.Stop() defer ticker.Stop()
keepaliveTicker := time.NewTicker(app.SSEKeepaliveInterval)
defer keepaliveTicker.Stop()
for { for {
select { select {
case <-ctx.Done():
return
case <-keepaliveTicker.C:
if _, err := fmt.Fprint(rw, ": keepalive\n\n"); err != nil {
return
}
flusher.Flush()
case e, ok := <-c: case e, ok := <-c:
if !ok { if !ok {
return return
@@ -99,7 +108,9 @@ func (h *Handlers) WatchGridUpdates(rw http.ResponseWriter, req *http.Request) {
case <-ticker.C: case <-ticker.C:
raw, _ := json.Marshal(tileCache) raw, _ := json.Marshal(tileCache)
fmt.Fprint(rw, "data: ") fmt.Fprint(rw, "data: ")
_, _ = rw.Write(raw) if _, err := rw.Write(raw); err != nil {
return
}
fmt.Fprint(rw, "\n\n") fmt.Fprint(rw, "\n\n")
tileCache = tileCache[:0] tileCache = tileCache[:0]
flusher.Flush() flusher.Flush()

View File

@@ -15,7 +15,9 @@ func (t *Topic[T]) Watch(c chan *T) {
t.c = append(t.c, c) t.c = append(t.c, c)
} }
// Send broadcasts to all subscribers. // Send broadcasts to all subscribers. If a subscriber's channel is full,
// the message is dropped for that subscriber only; the subscriber is not
// removed, so the connection stays alive and later updates are still delivered.
func (t *Topic[T]) Send(b *T) { func (t *Topic[T]) Send(b *T) {
t.mu.Lock() t.mu.Lock()
defer t.mu.Unlock() defer t.mu.Unlock()
@@ -23,9 +25,7 @@ func (t *Topic[T]) Send(b *T) {
select { select {
case t.c[i] <- b: case t.c[i] <- b:
default: default:
close(t.c[i]) // Channel full: drop this message for this subscriber, keep them subscribed
t.c[i] = t.c[len(t.c)-1]
t.c = t.c[:len(t.c)-1]
} }
} }
} }