Skip to content

Commit

Permalink
Prevent index overflow and correctly assert the end iterator in DataS…
Browse files Browse the repository at this point in the history
…haring (#4321)

* Refs #20227: Add DDS communication test

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20227: Prevent history index to overflow history size

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20227: Correctly assert the current end

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20227: Linter

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20227: Initialize test timeout argument properly

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20227: Rev suggestion

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

---------

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>
  • Loading branch information
Mario-DL authored Feb 20, 2024
1 parent 8cff0ca commit 66de89e
Show file tree
Hide file tree
Showing 13 changed files with 148 additions and 46 deletions.
5 changes: 4 additions & 1 deletion src/cpp/rtps/DataSharing/DataSharingPayloadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ void DataSharingPayloadPool::advance(
uint64_t& index) const
{
// lower part is the index, upper part is the loop counter
++index;
if (static_cast<uint32_t>(index) + 1 <= descriptor_->history_size)
{
++index;
}
if (static_cast<uint32_t>(index) % descriptor_->history_size == 0)
{
index = ((index >> 32) + 1) << 32;
Expand Down
10 changes: 6 additions & 4 deletions src/cpp/rtps/DataSharing/ReaderPool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,15 @@ class ReaderPool : public DataSharingPayloadPool
{
CacheChange_t ch;
SequenceNumber_t last_sequence = c_SequenceNumber_Unknown;
get_next_unread_payload(ch, last_sequence);
while (ch.sequenceNumber != SequenceNumber_t::unknown())
uint64_t current_end = end();
get_next_unread_payload(ch, last_sequence, current_end);
while (ch.sequenceNumber != SequenceNumber_t::unknown() || next_payload_ != current_end)
{
current_end = end();
advance(next_payload_);
get_next_unread_payload(ch, last_sequence);
get_next_unread_payload(ch, last_sequence, current_end);
}
assert(next_payload_ == end());
assert(next_payload_ == current_end);
}

return true;
Expand Down
1 change: 1 addition & 0 deletions test/dds/communication/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ list(APPEND TEST_DEFINITIONS
zero_copy_sub_communication
mix_zero_copy_communication
close_TCP_client
simple_data_sharing_stress
)


Expand Down
32 changes: 28 additions & 4 deletions test/dds/communication/PubSubMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,22 @@ using namespace eprosima::fastdds::dds;
* --xmlfile <path>
* --publishers <int>
* --publisher_loops <int>
* --interval <int>
*/

void publisher_run(
PublisherModule* publisher,
uint32_t wait,
uint32_t samples,
uint32_t loops)
uint32_t loops,
uint32_t interval)
{
if (wait > 0)
{
publisher->wait_discovery(wait);
}

publisher->run(samples, loops);
publisher->run(samples, loops, interval);
}

int main(
Expand All @@ -66,8 +68,10 @@ int main(
uint32_t wait = 0;
uint32_t samples = 4;
uint32_t publishers = 1;
uint32_t timeout = 86400000; // 24 hours in ms
// The first loop could be easily ignored by the reader
uint32_t publisher_loops = 2;
uint32_t interval = 250;
char* xml_file = nullptr;
std::string magic;

Expand Down Expand Up @@ -119,6 +123,26 @@ int main(

samples = strtol(argv[arg_count], nullptr, 10);
}
else if (strcmp(argv[arg_count], "--interval") == 0)
{
if (++arg_count >= argc)
{
std::cout << "--interval expects a parameter" << std::endl;
return -1;
}

interval = strtol(argv[arg_count], nullptr, 10);
}
else if (strcmp(argv[arg_count], "--timeout") == 0)
{
if (++arg_count >= argc)
{
std::cout << "--timeout expects a parameter" << std::endl;
return -1;
}

timeout = strtol(argv[arg_count], nullptr, 10);
}
else if (strcmp(argv[arg_count], "--magic") == 0)
{
if (++arg_count >= argc)
Expand Down Expand Up @@ -180,11 +204,11 @@ int main(

if (publisher.init(seed, magic))
{
std::thread publisher_thread(publisher_run, &publisher, wait, samples, publisher_loops);
std::thread publisher_thread(publisher_run, &publisher, wait, samples, publisher_loops, interval);

if (subscriber.init(seed, magic))
{
result = subscriber.run(notexit) ? 0 : -1;
result = subscriber.run(notexit, timeout) ? 0 : -1;
}

publisher_thread.join();
Expand Down
14 changes: 13 additions & 1 deletion test/dds/communication/PublisherMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ using namespace eprosima::fastdds::dds;
* --samples <int>
* --magic <str>
* --xmlfile <path>
* --interval <int>
*/

int main(
Expand All @@ -45,6 +46,7 @@ int main(
uint32_t wait = 0;
char* xml_file = nullptr;
uint32_t samples = 4;
uint32_t interval = 250;
std::string magic;

while (arg_count < argc)
Expand Down Expand Up @@ -91,6 +93,16 @@ int main(

samples = strtol(argv[arg_count], nullptr, 10);
}
else if (strcmp(argv[arg_count], "--interval") == 0)
{
if (++arg_count >= argc)
{
std::cout << "--interval expects a parameter" << std::endl;
return -1;
}

interval = strtol(argv[arg_count], nullptr, 10);
}
else if (strcmp(argv[arg_count], "--magic") == 0)
{
if (++arg_count >= argc)
Expand Down Expand Up @@ -134,7 +146,7 @@ int main(
publisher.wait_discovery(wait);
}

publisher.run(samples);
publisher.run(samples, 0, interval);
return 0;
}

Expand Down
17 changes: 9 additions & 8 deletions test/dds/communication/PublisherModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ bool PublisherModule::init(
uint32_t seed,
const std::string& magic)
{
std::cout << "Initializing Publisher" << std::endl;
std::cout << "Initializing Publisher" << std::endl;

participant_ =
DomainParticipantFactory::get_instance()->create_participant(seed % 230, PARTICIPANT_QOS_DEFAULT, this);

if (participant_ == nullptr)
{
std::cout << "Error creating publisher participant" << std::endl;
EPROSIMA_LOG_ERROR(PUBLISHER_MODULE, "Error creating publisher participant");
return false;
}

Expand All @@ -92,14 +92,14 @@ bool PublisherModule::init(
publisher_ = participant_->create_publisher(PUBLISHER_QOS_DEFAULT, this);
if (publisher_ == nullptr)
{
std::cout << "Error creating publisher" << std::endl;
EPROSIMA_LOG_ERROR(PUBLISHER_MODULE, "Error creating publisher");
return false;
}

topic_ = participant_->create_topic(topic_name.str(), type_.get_type_name(), TOPIC_QOS_DEFAULT);
if (topic_ == nullptr)
{
std::cout << "Error creating publisher topic" << std::endl;
EPROSIMA_LOG_ERROR(PUBLISHER_MODULE, "Error creating publisher topic");
return false;
}

Expand All @@ -111,7 +111,7 @@ bool PublisherModule::init(
writer_ = publisher_->create_datawriter(topic_, wqos, this);
if (writer_ == nullptr)
{
std::cout << "Error creating publisher datawriter" << std::endl;
EPROSIMA_LOG_ERROR(PUBLISHER_MODULE, "Error creating publisher datawriter");
return false;
}
std::cout << "Writer created correctly in topic " << topic_->get_name()
Expand All @@ -134,7 +134,8 @@ void PublisherModule::wait_discovery(

void PublisherModule::run(
uint32_t samples,
const uint32_t loops)
const uint32_t loops,
uint32_t interval)
{
uint32_t current_loop = 0;
uint16_t index = 1;
Expand Down Expand Up @@ -166,7 +167,7 @@ void PublisherModule::run(
data->message("HelloWorld");
}
}
std::cout << "Publisher writting index " << index << std::endl;
EPROSIMA_LOG_INFO(PUBLISHER_MODULE, "Publisher writting index " << index);
writer_->write(sample);

if (index == samples)
Expand All @@ -184,7 +185,7 @@ void PublisherModule::run(
type_.delete_data(sample);
}

std::this_thread::sleep_for(std::chrono::milliseconds(250));
std::this_thread::sleep_for(std::chrono::milliseconds(interval));
}
}

Expand Down
3 changes: 2 additions & 1 deletion test/dds/communication/PublisherModule.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ class PublisherModule

void run(
uint32_t samples,
uint32_t loops = 0);
uint32_t loops = 0,
uint32_t interval = 250);

private:

Expand Down
22 changes: 20 additions & 2 deletions test/dds/communication/SubscriberMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ using namespace eprosima::fastdds::dds;
* --magic <str>
* --xmlfile <path>
* --publishers <int>
* --succeed_on_timeout
* --timeout <int>
*/

int main(
Expand All @@ -41,9 +43,11 @@ int main(
bool notexit = false;
bool fixed_type = false;
bool zero_copy = false;
bool succeed_on_timeout = false;
uint32_t seed = 7800;
uint32_t samples = 4;
uint32_t publishers = 1;
uint32_t timeout = 86400000; // 24 h in ms
char* xml_file = nullptr;
std::string magic;

Expand All @@ -61,6 +65,10 @@ int main(
{
zero_copy = true;
}
else if (strcmp(argv[arg_count], "--succeed_on_timeout") == 0)
{
succeed_on_timeout = true;
}
else if (strcmp(argv[arg_count], "--seed") == 0)
{
if (++arg_count >= argc)
Expand Down Expand Up @@ -91,6 +99,16 @@ int main(

magic = argv[arg_count];
}
else if (strcmp(argv[arg_count], "--timeout") == 0)
{
if (++arg_count >= argc)
{
std::cout << "--run-for expects a parameter" << std::endl;
return -1;
}

timeout = strtol(argv[arg_count], nullptr, 10);
}
else if (strcmp(argv[arg_count], "--xmlfile") == 0)
{
if (++arg_count >= argc)
Expand Down Expand Up @@ -125,11 +143,11 @@ int main(
DomainParticipantFactory::get_instance()->load_XML_profiles_file(xml_file);
}

SubscriberModule subscriber(publishers, samples, fixed_type, zero_copy);
SubscriberModule subscriber(publishers, samples, fixed_type, zero_copy, succeed_on_timeout);

if (subscriber.init(seed, magic))
{
return subscriber.run(notexit) ? 0 : -1;
return subscriber.run(notexit, timeout) ? 0 : -1;
}

return -1;
Expand Down
Loading

0 comments on commit 66de89e

Please sign in to comment.