Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 1 addition & 20 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,10 @@ on:
branches: [ main ]

jobs:
format-check:
name: Check code formatting
runs-on: ubuntu-latest

steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Install clang-format-18
run: |
sudo apt-get update
sudo apt-get install -y clang-format-18

- name: Check C++ formatting
run: ./scripts/format.sh --check

tidy-check:
name: Run clang-tidy checks
runs-on: ubuntu-latest
container: fedora:42
needs: [format-check]

steps:
- name: Checkout code
Expand Down Expand Up @@ -232,9 +215,7 @@ jobs:
openssl-dev \
openssl-libs-static \
zlib-static \
samurai \
protobuf-dev \
protoc
samurai

- name: Build static musl bundle
run: |
Expand Down
2 changes: 1 addition & 1 deletion cmake/StaticBundle.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ FetchContent_Declare(
)
set(protobuf_BUILD_TESTS OFF CACHE BOOL "" FORCE)
set(protobuf_BUILD_SHARED_LIBS OFF CACHE BOOL "" FORCE)
set(protobuf_BUILD_PROTOC_BINARIES OFF CACHE BOOL "" FORCE)
set(protobuf_BUILD_PROTOC_BINARIES ON CACHE BOOL "" FORCE)
set(protobuf_INSTALL OFF CACHE BOOL "" FORCE)
FetchContent_MakeAvailable(protobuf)

Expand Down
17 changes: 6 additions & 11 deletions examples/tck_host_application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,17 +259,12 @@ void TCKHostApplication::handle_prompt_specific(const std::string& message) {
}

std::string metric_name;
auto node_state_opt = host_application_->get_node_state(group_id, edge_node_id);
if (node_state_opt) {
const auto& node_state = node_state_opt->get();
if (!node_state.alias_map.empty()) {
metric_name = node_state.alias_map.begin()->second;
log("INFO",
"Found metric '" + metric_name + "' from NBIRTH, using for command");
}
}

