// Fake telemetry server for TelemetryMonitor. Accepts one WebSocket client at
// a time on ws://127.0.0.1:9000 (or the port given on the command line) and
// streams synthetic DataPackets at 1 kHz plus a LogPacket every ~3 s. Wire
// format matches proto/messages.proto.
//
// Build: cmake -S . -B build && cmake --build build -j
// Run:   ./build/fake_telemetry [port]
|
|
|
|
#include <arpa/inet.h>
|
|
#include <netinet/in.h>
|
|
#include <signal.h>
|
|
#include <sys/socket.h>
|
|
#include <unistd.h>
|
|
|
|
#include <algorithm>
|
|
#include <array>
|
|
#include <bit>
|
|
#include <cctype>
|
|
#include <cerrno>
|
|
#include <charconv>
|
|
#include <chrono>
|
|
#include <cmath>
|
|
#include <cstdint>
|
|
#include <cstdio>
|
|
#include <cstring>
|
|
#include <random>
|
|
#include <span>
|
|
#include <string>
|
|
#include <string_view>
|
|
#include <thread>
|
|
#include <vector>
|
|
|
|
#include "messages.pb.h"
|
|
|
|
using namespace std::chrono_literals;
|
|
namespace pb = telemetry;
|
|
|
|
namespace {
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// SHA-1 (FIPS 180-4) — used once per connection for the WebSocket handshake.
|
|
// ---------------------------------------------------------------------------
|
|
struct Sha1 {
|
|
uint32_t h[5]{0x67452301u, 0xEFCDAB89u, 0x98BADCFEu, 0x10325476u, 0xC3D2E1F0u};
|
|
uint64_t bit_len = 0;
|
|
std::array<uint8_t, 64> block{};
|
|
size_t block_len = 0;
|
|
|
|
void update(std::span<const uint8_t> data) {
|
|
for (uint8_t b : data) {
|
|
block[block_len++] = b;
|
|
if (block_len == 64) {
|
|
transform();
|
|
block_len = 0;
|
|
}
|
|
}
|
|
bit_len += data.size() * 8;
|
|
}
|
|
|
|
std::array<uint8_t, 20> finalize() {
|
|
block[block_len++] = 0x80;
|
|
if (block_len > 56) {
|
|
while (block_len < 64) block[block_len++] = 0;
|
|
transform();
|
|
block_len = 0;
|
|
}
|
|
while (block_len < 56) block[block_len++] = 0;
|
|
for (int i = 7; i >= 0; --i)
|
|
block[block_len++] = static_cast<uint8_t>((bit_len >> (8 * i)) & 0xff);
|
|
transform();
|
|
std::array<uint8_t, 20> out{};
|
|
for (int i = 0; i < 5; ++i) {
|
|
out[i * 4 + 0] = static_cast<uint8_t>(h[i] >> 24);
|
|
out[i * 4 + 1] = static_cast<uint8_t>(h[i] >> 16);
|
|
out[i * 4 + 2] = static_cast<uint8_t>(h[i] >> 8);
|
|
out[i * 4 + 3] = static_cast<uint8_t>(h[i]);
|
|
}
|
|
return out;
|
|
}
|
|
|
|
void transform() {
|
|
uint32_t w[80];
|
|
for (int i = 0; i < 16; ++i) {
|
|
w[i] = (uint32_t(block[i * 4]) << 24) |
|
|
(uint32_t(block[i * 4 + 1]) << 16) |
|
|
(uint32_t(block[i * 4 + 2]) << 8) |
|
|
uint32_t(block[i * 4 + 3]);
|
|
}
|
|
for (int i = 16; i < 80; ++i)
|
|
w[i] = std::rotl(w[i - 3] ^ w[i - 8] ^ w[i - 14] ^ w[i - 16], 1);
|
|
|
|
uint32_t a = h[0], b = h[1], c = h[2], d = h[3], e = h[4];
|
|
for (int i = 0; i < 80; ++i) {
|
|
uint32_t f, k;
|
|
if (i < 20) { f = (b & c) | (~b & d); k = 0x5A827999u; }
|
|
else if (i < 40) { f = b ^ c ^ d; k = 0x6ED9EBA1u; }
|
|
else if (i < 60) { f = (b & c) | (b & d) | (c & d); k = 0x8F1BBCDCu; }
|
|
else { f = b ^ c ^ d; k = 0xCA62C1D6u; }
|
|
uint32_t t = std::rotl(a, 5) + f + e + k + w[i];
|
|
e = d;
|
|
d = c;
|
|
c = std::rotl(b, 30);
|
|
b = a;
|
|
a = t;
|
|
}
|
|
h[0] += a; h[1] += b; h[2] += c; h[3] += d; h[4] += e;
|
|
}
|
|
};
|
|
|
|
// Encode `bytes` as standard (RFC 4648) base64 with '=' padding.
std::string base64(std::span<const uint8_t> bytes) {
  static constexpr char kAlphabet[] =
      "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

  std::string encoded;
  encoded.reserve(((bytes.size() + 2) / 3) * 4);

  // Walk the input in groups of up to three bytes; a short final group is
  // zero-extended and its unused output positions become '='.
  for (size_t off = 0; off < bytes.size(); off += 3) {
    const size_t have = bytes.size() - off;
    uint32_t group = uint32_t(bytes[off]) << 16;
    if (have > 1) group |= uint32_t(bytes[off + 1]) << 8;
    if (have > 2) group |= uint32_t(bytes[off + 2]);

    encoded.push_back(kAlphabet[(group >> 18) & 0x3f]);
    encoded.push_back(kAlphabet[(group >> 12) & 0x3f]);
    encoded.push_back(have > 1 ? kAlphabet[(group >> 6) & 0x3f] : '=');
    encoded.push_back(have > 2 ? kAlphabet[group & 0x3f] : '=');
  }
  return encoded;
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// WebSocket handshake + framing.
|
|
// ---------------------------------------------------------------------------
|
|
constexpr std::string_view kWsGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
|
|
|
|
std::string accept_key(std::string_view client_key) {
|
|
Sha1 s;
|
|
s.update({reinterpret_cast<const uint8_t*>(client_key.data()), client_key.size()});
|
|
s.update({reinterpret_cast<const uint8_t*>(kWsGuid.data()), kWsGuid.size()});
|
|
auto digest = s.finalize();
|
|
return base64({digest.data(), digest.size()});
|
|
}
|
|
|
|
// Look up header `name` (case-insensitive) in a raw HTTP request.
//
// Scans CRLF-terminated lines in order and returns the value of the first
// line whose text before ':' matches `name`, with surrounding spaces and tabs
// stripped. Returns an empty view when no such line exists. The result
// aliases `request`, so it is only valid while that buffer is.
std::string_view find_header(std::string_view request, std::string_view name) {
  const auto same_ignoring_case = [](char lhs, char rhs) {
    return std::tolower(static_cast<unsigned char>(lhs)) ==
           std::tolower(static_cast<unsigned char>(rhs));
  };
  const auto is_blank = [](char c) { return c == ' ' || c == '\t'; };

  for (size_t start = 0; start < request.size();) {
    const size_t eol = request.find("\r\n", start);
    if (eol == std::string_view::npos) break;  // trailing unterminated line is ignored
    const std::string_view line = request.substr(start, eol - start);
    start = eol + 2;

    const size_t colon = line.find(':');
    if (colon == std::string_view::npos) continue;

    const std::string_view field = line.substr(0, colon);
    if (field.size() != name.size()) continue;
    if (!std::equal(field.begin(), field.end(), name.begin(), same_ignoring_case))
      continue;

    std::string_view value = line.substr(colon + 1);
    while (!value.empty() && is_blank(value.front())) value.remove_prefix(1);
    while (!value.empty() && is_blank(value.back())) value.remove_suffix(1);
    return value;
  }
  return {};
}
|
|
|
|
// Send exactly `len` bytes from `data` on socket `fd`, looping over short
// writes. MSG_NOSIGNAL keeps a closed peer from raising SIGPIPE.
//
// Returns true once every byte has been handed to the kernel, false on any
// send error other than EINTR. An interrupted send is retried, matching how
// the recv() and accept() call sites in this file treat EINTR.
bool write_all(int fd, const void* data, size_t len) {
  const auto* cursor = static_cast<const char*>(data);
  while (len > 0) {
    const ssize_t sent = send(fd, cursor, len, MSG_NOSIGNAL);
    if (sent < 0) {
      if (errno == EINTR) continue;  // interrupted before sending anything; retry
      return false;
    }
    if (sent == 0) return false;  // no progress; bail out rather than spin
    cursor += sent;
    len -= static_cast<size_t>(sent);
  }
  return true;
}
|
|
|
|
bool handshake(int fd) {
|
|
std::string req;
|
|
char buf[4096];
|
|
while (req.find("\r\n\r\n") == std::string::npos) {
|
|
ssize_t n = recv(fd, buf, sizeof(buf), 0);
|
|
if (n <= 0) return false;
|
|
req.append(buf, static_cast<size_t>(n));
|
|
if (req.size() > 64 * 1024) return false;
|
|
}
|
|
auto key = find_header(req, "Sec-WebSocket-Key");
|
|
if (key.empty()) return false;
|
|
std::string resp = "HTTP/1.1 101 Switching Protocols\r\n"
|
|
"Upgrade: websocket\r\n"
|
|
"Connection: Upgrade\r\n"
|
|
"Sec-WebSocket-Accept: " + accept_key(key) + "\r\n\r\n";
|
|
return write_all(fd, resp.data(), resp.size());
|
|
}
|
|
|
|
// Send one server-to-client frame (unmasked). `op_byte` is the full first
|
|
// byte — e.g. 0x82 = FIN+binary, 0x8A = FIN+pong, 0x88 = FIN+close.
|
|
bool send_frame(int fd, uint8_t op_byte, std::span<const uint8_t> payload) {
|
|
std::array<uint8_t, 10> hdr{};
|
|
size_t hdr_len;
|
|
hdr[0] = op_byte;
|
|
if (payload.size() < 126) {
|
|
hdr[1] = static_cast<uint8_t>(payload.size());
|
|
hdr_len = 2;
|
|
} else if (payload.size() < 65536) {
|
|
hdr[1] = 126;
|
|
uint16_t be = htons(static_cast<uint16_t>(payload.size()));
|
|
std::memcpy(hdr.data() + 2, &be, 2);
|
|
hdr_len = 4;
|
|
} else {
|
|
hdr[1] = 127;
|
|
uint64_t v = payload.size();
|
|
for (int i = 0; i < 8; ++i)
|
|
hdr[2 + i] = static_cast<uint8_t>((v >> (56 - 8 * i)) & 0xff);
|
|
hdr_len = 10;
|
|
}
|
|
if (!write_all(fd, hdr.data(), hdr_len)) return false;
|
|
return write_all(fd, payload.data(), payload.size());
|
|
}
|
|
|
|
bool send_binary(int fd, std::span<const uint8_t> payload) {
|
|
return send_frame(fd, 0x82, payload);
|
|
}
|
|
|
|
// Try to parse one inbound (client-masked) frame from `in`.
|
|
// Returns bytes consumed, 0 if more data is needed, -1 on protocol error.
|
|
ssize_t try_parse_frame(std::span<const uint8_t> in, uint8_t& op, bool& fin,
|
|
std::vector<uint8_t>& payload) {
|
|
constexpr uint64_t kMaxInbound = 1u << 20; // 1 MiB cap is ample for control frames
|
|
if (in.size() < 2) return 0;
|
|
const uint8_t b0 = in[0];
|
|
const uint8_t b1 = in[1];
|
|
fin = (b0 & 0x80) != 0;
|
|
op = b0 & 0x0f;
|
|
const bool masked = (b1 & 0x80) != 0;
|
|
uint64_t len = b1 & 0x7fu;
|
|
size_t pos = 2;
|
|
if (len == 126) {
|
|
if (in.size() < 4) return 0;
|
|
len = (uint64_t(in[2]) << 8) | in[3];
|
|
pos = 4;
|
|
} else if (len == 127) {
|
|
if (in.size() < 10) return 0;
|
|
len = 0;
|
|
for (int i = 0; i < 8; ++i) len = (len << 8) | in[2 + i];
|
|
pos = 10;
|
|
}
|
|
if (!masked) return -1; // RFC 6455: client frames MUST be masked.
|
|
if (len > kMaxInbound) return -1;
|
|
if (in.size() < pos + 4) return 0;
|
|
uint8_t mask[4];
|
|
std::memcpy(mask, in.data() + pos, 4);
|
|
pos += 4;
|
|
if (in.size() < pos + len) return 0;
|
|
payload.assign(len, 0);
|
|
for (uint64_t i = 0; i < len; ++i)
|
|
payload[i] = in[pos + i] ^ mask[i & 3];
|
|
return static_cast<ssize_t>(pos + len);
|
|
}
|
|
|
|
// Drain any bytes pending on `fd`, parse complete frames out of `rx`, and
|
|
// respond to control frames. Returns false on peer close, protocol error,
|
|
// or socket error — caller should exit the serve loop.
|
|
bool pump_inbox(int fd, std::vector<uint8_t>& rx) {
|
|
uint8_t tmp[4096];
|
|
while (true) {
|
|
ssize_t n = recv(fd, tmp, sizeof(tmp), MSG_DONTWAIT);
|
|
if (n > 0) {
|
|
rx.insert(rx.end(), tmp, tmp + n);
|
|
continue;
|
|
}
|
|
if (n == 0) return false; // peer closed
|
|
if (errno == EAGAIN || errno == EWOULDBLOCK) break;
|
|
if (errno == EINTR) continue;
|
|
return false;
|
|
}
|
|
while (true) {
|
|
uint8_t op;
|
|
bool fin;
|
|
std::vector<uint8_t> payload;
|
|
ssize_t consumed = try_parse_frame(
|
|
{rx.data(), rx.size()}, op, fin, payload);
|
|
if (consumed == 0) break; // need more data
|
|
if (consumed < 0) return false; // protocol error
|
|
rx.erase(rx.begin(), rx.begin() + consumed);
|
|
switch (op) {
|
|
case 0x9: // ping → pong with echoed payload
|
|
if (!send_frame(fd, 0x8A, payload)) return false;
|
|
break;
|
|
case 0x8: // close → echo close and exit
|
|
send_frame(fd, 0x88, payload);
|
|
return false;
|
|
default:
|
|
// continuation/text/binary/pong — ignore for this test server
|
|
break;
|
|
}
|
|
}
|
|
return true;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Synthetic signals. Each channel has an independent on/off gate that
|
|
// simulates sensor outages of varying length, so the client's missing-data
|
|
// hatching renders at short, medium, and long gap scales.
|
|
// ---------------------------------------------------------------------------
|
|
// Per-channel outage simulator state: whether the channel currently emits
// samples, and how long until that flips.
struct ChannelGate {
  bool emitting{true};  // true while samples are being produced
  int remaining{0};     // ticks (ms) until state flip
};
|
|
|
|
// Advance gate `g` by one tick. When its countdown runs out the gate toggles
// and draws a new duration from `rng`:
//   * turning on:  3000–15000 ticks;
//   * turning off: 30–200 ticks (60 %), 300–1500 (30 %) or 2000–8000 (10 %).
void advance_gate(ChannelGate& g, std::mt19937_64& rng) {
  g.remaining -= 1;
  if (g.remaining > 0) return;

  const auto ticks_between = [&rng](int lo, int hi) {
    return std::uniform_int_distribution<int>(lo, hi)(rng);
  };

  g.emitting = !g.emitting;
  if (g.emitting) {
    g.remaining = ticks_between(3000, 15000);
    return;
  }

  // Outage: pick the gap class first, then a length within that class.
  const double roll = std::uniform_real_distribution<double>(0.0, 1.0)(rng);
  if (roll < 0.60) {
    g.remaining = ticks_between(30, 200);  // short
  } else if (roll < 0.90) {
    g.remaining = ticks_between(300, 1500);  // medium
  } else {
    g.remaining = ticks_between(2000, 8000);  // long
  }
}
|
|
|
|
// Populate `p` with one synthetic sample taken at `now_us`.
//
// A channel is written only while its gate in `gates` is emitting, so a
// closed gate leaves that field unset. Channels 1–8 carry distinct waveforms
// (sines, cosine, saw, triangle, square, slow ramp, pure noise); channels
// 9–16 are noisy sines whose frequency and phase grow with the channel
// number. The eight status fields are written only on ticks that are
// multiples of 500, each stepping through 0..3 at a different offset.
void fill_data(pb::DataPacket& p, int64_t now_us, uint64_t tick,
               std::mt19937_64& rng,
               const std::array<ChannelGate, 16>& gates) {
  constexpr double kTwoPi = 2 * M_PI;
  std::normal_distribution<double> jitter(0.0, 0.05);
  const double secs = now_us * 1e-6;
  const auto live = [&gates](size_t ch) { return gates[ch].emitting; };

  p.set_timestamp_us(now_us);

  if (live(0)) p.set_ch1(std::sin(kTwoPi * 1.0 * secs) + jitter(rng));
  if (live(1)) p.set_ch2(std::sin(kTwoPi * 0.3 * secs) * 2.0 + jitter(rng));
  if (live(2)) p.set_ch3(std::cos(kTwoPi * 0.7 * secs) + jitter(rng));
  if (live(3)) p.set_ch4(std::fmod(secs * 0.5, 1.0) * 2.0 - 1.0);  // saw
  if (live(4)) p.set_ch5(std::fabs(std::fmod(secs * 0.3, 2.0) - 1.0) * 2.0 - 1.0);  // tri
  if (live(5)) p.set_ch6(std::sin(kTwoPi * 0.5 * secs) > 0 ? 1.0 : -1.0);  // square
  if (live(6)) p.set_ch7(std::fmod(secs * 0.1, 1.0));  // slow ramp
  if (live(7)) p.set_ch8(jitter(rng) * 4.0);  // noise

  // Channels 9–16: sine at 0.1*index Hz with phase 0.2*index plus mild noise.
  const auto harmonic = [&](int index) {
    const double hz = 0.1 * index;
    const double offset = index * 0.2;
    return std::sin(kTwoPi * hz * secs + offset) + jitter(rng) * 0.3;
  };
  if (live(8)) p.set_ch9(harmonic(9));
  if (live(9)) p.set_ch10(harmonic(10));
  if (live(10)) p.set_ch11(harmonic(11));
  if (live(11)) p.set_ch12(harmonic(12));
  if (live(12)) p.set_ch13(harmonic(13));
  if (live(13)) p.set_ch14(harmonic(14));
  if (live(14)) p.set_ch15(harmonic(15));
  if (live(15)) p.set_ch16(harmonic(16));

  // Statuses rotate every 500 ms, each offset by one step.
  if (tick % 500 != 0) return;
  const uint32_t step = static_cast<uint32_t>(tick / 500);
  p.set_status1((step + 0) % 4);
  p.set_status2((step + 1) % 4);
  p.set_status3((step + 2) % 4);
  p.set_status4((step + 3) % 4);
  p.set_status5((step + 0) % 4);
  p.set_status6((step + 1) % 4);
  p.set_status7((step + 2) % 4);
  p.set_status8((step + 3) % 4);
}
|
|
|
|
// Severity for the n-th log packet. The pattern repeats every eight entries:
// even positions are INFO, odd positions step through DEBUG, WARN, ERROR,
// FATAL.
pb::Severity cycle_severity(uint32_t n) {
  switch (n % 8) {
    case 1:
      return pb::DEBUG;
    case 3:
      return pb::WARN;
    case 5:
      return pb::ERROR;
    case 7:
      return pb::FATAL;
    default:
      return pb::INFO;
  }
}
|
|
|
|
// Description text for the n-th log packet; cycles through eight canned
// messages.
std::string_view cycle_description(uint32_t n) {
  static constexpr std::string_view kMessages[] = {
      "startup complete",
      "checkpoint reached",
      "threshold exceeded on channel 3",
      "sensor recalibrated",
      "queue depth high",
      "peer handshake ok",
      "cache evicted 128 entries",
      "watchdog tickled",
  };
  constexpr uint32_t kCount = sizeof(kMessages) / sizeof(kMessages[0]);
  return kMessages[n % kCount];
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Per-client service loop.
|
|
// ---------------------------------------------------------------------------
|
|
void serve(int fd) {
|
|
if (!handshake(fd)) {
|
|
std::fprintf(stderr, "handshake failed\n");
|
|
close(fd);
|
|
return;
|
|
}
|
|
std::fprintf(stderr, "handshake ok, streaming\n");
|
|
|
|
std::mt19937_64 rng{std::random_device{}()};
|
|
const auto start = std::chrono::steady_clock::now();
|
|
auto next = start;
|
|
uint64_t tick = 0;
|
|
uint32_t log_seq = 0;
|
|
std::string buf;
|
|
|
|
// Stagger each channel's initial "on" window so outages don't line up.
|
|
std::array<ChannelGate, 16> gates;
|
|
for (auto& g : gates) {
|
|
g.emitting = true;
|
|
g.remaining = std::uniform_int_distribution<int>(500, 12000)(rng);
|
|
}
|
|
|
|
// Inbound byte buffer for parsing client control frames (pings, close).
|
|
std::vector<uint8_t> rx;
|
|
|
|
while (true) {
|
|
std::this_thread::sleep_until(next);
|
|
next += 1ms;
|
|
|
|
if (!pump_inbox(fd, rx)) break;
|
|
|
|
const int64_t now_us =
|
|
std::chrono::duration_cast<std::chrono::microseconds>(
|
|
std::chrono::steady_clock::now() - start)
|
|
.count();
|
|
|
|
for (auto& g : gates) advance_gate(g, rng);
|
|
|
|
pb::Envelope env;
|
|
fill_data(*env.mutable_data(), now_us, tick, rng, gates);
|
|
buf.clear();
|
|
env.SerializeToString(&buf);
|
|
if (!send_binary(fd, {reinterpret_cast<const uint8_t*>(buf.data()),
|
|
buf.size()})) {
|
|
break;
|
|
}
|
|
|
|
if (tick % 3000 == 1500) {
|
|
pb::Envelope le;
|
|
auto* log = le.mutable_log();
|
|
log->set_timestamp_us(now_us);
|
|
log->set_severity(cycle_severity(log_seq));
|
|
log->set_error_number(1000 + log_seq);
|
|
log->set_description(std::string(cycle_description(log_seq)));
|
|
std::string lbuf;
|
|
le.SerializeToString(&lbuf);
|
|
if (!send_binary(fd, {reinterpret_cast<const uint8_t*>(lbuf.data()),
|
|
lbuf.size()})) {
|
|
break;
|
|
}
|
|
++log_seq;
|
|
}
|
|
++tick;
|
|
}
|
|
std::fprintf(stderr, "client gone after %lu ticks\n",
|
|
static_cast<unsigned long>(tick));
|
|
close(fd);
|
|
}
|
|
|
|
// Read the listening port from argv[1]. Falls back to 9000 when no argument
// is given or when it is not, in its entirety, a decimal number that fits in
// 16 bits.
uint16_t parse_port(int argc, char** argv) {
  constexpr uint16_t kDefaultPort = 9000;
  if (argc < 2) return kDefaultPort;

  const char* const first = argv[1];
  const char* const last = first + std::strlen(first);
  uint16_t parsed = 0;
  const auto result = std::from_chars(first, last, parsed);

  // Reject trailing junk such as "80x" as well as outright parse failures.
  const bool ok = result.ec == std::errc{} && result.ptr == last;
  return ok ? parsed : kDefaultPort;
}
|
|
|
|
} // namespace
|
|
|
|
int main(int argc, char** argv) {
|
|
signal(SIGPIPE, SIG_IGN);
|
|
GOOGLE_PROTOBUF_VERIFY_VERSION;
|
|
|
|
const uint16_t port = parse_port(argc, argv);
|
|
int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
|
|
if (listen_fd < 0) { std::perror("socket"); return 1; }
|
|
int one = 1;
|
|
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
|
|
|
|
sockaddr_in addr{};
|
|
addr.sin_family = AF_INET;
|
|
addr.sin_port = htons(port);
|
|
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
|
|
if (bind(listen_fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0) {
|
|
std::perror("bind");
|
|
return 1;
|
|
}
|
|
if (listen(listen_fd, 1) < 0) {
|
|
std::perror("listen");
|
|
return 1;
|
|
}
|
|
std::fprintf(stderr, "fake_telemetry listening on ws://127.0.0.1:%u\n", port);
|
|
|
|
while (true) {
|
|
int fd = accept(listen_fd, nullptr, nullptr);
|
|
if (fd < 0) {
|
|
if (errno == EINTR) continue;
|
|
std::perror("accept");
|
|
break;
|
|
}
|
|
std::fprintf(stderr, "client connected\n");
|
|
serve(fd);
|
|
}
|
|
close(listen_fd);
|
|
google::protobuf::ShutdownProtobufLibrary();
|
|
return 0;
|
|
}
|