Files
TelemetryMonitor/lib/session/session_controller.dart

177 lines
4.9 KiB
Dart

import 'dart:async';
import 'package:flutter/foundation.dart';
import 'package:flutter/scheduler.dart';
import '../config/settings.dart';
import '../decoder/decoder.dart';
import '../proto/messages.pb.dart';
import '../transport/connection_state.dart';
import '../transport/websocket_transport.dart';
import 'decimator.dart';
import 'log_buffer.dart';
import 'packet_buffer.dart';
import 'pps_counter.dart';
import 'status_snapshot.dart';
import 'view_state.dart';
/// Central state owner. Wires transport → decoder → buffers → notifiers.
///
/// Lifecycle: construct → call `start()` → use → call `dispose()`.
///
/// Raw frames from [transport] are fed into [decoder]; each decoded
/// [Envelope] is routed to [packets] or [logs]. A [Ticker] recomputes the
/// [statusSnapshot] and bumps [frameTick] on every tick.
class SessionController {
  /// Creates a controller around the given collaborators.
  ///
  /// [packets] and [logs] are sized from
  /// [Settings.packetBufferCapacity] and [Settings.logBufferCapacity].
  /// Nothing is subscribed or connected until [start] is called.
  SessionController({
    required this.transport,
    required this.decoder,
    required this.settings,
  })  : packets = PacketBuffer(capacity: settings.packetBufferCapacity),
        logs = LogBuffer(capacity: settings.logBufferCapacity),
        viewState = ViewState();

  /// Number of numbered status fields (`status1` … `status8`) on a packet.
  static const int _statusFieldCount = 8;

  /// Fields resolved per snapshot: every status field plus the pause flag.
  static const int _snapshotFieldCount = _statusFieldCount + 1;

  /// Source of raw frames and of the connection state.
  final WebSocketTransport transport;

  /// Consumes raw frames and emits decoded [Envelope]s.
  final Decoder decoder;

  /// Configuration: socket URL, buffer capacities and status lookback.
  final Settings settings;

  /// Buffer receiving the data payload of every data envelope.
  final PacketBuffer packets;

  /// Buffer receiving the log payload of every log envelope.
  final LogBuffer logs;

  /// View-side state; supplies the user pause flag read by [isPaused].
  final ViewState viewState;

  /// Decimator whose tail is invalidated whenever a data packet arrives and
  /// which is cleared by [clearAll].
  final Decimator decimator = Decimator();

  final PpsCounter _pps = PpsCounter();
  final ValueNotifier<int> _frameTick = ValueNotifier(0);
  final ValueNotifier<int> _logTick = ValueNotifier(0);
  final ValueNotifier<StatusSnapshot> _snapshot =
      ValueNotifier(StatusSnapshot.empty());

  /// Notifier owned by this controller and disposed with it; this class
  /// never writes to it.
  final ValueNotifier<bool> cursorEnabled = ValueNotifier(false);

  /// Counter incremented once per ticker callback.
  ValueListenable<int> get frameTick => _frameTick;

  /// Counter incremented when a log envelope arrives and on [clearAll].
  ValueListenable<int> get logTick => _logTick;

  /// The snapshot computed on the most recent ticker callback.
  ValueListenable<StatusSnapshot> get statusSnapshot => _snapshot;

  /// The transport's connection state, exposed unchanged.
  ValueListenable<WsConnectionState> get connectionState => transport.state;

  StreamSubscription<Envelope>? _envSub;

  // Kept so the raw-frame listener can be cancelled in dispose(); without
  // this handle the subscription would outlive the controller.
  StreamSubscription<Object?>? _frameSub;

  Ticker? _ticker;
  bool _disposed = false;

  /// Wires the pipeline, starts the ticker and connects the transport.
  ///
  /// The wiring (stream listeners and ticker) is installed only on the first
  /// call, so calling this again does not duplicate packet handling; the
  /// transport is asked to connect on every call.
  ///
  /// Throws a [StateError] if called after [dispose].
  Future<void> start() async {
    if (_disposed) {
      throw StateError('SessionController.start() called after dispose()');
    }
    if (_envSub == null) {
      _envSub = decoder.envelopes.listen(_onEnvelope);
      _frameSub = transport.frames.listen((bytes) => decoder.feed(bytes));
      _ticker = Ticker(_onTick)..start();
    }
    await transport.connect(settings.wsUrl);
  }

  /// Routes one decoded envelope to the packet or log buffer.
  ///
  /// Envelopes carrying neither data nor a log are ignored.
  void _onEnvelope(Envelope env) {
    if (env.hasData()) {
      packets.add(env.data);
      _pps.recordArrival();
      decimator.invalidateTail();
    } else if (env.hasLog()) {
      logs.add(env.log);
      _logTick.value++;
    }
  }

