Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions libudpard/udpard.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ typedef unsigned char byte_t; ///< For compatibility with platforms where byte s
/// The number of most recent transfers to keep in the history for duplicate rejection.
/// Should be a power of two to allow replacement of modulo operation with a bitwise AND.
///
/// Implementation node: we used to store bitmap windows instead of a full list of recent transfer-IDs, but they
/// Implementation note: we used to store bitmap windows instead of a full list of recent transfer-IDs, but they
/// were found to offer no advantage except in the perfect scenario of non-restarting senders, and an increased
/// implementation complexity (more branches, more lines of code), so they were replaced with a simple list.
/// The list works equally well given a non-contiguous transfer-ID stream, unlike the bitmap, thus more robust.
Expand Down Expand Up @@ -1584,7 +1584,7 @@ static void rx_session_eject(rx_session_t* const self, udpard_rx_t* const rx, rx
rx_slot_destroy(slot_ref, self->port->memory.fragment, self->port->memory.slot);
}

/// Finds an existing in-progress slot with the specified transfer-ID, or allocates a new one. Returns NULL of OOM.
/// Finds an existing in-progress slot with the specified transfer-ID, or allocates a new one. Returns NULL on OOM.
/// We return a pointer to pointer to allow the caller to NULL out the slot on destruction.
static rx_slot_t** rx_session_get_slot(rx_session_t* const self, const udpard_us_t ts, const uint64_t transfer_id)
{
Expand Down Expand Up @@ -1705,8 +1705,8 @@ static void rx_port_accept_stateful(udpard_rx_t* const rx,
}
}

/// The stateless strategy accepts only single-frame transfers and does not maintain any session state.
/// It could be trivially extended to fallback to UNORDERED when multi-frame transfers are detected.
/// The stateless strategy accepts transfers that fit in the first frame after extent truncation.
/// It does not maintain any session state.
static void rx_port_accept_stateless(udpard_rx_t* const rx,
udpard_rx_port_t* const port,
const udpard_us_t timestamp,
Expand All @@ -1715,6 +1715,8 @@ static void rx_port_accept_stateless(udpard_rx_t* const rx,
const udpard_deleter_t payload_deleter,
const uint_fast8_t iface_index)
{
// Stateless subscriptions only care about the prefix up to the configured extent.
// If the first frame already covers that much payload, the rest of the transfer is ignored.
const size_t required_size = smaller(port->extent, frame->meta.transfer_payload_size);
const bool full_transfer = (frame->base.offset == 0) && (frame->base.payload.size >= required_size);
if (full_transfer) {
Expand Down
6 changes: 3 additions & 3 deletions libudpard/udpard.h
Original file line number Diff line number Diff line change
Expand Up @@ -661,9 +661,9 @@ bool udpard_rx_port_new(udpard_rx_port_t* const self,
const udpard_rx_mem_resources_t memory,
const udpard_rx_port_vtable_t* const vtable);

/// A specialization of udpard_rx_port_new() for scalable stateless subscriptions, where only single-frame transfers
/// are accepted, and no attempt at deduplication is made. This is useful for the heartbeat topic mostly, and perhaps
/// other topics with a great number of publishers and/or very high traffic.
/// A specialization of udpard_rx_port_new() for scalable stateless subscriptions, where only the prefix up to the
/// configured extent is accepted from the first frame, and no attempt at deduplication is made. This is useful for
/// the heartbeat topic mostly, and perhaps other topics with a great number of publishers and/or very high traffic.
bool udpard_rx_port_new_stateless(udpard_rx_port_t* const self,
const size_t extent,
const udpard_rx_mem_resources_t memory,
Expand Down
4 changes: 2 additions & 2 deletions tests/src/test_e2e_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ void test_subject_roundtrip()
TEST_ASSERT_TRUE(udpard_rx_port_new(&port, 1024U, rx_mem, &rx_vtable));

// Send one multi-frame transfer over two interfaces.
std::vector<uint8_t> payload(300U);
std::vector<uint8_t> payload(600U);
for (std::size_t i = 0; i < payload.size(); i++) {
payload[i] = static_cast<uint8_t>(i);
}
Expand All @@ -133,7 +133,7 @@ void test_subject_roundtrip()
make_scattered(payload.data(), payload.size()),
nullptr));
udpard_tx_poll(&tx, 1001, UDPARD_IFACE_BITMAP_ALL);
TEST_ASSERT_TRUE(!frames.empty());
TEST_ASSERT_TRUE(frames.size() > 1U);

// Deliver the first interface copy only.
for (const auto& frame : frames) {
Expand Down
157 changes: 155 additions & 2 deletions tests/src/test_e2e_edge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ void test_out_of_order_multiframe_reassembly()
TEST_ASSERT_TRUE(udpard_rx_port_new(&port, 4096U, rx_mem, &rx_vtable));

// Send a payload that spans multiple frames.
std::vector<std::uint8_t> payload(280U);
std::vector<std::uint8_t> payload(600U);
for (std::size_t i = 0; i < payload.size(); i++) {
payload[i] = static_cast<std::uint8_t>(i ^ 0x5AU);
}
Expand All @@ -214,7 +214,7 @@ void test_out_of_order_multiframe_reassembly()
make_scattered(payload.data(), payload.size()),
nullptr));
udpard_tx_poll(&tx, 1001, UDPARD_IFACE_BITMAP_ALL);
TEST_ASSERT_TRUE(!frames.empty());
TEST_ASSERT_TRUE(frames.size() > 1U);

// Deliver frames in reverse order to exercise out-of-order reassembly.
std::reverse(frames.begin(), frames.end());
Expand Down Expand Up @@ -243,6 +243,155 @@ void test_out_of_order_multiframe_reassembly()
instrumented_allocator_reset(&rx_alloc_fragment);
}

