// Fake telemetry server for TelemetryMonitor. Accepts one WebSocket client at // a time on ws://127.0.0.1:9000 and streams synthetic DataPackets at 1 kHz // plus a LogPacket every ~3 s. Wire format matches proto/messages.proto. // // Build: cmake -S . -B build && cmake --build build -j // Run: ./build/fake_telemetry [port] #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "messages.pb.h" using namespace std::chrono_literals; namespace pb = telemetry; namespace { // --------------------------------------------------------------------------- // SHA-1 (FIPS 180-4) — used once per connection for the WebSocket handshake. // --------------------------------------------------------------------------- struct Sha1 { uint32_t h[5]{0x67452301u, 0xEFCDAB89u, 0x98BADCFEu, 0x10325476u, 0xC3D2E1F0u}; uint64_t bit_len = 0; std::array block{}; size_t block_len = 0; void update(std::span data) { for (uint8_t b : data) { block[block_len++] = b; if (block_len == 64) { transform(); block_len = 0; } } bit_len += data.size() * 8; } std::array finalize() { block[block_len++] = 0x80; if (block_len > 56) { while (block_len < 64) block[block_len++] = 0; transform(); block_len = 0; } while (block_len < 56) block[block_len++] = 0; for (int i = 7; i >= 0; --i) block[block_len++] = static_cast((bit_len >> (8 * i)) & 0xff); transform(); std::array out{}; for (int i = 0; i < 5; ++i) { out[i * 4 + 0] = static_cast(h[i] >> 24); out[i * 4 + 1] = static_cast(h[i] >> 16); out[i * 4 + 2] = static_cast(h[i] >> 8); out[i * 4 + 3] = static_cast(h[i]); } return out; } void transform() { uint32_t w[80]; for (int i = 0; i < 16; ++i) { w[i] = (uint32_t(block[i * 4]) << 24) | (uint32_t(block[i * 4 + 1]) << 16) | (uint32_t(block[i * 4 + 2]) << 8) | uint32_t(block[i * 4 + 3]); } for (int i = 16; i < 80; ++i) w[i] = std::rotl(w[i - 3] ^ w[i - 8] ^ w[i - 14] ^ w[i - 16], 1); uint32_t a = h[0], b = h[1], c = h[2], d = h[3], e = h[4]; for (int i = 0; i < 80; ++i) { uint32_t f, k; if (i < 20) { f = (b & c) | (~b & d); k = 0x5A827999u; } else if (i < 40) { f = b ^ c ^ d; k = 0x6ED9EBA1u; } else if (i < 60) { f = (b & c) | (b & d) | (c & d); k = 0x8F1BBCDCu; } else { f = b ^ c ^ d; k = 0xCA62C1D6u; } uint32_t t = std::rotl(a, 5) + f + e + k + w[i]; e = d; d = c; c = std::rotl(b, 30); b = a; a = t; } h[0] += a; h[1] += b; h[2] += c; h[3] += d; h[4] += e; } }; std::string base64(std::span bytes) { static constexpr char kTbl[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; std::string out; out.reserve(((bytes.size() + 2) / 3) * 4); size_t i = 0; for (; i + 3 <= bytes.size(); i += 3) { uint32_t n = (uint32_t(bytes[i]) << 16) | (uint32_t(bytes[i + 1]) << 8) | uint32_t(bytes[i + 2]); out.push_back(kTbl[(n >> 18) & 0x3f]); out.push_back(kTbl[(n >> 12) & 0x3f]); out.push_back(kTbl[(n >> 6) & 0x3f]); out.push_back(kTbl[n & 0x3f]); } const size_t rem = bytes.size() - i; if (rem == 1) { uint32_t n = uint32_t(bytes[i]) << 16; out.push_back(kTbl[(n >> 18) & 0x3f]); out.push_back(kTbl[(n >> 12) & 0x3f]); out.push_back('='); out.push_back('='); } else if (rem == 2) { uint32_t n = (uint32_t(bytes[i]) << 16) | (uint32_t(bytes[i + 1]) << 8); out.push_back(kTbl[(n >> 18) & 0x3f]); out.push_back(kTbl[(n >> 12) & 0x3f]); out.push_back(kTbl[(n >> 6) & 0x3f]); out.push_back('='); } return out; } // --------------------------------------------------------------------------- // WebSocket handshake + framing. // --------------------------------------------------------------------------- constexpr std::string_view kWsGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; std::string accept_key(std::string_view client_key) { Sha1 s; s.update({reinterpret_cast(client_key.data()), client_key.size()}); s.update({reinterpret_cast(kWsGuid.data()), kWsGuid.size()}); auto digest = s.finalize(); return base64({digest.data(), digest.size()}); } std::string_view find_header(std::string_view request, std::string_view name) { size_t pos = 0; while (pos < request.size()) { size_t eol = request.find("\r\n", pos); if (eol == std::string_view::npos) break; std::string_view line = request.substr(pos, eol - pos); size_t colon = line.find(':'); if (colon != std::string_view::npos) { std::string_view h = line.substr(0, colon); if (h.size() == name.size() && std::equal(h.begin(), h.end(), name.begin(), [](char a, char b) { return std::tolower(static_cast(a)) == std::tolower(static_cast(b)); })) { std::string_view v = line.substr(colon + 1); while (!v.empty() && (v.front() == ' ' || v.front() == '\t')) v.remove_prefix(1); while (!v.empty() && (v.back() == ' ' || v.back() == '\t')) v.remove_suffix(1); return v; } } pos = eol + 2; } return {}; } bool write_all(int fd, const void* data, size_t len) { const auto* p = static_cast(data); while (len > 0) { ssize_t n = send(fd, p, len, MSG_NOSIGNAL); if (n <= 0) return false; p += n; len -= static_cast(n); } return true; } bool handshake(int fd) { std::string req; char buf[4096]; while (req.find("\r\n\r\n") == std::string::npos) { ssize_t n = recv(fd, buf, sizeof(buf), 0); if (n <= 0) return false; req.append(buf, static_cast(n)); if (req.size() > 64 * 1024) return false; } auto key = find_header(req, "Sec-WebSocket-Key"); if (key.empty()) return false; std::string resp = "HTTP/1.1 101 Switching Protocols\r\n" "Upgrade: websocket\r\n" "Connection: Upgrade\r\n" "Sec-WebSocket-Accept: " + accept_key(key) + "\r\n\r\n"; return write_all(fd, resp.data(), resp.size()); } // Server-to-client binary frame (unmasked, FIN set). bool send_binary(int fd, std::span payload) { std::array hdr{}; size_t hdr_len; hdr[0] = 0x82; // FIN=1, opcode=2 (binary) if (payload.size() < 126) { hdr[1] = static_cast(payload.size()); hdr_len = 2; } else if (payload.size() < 65536) { hdr[1] = 126; uint16_t be = htons(static_cast(payload.size())); std::memcpy(hdr.data() + 2, &be, 2); hdr_len = 4; } else { hdr[1] = 127; uint64_t v = payload.size(); for (int i = 0; i < 8; ++i) hdr[2 + i] = static_cast((v >> (56 - 8 * i)) & 0xff); hdr_len = 10; } if (!write_all(fd, hdr.data(), hdr_len)) return false; return write_all(fd, payload.data(), payload.size()); } // --------------------------------------------------------------------------- // Synthetic signals. // --------------------------------------------------------------------------- void fill_data(pb::DataPacket& p, int64_t now_us, uint64_t tick, std::mt19937_64& rng) { std::normal_distribution noise(0.0, 0.05); const double t = now_us * 1e-6; p.set_timestamp_us(now_us); p.set_ch1(std::sin(2 * M_PI * 1.0 * t) + noise(rng)); p.set_ch2(std::sin(2 * M_PI * 0.3 * t) * 2.0 + noise(rng)); p.set_ch3(std::cos(2 * M_PI * 0.7 * t) + noise(rng)); p.set_ch4(std::fmod(t * 0.5, 1.0) * 2.0 - 1.0); // saw p.set_ch5(std::fabs(std::fmod(t * 0.3, 2.0) - 1.0) * 2.0 - 1.0); // tri p.set_ch6(std::sin(2 * M_PI * 0.5 * t) > 0 ? 1.0 : -1.0); // square p.set_ch7(std::fmod(t * 0.1, 1.0)); // slow ramp p.set_ch8(noise(rng) * 4.0); // noise auto sine = [&](int i) { const double freq = 0.1 * i; const double phase = i * 0.2; return std::sin(2 * M_PI * freq * t + phase) + noise(rng) * 0.3; }; p.set_ch9(sine(9)); p.set_ch10(sine(10)); p.set_ch11(sine(11)); p.set_ch12(sine(12)); p.set_ch13(sine(13)); p.set_ch14(sine(14)); p.set_ch15(sine(15)); p.set_ch16(sine(16)); // Statuses rotate once every 2 s, each offset by one step. if (tick % 2000 == 0) { const uint32_t phase = static_cast(tick / 2000); p.set_status1((phase + 0) % 4); p.set_status2((phase + 1) % 4); p.set_status3((phase + 2) % 4); p.set_status4((phase + 3) % 4); p.set_status5((phase + 0) % 4); p.set_status6((phase + 1) % 4); p.set_status7((phase + 2) % 4); p.set_status8((phase + 3) % 4); } } pb::Severity cycle_severity(uint32_t n) { static constexpr std::array seq{ pb::INFO, pb::DEBUG, pb::INFO, pb::WARN, pb::INFO, pb::ERROR, pb::INFO, pb::FATAL, }; return seq[n % seq.size()]; } std::string_view cycle_description(uint32_t n) { static constexpr std::array descs{ "startup complete", "checkpoint reached", "threshold exceeded on channel 3", "sensor recalibrated", "queue depth high", "peer handshake ok", "cache evicted 128 entries", "watchdog tickled", }; return descs[n % descs.size()]; } // --------------------------------------------------------------------------- // Per-client service loop. // --------------------------------------------------------------------------- void serve(int fd) { if (!handshake(fd)) { std::fprintf(stderr, "handshake failed\n"); close(fd); return; } std::fprintf(stderr, "handshake ok, streaming\n"); std::mt19937_64 rng{std::random_device{}()}; const auto start = std::chrono::steady_clock::now(); auto next = start; uint64_t tick = 0; uint32_t log_seq = 0; std::string buf; while (true) { std::this_thread::sleep_until(next); next += 1ms; const int64_t now_us = std::chrono::duration_cast( std::chrono::steady_clock::now() - start) .count(); pb::Envelope env; fill_data(*env.mutable_data(), now_us, tick, rng); buf.clear(); env.SerializeToString(&buf); if (!send_binary(fd, {reinterpret_cast(buf.data()), buf.size()})) { break; } if (tick % 3000 == 1500) { pb::Envelope le; auto* log = le.mutable_log(); log->set_timestamp_us(now_us); log->set_severity(cycle_severity(log_seq)); log->set_error_number(1000 + log_seq); log->set_description(std::string(cycle_description(log_seq))); std::string lbuf; le.SerializeToString(&lbuf); if (!send_binary(fd, {reinterpret_cast(lbuf.data()), lbuf.size()})) { break; } ++log_seq; } ++tick; } std::fprintf(stderr, "client gone after %lu ticks\n", static_cast(tick)); close(fd); } uint16_t parse_port(int argc, char** argv) { if (argc >= 2) { uint16_t port = 0; auto* begin = argv[1]; auto* end = argv[1] + std::strlen(argv[1]); auto [p, ec] = std::from_chars(begin, end, port); if (ec == std::errc{} && p == end) return port; } return 9000; } } // namespace int main(int argc, char** argv) { signal(SIGPIPE, SIG_IGN); GOOGLE_PROTOBUF_VERIFY_VERSION; const uint16_t port = parse_port(argc, argv); int listen_fd = socket(AF_INET, SOCK_STREAM, 0); if (listen_fd < 0) { std::perror("socket"); return 1; } int one = 1; setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); sockaddr_in addr{}; addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); if (bind(listen_fd, reinterpret_cast(&addr), sizeof(addr)) < 0) { std::perror("bind"); return 1; } if (listen(listen_fd, 1) < 0) { std::perror("listen"); return 1; } std::fprintf(stderr, "fake_telemetry listening on ws://127.0.0.1:%u\n", port); while (true) { int fd = accept(listen_fd, nullptr, nullptr); if (fd < 0) { if (errno == EINTR) continue; std::perror("accept"); break; } std::fprintf(stderr, "client connected\n"); serve(fd); } close(listen_fd); google::protobuf::ShutdownProtobufLibrary(); return 0; }