118 lines
3.4 KiB
Dart
118 lines
3.4 KiB
Dart
import 'dart:async';
|
|
import 'dart:isolate';
|
|
import 'dart:typed_data';
|
|
|
|
import '../proto/messages.pb.dart';
|
|
import 'decoder_base.dart';
|
|
|
|
export 'decoder_base.dart';
|
|
|
|
/// Native decoder that offloads protobuf decoding to a worker isolate.
///
/// Frames passed to [feed] are forwarded to the worker, which decodes them and
/// sends the resulting envelopes back in batches every [batchInterval] to
/// minimize SendPort overhead at 1 kHz. Decoded envelopes are published on
/// [envelopes].
///
/// Lifecycle: call [start] to spawn the worker (frames fed earlier are queued
/// and flushed once it is ready) and [dispose] to tear it down. After
/// [dispose], both [start] and [feed] are no-ops.
class DecoderIsolate implements Decoder {
  DecoderIsolate({this.batchInterval = const Duration(milliseconds: 8)});

  /// How often the worker flushes decoded envelopes back to this isolate.
  ///
  /// Only the whole-millisecond part is forwarded to the worker.
  final Duration batchInterval;

  final StreamController<Envelope> _out = StreamController.broadcast();

  /// Frames received by [feed] before the worker finished its handshake.
  final List<Uint8List> _pending = [];

  Isolate? _isolate;
  SendPort? _toIsolate;
  ReceivePort? _fromIsolate;

  /// Completes with the worker's inbox port, or with `null` if [dispose] ran
  /// before the worker answered.
  Completer<SendPort?>? _handshake;

  /// The in-flight (or finished) startup, shared by overlapping [start] calls.
  Future<void>? _startup;

  bool _ready = false;
  bool _disposed = false;

  @override
  Stream<Envelope> get envelopes => _out.stream;

  /// Spawns the worker isolate and flushes any frames queued by [feed].
  ///
  /// Safe to call repeatedly: overlapping and later calls share the same
  /// startup instead of spawning a second worker. If spawning fails, the error
  /// is rethrown and a later call may retry. Does nothing after [dispose].
  Future<void> start() async {
    if (_disposed) return;

    final inFlight = _startup;
    if (inFlight != null) {
      await inFlight;
      return;
    }

    final startup = _spawn();
    _startup = startup;
    try {
      await startup;
    } catch (_) {
      // Do not cache a failed startup, so the caller can try again.
      if (identical(_startup, startup)) _startup = null;
      rethrow;
    }
  }

  /// Performs the actual spawn + handshake + drain sequence for [start].
  Future<void> _spawn() async {
    final fromIsolate = ReceivePort();
    final handshake = Completer<SendPort?>();
    _fromIsolate = fromIsolate;
    _handshake = handshake;

    fromIsolate.listen((dynamic message) {
      if (message is SendPort) {
        // First message from the worker: the port we use to send it frames.
        if (!handshake.isCompleted) handshake.complete(message);
      } else if (message is List) {
        // A batch of already-decoded envelopes. Decoding happens in the
        // worker; the main isolate only forwards the objects to listeners.
        for (final item in message) {
          if (item is Envelope) _out.add(item);
        }
      }
    });

    try {
      final isolate = await Isolate.spawn(
        _isolateEntry,
        _IsolateInit(
          sendPort: fromIsolate.sendPort,
          batchIntervalMs: batchInterval.inMilliseconds,
        ),
      );
      if (_disposed) {
        // dispose() ran while the worker was spawning; it could not kill an
        // isolate it did not know about yet, so do it here.
        isolate.kill(priority: Isolate.immediate);
        return;
      }
      _isolate = isolate;
    } catch (_) {
      // Spawning failed: release the port so it does not leak.
      fromIsolate.close();
      if (identical(_fromIsolate, fromIsolate)) _fromIsolate = null;
      if (identical(_handshake, handshake)) _handshake = null;
      rethrow;
    }

    final toIsolate = await handshake.future;
    // A null port means dispose() interrupted the handshake.
    if (toIsolate == null || _disposed) return;

    _toIsolate = toIsolate;
    _ready = true;
    // Drain anything queued before the isolate was ready.
    for (final frame in _pending) {
      toIsolate.send(frame);
    }
    _pending.clear();
  }

  /// Forwards [frame] to the worker for decoding.
  ///
  /// Frames fed before the worker is ready are queued and sent by [start].
  /// Frames fed after [dispose] are dropped.
  @override
  void feed(Uint8List frame) {
    if (_disposed) return;

    final toIsolate = _toIsolate;
    if (!_ready || toIsolate == null) {
      _pending.add(frame);
      return;
    }
    // SendPort copies the bytes. At 1 kHz this is ~150 KB/s, negligible.
    toIsolate.send(frame);
  }

  /// Kills the worker, releases the ports and closes [envelopes].
  ///
  /// Also drops queued frames and unblocks a [start] call that is still
  /// waiting for the worker's handshake.
  @override
  Future<void> dispose() async {
    _disposed = true;
    _ready = false;
    _toIsolate = null;
    _pending.clear();

    _isolate?.kill(priority: Isolate.immediate);
    _isolate = null;
    _fromIsolate?.close();
    _fromIsolate = null;

    // With the port closed the handshake can never arrive; release any
    // pending start() instead of leaving it hanging forever.
    final handshake = _handshake;
    if (handshake != null && !handshake.isCompleted) handshake.complete(null);
    _handshake = null;

    await _out.close();
  }
}
|
|
|
|
/// Platform-neutral alias for [DecoderIsolate].
///
/// Lets consumers write `Decoder d = DecoderImpl(...)` without naming the
/// isolate-based implementation directly. Adds no behavior of its own; the
/// optional `batchInterval` argument is forwarded unchanged to
/// [DecoderIsolate].
class DecoderImpl extends DecoderIsolate {
  DecoderImpl({super.batchInterval});
}
|
|
|
|
/// Startup arguments handed to the worker isolate's entry point.
class _IsolateInit {
  /// Port on which the worker sends messages back to the spawning isolate.
  final SendPort sendPort;

  /// Interval, in milliseconds, between batch flushes in the worker.
  final int batchIntervalMs;

  _IsolateInit({
    required this.sendPort,
    required this.batchIntervalMs,
  });
}
|
|
|
|
/// Entry point of the decoding worker isolate.
///
/// Sends the [SendPort] of its own inbox back through `init.sendPort`, then
/// decodes every [Uint8List] it receives into an [Envelope]. Decoded
/// envelopes accumulate in a buffer that is flushed to the spawner as one
/// list every `init.batchIntervalMs` milliseconds; nothing is sent while the
/// buffer is empty.
void _isolateEntry(_IsolateInit init) {
  final replyPort = init.sendPort;
  final inbox = ReceivePort();
  replyPort.send(inbox.sendPort);

  final decoded = <Envelope>[];

  // Periodic flush: send a snapshot of the buffer, then start a fresh batch.
  void flush(Timer _) {
    if (decoded.isNotEmpty) {
      replyPort.send(decoded.toList());
      decoded.clear();
    }
  }

  Timer.periodic(Duration(milliseconds: init.batchIntervalMs), flush);

  inbox.listen((dynamic message) {
    // Anything that is not a raw frame is ignored.
    if (message is! Uint8List) return;
    try {
      decoded.add(Envelope.fromBuffer(message));
    } catch (_) {
      // Malformed packet — silently dropped. The session layer can
      // observe this via gaps/PPS mismatch if needed.
    }
  });
}