From 65a57b3429d398989de9e07db085f551d05a7683 Mon Sep 17 00:00:00 2001 From: Gabriel Lima Date: Tue, 21 Apr 2026 20:06:11 -0300 Subject: [PATCH] Mini WebSocket server with fake data for testing --- server/.gitignore | 3 + server/CMakeLists.txt | 31 ++++ server/main.cpp | 407 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 441 insertions(+) create mode 100644 server/.gitignore create mode 100644 server/CMakeLists.txt create mode 100644 server/main.cpp diff --git a/server/.gitignore b/server/.gitignore new file mode 100644 index 0000000..902f3c7 --- /dev/null +++ b/server/.gitignore @@ -0,0 +1,3 @@ +build/ +.cache/ +compile_commands.json diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt new file mode 100644 index 0000000..317df17 --- /dev/null +++ b/server/CMakeLists.txt @@ -0,0 +1,31 @@ +cmake_minimum_required(VERSION 3.20) +project(fake_telemetry CXX) + +set(CMAKE_CXX_STANDARD 23) +set(CMAKE_CXX_STANDARD_REQUIRED ON) +set(CMAKE_CXX_EXTENSIONS OFF) + +if(NOT CMAKE_BUILD_TYPE) + set(CMAKE_BUILD_TYPE Release) +endif() + +find_package(Protobuf REQUIRED) + +set(PROTO_FILE ${CMAKE_CURRENT_SOURCE_DIR}/../proto/messages.proto) +set(PROTO_SRC ${CMAKE_CURRENT_BINARY_DIR}/messages.pb.cc) +set(PROTO_HDR ${CMAKE_CURRENT_BINARY_DIR}/messages.pb.h) + +add_custom_command( + OUTPUT ${PROTO_SRC} ${PROTO_HDR} + COMMAND $ + --cpp_out=${CMAKE_CURRENT_BINARY_DIR} + -I${CMAKE_CURRENT_SOURCE_DIR}/../proto + ${PROTO_FILE} + DEPENDS ${PROTO_FILE} + VERBATIM +) + +add_executable(fake_telemetry main.cpp ${PROTO_SRC}) +target_include_directories(fake_telemetry PRIVATE ${CMAKE_CURRENT_BINARY_DIR}) +target_link_libraries(fake_telemetry PRIVATE protobuf::libprotobuf) +target_compile_options(fake_telemetry PRIVATE -Wall -Wextra -Wpedantic) diff --git a/server/main.cpp b/server/main.cpp new file mode 100644 index 0000000..206f997 --- /dev/null +++ b/server/main.cpp @@ -0,0 +1,407 @@ +// Fake telemetry server for TelemetryMonitor. Accepts one WebSocket client at +// a time on ws://127.0.0.1:9000 and streams synthetic DataPackets at 1 kHz +// plus a LogPacket every ~3 s. Wire format matches proto/messages.proto. +// +// Build: cmake -S . -B build && cmake --build build -j +// Run: ./build/fake_telemetry [port] + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "messages.pb.h" + +using namespace std::chrono_literals; +namespace pb = telemetry; + +namespace { + +// --------------------------------------------------------------------------- +// SHA-1 (FIPS 180-4) — used once per connection for the WebSocket handshake. +// --------------------------------------------------------------------------- +struct Sha1 { + uint32_t h[5]{0x67452301u, 0xEFCDAB89u, 0x98BADCFEu, 0x10325476u, 0xC3D2E1F0u}; + uint64_t bit_len = 0; + std::array block{}; + size_t block_len = 0; + + void update(std::span data) { + for (uint8_t b : data) { + block[block_len++] = b; + if (block_len == 64) { + transform(); + block_len = 0; + } + } + bit_len += data.size() * 8; + } + + std::array finalize() { + block[block_len++] = 0x80; + if (block_len > 56) { + while (block_len < 64) block[block_len++] = 0; + transform(); + block_len = 0; + } + while (block_len < 56) block[block_len++] = 0; + for (int i = 7; i >= 0; --i) + block[block_len++] = static_cast((bit_len >> (8 * i)) & 0xff); + transform(); + std::array out{}; + for (int i = 0; i < 5; ++i) { + out[i * 4 + 0] = static_cast(h[i] >> 24); + out[i * 4 + 1] = static_cast(h[i] >> 16); + out[i * 4 + 2] = static_cast(h[i] >> 8); + out[i * 4 + 3] = static_cast(h[i]); + } + return out; + } + + void transform() { + uint32_t w[80]; + for (int i = 0; i < 16; ++i) { + w[i] = (uint32_t(block[i * 4]) << 24) | + (uint32_t(block[i * 4 + 1]) << 16) | + (uint32_t(block[i * 4 + 2]) << 8) | + uint32_t(block[i * 4 + 3]); + } + for (int i = 16; i < 80; ++i) + w[i] = std::rotl(w[i - 3] ^ w[i - 8] ^ w[i - 14] ^ w[i - 16], 1); + + uint32_t a = h[0], b = h[1], c = h[2], d = h[3], e = h[4]; + for (int i = 0; i < 80; ++i) { + uint32_t f, k; + if (i < 20) { f = (b & c) | (~b & d); k = 0x5A827999u; } + else if (i < 40) { f = b ^ c ^ d; k = 0x6ED9EBA1u; } + else if (i < 60) { f = (b & c) | (b & d) | (c & d); k = 0x8F1BBCDCu; } + else { f = b ^ c ^ d; k = 0xCA62C1D6u; } + uint32_t t = std::rotl(a, 5) + f + e + k + w[i]; + e = d; + d = c; + c = std::rotl(b, 30); + b = a; + a = t; + } + h[0] += a; h[1] += b; h[2] += c; h[3] += d; h[4] += e; + } +}; + +std::string base64(std::span bytes) { + static constexpr char kTbl[] = + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + std::string out; + out.reserve(((bytes.size() + 2) / 3) * 4); + size_t i = 0; + for (; i + 3 <= bytes.size(); i += 3) { + uint32_t n = (uint32_t(bytes[i]) << 16) | + (uint32_t(bytes[i + 1]) << 8) | uint32_t(bytes[i + 2]); + out.push_back(kTbl[(n >> 18) & 0x3f]); + out.push_back(kTbl[(n >> 12) & 0x3f]); + out.push_back(kTbl[(n >> 6) & 0x3f]); + out.push_back(kTbl[n & 0x3f]); + } + const size_t rem = bytes.size() - i; + if (rem == 1) { + uint32_t n = uint32_t(bytes[i]) << 16; + out.push_back(kTbl[(n >> 18) & 0x3f]); + out.push_back(kTbl[(n >> 12) & 0x3f]); + out.push_back('='); + out.push_back('='); + } else if (rem == 2) { + uint32_t n = (uint32_t(bytes[i]) << 16) | (uint32_t(bytes[i + 1]) << 8); + out.push_back(kTbl[(n >> 18) & 0x3f]); + out.push_back(kTbl[(n >> 12) & 0x3f]); + out.push_back(kTbl[(n >> 6) & 0x3f]); + out.push_back('='); + } + return out; +} + +// --------------------------------------------------------------------------- +// WebSocket handshake + framing. +// --------------------------------------------------------------------------- +constexpr std::string_view kWsGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + +std::string accept_key(std::string_view client_key) { + Sha1 s; + s.update({reinterpret_cast(client_key.data()), client_key.size()}); + s.update({reinterpret_cast(kWsGuid.data()), kWsGuid.size()}); + auto digest = s.finalize(); + return base64({digest.data(), digest.size()}); +} + +std::string_view find_header(std::string_view request, std::string_view name) { + size_t pos = 0; + while (pos < request.size()) { + size_t eol = request.find("\r\n", pos); + if (eol == std::string_view::npos) break; + std::string_view line = request.substr(pos, eol - pos); + size_t colon = line.find(':'); + if (colon != std::string_view::npos) { + std::string_view h = line.substr(0, colon); + if (h.size() == name.size() && + std::equal(h.begin(), h.end(), name.begin(), [](char a, char b) { + return std::tolower(static_cast(a)) == + std::tolower(static_cast(b)); + })) { + std::string_view v = line.substr(colon + 1); + while (!v.empty() && (v.front() == ' ' || v.front() == '\t')) + v.remove_prefix(1); + while (!v.empty() && (v.back() == ' ' || v.back() == '\t')) + v.remove_suffix(1); + return v; + } + } + pos = eol + 2; + } + return {}; +} + +bool write_all(int fd, const void* data, size_t len) { + const auto* p = static_cast(data); + while (len > 0) { + ssize_t n = send(fd, p, len, MSG_NOSIGNAL); + if (n <= 0) return false; + p += n; + len -= static_cast(n); + } + return true; +} + +bool handshake(int fd) { + std::string req; + char buf[4096]; + while (req.find("\r\n\r\n") == std::string::npos) { + ssize_t n = recv(fd, buf, sizeof(buf), 0); + if (n <= 0) return false; + req.append(buf, static_cast(n)); + if (req.size() > 64 * 1024) return false; + } + auto key = find_header(req, "Sec-WebSocket-Key"); + if (key.empty()) return false; + std::string resp = "HTTP/1.1 101 Switching Protocols\r\n" + "Upgrade: websocket\r\n" + "Connection: Upgrade\r\n" + "Sec-WebSocket-Accept: " + accept_key(key) + "\r\n\r\n"; + return write_all(fd, resp.data(), resp.size()); +} + +// Server-to-client binary frame (unmasked, FIN set). +bool send_binary(int fd, std::span payload) { + std::array hdr{}; + size_t hdr_len; + hdr[0] = 0x82; // FIN=1, opcode=2 (binary) + if (payload.size() < 126) { + hdr[1] = static_cast(payload.size()); + hdr_len = 2; + } else if (payload.size() < 65536) { + hdr[1] = 126; + uint16_t be = htons(static_cast(payload.size())); + std::memcpy(hdr.data() + 2, &be, 2); + hdr_len = 4; + } else { + hdr[1] = 127; + uint64_t v = payload.size(); + for (int i = 0; i < 8; ++i) + hdr[2 + i] = static_cast((v >> (56 - 8 * i)) & 0xff); + hdr_len = 10; + } + if (!write_all(fd, hdr.data(), hdr_len)) return false; + return write_all(fd, payload.data(), payload.size()); +} + +// --------------------------------------------------------------------------- +// Synthetic signals. +// --------------------------------------------------------------------------- +void fill_data(pb::DataPacket& p, int64_t now_us, uint64_t tick, + std::mt19937_64& rng) { + std::normal_distribution noise(0.0, 0.05); + const double t = now_us * 1e-6; + p.set_timestamp_us(now_us); + p.set_ch1(std::sin(2 * M_PI * 1.0 * t) + noise(rng)); + p.set_ch2(std::sin(2 * M_PI * 0.3 * t) * 2.0 + noise(rng)); + p.set_ch3(std::cos(2 * M_PI * 0.7 * t) + noise(rng)); + p.set_ch4(std::fmod(t * 0.5, 1.0) * 2.0 - 1.0); // saw + p.set_ch5(std::fabs(std::fmod(t * 0.3, 2.0) - 1.0) * 2.0 - 1.0); // tri + p.set_ch6(std::sin(2 * M_PI * 0.5 * t) > 0 ? 1.0 : -1.0); // square + p.set_ch7(std::fmod(t * 0.1, 1.0)); // slow ramp + p.set_ch8(noise(rng) * 4.0); // noise + + auto sine = [&](int i) { + const double freq = 0.1 * i; + const double phase = i * 0.2; + return std::sin(2 * M_PI * freq * t + phase) + noise(rng) * 0.3; + }; + p.set_ch9(sine(9)); + p.set_ch10(sine(10)); + p.set_ch11(sine(11)); + p.set_ch12(sine(12)); + p.set_ch13(sine(13)); + p.set_ch14(sine(14)); + p.set_ch15(sine(15)); + p.set_ch16(sine(16)); + + // Statuses rotate once every 2 s, each offset by one step. + if (tick % 2000 == 0) { + const uint32_t phase = static_cast(tick / 2000); + p.set_status1((phase + 0) % 4); + p.set_status2((phase + 1) % 4); + p.set_status3((phase + 2) % 4); + p.set_status4((phase + 3) % 4); + p.set_status5((phase + 0) % 4); + p.set_status6((phase + 1) % 4); + p.set_status7((phase + 2) % 4); + p.set_status8((phase + 3) % 4); + } +} + +pb::Severity cycle_severity(uint32_t n) { + static constexpr std::array seq{ + pb::INFO, pb::DEBUG, pb::INFO, pb::WARN, + pb::INFO, pb::ERROR, pb::INFO, pb::FATAL, + }; + return seq[n % seq.size()]; +} + +std::string_view cycle_description(uint32_t n) { + static constexpr std::array descs{ + "startup complete", + "checkpoint reached", + "threshold exceeded on channel 3", + "sensor recalibrated", + "queue depth high", + "peer handshake ok", + "cache evicted 128 entries", + "watchdog tickled", + }; + return descs[n % descs.size()]; +} + +// --------------------------------------------------------------------------- +// Per-client service loop. +// --------------------------------------------------------------------------- +void serve(int fd) { + if (!handshake(fd)) { + std::fprintf(stderr, "handshake failed\n"); + close(fd); + return; + } + std::fprintf(stderr, "handshake ok, streaming\n"); + + std::mt19937_64 rng{std::random_device{}()}; + const auto start = std::chrono::steady_clock::now(); + auto next = start; + uint64_t tick = 0; + uint32_t log_seq = 0; + std::string buf; + + while (true) { + std::this_thread::sleep_until(next); + next += 1ms; + + const int64_t now_us = + std::chrono::duration_cast( + std::chrono::steady_clock::now() - start) + .count(); + + pb::Envelope env; + fill_data(*env.mutable_data(), now_us, tick, rng); + buf.clear(); + env.SerializeToString(&buf); + if (!send_binary(fd, {reinterpret_cast(buf.data()), + buf.size()})) { + break; + } + + if (tick % 3000 == 1500) { + pb::Envelope le; + auto* log = le.mutable_log(); + log->set_timestamp_us(now_us); + log->set_severity(cycle_severity(log_seq)); + log->set_error_number(1000 + log_seq); + log->set_description(std::string(cycle_description(log_seq))); + std::string lbuf; + le.SerializeToString(&lbuf); + if (!send_binary(fd, {reinterpret_cast(lbuf.data()), + lbuf.size()})) { + break; + } + ++log_seq; + } + ++tick; + } + std::fprintf(stderr, "client gone after %lu ticks\n", + static_cast(tick)); + close(fd); +} + +uint16_t parse_port(int argc, char** argv) { + if (argc >= 2) { + uint16_t port = 0; + auto* begin = argv[1]; + auto* end = argv[1] + std::strlen(argv[1]); + auto [p, ec] = std::from_chars(begin, end, port); + if (ec == std::errc{} && p == end) return port; + } + return 9000; +} + +} // namespace + +int main(int argc, char** argv) { + signal(SIGPIPE, SIG_IGN); + GOOGLE_PROTOBUF_VERIFY_VERSION; + + const uint16_t port = parse_port(argc, argv); + int listen_fd = socket(AF_INET, SOCK_STREAM, 0); + if (listen_fd < 0) { std::perror("socket"); return 1; } + int one = 1; + setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); + + sockaddr_in addr{}; + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + if (bind(listen_fd, reinterpret_cast(&addr), sizeof(addr)) < 0) { + std::perror("bind"); + return 1; + } + if (listen(listen_fd, 1) < 0) { + std::perror("listen"); + return 1; + } + std::fprintf(stderr, "fake_telemetry listening on ws://127.0.0.1:%u\n", port); + + while (true) { + int fd = accept(listen_fd, nullptr, nullptr); + if (fd < 0) { + if (errno == EINTR) continue; + std::perror("accept"); + break; + } + std::fprintf(stderr, "client connected\n"); + serve(fd); + } + close(listen_fd); + google::protobuf::ShutdownProtobufLibrary(); + return 0; +}