Skip to content

Commit

Permalink
Make get_first_untaken_info() coherent with read()/take() (#4696)
Browse files Browse the repository at this point in the history
* Refs #20706: Add regression BB test

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20706: Fix

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20706: Apply review suggestions

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

---------

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>
(cherry picked from commit 5fbd88e)
  • Loading branch information
Mario-DL authored and mergify[bot] committed Apr 19, 2024
1 parent 3c95230 commit de4f443
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 4 deletions.
4 changes: 3 additions & 1 deletion include/fastdds/dds/subscriber/DataReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,9 @@ class DataReader : public DomainEntity
const void* instance) const;

/**
* @brief Returns information about the first untaken sample.
* @brief Returns information about the first untaken sample. This method is meant to be called prior to
* a read() or take() operation as it does not modify the status condition of the entity.
*
*
* @param [out] info Pointer to a SampleInfo_t structure to store first untaken sample information.
*
Expand Down
3 changes: 2 additions & 1 deletion src/cpp/fastdds/subscriber/DataReaderImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ class DataReaderImpl
SampleInfoSeq& sample_infos);

/**
* @brief Returns information about the first untaken sample.
* @brief Returns information about the first untaken sample. This method is meant to be called prior to
* a read() or take() operation as it does not modify the status condition of the entity.
* @param [out] info Pointer to a SampleInfo structure to store first untaken sample information.
* @return true if sample info was returned. false if there is no sample to take.
*/
Expand Down
16 changes: 14 additions & 2 deletions src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -356,9 +356,21 @@ bool DataReaderHistory::get_first_untaken_info(
for (auto& it : data_available_instances_)
{
auto& instance_changes = it.second->cache_changes;
if (!instance_changes.empty())
for (auto& instance_change : instance_changes)
{
ReadTakeCommand::generate_info(info, *(it.second), instance_changes.front());
WriterProxy* wp = nullptr;
bool is_future_change = false;

if (mp_reader->begin_sample_access_nts(instance_change, wp, is_future_change))
{
mp_reader->end_sample_access_nts(instance_change, wp, false);
if (is_future_change)
{
continue;
}
}

ReadTakeCommand::generate_info(info, *(it.second), instance_change);
return true;
}
}
Expand Down
106 changes: 106 additions & 0 deletions test/blackbox/common/DDSBlackboxTestsDataReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#include <gtest/gtest.h>

#include <fastdds/dds/core/StackAllocatedSequence.hpp>
#include <fastrtps/transport/test_UDPv4TransportDescriptor.h>
#include <fastrtps/xmlparser/XMLProfileManager.h>

#include "BlackboxTests.hpp"
Expand Down Expand Up @@ -230,6 +232,110 @@ TEST_P(DDSDataReader, ConsistentTotalUnreadAfterGetFirstUntakenInfo)
ASSERT_EQ(result, ReturnCode_t::RETCODE_OK) << "Reader's unread count is: " << reader.get_unread_count();
}

//! Regression test for #20706
//! get_first_untaken_info() returns the first valid change of an instance, not only the first
//! cache change. This implies searching in all the cache changes of the instance.
//! In the scenario of having multiple reliable writers and one reader with history size > 1 in the same topic,
//! it can happen that get_first_untaken_info() returns OK (as it is not currently checking whether the change is in the future)
//! but take() returns NO_DATA because it is waiting for a previous SequenceNumber from the writer.
TEST(DDSDataReader, GetFirstUntakenInfoReturnsTheFirstValidChange)
{
PubSubWriter<HelloWorldPubSubType> writer_1(TEST_TOPIC_NAME);
PubSubWriter<HelloWorldPubSubType> writer_2(TEST_TOPIC_NAME);
// The reader should not take nor read any sample in this test
PubSubReader<HelloWorldPubSubType> reader(TEST_TOPIC_NAME, false, false, false);

auto testTransport_1 = std::make_shared<test_UDPv4TransportDescriptor>();

EntityId_t writer1_id;
EntityId_t reader_id;

testTransport_1->drop_data_messages_filter_ =
[&writer1_id, &reader_id](eprosima::fastrtps::rtps::CDRMessage_t& msg)-> bool
{
uint32_t old_pos = msg.pos;

// see RTPS DDS 9.4.5.3 Data Submessage
EntityId_t readerID;
EntityId_t writerID;
SequenceNumber_t sn;

msg.pos += 2; // flags
msg.pos += 2; // octets to inline quos
CDRMessage::readEntityId(&msg, &readerID);
CDRMessage::readEntityId(&msg, &writerID);
CDRMessage::readSequenceNumber(&msg, &sn);

// restore buffer pos
msg.pos = old_pos;

// Loose Seqnum 1
if (writerID == writer1_id &&
readerID == reader_id &&
(sn == SequenceNumber_t{0, 1}))
{
return true;
}

return false;
};

writer_1.disable_builtin_transport()
.add_user_transport_to_pparams(testTransport_1)
.history_depth(3)
.init();

writer_2.history_depth(3)
.init();

reader.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
.history_depth(3)
.init();

ASSERT_TRUE(writer_1.isInitialized());
ASSERT_TRUE(writer_2.isInitialized());
ASSERT_TRUE(reader.isInitialized());

writer1_id = writer_1.datawriter_guid().entityId;
reader_id = reader.datareader_guid().entityId;

// Wait for discovery.
writer_1.wait_discovery();
writer_2.wait_discovery();
reader.wait_discovery(std::chrono::seconds::zero(), 2);

// Send writer_1 samples
auto data = default_helloworld_data_generator(3);

reader.startReception(data);
writer_1.send(data);

// The reader should have received samples 2,3 but not 1
// get_first_untaken_info() should never return OK since the received changes are all in the future.
// We try it several times in case the reader has not received the samples yet.
eprosima::fastdds::dds::SampleInfo info;
for (size_t i = 0; i < 3; i++)
{
ASSERT_NE(eprosima::fastrtps::types::ReturnCode_t::RETCODE_OK, reader.get_native_reader().get_first_untaken_info(
&info));
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}

// Now we send data from writer_2 with no drops and all samples shall be received.
data = default_helloworld_data_generator(3);
writer_2.send(data);
reader.block_for_unread_count_of(3);

// get_first_untaken_info() must return OK now
ASSERT_EQ(eprosima::fastrtps::types::ReturnCode_t::RETCODE_OK,
reader.get_native_reader().get_first_untaken_info(&info));
eprosima::fastdds::dds::StackAllocatedSequence<HelloWorld*, 1> data_values;
eprosima::fastdds::dds::SampleInfoSeq sample_infos{1};
// As get_first_untaken_info() returns OK, take() must return OK too
ASSERT_EQ(eprosima::fastrtps::types::ReturnCode_t::RETCODE_OK,
reader.get_native_reader().take(data_values, sample_infos));
}

//! Regression test for Issues #3822 Github #3875
//! This test needs to late join a reader in the same process.
//! Not setting this test as parametrized since it only makes sense in intraprocess.
Expand Down

0 comments on commit de4f443

Please sign in to comment.