diff --git a/CMakeLists.txt b/CMakeLists.txt index 1c27c806..d4938c71 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 3.13 FATAL_ERROR) #----------------------------------------------------------------- # The project #----------------------------------------------------------------- -project (SolidFrame VERSION 11.1) +project (SolidFrame VERSION 12.0) message("SolidFrame version: ${PROJECT_VERSION} - ${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}") #----------------------------------------------------------------- diff --git a/RELEASES.md b/RELEASES.md index 8f2725cc..a36282e3 100644 --- a/RELEASES.md +++ b/RELEASES.md @@ -1,5 +1,11 @@ # SolidFrame Releases +## Version 12.0 +* utility: support pthread_spin_lock when available +* utility: ThreadPool improvements and fixes +* frame: Reactor and aio::Reactor now use the same new event-passing technique as ThreadPool +* mprpc: prepare the stage for relay multicast support + ## Version 11.1 * mprpc: Split Connection into ClientConnection, ServerConnection, RelayConnection * mprpc: Some cleanup and some small improvements to the MessageReader and MessageWriter diff --git a/cmake/check.config.cmake b/cmake/check.config.cmake index 302456f8..39f78fe1 100644 --- a/cmake/check.config.cmake +++ b/cmake/check.config.cmake @@ -45,5 +45,9 @@ file (READ "${CMAKE_CURRENT_SOURCE_DIR}/cmake/check/epoll2.cpp" source_code) CHECK_CXX_SOURCE_RUNS("${source_code}" SOLID_USE_EPOLL2) +file (READ "${CMAKE_CURRENT_SOURCE_DIR}/cmake/check/pthread_spinlock.cpp" source_code) + +CHECK_CXX_SOURCE_COMPILES("${source_code}" SOLID_USE_PTHREAD_SPINLOCK) + #TODO: #set(SOLID_FRAME_AIO_REACTOR_USE_SPINLOCK TRUE) \ No newline at end of file diff --git a/cmake/check/pthread_spinlock.cpp b/cmake/check/pthread_spinlock.cpp new file mode 100644 index 00000000..9f22f7bd --- /dev/null +++ b/cmake/check/pthread_spinlock.cpp @@ -0,0 +1,8 @@ +#include <pthread.h> + +int main(){ + pthread_spinlock_t spl; + auto rv = pthread_spin_init(&spl, PTHREAD_PROCESS_PRIVATE); + if(rv != 0) return -1; + return 0; +} \ No newline at end of file diff --git a/examples/frame/aio_echo/example_echo_auto_client.cpp b/examples/frame/aio_echo/example_echo_auto_client.cpp index 52476d7d..28cf2bfa 100644 --- a/examples/frame/aio_echo/example_echo_auto_client.cpp +++ b/examples/frame/aio_echo/example_echo_auto_client.cpp @@ -196,7 +196,7 @@ int main(int argc, char* argv[]) 1024 * 1024 * 64); } - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); async_resolver(&resolver); diff --git a/examples/frame/aio_echo/example_secure_echo_client.cpp b/examples/frame/aio_echo/example_secure_echo_client.cpp index 4090f782..7103a95e 100644 --- a/examples/frame/aio_echo/example_secure_echo_client.cpp +++ b/examples/frame/aio_echo/example_secure_echo_client.cpp @@ -174,7 +174,7 @@ int main(int argc, char* argv[]) frame::ServiceT service(manager); frame::ActorIdT actuid; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); ErrorConditionT err; diff --git a/examples/frame/mprpc_echo/example_mprpc_echo.cpp b/examples/frame/mprpc_echo/example_mprpc_echo.cpp index e98c33b1..3b303636 100644 ---
a/examples/frame/mprpc_echo/example_mprpc_echo.cpp +++ b/examples/frame/mprpc_echo/example_mprpc_echo.cpp @@ -173,7 +173,7 @@ int main(int argc, char* argv[]) frame::Manager m; frame::mprpc::ServiceT ipcsvc(m); ErrorConditionT err; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); if (!restart(ipcsvc, resolver, sch)) { @@ -211,7 +211,7 @@ bool restart( reflection::v1::metadata::factory, [&](auto& _rmap) { _rmap.template registerMessage<FirstMessage>(1, "FirstMessage", - []( frame::mprpc::ConnectionContext& _rctx, frame::mprpc::MessagePointerT<FirstMessage>& _rsend_msg, frame::mprpc::MessagePointerT<FirstMessage>& _rrecv_msg, diff --git a/examples/frame/relay_server/example_relay_server.cpp b/examples/frame/relay_server/example_relay_server.cpp index 7f2a1a83..29bcfef1 100644 --- a/examples/frame/relay_server/example_relay_server.cpp +++ b/examples/frame/relay_server/example_relay_server.cpp @@ -217,7 +217,7 @@ int main(int argc, char* argv[]) 3, 1024 * 1024 * 64); } - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); async_resolver(&resolver); diff --git a/examples/frame/relay_server/example_relay_server_bi.cpp b/examples/frame/relay_server/example_relay_server_bi.cpp index 1af1d03a..268af281 100644 --- a/examples/frame/relay_server/example_relay_server_bi.cpp +++ b/examples/frame/relay_server/example_relay_server_bi.cpp @@ -180,7 +180,7 @@ int main(int argc, char* argv[]) 1024 * 1024 * 64); } - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); async_resolver(&resolver); diff --git a/examples/frame/relay_server/example_relay_server_bi_cp.cpp b/examples/frame/relay_server/example_relay_server_bi_cp.cpp index 22716b60..b638177d 100644 --- a/examples/frame/relay_server/example_relay_server_bi_cp.cpp +++ b/examples/frame/relay_server/example_relay_server_bi_cp.cpp @@ -181,7 +181,7 @@ int main(int argc, char* argv[]) 3, 1024 * 1024 * 64); } - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); async_resolver(&resolver); diff --git a/examples/frame/relay_server/example_relay_server_bi_ex.cpp b/examples/frame/relay_server/example_relay_server_bi_ex.cpp index 09856ce4..2cc7493e 100644 --- a/examples/frame/relay_server/example_relay_server_bi_ex.cpp +++ b/examples/frame/relay_server/example_relay_server_bi_ex.cpp @@ -382,7 +382,7 @@ int main(int argc, char* argv[]) cout << "sizeof(Connection) = " << sizeof(Connection) << endl; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); async_resolver(&resolver); diff --git a/examples/frame/relay_server/example_relay_server_bi_sh.cpp b/examples/frame/relay_server/example_relay_server_bi_sh.cpp index d31f6dbb..9cb95f66 100644 ---
a/examples/frame/relay_server/example_relay_server_bi_sh.cpp +++ b/examples/frame/relay_server/example_relay_server_bi_sh.cpp @@ -311,7 +311,7 @@ int main(int argc, char* argv[]) cout << "sizeof(Connection) = " << sizeof(Connection) << endl; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); async_resolver(&resolver); diff --git a/examples/utility/threadpool/example_file_open_pool.cpp b/examples/utility/threadpool/example_file_open_pool.cpp index 65b1a927..4595616a 100644 --- a/examples/utility/threadpool/example_file_open_pool.cpp +++ b/examples/utility/threadpool/example_file_open_pool.cpp @@ -108,7 +108,7 @@ int main(int argc, char* argv[]) using ThreadPoolT = ThreadPool; Context context; ThreadPoolT wp{ - 1, 100, 0, [](const size_t, Context&) {}, [](const size_t, Context&) {}, + {1, 100, 0}, [](const size_t, Context&) {}, [](const size_t, Context&) {}, [](FileDevice* _pfile, Context& _rctx) { int64_t sz = _pfile->size(); int toread; diff --git a/examples/utility/threadpool/example_threadpool.cpp b/examples/utility/threadpool/example_threadpool.cpp index 830086f5..d0d5f62c 100644 --- a/examples/utility/threadpool/example_threadpool.cpp +++ b/examples/utility/threadpool/example_threadpool.cpp @@ -20,7 +20,7 @@ int main(int argc, char* argv[]) solid::log_start(std::cerr, {".*:VIEW"}); ThreadPool wp{ - 1, 100, 0, [](const size_t) {}, [](const size_t) {}, + {1, 100, 0}, [](const size_t) {}, [](const size_t) {}, [](int _v) { solid_log(generic_logger, Info, "v = " << _v); std::this_thread::sleep_for(std::chrono::milliseconds(_v * 10)); diff --git a/solid/frame/aio/aioreactor.hpp b/solid/frame/aio/aioreactor.hpp index 4c0120a9..19717b48 100644 --- a/solid/frame/aio/aioreactor.hpp +++ b/solid/frame/aio/aioreactor.hpp @@ -10,6 +10,8 @@ #pragma once +#include <bit> + #include "solid/frame/aio/aiocommon.hpp" #include "solid/frame/aio/aioreactorcontext.hpp" #include "solid/frame/common.hpp" @@ -140,9 +142,10 @@ class Reactor : public frame::ReactorBase { Reactor(SchedulerBase& _rsched, StatisticT& _rstatistic, const size_t _schedidx, const size_t _wake_capacity); ~Reactor(); - size_t pushWakeIndex() noexcept + std::tuple<size_t, frame::impl::AtomicCounterValueT> pushWakeIndex() noexcept { - return push_wake_index_.fetch_add(1) % wake_capacity_; + const auto index = push_wake_index_.fetch_add(1); + return {index % wake_capacity_, frame::impl::computeCounter(index, wake_capacity_)}; } template @@ -374,7 +377,7 @@ class Reactor : public impl::Reactor { Reactor(SchedulerBase& _rsched, StatisticT& _rstatistic, const size_t _sched_idx, const size_t _wake_capacity) : impl::Reactor(_rsched, _rstatistic, _sched_idx, _wake_capacity) - , wake_arr_(new WakeStubT[_wake_capacity]) + , wake_arr_(new WakeStubT[wake_capacity_]) { } @@ -385,10 +388,10 @@ class Reactor : public impl::Reactor { mutex().lock(); const UniqueId uid = this->popUid(*_ract); mutex().unlock(); - const auto index = pushWakeIndex(); - auto& rstub = wake_arr_[index]; + const auto [index, count] = pushWakeIndex(); + auto& rstub = wake_arr_[index]; - rstub.waitWhilePush(rstatistic_); + rstub.waitWhilePush(rstatistic_, count); rstub.reset(uid, _revent, std::move(_ract), &_rsvc); @@ -414,10 +417,10 @@ class Reactor : public impl::Reactor { mutex().lock(); const UniqueId uid = this->popUid(*_ract); mutex().unlock(); - const auto index = pushWakeIndex(); - auto& rstub = wake_arr_[index]; + const auto [index, count]
= pushWakeIndex(); + auto& rstub = wake_arr_[index]; - rstub.waitWhilePush(rstatistic_); + rstub.waitWhilePush(rstatistic_, count); rstub.reset(uid, std::move(_revent), std::move(_ract), &_rsvc); @@ -442,10 +445,10 @@ class Reactor : public impl::Reactor { { bool notify = false; { - const auto index = pushWakeIndex(); - auto& rstub = wake_arr_[index]; + const auto [index, count] = pushWakeIndex(); + auto& rstub = wake_arr_[index]; - rstub.waitWhilePush(rstatistic_); + rstub.waitWhilePush(rstatistic_, count); rstub.reset(_ractuid, _revent); @@ -468,10 +471,10 @@ class Reactor : public impl::Reactor { { bool notify = false; { - const auto index = pushWakeIndex(); - auto& rstub = wake_arr_[index]; + const auto [index, count] = pushWakeIndex(); + auto& rstub = wake_arr_[index]; - rstub.waitWhilePush(rstatistic_); + rstub.waitWhilePush(rstatistic_, count); rstub.reset(_ractuid, std::move(_revent)); @@ -533,7 +536,7 @@ class Reactor : public impl::Reactor { while (true) { const size_t index = pop_wake_index_ % wake_capacity_; auto& rstub = wake_arr_[index]; - if (rstub.isFilled()) { + if (rstub.isFilled(pop_wake_index_, wake_capacity_)) { if (rstub.actor_ptr_) [[unlikely]] { ++actor_count_; rstatistic_.actorCount(actor_count_); diff --git a/solid/frame/aio/src/aioreactor.cpp b/solid/frame/aio/src/aioreactor.cpp index f25b7923..852aa28e 100644 --- a/solid/frame/aio/src/aioreactor.cpp +++ b/solid/frame/aio/src/aioreactor.cpp @@ -343,7 +343,7 @@ Reactor::Reactor( SchedulerBase& _rsched, StatisticT& _rstatistic, const size_t _idx, const size_t _wake_capacity) : ReactorBase(_rsched, _idx) - , wake_capacity_(_wake_capacity) + , wake_capacity_(std::bit_ceil(_wake_capacity)) , rstatistic_(_rstatistic) { solid_log(logger, Verbose, ""); diff --git a/solid/frame/aio/test/test_echo_tcp_stress.cpp b/solid/frame/aio/test/test_echo_tcp_stress.cpp index 5e00b430..78de3829 100644 --- a/solid/frame/aio/test/test_echo_tcp_stress.cpp +++ b/solid/frame/aio/test/test_echo_tcp_stress.cpp @@ -487,7 +487,7 @@ int test_echo_tcp_stress(int argc, char* argv[]) frame::Manager srv_mgr; SecureContextT srv_secure_ctx{SecureContextT::create()}; frame::ServiceT srv_svc{srv_mgr}; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); async_resolver(&resolver); @@ -695,8 +695,7 @@ void Listener::onAccept(frame::aio::ReactorContext& _rctx, SocketDevice& _rsd) break; } --repeatcnt; - } while (repeatcnt != 0u && sock.accept( - _rctx, [this](frame::aio::ReactorContext& _rctx, SocketDevice& _rsd) { onAccept(_rctx, _rsd); }, _rsd)); + } while (repeatcnt != 0u && sock.accept(_rctx, [this](frame::aio::ReactorContext& _rctx, SocketDevice& _rsd) { onAccept(_rctx, _rsd); }, _rsd)); if (repeatcnt == 0u) { sock.postAccept( @@ -1004,8 +1003,7 @@ void Listener::onAccept(frame::aio::ReactorContext& _rctx, SocketDevice& _rsd) break; } --repeatcnt; - } while (repeatcnt != 0u && sock.accept( - _rctx, [this](frame::aio::ReactorContext& _rctx, SocketDevice& _rsd) { onAccept(_rctx, _rsd); }, _rsd)); + } while (repeatcnt != 0u && sock.accept(_rctx, [this](frame::aio::ReactorContext& _rctx, SocketDevice& _rsd) { onAccept(_rctx, _rsd); }, _rsd)); if (repeatcnt == 0u) { sock.postAccept( diff --git a/solid/frame/aio/test/test_event_stress_wp.cpp b/solid/frame/aio/test/test_event_stress_wp.cpp index 818f9404..c68a12c4 100644 --- a/solid/frame/aio/test/test_event_stress_wp.cpp +++
b/solid/frame/aio/test/test_event_stress_wp.cpp @@ -224,11 +224,11 @@ int test_event_stress_wp(int argc, char* argv[]) gctx.stopping_ = false; account_cp.start( - thread_count, account_count, 0, [](const size_t, AccountContext&) {}, [](const size_t, AccountContext&) {}, std::ref(acc_ctx)); + {thread_count, account_count, 0}, [](const size_t, AccountContext&) {}, [](const size_t, AccountContext&) {}, std::ref(acc_ctx)); connection_cp.start( - thread_count, account_count * account_connection_count, 0, [](const size_t, ConnectionContext&) {}, [](const size_t, ConnectionContext&) {}, std::ref(conn_ctx)); + {thread_count, account_count * account_connection_count, 0}, [](const size_t, ConnectionContext&) {}, [](const size_t, ConnectionContext&) {}, std::ref(conn_ctx)); device_cp.start( - thread_count, account_count * account_connection_count, 0, [](const size_t, DeviceContext&) {}, [](const size_t, DeviceContext&) {}, std::ref(dev_ctx)); + {thread_count, account_count * account_connection_count, 0}, [](const size_t, DeviceContext&) {}, [](const size_t, DeviceContext&) {}, std::ref(dev_ctx)); conn_ctx.conn_cnt_ = (account_connection_count * account_count); auto produce_lambda = [&]() { diff --git a/solid/frame/aio/test/test_perf_threadpool_lockfree.cpp b/solid/frame/aio/test/test_perf_threadpool_lockfree.cpp index afb976e9..7d02254f 100644 --- a/solid/frame/aio/test/test_perf_threadpool_lockfree.cpp +++ b/solid/frame/aio/test/test_perf_threadpool_lockfree.cpp @@ -48,7 +48,7 @@ int test_perf_threadpool_lockfree(int argc, char* argv[]) (void)context_count; auto lambda = [&]() { ThreadPoolT wp{ - thread_count, 10000, 0, [](const size_t) {}, [](const size_t) {}, + {thread_count, 10000, 0}, [](const size_t) {}, [](const size_t) {}, [&](EventBase& _event) { if (_event == generic_event) { ++received_events; diff --git a/solid/frame/aio/test/test_perf_threadpool_synch_context.cpp b/solid/frame/aio/test/test_perf_threadpool_synch_context.cpp index 7d85e46c..c7bb692f 100644 --- a/solid/frame/aio/test/test_perf_threadpool_synch_context.cpp +++ b/solid/frame/aio/test/test_perf_threadpool_synch_context.cpp @@ -51,7 +51,7 @@ int test_perf_threadpool_synch_context(int argc, char* argv[]) auto lambda = [&]() { auto start = std::chrono::steady_clock::now(); ThreadPoolT wp{ - thread_count, 10000, 0, [](const size_t) {}, [](const size_t) {}, + {thread_count, 10000, 0}, [](const size_t) {}, [](const size_t) {}, [&](EventBase& _event) { if (_event == generic_event) { ++received_events; diff --git a/solid/frame/mprpc/src/mprpclistener.cpp b/solid/frame/mprpc/src/mprpclistener.cpp index b5b56545..0186b184 100644 --- a/solid/frame/mprpc/src/mprpclistener.cpp +++ b/solid/frame/mprpc/src/mprpclistener.cpp @@ -71,8 +71,7 @@ void Listener::onAccept(frame::aio::ReactorContext& _rctx, SocketDevice& _rsd) } --repeatcnt; } while ( - repeatcnt != 0u && sock.accept( - _rctx, [this](frame::aio::ReactorContext& _rctx, SocketDevice& _rsd) { onAccept(_rctx, _rsd); }, _rsd)); + repeatcnt != 0u && sock.accept(_rctx, [this](frame::aio::ReactorContext& _rctx, SocketDevice& _rsd) { onAccept(_rctx, _rsd); }, _rsd)); if (repeatcnt == 0u) { sock.postAccept( diff --git a/solid/frame/mprpc/src/mprpcmessagewriter.cpp b/solid/frame/mprpc/src/mprpcmessagewriter.cpp index 3224ea70..b465ba55 100644 --- a/solid/frame/mprpc/src/mprpcmessagewriter.cpp +++ b/solid/frame/mprpc/src/mprpcmessagewriter.cpp @@ -72,7 +72,7 @@ void MessageWriter::doWriteQueuePushBack(const size_t _msgidx, const int _line) } MessageStub& rmsgstub(message_vec_[_msgidx]); - 
solid_log(logger, Info, this << " code line = " << _line << " idx = " << _msgidx << " is_relay = " << rmsgstub.isRelay()); + solid_log(logger, Info, this << " code line = " << _line << " idx = " << _msgidx << " is_relay = " << rmsgstub.isRelay() << " msg = " << rmsgstub.msgbundle_.message_ptr.get()); if (!rmsgstub.isRelay()) { ++write_queue_direct_count_; } @@ -141,7 +141,7 @@ bool MessageWriter::enqueue( order_inner_list_.pushBack(idx); doWriteQueuePushBack(idx, __LINE__); - solid_log(logger, Verbose, "is_relayed = " << Message::is_relayed(rmsgstub.msgbundle_.message_ptr->flags()) << ' ' << MessageWriterPrintPairT(*this, PrintInnerListsE) << " relay " << rmsgstub.msgbundle_.message_relay_header_ << " " << _rmsgbundle.message_relay_header_); + solid_log(logger, Verbose, "messageptr = " << rmsgstub.msgbundle_.message_ptr.get() << " is_relayed = " << Message::is_relayed(rmsgstub.msgbundle_.message_ptr->flags()) << ' ' << MessageWriterPrintPairT(*this, PrintInnerListsE) << " relay " << rmsgstub.msgbundle_.message_relay_header_ << " " << _rmsgbundle.message_relay_header_); return true; } @@ -676,9 +676,11 @@ char* MessageWriter::doWriteMessageHead( _rsender.context().request_id.unique = rmsgstub.unique_; _rsender.context().message_flags = rmsgstub.msgbundle_.message_flags; if (rmsgstub.msgbundle_.message_relay_header_.has_value()) { + // solid_assert_log(_rsender.context().message_flags.isSet(MessageFlagsE::Relayed), logger, ""); frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); sch_client.start(1); diff --git a/solid/frame/mprpc/test/test_clientfrontback_upload.cpp b/solid/frame/mprpc/test/test_clientfrontback_upload.cpp index 93bf69e0..112876ea 100644 --- a/solid/frame/mprpc/test/test_clientfrontback_upload.cpp +++ b/solid/frame/mprpc/test/test_clientfrontback_upload.cpp @@ -346,7 +346,7 @@ int test_clientfrontback_upload(int argc, char* argv[]) frame::mprpc::ServiceT mprpc_back_client(m); frame::mprpc::ServiceT mprpc_back_server(m); ErrorConditionT err; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); sch_client.start(1); diff --git a/solid/frame/mprpc/test/test_clientserver_basic.cpp b/solid/frame/mprpc/test/test_clientserver_basic.cpp index 2d2fe3df..962f9292 100644 --- a/solid/frame/mprpc/test/test_clientserver_basic.cpp +++ b/solid/frame/mprpc/test/test_clientserver_basic.cpp @@ -326,7 +326,7 @@ int test_clientserver_basic(int argc, char* argv[]) frame::mprpc::ServiceT mprpcserver(m); frame::mprpc::ServiceT mprpcclient(m); ErrorConditionT err; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); sch_client.start(1); diff --git a/solid/frame/mprpc/test/test_clientserver_cancel_client.cpp b/solid/frame/mprpc/test/test_clientserver_cancel_client.cpp index d842e647..4c15651a 100644 --- a/solid/frame/mprpc/test/test_clientserver_cancel_client.cpp +++ b/solid/frame/mprpc/test/test_clientserver_cancel_client.cpp @@ -309,7 +309,7 @@ int test_clientserver_cancel_client(int argc, char* argv[]) frame::mprpc::ServiceT mprpcserver(m); frame::mprpc::ServiceT mprpcclient(m); ErrorConditionT err; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}};
frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); sch_client.start(1); diff --git a/solid/frame/mprpc/test/test_clientserver_cancel_server.cpp b/solid/frame/mprpc/test/test_clientserver_cancel_server.cpp index 42a90a1b..b2dcc782 100644 --- a/solid/frame/mprpc/test/test_clientserver_cancel_server.cpp +++ b/solid/frame/mprpc/test/test_clientserver_cancel_server.cpp @@ -321,7 +321,7 @@ int test_clientserver_cancel_server(int argc, char* argv[]) frame::mprpc::ServiceT mprpcserver(m); frame::mprpc::ServiceT mprpcclient(m); ErrorConditionT err; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); sch_client.start(1); diff --git a/solid/frame/mprpc/test/test_clientserver_delayed.cpp b/solid/frame/mprpc/test/test_clientserver_delayed.cpp index 81cb69f9..1c82a97e 100644 --- a/solid/frame/mprpc/test/test_clientserver_delayed.cpp +++ b/solid/frame/mprpc/test/test_clientserver_delayed.cpp @@ -307,7 +307,7 @@ int test_clientserver_delayed(int argc, char* argv[]) frame::mprpc::ServiceT mprpcserver(m); frame::mprpc::ServiceT mprpcclient(m); ErrorConditionT err; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); sch_client.start(1); diff --git a/solid/frame/mprpc/test/test_clientserver_download.cpp b/solid/frame/mprpc/test/test_clientserver_download.cpp index 60ed60b2..b9fc7688 100644 --- a/solid/frame/mprpc/test/test_clientserver_download.cpp +++ b/solid/frame/mprpc/test/test_clientserver_download.cpp @@ -233,7 +233,7 @@ int test_clientserver_download(int argc, char* argv[]) frame::mprpc::ServiceT mprpc_client(m); frame::mprpc::ServiceT mprpc_server(m); ErrorConditionT err; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); sch_client.start(1); diff --git a/solid/frame/mprpc/test/test_clientserver_idempotent.cpp b/solid/frame/mprpc/test/test_clientserver_idempotent.cpp index 51776734..84acbcfa 100644 --- a/solid/frame/mprpc/test/test_clientserver_idempotent.cpp +++ b/solid/frame/mprpc/test/test_clientserver_idempotent.cpp @@ -324,7 +324,7 @@ int test_clientserver_idempotent(int argc, char* argv[]) frame::mprpc::ServiceT mprpcserver(m); frame::mprpc::ServiceT mprpcclient(m); ErrorConditionT err; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); sch_client.start(1); diff --git a/solid/frame/mprpc/test/test_clientserver_noserver.cpp b/solid/frame/mprpc/test/test_clientserver_noserver.cpp index 8d2ca81f..30424232 100644 --- a/solid/frame/mprpc/test/test_clientserver_noserver.cpp +++ b/solid/frame/mprpc/test/test_clientserver_noserver.cpp @@ -220,7 +220,7 @@ int test_clientserver_noserver(int argc, char* argv[]) frame::Manager m; frame::mprpc::ServiceT mprpcclient(m); ErrorConditionT err; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}};
frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); sch_client.start(1); diff --git a/solid/frame/mprpc/test/test_clientserver_oneshot.cpp b/solid/frame/mprpc/test/test_clientserver_oneshot.cpp index 6b592f31..c16569fb 100644 --- a/solid/frame/mprpc/test/test_clientserver_oneshot.cpp +++ b/solid/frame/mprpc/test/test_clientserver_oneshot.cpp @@ -221,7 +221,7 @@ int test_clientserver_oneshot(int argc, char* argv[]) frame::Manager m; frame::mprpc::ServiceT mprpcclient(m); ErrorConditionT err; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); sch_client.start(1); diff --git a/solid/frame/mprpc/test/test_clientserver_sendrequest.cpp b/solid/frame/mprpc/test/test_clientserver_sendrequest.cpp index a5283f4b..387a74f5 100644 --- a/solid/frame/mprpc/test/test_clientserver_sendrequest.cpp +++ b/solid/frame/mprpc/test/test_clientserver_sendrequest.cpp @@ -388,7 +388,7 @@ int test_clientserver_sendrequest(int argc, char* argv[]) frame::mprpc::ServiceT mprpcserver(m); frame::mprpc::ServiceT mprpcclient(m); ErrorConditionT err; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); sch_client.start(1); diff --git a/solid/frame/mprpc/test/test_clientserver_split.cpp b/solid/frame/mprpc/test/test_clientserver_split.cpp index 53d38dbf..15085570 100644 --- a/solid/frame/mprpc/test/test_clientserver_split.cpp +++ b/solid/frame/mprpc/test/test_clientserver_split.cpp @@ -353,7 +353,7 @@ int test_clientserver_split(int argc, char* argv[]) frame::mprpc::ServiceT mprpcserver(m); frame::mprpc::ServiceT mprpcclient(m); ErrorConditionT err; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); sch_client.start(1); diff --git a/solid/frame/mprpc/test/test_clientserver_timeout_secure.cpp b/solid/frame/mprpc/test/test_clientserver_timeout_secure.cpp index 7dabe3ba..274102f8 100644 --- a/solid/frame/mprpc/test/test_clientserver_timeout_secure.cpp +++ b/solid/frame/mprpc/test/test_clientserver_timeout_secure.cpp @@ -212,7 +212,7 @@ int test_clientserver_timeout_secure(int argc, char* argv[]) frame::Manager m; frame::mprpc::ServiceT mprpcserver(m); ErrorConditionT err; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); frame::ServiceT svc_client{m}; async_resolver(&resolver); diff --git a/solid/frame/mprpc/test/test_clientserver_topic.cpp b/solid/frame/mprpc/test/test_clientserver_topic.cpp index 3874ed25..69d90f7e 100644 --- a/solid/frame/mprpc/test/test_clientserver_topic.cpp +++ b/solid/frame/mprpc/test/test_clientserver_topic.cpp @@ -3,9 +3,9 @@ #include #include #include -#ifdef __cpp_lib_ranges +// #ifdef __cpp_lib_ranges #include <ranges> -#endif +// #endif #include "solid/frame/mprpc/mprpcsocketstub_openssl.hpp" @@ -333,7 +333,7 @@ int test_clientserver_topic(int argc, char* argv[]) frame::aio::Resolver resolver([&resolve_pool](std::function<void()>&& _fnc) {
resolve_pool.pushOne(std::move(_fnc)); }); worker_pool.start( - thread_count, 10000, 100, + {thread_count, 10000, 100}, [](const size_t) { set_current_thread_affinity(); local_thread_pool_context_ptr = std::make_unique(); }, [](const size_t) {}); resolve_pool.start( - 1, 100, 0, [](const size_t) {}, [](const size_t) {}); + {1, 100, 0}, [](const size_t) {}, [](const size_t) {}); sch_client.start([]() {set_current_thread_affinity();return true; }, []() {}, 1); sch_server.start([]() { set_current_thread_affinity(); diff --git a/solid/frame/mprpc/test/test_clientserver_upload.cpp b/solid/frame/mprpc/test/test_clientserver_upload.cpp index e9c94686..b1be9b1d 100644 --- a/solid/frame/mprpc/test/test_clientserver_upload.cpp +++ b/solid/frame/mprpc/test/test_clientserver_upload.cpp @@ -223,7 +223,7 @@ int test_clientserver_upload(int argc, char* argv[]) frame::mprpc::ServiceT mprpc_client(m); frame::mprpc::ServiceT mprpc_server(m); ErrorConditionT err; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); sch_client.start(1); diff --git a/solid/frame/mprpc/test/test_clientserver_upload_single.cpp b/solid/frame/mprpc/test/test_clientserver_upload_single.cpp index 6fb5e5e0..e7ae6864 100644 --- a/solid/frame/mprpc/test/test_clientserver_upload_single.cpp +++ b/solid/frame/mprpc/test/test_clientserver_upload_single.cpp @@ -227,7 +227,7 @@ int test_clientserver_upload_single(int argc, char* argv[]) frame::mprpc::ServiceT mprpc_client(m); frame::mprpc::ServiceT mprpc_server(m); ErrorConditionT err; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); sch_client.start(1); diff --git a/solid/frame/mprpc/test/test_clientserver_versioning.cpp b/solid/frame/mprpc/test/test_clientserver_versioning.cpp index 4fc09652..1f1372c9 100644 --- a/solid/frame/mprpc/test/test_clientserver_versioning.cpp +++ b/solid/frame/mprpc/test/test_clientserver_versioning.cpp @@ -77,7 +77,7 @@ int test_clientserver_versioning(int argc, char* argv[]) AioSchedulerT scheduler; frame::Manager manager; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); frame::mprpc::ServiceT service(manager); frame::mprpc::ServiceT service_v1(manager); diff --git a/solid/frame/mprpc/test/test_connection_close.cpp b/solid/frame/mprpc/test/test_connection_close.cpp index c9eba4b4..c2b2e8f7 100644 --- a/solid/frame/mprpc/test/test_connection_close.cpp +++ b/solid/frame/mprpc/test/test_connection_close.cpp @@ -319,7 +319,7 @@ int test_connection_close(int argc, char* argv[]) frame::Manager m; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); frame::mprpc::ServiceT mprpcserver(m); frame::mprpc::ServiceT mprpcclient(m); diff --git a/solid/frame/mprpc/test/test_keepalive_fail.cpp b/solid/frame/mprpc/test/test_keepalive_fail.cpp index 4cbfc0ea..f34d6bb1 100644 ---
a/solid/frame/mprpc/test/test_keepalive_fail.cpp +++ b/solid/frame/mprpc/test/test_keepalive_fail.cpp @@ -307,7 +307,7 @@ int test_keepalive_fail(int argc, char* argv[]) frame::mprpc::ServiceT mprpcserver(m); frame::mprpc::ServiceT mprpcclient(m); ErrorConditionT err; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); sch_client.start(1); diff --git a/solid/frame/mprpc/test/test_keepalive_success.cpp b/solid/frame/mprpc/test/test_keepalive_success.cpp index c2f585f0..791ffd16 100644 --- a/solid/frame/mprpc/test/test_keepalive_success.cpp +++ b/solid/frame/mprpc/test/test_keepalive_success.cpp @@ -288,7 +288,7 @@ int test_keepalive_success(int argc, char* argv[]) frame::mprpc::ServiceT mprpcserver(m); frame::mprpc::ServiceT mprpcclient(m); ErrorConditionT err; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); sch_client.start(1); diff --git a/solid/frame/mprpc/test/test_multiprotocol_basic.cpp b/solid/frame/mprpc/test/test_multiprotocol_basic.cpp index de9d248a..3209c490 100644 --- a/solid/frame/mprpc/test/test_multiprotocol_basic.cpp +++ b/solid/frame/mprpc/test/test_multiprotocol_basic.cpp @@ -86,7 +86,7 @@ int test_multiprotocol_basic(int argc, char* argv[]) frame::Manager m; frame::mprpc::ServiceT mprpcserver(m); ErrorConditionT err; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); sch_client.start(1); diff --git a/solid/frame/mprpc/test/test_pool_basic.cpp b/solid/frame/mprpc/test/test_pool_basic.cpp index cc5a63fe..63c0d3a0 100644 --- a/solid/frame/mprpc/test/test_pool_basic.cpp +++ b/solid/frame/mprpc/test/test_pool_basic.cpp @@ -317,7 +317,7 @@ int test_pool_basic(int argc, char* argv[]) frame::mprpc::ServiceT mprpcserver(m); frame::mprpc::ServiceT mprpcclient(m); ErrorConditionT err; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); sch_client.start(1); diff --git a/solid/frame/mprpc/test/test_pool_delay_close.cpp b/solid/frame/mprpc/test/test_pool_delay_close.cpp index 02fa4044..70853c55 100644 --- a/solid/frame/mprpc/test/test_pool_delay_close.cpp +++ b/solid/frame/mprpc/test/test_pool_delay_close.cpp @@ -288,7 +288,7 @@ int test_pool_delay_close(int argc, char* argv[]) frame::mprpc::ServiceT mprpcserver(m); frame::mprpc::ServiceT mprpcclient(m); ErrorConditionT err; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); sch_client.start(1); diff --git a/solid/frame/mprpc/test/test_pool_force_close.cpp b/solid/frame/mprpc/test/test_pool_force_close.cpp index 176e21d8..f189d77f 100644 --- a/solid/frame/mprpc/test/test_pool_force_close.cpp +++ b/solid/frame/mprpc/test/test_pool_force_close.cpp @@ -257,7 +257,7 @@ int test_pool_force_close(int argc, char* argv[])
frame::mprpc::ServiceT mprpcserver(m); frame::mprpc::ServiceT mprpcclient(m); ErrorConditionT err; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); sch_client.start(1); diff --git a/solid/frame/mprpc/test/test_raw_basic.cpp b/solid/frame/mprpc/test/test_raw_basic.cpp index b0b324fe..b3b2270b 100644 --- a/solid/frame/mprpc/test/test_raw_basic.cpp +++ b/solid/frame/mprpc/test/test_raw_basic.cpp @@ -359,7 +359,7 @@ int test_raw_basic(int argc, char* argv[]) frame::mprpc::ServiceT mprpcserver(m); frame::mprpc::ServiceT mprpcclient(m); ErrorConditionT err; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); sch_client.start(1); diff --git a/solid/frame/mprpc/test/test_raw_proxy.cpp b/solid/frame/mprpc/test/test_raw_proxy.cpp index 638f61c0..f9229ef7 100644 --- a/solid/frame/mprpc/test/test_raw_proxy.cpp +++ b/solid/frame/mprpc/test/test_raw_proxy.cpp @@ -358,7 +358,7 @@ int test_raw_proxy(int argc, char* argv[]) frame::mprpc::ServiceT mprpcserver(m); frame::mprpc::ServiceT mprpcclient(m); ErrorConditionT err; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); sch_client.start(1); diff --git a/solid/frame/mprpc/test/test_relay_basic.cpp b/solid/frame/mprpc/test/test_relay_basic.cpp index 7232d429..8582aa53 100644 --- a/solid/frame/mprpc/test/test_relay_basic.cpp +++ b/solid/frame/mprpc/test/test_relay_basic.cpp @@ -284,7 +284,7 @@ void peerb_complete_message( ErrorConditionT const& _rerror) { if (_rrecv_msg_ptr) { - solid_dbg(generic_logger, Info, _rctx.recipientId() << " received message with id on sender " << _rrecv_msg_ptr->senderRequestId() << " datasz = " << _rrecv_msg_ptr->str.size()); + solid_dbg(generic_logger, Info, _rctx.recipientId() << ' ' << _rrecv_msg_ptr.get() << " received message with id on sender " << _rrecv_msg_ptr->senderRequestId() << " datasz = " << _rrecv_msg_ptr->str.size() << " isRelay = " << _rrecv_msg_ptr->isRelayed()); if (!_rrecv_msg_ptr->check()) { solid_assert(false); @@ -388,7 +388,7 @@ int test_relay_basic(int argc, char* argv[]) frame::mprpc::ServiceT mprpcpeera(m); frame::mprpc::ServiceT mprpcpeerb(m); ErrorConditionT err; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); sch_peera.start(1); diff --git a/solid/frame/mprpc/test/test_relay_cancel_request.cpp b/solid/frame/mprpc/test/test_relay_cancel_request.cpp index b6b32eeb..030f197c 100644 --- a/solid/frame/mprpc/test/test_relay_cancel_request.cpp +++ b/solid/frame/mprpc/test/test_relay_cancel_request.cpp @@ -437,7 +437,7 @@ int test_relay_cancel_request(int argc, char* argv[]) frame::mprpc::ServiceT mprpcpeera(m); frame::mprpc::ServiceT mprpcpeerb(m); ErrorConditionT err; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) {
cwp.pushOne(std::move(_fnc)); }); sch_peera.start(1); diff --git a/solid/frame/mprpc/test/test_relay_cancel_response.cpp b/solid/frame/mprpc/test/test_relay_cancel_response.cpp index 268e015b..1667c344 100644 --- a/solid/frame/mprpc/test/test_relay_cancel_response.cpp +++ b/solid/frame/mprpc/test/test_relay_cancel_response.cpp @@ -434,7 +434,7 @@ int test_relay_cancel_response(int argc, char* argv[]) frame::mprpc::ServiceT mprpcpeera(m); frame::mprpc::ServiceT mprpcpeerb(m); ErrorConditionT err; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); sch_peera.start(1); diff --git a/solid/frame/mprpc/test/test_relay_close_request.cpp b/solid/frame/mprpc/test/test_relay_close_request.cpp index dfc6c941..8f21c6e3 100644 --- a/solid/frame/mprpc/test/test_relay_close_request.cpp +++ b/solid/frame/mprpc/test/test_relay_close_request.cpp @@ -394,7 +394,7 @@ int test_relay_close_request(int argc, char* argv[]) frame::mprpc::ServiceT mprpcpeera(m); frame::mprpc::ServiceT mprpcpeerb(m); ErrorConditionT err; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); sch_peera.start(1); diff --git a/solid/frame/mprpc/test/test_relay_close_response.cpp b/solid/frame/mprpc/test/test_relay_close_response.cpp index 0eb832c3..a7df6bec 100644 --- a/solid/frame/mprpc/test/test_relay_close_response.cpp +++ b/solid/frame/mprpc/test/test_relay_close_response.cpp @@ -394,7 +394,7 @@ int test_relay_close_response(int argc, char* argv[]) frame::mprpc::ServiceT mprpcpeera(m); frame::mprpc::ServiceT mprpcpeerb(m); ErrorConditionT err; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); sch_peera.start(1); diff --git a/solid/frame/mprpc/test/test_relay_detect_close.cpp b/solid/frame/mprpc/test/test_relay_detect_close.cpp index 73b19471..1c2bc749 100644 --- a/solid/frame/mprpc/test/test_relay_detect_close.cpp +++ b/solid/frame/mprpc/test/test_relay_detect_close.cpp @@ -267,7 +267,7 @@ int test_relay_detect_close(int argc, char* argv[]) frame::mprpc::ServiceT mprpcpeera(m); frame::mprpc::ServiceT mprpcpeerb(m); ErrorConditionT err; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); sch_peera.start(1); diff --git a/solid/frame/mprpc/test/test_relay_detect_close_while_response.cpp b/solid/frame/mprpc/test/test_relay_detect_close_while_response.cpp index 7b483745..048b53ab 100644 --- a/solid/frame/mprpc/test/test_relay_detect_close_while_response.cpp +++ b/solid/frame/mprpc/test/test_relay_detect_close_while_response.cpp @@ -306,7 +306,7 @@ int test_relay_detect_close_while_response(int argc, char* argv[]) frame::mprpc::ServiceT mprpcpeera(m); frame::mprpc::ServiceT mprpcpeerb(m); ErrorConditionT err; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) {
cwp.pushOne(std::move(_fnc)); }); sch_peera.start(1); diff --git a/solid/frame/mprpc/test/test_relay_disabled.cpp b/solid/frame/mprpc/test/test_relay_disabled.cpp index 09270a30..fba6eb06 100644 --- a/solid/frame/mprpc/test/test_relay_disabled.cpp +++ b/solid/frame/mprpc/test/test_relay_disabled.cpp @@ -360,7 +360,7 @@ int test_relay_disabled(int argc, char* argv[]) frame::mprpc::ServiceT mprpcpeera(m); frame::mprpc::ServiceT mprpcpeerb(m); ErrorConditionT err; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); sch_peera.start(1); diff --git a/solid/frame/mprpc/test/test_relay_split.cpp b/solid/frame/mprpc/test/test_relay_split.cpp index ba15aca7..a5fbd7eb 100644 --- a/solid/frame/mprpc/test/test_relay_split.cpp +++ b/solid/frame/mprpc/test/test_relay_split.cpp @@ -413,7 +413,7 @@ int test_relay_split(int argc, char* argv[]) frame::mprpc::ServiceT mprpcpeera(m); frame::mprpc::ServiceT mprpcpeerb(m); ErrorConditionT err; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); sch_peera.start(1); diff --git a/solid/frame/reactor.hpp b/solid/frame/reactor.hpp index 64e8b758..2b1c34c3 100644 --- a/solid/frame/reactor.hpp +++ b/solid/frame/reactor.hpp @@ -120,9 +120,10 @@ class Reactor : public frame::ReactorBase { Reactor(SchedulerBase& _rsched, StatisticT& _rstatistic, const size_t _schedidx, const size_t _wake_capacity); ~Reactor(); - size_t pushWakeIndex() noexcept + std::tuple<size_t, AtomicCounterValueT> pushWakeIndex() noexcept { - return push_wake_index_.fetch_add(1) % wake_capacity_; + const auto index = push_wake_index_.fetch_add(1); + return {index % wake_capacity_, computeCounter(index, wake_capacity_)}; } template @@ -339,7 +340,7 @@ class Reactor : public impl::Reactor { Reactor(SchedulerBase& _rsched, StatisticT& _rstatistic, const size_t _sched_idx, const size_t _wake_capacity) : impl::Reactor(_rsched, _rstatistic, _sched_idx, _wake_capacity) - , wake_arr_(new WakeStubT[_wake_capacity]) + , wake_arr_(new WakeStubT[wake_capacity_]) { } @@ -379,10 +380,10 @@ class Reactor : public impl::Reactor { mutex().lock(); const UniqueId uid = this->popUid(*_ract); mutex().unlock(); - const auto index = pushWakeIndex(); - auto& rstub = wake_arr_[index]; + const auto [index, count] = pushWakeIndex(); + auto& rstub = wake_arr_[index]; - rstub.waitWhilePush(rstatistic_); + rstub.waitWhilePush(rstatistic_, count); rstub.reset(uid, std::move(_revent), std::move(_ract), &_rsvc); @@ -407,10 +408,10 @@ class Reactor : public impl::Reactor { { bool notify = false; { - const auto index = pushWakeIndex(); - auto& rstub = wake_arr_[index]; + const auto [index, count] = pushWakeIndex(); + auto& rstub = wake_arr_[index]; - rstub.waitWhilePush(rstatistic_); + rstub.waitWhilePush(rstatistic_, count); rstub.reset(_ractuid, _revent); @@ -432,10 +433,10 @@ class Reactor : public impl::Reactor { { bool notify = false; { - const auto index = pushWakeIndex(); - auto& rstub = wake_arr_[index]; + const auto [index, count] = pushWakeIndex(); + auto& rstub = wake_arr_[index]; - rstub.waitWhilePush(rstatistic_); + rstub.waitWhilePush(rstatistic_, count); rstub.reset(_ractuid, std::move(_revent)); @@ -492,7 +493,7 @@ class Reactor : public impl::Reactor { while (true) { const size_t
index = pop_wake_index_ % wake_capacity_; auto& rstub = wake_arr_[index]; - if (rstub.isFilled()) { + if (rstub.isFilled(pop_wake_index_, wake_capacity_)) { if (rstub.actor_ptr_) [[unlikely]] { ++actor_count_; rstatistic_.actorCount(actor_count_); diff --git a/solid/frame/reactorbase.hpp b/solid/frame/reactorbase.hpp index b6683891..51075404 100644 --- a/solid/frame/reactorbase.hpp +++ b/solid/frame/reactorbase.hpp @@ -45,74 +45,54 @@ struct ReactorStatisticBase : solid::Statistic { }; namespace impl { +using AtomicIndexT = std::atomic_size_t; +using AtomicIndexValueT = std::atomic_size_t::value_type; +using AtomicCounterT = std::atomic<uint8_t>; +using AtomicCounterValueT = AtomicCounterT::value_type; + +template <class IndexT> +inline constexpr static auto computeCounter(const IndexT _index, const size_t _capacity) noexcept +{ + return (_index / _capacity) & std::numeric_limits<AtomicCounterValueT>::max(); +} struct WakeStubBase { - enum struct LockE : uint8_t { - Empty = 0, - Pushing, - Filled, - }; -#if defined(__cpp_lib_atomic_wait) - std::atomic_flag pushing_ = ATOMIC_FLAG_INIT; -#else - std::atomic_bool pushing_ = {false}; -#endif - std::atomic_uint8_t lock_ = {to_underlying(LockE::Empty)}; + AtomicCounterT produce_count_{0}; + AtomicCounterT consume_count_{static_cast<AtomicCounterValueT>(-1)}; template <class Statistic> - void waitWhilePush(Statistic& _rstats) noexcept + void waitWhilePush(Statistic& _rstats, const AtomicCounterValueT _count, const size_t _spin_count = 1) noexcept { + auto spin = _spin_count; while (true) { -#if defined(__cpp_lib_atomic_wait) - const bool already_pushing = pushing_.test_and_set(std::memory_order_acquire); -#else - bool expected = false; - const bool already_pushing = !pushing_.compare_exchange_strong(expected, true, std::memory_order_acquire); -#endif - if (!already_pushing) { - // wait for lock to be 0.
- uint8_t value = to_underlying(LockE::Empty); - - if (!lock_.compare_exchange_weak(value, to_underlying(LockE::Pushing))) { - do { - std::atomic_wait(&lock_, value); - value = to_underlying(LockE::Empty); - } while (!lock_.compare_exchange_weak(value, to_underlying(LockE::Pushing))); - _rstats.pushWhileWaitLock(); - } - return; - } else { -#if defined(__cpp_lib_atomic_wait) - pushing_.wait(true); -#else - std::atomic_wait(&pushing_, true); -#endif - _rstats.pushWhileWaitPushing(); + const auto cnt = produce_count_.load(); + if (cnt == _count) { + break; + } else if (_spin_count && !spin--) { + _rstats.pushWhileWaitLock(); + std::atomic_wait_explicit(&produce_count_, cnt, std::memory_order_relaxed); + spin = _spin_count; } } } void notifyWhilePush() noexcept { - lock_.store(to_underlying(LockE::Filled)); -#if defined(__cpp_lib_atomic_wait) - pushing_.clear(std::memory_order_release); - pushing_.notify_one(); -#else - pushing_.store(false, std::memory_order_release); - std::atomic_notify_one(&pushing_); -#endif + ++consume_count_; + std::atomic_notify_all(&consume_count_); } void notifyWhilePop() noexcept { - lock_.store(to_underlying(LockE::Empty)); - std::atomic_notify_one(&lock_); + ++produce_count_; + std::atomic_notify_all(&produce_count_); } - bool isFilled() const noexcept + bool isFilled(const uint64_t _id, const size_t _capacity) const { - return lock_.load() == to_underlying(LockE::Filled); + const auto count = consume_count_.load(std::memory_order_relaxed); + const AtomicCounterValueT expected_count = computeCounter(_id, _capacity); + return count == expected_count; } }; diff --git a/solid/frame/src/manager.cpp b/solid/frame/src/manager.cpp index 52c9d6cf..94c7cac6 100644 --- a/solid/frame/src/manager.cpp +++ b/solid/frame/src/manager.cpp @@ -515,12 +515,12 @@ Manager::Manager( const size_t _actor_mutex_count, const size_t _chunk_mutex_count) : pimpl_( - _service_capacity, - _actor_capacity, - _actor_bucket_size == 0 ? (memory_page_size() - sizeof(ActorChunk) + sizeof(ActorStub)) / sizeof(ActorStub) : _actor_bucket_size, - _service_mutex_count == 0 ? _service_capacity : _service_mutex_count, - _actor_mutex_count == 0 ? 1024 : _actor_mutex_count, - _chunk_mutex_count == 0 ? 1024 : _chunk_mutex_count) + _service_capacity, + _actor_capacity, + _actor_bucket_size == 0 ? (memory_page_size() - sizeof(ActorChunk) + sizeof(ActorStub)) / sizeof(ActorStub) : _actor_bucket_size, + _service_mutex_count == 0 ? _service_capacity : _service_mutex_count, + _actor_mutex_count == 0 ? 1024 : _actor_mutex_count, + _chunk_mutex_count == 0 ? 1024 : _chunk_mutex_count) { solid_log(frame_logger, Verbose, "" << this); } diff --git a/solid/frame/src/reactor.cpp b/solid/frame/src/reactor.cpp index 507592f1..10b25ac3 100644 --- a/solid/frame/src/reactor.cpp +++ b/solid/frame/src/reactor.cpp @@ -7,6 +7,7 @@ // Distributed under the Boost Software License, Version 1.0. // See accompanying file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt. 
// +#include <bit> #include #include #include @@ -154,7 +155,7 @@ Reactor::Reactor( SchedulerBase& _rsched, StatisticT& _rstatistic, const size_t _idx, const size_t _wake_capacity) : ReactorBase(_rsched, _idx) - , wake_capacity_(_wake_capacity) + , wake_capacity_(std::bit_ceil(_wake_capacity)) , rstatistic_(_rstatistic) { solid_log(frame_logger, Verbose, ""); diff --git a/solid/system/configuration_impl.hpp.in b/solid/system/configuration_impl.hpp.in index 5c396d03..9ec992ff 100644 --- a/solid/system/configuration_impl.hpp.in +++ b/solid/system/configuration_impl.hpp.in @@ -1,6 +1,7 @@ #pragma once #cmakedefine SOLID_USE_PTHREAD +#cmakedefine SOLID_USE_PTHREAD_SPINLOCK #cmakedefine SOLID_USE_EVENTFD #cmakedefine SOLID_USE_EPOLL #cmakedefine SOLID_USE_KQUEUE diff --git a/solid/system/spinlock.hpp b/solid/system/spinlock.hpp index 65080458..77ec8505 100644 --- a/solid/system/spinlock.hpp +++ b/solid/system/spinlock.hpp @@ -10,6 +10,12 @@ #pragma once #include "solid/system/common.hpp" + +#ifdef SOLID_USE_PTHREAD_SPINLOCK + +#include <pthread.h> + +#else #include #include @@ -25,6 +31,7 @@ #include #endif #endif +#endif namespace solid { @@ -45,8 +52,34 @@ inline void cpu_pause() #endif } +#if defined(SOLID_USE_PTHREAD_SPINLOCK) +class SpinLock : NonCopyable { + pthread_spinlock_t spin_; + +public: + SpinLock() + { + solid_check(pthread_spin_init(&spin_, PTHREAD_PROCESS_PRIVATE) == 0); + } + void lock() noexcept + { + pthread_spin_lock(&spin_); + } + + bool try_lock() noexcept + { + return pthread_spin_trylock(&spin_) == 0; + } + + void unlock() noexcept + { + pthread_spin_unlock(&spin_); + } +}; +#else + #if defined(__cpp_lib_atomic_flag_test) -class SpinLock { +class SpinLock : NonCopyable { std::atomic_flag atomic_flag = ATOMIC_FLAG_INIT; public: @@ -58,8 +91,6 @@ class SpinLock { } while (atomic_flag.test(std::memory_order_relaxed)) { cpu_pause(); - //_mm_pause(); - // std::this_thread::yield(); } } } @@ -78,7 +109,7 @@ class SpinLock { }; #else // https://rigtorp.se/spinlock/ -class SpinLock { +class SpinLock : NonCopyable { std::atomic<bool> lock_ = {0}; public: @@ -111,6 +142,7 @@ } }; #endif +#endif using SpinGuardT = std::lock_guard<SpinLock>; diff --git a/solid/system/src/stacktrace_windows.cpp b/solid/system/src/stacktrace_windows.cpp index 9fa19dbd..1ad678a4 100644 --- a/solid/system/src/stacktrace_windows.cpp +++ b/solid/system/src/stacktrace_windows.cpp @@ -30,8 +30,7 @@ #define g3_MAP_PAIR_STRINGIFY(x) \ { \ - x, #x \ - } + x, #x} namespace { thread_local size_t g_thread_local_recursive_crash_check = 0; diff --git a/solid/utility/common.hpp b/solid/utility/common.hpp index fbadf69e..deeb38b3 100644 --- a/solid/utility/common.hpp +++ b/solid/utility/common.hpp @@ -12,6 +12,9 @@ #include "solid/system/common.hpp" #include +#ifdef __cpp_lib_bitops +#include <bit> +#endif namespace solid { @@ -72,11 +75,23 @@ inline constexpr size_t fast_padded_size(const size_t _sz, const size_t _bitpad) return _sz + pad; } +#ifdef __cpp_lib_bitops + +template <typename T> +size_t bit_count(const T _v) +{ + return static_cast<size_t>(std::popcount(_v)); +} + +#else + size_t bit_count(const uint8_t _v); size_t bit_count(const uint16_t _v); size_t bit_count(const uint32_t _v); size_t bit_count(const uint64_t _v); +#endif + inline size_t leading_zero_count(uint64_t x) { x = x | (x >> 1); @@ -162,7 +177,7 @@ inline uint64_t bit_revert(const uint64_t _v) struct InvalidIndex { template <typename SizeT> - operator SizeT() const + constexpr operator SizeT() const { return (std::numeric_limits<SizeT>::max)(); } diff --git a/solid/utility/src/utility.cpp
b/solid/utility/src/utility.cpp index 59f2179b..1fd43b44 100644 --- a/solid/utility/src/utility.cpp +++ b/solid/utility/src/utility.cpp @@ -200,6 +200,10 @@ std::ostream& ThreadPoolStatistic::print(std::ostream& _ros) const _ros << " pop_one_wait_popping_count = " << pop_one_wait_popping_count_.load(std::memory_order_relaxed); _ros << " push_all_wait_lock_count = " << push_all_wait_lock_count_.load(std::memory_order_relaxed); _ros << " push_all_wait_pushing_count = " << push_all_wait_pushing_count_.load(std::memory_order_relaxed); + _ros << " push_one_latency_max_us = " << push_one_latency_max_us_.load(std::memory_order_relaxed); + _ros << " push_one_latency_min_us = " << push_one_latency_min_us_.load(std::memory_order_relaxed); + const auto sum_ones = push_one_count_[0].load(std::memory_order_relaxed) + push_one_count_[1].load(std::memory_order_relaxed); + _ros << " push_one_latency_avg_us = " << (sum_ones ? push_one_latency_sum_us_.load(std::memory_order_relaxed) / sum_ones : 0); return _ros; } void ThreadPoolStatistic::clear() {} diff --git a/solid/utility/test/CMakeLists.txt b/solid/utility/test/CMakeLists.txt index afb951c9..835317c1 100644 --- a/solid/utility/test/CMakeLists.txt +++ b/solid/utility/test/CMakeLists.txt @@ -31,6 +31,7 @@ set( ThreadPoolTestSuite test_threadpool_basic.cpp test_threadpool_chain.cpp test_threadpool_pattern.cpp + test_threadpool_batch.cpp #test_threadpool_try.cpp ) diff --git a/solid/utility/test/test_callpool_multicast_basic.cpp b/solid/utility/test/test_callpool_multicast_basic.cpp index c1e525f6..42750f26 100644 --- a/solid/utility/test/test_callpool_multicast_basic.cpp +++ b/solid/utility/test/test_callpool_multicast_basic.cpp @@ -61,7 +61,7 @@ int test_callpool_multicast_basic(int argc, char* argv[]) ++worker_stop_count; }; { - CallPoolT cp{worker_count, 100000, 1000, worker_start, worker_stop, std::ref(context), std::ref(record_dq)}; + CallPoolT cp{{worker_count, 100000, 1000}, worker_start, worker_stop, std::ref(context), std::ref(record_dq)}; pwp = &cp; std::promise<void> barrier; diff --git a/solid/utility/test/test_callpool_multicast_pattern.cpp b/solid/utility/test/test_callpool_multicast_pattern.cpp index 7aae8e76..e277f685 100644 --- a/solid/utility/test/test_callpool_multicast_pattern.cpp +++ b/solid/utility/test/test_callpool_multicast_pattern.cpp @@ -48,7 +48,7 @@ int test_callpool_multicast_pattern(int argc, char* argv[]) auto lambda = [&]() { for (int i = 0; i < loop_cnt; ++i) { { - CallPoolT cp{2, 100000, 100000, [](const size_t, Context&) {}, [](const size_t, Context& _rctx) {}, std::ref(context)}; + CallPoolT cp{{2, 100000, 100000}, [](const size_t, Context&) {}, [](const size_t, Context& _rctx) {}, std::ref(context)}; pwp = &cp; for (size_t j = 0; j < cnt; ++j) { diff --git a/solid/utility/test/test_collapse.cpp b/solid/utility/test/test_collapse.cpp index 8ea93fcd..2af573ab 100644 --- a/solid/utility/test/test_collapse.cpp +++ b/solid/utility/test/test_collapse.cpp @@ -54,7 +54,7 @@ using SharedMessageT = IntrusivePtr; int test_collapse(int argc, char* argv[]) { - solid::log_start(std::cerr, {".*:VIEWXS"}); + solid::log_start(std::cerr, {".*:EWXS"}); char choice = 'B'; // B = basic, p = speed shared_ptr, b = speed SharedBuffer size_t repeat_count = 100; @@ -114,7 +114,7 @@ int test_collapse(int argc, char* argv[]) } } } else if (choice == 'p') { - CallPoolT wp{thread_count, 10000, 100, + CallPoolT wp{{thread_count, 10000, 100}, [](const size_t) { set_current_thread_affinity(); }, @@ -138,7 +138,7 @@ {
p.set_value(std::move(tmp_sm)); } { - if (f.wait_for(chrono::seconds(5)) != future_status::ready) { + if (f.wait_for(chrono::seconds(5000)) != future_status::ready) { solid_throw("Waited for too long"); } sm = f.get(); @@ -148,7 +148,7 @@ const auto stop_time = chrono::high_resolution_clock::now(); cout << "Duration: " << chrono::duration_cast<chrono::microseconds>(stop_time - start_time).count() << "us" << endl; } else if (choice == 'b') { - CallPoolT wp{thread_count, 10000, 100, + CallPoolT wp{{thread_count, 10000, 100}, [](const size_t) { set_current_thread_affinity(); }, diff --git a/solid/utility/test/test_threadpool.cpp b/solid/utility/test/test_threadpool.cpp index a05a4cdb..bd77d838 100644 --- a/solid/utility/test/test_threadpool.cpp +++ b/solid/utility/test/test_threadpool.cpp @@ -91,7 +91,7 @@ int test_threadpool(int argc, char* argv[]) // 1000 10 0 0 1 0 0 auto lambda = [&]() { ThreadPoolT wp{ - consumer_count, queue_size, 0, [](size_t, Context&&) {}, [](size_t, Context&&) {}, + {consumer_count, queue_size, 0}, [](size_t, Context&&) {}, [](size_t, Context&&) {}, [job_sleep_msecs](size_t _v, Context&& _rctx) { // solid_check(_rs == "this is a string", "failed string check"); diff --git a/solid/utility/test/test_threadpool_basic.cpp b/solid/utility/test/test_threadpool_basic.cpp index 46bc7763..a86c7f63 100644 --- a/solid/utility/test/test_threadpool_basic.cpp +++ b/solid/utility/test/test_threadpool_basic.cpp @@ -41,7 +41,7 @@ int test_threadpool_basic(int argc, char* argv[]) for (int i = 0; i < loop_cnt; ++i) { { ThreadPoolT wp{ - 2, 10000, 0, [](const size_t) {}, [](const size_t) {}, + {2, 10000, 0}, [](const size_t) {}, [](const size_t) {}, [&val](const size_t _v) { val += _v; }, diff --git a/solid/utility/test/test_threadpool_batch.cpp b/solid/utility/test/test_threadpool_batch.cpp new file mode 100644 index 00000000..c51195fa --- /dev/null +++ b/solid/utility/test/test_threadpool_batch.cpp @@ -0,0 +1,511 @@ +#include "solid/system/exception.hpp" +#include "solid/system/log.hpp" +#include "solid/system/statistic.hpp" +#include "solid/utility/function.hpp" +#include "solid/utility/threadpool.hpp" +#include +#include +#include +#include + +using namespace solid; +using namespace std; + +namespace { +const LoggerT logger("test"); +struct Context { + atomic<uint64_t> min_{InvalidSize{}}; + atomic<uint64_t> max_{0}; + atomic<uint64_t> sum_{0}; + atomic<uint64_t> count_{0}; + + ostream& print(ostream& _ros) const + { + const auto avg = count_ ?
sum_ / count_ : 0; + _ros << "min " << min_ << " max " << max_ << " avg " << avg; + return _ros; + } +}; + +ostream& operator<<(ostream& _ros, const Context& _rctx) +{ + return _rctx.print(_ros); +} + +constexpr size_t one_task_size = 64; + +using CallPoolT = ThreadPool<Function<void(Context&), one_task_size>, Function<void(Context&)>>; +struct Entry { + CallPoolT::SynchronizationContextT ctx_; + + Entry(CallPoolT::SynchronizationContextT&& _ctx) + : ctx_(std::move(_ctx)) + { + } +}; + +constexpr size_t thread_count = 10; + +#ifdef SOLID_ON_LINUX +vector<int> isolcpus = {/*3, 4, 5, 6,*/ 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19}; + +void set_current_thread_affinity() +{ + if (std::thread::hardware_concurrency() < (thread_count + isolcpus[0])) { + return; + } + static std::atomic<size_t> crtCore(0); + + const int isolCore = isolcpus[crtCore.fetch_add(1) % isolcpus.size()]; + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(isolCore, &cpuset); + int rc = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset); + // solid_check(rc == 0); + (void)rc; +} +#else +void set_current_thread_affinity() +{ +} +#endif + +template <class Dur> +void busy_for(const Dur& _dur) +{ + const auto stop = chrono::steady_clock::now() + _dur; + while (chrono::steady_clock::now() < stop) + ; +} +using AtomicCounterT = std::atomic<uint8_t>; +using AtomicCounterValueT = AtomicCounterT::value_type; +atomic_size_t push_one_index{0}; +size_t capacity = 8 * 1024; + +template <typename IndexT> +inline constexpr static auto computeCounter(const IndexT _index, const size_t _capacity) noexcept +{ + return (_index / _capacity) & std::numeric_limits<AtomicCounterValueT>::max(); +} +std::tuple<size_t, AtomicCounterValueT> pushOneIndex() noexcept +{ + const auto index = push_one_index.fetch_add(1); + return {index % capacity, computeCounter(index, capacity)}; +} + +} // namespace + +int test_threadpool_batch(int argc, char* argv[]) +{ + solid::log_start(std::cerr, {".*:EWXS", "test:VIEWS"}); + int wait_seconds = 500; + size_t entry_count = 300; + size_t repeat_count = 1000000; + solid_log(logger, Verbose, "capacity " << capacity << " remainder " << (std::numeric_limits<size_t>::max() % capacity)); + { + vector<AtomicCounterValueT> cnt_vec(capacity, 0); + + for (size_t i = 0; i < (capacity * std::numeric_limits<AtomicCounterValueT>::max() + capacity); ++i) { + const auto [index, counter] = pushOneIndex(); + solid_check(cnt_vec[index] == counter, "" << (int)cnt_vec[index] << " == " << (int)counter << " index = " << index << " i = " << i); + ++cnt_vec[index]; + } + + push_one_index = std::numeric_limits<size_t>::max() - (10 * capacity) + 1; + + for (size_t i = 0; i < capacity; ++i) { + const auto [index, counter] = pushOneIndex(); + cnt_vec[index] = counter; + ++cnt_vec[index]; + } + + for (size_t i = 0; i < (capacity * std::numeric_limits<AtomicCounterValueT>::max() + capacity); ++i) { + const auto [index, counter] = pushOneIndex(); + solid_check(cnt_vec[index] == counter, "" << (int)cnt_vec[index] << " == " << (int)counter << " index = " << index << " i = " << i << " one_index = " << push_one_index); + ++cnt_vec[index]; + } + } + +#if 1 + auto lambda = [&]() { + Context ctx; + { + solid_log(logger, Verbose, "start"); + CallPoolT wp{ + {thread_count, 12000, 100}, [](const size_t, Context&) {}, [](const size_t, Context& _rctx) {}, + std::ref(ctx)}; + solid_log(logger, Verbose, "TP capacity: one " << wp.capacityOne() << " all " << wp.capacityAll()); + solid_log(logger, Verbose, "create contexts"); + vector<Entry> entries; + for (size_t i = 0; i < entry_count; ++i) { + entries.emplace_back(wp.createSynchronizationContext()); + } + + solid_log(logger, Verbose, "wp started"); + uint64_t tmin{InvalidSize{}}; + uint64_t tmax{0}; + uint64_t tsum{0}; + uint64_t
tcnt{0}; + for (size_t i = 0; i < 40; ++i) { + auto start = chrono::steady_clock::now(); + + for (size_t j = 0; j < entries.size(); ++j) { + auto& entry{entries[j]}; + auto lambda = [&entry, start, j](Context& _rctx) mutable { + const auto now = chrono::steady_clock::now(); + const uint64_t duration = chrono::duration_cast(now - start).count(); + store_min(_rctx.min_, duration); + store_max(_rctx.max_, duration); + _rctx.sum_ += duration; + ++_rctx.count_; + // this_thread::sleep_for(chrono::microseconds(1)); + }; + static_assert(sizeof(lambda) <= one_task_size); + entry.ctx_.push(std::move(lambda)); + // wp.pushOne(std::move(lambda)); + } + { + const uint64_t duration = chrono::duration_cast(chrono::steady_clock::now() - start).count(); + store_min(tmin, duration); + store_max(tmax, duration); + tsum += duration; + ++tcnt; + } + // this_thread::sleep_for(chrono::milliseconds(100)); + } + solid_log(logger, Verbose, "min " << tmin << " max " << tmax << " avg " << tsum / tcnt << " " << ctx); + solid_log(logger, Statistic, "ThreadPool statistic: " << wp.statistic()); + } + solid_log(logger, Verbose, "after loop"); + }; +#elif 0 + // clang-format off + vector busy_cons_vec{ + 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, + 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, + 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, + 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, + 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, + 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, + 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, + 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, + 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, + 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, + 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, + 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, + 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, + 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, + 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, + 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, + 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, + 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, + 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, + 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, + 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, + 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, + 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, + 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, + 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, + 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, + 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, + 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, + 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, + 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, + 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, + 30000, 30000, 30000, 30000, 30000, 30000, 30000, 30000, 30000, 30000, + 30000, 30000, 30000, 30000, 30000, 30000, 30000, 30000, 30000, 30000, 
+ 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, + 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, + 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, + 30000, 30000, 30000, 30000, 30000, 30000, 30000, 30000, 30000, 30000, + 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, + 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, + 30000, 30000, 30000, 30000, 30000, 30000, 30000, 30000, 30000, 30000, + 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, + 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, + 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, + 30000, 30000, 30000, 30000, 30000, 30000, 30000, 30000, 30000, 30000, + 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, + 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, + 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, + 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, + 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, + 30000, 30000, 30000, 30000, 30000, 30000, 30000, 30000, 30000, 30000, + 30000, 30000, 30000, 30000, 30000, 30000, 30000, 30000, 30000, 30000, + 30000, 30000, 30000, 30000, 30000, 30000, 30000, 30000, 30000, 30000, + 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, + 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, + 30000, 30000, 30000, 30000, 30000, 30000, 30000, 30000, 30000, 30000, + 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, + 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, + 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, + 30000, 30000, 30000, 30000, 30000, 30000, 30000, 30000, 30000, 30000, + 30000, 30000, 30000, 30000, 30000, 30000, 30000, 30000, 30000, 30000, + 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, + 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, + 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, + }; + vector busy_prod_vec{ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, + 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, + 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, + 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, + 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, + }; + // clang-format on + auto lambda = [&]() { + Context ctx; + { + solid_log(logger, Verbose, "start"); + CallPoolT wp{ + thread_count, 12000, 100, 
[](const size_t, Context&) {}, [](const size_t, Context& _rctx) {}, + std::ref(ctx)}; + solid_log(logger, Verbose, "create contexts"); + vector entries; + for (size_t i = 0; i < entry_count; ++i) { + entries.emplace_back(wp.createSynchronizationContext()); + } + + solid_log(logger, Verbose, "wp started"); + uint64_t tmin{InvalidSize{}}; + uint64_t tmax{0}; + uint64_t tsum{0}; + uint64_t tcnt{0}; + uint64_t prod_dur_ns = 0; + uint64_t cons_dur_ns = 0; + for (size_t i = 0; i < repeat_count; ++i) { + const auto prod_ns = busy_prod_vec[i % busy_prod_vec.size()]; + busy_for(std::chrono::nanoseconds(prod_ns)); + prod_dur_ns += prod_ns; + + auto start = chrono::steady_clock::now(); + auto& entry{entries[i % entries.size()]}; + const auto busy_ns = std::chrono::nanoseconds(busy_cons_vec[i % busy_cons_vec.size()]); + cons_dur_ns += busy_ns.count(); + auto lambda = [&entry, start, busy_ns](Context& _rctx) mutable { + const auto now = chrono::steady_clock::now(); + const uint64_t duration = chrono::duration_cast(now - start).count(); + store_min(_rctx.min_, duration); + store_max(_rctx.max_, duration); + _rctx.sum_ += duration; + ++_rctx.count_; + // this_thread::sleep_for(chrono::microseconds(1)); + busy_for(busy_ns); + }; + entry.ctx_.push(std::move(lambda)); + { + const uint64_t duration = chrono::duration_cast(chrono::steady_clock::now() - start).count(); + store_min(tmin, duration); + store_max(tmax, duration); + tsum += duration; + ++tcnt; + } + // this_thread::sleep_for(chrono::milliseconds(100)); + } + + // this_thread::sleep_for(chrono::seconds(100)); + chrono::microseconds prod_dur = chrono::duration_cast(chrono::nanoseconds(prod_dur_ns)); + chrono::microseconds cons_dur = chrono::duration_cast(chrono::nanoseconds(cons_dur_ns)); + solid_log(logger, Verbose, "min " << tmin << " max " << tmax << " avg " << tsum / tcnt << " " << ctx << " prod_dur(us): " << prod_dur.count() << " cons_dur(us): " << cons_dur.count()); + solid_log(logger, Statistic, "ThreadPool statistic: " << wp.statistic()); + } + solid_log(logger, Verbose, "after loop"); + }; +#elif 0 + using LockT = atomic; + static constexpr LockT::value_type stopping = InvalidIndex{}; + static constexpr LockT::value_type popping = 1; + static constexpr LockT::value_type pushing = 2; + static constexpr LockT::value_type empty = 3; + static constexpr LockT::value_type filled = 4; + struct /* alignas(std::hardware_destructive_interference_size) */ ThreadContext { + thread thr_; + atomic_size_t lock_ = {empty}; + size_t value_ = 0; + + void push() + { + size_t value; + while (!lock_.compare_exchange_weak(value = empty, pushing)) { + std::atomic_wait(&lock_, value); + } + + ++value_; + + lock_.store(filled); + std::atomic_notify_one(&lock_); + } + + void stop() + { + size_t value; + while (!lock_.compare_exchange_weak(value = empty, pushing)) { + std::atomic_wait(&lock_, value); + } + + value_ = stopping; + + lock_.store(filled); + std::atomic_notify_one(&lock_); + } + + bool pop(size_t& _expected_value) + { + size_t value; + while (!lock_.compare_exchange_weak(value = filled, popping)) { + std::atomic_wait(&lock_, value); + } + + if (value_ == stopping) + return false; + + solid_check(value_ == _expected_value); + ++_expected_value; + + lock_.store(empty); + std::atomic_notify_one(&lock_); + + return true; + } + }; + auto lambda = [&]() { + set_current_thread_affinity(); + unique_ptr ctxs{new ThreadContext[thread_count]}; + for (size_t i = 0; i < thread_count; ++i) { + auto& ctx = ctxs[i]; + ctx.thr_ = thread( + [](ThreadContext& _rctx) { + 
set_current_thread_affinity(); + size_t expected_val = 1; + while (_rctx.pop(expected_val)) + ; + }, + ref(ctx)); + } + uint64_t tmin{InvalidSize{}}; + uint64_t tmax{0}; + uint64_t tsum{0}; + uint64_t tcnt{0}; + for (size_t i = 0; i < 40; ++i) { + const auto start = chrono::steady_clock::now(); + for (size_t j = 0; j < entry_count; ++j) { + auto& rctx = ctxs[j % thread_count]; + rctx.push(); + } + { + const uint64_t duration = chrono::duration_cast(chrono::steady_clock::now() - start).count(); + store_min(tmin, duration); + store_max(tmax, duration); + tsum += duration; + ++tcnt; + } + } + for (size_t i = 0; i < thread_count; ++i) { + auto& ctx = ctxs[i]; + ctx.stop(); + ctx.thr_.join(); + } + solid_log(logger, Verbose, "min " << tmin << " max " << tmax << " avg " << tsum / tcnt); + }; +#else + static constexpr size_t stopping = InvalidIndex{}; + static constexpr size_t popping = 1; + static constexpr size_t pushing = 2; + static constexpr size_t empty = 3; + static constexpr size_t filled = 4; + struct alignas(std::hardware_destructive_interference_size) ThreadContext { + thread thr_; + binary_semaphore push_sem_{1}; + binary_semaphore pop_sem_{0}; + + size_t value_ = 0; + + void push() + { + push_sem_.acquire(); + + ++value_; + + pop_sem_.release(); + } + + void stop() + { + push_sem_.acquire(); + value_ = stopping; + pop_sem_.release(); + } + + bool pop(size_t& _expected_value) + { + pop_sem_.acquire(); + if (value_ == stopping) + return false; + + solid_check(value_ == _expected_value); + ++_expected_value; + push_sem_.release(); + return true; + } + }; + auto lambda = [&]() { + set_current_thread_affinity(); + unique_ptr ctxs{new ThreadContext[thread_count]}; + for (size_t i = 0; i < thread_count; ++i) { + auto& ctx = ctxs[i]; + ctx.thr_ = thread( + [](ThreadContext& _rctx) { + set_current_thread_affinity(); + size_t expected_val = 1; + while (_rctx.pop(expected_val)) + ; + }, + ref(ctx)); + } + uint64_t tmin{InvalidSize{}}; + uint64_t tmax{0}; + uint64_t tsum{0}; + uint64_t tcnt{0}; + for (size_t i = 0; i < 40; ++i) { + const auto start = chrono::steady_clock::now(); + for (size_t j = 0; j < entry_count; ++j) { + auto& rctx = ctxs[j % thread_count]; + rctx.push(); + } + { + const uint64_t duration = chrono::duration_cast(chrono::steady_clock::now() - start).count(); + store_min(tmin, duration); + store_max(tmax, duration); + tsum += duration; + ++tcnt; + } + } + for (size_t i = 0; i < thread_count; ++i) { + auto& ctx = ctxs[i]; + ctx.stop(); + ctx.thr_.join(); + } + solid_log(logger, Verbose, "min " << tmin << " max " << tmax << " avg " << tsum / tcnt); + }; +#endif + auto fut = async(launch::async, lambda); + if (fut.wait_for(chrono::seconds(wait_seconds)) != future_status::ready) { + solid_throw(" Test is taking too long - waited " << wait_seconds << " secs"); + } + fut.get(); + solid_log(logger, Verbose, "after async wait"); + + return 0; +} diff --git a/solid/utility/test/test_threadpool_chain.cpp b/solid/utility/test/test_threadpool_chain.cpp index bd1b4782..fb119440 100644 --- a/solid/utility/test/test_threadpool_chain.cpp +++ b/solid/utility/test/test_threadpool_chain.cpp @@ -47,13 +47,13 @@ int test_threadpool_chain(int argc, char* argv[]) for (int i = 0; i < loop_cnt; ++i) { { ThreadPoolT wp_b{ - thread_count, 10000, 1000, [](const size_t) {}, [](const size_t) {}, + {thread_count, 10000, 1000}, [](const size_t) {}, [](const size_t) {}, [&val](const size_t _v) { val += _v; }, [](const size_t) {}}; ThreadPoolT wp_f{ - thread_count, 10000, 1000, [](const size_t) {}, [](const 
size_t) {}, + {thread_count, 10000, 1000}, [](const size_t) {}, [](const size_t) {}, [&wp_b](const size_t _v) { wp_b.pushOne(_v); }, diff --git a/solid/utility/test/test_threadpool_context.cpp b/solid/utility/test/test_threadpool_context.cpp index 1c68acc8..1ec53a85 100644 --- a/solid/utility/test/test_threadpool_context.cpp +++ b/solid/utility/test/test_threadpool_context.cpp @@ -44,7 +44,7 @@ int test_threadpool_context(int argc, char* argv[]) Context ctx{"test", 1, cnt + 1}; { CallPoolT wp{ - 2, 10000, 100, [](const size_t, Context&) {}, [](const size_t, Context& _rctx) {}, + {2, 10000, 100}, [](const size_t, Context&) {}, [](const size_t, Context& _rctx) {}, std::ref(ctx)}; solid_log(generic_logger, Verbose, "wp started"); diff --git a/solid/utility/test/test_threadpool_multicast_basic.cpp b/solid/utility/test/test_threadpool_multicast_basic.cpp index 041cab0b..e33555f6 100644 --- a/solid/utility/test/test_threadpool_multicast_basic.cpp +++ b/solid/utility/test/test_threadpool_multicast_basic.cpp @@ -46,7 +46,7 @@ int test_threadpool_multicast_basic(int argc, char* argv[]) record_dq.resize(cnt, -1); { ThreadPoolT wp{ - 2, 10000, 1000, [](const size_t) {}, [](const size_t) {}, + {2, 10000, 1000}, [](const size_t) {}, [](const size_t) {}, [&val, &record_dq](const size_t _v) { val += _v; solid_check(record_dq[_v] == static_cast(-1)); diff --git a/solid/utility/test/test_threadpool_multicast_sleep.cpp b/solid/utility/test/test_threadpool_multicast_sleep.cpp index 4d96a322..d0164d61 100644 --- a/solid/utility/test/test_threadpool_multicast_sleep.cpp +++ b/solid/utility/test/test_threadpool_multicast_sleep.cpp @@ -46,7 +46,7 @@ int test_threadpool_multicast_sleep(int argc, char* argv[]) record_dq.resize(cnt, -1); { ThreadPoolT wp{ - 2, 10000, 1000, [](const size_t) {}, [](const size_t) {}, + {2, 10000, 1000}, [](const size_t) {}, [](const size_t) {}, [&val, &record_dq](const size_t _v) { val += _v; solid_check(record_dq[_v] == static_cast(-1)); diff --git a/solid/utility/test/test_threadpool_multicast_synchronization_context_basic.cpp b/solid/utility/test/test_threadpool_multicast_synchronization_context_basic.cpp index 391d0909..4dd1a3e3 100644 --- a/solid/utility/test/test_threadpool_multicast_synchronization_context_basic.cpp +++ b/solid/utility/test/test_threadpool_multicast_synchronization_context_basic.cpp @@ -59,7 +59,7 @@ int test_threadpool_multicast_synchronization_context_basic(int argc, char* argv record_dq.resize(count); ThreadPoolT wp{ - 2, 10000, 1000, [](const size_t) {}, [](const size_t) {}, + {2, 10000, 1000}, [](const size_t) {}, [](const size_t) {}, [&record_dq](const Record& _r) { solid_check(record_dq[_r.value_].multicast_value_ == static_cast(-1)); record_dq[_r.value_].multicast_value_ = thread_local_value; diff --git a/solid/utility/test/test_threadpool_pattern.cpp b/solid/utility/test/test_threadpool_pattern.cpp index 6c13c032..f1e9a270 100644 --- a/solid/utility/test/test_threadpool_pattern.cpp +++ b/solid/utility/test/test_threadpool_pattern.cpp @@ -59,7 +59,7 @@ int test_threadpool_pattern(int argc, char* argv[]) auto lambda = [&]() { ThreadPoolT wp{ - consumer_cnt, 10000, 0, [](const size_t) {}, [](const size_t) {}, + {consumer_cnt, 10000, 0}, [](const size_t) {}, [](const size_t) {}, [&sum, &consummer_pattern, loop = consummer_pattern[0].first, idx = 0](const size_t _v) mutable { sum += _v; --loop; diff --git a/solid/utility/test/test_threadpool_thread_context.cpp b/solid/utility/test/test_threadpool_thread_context.cpp index 39e22043..cfebf41d 100644 --- 
a/solid/utility/test/test_threadpool_thread_context.cpp +++ b/solid/utility/test/test_threadpool_thread_context.cpp @@ -70,7 +70,7 @@ int test_threadpool_thread_context(int argc, char* argv[]) auto start = chrono::steady_clock::now(); { CallPoolT wp{ - 2, 1000, 0, [](const size_t, Context&&) {}, [](const size_t, Context&&) {}, + {2, 1000, 0}, [](const size_t, Context&&) {}, [](const size_t, Context&&) {}, Context("simple text", 0UL)}; solid_log(logger, Verbose, "wp started"); diff --git a/solid/utility/threadpool.hpp b/solid/utility/threadpool.hpp index 789fb1cb..8838853a 100644 --- a/solid/utility/threadpool.hpp +++ b/solid/utility/threadpool.hpp @@ -10,6 +10,7 @@ #pragma once #include +#include <bit> #include #include #include @@ -50,6 +51,9 @@ struct ThreadPoolStatistic : solid::Statistic { std::atomic_uint_fast64_t pop_one_wait_popping_count_ = {0}; std::atomic_uint_fast64_t push_all_wait_lock_count_ = {0}; std::atomic_uint_fast64_t push_all_wait_pushing_count_ = {0}; + std::atomic_uint_fast64_t push_one_latency_min_us_ = {0}; + std::atomic_uint_fast64_t push_one_latency_max_us_ = {0}; + std::atomic_uint_fast64_t push_one_latency_sum_us_ = {0}; ThreadPoolStatistic(); @@ -98,9 +102,13 @@ struct ThreadPoolStatistic : solid::Statistic { solid_statistic_max(max_consume_all_count_, _count); } - void pushOne(const bool _with_context) + void pushOne(const bool _with_context, const uint64_t _duration_us) { ++push_one_count_[_with_context]; + + solid_statistic_min(push_one_latency_min_us_, _duration_us); + solid_statistic_max(push_one_latency_max_us_, _duration_us); + push_one_latency_sum_us_ += _duration_us; } void pushAll(const bool _should_wake) { @@ -161,6 +169,50 @@ struct EmptyThreadPoolStatistic : solid::Statistic { void clear() {} }; +struct ThreadPoolConfiguration { + static constexpr size_t default_one_capacity = 8 * 1024; + static constexpr size_t default_all_capacity = 1024; + + size_t thread_count_ = 1; + size_t one_capacity_ = default_one_capacity; + size_t all_capacity_ = default_all_capacity; + size_t spin_count_ = 1; + + ThreadPoolConfiguration( + const size_t _thread_count = 1, + const size_t _one_capacity = 10 * 1024, + const size_t _all_capacity = 1024, + const size_t _spin_count = 1) + : thread_count_(_thread_count) + , one_capacity_(_one_capacity) + , all_capacity_(_all_capacity) + , spin_count_(_spin_count) + { + } + + auto& threadCount(const size_t _value) + { + thread_count_ = _value; + return *this; + } + auto& oneCapacity(const size_t _value) + { + one_capacity_ = _value; + return *this; + } + auto& allCapacity(const size_t _value) + { + all_capacity_ = _value; + return *this; + } + + auto& spinCount(const size_t _value) + { + spin_count_ = _value; + return *this; + } +}; + template class ThreadPool; @@ -235,11 +287,11 @@ class SynchronizationContext { namespace tpimpl { template <class Task> -class TaskData { +class alignas(hardware_destructive_interference_size) TaskData { std::aligned_storage_t<sizeof(Task), alignof(Task)> data_; public: - Task& task() + Task& task() noexcept { return *std::launder(reinterpret_cast<Task*>(&data_)); } @@ -342,6 +394,7 @@ struct LocalContext { template class ThreadPool : NonCopyable { public: + static constexpr size_t spin_count = 10; struct ContextStub { using TaskQueueT = TaskList; std::atomic_size_t use_count_{1}; @@ -372,30 +425,43 @@ class ThreadPool : NonCopyable { }; private: - enum struct LockE : uint8_t { - Empty = 0, - Pushing, - Filled, - Popping - }; enum struct EventE : uint8_t { Fill, Stop, Wake, }; - struct OneStub : TaskData { -#if defined(__cpp_lib_atomic_wait) -
std::atomic_flag pushing_ = ATOMIC_FLAG_INIT; - std::atomic_flag popping_ = ATOMIC_FLAG_INIT; -#else - std::atomic_bool pushing_ = {false}; - std::atomic_bool popping_ = {false}; -#endif - std::atomic_uint8_t lock_ = {to_underlying(LockE::Empty)}; - std::uint8_t event_ = {to_underlying(EventE::Fill)}; - ContextStub* pcontext_ = nullptr; - uint64_t all_id_ = 0; - uint64_t context_produce_id_ = 0; + + using AtomicCounterT = std::atomic<uint8_t>; + using AtomicCounterValueT = AtomicCounterT::value_type; + template <typename IndexT> + inline constexpr static auto computeCounter(const IndexT _index, const size_t _capacity) noexcept + { + return (_index / _capacity) & std::numeric_limits<AtomicCounterValueT>::max(); + } + + struct OneStub { + AtomicCounterT produce_count_{0}; + AtomicCounterT consume_count_{static_cast<AtomicCounterValueT>(-1)}; + std::uint8_t event_ = {to_underlying(EventE::Fill)}; + TaskData<TaskOne>* data_ptr_ = nullptr; + ContextStub* pcontext_ = nullptr; + uint64_t all_id_ = 0; + uint64_t context_produce_id_ = 0; + + auto& task() noexcept + { + return data_ptr_->task(); + } + template <class T> + void task(T&& _rt) + { + data_ptr_->task(std::forward<T>(_rt)); + } + + void destroy() + { + data_ptr_->destroy(); + } void clear() noexcept { @@ -404,179 +470,115 @@ class ThreadPool : NonCopyable { context_produce_id_ = 0; } - void waitWhilePushOne(Stats& _rstats) noexcept + void waitWhilePushOne(Stats& _rstats, const AtomicCounterValueT _count, const size_t _spin_count) noexcept { + auto spin = _spin_count; while (true) { -#if defined(__cpp_lib_atomic_wait) - const bool already_pushing = pushing_.test_and_set(std::memory_order_acquire); -#else - bool expected = false; - const bool already_pushing = !pushing_.compare_exchange_strong(expected, true, std::memory_order_acquire); -#endif - if (!already_pushing) { - // wait for lock to be 0.
- uint8_t value = to_underlying(LockE::Empty); - - if (!lock_.compare_exchange_weak(value, to_underlying(LockE::Pushing))) { - do { - std::atomic_wait(&lock_, value); - value = to_underlying(LockE::Empty); - } while (!lock_.compare_exchange_weak(value, to_underlying(LockE::Pushing))); - _rstats.pushOneWaitLock(); - } - return; - } else { -#if defined(__cpp_lib_atomic_wait) - pushing_.wait(true); -#else - std::atomic_wait(&pushing_, true); -#endif - _rstats.pushOneWaitPushing(); + const auto cnt = produce_count_.load(); + if (cnt == _count) { + break; + } else if (_spin_count && !spin--) { + _rstats.pushOneWaitLock(); + std::atomic_wait_explicit(&produce_count_, cnt, std::memory_order_relaxed); + spin = _spin_count; } } } - void notifyWhilePushOne() noexcept + void notifyWhilePushOne(std::chrono::time_point<std::chrono::steady_clock> const& _start, uint64_t& _rduration) noexcept { + using namespace std::chrono; event_ = to_underlying(EventE::Fill); - lock_.store(to_underlying(LockE::Filled)); - std::atomic_notify_one(&lock_); -#if defined(__cpp_lib_atomic_wait) - pushing_.clear(std::memory_order_release); - pushing_.notify_one(); -#else - pushing_.store(false, std::memory_order_release); - std::atomic_notify_one(&pushing_); -#endif + ++consume_count_; + std::atomic_notify_all(&consume_count_); + _rduration = duration_cast<microseconds>(steady_clock::now() - _start).count(); } - void waitWhileStop(Stats& _rstats) noexcept + void waitWhileStop(Stats& _rstats, const AtomicCounterValueT _count, const size_t _spin_count) noexcept { - waitWhilePushOne(_rstats); + waitWhilePushOne(_rstats, _count, _spin_count); } - void waitWhilePushAll(Stats& _rstats) noexcept + void waitWhilePushAll(Stats& _rstats, const AtomicCounterValueT _count, const size_t _spin_count) noexcept { - waitWhilePushOne(_rstats); + waitWhilePushOne(_rstats, _count, _spin_count); } void notifyWhileStop() noexcept { event_ = to_underlying(EventE::Stop); - lock_.store(to_underlying(LockE::Filled)); - std::atomic_notify_one(&lock_); -#if defined(__cpp_lib_atomic_wait) - pushing_.clear(std::memory_order_release); - pushing_.notify_one(); -#else - pushing_.store(false, std::memory_order_release); - std::atomic_notify_one(&pushing_); -#endif + ++consume_count_; + std::atomic_notify_all(&consume_count_); } void notifyWhilePushAll() noexcept { event_ = to_underlying(EventE::Wake); - lock_.store(to_underlying(LockE::Filled)); - std::atomic_notify_one(&lock_); -#if defined(__cpp_lib_atomic_wait) - pushing_.clear(std::memory_order_release); - pushing_.notify_one(); -#else - pushing_.store(false, std::memory_order_release); - std::atomic_notify_one(&pushing_); -#endif + ++consume_count_; + std::atomic_notify_all(&consume_count_); } template < class Fnc, class AllFnc, typename... Args> - EventE waitWhilePop(Stats& _rstats, const Fnc& _try_consume_an_all_fnc, AllFnc& _all_fnc, Args&&... _args) noexcept + EventE waitWhilePop(Stats& _rstats, const AtomicCounterValueT _count, const size_t _spin_count, const Fnc& _try_consume_an_all_fnc, AllFnc& _all_fnc, Args&&... _args) noexcept { + auto spin = _spin_count; while (true) { -#if defined(__cpp_lib_atomic_wait) - const bool already_popping = popping_.test_and_set(std::memory_order_acquire); -#else - bool expected = false; - const bool already_popping = !popping_.compare_exchange_strong(expected, true, std::memory_order_acquire); -#endif - if (!already_popping) { - // wait for lock to be 1 or 2.
- uint8_t value = to_underlying(LockE::Filled); - - if (!lock_.compare_exchange_weak(value, to_underlying(LockE::Popping))) { - do { - if (!_try_consume_an_all_fnc(&lock_, _all_fnc, std::forward<Args>(_args)...)) { - std::atomic_wait(&lock_, value); - } - value = to_underlying(LockE::Filled); - } while (!lock_.compare_exchange_weak(value, to_underlying(LockE::Popping))); - _rstats.popOneWaitLock(); - } + const auto cnt = consume_count_.load(); + if (cnt == _count) { return static_cast<EventE>(event_); - } else { -#if defined(__cpp_lib_atomic_wait) - popping_.wait(true); -#else - std::atomic_wait(&popping_, true); -#endif + } else if (!_try_consume_an_all_fnc(&consume_count_, _count, _all_fnc, std::forward<Args>(_args)...) && _spin_count && !spin--) { + + std::atomic_wait_explicit(&consume_count_, cnt, std::memory_order_relaxed); + _rstats.popOneWaitPopping(); + spin = _spin_count; } } } void notifyWhilePop() noexcept { - lock_.store(to_underlying(LockE::Empty)); - std::atomic_notify_one(&lock_); -#if defined(__cpp_lib_atomic_wait) - popping_.clear(std::memory_order_release); - popping_.notify_one(); -#else - popping_.store(false, std::memory_order_release); - std::atomic_notify_one(&popping_); -#endif + ++produce_count_; + std::atomic_notify_all(&produce_count_); } }; - struct AllStub : TaskData { -#if defined(__cpp_lib_atomic_wait) - std::atomic_flag pushing_ = ATOMIC_FLAG_INIT; -#else - std::atomic_bool pushing_ = {false}; -#endif - std::atomic_uint8_t lock_ = {to_underlying(LockE::Empty)}; + struct AllStub { + AtomicCounterT produce_count_{0}; + AtomicCounterT consume_count_{static_cast<AtomicCounterValueT>(-1)}; std::atomic_uint32_t use_count_ = {0}; std::atomic_uint64_t id_ = {0}; + TaskData<TaskAll>* data_ptr_ = nullptr; - void waitWhilePushAll(Stats& _rstats) noexcept + auto& task() noexcept + { + return data_ptr_->task(); + } + template <class T> + void task(T&& _rt) { + data_ptr_->task(std::forward<T>(_rt)); + } + + void destroy() + { + data_ptr_->destroy(); + } + + void waitWhilePushAll(Stats& _rstats, const AtomicCounterValueT _count, const size_t _spin_count) noexcept + { + auto spin = _spin_count; while (true) { -#if defined(__cpp_lib_atomic_wait) - const bool already_pushing = pushing_.test_and_set(std::memory_order_acquire); -#else - bool expected = false; - const bool already_pushing = !pushing_.compare_exchange_strong(expected, true, std::memory_order_acquire); -#endif - if (!already_pushing) { - uint8_t value = to_underlying(LockE::Empty); - - if (!lock_.compare_exchange_weak(value, to_underlying(LockE::Pushing))) { - do { - std::atomic_wait(&lock_, value); - value = to_underlying(LockE::Empty); - } while (!lock_.compare_exchange_weak(value, to_underlying(LockE::Pushing))); - _rstats.pushAllWaitLock(); - } - return; - } else { -#if defined(__cpp_lib_atomic_wait) - pushing_.wait(true); -#else - std::atomic_wait(&pushing_, true); -#endif - _rstats.pushAllWaitPushing(); + const auto cnt = produce_count_.load(); + if (cnt == _count) { + break; + } else if (_spin_count && !spin--) { + _rstats.pushAllWaitLock(); + std::atomic_wait_explicit(&produce_count_, cnt, std::memory_order_relaxed); + spin = _spin_count; } } } @@ -585,60 +587,65 @@ class ThreadPool : NonCopyable { { use_count_.store(_thread_count); id_.store(_id); - lock_.store(to_underlying(LockE::Filled)); -#if defined(__cpp_lib_atomic_wait) - pushing_.clear(std::memory_order_release); - pushing_.notify_one(); -#else - pushing_.store(false, std::memory_order_release); - std::atomic_notify_one(&pushing_); -#endif + ++consume_count_; } bool notifyWhilePop() noexcept { if
(use_count_.fetch_sub(1) == 1) { - TaskData::destroy(); - lock_.store(to_underlying(LockE::Empty)); - std::atomic_notify_one(&lock_); + destroy(); + ++produce_count_; + std::atomic_notify_all(&produce_count_); return true; } return false; } - bool isFilled(const uint64_t _id) const + bool isFilled(const uint64_t _id, const size_t _capacity) const { - return lock_.load() == to_underlying(LockE::Filled) && id_.load() == _id; + const auto count = consume_count_.load(std::memory_order_relaxed); + const AtomicCounterValueT expected_count = computeCounter(_id, _capacity); + return count == expected_count && id_.load() == _id; } }; using AllStubT = AllStub; using OneStubT = OneStub; using ThreadVectorT = std::vector<std::thread>; + size_t spin_count_ = 1; /* alignas(hardware_constructive_interference_size) */ struct { - size_t capacity_{0}; - std::atomic_size_t pending_count_{0}; - std::atomic_uint_fast64_t push_index_{1}; - std::atomic_uint_fast64_t commited_index_{0}; - std::unique_ptr<AllStubT[]> tasks_; - } all_; - /* alignas(hardware_constructive_interference_size) */ struct { - size_t capacity_{0}; - std::unique_ptr<OneStubT[]> tasks_; + size_t capacity_{0}; + std::unique_ptr<OneStubT[]> tasks_; + std::unique_ptr<TaskData<TaskOne>[]> datas_; } one_; + + /* alignas(hardware_constructive_interference_size) */ struct { + size_t capacity_{0}; + std::atomic_size_t pending_count_{0}; + std::atomic_uint_fast64_t push_index_{1}; + std::atomic_uint_fast64_t commited_index_{0}; + std::unique_ptr<AllStubT[]> tasks_; + std::unique_ptr<TaskData<TaskAll>[]> datas_; + } all_; + Stats statistic_; - alignas(hardware_destructive_interference_size) std::atomic_size_t push_one_index_{0}; - alignas(hardware_destructive_interference_size) std::atomic_size_t pop_one_index_{0}; + using AtomicIndexT = std::atomic_size_t; + using AtomicIndexValueT = std::atomic_size_t::value_type; + + alignas(hardware_destructive_interference_size) AtomicIndexT push_one_index_{0}; + alignas(hardware_destructive_interference_size) AtomicIndexT pop_one_index_{0}; ThreadVectorT threads_; std::atomic<bool> running_{false}; - size_t pushOneIndex() noexcept + std::tuple<size_t, AtomicCounterValueT> pushOneIndex() noexcept { - return push_one_index_.fetch_add(1) % one_.capacity_; + const auto index = push_one_index_.fetch_add(1); + return {index % one_.capacity_, computeCounter(index, one_.capacity_)}; } - size_t popOneIndex() noexcept + std::tuple<size_t, AtomicCounterValueT> popOneIndex() noexcept { - return pop_one_index_.fetch_add(1) % one_.capacity_; + const auto index = pop_one_index_.fetch_add(1); + return {index % one_.capacity_, computeCounter(index, one_.capacity_)}; } auto pushAllId() noexcept @@ -665,13 +672,11 @@ class ThreadPool : NonCopyable { class AllFnc, typename... Args> void doStart( - const size_t _thread_count, - const size_t _one_capacity, - const size_t _all_capacity, - StartFnc _start_fnc, - StopFnc _stop_fnc, - OneFnc _one_fnc, - AllFnc _all_fnc, + const ThreadPoolConfiguration& _config, + StartFnc _start_fnc, + StopFnc _stop_fnc, + OneFnc _one_fnc, + AllFnc _all_fnc, Args&&... _args); void doStop(); @@ -727,7 +732,8 @@ class ThreadPool : NonCopyable { template < class AllFnc, typename... Args> - bool tryConsumeAnAllTask(std::atomic_uint8_t* _plock, LocalContext& _rlocal_context, AllFnc& _all_fnc, Args&&... _args); + bool tryConsumeAnAllTask(AtomicCounterT* _pcounter, + const AtomicCounterValueT _count, LocalContext& _rlocal_context, AllFnc& _all_fnc, Args&&... _args); template < class AllFnc, typename...
Args> @@ -748,6 +754,7 @@ class ThreadPool { public: using SynchronizationContextT = SynchronizationContext; + using ConfigurationT = ThreadPoolConfiguration; ThreadPool() = default; @@ -758,19 +765,15 @@ class ThreadPool { class AllFnc, typename... Args> ThreadPool( - const size_t _thread_count, - const size_t _one_capacity, - const size_t _all_capacity, - StartFnc _start_fnc, - StopFnc _stop_fnc, - OneFnc _one_fnc, - AllFnc _all_fnc, + const ThreadPoolConfiguration& _config, + StartFnc _start_fnc, + StopFnc _stop_fnc, + OneFnc _one_fnc, + AllFnc _all_fnc, Args&&... _args) { impl_.doStart( - _thread_count, - _one_capacity, - _all_capacity, + _config, _start_fnc, _stop_fnc, _one_fnc, @@ -785,19 +788,15 @@ class ThreadPool { class AllFnc, typename... Args> void start( - const size_t _thread_count, - const size_t _one_capacity, - const size_t _all_capacity, - StartFnc _start_fnc, - StopFnc _stop_fnc, - OneFnc _one_fnc, - AllFnc _all_fnc, + const ThreadPoolConfiguration& _config, + StartFnc _start_fnc, + StopFnc _stop_fnc, + OneFnc _one_fnc, + AllFnc _all_fnc, Args&&... _args) { impl_.doStart( - _thread_count, - _one_capacity, - _all_capacity, + _config, _start_fnc, _stop_fnc, _one_fnc, @@ -871,6 +870,7 @@ class ThreadPool, Function; + using ConfigurationT = ThreadPoolConfiguration; template <class T> static constexpr bool is_small_one_type() @@ -889,17 +889,13 @@ class ThreadPool, Function ThreadPool( - const size_t _thread_count, - const size_t _one_capacity, - const size_t _all_capacity, - StartFnc _start_fnc, - StopFnc _stop_fnc, + const ThreadPoolConfiguration& _config, + StartFnc _start_fnc, + StopFnc _stop_fnc, Args&&... _args) { impl_.doStart( - _thread_count, - _one_capacity, - _all_capacity, + _config, _start_fnc, _stop_fnc, [](OneFunctionT& _rfnc, Args&&... _args) { @@ -914,16 +910,12 @@ class ThreadPool, Function - void start(const size_t _thread_count, - const size_t _one_capacity, - const size_t _all_capacity, - StartFnc _start_fnc, - StopFnc _stop_fnc, Args... _args) + void start(const ThreadPoolConfiguration& _config, + StartFnc _start_fnc, + StopFnc _stop_fnc, Args... _args) { impl_.doStart( - _thread_count, - _one_capacity, - _all_capacity, + _config, _start_fnc, _stop_fnc, [](OneFunctionT& _rfnc, Args&&... _args) { @@ -991,20 +983,22 @@ class ThreadPool, Function -template + class AllFnc, + typename... Args> void ThreadPool::doStart( - const size_t _thread_count, - const size_t _one_capacity, - const size_t _all_capacity, - StartFnc _start_fnc, - StopFnc _stop_fnc, - OneFnc _one_fnc, - AllFnc _all_fnc, + const ThreadPoolConfiguration& _config, + StartFnc _start_fnc, + StopFnc _stop_fnc, + OneFnc _one_fnc, + AllFnc _all_fnc, Args&&... _args) { + static_assert( + (std::numeric_limits<size_t>::max() % std::bit_ceil(ThreadPoolConfiguration::default_one_capacity)) == (std::bit_ceil(ThreadPoolConfiguration::default_one_capacity) - 1) && (std::numeric_limits<size_t>::max() % std::bit_ceil(ThreadPoolConfiguration::default_all_capacity)) == (std::bit_ceil(ThreadPoolConfiguration::default_all_capacity) - 1)); bool expect = false; if (!running_.compare_exchange_strong(expect, true)) { @@ -1014,14 +1008,32 @@ void ThreadPool::doStart( solid_dbg(generic_logger, Error, "sizeof(OneStub) = " << sizeof(OneStubT) << " sizeof(AllStub) = " << sizeof(AllStubT)); threads_.clear(); - threads_.reserve(_thread_count); - const auto thread_count = _thread_count ? _thread_count : std::thread::hardware_concurrency(); + const auto thread_count = _config.thread_count_ ?
_config.thread_count_ : std::thread::hardware_concurrency(); + threads_.reserve(thread_count); - one_.capacity_ = _one_capacity >= thread_count ? _one_capacity : std::max(static_cast<size_t>(1024), thread_count); + one_.capacity_ = std::bit_ceil(std::max(_config.one_capacity_, thread_count)); one_.tasks_.reset(new OneStubT[one_.capacity_]); - all_.capacity_ = _all_capacity ? _all_capacity : 1; + one_.datas_.reset(new TaskData<TaskOne>[one_.capacity_]); + + for (size_t i = 0; i < one_.capacity_; ++i) { + one_.tasks_[i].data_ptr_ = &one_.datas_[i]; + } + + all_.capacity_ = std::bit_ceil(_config.all_capacity_ ? _config.all_capacity_ : 1); all_.tasks_.reset(new AllStubT[all_.capacity_]); + all_.datas_.reset(new TaskData<TaskAll>[all_.capacity_]); + + solid_check( + (std::numeric_limits<size_t>::max() % one_.capacity_) == (one_.capacity_ - 1) && (std::numeric_limits<size_t>::max() % all_.capacity_) == (all_.capacity_ - 1)); + + for (size_t i = 0; i < all_.capacity_; ++i) { + all_.tasks_[i].data_ptr_ = &all_.datas_[i]; + } + all_.tasks_[0].produce_count_ = 1; //+ + all_.tasks_[0].consume_count_ = 0; // first entry is skipped on the first iteration + + spin_count_ = _config.spin_count_; for (size_t i = 0; i < thread_count; ++i) { threads_.emplace_back( @@ -1046,9 +1058,10 @@ void ThreadPool::doStop() } for (size_t i = 0; i < threads_.size(); ++i) { - auto& rstub = one_.tasks_[pushOneIndex()]; + const auto [index, count] = pushOneIndex(); + auto& rstub = one_.tasks_[index]; - rstub.waitWhileStop(statistic_); + rstub.waitWhileStop(statistic_, count, spin_count_); rstub.notifyWhileStop(); } @@ -1071,18 +1084,22 @@ void ThreadPool::doRun( LocalContext local_context; while (true) { - const size_t index = popOneIndex(); - auto& rstub = one_.tasks_[index]; - uint64_t local_one_context_count = 0; - const auto event = rstub.waitWhilePop( + const auto [index, count] = popOneIndex(); + auto& rstub = one_.tasks_[index]; + uint64_t local_one_context_count = 0; + + const auto event = rstub.waitWhilePop( statistic_, + count, + spin_count_, [this, &local_context]( - std::atomic_uint8_t* _plock, - AllFnc& _all_fnc, + AtomicCounterT* _pcounter, + const AtomicCounterValueT _count, + AllFnc& _all_fnc, Args&&... _args) { // we need to make sure that, after processing an all_task, no new one_task can have // the all_id less than the all task that we have just processed. - return tryConsumeAnAllTask(_plock, local_context, _all_fnc, std::forward<Args>(_args)...); + return tryConsumeAnAllTask(_pcounter, _count, local_context, _all_fnc, std::forward<Args>(_args)...); }, _all_fnc, std::forward<Args>(_args)...); @@ -1160,6 +1177,9 @@ void ThreadPool::doRun( statistic_.runOneContextCount(local_one_context_count, local_context.one_context_count_); } else if (event == EventE::Wake) { + const auto all_id = rstub.all_id_; + consumeAll(local_context, all_id, _all_fnc, std::forward<Args>(_args)...); + ++local_context.wake_count_; statistic_.runWakeCount(local_context.wake_count_); rstub.notifyWhilePop(); @@ -1174,10 +1194,11 @@ template template < class AllFnc, typename... Args> -bool ThreadPool::tryConsumeAnAllTask(std::atomic_uint8_t* _plock, LocalContext& _rlocal_context, AllFnc& _all_fnc, Args&&... _args) +bool ThreadPool::tryConsumeAnAllTask(AtomicCounterT* _pcounter, + const AtomicCounterValueT _count, LocalContext& _rlocal_context, AllFnc& _all_fnc, Args&&...
_args) { auto& rstub = all_.tasks_[_rlocal_context.next_all_id_ % all_.capacity_]; - if (rstub.isFilled(_rlocal_context.next_all_id_)) { + if (rstub.isFilled(_rlocal_context.next_all_id_, all_.capacity_)) { // NOTE: first we fetch the commited_all_index then we check if the // current stub is reserved (some thread is starting to push something) // - this is to ensure that we are not processing an all task prior to being - // we're atomicaly marking the one stub as Pushing. const auto commited_all_index = all_.commited_index_.load(); - if (_plock && *_plock != to_underlying(LockE::Empty)) { + if (_pcounter && _pcounter->load(/* std::memory_order_relaxed */) == _count) { // NOTE: this is to ensure that pushOnes and pushAlls from // the same producer are processed in the same order they // were produced. - return false; // will wait on lock + return true; } if (overflow_safe_less(commited_all_index, _rlocal_context.next_all_id_)) { @@ -1225,7 +1246,7 @@ void ThreadPool::consumeAll(LocalContext& _rlocal_conte { size_t repeat_count = 0; while (overflow_safe_less(_rlocal_context.next_all_id_, _all_id) || _rlocal_context.next_all_id_ == _all_id) { - tryConsumeAnAllTask(nullptr, _rlocal_context, _all_fnc, std::forward<Args>(_args)...); + tryConsumeAnAllTask(nullptr, 0, _rlocal_context, _all_fnc, std::forward<Args>(_args)...); ++repeat_count; } statistic_.consumeAll(repeat_count); @@ -1235,10 +1256,12 @@ template template void ThreadPool::doPushOne(Tsk&& _task, ContextStub* _pctx) { - const auto index = pushOneIndex(); - auto& rstub = one_.tasks_[index]; + using namespace std::chrono; + const auto start = steady_clock::now(); + const auto [index, count] = pushOneIndex(); + auto& rstub = one_.tasks_[index]; - rstub.waitWhilePushOne(statistic_); + rstub.waitWhilePushOne(statistic_, count, spin_count_); rstub.task(std::forward<Tsk>(_task)); rstub.pcontext_ = _pctx; @@ -1248,10 +1271,10 @@ void ThreadPool::doPushOne(Tsk&& _task, ContextStub* _p _pctx->acquire(); rstub.context_produce_id_ = _pctx->produce_id_.fetch_add(1); } - - rstub.notifyWhilePushOne(); - - statistic_.pushOne(_pctx != nullptr); + uint64_t duration; + rstub.notifyWhilePushOne(start, duration); + // const uint64_t duration = duration_cast<microseconds>(steady_clock::now() - start).count(); + statistic_.pushOne(_pctx != nullptr, duration); } //----------------------------------------------------------------------------- // NOTE: @@ -1265,7 +1288,7 @@ void ThreadPool::doPushAll(Tsk&& _task) const auto id = pushAllId(); auto& rstub = all_.tasks_[id % all_.capacity_]; - rstub.waitWhilePushAll(statistic_); + rstub.waitWhilePushAll(statistic_, computeCounter(id, all_.capacity_), spin_count_); rstub.task(std::forward<Tsk>(_task)); @@ -1277,9 +1300,10 @@ void ThreadPool::doPushAll(Tsk&& _task) if (should_wake_threads) { for (size_t i = 0; i < threads_.size(); ++i) { - auto& rstub = one_.tasks_[pushOneIndex()]; + const auto [index, count] = pushOneIndex(); // TODO: + auto& rstub = one_.tasks_[index]; - rstub.waitWhilePushAll(statistic_); + rstub.waitWhilePushAll(statistic_, count, spin_count_); rstub.all_id_ = id; @@ -1299,4 +1323,4 @@ typename ThreadPool::ContextStub* ThreadPool&& _fnc) { cwp.pushOne(std::move(_fnc)); }); ErrorConditionT err; diff --git a/tutorials/mprpc_echo/mprpc_echo_client_pool.cpp b/tutorials/mprpc_echo/mprpc_echo_client_pool.cpp index 06cadf1c..15bca936 100644 --- a/tutorials/mprpc_echo/mprpc_echo_client_pool.cpp +++ b/tutorials/mprpc_echo/mprpc_echo_client_pool.cpp @@ -77,7
+77,7 @@ int main(int argc, char* argv[]) AioSchedulerT scheduler; frame::Manager manager; frame::mprpc::ServiceT rpcservice(manager); - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); ErrorConditionT err; diff --git a/tutorials/mprpc_echo_relay/mprpc_echo_relay_client.cpp b/tutorials/mprpc_echo_relay/mprpc_echo_relay_client.cpp index ccb06fe3..05924f1c 100644 --- a/tutorials/mprpc_echo_relay/mprpc_echo_relay_client.cpp +++ b/tutorials/mprpc_echo_relay/mprpc_echo_relay_client.cpp @@ -71,7 +71,7 @@ int main(int argc, char* argv[]) frame::Manager manager; frame::mprpc::ServiceT rpcservice(manager); ErrorConditionT err; - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); scheduler.start(1); diff --git a/tutorials/mprpc_file/mprpc_file_client.cpp b/tutorials/mprpc_file/mprpc_file_client.cpp index 4c2c2ce2..4c201080 100644 --- a/tutorials/mprpc_file/mprpc_file_client.cpp +++ b/tutorials/mprpc_file/mprpc_file_client.cpp @@ -80,7 +80,7 @@ int main(int argc, char* argv[]) AioSchedulerT scheduler; frame::Manager manager; frame::mprpc::ServiceT rpcservice(manager); - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); ErrorConditionT err; diff --git a/tutorials/mprpc_request/mprpc_request_client.cpp b/tutorials/mprpc_request/mprpc_request_client.cpp index 83870eb4..3767e842 100644 --- a/tutorials/mprpc_request/mprpc_request_client.cpp +++ b/tutorials/mprpc_request/mprpc_request_client.cpp @@ -78,7 +78,7 @@ int main(int argc, char* argv[]) AioSchedulerT scheduler; frame::Manager manager; frame::mprpc::ServiceT rpcservice(manager); - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); ErrorConditionT err; diff --git a/tutorials/mprpc_request_ssl/mprpc_request_client.cpp b/tutorials/mprpc_request_ssl/mprpc_request_client.cpp index 427e5095..717d426b 100644 --- a/tutorials/mprpc_request_ssl/mprpc_request_client.cpp +++ b/tutorials/mprpc_request_ssl/mprpc_request_client.cpp @@ -94,7 +94,7 @@ int main(int argc, char* argv[]) AioSchedulerT scheduler; frame::Manager manager; frame::mprpc::ServiceT rpcservice(manager); - CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}}; + CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}}; frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); }); ErrorConditionT err;
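
Editor's note on the constructor change visible at every call site above: the first three positional arguments (thread count, one-task capacity, all-task capacity) now travel as a single ThreadPoolConfiguration, which is why `CallPoolT cwp{1, 100, 0, ...}` becomes `CallPoolT cwp{{1, 100, 0}, ...}`. Below is a minimal illustrative sketch, not part of the diff; it assumes a CallPoolT alias of the ThreadPool<Function<void()>, Function<void()>> form used by the tutorials.

// Sketch only: two equivalent ways to configure a pool after this change.
#include "solid/utility/function.hpp"
#include "solid/utility/threadpool.hpp"

using namespace solid;
using CallPoolT = ThreadPool<Function<void()>, Function<void()>>; // assumed alias

int main()
{
    // Aggregate form, as at the migrated call sites:
    // {thread_count, one_capacity, all_capacity} binds to ThreadPoolConfiguration.
    CallPoolT pool_a{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}};

    // Fluent form, using the setters ThreadPoolConfiguration introduces;
    // spinCount() bounds how long a waiter spins before falling back to
    // std::atomic_wait on the slot counter.
    CallPoolT pool_b{
        ThreadPoolConfiguration{}.threadCount(2).oneCapacity(1024).allCapacity(64).spinCount(4),
        [](const size_t) {}, [](const size_t) {}};

    pool_a.pushOne([]() {});
    pool_b.pushOne([]() {});
    return 0;
}

Since doStart() now rounds both capacities up with std::bit_ceil, capacityOne()/capacityAll() may report more slots than were requested.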
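
The heart of the threadpool.hpp rewrite is the slot protocol: the per-stub lock byte (LockE) and its pushing_/popping_ flags are replaced by two epoch counters, produce_count_ and consume_count_. A producer that obtains global index i owns slot i % capacity and waits until the slot reaches epoch computeCounter(i, capacity); forcing the capacity to a power of two keeps the epoch consistent even when the 64-bit index wraps. A standalone sketch of the arithmetic follows (illustrative only; the uint8_t counter width is an assumption that matches the (int) casts in test_threadpool_batch.cpp):

// Illustrative sketch, not library code: ring index plus wrap epoch.
#include <atomic>
#include <cassert>
#include <cstdint>
#include <limits>

using CounterT = std::uint8_t; // assumed AtomicCounterT::value_type

// Must be a power of two; doStart() enforces this with std::bit_ceil and
// checks numeric_limits<size_t>::max() % capacity == capacity - 1.
constexpr std::uint64_t capacity = 8;

constexpr CounterT compute_counter(std::uint64_t index)
{
    // Same arithmetic as computeCounter(): the epoch is the number of
    // times the ring has wrapped, truncated to the counter width.
    return static_cast<CounterT>((index / capacity) & std::numeric_limits<CounterT>::max());
}

int main()
{
    std::atomic<std::uint64_t> push_index{0};
    for (std::uint64_t i = 0; i < 4 * capacity; ++i) {
        const auto index = push_index.fetch_add(1); // like push_one_index_
        const auto slot  = index % capacity;        // which OneStub to use
        const auto epoch = compute_counter(index);  // value the slot counter must reach
        // Producers landing on the same slot in different laps get different
        // epochs, so the later one blocks until a consumer bumps the counter.
        assert(epoch == static_cast<CounterT>(index / capacity));
        (void)slot;
    }
    return 0;
}

Consumers mirror this with pop_one_index_ and consume_count_, and notifyWhilePop() increments produce_count_ to recycle the slot; this is what lets waitWhilePushOne()/waitWhilePop() reduce to a spin-then-atomic_wait loop.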