if (metric_name.empty()) {
// Try alias 0 (common first alias in Sparkplug NBIRTH)
auto name_opt = host_application_->get_metric_name(group_id, edge_node_id, "", 0);
if (name_opt) {
metric_name = *name_opt;
log("INFO", "Found metric '" + metric_name + "' from NBIRTH, using for command");
} else {
log("WARN", "No metrics found in NBIRTH, using fallback TestMetric");
metric_name = "TestMetric";
}
Expand Down
4 changes: 2 additions & 2 deletions examples/tck_host_application.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ class TCKHostApplication : public TCKTestRunner {
void run_message_ordering_test(const std::vector<std::string>& params);
void run_multiple_broker_test(const std::vector<std::string>& params);

[[nodiscard]] auto
establish_session(const std::string& host_id) -> stdx::expected<void, std::string>;
[[nodiscard]] auto establish_session(const std::string& host_id)
-> stdx::expected<void, std::string>;

std::string host_id_;
std::string namespace_prefix_;
Expand Down
3 changes: 1 addition & 2 deletions include/sparkplug/edge_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#include "topic.hpp"

#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <span>
Expand Down Expand Up @@ -490,4 +489,4 @@ class EdgeNode {
static void on_connection_lost(void* context, char* cause);
};

} // namespace sparkplug
} // namespace sparkplug
35 changes: 25 additions & 10 deletions include/sparkplug/host_application.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,17 +317,31 @@ class HostApplication {
[[nodiscard]] stdx::expected<void, std::string>
subscribe_state(std::string_view host_id);

/**
* @brief Lightweight snapshot of node state (scalars only, no maps).
*
* Returned by value from get_node_state() so callers don't hold references
* into mutex-protected data. Use get_metric_name() for alias resolution.
*/
struct NodeStateSnapshot {
bool is_online{false};
uint64_t last_seq{255};
uint64_t bd_seq{0};
uint64_t birth_timestamp{0};
bool birth_received{false};
};

/**
* @brief Gets the current state of a specific edge node.
*
* @param group_id The group ID
* @param edge_node_id The edge node ID to query
*
* @return NodeState if the node has been seen, std::nullopt otherwise
* @return NodeStateSnapshot if the node has been seen, std::nullopt otherwise
*
* @note Useful for monitoring node online/offline status and bdSeq.
*/
[[nodiscard]] std::optional<std::reference_wrapper<const NodeState>>
[[nodiscard]] std::optional<NodeStateSnapshot>
get_node_state(std::string_view group_id, std::string_view edge_node_id) const;

/**
Expand All @@ -341,16 +355,15 @@ class HostApplication {
* @param device_id The device ID (empty string for node-level metrics)
* @param alias The metric alias to resolve
*
* @return A string_view to the metric name if found, std::nullopt otherwise
* @return The metric name if found, std::nullopt otherwise
*
* @note Returns std::nullopt if the node/device hasn't sent a birth message yet,
* or if the alias is not found in the birth message.
*/
[[nodiscard]] std::optional<std::string_view>
get_metric_name(std::string_view group_id,
std::string_view edge_node_id,
std::string_view device_id,
uint64_t alias) const;
[[nodiscard]] std::optional<std::string> get_metric_name(std::string_view group_id,
std::string_view edge_node_id,
std::string_view device_id,
uint64_t alias) const;

/**
* @brief Publishes a STATE birth message to indicate Host Application is online.
Expand Down Expand Up @@ -420,7 +433,8 @@ class HostApplication {
* @param payload PayloadBuilder containing command metrics (e.g., "Node
* Control/Rebirth")
*
* @return void on success, error message on failure
* @return void on success, error message on failure (fire-and-forget QoS 0,
* only reports local send errors)
*
* @note Common Node Control commands:
* - "Node Control/Rebirth" (bool): Request node to republish NBIRTH
Expand Down Expand Up @@ -451,7 +465,8 @@ class HostApplication {
* @param target_device_id The target device identifier
* @param payload PayloadBuilder containing command metrics
*
* @return void on success, error message on failure
* @return void on success, error message on failure (fire-and-forget QoS 0,
* only reports local send errors)
*
* @par Example Usage
* @code
Expand Down
2 changes: 1 addition & 1 deletion include/sparkplug/payload_builder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -384,4 +384,4 @@ class PayloadBuilder {
bool timestamp_explicitly_set_{false};
};

} // namespace sparkplug
} // namespace sparkplug
7 changes: 3 additions & 4 deletions proto/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
# proto/CMakeLists.txt
if(BUILD_STATIC_BUNDLE)
find_program(PROTOC_BIN protoc REQUIRED)
add_custom_command(
OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/sparkplug_b.pb.cc ${CMAKE_CURRENT_BINARY_DIR}/sparkplug_b.pb.h
COMMAND ${PROTOC_BIN}
COMMAND $<TARGET_FILE:protoc>
ARGS --cpp_out=${CMAKE_CURRENT_BINARY_DIR} -I${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/sparkplug_b.proto
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/sparkplug_b.proto
COMMENT "Generating protobuf code with system protoc"
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/sparkplug_b.proto protoc
COMMENT "Generating protobuf code with fetched protoc"
)
add_library(sparkplug_proto STATIC
${CMAKE_CURRENT_BINARY_DIR}/sparkplug_b.pb.cc
Expand Down
3 changes: 1 addition & 2 deletions scripts/build_static_musl.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
#
# Prerequisites (Alpine):
# apk add build-base cmake git bash linux-headers \
# openssl-dev openssl-libs-static zlib-static samurai \
# protobuf-dev protoc
# openssl-dev openssl-libs-static zlib-static samurai

set -e

Expand Down
1 change: 0 additions & 1 deletion src/c_bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

#include <cstring>
#include <format>
#include <memory>

struct sparkplug_publisher {
sparkplug::EdgeNode impl;
Expand Down
62 changes: 38 additions & 24 deletions src/edge_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#include <cstring>
#include <format>
#include <future>
#include <thread>
#include <utility>

#include <MQTTAsync.h>
Expand All @@ -17,6 +16,26 @@ constexpr int DISCONNECT_TIMEOUT_MS = 11000;
constexpr int SUBSCRIBE_TIMEOUT_MS = 5000;
constexpr uint64_t SEQ_NUMBER_MAX = 256;

// Parse "online" boolean from Sparkplug STATE JSON, tolerating whitespace.
std::optional<bool> parse_state_online(std::string_view json) {
constexpr std::string_view ws = " \t\n\r";
auto key_pos = json.find("\"online\"");
if (key_pos == std::string_view::npos)
return std::nullopt;
auto rest = json.substr(key_pos + 8);
auto colon = rest.find_first_not_of(ws);
if (colon == std::string_view::npos || rest[colon] != ':')
return std::nullopt;
auto val = rest.find_first_not_of(ws, colon + 1);
if (val == std::string_view::npos)
return std::nullopt;
if (rest[val] == 't')
return true;
if (rest[val] == 'f')
return false;
return std::nullopt;
}

void on_connect_success(void* context, MQTTAsync_successData* response) {
(void)response;
auto* promise = static_cast<std::promise<void>*>(context);
Expand Down Expand Up @@ -102,10 +121,8 @@ int EdgeNode::on_message_arrived(void* context,
message->payloadlen);

std::scoped_lock lock(edge_node->mutex_);
if (payload_str.find("\"online\":true") != std::string::npos) {
edge_node->primary_host_online_ = true;
} else if (payload_str.find("\"online\":false") != std::string::npos) {
edge_node->primary_host_online_ = false;
if (auto online = parse_state_online(payload_str)) {
edge_node->primary_host_online_ = *online;
}

MQTTAsync_freeMessage(&message);
Expand Down Expand Up @@ -144,16 +161,19 @@ EdgeNode::~EdgeNode() {
}
}

EdgeNode::EdgeNode(EdgeNode&& other) noexcept
: config_(std::move(other.config_)), client_(std::move(other.client_)),
seq_num_(other.seq_num_), bd_seq_num_(other.bd_seq_num_),
death_payload_data_(std::move(other.death_payload_data_)),
last_birth_payload_(std::move(other.last_birth_payload_)),
device_states_(std::move(other.device_states_)), is_connected_(other.is_connected_)
// mutex_ is default-constructed (mutexes are not moveable)
{
EdgeNode::EdgeNode(EdgeNode&& other) noexcept {
std::scoped_lock lock(other.mutex_);
config_ = std::move(other.config_);
client_ = std::move(other.client_);
seq_num_ = other.seq_num_;
bd_seq_num_ = other.bd_seq_num_;
death_payload_data_ = std::move(other.death_payload_data_);
last_birth_payload_ = std::move(other.last_birth_payload_);
device_states_ = std::move(other.device_states_);
is_connected_ = other.is_connected_;
primary_host_online_ = other.primary_host_online_;
other.is_connected_ = false;
other.primary_host_online_ = false;
}

EdgeNode& EdgeNode::operator=(EdgeNode&& other) noexcept {
Expand Down Expand Up @@ -531,7 +551,7 @@ stdx::expected<void, std::string> EdgeNode::publish_death() {
return stdx::unexpected("Not connected");
}

seq_num_ = (seq_num_ + 1) % 256;
seq_num_ = (seq_num_ + 1) % SEQ_NUMBER_MAX;

PayloadBuilder death_payload;
death_payload.add_metric("bdSeq", bd_seq_num_);
Expand Down Expand Up @@ -593,8 +613,8 @@ stdx::expected<void, std::string> EdgeNode::rebirth() {
proto_payload.set_seq(0);

payload_data.resize(proto_payload.ByteSizeLong());
proto_payload.SerializeToArray(payload_data.data(),
static_cast<int>(payload_data.size()));
(void)proto_payload.SerializeToArray(payload_data.data(),
static_cast<int>(payload_data.size()));
last_birth_payload_ = payload_data;

Topic topic{.group_id = config_.group_id,
Expand All @@ -604,12 +624,6 @@ stdx::expected<void, std::string> EdgeNode::rebirth() {

topic_str = topic.to_string();
qos = config_.data_qos;

// Update NDEATH Will Testament payload with new bdSeq BEFORE disconnecting
// This ensures the Will Testament sent during disconnect has the correct bdSeq
PayloadBuilder death_payload;
death_payload.add_metric("bdSeq", new_bdseq);
death_payload_data_ = death_payload.build();
}

auto result = disconnect()
Expand Down Expand Up @@ -779,7 +793,7 @@ EdgeNode::publish_device_death(std::string_view device_id) {
return stdx::unexpected(std::format("Unknown device: '{}'", device_id));
}

seq_num_ = (seq_num_ + 1) % 256;
seq_num_ = (seq_num_ + 1) % SEQ_NUMBER_MAX;

PayloadBuilder death_payload;
death_payload.set_seq(seq_num_);
Expand Down Expand Up @@ -879,4 +893,4 @@ void EdgeNode::log(LogLevel level, std::string_view message) const noexcept {
}
}

} // namespace sparkplug
} // namespace sparkplug
Loading