Files
TelemetryMonitor/lib/decoder/decoder_isolate.dart
2026-04-21 19:38:20 -03:00

119 lines
3.4 KiB
Dart

import 'dart:async';
import 'dart:isolate';
import 'dart:typed_data';
import '../proto/messages.pb.dart';
import 'decoder_base.dart';
export 'decoder_base.dart';
/// Native decoder. Runs protobuf decoding in a worker isolate and batches
/// decoded envelopes back to the main isolate every [batchInterval] to
/// minimize SendPort overhead at 1 kHz.
class DecoderIsolate implements Decoder {
DecoderIsolate({this.batchInterval = const Duration(milliseconds: 8)});
final Duration batchInterval;
final StreamController<Envelope> _out = StreamController.broadcast();
Isolate? _isolate;
SendPort? _toIsolate;
ReceivePort? _fromIsolate;
bool _ready = false;
final List<Uint8List> _pending = [];
@override
Stream<Envelope> get envelopes => _out.stream;
@override
Future<void> start() async {
if (_isolate != null) return;
_fromIsolate = ReceivePort();
final completer = Completer<SendPort>();
_fromIsolate!.listen((dynamic message) {
if (message is SendPort) {
completer.complete(message);
} else if (message is List) {
// Batch of encoded envelopes (List<Uint8List>) sent back from the
// isolate. We could also send already-decoded objects, but Envelope
// is not a transferable type without copy, and re-decoding on the
// main isolate would defeat the purpose. So the isolate sends raw
// bytes of pre-validated envelopes — we trust them and decode here.
// A simpler model: send decoded envelopes via SendPort; protobuf
// objects survive the copy. We use that.
for (final item in message) {
if (item is Envelope) _out.add(item);
}
}
});
_isolate = await Isolate.spawn(
_isolateEntry,
_IsolateInit(
sendPort: _fromIsolate!.sendPort,
batchIntervalMs: batchInterval.inMilliseconds,
),
);
_toIsolate = await completer.future;
_ready = true;
// Drain anything queued before the isolate was ready.
for (final f in _pending) {
_toIsolate!.send(f);
}
_pending.clear();
}
@override
void feed(Uint8List frame) {
if (!_ready) {
_pending.add(frame);
return;
}
// SendPort copies the bytes. At 1 kHz this is ~150 KB/s, negligible.
_toIsolate!.send(frame);
}
@override
Future<void> dispose() async {
_isolate?.kill(priority: Isolate.immediate);
_isolate = null;
_fromIsolate?.close();
_fromIsolate = null;
await _out.close();
}
}
/// Public name for `DecoderIsolate` so consumers can write
/// `Decoder d = DecoderImpl(...)` regardless of platform.
class DecoderImpl extends DecoderIsolate {
DecoderImpl({super.batchInterval});
}
class _IsolateInit {
_IsolateInit({required this.sendPort, required this.batchIntervalMs});
final SendPort sendPort;
final int batchIntervalMs;
}
void _isolateEntry(_IsolateInit init) {
final inbox = ReceivePort();
init.sendPort.send(inbox.sendPort);
final List<Envelope> batch = [];
Timer.periodic(Duration(milliseconds: init.batchIntervalMs), (_) {
if (batch.isEmpty) return;
init.sendPort.send(List<Envelope>.from(batch));
batch.clear();
});
inbox.listen((dynamic message) {
if (message is Uint8List) {
try {
final env = Envelope.fromBuffer(message);
batch.add(env);
} catch (_) {
// Malformed packet — silently dropped. The session layer can
// observe this via gaps/PPS mismatch if needed.
}
}
});
}