Skip to content

Commit

Permalink
TCP unique client announced local port (#4216) (#4284)
Browse files Browse the repository at this point in the history
* TCP unique client announced local port (#4216)

* Refs #20179: Guarantee unique announced client local port

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20179: Regression tests

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20179: Prevent from join() deadlock (ReceiverResource mtx taken)

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20179: Uncrustify

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20179: Fix windows release tests

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20179: Apply sugestions

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20179: Uncrustify

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

---------

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>
(cherry picked from commit b43f3a0)

# Conflicts:
#	src/cpp/rtps/transport/TCPTransportInterface.cpp

* Refs #20179: Solve conflicts

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

---------

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>
Co-authored-by: Jesús Pérez <78275223+jepemi@users.noreply.github.com>
Co-authored-by: Jesus Perez <jesusperez@eprosima.com>
  • Loading branch information
3 people authored Jan 30, 2024
1 parent e09870f commit 5290e99
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 92 deletions.
106 changes: 58 additions & 48 deletions src/cpp/rtps/transport/TCPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,16 @@ void TCPTransportInterface::clean()
}
}

if (initial_peer_local_locator_socket_)
{
if (initial_peer_local_locator_socket_->is_open())
{
initial_peer_local_locator_socket_->close();
}

initial_peer_local_locator_socket_.reset();
}

