Skip to content

Commit

Permalink
New max_message_size property to limit output datagrams size (#4777)
Browse files Browse the repository at this point in the history
* Refs #20489. Parse property in RTPS writer.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #20849. Parse property in RTPS participant.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #20849: Add test for RTPS writer

Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com>

* Refs #20849: Add test for participant

Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com>

* Refs #20849: Apply suggestions

Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com>

* Refs #20849: Uncrustify

Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com>

* Refs #20849: Apply suggestions

Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com>

* Refs #20849: Add tests in DDS layer

Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com>

* Refs #20849: Apply suggestions

Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com>

---------

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>
Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com>
Co-authored-by: Miguel Company <miguelcompany@eprosima.com>
(cherry picked from commit 6d20b64)

# Conflicts:
#	src/cpp/rtps/participant/RTPSParticipantImpl.cpp
  • Loading branch information
elianalf authored and mergify[bot] committed May 18, 2024
1 parent ca40f53 commit a6c5945
Show file tree
Hide file tree
Showing 6 changed files with 416 additions and 13 deletions.
3 changes: 3 additions & 0 deletions include/fastdds/rtps/writer/RTPSWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <chrono>
#include <functional>
#include <limits>
#include <memory>
#include <mutex>
#include <vector>
Expand Down Expand Up @@ -527,6 +528,8 @@ class RTPSWriter

//! Flow controller.
fastdds::rtps::FlowController* flow_controller_;
//! Maximum number of bytes allowed for an RTPS datagram generated by this writer.
uint32_t max_output_message_size_ = std::numeric_limits<uint32_t>::max();

//!WriterHistory
WriterHistory* mp_history = nullptr;
Expand Down
197 changes: 195 additions & 2 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,196 @@ RTPSParticipantImpl::RTPSParticipantImpl(
{
}

<<<<<<< HEAD
=======
void RTPSParticipantImpl::setup_meta_traffic()
{
/* If metatrafficMulticastLocatorList is empty, add mandatory default Locators
Else -> Take them */

// Creation of metatraffic locator and receiver resources
uint32_t metatraffic_multicast_port = m_att.port.getMulticastPort(domain_id_);
metatraffic_unicast_port_ = m_att.port.getUnicastPort(domain_id_, static_cast<uint32_t>(m_att.participantID));
uint32_t meta_multicast_port_for_check = metatraffic_multicast_port;

/* INSERT DEFAULT MANDATORY MULTICAST LOCATORS HERE */
if (m_att.builtin.metatrafficMulticastLocatorList.empty() && m_att.builtin.metatrafficUnicastLocatorList.empty())
{
get_default_metatraffic_locators();
internal_metatraffic_locators_ = true;
}
else
{
if (0 < m_att.builtin.metatrafficMulticastLocatorList.size() &&
0 != m_att.builtin.metatrafficMulticastLocatorList.begin()->port)
{
meta_multicast_port_for_check = m_att.builtin.metatrafficMulticastLocatorList.begin()->port;
}
std::for_each(m_att.builtin.metatrafficMulticastLocatorList.begin(),
m_att.builtin.metatrafficMulticastLocatorList.end(), [&](Locator_t& locator)
{
m_network_Factory.fillMetatrafficMulticastLocator(locator, metatraffic_multicast_port);
});
m_network_Factory.NormalizeLocators(m_att.builtin.metatrafficMulticastLocatorList);

std::for_each(m_att.builtin.metatrafficUnicastLocatorList.begin(),
m_att.builtin.metatrafficUnicastLocatorList.end(), [&](Locator_t& locator)
{
m_network_Factory.fillMetatrafficUnicastLocator(locator, metatraffic_unicast_port_);
});
m_network_Factory.NormalizeLocators(m_att.builtin.metatrafficUnicastLocatorList);
}

if (is_intraprocess_only())
{
m_att.builtin.metatrafficUnicastLocatorList.clear();
}

createReceiverResources(m_att.builtin.metatrafficUnicastLocatorList, true, false, true);
createReceiverResources(m_att.builtin.metatrafficMulticastLocatorList, false, false, true);

// Check metatraffic multicast port
if (0 < m_att.builtin.metatrafficMulticastLocatorList.size() &&
m_att.builtin.metatrafficMulticastLocatorList.begin()->port != meta_multicast_port_for_check)
{
EPROSIMA_LOG_WARNING(RTPS_PARTICIPANT,
"Metatraffic multicast port " << meta_multicast_port_for_check << " cannot be opened."
" It may is opened by another application. Discovery may fail.");
}

namespace external_locators = fastdds::rtps::network::external_locators;
external_locators::set_listening_locators(m_att.builtin.metatraffic_external_unicast_locators,
m_att.builtin.metatrafficUnicastLocatorList);
}

void RTPSParticipantImpl::setup_user_traffic()
{
// Creation of user locator and receiver resources
//If no default locators are defined we define some.
/* The reasoning here is the following.
If the parameters of the RTPS Participant don't hold default listening locators for the creation
of Endpoints, we make some for Unicast only.
If there is at least one listen locator of any kind, we do not create any default ones.
If there are no sending locators defined, we create default ones for the transports we implement.
*/
if (m_att.defaultUnicastLocatorList.empty() && m_att.defaultMulticastLocatorList.empty())
{
//Default Unicast Locators in case they have not been provided
/* INSERT DEFAULT UNICAST LOCATORS FOR THE PARTICIPANT */
get_default_unicast_locators();
internal_default_locators_ = true;
EPROSIMA_LOG_INFO(RTPS_PARTICIPANT,
m_att.getName() << " Created with NO default Unicast Locator List, adding Locators:"
<< m_att.defaultUnicastLocatorList);
}
else
{
// Locator with port 0, calculate port.
uint32_t unicast_port = metatraffic_unicast_port_ + m_att.port.offsetd3 - m_att.port.offsetd1;
std::for_each(m_att.defaultUnicastLocatorList.begin(), m_att.defaultUnicastLocatorList.end(),
[&](Locator_t& loc)
{
m_network_Factory.fill_default_locator_port(loc, unicast_port);
});
m_network_Factory.NormalizeLocators(m_att.defaultUnicastLocatorList);

// Locator with port 0, calculate port.
uint32_t multicast_port = m_network_Factory.calculate_well_known_port(domain_id_, m_att, true);
std::for_each(m_att.defaultMulticastLocatorList.begin(), m_att.defaultMulticastLocatorList.end(),
[&](Locator_t& loc)
{
m_network_Factory.fill_default_locator_port(loc, multicast_port);
});
}

if (is_intraprocess_only())
{
m_att.defaultUnicastLocatorList.clear();
m_att.defaultMulticastLocatorList.clear();
}

createReceiverResources(m_att.defaultUnicastLocatorList, true, false, true);
createReceiverResources(m_att.defaultMulticastLocatorList, false, false, true);

namespace external_locators = fastdds::rtps::network::external_locators;
external_locators::set_listening_locators(m_att.default_external_unicast_locators,
m_att.defaultUnicastLocatorList);
}

void RTPSParticipantImpl::setup_initial_peers()
{
// Initial peers
if (m_att.builtin.initialPeersList.empty())
{
m_att.builtin.initialPeersList = m_att.builtin.metatrafficMulticastLocatorList;
}
else
{
LocatorList_t initial_peers;
initial_peers.swap(m_att.builtin.initialPeersList);

std::for_each(initial_peers.begin(), initial_peers.end(),
[&](Locator_t& locator)
{
m_network_Factory.configureInitialPeerLocator(domain_id_, locator, m_att);
});
}
}

void RTPSParticipantImpl::setup_output_traffic()
{
{
const std::string* max_size_property =
PropertyPolicyHelper::find_property(m_att.properties, "fastdds.max_message_size");
if (max_size_property != nullptr)
{
try
{
max_output_message_size_ = std::stoul(*max_size_property);
}
catch (const std::exception& e)
{
EPROSIMA_LOG_ERROR(RTPS_WRITER, "Error parsing max_message_size property: " << e.what());
}
}
}

bool allow_growing_buffers = m_att.allocation.send_buffers.dynamic;
size_t num_send_buffers = m_att.allocation.send_buffers.preallocated_number;
if (num_send_buffers == 0)
{
// Two buffers (user, events)
num_send_buffers = 2;
// Add one buffer per reception thread
num_send_buffers += m_receiverResourcelist.size();
}

// Create buffer pool
send_buffers_.reset(new SendBuffersManager(num_send_buffers, allow_growing_buffers));
send_buffers_->init(this);

// Initialize flow controller factory.
// This must be done after initiate network layer.
flow_controller_factory_.init(this);

// Support old API
if (m_att.throughputController.bytesPerPeriod != UINT32_MAX && m_att.throughputController.periodMillisecs != 0)
{
fastdds::rtps::FlowControllerDescriptor old_descriptor;
old_descriptor.name = guid_str_.c_str();
old_descriptor.max_bytes_per_period = m_att.throughputController.bytesPerPeriod;
old_descriptor.period_ms = m_att.throughputController.periodMillisecs;
flow_controller_factory_.register_flow_controller(old_descriptor);
}

// Register user's flow controllers.
for (auto flow_controller_desc : m_att.flow_controllers)
{
flow_controller_factory_.register_flow_controller(*flow_controller_desc.get());
}
}

>>>>>>> 6d20b649b (New `max_message_size` property to limit output datagrams size (#4777))
void RTPSParticipantImpl::enable()
{
mp_builtinProtocols->enable();
Expand Down Expand Up @@ -2191,8 +2381,11 @@ uint32_t RTPSParticipantImpl::getMaxMessageSize() const
#endif // if HAVE_SECURITY

return (std::min)(
m_network_Factory.get_max_message_size_between_transports(),
max_receiver_buffer_size);
{
max_output_message_size_,
m_network_Factory.get_max_message_size_between_transports(),
max_receiver_buffer_size
});
}

uint32_t RTPSParticipantImpl::getMaxDataSize()
Expand Down
3 changes: 3 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <limits>
#include <list>
#include <mutex>
#include <set>
Expand Down Expand Up @@ -594,6 +595,8 @@ class RTPSParticipantImpl
std::function<bool(const std::string&)> type_check_fn_;
//!Pool of send buffers
std::unique_ptr<SendBuffersManager> send_buffers_;
//! Maximum number of bytes allowed for an RTPS datagram generated by this writer.
uint32_t max_output_message_size_ = std::numeric_limits<uint32_t>::max();

/**
* Client override flag: SIMPLE participant that has been overriden with the environment variable and transformed
Expand Down
40 changes: 29 additions & 11 deletions src/cpp/rtps/writer/RTPSWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,22 @@
*
*/

#include <mutex>

#include <rtps/history/BasicPayloadPool.hpp>
#include <rtps/history/CacheChangePool.h>

#include <rtps/DataSharing/DataSharingNotifier.hpp>
#include <rtps/DataSharing/WriterPool.hpp>
#include <fastdds/rtps/writer/RTPSWriter.h>

#include <rtps/participant/RTPSParticipantImpl.h>
#include <mutex>

#include <fastdds/dds/log/Log.hpp>

#include <fastdds/rtps/writer/RTPSWriter.h>

#include <fastdds/rtps/attributes/PropertyPolicy.h>
#include <fastdds/rtps/history/WriterHistory.h>

#include <fastdds/rtps/messages/RTPSMessageCreator.h>

#include <rtps/DataSharing/DataSharingNotifier.hpp>
#include <rtps/DataSharing/WriterPool.hpp>
#include <rtps/history/BasicPayloadPool.hpp>
#include <rtps/history/CacheChangePool.h>
#include <rtps/participant/RTPSParticipantImpl.h>

#include <statistics/rtps/StatisticsBase.hpp>
#include <statistics/rtps/messages/RTPSStatisticsMessages.hpp>

Expand Down Expand Up @@ -109,6 +107,22 @@ void RTPSWriter::init(
const std::shared_ptr<IChangePool>& change_pool,
const WriterAttributes& att)
{
{
const std::string* max_size_property =
PropertyPolicyHelper::find_property(att.endpoint.properties, "fastdds.max_message_size");
if (max_size_property != nullptr)
{
try
{
max_output_message_size_ = std::stoul(*max_size_property);
}
catch (const std::exception& e)
{
EPROSIMA_LOG_ERROR(RTPS_WRITER, "Error parsing max_message_size property: " << e.what());
}
}
}

payload_pool_ = payload_pool;
change_pool_ = change_pool;
fixed_payload_size_ = 0;
Expand Down Expand Up @@ -306,6 +320,10 @@ uint32_t RTPSWriter::getMaxDataSize()
uint32_t flow_max = flow_controller_->get_max_payload();
uint32_t part_max = mp_RTPSParticipant->getMaxMessageSize();
uint32_t max_size = flow_max > part_max ? part_max : flow_max;
if (max_output_message_size_ < max_size)
{
max_size = max_output_message_size_;
}

max_size = calculateMaxDataSize(max_size);
return max_size &= ~3;
Expand Down
Loading

0 comments on commit a6c5945

Please sign in to comment.