void test_stateless_single_frame_acceptance()
{
seed_prng();

// Configure TX and RX.
instrumented_allocator_t tx_alloc_transfer{};
instrumented_allocator_t tx_alloc_payload{};
instrumented_allocator_t rx_alloc_session{};
instrumented_allocator_t rx_alloc_fragment{};
instrumented_allocator_new(&tx_alloc_transfer);
instrumented_allocator_new(&tx_alloc_payload);
instrumented_allocator_new(&rx_alloc_session);
instrumented_allocator_new(&rx_alloc_fragment);

udpard_tx_t tx{};
std::vector<CapturedFrame> frames;
TEST_ASSERT_TRUE(udpard_tx_new(
&tx, 0x1234123412341234ULL, 777U, 8U, make_tx_mem(tx_alloc_transfer, tx_alloc_payload), &tx_vtable));
tx.mtu[0] = 128U;
tx.mtu[1] = 128U;
tx.mtu[2] = 128U;
tx.user = &frames;

const auto rx_mem = make_rx_mem(rx_alloc_session, rx_alloc_fragment);
const udpard_deleter_t del = instrumented_allocator_make_deleter(&rx_alloc_fragment);
udpard_rx_t rx{};
udpard_rx_port_t port{};
RxState state{};
udpard_rx_new(&rx);
rx.user = &state;
TEST_ASSERT_TRUE(udpard_rx_port_new_stateless(&port, 1U, rx_mem, &rx_vtable));

// Send and deliver one single-frame transfer.
const std::vector<std::uint8_t> payload{ 0x10U, 0x20U, 0x30U, 0x40U };
TEST_ASSERT_TRUE(udpard_tx_push(&tx,
100U,
10000U,
1U,
udpard_prio_nominal,
88U,
udpard_make_subject_endpoint(66U),
make_scattered(payload.data(), payload.size()),
nullptr));
udpard_tx_poll(&tx, 101U, UDPARD_IFACE_BITMAP_ALL);
TEST_ASSERT_EQUAL_size_t(1U, frames.size());

deliver(frames.front(), rx_mem.fragment, del, &rx, &port, 200U);
udpard_rx_poll(&rx, 201U);
TEST_ASSERT_EQUAL_size_t(1U, state.count);
TEST_ASSERT_EQUAL_size_t(payload.size(), state.payload.size());
TEST_ASSERT_EQUAL_MEMORY(payload.data(), state.payload.data(), payload.size());
TEST_ASSERT_EQUAL_UINT64(0U, rx.errors_transfer_malformed);

// Release all resources.
udpard_rx_port_free(&rx, &port);
udpard_tx_free(&tx);
TEST_ASSERT_EQUAL_size_t(0U, tx_alloc_transfer.allocated_fragments);
TEST_ASSERT_EQUAL_size_t(0U, tx_alloc_payload.allocated_fragments);
TEST_ASSERT_EQUAL_size_t(0U, rx_alloc_session.allocated_fragments);
TEST_ASSERT_EQUAL_size_t(0U, rx_alloc_fragment.allocated_fragments);
instrumented_allocator_reset(&tx_alloc_transfer);
instrumented_allocator_reset(&tx_alloc_payload);
instrumented_allocator_reset(&rx_alloc_session);
instrumented_allocator_reset(&rx_alloc_fragment);
}