if (io_service_thread_)
{
io_service_.stop();
Expand Down Expand Up @@ -362,37 +372,46 @@ bool TCPTransportInterface::init(
EPROSIMA_LOG_WARNING(TLS, "Error configuring TLS, using TCP transport without security");
}

if (configuration()->sendBufferSize == 0 || configuration()->receiveBufferSize == 0)
/*
Open and bind a socket to obtain a unique port. This port is assigned to PDP passed locators.
Although real client socket local port will differ, this ensures uniqueness for server's channel
resources mapping (uses client locators as keys).
Open and bind a socket to obtain a unique port. This unique port is assigned to to PDP passed locators.
This process ensures uniqueness in the server's channel resources mapping, which uses client locators as keys.
Although differing from the real client socket local port, provides a reliable mapping mechanism.
*/
initial_peer_local_locator_socket_ = std::unique_ptr<asio::ip::tcp::socket>(new asio::ip::tcp::socket(io_service_));
initial_peer_local_locator_socket_->open(generate_protocol());

// Binding to port 0 delegates the port selection to the system.
initial_peer_local_locator_socket_->bind(asio::ip::tcp::endpoint(generate_protocol(), 0));

ip::tcp::endpoint local_endpoint = initial_peer_local_locator_socket_->local_endpoint();
initial_peer_local_locator_port_ = local_endpoint.port();

// Check system buffer sizes.
if (configuration()->sendBufferSize == 0)
{
// Check system buffer sizes.
ip::tcp::socket socket(io_service_);
socket.open(generate_protocol());
socket_base::send_buffer_size option;
initial_peer_local_locator_socket_->get_option(option);
set_send_buffer_size(option.value());

if (configuration()->sendBufferSize == 0)
if (configuration()->sendBufferSize < s_minimumSocketBuffer)
{
socket_base::send_buffer_size option;
socket.get_option(option);
set_send_buffer_size(option.value());

if (configuration()->sendBufferSize < s_minimumSocketBuffer)
{
set_send_buffer_size(s_minimumSocketBuffer);
}
set_send_buffer_size(s_minimumSocketBuffer);
}
}

if (configuration()->receiveBufferSize == 0)
{
socket_base::receive_buffer_size option;
socket.get_option(option);
set_receive_buffer_size(option.value());
if (configuration()->receiveBufferSize == 0)
{
socket_base::receive_buffer_size option;
initial_peer_local_locator_socket_->get_option(option);
set_receive_buffer_size(option.value());

if (configuration()->receiveBufferSize < s_minimumSocketBuffer)
{
set_receive_buffer_size(s_minimumSocketBuffer);
}
if (configuration()->receiveBufferSize < s_minimumSocketBuffer)
{
set_receive_buffer_size(s_minimumSocketBuffer);
}

socket.close();
}

if (configuration()->maxMessageSize > s_maximumMessageSize)
Expand Down Expand Up @@ -1455,18 +1474,7 @@ bool TCPTransportInterface::fillMetatrafficUnicastLocator(
{
if (IPLocator::getPhysicalPort(locator.port) == 0)
{
const TCPTransportDescriptor* config = configuration();
if (config != nullptr)
{
if (!config->listening_ports.empty())
{
IPLocator::setPhysicalPort(locator, *(config->listening_ports.begin()));
}
else
{
IPLocator::setPhysicalPort(locator, static_cast<uint16_t>(SystemInfo::instance().process_id()));
}
}
fill_local_physical_port(locator);
}

if (IPLocator::getLogicalPort(locator) == 0)
Expand Down Expand Up @@ -1524,18 +1532,7 @@ bool TCPTransportInterface::fillUnicastLocator(
{
if (IPLocator::getPhysicalPort(locator.port) == 0)
{
const TCPTransportDescriptor* config = configuration();
if (config != nullptr)
{
if (!config->listening_ports.empty())
{
IPLocator::setPhysicalPort(locator, *(config->listening_ports.begin()));
}
else
{
IPLocator::setPhysicalPort(locator, static_cast<uint16_t>(SystemInfo::instance().process_id()));
}
}
fill_local_physical_port(locator);
}

if (IPLocator::getLogicalPort(locator) == 0)
Expand Down Expand Up @@ -1723,6 +1720,19 @@ bool TCPTransportInterface::is_localhost_allowed() const
return is_locator_allowed(local_locator);
}

void TCPTransportInterface::fill_local_physical_port(
Locator& locator) const
{
if (!configuration()->listening_ports.empty())
{
IPLocator::setPhysicalPort(locator, *(configuration()->listening_ports.begin()));
}
else
{
IPLocator::setPhysicalPort(locator, initial_peer_local_locator_port_);
}
}

} // namespace rtps
} // namespace fastrtps
} // namespace eprosima
10 changes: 10 additions & 0 deletions src/cpp/rtps/transport/TCPTransportInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ class TCPTransportInterface : public TransportInterface
std::vector<fastrtps::rtps::IPFinder::info_IP> current_interfaces_;
asio::io_service io_service_;
asio::io_service io_service_timers_;
std::unique_ptr<asio::ip::tcp::socket> initial_peer_local_locator_socket_;
uint16_t initial_peer_local_locator_port_;

#if TLS_FOUND
asio::ssl::context ssl_context_;
#endif // if TLS_FOUND
Expand Down Expand Up @@ -434,6 +437,13 @@ class TCPTransportInterface : public TransportInterface
void update_network_interfaces() override;

bool is_localhost_allowed() const override;

/**
* Method to fill local locator physical port.
* @param locator locator to be filled.
*/
void fill_local_physical_port(
Locator& locator) const;
};

} // namespace rtps
Expand Down
11 changes: 2 additions & 9 deletions src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,18 +299,11 @@ TCPTransactionId RTCPMessageManager::sendConnectionRequest(
Locator locator;
mTransport->endpoint_to_locator(channel->local_endpoint(), locator);

auto config = mTransport->configuration();
if (!config->listening_ports.empty())
{
IPLocator::setPhysicalPort(locator, *(config->listening_ports.begin()));
}
else
{
IPLocator::setPhysicalPort(locator, static_cast<uint16_t>(SystemInfo::instance().process_id()));
}
mTransport->fill_local_physical_port(locator);

if (locator.kind == LOCATOR_KIND_TCPv4)
{
auto config = mTransport->configuration();
const TCPv4TransportDescriptor* pTCPv4Desc = static_cast<TCPv4TransportDescriptor*>(config);
IPLocator::setWan(locator, pTCPv4Desc->wan_addr[0], pTCPv4Desc->wan_addr[1], pTCPv4Desc->wan_addr[2],
pTCPv4Desc->wan_addr[3]);
Expand Down
37 changes: 37 additions & 0 deletions test/unittest/transport/TCPv4Tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1658,6 +1658,43 @@ TEST_F(TCPv4Tests, autofill_port)
EXPECT_TRUE(transportUnderTest_multiple_autofill.configuration()->listening_ports.size() == 3);
}

// This test verifies server's channel resources mapping keys uniqueness, where keys are clients locators.
// Clients typically communicated its PID as its locator port. When having several clients in the same
// process this lead to overwriting server's channel resources map elements.
TEST_F(TCPv4Tests, client_announced_local_port_uniqueness)
{
TCPv4TransportDescriptor recvDescriptor;
recvDescriptor.add_listener_port(g_default_port);
MockTCPv4Transport receiveTransportUnderTest(recvDescriptor);
receiveTransportUnderTest.init();

TCPv4TransportDescriptor sendDescriptor_1;
TCPv4Transport sendTransportUnderTest_1(sendDescriptor_1);
sendTransportUnderTest_1.init();

TCPv4TransportDescriptor sendDescriptor_2;
TCPv4Transport sendTransportUnderTest_2(sendDescriptor_2);
sendTransportUnderTest_2.init();

Locator_t outputLocator;
outputLocator.kind = LOCATOR_KIND_TCPv4;
IPLocator::setIPv4(outputLocator, 127, 0, 0, 1);
outputLocator.port = g_default_port;
IPLocator::setLogicalPort(outputLocator, 7410);

SendResourceList send_resource_list_1;
ASSERT_TRUE(sendTransportUnderTest_1.OpenOutputChannel(send_resource_list_1, outputLocator));
ASSERT_FALSE(send_resource_list_1.empty());

SendResourceList send_resource_list_2;
ASSERT_TRUE(sendTransportUnderTest_2.OpenOutputChannel(send_resource_list_2, outputLocator));
ASSERT_FALSE(send_resource_list_2.empty());

std::this_thread::sleep_for(std::chrono::milliseconds(100));

ASSERT_EQ(receiveTransportUnderTest.get_channel_resources().size(), 2);
}

void TCPv4Tests::HELPER_SetDescriptorDefaults()
{
descriptor.add_listener_port(g_default_port);
Expand Down
38 changes: 38 additions & 0 deletions test/unittest/transport/TCPv6Tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <fastrtps/utils/Semaphore.h>

#include <MockReceiverResource.h>
#include "mock/MockTCPv6Transport.h"
#include <rtps/network/NetworkFactory.h>
#include <rtps/transport/TCPv6Transport.h>

Expand Down Expand Up @@ -213,6 +214,43 @@ TEST_F(TCPv6Tests, autofill_port)
transportUnderTest_multiple_autofill.configuration()->listening_ports[2]);
EXPECT_TRUE(transportUnderTest_multiple_autofill.configuration()->listening_ports.size() == 3);
}

