Files
TelemetryMonitor/lib/transport/websocket_transport_base.dart
2026-04-21 19:38:20 -03:00

100 lines
2.9 KiB
Dart

import 'dart:async';
import 'dart:typed_data';
import 'package:flutter/foundation.dart';
import 'connection_state.dart';
/// Common base for the platform-specific WebSocket transports.
///
/// Owns the connection state and reconnection with exponential backoff.
/// Subclasses implement only the platform-specific channel handling
/// ([openPlatform] and [closePlatform]) and report activity back through
/// [onBytes] and [onChannelClosed].
abstract class WebSocketTransportBase {
  WebSocketTransportBase({
    this.initialReconnectDelay = const Duration(milliseconds: 500),
    this.maxReconnectDelay = const Duration(seconds: 30),
    this.backoffFactor = 2.0,
  });

  /// Delay before the first reconnect attempt; the backoff is reset to this
  /// value by [connect] and after every successful open.
  Duration initialReconnectDelay;

  /// Upper bound applied to the delay each time it is grown by
  /// [backoffFactor].
  Duration maxReconnectDelay;

  /// Multiplier applied to the reconnect delay after each scheduled attempt.
  double backoffFactor;

  final ValueNotifier<WsConnectionState> _state =
      ValueNotifier(WsConnectionState.disconnected);

  /// The current connection state, observable by listeners.
  ValueListenable<WsConnectionState> get state => _state;

  final StreamController<Uint8List> _frames = StreamController.broadcast();

  /// Broadcast stream of the byte frames reported through [onBytes].
  Stream<Uint8List> get frames => _frames.stream;

  String? _url;

  /// The URL passed to the most recent [connect] call, or `null` if
  /// [connect] has not been called yet.
  String? get currentUrl => _url;

  bool _shouldReconnect = false;
  Duration _nextReconnectDelay = const Duration(milliseconds: 500);
  Timer? _reconnectTimer;

  /// Set once [dispose] has released [_frames] and [_state]; neither may be
  /// touched afterwards.
  bool _disposed = false;

  /// Bumped by every [connect], [disconnect] and [dispose]. An open attempt
  /// captures the value before awaiting and discards its result if the value
  /// has changed by the time the await completes.
  int _session = 0;

  /// Connect to [url]. If a connection already exists it is closed first.
  ///
  /// Throws a [StateError] if called after [dispose].
  Future<void> connect(String url) async {
    if (_disposed) {
      throw StateError('connect() called after dispose()');
    }
    await disconnect();
    // dispose() may have completed while the old channel was closing.
    if (_disposed) return;
    // Start a fresh session so that an overlapping, older connect() call
    // cannot apply its result on top of this one.
    _session++;
    _url = url;
    _shouldReconnect = true;
    _nextReconnectDelay = initialReconnectDelay;
    await _openOnce();
  }

  /// Close the connection and stop reconnecting.
  ///
  /// The state is set to disconnected even when [closePlatform] throws; the
  /// error is still propagated to the caller.
  Future<void> disconnect() async {
    _shouldReconnect = false;
    // Invalidate any open attempt that is still awaiting the platform.
    _session++;
    _reconnectTimer?.cancel();
    _reconnectTimer = null;
    try {
      await closePlatform();
    } finally {
      _setState(WsConnectionState.disconnected);
    }
  }

  /// Subclasses call this when bytes arrive.
  ///
  /// Bytes reported after the frame stream has been closed are dropped.
  @protected
  void onBytes(Uint8List bytes) {
    if (_frames.isClosed) return;
    _frames.add(bytes);
  }

  /// Subclasses call this when the underlying channel closes (with or without
  /// error). Triggers reconnect if [_shouldReconnect] is true.
  ///
  /// Does nothing once the transport has been disposed.
  @protected
  void onChannelClosed() {
    if (_disposed) return;
    if (!_shouldReconnect) {
      _setState(WsConnectionState.disconnected);
      return;
    }
    _setState(WsConnectionState.reconnecting);
    _reconnectTimer?.cancel();
    _reconnectTimer = Timer(_nextReconnectDelay, _openOnce);
    // Grow the delay for the attempt after this one, capped at the maximum.
    final next = _nextReconnectDelay * backoffFactor;
    _nextReconnectDelay = next > maxReconnectDelay ? maxReconnectDelay : next;
  }

  /// Subclass: open the platform-specific channel using [_url] and call
  /// [onBytes]/[onChannelClosed] as appropriate.
  @protected
  Future<void> openPlatform(String url);

  /// Subclass: close the platform-specific channel if open.
  @protected
  Future<void> closePlatform();

  /// Makes a single attempt to open the channel to [_url].
  ///
  /// On success the state becomes connected and the backoff is reset; on
  /// failure the attempt is handled like a closed channel. If [disconnect],
  /// [dispose] or a newer [connect] ran while the attempt was in flight, its
  /// outcome is discarded.
  Future<void> _openOnce() async {
    final url = _url;
    if (url == null || _disposed) return;
    final session = _session;
    _setState(WsConnectionState.connecting);
    try {
      await openPlatform(url);
      if (session == _session) {
        _setState(WsConnectionState.connected);
        _nextReconnectDelay = initialReconnectDelay;
      } else if (!_shouldReconnect) {
        // The attempt was cancelled while in flight and no newer connect()
        // has taken over: close the channel that just opened so it is not
        // left running behind a "disconnected" state.
        await closePlatform();
      }
    } catch (_) {
      // Any failure to open is treated as a dropped channel so the normal
      // backoff applies; failures of a superseded attempt are ignored.
      if (session == _session) onChannelClosed();
    }
  }

  /// Updates the observable state unless the notifier has been disposed.
  void _setState(WsConnectionState next) {
    if (_disposed) return;
    _state.value = next;
  }

  /// Disconnects and releases the frame stream and the state notifier.
  ///
  /// Safe to call more than once. The transport must not be used for new
  /// connections afterwards.
  Future<void> dispose() async {
    if (_disposed) return;
    try {
      await disconnect();
    } finally {
      // Resources are released even if closing the channel failed. The
      // re-check covers a second dispose() that overlapped this one.
      if (!_disposed) {
        _disposed = true;
        // Repeat the synchronous teardown in case connect() was called while
        // disconnect() was still awaiting the platform.
        _shouldReconnect = false;
        _session++;
        _reconnectTimer?.cancel();
        _reconnectTimer = null;
        await _frames.close();
        _state.dispose();
      }
    }
  }
}