void test_stateless_multiframe_first_frame_handling(const std::size_t extent, const bool expect_accept)
{
seed_prng();

// Configure TX and RX.
instrumented_allocator_t tx_alloc_transfer{};
instrumented_allocator_t tx_alloc_payload{};
instrumented_allocator_t rx_alloc_session{};
instrumented_allocator_t rx_alloc_fragment{};
instrumented_allocator_new(&tx_alloc_transfer);
instrumented_allocator_new(&tx_alloc_payload);
instrumented_allocator_new(&rx_alloc_session);
instrumented_allocator_new(&rx_alloc_fragment);

udpard_tx_t tx{};
std::vector<CapturedFrame> frames;
TEST_ASSERT_TRUE(udpard_tx_new(
&tx, 0x5555666677778888ULL, 999U, 16U, make_tx_mem(tx_alloc_transfer, tx_alloc_payload), &tx_vtable));
tx.mtu[0] = 128U;
tx.mtu[1] = 128U;
tx.mtu[2] = 128U;
tx.user = &frames;

const auto rx_mem = make_rx_mem(rx_alloc_session, rx_alloc_fragment);
const udpard_deleter_t del = instrumented_allocator_make_deleter(&rx_alloc_fragment);
udpard_rx_t rx{};
udpard_rx_port_t port{};
RxState state{};
udpard_rx_new(&rx);
rx.user = &state;
TEST_ASSERT_TRUE(udpard_rx_port_new_stateless(&port, extent, rx_mem, &rx_vtable));

// Emit a transfer that is guaranteed to span multiple frames.
std::vector<std::uint8_t> payload(600U);
for (std::size_t i = 0; i < payload.size(); i++) {
payload[i] = static_cast<std::uint8_t>(i);
}
TEST_ASSERT_TRUE(udpard_tx_push(&tx,
1000U,
100000U,
1U,
udpard_prio_nominal,
99U,
udpard_make_subject_endpoint(67U),
make_scattered(payload.data(), payload.size()),
nullptr));
udpard_tx_poll(&tx, 1001U, UDPARD_IFACE_BITMAP_ALL);
TEST_ASSERT_TRUE(frames.size() > 1U);

// Deliver only the first frame. Stateless mode may accept it if the configured extent is already covered.
deliver(frames.front(), rx_mem.fragment, del, &rx, &port, 2000U);
udpard_rx_poll(&rx, 2001U);
if (expect_accept) {
TEST_ASSERT_EQUAL_size_t(1U, state.count);
TEST_ASSERT_EQUAL_UINT64(0U, rx.errors_transfer_malformed);
TEST_ASSERT_EQUAL_size_t(payload.size(), state.payload_size_wire);
TEST_ASSERT_GREATER_OR_EQUAL_size_t(std::min(extent, payload.size()), state.payload.size());
TEST_ASSERT_LESS_THAN_size_t(payload.size(), state.payload.size());
TEST_ASSERT_EQUAL_MEMORY(payload.data(), state.payload.data(), state.payload.size());
} else {
TEST_ASSERT_EQUAL_size_t(0U, state.count);
TEST_ASSERT_EQUAL_UINT64(1U, rx.errors_transfer_malformed);
}

// Release all resources.
udpard_rx_port_free(&rx, &port);
udpard_tx_free(&tx);
TEST_ASSERT_EQUAL_size_t(0U, tx_alloc_transfer.allocated_fragments);
TEST_ASSERT_EQUAL_size_t(0U, tx_alloc_payload.allocated_fragments);
TEST_ASSERT_EQUAL_size_t(0U, rx_alloc_session.allocated_fragments);
TEST_ASSERT_EQUAL_size_t(0U, rx_alloc_fragment.allocated_fragments);
instrumented_allocator_reset(&tx_alloc_transfer);
instrumented_allocator_reset(&tx_alloc_payload);
instrumented_allocator_reset(&rx_alloc_session);
instrumented_allocator_reset(&rx_alloc_fragment);
}

void test_stateless_multiframe_truncation_small_extent() { test_stateless_multiframe_first_frame_handling(10U, true); }

void test_stateless_multiframe_truncation_zero_extent() { test_stateless_multiframe_first_frame_handling(0U, true); }

void test_stateless_multiframe_rejection_large_extent() { test_stateless_multiframe_first_frame_handling(600U, false); }

} // namespace

void setUp() {}
Expand All @@ -253,5 +402,9 @@ int main()
UNITY_BEGIN();
RUN_TEST(test_zero_payload_transfer);
RUN_TEST(test_out_of_order_multiframe_reassembly);
RUN_TEST(test_stateless_single_frame_acceptance);
RUN_TEST(test_stateless_multiframe_truncation_small_extent);
RUN_TEST(test_stateless_multiframe_truncation_zero_extent);
RUN_TEST(test_stateless_multiframe_rejection_large_extent);
return UNITY_END();
}
Loading