Fix disconnections every 20sec and fix Y zoom
This commit is contained in:
@@ -33,7 +33,20 @@ class ViewState extends ChangeNotifier {
|
||||
notifyListeners();
|
||||
}
|
||||
|
||||
void togglePause() => userPaused = !_userPaused;
|
||||
/// Toggle the user pause flag AND snap the X window accordingly: pausing
|
||||
/// freezes the view at `newestUs - width` (absolute anchor); resuming
|
||||
/// returns to followLive.
|
||||
void togglePause({required int newestUs}) {
|
||||
if (_userPaused) {
|
||||
_userPaused = false;
|
||||
_anchorMode = ViewAnchorMode.followLive;
|
||||
} else {
|
||||
_userPaused = true;
|
||||
_absoluteStartUs = newestUs - _windowWidth.inMicroseconds;
|
||||
_anchorMode = ViewAnchorMode.absolute;
|
||||
}
|
||||
notifyListeners();
|
||||
}
|
||||
|
||||
/// Compute the visible time range given current state.
|
||||
///
|
||||
|
||||
@@ -79,6 +79,11 @@ class ChartPainter extends CustomPainter {
|
||||
double yForVal(double v) =>
|
||||
padTop + plotH * (1 - (v - yMin) / ySpan);
|
||||
|
||||
// Everything that draws into the plot area must stay within it; a
|
||||
// Y-zoom can push line segments outside [padTop, padTop+plotH].
|
||||
canvas.save();
|
||||
canvas.clipRect(Rect.fromLTWH(padLeft, padTop, plotW, plotH));
|
||||
|
||||
// Hatched gap rectangles.
|
||||
final hatchPaint = Paint()
|
||||
..color = const Color(0x55993C1D)
|
||||
@@ -134,6 +139,8 @@ class ChartPainter extends CustomPainter {
|
||||
);
|
||||
}
|
||||
|
||||
canvas.restore();
|
||||
|
||||
// Axes labels.
|
||||
_drawText(canvas, '${yMax.toStringAsFixed(2)}',
|
||||
Offset(padLeft - 4, padTop), align: TextAlign.right);
|
||||
|
||||
@@ -66,30 +66,45 @@ class _ChartWidgetState extends State<ChartWidget> {
|
||||
);
|
||||
}
|
||||
|
||||
// Modifier conventions:
|
||||
// Ctrl/Meta + wheel → Y-zoom (per-chart).
|
||||
// Shift + wheel → X-zoom (shared).
|
||||
// Bare wheel → bubble to the outer GridView scroll.
|
||||
void _onPointerSignal(PointerSignalEvent ev, BoxConstraints c) {
|
||||
if (ev is! PointerScrollEvent) return;
|
||||
final scope = AppScope.of(context);
|
||||
final ctrl = HardwareKeyboard.instance.isControlPressed ||
|
||||
HardwareKeyboard.instance.isMetaPressed;
|
||||
final factor = ev.scrollDelta.dy > 0 ? 1.1 : 1 / 1.1;
|
||||
final shift = HardwareKeyboard.instance.isShiftPressed;
|
||||
if (!ctrl && !shift) return; // Let the grid scroll handle it.
|
||||
GestureBinding.instance.pointerSignalResolver.register(ev, (e) {
|
||||
if (e is! PointerScrollEvent) return;
|
||||
if (ctrl) {
|
||||
// Y-zoom for this chart only.
|
||||
_zoomY(e, c);
|
||||
} else {
|
||||
_zoomX(e, c);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void _zoomY(PointerScrollEvent ev, BoxConstraints c) {
|
||||
final scope = AppScope.of(context);
|
||||
final factor = ev.scrollDelta.dy > 0 ? 1.1 : 1 / 1.1;
|
||||
final cfg = scope.layout.configFor(widget.channel);
|
||||
final localY = ev.localPosition.dy;
|
||||
// Map local Y back to data value (top = yMax, bottom = yMin).
|
||||
final h = c.maxHeight;
|
||||
final v =
|
||||
cfg.yMax - (localY / h) * (cfg.yMax - cfg.yMin);
|
||||
final v = cfg.yMax - (localY / h) * (cfg.yMax - cfg.yMin);
|
||||
scope.layout.zoomY(
|
||||
channel: widget.channel,
|
||||
pivotValue: v,
|
||||
factor: factor,
|
||||
);
|
||||
} else {
|
||||
// Shared X-zoom.
|
||||
}
|
||||
|
||||
void _zoomX(PointerScrollEvent ev, BoxConstraints c) {
|
||||
final scope = AppScope.of(context);
|
||||
final factor = ev.scrollDelta.dy > 0 ? 1.1 : 1 / 1.1;
|
||||
final newest = scope.session.packets.newest;
|
||||
final oldestUs =
|
||||
scope.session.packets.oldest?.timestampUs.toInt() ?? 0;
|
||||
final oldestUs = scope.session.packets.oldest?.timestampUs.toInt() ?? 0;
|
||||
final newestUs = newest?.timestampUs.toInt() ??
|
||||
DateTime.now().microsecondsSinceEpoch;
|
||||
final win = scope.session.viewState.currentWindow(
|
||||
@@ -99,8 +114,8 @@ class _ChartWidgetState extends State<ChartWidget> {
|
||||
);
|
||||
final localX = ev.localPosition.dx;
|
||||
final w = c.maxWidth;
|
||||
final pivotUs = win.startUs +
|
||||
((localX / w) * (win.endUs - win.startUs)).round();
|
||||
final pivotUs =
|
||||
win.startUs + ((localX / w) * (win.endUs - win.startUs)).round();
|
||||
final maxWindow = Duration(
|
||||
microseconds: (newestUs - oldestUs).clamp(
|
||||
const Duration(milliseconds: 10).inMicroseconds,
|
||||
@@ -115,7 +130,6 @@ class _ChartWidgetState extends State<ChartWidget> {
|
||||
newestUs: newestUs,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
void _dragStartHandler(DragStartDetails d) {
|
||||
final scope = AppScope.of(context);
|
||||
|
||||
@@ -27,7 +27,12 @@ class Toolbar extends StatelessWidget {
|
||||
ListenableBuilder(
|
||||
listenable: scope.session.viewState,
|
||||
builder: (_, __) => OutlinedButton.icon(
|
||||
onPressed: scope.session.viewState.togglePause,
|
||||
onPressed: () {
|
||||
final newestUs = scope.session.packets.newest?.timestampUs
|
||||
.toInt() ??
|
||||
DateTime.now().microsecondsSinceEpoch;
|
||||
scope.session.viewState.togglePause(newestUs: newestUs);
|
||||
},
|
||||
icon: Icon(scope.session.viewState.userPaused
|
||||
? Icons.play_arrow
|
||||
: Icons.pause),
|
||||
|
||||
162
server/main.cpp
162
server/main.cpp
@@ -27,6 +27,7 @@
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
#include "messages.pb.h"
|
||||
|
||||
@@ -205,11 +206,12 @@ bool handshake(int fd) {
|
||||
return write_all(fd, resp.data(), resp.size());
|
||||
}
|
||||
|
||||
// Server-to-client binary frame (unmasked, FIN set).
|
||||
bool send_binary(int fd, std::span<const uint8_t> payload) {
|
||||
// Send one server-to-client frame (unmasked). `op_byte` is the full first
|
||||
// byte — e.g. 0x82 = FIN+binary, 0x8A = FIN+pong, 0x88 = FIN+close.
|
||||
bool send_frame(int fd, uint8_t op_byte, std::span<const uint8_t> payload) {
|
||||
std::array<uint8_t, 10> hdr{};
|
||||
size_t hdr_len;
|
||||
hdr[0] = 0x82; // FIN=1, opcode=2 (binary)
|
||||
hdr[0] = op_byte;
|
||||
if (payload.size() < 126) {
|
||||
hdr[1] = static_cast<uint8_t>(payload.size());
|
||||
hdr_len = 2;
|
||||
@@ -229,36 +231,138 @@ bool send_binary(int fd, std::span<const uint8_t> payload) {
|
||||
return write_all(fd, payload.data(), payload.size());
|
||||
}
|
||||
|
||||
bool send_binary(int fd, std::span<const uint8_t> payload) {
|
||||
return send_frame(fd, 0x82, payload);
|
||||
}
|
||||
|
||||
// Try to parse one inbound (client-masked) frame from `in`.
|
||||
// Returns bytes consumed, 0 if more data is needed, -1 on protocol error.
|
||||
ssize_t try_parse_frame(std::span<const uint8_t> in, uint8_t& op, bool& fin,
|
||||
std::vector<uint8_t>& payload) {
|
||||
constexpr uint64_t kMaxInbound = 1u << 20; // 1 MiB cap is ample for control frames
|
||||
if (in.size() < 2) return 0;
|
||||
const uint8_t b0 = in[0];
|
||||
const uint8_t b1 = in[1];
|
||||
fin = (b0 & 0x80) != 0;
|
||||
op = b0 & 0x0f;
|
||||
const bool masked = (b1 & 0x80) != 0;
|
||||
uint64_t len = b1 & 0x7fu;
|
||||
size_t pos = 2;
|
||||
if (len == 126) {
|
||||
if (in.size() < 4) return 0;
|
||||
len = (uint64_t(in[2]) << 8) | in[3];
|
||||
pos = 4;
|
||||
} else if (len == 127) {
|
||||
if (in.size() < 10) return 0;
|
||||
len = 0;
|
||||
for (int i = 0; i < 8; ++i) len = (len << 8) | in[2 + i];
|
||||
pos = 10;
|
||||
}
|
||||
if (!masked) return -1; // RFC 6455: client frames MUST be masked.
|
||||
if (len > kMaxInbound) return -1;
|
||||
if (in.size() < pos + 4) return 0;
|
||||
uint8_t mask[4];
|
||||
std::memcpy(mask, in.data() + pos, 4);
|
||||
pos += 4;
|
||||
if (in.size() < pos + len) return 0;
|
||||
payload.assign(len, 0);
|
||||
for (uint64_t i = 0; i < len; ++i)
|
||||
payload[i] = in[pos + i] ^ mask[i & 3];
|
||||
return static_cast<ssize_t>(pos + len);
|
||||
}
|
||||
|
||||
// Drain any bytes pending on `fd`, parse complete frames out of `rx`, and
|
||||
// respond to control frames. Returns false on peer close, protocol error,
|
||||
// or socket error — caller should exit the serve loop.
|
||||
bool pump_inbox(int fd, std::vector<uint8_t>& rx) {
|
||||
uint8_t tmp[4096];
|
||||
while (true) {
|
||||
ssize_t n = recv(fd, tmp, sizeof(tmp), MSG_DONTWAIT);
|
||||
if (n > 0) {
|
||||
rx.insert(rx.end(), tmp, tmp + n);
|
||||
continue;
|
||||
}
|
||||
if (n == 0) return false; // peer closed
|
||||
if (errno == EAGAIN || errno == EWOULDBLOCK) break;
|
||||
if (errno == EINTR) continue;
|
||||
return false;
|
||||
}
|
||||
while (true) {
|
||||
uint8_t op;
|
||||
bool fin;
|
||||
std::vector<uint8_t> payload;
|
||||
ssize_t consumed = try_parse_frame(
|
||||
{rx.data(), rx.size()}, op, fin, payload);
|
||||
if (consumed == 0) break; // need more data
|
||||
if (consumed < 0) return false; // protocol error
|
||||
rx.erase(rx.begin(), rx.begin() + consumed);
|
||||
switch (op) {
|
||||
case 0x9: // ping → pong with echoed payload
|
||||
if (!send_frame(fd, 0x8A, payload)) return false;
|
||||
break;
|
||||
case 0x8: // close → echo close and exit
|
||||
send_frame(fd, 0x88, payload);
|
||||
return false;
|
||||
default:
|
||||
// continuation/text/binary/pong — ignore for this test server
|
||||
break;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Synthetic signals.
|
||||
// Synthetic signals. Each channel has an independent on/off gate that
|
||||
// simulates sensor outages of varying length, so the client's missing-data
|
||||
// hatching renders at short, medium, and long gap scales.
|
||||
// ---------------------------------------------------------------------------
|
||||
struct ChannelGate {
|
||||
bool emitting = true;
|
||||
int remaining = 0; // ticks (ms) until state flip
|
||||
};
|
||||
|
||||
void advance_gate(ChannelGate& g, std::mt19937_64& rng) {
|
||||
if (--g.remaining > 0) return;
|
||||
g.emitting = !g.emitting;
|
||||
std::uniform_real_distribution<double> u(0.0, 1.0);
|
||||
if (g.emitting) {
|
||||
g.remaining = std::uniform_int_distribution<int>(3000, 15000)(rng);
|
||||
} else {
|
||||
const double r = u(rng);
|
||||
if (r < 0.60) g.remaining = std::uniform_int_distribution<int>(30, 200)(rng); // short
|
||||
else if (r < 0.90) g.remaining = std::uniform_int_distribution<int>(300, 1500)(rng); // medium
|
||||
else g.remaining = std::uniform_int_distribution<int>(2000, 8000)(rng); // long
|
||||
}
|
||||
}
|
||||
|
||||
void fill_data(pb::DataPacket& p, int64_t now_us, uint64_t tick,
|
||||
std::mt19937_64& rng) {
|
||||
std::mt19937_64& rng,
|
||||
const std::array<ChannelGate, 16>& gates) {
|
||||
std::normal_distribution<double> noise(0.0, 0.05);
|
||||
const double t = now_us * 1e-6;
|
||||
p.set_timestamp_us(now_us);
|
||||
p.set_ch1(std::sin(2 * M_PI * 1.0 * t) + noise(rng));
|
||||
p.set_ch2(std::sin(2 * M_PI * 0.3 * t) * 2.0 + noise(rng));
|
||||
p.set_ch3(std::cos(2 * M_PI * 0.7 * t) + noise(rng));
|
||||
p.set_ch4(std::fmod(t * 0.5, 1.0) * 2.0 - 1.0); // saw
|
||||
p.set_ch5(std::fabs(std::fmod(t * 0.3, 2.0) - 1.0) * 2.0 - 1.0); // tri
|
||||
p.set_ch6(std::sin(2 * M_PI * 0.5 * t) > 0 ? 1.0 : -1.0); // square
|
||||
p.set_ch7(std::fmod(t * 0.1, 1.0)); // slow ramp
|
||||
p.set_ch8(noise(rng) * 4.0); // noise
|
||||
if (gates[0].emitting) p.set_ch1(std::sin(2 * M_PI * 1.0 * t) + noise(rng));
|
||||
if (gates[1].emitting) p.set_ch2(std::sin(2 * M_PI * 0.3 * t) * 2.0 + noise(rng));
|
||||
if (gates[2].emitting) p.set_ch3(std::cos(2 * M_PI * 0.7 * t) + noise(rng));
|
||||
if (gates[3].emitting) p.set_ch4(std::fmod(t * 0.5, 1.0) * 2.0 - 1.0); // saw
|
||||
if (gates[4].emitting) p.set_ch5(std::fabs(std::fmod(t * 0.3, 2.0) - 1.0) * 2.0 - 1.0); // tri
|
||||
if (gates[5].emitting) p.set_ch6(std::sin(2 * M_PI * 0.5 * t) > 0 ? 1.0 : -1.0); // square
|
||||
if (gates[6].emitting) p.set_ch7(std::fmod(t * 0.1, 1.0)); // slow ramp
|
||||
if (gates[7].emitting) p.set_ch8(noise(rng) * 4.0); // noise
|
||||
|
||||
auto sine = [&](int i) {
|
||||
const double freq = 0.1 * i;
|
||||
const double phase = i * 0.2;
|
||||
return std::sin(2 * M_PI * freq * t + phase) + noise(rng) * 0.3;
|
||||
};
|
||||
p.set_ch9(sine(9));
|
||||
p.set_ch10(sine(10));
|
||||
p.set_ch11(sine(11));
|
||||
p.set_ch12(sine(12));
|
||||
p.set_ch13(sine(13));
|
||||
p.set_ch14(sine(14));
|
||||
p.set_ch15(sine(15));
|
||||
p.set_ch16(sine(16));
|
||||
if (gates[8].emitting) p.set_ch9(sine(9));
|
||||
if (gates[9].emitting) p.set_ch10(sine(10));
|
||||
if (gates[10].emitting) p.set_ch11(sine(11));
|
||||
if (gates[11].emitting) p.set_ch12(sine(12));
|
||||
if (gates[12].emitting) p.set_ch13(sine(13));
|
||||
if (gates[13].emitting) p.set_ch14(sine(14));
|
||||
if (gates[14].emitting) p.set_ch15(sine(15));
|
||||
if (gates[15].emitting) p.set_ch16(sine(16));
|
||||
|
||||
// Statuses rotate once every 2 s, each offset by one step.
|
||||
if (tick % 2000 == 0) {
|
||||
@@ -314,17 +418,31 @@ void serve(int fd) {
|
||||
uint32_t log_seq = 0;
|
||||
std::string buf;
|
||||
|
||||
// Stagger each channel's initial "on" window so outages don't line up.
|
||||
std::array<ChannelGate, 16> gates;
|
||||
for (auto& g : gates) {
|
||||
g.emitting = true;
|
||||
g.remaining = std::uniform_int_distribution<int>(500, 12000)(rng);
|
||||
}
|
||||
|
||||
// Inbound byte buffer for parsing client control frames (pings, close).
|
||||
std::vector<uint8_t> rx;
|
||||
|
||||
while (true) {
|
||||
std::this_thread::sleep_until(next);
|
||||
next += 1ms;
|
||||
|
||||
if (!pump_inbox(fd, rx)) break;
|
||||
|
||||
const int64_t now_us =
|
||||
std::chrono::duration_cast<std::chrono::microseconds>(
|
||||
std::chrono::steady_clock::now() - start)
|
||||
.count();
|
||||
|
||||
for (auto& g : gates) advance_gate(g, rng);
|
||||
|
||||
pb::Envelope env;
|
||||
fill_data(*env.mutable_data(), now_us, tick, rng);
|
||||
fill_data(*env.mutable_data(), now_us, tick, rng, gates);
|
||||
buf.clear();
|
||||
env.SerializeToString(&buf);
|
||||
if (!send_binary(fd, {reinterpret_cast<const uint8_t*>(buf.data()),
|
||||
|
||||
Reference in New Issue
Block a user