  /// Ticker callback: refreshes the snapshot, then bumps [frameTick].
  void _onTick(Duration elapsed) {
    _snapshot.value = _computeSnapshot();
    _frameTick.value++;
  }

  /// Builds a snapshot from the most recent packets.
  ///
  /// For each status field and for the pause flag, takes the value from the
  /// newest packet that has it set, looking back at most
  /// [Settings.statusLookback] packets. Fields not found stay `null`.
  StatusSnapshot _computeSnapshot() {
    final statusValues = List<int?>.filled(_statusFieldCount, null);
    bool? protoPaused;
    var resolved = 0;

    // Walk backward from newest, up to `statusLookback` packets, and stop
    // early once every tracked field is resolved.
    final n = packets.length;
    final lookback = settings.statusLookback;
    final scan = lookback < n ? lookback : n;
    for (var i = 0; i < scan && resolved < _snapshotFieldCount; i++) {
      final p = packets[n - 1 - i];
      for (var s = 0; s < _statusFieldCount; s++) {
        // Accessors are 1-based (status1..status8); the list is 0-based.
        if (statusValues[s] == null && _hasStatus(p, s + 1)) {
          statusValues[s] = _getStatus(p, s + 1);
          resolved++;
        }
      }
      if (protoPaused == null && p.hasPause()) {
        protoPaused = p.pause;
        resolved++;
      }
    }

    return StatusSnapshot(
      connection: transport.state.value,
      pps: _pps.current(),
      statusValues: statusValues,
      protoPaused: protoPaused,
    );
  }

  /// Whether the session is paused by the user, by the protocol, or both.
  ///
  /// An unknown (`null`) protocol pause flag counts as not paused.
  bool get isPaused =>
      viewState.userPaused || (_snapshot.value.protoPaused ?? false);

  /// Which side(s) currently request a pause.
  PauseSource get pauseSource {
    final u = viewState.userPaused;
    final p = _snapshot.value.protoPaused ?? false;
    if (u && p) return PauseSource.both;
    if (u) return PauseSource.user;
    if (p) return PauseSource.proto;
    return PauseSource.none;
  }

  /// Clears both buffers and the decimator, resets the packet-rate counter,
  /// calls [ViewState.goLive] and bumps [logTick].
  void clearAll() {
    packets.clear();
    logs.clear();
    decimator.clear();
    _pps.reset();
    viewState.goLive();
    _logTick.value++;
  }

  /// Asks the transport to connect to the configured URL again.
  Future<void> reconnect() async {
    await transport.connect(settings.wsUrl);
  }

  /// Tears down the pipeline and releases every owned resource.
  ///
  /// Stops the ticker and cancels both stream subscriptions before disposing
  /// the notifiers, so no callback can write to a disposed notifier. Safe to
  /// call more than once; calls after the first do nothing.
  Future<void> dispose() async {
    if (_disposed) return;
    _disposed = true;

    _ticker?.dispose();
    _ticker = null;
    await _frameSub?.cancel();
    _frameSub = null;
    await _envSub?.cancel();
    _envSub = null;

    _frameTick.dispose();
    _logTick.dispose();
    _snapshot.dispose();
    cursorEnabled.dispose();

    await decoder.dispose();
    await transport.dispose();
    viewState.dispose();
  }

  // ---- field accessors (mirrored from Decimator for status fields) ----

  /// Whether status field [idx] (1-based, 1..8) is set on [p].
  ///
  /// Returns `false` for any other index.
  static bool _hasStatus(DataPacket p, int idx) {
    switch (idx) {
      case 1:
        return p.hasStatus1();
      case 2:
        return p.hasStatus2();
      case 3:
        return p.hasStatus3();
      case 4:
        return p.hasStatus4();
      case 5:
        return p.hasStatus5();
      case 6:
        return p.hasStatus6();
      case 7:
        return p.hasStatus7();
      case 8:
        return p.hasStatus8();
      default:
        return false;
    }
  }

  /// Returns status field [idx] (1-based, 1..8) of [p].
  ///
  /// Returns `0` for any other index.
  static int _getStatus(DataPacket p, int idx) {
    switch (idx) {
      case 1:
        return p.status1;
      case 2:
        return p.status2;
      case 3:
        return p.status3;
      case 4:
        return p.status4;
      case 5:
        return p.status5;
      case 6:
        return p.status6;
      case 7:
        return p.status7;
      case 8:
        return p.status8;
      default:
        return 0;
    }
  }
}