From 5290e99985cd311a8397b866b99843f8e3ef28d7 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Tue, 30 Jan 2024 15:21:18 +0100 Subject: [PATCH] TCP unique client announced local port (#4216) (#4284) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * TCP unique client announced local port (#4216) * Refs #20179: Guarantee unique announced client local port Signed-off-by: Jesus Perez * Refs #20179: Regression tests Signed-off-by: Jesus Perez * Refs #20179: Prevent from join() deadlock (ReceiverResource mtx taken) Signed-off-by: Jesus Perez * Refs #20179: Uncrustify Signed-off-by: Jesus Perez * Refs #20179: Fix windows release tests Signed-off-by: Jesus Perez * Refs #20179: Apply sugestions Signed-off-by: Jesus Perez * Refs #20179: Uncrustify Signed-off-by: Jesus Perez --------- Signed-off-by: Jesus Perez (cherry picked from commit b43f3a065bed3ea5250330aacf1da792d2fbce55) # Conflicts: # src/cpp/rtps/transport/TCPTransportInterface.cpp * Refs #20179: Solve conflicts Signed-off-by: Jesus Perez --------- Signed-off-by: Jesus Perez Co-authored-by: Jesús Pérez <78275223+jepemi@users.noreply.github.com> Co-authored-by: Jesus Perez --- .../rtps/transport/TCPTransportInterface.cpp | 106 ++++++++++-------- .../rtps/transport/TCPTransportInterface.h | 10 ++ .../rtps/transport/tcp/RTCPMessageManager.cpp | 11 +- test/unittest/transport/TCPv4Tests.cpp | 37 ++++++ test/unittest/transport/TCPv6Tests.cpp | 38 +++++++ .../transport/mock/MockTCPv4Transport.h | 38 +------ .../transport/mock/MockTCPv6Transport.h | 49 ++++++++ 7 files changed, 197 insertions(+), 92 deletions(-) create mode 100644 test/unittest/transport/mock/MockTCPv6Transport.h diff --git a/src/cpp/rtps/transport/TCPTransportInterface.cpp b/src/cpp/rtps/transport/TCPTransportInterface.cpp index b36ff3c06e0..9f25966ebb9 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.cpp +++ b/src/cpp/rtps/transport/TCPTransportInterface.cpp @@ -205,6 +205,16 @@ void TCPTransportInterface::clean() } } + if (initial_peer_local_locator_socket_) + { + if (initial_peer_local_locator_socket_->is_open()) + { + initial_peer_local_locator_socket_->close(); + } + + initial_peer_local_locator_socket_.reset(); + } + if (io_service_thread_) { io_service_.stop(); @@ -362,37 +372,46 @@ bool TCPTransportInterface::init( EPROSIMA_LOG_WARNING(TLS, "Error configuring TLS, using TCP transport without security"); } - if (configuration()->sendBufferSize == 0 || configuration()->receiveBufferSize == 0) + /* + Open and bind a socket to obtain a unique port. This port is assigned to PDP passed locators. + Although real client socket local port will differ, this ensures uniqueness for server's channel + resources mapping (uses client locators as keys). + Open and bind a socket to obtain a unique port. This unique port is assigned to to PDP passed locators. + This process ensures uniqueness in the server's channel resources mapping, which uses client locators as keys. + Although differing from the real client socket local port, provides a reliable mapping mechanism. + */ + initial_peer_local_locator_socket_ = std::unique_ptr(new asio::ip::tcp::socket(io_service_)); + initial_peer_local_locator_socket_->open(generate_protocol()); + + // Binding to port 0 delegates the port selection to the system. + initial_peer_local_locator_socket_->bind(asio::ip::tcp::endpoint(generate_protocol(), 0)); + + ip::tcp::endpoint local_endpoint = initial_peer_local_locator_socket_->local_endpoint(); + initial_peer_local_locator_port_ = local_endpoint.port(); + + // Check system buffer sizes. + if (configuration()->sendBufferSize == 0) { - // Check system buffer sizes. - ip::tcp::socket socket(io_service_); - socket.open(generate_protocol()); + socket_base::send_buffer_size option; + initial_peer_local_locator_socket_->get_option(option); + set_send_buffer_size(option.value()); - if (configuration()->sendBufferSize == 0) + if (configuration()->sendBufferSize < s_minimumSocketBuffer) { - socket_base::send_buffer_size option; - socket.get_option(option); - set_send_buffer_size(option.value()); - - if (configuration()->sendBufferSize < s_minimumSocketBuffer) - { - set_send_buffer_size(s_minimumSocketBuffer); - } + set_send_buffer_size(s_minimumSocketBuffer); } + } - if (configuration()->receiveBufferSize == 0) - { - socket_base::receive_buffer_size option; - socket.get_option(option); - set_receive_buffer_size(option.value()); + if (configuration()->receiveBufferSize == 0) + { + socket_base::receive_buffer_size option; + initial_peer_local_locator_socket_->get_option(option); + set_receive_buffer_size(option.value()); - if (configuration()->receiveBufferSize < s_minimumSocketBuffer) - { - set_receive_buffer_size(s_minimumSocketBuffer); - } + if (configuration()->receiveBufferSize < s_minimumSocketBuffer) + { + set_receive_buffer_size(s_minimumSocketBuffer); } - - socket.close(); } if (configuration()->maxMessageSize > s_maximumMessageSize) @@ -1455,18 +1474,7 @@ bool TCPTransportInterface::fillMetatrafficUnicastLocator( { if (IPLocator::getPhysicalPort(locator.port) == 0) { - const TCPTransportDescriptor* config = configuration(); - if (config != nullptr) - { - if (!config->listening_ports.empty()) - { - IPLocator::setPhysicalPort(locator, *(config->listening_ports.begin())); - } - else - { - IPLocator::setPhysicalPort(locator, static_cast(SystemInfo::instance().process_id())); - } - } + fill_local_physical_port(locator); } if (IPLocator::getLogicalPort(locator) == 0) @@ -1524,18 +1532,7 @@ bool TCPTransportInterface::fillUnicastLocator( { if (IPLocator::getPhysicalPort(locator.port) == 0) { - const TCPTransportDescriptor* config = configuration(); - if (config != nullptr) - { - if (!config->listening_ports.empty()) - { - IPLocator::setPhysicalPort(locator, *(config->listening_ports.begin())); - } - else - { - IPLocator::setPhysicalPort(locator, static_cast(SystemInfo::instance().process_id())); - } - } + fill_local_physical_port(locator); } if (IPLocator::getLogicalPort(locator) == 0) @@ -1723,6 +1720,19 @@ bool TCPTransportInterface::is_localhost_allowed() const return is_locator_allowed(local_locator); } +void TCPTransportInterface::fill_local_physical_port( + Locator& locator) const +{ + if (!configuration()->listening_ports.empty()) + { + IPLocator::setPhysicalPort(locator, *(configuration()->listening_ports.begin())); + } + else + { + IPLocator::setPhysicalPort(locator, initial_peer_local_locator_port_); + } +} + } // namespace rtps } // namespace fastrtps } // namespace eprosima diff --git a/src/cpp/rtps/transport/TCPTransportInterface.h b/src/cpp/rtps/transport/TCPTransportInterface.h index d9b9cede221..3136aecf347 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.h +++ b/src/cpp/rtps/transport/TCPTransportInterface.h @@ -76,6 +76,9 @@ class TCPTransportInterface : public TransportInterface std::vector current_interfaces_; asio::io_service io_service_; asio::io_service io_service_timers_; + std::unique_ptr initial_peer_local_locator_socket_; + uint16_t initial_peer_local_locator_port_; + #if TLS_FOUND asio::ssl::context ssl_context_; #endif // if TLS_FOUND @@ -434,6 +437,13 @@ class TCPTransportInterface : public TransportInterface void update_network_interfaces() override; bool is_localhost_allowed() const override; + + /** + * Method to fill local locator physical port. + * @param locator locator to be filled. + */ + void fill_local_physical_port( + Locator& locator) const; }; } // namespace rtps diff --git a/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp b/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp index f5b0364d4a1..cb3694a1b30 100644 --- a/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp +++ b/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp @@ -299,18 +299,11 @@ TCPTransactionId RTCPMessageManager::sendConnectionRequest( Locator locator; mTransport->endpoint_to_locator(channel->local_endpoint(), locator); - auto config = mTransport->configuration(); - if (!config->listening_ports.empty()) - { - IPLocator::setPhysicalPort(locator, *(config->listening_ports.begin())); - } - else - { - IPLocator::setPhysicalPort(locator, static_cast(SystemInfo::instance().process_id())); - } + mTransport->fill_local_physical_port(locator); if (locator.kind == LOCATOR_KIND_TCPv4) { + auto config = mTransport->configuration(); const TCPv4TransportDescriptor* pTCPv4Desc = static_cast(config); IPLocator::setWan(locator, pTCPv4Desc->wan_addr[0], pTCPv4Desc->wan_addr[1], pTCPv4Desc->wan_addr[2], pTCPv4Desc->wan_addr[3]); diff --git a/test/unittest/transport/TCPv4Tests.cpp b/test/unittest/transport/TCPv4Tests.cpp index 504283dfa10..f42aca8450a 100644 --- a/test/unittest/transport/TCPv4Tests.cpp +++ b/test/unittest/transport/TCPv4Tests.cpp @@ -1658,6 +1658,43 @@ TEST_F(TCPv4Tests, autofill_port) EXPECT_TRUE(transportUnderTest_multiple_autofill.configuration()->listening_ports.size() == 3); } +// This test verifies server's channel resources mapping keys uniqueness, where keys are clients locators. +// Clients typically communicated its PID as its locator port. When having several clients in the same +// process this lead to overwriting server's channel resources map elements. +TEST_F(TCPv4Tests, client_announced_local_port_uniqueness) +{ + TCPv4TransportDescriptor recvDescriptor; + recvDescriptor.add_listener_port(g_default_port); + MockTCPv4Transport receiveTransportUnderTest(recvDescriptor); + receiveTransportUnderTest.init(); + + TCPv4TransportDescriptor sendDescriptor_1; + TCPv4Transport sendTransportUnderTest_1(sendDescriptor_1); + sendTransportUnderTest_1.init(); + + TCPv4TransportDescriptor sendDescriptor_2; + TCPv4Transport sendTransportUnderTest_2(sendDescriptor_2); + sendTransportUnderTest_2.init(); + + Locator_t outputLocator; + outputLocator.kind = LOCATOR_KIND_TCPv4; + IPLocator::setIPv4(outputLocator, 127, 0, 0, 1); + outputLocator.port = g_default_port; + IPLocator::setLogicalPort(outputLocator, 7410); + + SendResourceList send_resource_list_1; + ASSERT_TRUE(sendTransportUnderTest_1.OpenOutputChannel(send_resource_list_1, outputLocator)); + ASSERT_FALSE(send_resource_list_1.empty()); + + SendResourceList send_resource_list_2; + ASSERT_TRUE(sendTransportUnderTest_2.OpenOutputChannel(send_resource_list_2, outputLocator)); + ASSERT_FALSE(send_resource_list_2.empty()); + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + ASSERT_EQ(receiveTransportUnderTest.get_channel_resources().size(), 2); +} + void TCPv4Tests::HELPER_SetDescriptorDefaults() { descriptor.add_listener_port(g_default_port); diff --git a/test/unittest/transport/TCPv6Tests.cpp b/test/unittest/transport/TCPv6Tests.cpp index 9198370c99d..1b86d26d04d 100644 --- a/test/unittest/transport/TCPv6Tests.cpp +++ b/test/unittest/transport/TCPv6Tests.cpp @@ -25,6 +25,7 @@ #include #include +#include "mock/MockTCPv6Transport.h" #include #include @@ -213,6 +214,43 @@ TEST_F(TCPv6Tests, autofill_port) transportUnderTest_multiple_autofill.configuration()->listening_ports[2]); EXPECT_TRUE(transportUnderTest_multiple_autofill.configuration()->listening_ports.size() == 3); } + +// This test verifies server's channel resources mapping keys uniqueness, where keys are clients locators. +// Clients typically communicated its PID as its locator port. When having several clients in the same +// process this lead to overwriting server's channel resources map elements. +TEST_F(TCPv6Tests, client_announced_local_port_uniqueness) +{ + TCPv6TransportDescriptor recvDescriptor; + recvDescriptor.add_listener_port(g_default_port); + MockTCPv6Transport receiveTransportUnderTest(recvDescriptor); + receiveTransportUnderTest.init(); + + TCPv6TransportDescriptor sendDescriptor_1; + TCPv6Transport sendTransportUnderTest_1(sendDescriptor_1); + sendTransportUnderTest_1.init(); + + TCPv6TransportDescriptor sendDescriptor_2; + TCPv6Transport sendTransportUnderTest_2(sendDescriptor_2); + sendTransportUnderTest_2.init(); + + Locator_t outputLocator; + outputLocator.kind = LOCATOR_KIND_TCPv6; + IPLocator::setIPv6(outputLocator, "::1"); + outputLocator.port = g_default_port; + IPLocator::setLogicalPort(outputLocator, 7610); + + SendResourceList send_resource_list_1; + ASSERT_TRUE(sendTransportUnderTest_1.OpenOutputChannel(send_resource_list_1, outputLocator)); + ASSERT_FALSE(send_resource_list_1.empty()); + + SendResourceList send_resource_list_2; + ASSERT_TRUE(sendTransportUnderTest_2.OpenOutputChannel(send_resource_list_2, outputLocator)); + ASSERT_FALSE(send_resource_list_2.empty()); + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + ASSERT_EQ(receiveTransportUnderTest.get_channel_resources().size(), 2); +} /* TEST_F(TCPv6Tests, send_and_receive_between_both_secure_ports) { diff --git a/test/unittest/transport/mock/MockTCPv4Transport.h b/test/unittest/transport/mock/MockTCPv4Transport.h index aa14065a36d..a561e473b4c 100644 --- a/test/unittest/transport/mock/MockTCPv4Transport.h +++ b/test/unittest/transport/mock/MockTCPv4Transport.h @@ -16,7 +16,6 @@ #define MOCK_TRANSPORT_TCP4_STUFF_H #include -#include #include namespace eprosima { @@ -25,10 +24,6 @@ namespace rtps { using TCPv4Transport = eprosima::fastdds::rtps::TCPv4Transport; using TCPChannelResource = eprosima::fastdds::rtps::TCPChannelResource; -using TCPChannelResourceBasic = eprosima::fastdds::rtps::TCPChannelResourceBasic; -#if TLS_FOUND -using TCPChannelResourceSecure = eprosima::fastdds::rtps::TCPChannelResourceSecure; -#endif // if TLS_FOUND class MockTCPv4Transport : public TCPv4Transport { @@ -36,42 +31,15 @@ class MockTCPv4Transport : public TCPv4Transport MockTCPv4Transport( const TCPv4TransportDescriptor& descriptor) + : TCPv4Transport(descriptor) { - configuration_ = descriptor; } - virtual bool OpenOutputChannel( - SendResourceList&, - const Locator_t& locator) override + const std::map>& get_channel_resources() const { - const Locator_t& physicalLocator = IPLocator::toPhysicalLocator(locator); - std::shared_ptr channel( -#if TLS_FOUND - (configuration_.apply_security) ? - static_cast( - new TCPChannelResourceSecure(this, io_service_, ssl_context_, physicalLocator, 0)) : -#endif // if TLS_FOUND - static_cast( - new TCPChannelResourceBasic(this, io_service_, physicalLocator, 0)) - ); - - channel_resources_[physicalLocator] = channel; - return true; + return channel_resources_; } - /* - virtual bool CloseOutputChannel(const Locator_t& locator) override - { - const Locator_t& physicalLocator = IPLocator::toPhysicalLocator(locator); - auto it = channel_resources_.find(physicalLocator); - if (it != channel_resources_.end()) - { - delete it->second; - channel_resources_.erase(it); - } - return true; - } - */ }; } // namespace rtps diff --git a/test/unittest/transport/mock/MockTCPv6Transport.h b/test/unittest/transport/mock/MockTCPv6Transport.h new file mode 100644 index 00000000000..d84347cbce7 --- /dev/null +++ b/test/unittest/transport/mock/MockTCPv6Transport.h @@ -0,0 +1,49 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef MOCK_TRANSPORT_TCP6_STUFF_H +#define MOCK_TRANSPORT_TCP6_STUFF_H + +#include +#include + +namespace eprosima { +namespace fastrtps { +namespace rtps { + +using TCPv6Transport = eprosima::fastdds::rtps::TCPv6Transport; +using TCPChannelResource = eprosima::fastdds::rtps::TCPChannelResource; + +class MockTCPv6Transport : public TCPv6Transport +{ +public: + + MockTCPv6Transport( + const TCPv6TransportDescriptor& descriptor) + : TCPv6Transport(descriptor) + { + } + + const std::map>& get_channel_resources() const + { + return channel_resources_; + } + +}; + +} // namespace rtps +} // namespace fastrtps +} // namespace eprosima + +#endif //MOCK_TRANSPORT_TCP6_STUFF_H