117 lines
3.6 KiB
Dart
117 lines
3.6 KiB
Dart
import '../proto/messages.pb.dart';
|
|
|
|
/// Fixed-capacity ring buffer of DataPackets.
|
|
///
|
|
/// O(1) `add`, O(log N) `findIndexAtOrBefore` via binary search.
|
|
/// Iteration over a time range is O(K + log N) where K is the number of
|
|
/// packets in the range.
|
|
///
|
|
/// This is the single source of truth for sample data. The CSV exporter,
|
|
/// the chart decimator, and the per-frame status snapshot all read from
|
|
/// this buffer; no parallel storage exists.
|
|
class PacketBuffer {
|
|
PacketBuffer({required this.capacity}) : _storage = List.filled(capacity, null);
|
|
|
|
int capacity;
|
|
List<DataPacket?> _storage;
|
|
int _head = 0; // Next write index.
|
|
int _length = 0;
|
|
|
|
int get length => _length;
|
|
bool get isEmpty => _length == 0;
|
|
bool get isNotEmpty => _length > 0;
|
|
|
|
/// Index in storage of the oldest packet, or -1 if empty.
|
|
int get _firstIndex => _length == 0
|
|
? -1
|
|
: (_head - _length + capacity) % capacity;
|
|
|
|
/// Add a packet. If the buffer is full, the oldest packet is evicted.
|
|
///
|
|
/// Packets are expected to arrive monotonically by timestamp. Out-of-order
|
|
/// packets are still stored in arrival order; the buffer doesn't sort.
|
|
void add(DataPacket p) {
|
|
_storage[_head] = p;
|
|
_head = (_head + 1) % capacity;
|
|
if (_length < capacity) _length++;
|
|
}
|
|
|
|
void clear() {
|
|
_head = 0;
|
|
_length = 0;
|
|
for (var i = 0; i < _storage.length; i++) {
|
|
_storage[i] = null;
|
|
}
|
|
}
|
|
|
|
/// Resize to a new capacity. Preserves the most recent packets if shrinking.
|
|
void resize(int newCapacity) {
|
|
if (newCapacity == capacity) return;
|
|
final preserve = newCapacity < _length ? newCapacity : _length;
|
|
final newStorage = List<DataPacket?>.filled(newCapacity, null);
|
|
final start = (_head - preserve + capacity) % capacity;
|
|
for (var i = 0; i < preserve; i++) {
|
|
newStorage[i] = _storage[(start + i) % capacity];
|
|
}
|
|
_storage = newStorage;
|
|
capacity = newCapacity;
|
|
_head = preserve % newCapacity;
|
|
_length = preserve;
|
|
}
|
|
|
|
DataPacket? get oldest => _length == 0 ? null : _storage[_firstIndex];
|
|
DataPacket? get newest =>
|
|
_length == 0 ? null : _storage[(_head - 1 + capacity) % capacity];
|
|
|
|
/// Get the i-th packet in chronological order (0 = oldest).
|
|
DataPacket operator [](int i) {
|
|
assert(i >= 0 && i < _length);
|
|
return _storage[(_firstIndex + i) % capacity]!;
|
|
}
|
|
|
|
/// Binary search: largest index whose timestamp is ≤ [timestampUs].
|
|
/// Returns -1 if no such index. Packets without a timestamp are skipped.
|
|
int findIndexAtOrBefore(int timestampUs) {
|
|
if (_length == 0) return -1;
|
|
var lo = 0, hi = _length - 1, result = -1;
|
|
while (lo <= hi) {
|
|
final mid = (lo + hi) >> 1;
|
|
final ts = this[mid].timestampUs.toInt();
|
|
if (ts <= timestampUs) {
|
|
result = mid;
|
|
lo = mid + 1;
|
|
} else {
|
|
hi = mid - 1;
|
|
}
|
|
}
|
|
return result;
|
|
}
|
|
|
|
/// Binary search: smallest index whose timestamp is ≥ [timestampUs].
|
|
int findIndexAtOrAfter(int timestampUs) {
|
|
if (_length == 0) return -1;
|
|
var lo = 0, hi = _length - 1, result = -1;
|
|
while (lo <= hi) {
|
|
final mid = (lo + hi) >> 1;
|
|
final ts = this[mid].timestampUs.toInt();
|
|
if (ts >= timestampUs) {
|
|
result = mid;
|
|
hi = mid - 1;
|
|
} else {
|
|
lo = mid + 1;
|
|
}
|
|
}
|
|
return result;
|
|
}
|
|
|
|
/// Iterate packets in `[startUs, endUs]` (inclusive).
|
|
Iterable<DataPacket> iterateRange(int startUs, int endUs) sync* {
|
|
final start = findIndexAtOrAfter(startUs);
|
|
if (start == -1) return;
|
|
for (var i = start; i < _length; i++) {
|
|
final p = this[i];
|
|
if (p.timestampUs.toInt() > endUs) break;
|
|
yield p;
|
|
}
|
|
}
|
|
} |