// This test verifies server's channel resources mapping keys uniqueness, where keys are clients locators.
// Clients typically communicated its PID as its locator port. When having several clients in the same
// process this lead to overwriting server's channel resources map elements.
TEST_F(TCPv6Tests, client_announced_local_port_uniqueness)
{
TCPv6TransportDescriptor recvDescriptor;
recvDescriptor.add_listener_port(g_default_port);
MockTCPv6Transport receiveTransportUnderTest(recvDescriptor);
receiveTransportUnderTest.init();

TCPv6TransportDescriptor sendDescriptor_1;
TCPv6Transport sendTransportUnderTest_1(sendDescriptor_1);
sendTransportUnderTest_1.init();

TCPv6TransportDescriptor sendDescriptor_2;
TCPv6Transport sendTransportUnderTest_2(sendDescriptor_2);
sendTransportUnderTest_2.init();

Locator_t outputLocator;
outputLocator.kind = LOCATOR_KIND_TCPv6;
IPLocator::setIPv6(outputLocator, "::1");
outputLocator.port = g_default_port;
IPLocator::setLogicalPort(outputLocator, 7610);

SendResourceList send_resource_list_1;
ASSERT_TRUE(sendTransportUnderTest_1.OpenOutputChannel(send_resource_list_1, outputLocator));
ASSERT_FALSE(send_resource_list_1.empty());

SendResourceList send_resource_list_2;
ASSERT_TRUE(sendTransportUnderTest_2.OpenOutputChannel(send_resource_list_2, outputLocator));
ASSERT_FALSE(send_resource_list_2.empty());

std::this_thread::sleep_for(std::chrono::milliseconds(100));

ASSERT_EQ(receiveTransportUnderTest.get_channel_resources().size(), 2);
}
/*
TEST_F(TCPv6Tests, send_and_receive_between_both_secure_ports)
{
Expand Down
38 changes: 3 additions & 35 deletions test/unittest/transport/mock/MockTCPv4Transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#define MOCK_TRANSPORT_TCP4_STUFF_H

#include <fastrtps/transport/TCPv4TransportDescriptor.h>
#include <fastrtps/utils/IPLocator.h>
#include <rtps/transport/TCPv4Transport.h>

namespace eprosima {
Expand All @@ -25,53 +24,22 @@ namespace rtps {

using TCPv4Transport = eprosima::fastdds::rtps::TCPv4Transport;
using TCPChannelResource = eprosima::fastdds::rtps::TCPChannelResource;
using TCPChannelResourceBasic = eprosima::fastdds::rtps::TCPChannelResourceBasic;
#if TLS_FOUND
using TCPChannelResourceSecure = eprosima::fastdds::rtps::TCPChannelResourceSecure;
#endif // if TLS_FOUND

class MockTCPv4Transport : public TCPv4Transport
{
public:

MockTCPv4Transport(
const TCPv4TransportDescriptor& descriptor)
: TCPv4Transport(descriptor)
{
configuration_ = descriptor;
}

virtual bool OpenOutputChannel(
SendResourceList&,
const Locator_t& locator) override
const std::map<Locator_t, std::shared_ptr<TCPChannelResource>>& get_channel_resources() const
{
const Locator_t& physicalLocator = IPLocator::toPhysicalLocator(locator);
std::shared_ptr<TCPChannelResource> channel(
#if TLS_FOUND
(configuration_.apply_security) ?
static_cast<TCPChannelResource*>(
new TCPChannelResourceSecure(this, io_service_, ssl_context_, physicalLocator, 0)) :
#endif // if TLS_FOUND
static_cast<TCPChannelResource*>(
new TCPChannelResourceBasic(this, io_service_, physicalLocator, 0))
);

channel_resources_[physicalLocator] = channel;
return true;
return channel_resources_;
}

/*
virtual bool CloseOutputChannel(const Locator_t& locator) override
{
const Locator_t& physicalLocator = IPLocator::toPhysicalLocator(locator);
auto it = channel_resources_.find(physicalLocator);
if (it != channel_resources_.end())
{
delete it->second;
channel_resources_.erase(it);
}
return true;
}
*/
};

} // namespace rtps
Expand Down
Loading

0 comments on commit 5290e99

Please sign in to comment.