From 9a5bd39d4c89ee7bb1607143a8755d5ab68e7f2f Mon Sep 17 00:00:00 2001 From: John Ericson Date: Mon, 3 Mar 2025 10:10:04 -0500 Subject: [PATCH] Revert "Use `LegacySSHStore`" There were some hangs caused by this. Need to fix them, ideally reproducing the issue in a test, before trying this again. This reverts commit 4a4a0f901c70676ee47f830d2ff6a72789ba1baf. --- src/hydra-queue-runner/build-remote.cc | 180 ++++++++++++++++--------- src/hydra-queue-runner/state.hh | 8 +- 2 files changed, 119 insertions(+), 69 deletions(-) diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index 39970bd39..77bde2c45 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -9,10 +9,13 @@ #include "path.hh" #include "legacy-ssh-store.hh" #include "serve-protocol.hh" +#include "serve-protocol-impl.hh" #include "state.hh" #include "current-process.hh" #include "processes.hh" #include "util.hh" +#include "serve-protocol.hh" +#include "serve-protocol-impl.hh" #include "ssh.hh" #include "finally.hh" #include "url.hh" @@ -36,6 +39,38 @@ bool ::Machine::isLocalhost() const namespace nix::build_remote { +static std::unique_ptr openConnection( + ::Machine::ptr machine, SSHMaster & master) +{ + Strings command = {"nix-store", "--serve", "--write"}; + if (machine->isLocalhost()) { + command.push_back("--builders"); + command.push_back(""); + } else { + auto remoteStore = machine->storeUri.params.find("remote-store"); + if (remoteStore != machine->storeUri.params.end()) { + command.push_back("--store"); + command.push_back(shellEscape(remoteStore->second)); + } + } + + auto ret = master.startCommand(std::move(command), { + "-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes" + }); + + // XXX: determine the actual max value we can use from /proc. + + // FIXME: Should this be upstreamed into `startCommand` in Nix? + + int pipesize = 1024 * 1024; + + fcntl(ret->in.get(), F_SETPIPE_SZ, &pipesize); + fcntl(ret->out.get(), F_SETPIPE_SZ, &pipesize); + + return ret; +} + + static void copyClosureTo( ::Machine::Connection & conn, Store & destStore, @@ -52,8 +87,8 @@ static void copyClosureTo( // FIXME: substitute output pollutes our build log /* Get back the set of paths that are already valid on the remote host. */ - auto present = conn.store->queryValidPaths( - closure, true, useSubstitutes); + auto present = conn.queryValidPaths( + destStore, true, closure, useSubstitutes); if (present.size() == closure.size()) return; @@ -68,7 +103,12 @@ static void copyClosureTo( std::unique_lock sendLock(conn.machine->state->sendLock, std::chrono::seconds(600)); - conn.store->addMultipleToStoreLegacy(destStore, missing); + conn.to << ServeProto::Command::ImportPaths; + destStore.exportPaths(missing, conn.to); + conn.to.flush(); + + if (readInt(conn.from) != 1) + throw Error("remote machine failed to import closure"); } @@ -188,7 +228,7 @@ static BuildResult performBuild( counter & nrStepsBuilding ) { - auto kont = conn.store->buildDerivationAsync(drvPath, drv, options); + conn.putBuildDerivationRequest(localStore, drvPath, drv, options); BuildResult result; @@ -197,10 +237,7 @@ static BuildResult performBuild( startTime = time(0); { MaintainCount mc(nrStepsBuilding); - result = kont(); - // Without proper call-once functions, we need to manually - // delete after calling. - kont = {}; + result = ServeProto::Serialise::read(localStore, conn); } stopTime = time(0); @@ -216,7 +253,7 @@ static BuildResult performBuild( // If the protocol was too old to give us `builtOutputs`, initialize // it manually by introspecting the derivation. - if (GET_PROTOCOL_MINOR(conn.store->getProtocol()) < 6) + if (GET_PROTOCOL_MINOR(conn.remoteVersion) < 6) { // If the remote is too old to handle CA derivations, we can’t get this // far anyways @@ -249,25 +286,26 @@ static void copyPathFromRemote( const ValidPathInfo & info ) { - /* Receive the NAR from the remote and add it to the - destination store. Meanwhile, extract all the info from the - NAR that getBuildOutput() needs. */ - auto source2 = sinkToSource([&](Sink & sink) - { - /* Note: we should only send the command to dump the store - path to the remote if the NAR is actually going to get read - by the destination store, which won't happen if this path - is already valid on the destination store. Since this - lambda function only gets executed if someone tries to read - from source2, we will send the command from here rather - than outside the lambda. */ - conn.store->narFromPath(info.path, [&](Source & source) { - TeeSource tee{source, sink}; - extractNarData(tee, conn.store->printStorePath(info.path), narMembers); - }); - }); - - destStore.addToStore(info, *source2, NoRepair, NoCheckSigs); + /* Receive the NAR from the remote and add it to the + destination store. Meanwhile, extract all the info from the + NAR that getBuildOutput() needs. */ + auto source2 = sinkToSource([&](Sink & sink) + { + /* Note: we should only send the command to dump the store + path to the remote if the NAR is actually going to get read + by the destination store, which won't happen if this path + is already valid on the destination store. Since this + lambda function only gets executed if someone tries to read + from source2, we will send the command from here rather + than outside the lambda. */ + conn.to << ServeProto::Command::DumpStorePath << localStore.printStorePath(info.path); + conn.to.flush(); + + TeeSource tee(conn.from, sink); + extractNarData(tee, localStore.printStorePath(info.path), narMembers); + }); + + destStore.addToStore(info, *source2, NoRepair, NoCheckSigs); } static void copyPathsFromRemote( @@ -366,39 +404,30 @@ void State::buildRemote(ref destStore, updateStep(ssConnecting); - // FIXME: rewrite to use Store. - ::Machine::Connection conn { - .machine = machine, - .store = [&]{ - auto * pSpecified = std::get_if(&machine->storeUri.variant); - if (!pSpecified || pSpecified->scheme != "ssh") { - throw Error("Currently, only (legacy-)ssh stores are supported!"); - } - - auto remoteStore = machine->openStore().dynamic_pointer_cast(); - assert(remoteStore); - - remoteStore->connPipeSize = 1024 * 1024; - - if (machine->isLocalhost()) { - auto rp_new = remoteStore->remoteProgram.get(); - rp_new.push_back("--builders"); - rp_new.push_back(""); - const_cast &>(remoteStore->remoteProgram).assign(rp_new); - } - remoteStore->extraSshArgs = { - "-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes" - }; - const_cast &>(remoteStore->logFD).assign(logFD.get()); - - return nix::ref{remoteStore}; - }(), + auto storeRef = machine->completeStoreReference(); + + auto * pSpecified = std::get_if(&storeRef.variant); + if (!pSpecified || pSpecified->scheme != "ssh") { + throw Error("Currently, only (legacy-)ssh stores are supported!"); + } + + LegacySSHStoreConfig storeConfig { + pSpecified->scheme, + pSpecified->authority, + storeRef.params }; + auto master = storeConfig.createSSHMaster( + false, // no SSH master yet + logFD.get()); + + // FIXME: rewrite to use Store. + auto child = build_remote::openConnection(machine, master); + { auto activeStepState(activeStep->state_.lock()); if (activeStepState->cancelled) throw Error("step cancelled"); - activeStepState->pid = conn.store->getConnectionPid(); + activeStepState->pid = child->sshPid; } Finally clearPid([&]() { @@ -413,12 +442,35 @@ void State::buildRemote(ref destStore, process. Meh. */ }); + ::Machine::Connection conn { + { + .to = child->in.get(), + .from = child->out.get(), + /* Handshake. */ + .remoteVersion = 0xdadbeef, // FIXME avoid dummy initialize + }, + /*.machine =*/ machine, + }; + Finally updateStats([&]() { - auto stats = conn.store->getConnectionStats(); - bytesReceived += stats.bytesReceived; - bytesSent += stats.bytesSent; + bytesReceived += conn.from.read; + bytesSent += conn.to.written; }); + constexpr ServeProto::Version our_version = 0x206; + + try { + conn.remoteVersion = decltype(conn)::handshake( + conn.to, + conn.from, + our_version, + machine->storeUri.render()); + } catch (EndOfFile & e) { + child->sshPid.wait(); + std::string s = chomp(readFile(result.logFile)); + throw Error("cannot connect to ‘%1%’: %2%", machine->storeUri.render(), s); + } + { auto info(machine->state->connectInfo.lock()); info->consecutiveFailures = 0; @@ -487,7 +539,7 @@ void State::buildRemote(ref destStore, auto now1 = std::chrono::steady_clock::now(); - auto infos = conn.store->queryPathInfosUncached(outputs); + auto infos = conn.queryPathInfos(*localStore, outputs); size_t totalNarSize = 0; for (auto & [_, info] : infos) totalNarSize += info.narSize; @@ -522,11 +574,9 @@ void State::buildRemote(ref destStore, } } - /* Shut down the connection done by RAII. - - Only difference is kill() instead of wait() (i.e. send signal - then wait()) - */ + /* Shut down the connection. */ + child->in = -1; + child->sshPid.wait(); } catch (Error & e) { /* Disable this machine until a certain period of time has diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index e2d31434f..30e01c746 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -20,7 +20,9 @@ #include "store-api.hh" #include "sync.hh" #include "nar-extractor.hh" -#include "legacy-ssh-store.hh" +#include "serve-protocol.hh" +#include "serve-protocol-impl.hh" +#include "serve-protocol-connection.hh" #include "machines.hh" @@ -290,11 +292,9 @@ struct Machine : nix::Machine bool isLocalhost() const; // A connection to a machine - struct Connection { + struct Connection : nix::ServeProto::BasicClientConnection { // Backpointer to the machine ptr machine; - // Opened store - nix::ref store; }; };