Skip to content

Commit

Permalink
Fix on_sample_lost notification on best-effort readers for fragmented…
Browse files Browse the repository at this point in the history
… samples (#4187)

* Refs #20162. Regression test.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #20162. Notify sample lost when dropping fragmented change.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #20167. Linters.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #20162. Apply suggestions.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #20162. Use constexpr for buffer size.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #20162. Lower buffer size.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #20351. Uncrustify.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

---------

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>
(cherry picked from commit 5ac198e)
  • Loading branch information
MiguelCompany authored and mergify[bot] committed Mar 22, 2024
1 parent 3e6b1ab commit 5611e88
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 5 deletions.
20 changes: 20 additions & 0 deletions src/cpp/rtps/reader/StatelessReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,26 @@ bool StatelessReader::processDataFragMsg(
{
if (work_change->sequenceNumber < change_to_add->sequenceNumber)
{
SequenceNumber_t updated_seq = work_change->sequenceNumber;
SequenceNumber_t previous_seq{ 0, 0 };
previous_seq = update_last_notified(writer_guid, updated_seq);

// Notify lost samples
auto listener = getListener();
if (listener != nullptr)
{
if (SequenceNumber_t{ 0, 0 } != previous_seq)
{
assert(previous_seq < updated_seq);
uint64_t tmp = (updated_seq - previous_seq).to64long();
int32_t lost_samples =
tmp > static_cast<uint64_t>(std::numeric_limits<int32_t>::max()) ?
std::numeric_limits<int32_t>::max() : static_cast<int32_t>(tmp);
assert (0 < lost_samples);
listener->on_sample_lost(this, lost_samples);
}
}

// Pending change should be dropped. Check if it can be reused
if (sampleSize <= work_change->serializedPayload.max_size)
{
Expand Down
131 changes: 126 additions & 5 deletions test/blackbox/common/DDSBlackboxTestsListeners.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -674,16 +674,18 @@ TEST_P(DDSStatus, DataAvailableConditions)
subscriber_reader.wait_waitset_timeout();
}

template<typename T>
void sample_lost_test_dw_init(
PubSubWriter<HelloWorldPubSubType>& writer)
PubSubWriter<T>& writer)
{
auto testTransport = std::make_shared<test_UDPv4TransportDescriptor>();
testTransport->drop_data_messages_filter_ = [](eprosima::fastrtps::rtps::CDRMessage_t& msg)-> bool
{
uint32_t old_pos = msg.pos;

// see RTPS DDS 9.4.5.3 Data Submessage
EntityId_t readerID, writerID;
EntityId_t readerID;
EntityId_t writerID;
SequenceNumber_t sn;

msg.pos += 2; // flags
Expand Down Expand Up @@ -711,6 +713,43 @@ void sample_lost_test_dw_init(

return false;
};
testTransport->drop_data_frag_messages_filter_ = [](eprosima::fastrtps::rtps::CDRMessage_t& msg)-> bool
{
uint32_t old_pos = msg.pos;

// see RTPS DDS 9.4.5.4 DataFrag Submessage
EntityId_t readerID;
EntityId_t writerID;
SequenceNumber_t sn;
uint32_t first_fragment = 0;

msg.pos += 2; // flags
msg.pos += 2; // octets to inline quos
CDRMessage::readEntityId(&msg, &readerID);
CDRMessage::readEntityId(&msg, &writerID);
CDRMessage::readSequenceNumber(&msg, &sn);
CDRMessage::readUInt32(&msg, &first_fragment);

// restore buffer pos
msg.pos = old_pos;

// generate losses
if ((writerID.value[3] & 0xC0) == 0 // only user endpoints
&& (1 == first_fragment) // only first fragment
&& (sn == SequenceNumber_t{0, 2} ||
sn == SequenceNumber_t(0, 3) ||
sn == SequenceNumber_t(0, 4) ||
sn == SequenceNumber_t(0, 6) ||
sn == SequenceNumber_t(0, 8) ||
sn == SequenceNumber_t(0, 10) ||
sn == SequenceNumber_t(0, 11) ||
sn == SequenceNumber_t(0, 13)))
{
return true;
}

return false;
};


writer.disable_builtin_transport()
Expand All @@ -721,19 +760,29 @@ void sample_lost_test_dw_init(

}

template<typename T>
void sample_lost_test_dr_init(
PubSubReader<HelloWorldPubSubType>& reader,
PubSubReader<T>& reader,
std::function<void(const eprosima::fastdds::dds::SampleLostStatus& status)> functor)
{
// We want to ensure that samples are only lost due to the custom filter we have set in sample_lost_test_dw_init.
// Since we are going to send 300KB samples in the test for fragments, let's increase the buffer size to avoid any
// other possible loss.
constexpr uint32_t BUFFER_SIZE =
300ul * 1024ul // sample size
* 13ul // number of samples
* 2ul; // 2x to avoid any possible loss
reader.socket_buffer_size(BUFFER_SIZE);
reader.sample_lost_status_functor(functor)
.init();

ASSERT_TRUE(reader.isInitialized());
}

template<typename T>
void sample_lost_test_init(
PubSubReader<HelloWorldPubSubType>& reader,
PubSubWriter<HelloWorldPubSubType>& writer,
PubSubReader<T>& reader,
PubSubWriter<T>& writer,
std::function<void(const eprosima::fastdds::dds::SampleLostStatus& status)> functor)
{
sample_lost_test_dw_init(writer);
Expand Down Expand Up @@ -802,6 +851,78 @@ TEST(DDSStatus, sample_lost_be_dw_be_dr)
});
}

/*!
* \test DDS-STS-SLS-01 Test `SampleLostStatus` in a Best-Effort DataWriter and a Best-Effort DataReader communication.
* This is also a regression test for bug redmine 20162
*/
TEST(DDSStatus, sample_lost_be_dw_be_dr_fragments)
{
PubSubReader<Data1mbPubSubType> reader(TEST_TOPIC_NAME);
PubSubWriter<Data1mbPubSubType> writer(TEST_TOPIC_NAME);

std::mutex test_step_mtx;
std::condition_variable test_step_cv;
uint8_t test_step = 0;

writer.reliability(eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS);
reader.reliability(eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS);

sample_lost_test_init(reader, writer, [&test_step_mtx, &test_step_cv, &test_step](
const eprosima::fastdds::dds::SampleLostStatus& status)
{
{
std::unique_lock<std::mutex> lock(test_step_mtx);
std::cout << status.total_count << " " << status.total_count_change << std::endl;
if (0 == test_step && 1 == status.total_count && 1 == status.total_count_change)
{
++test_step;
}
else if (1 == test_step && 2 == status.total_count && 1 == status.total_count_change)
{
++test_step;
}
else if (2 == test_step && 3 == status.total_count && 1 == status.total_count_change)
{
++test_step;
}
else if (3 == test_step && 4 == status.total_count && 1 == status.total_count_change)
{
++test_step;
}
else if (4 == test_step && 5 == status.total_count && 1 == status.total_count_change)
{
++test_step;
}
else if (5 == test_step && 6 == status.total_count && 1 == status.total_count_change)
{
++test_step;
}
else if (6 == test_step && 7 == status.total_count && 1 == status.total_count_change)
{
++test_step;
}
else
{
test_step = 0;
}
}

test_step_cv.notify_all();
});


auto data = default_data300kb_data_generator(13);

reader.startReception(data);
writer.send(data, 100);

std::unique_lock<std::mutex> lock(test_step_mtx);
test_step_cv.wait(lock, [&test_step]()
{
return 7 == test_step;
});
}

/*!
* \test DDS-STS-SLS-02 Test `SampleLostStatus` in a Best-Effort DataWriter and a late-joiner Best-Effort DataReader
* communication.
Expand Down

0 comments on commit 5611e88

Please sign in to comment.