Skip to content

Commit

Permalink
Allow processing of AckNack submessages with count == 0 (#4639) (#4774)
Browse files Browse the repository at this point in the history
* Allow processing of AckNack submessages with count == 0 (#4639)

* Refs #20729. Regression test.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #20729. Fix issue.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #20729. Update doxydoc.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #20729. Dont create timers if participant is nullptr.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

---------

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>
(cherry picked from commit 66fc7c5)

# Conflicts:
#	test/unittest/rtps/writer/ReaderProxyTests.cpp

* Fix conflicts

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

---------

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>
Co-authored-by: Miguel Company <miguelcompany@eprosima.com>
  • Loading branch information
mergify[bot] and MiguelCompany authored May 16, 2024
1 parent d9c275f commit f8d09ba
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 28 deletions.
15 changes: 9 additions & 6 deletions include/fastdds/rtps/writer/ReaderProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -300,16 +300,19 @@ class ReaderProxy
}

/**
* Called when an ACKNACK is received to set a new value for the count of the last received ACKNACK.
* Called when an ACKNACK is received to set a new value for the minimum count accepted for following received
* ACKNACKs.
*
* @param acknack_count The count of the received ACKNACK.
* @return true if internal count changed (i.e. new ACKNACK is accepted)
* @return true if internal count changed (i.e. received ACKNACK is accepted)
*/
bool check_and_set_acknack_count(
uint32_t acknack_count)
{
if (last_acknack_count_ < acknack_count)
if (acknack_count >= next_expected_acknack_count_)
{
last_acknack_count_ = acknack_count;
next_expected_acknack_count_ = acknack_count;
++next_expected_acknack_count_;
return true;
}

Expand Down Expand Up @@ -423,8 +426,8 @@ class ReaderProxy
TimedEvent* initial_heartbeat_event_;
//! Are timed events enabled?
std::atomic_bool timers_enabled_;
//! Last ack/nack count
uint32_t last_acknack_count_;
//! Next expected ack/nack count
uint32_t next_expected_acknack_count_;
//! Last NACKFRAG count.
uint32_t last_nackfrag_count_;

Expand Down
54 changes: 32 additions & 22 deletions src/cpp/rtps/writer/ReaderProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,23 +57,27 @@ ReaderProxy::ReaderProxy(
, nack_supression_event_(nullptr)
, initial_heartbeat_event_(nullptr)
, timers_enabled_(false)
, last_acknack_count_(0)
, next_expected_acknack_count_(0)
, last_nackfrag_count_(0)
{
nack_supression_event_ = new TimedEvent(writer_->getRTPSParticipant()->getEventResource(),
[&]() -> bool
{
writer_->perform_nack_supression(guid());
return false;
},
TimeConv::Time_t2MilliSecondsDouble(times.nackSupressionDuration));
auto participant = writer_->getRTPSParticipant();
if (nullptr != participant)
{
nack_supression_event_ = new TimedEvent(participant->getEventResource(),
[&]() -> bool
{
writer_->perform_nack_supression(guid());
return false;
},
TimeConv::Time_t2MilliSecondsDouble(times.nackSupressionDuration));

initial_heartbeat_event_ = new TimedEvent(writer_->getRTPSParticipant()->getEventResource(),
[&]() -> bool
{
writer_->intraprocess_heartbeat(this);
return false;
}, 0);
initial_heartbeat_event_ = new TimedEvent(participant->getEventResource(),
[&]() -> bool
{
writer_->intraprocess_heartbeat(this);
return false;
}, 0);
}

stop();
}
Expand Down Expand Up @@ -135,7 +139,7 @@ void ReaderProxy::start(
}

timers_enabled_.store(is_remote_and_reliable());
if (is_local_reader())
if (is_local_reader() && initial_heartbeat_event_)
{
initial_heartbeat_event_->restart_timer();
}
Expand Down Expand Up @@ -166,32 +170,38 @@ void ReaderProxy::stop()
disable_timers();

changes_for_reader_.clear();
last_acknack_count_ = 0;
next_expected_acknack_count_ = 0;
last_nackfrag_count_ = 0;
changes_low_mark_ = SequenceNumber_t();
}

void ReaderProxy::disable_timers()
{
if (timers_enabled_.exchange(false))
if (timers_enabled_.exchange(false) && nack_supression_event_)
{
nack_supression_event_->cancel_timer();
}
initial_heartbeat_event_->cancel_timer();
if (initial_heartbeat_event_)
{
initial_heartbeat_event_->cancel_timer();
}
}

void ReaderProxy::update_nack_supression_interval(
const Duration_t& interval)
{
nack_supression_event_->update_interval(interval);
if (nack_supression_event_)
{
nack_supression_event_->update_interval(interval);
}
}

void ReaderProxy::add_change(
const ChangeForReader_t& change,
bool is_relevant,
bool restart_nack_supression)
{
if (restart_nack_supression && timers_enabled_.load())
if (restart_nack_supression && timers_enabled_.load() && nack_supression_event_)
{
nack_supression_event_->restart_timer();
}
Expand All @@ -205,7 +215,7 @@ void ReaderProxy::add_change(
bool restart_nack_supression,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time)
{
if (restart_nack_supression && timers_enabled_)
if (restart_nack_supression && timers_enabled_ && nack_supression_event_)
{
nack_supression_event_->restart_timer(max_blocking_time);
}
Expand Down Expand Up @@ -459,7 +469,7 @@ void ReaderProxy::from_unsent_to_status(
// It will use acked_changes_set().
assert(is_reliable_);

if (restart_nack_supression && is_remote_and_reliable())
if (restart_nack_supression && is_remote_and_reliable() && nack_supression_event_)
{
assert(timers_enabled_.load());
nack_supression_event_->restart_timer();
Expand Down
32 changes: 32 additions & 0 deletions test/unittest/rtps/writer/ReaderProxyTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,38 @@ TEST(ReaderProxyTests, process_nack_frag_multiple_fragments_different_windows_te
TOTAL_NUMBER_OF_FRAGMENTS + 1u), TOTAL_NUMBER_OF_FRAGMENTS + 1u);
}

// Test expectations regarding acknack count.
// Serves as a regression test for redmine issue #20729.
TEST(ReaderProxyTests, acknack_count)
{
StatefulWriter writer_mock;
WriterTimes w_times;
RemoteLocatorsAllocationAttributes alloc;
ReaderProxy rproxy(w_times, alloc, &writer_mock);

ReaderProxyData reader_attributes(0, 0);
reader_attributes.m_qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS;
rproxy.start(reader_attributes);

// Check that the initial acknack count is 0.
EXPECT_TRUE(rproxy.check_and_set_acknack_count(0u));
// Check that it is not accepted twice.
EXPECT_FALSE(rproxy.check_and_set_acknack_count(0u));
// Check that it is accepted if it is incremented.
EXPECT_TRUE(rproxy.check_and_set_acknack_count(1u));
// Check that it is not accepted twice.
EXPECT_FALSE(rproxy.check_and_set_acknack_count(1u));
// Check that it is not accepted if it is decremented.
EXPECT_FALSE(rproxy.check_and_set_acknack_count(0u));
// Check that it is accepted if it has a big increment.
EXPECT_TRUE(rproxy.check_and_set_acknack_count(100u));
// Check that previous values are rejected.
for (uint32_t i = 0; i <= 100u; ++i)
{
EXPECT_FALSE(rproxy.check_and_set_acknack_count(i));
}
}

} // namespace rtps
} // namespace fastrtps
} // namespace eprosima
Expand Down

0 comments on commit f8d09ba

Please sign in to comment.