Skip to content

Commit

Permalink
s3: rework endpoint & host_header resolution.
Browse files Browse the repository at this point in the history
Separates the inital AWS s3 request into the s3_endpoint file,
and reworks the logic to better handle spawned localstack,
already running localstack backends, and actual services.

renames the "host" driver spec option to "host_header".

PiperOrigin-RevId: 576183368
Change-Id: Ie812abfd61f62b8523d94ef00644a2fd5bae8542
  • Loading branch information
laramiel authored and copybara-github committed Oct 24, 2023
1 parent 78f685f commit 1b7417d
Show file tree
Hide file tree
Showing 11 changed files with 620 additions and 167 deletions.
1 change: 1 addition & 0 deletions tensorstore/internal/http/http_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <vector>

#include "absl/functional/function_ref.h"
#include "absl/strings/str_format.h"
#include "absl/time/time.h"
#include "tensorstore/internal/uri_utils.h"
#include "tensorstore/kvstore/byte_range.h"
Expand Down
1 change: 1 addition & 0 deletions tensorstore/kvstore/ocdbt/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ tensorstore_cc_test(
"//tensorstore/util:future",
"//tensorstore/util:result",
"//tensorstore/util:status_testutil",
"@com_github_nlohmann_json//:nlohmann_json",
"@com_google_absl//absl/strings:cord",
"@com_google_googletest//:gtest_main",
],
Expand Down
1 change: 1 addition & 0 deletions tensorstore/kvstore/ocdbt/driver_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "absl/strings/cord.h"
#include <nlohmann/json.hpp>
#include "tensorstore/context.h"
#include "tensorstore/internal/global_initializer.h"
#include "tensorstore/internal/json_fwd.h"
Expand Down
39 changes: 39 additions & 0 deletions tensorstore/kvstore/s3/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ tensorstore_cc_library(
local_defines = DEBUG_LOCAL_DEFINES,
deps = [
":aws_credential_provider",
":s3_endpoint",
":s3_request_builder",
":s3_resource",
":s3_uri_utils",
Expand Down Expand Up @@ -320,17 +321,55 @@ tensorstore_cc_test(
"//tensorstore/util:future",
"//tensorstore/util:result",
"//tensorstore/util:status_testutil",
"@com_github_nlohmann_json//:nlohmann_json",
"@com_google_absl//absl/flags:flag",
"@com_google_absl//absl/log:absl_check",
"@com_google_absl//absl/log:absl_log",
"@com_google_absl//absl/status",
"@com_google_absl//absl/strings",
"@com_google_absl//absl/strings:cord",
"@com_google_absl//absl/strings:str_format",
"@com_google_absl//absl/time",
"@com_google_googletest//:gtest_main",
],
)

tensorstore_cc_library(
name = "s3_endpoint",
srcs = ["s3_endpoint.cc"],
hdrs = ["s3_endpoint.h"],
deps = [
":validate",
"//tensorstore/internal:uri_utils",
"//tensorstore/internal/http",
"//tensorstore/util:future",
"//tensorstore/util:quote_string",
"//tensorstore/util:str_cat",
"@com_google_absl//absl/log:absl_check",
"@com_google_absl//absl/status",
"@com_google_absl//absl/strings",
"@com_google_absl//absl/strings:str_format",
],
)

cc_test(
name = "s3_endpoint_test",
srcs = ["s3_endpoint_test.cc"],
deps = [
":s3_endpoint",
"//tensorstore/internal/http",
"//tensorstore/util:future",
"//tensorstore/util:status_testutil",
"//tensorstore/util:str_cat",
"@com_google_absl//absl/container:flat_hash_map",
"@com_google_absl//absl/log:absl_log",
"@com_google_absl//absl/status",
"@com_google_absl//absl/strings:cord",
"@com_google_absl//absl/time",
"@com_google_googletest//:gtest_main",
],
)

# NOTE: For the localstack_test, it seems simples to add a proxy py_binary like:
# py_binary(
# name = "localstack_cli",
Expand Down
85 changes: 49 additions & 36 deletions tensorstore/kvstore/s3/localstack_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
#include "absl/log/absl_log.h"
#include "absl/status/status.h"
#include "absl/strings/cord.h"
#include "absl/strings/match.h"
#include "absl/strings/str_format.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include <nlohmann/json.hpp>
#include "tensorstore/context.h"
#include "tensorstore/internal/env.h"
#include "tensorstore/internal/http/curl_transport.h"
Expand All @@ -52,10 +54,15 @@
ABSL_FLAG(std::string, localstack_endpoint, "", "Localstack endpoint");
ABSL_FLAG(std::string, localstack_binary, "", "Path to the localstack");

// --host_header can override the host: header used for signing.
// It can be, for example, s3.af-south-1.localstack.localhost.com
ABSL_FLAG(std::string, host_header, "", "Host header to use for signing");

namespace kvstore = ::tensorstore::kvstore;

using ::tensorstore::Context;
using ::tensorstore::MatchesJson;
using ::tensorstore::internal::GetEnv;
using ::tensorstore::internal::GetEnvironmentMap;
using ::tensorstore::internal::SetEnv;
using ::tensorstore::internal::SpawnSubprocess;
Expand Down Expand Up @@ -139,20 +146,33 @@ class LocalStackFixture : public ::testing::Test {
static LocalStackProcess process;

static void SetUpTestSuite() {
SetEnv("AWS_ACCESS_KEY_ID", kAwsAccessKeyId);
SetEnv("AWS_SECRET_KEY_ID", kAwsSecretKeyId);
if (!GetEnv("AWS_ACCESS_KEY_ID") || !GetEnv("AWS_SECRET_KEY_ID")) {
SetEnv("AWS_ACCESS_KEY_ID", kAwsAccessKeyId);
SetEnv("AWS_SECRET_KEY_ID", kAwsSecretKeyId);
}

if (absl::GetFlag(FLAGS_localstack_endpoint).empty()) {
ABSL_CHECK(!absl::GetFlag(FLAGS_localstack_binary).empty());

process.SpawnProcess();
} else {
// Don't connect to Amazon; the test uses fixed buckets, etc.
ABSL_CHECK(!absl::StrContains(absl::GetFlag(FLAGS_localstack_endpoint),
"amazonaws.com"));
}

MaybeCreateBucket();
}

static void TearDownTestSuite() { process.StopProcess(); }

static std::string endpoint_url() {
if (absl::GetFlag(FLAGS_localstack_endpoint).empty()) {
return process.endpoint_url();
}
return absl::GetFlag(FLAGS_localstack_endpoint);
}

// Attempts to create the kBucket bucket on the localstack host.
static void MaybeCreateBucket() {
auto value = absl::Cord{absl::StrFormat(
Expand All @@ -162,11 +182,17 @@ class LocalStackFixture : public ::testing::Test {
R"(</CreateBucketConfiguration>)",
kAwsRegion)};

auto request =
S3RequestBuilder("PUT", endpoint_url())
.BuildRequest(absl::StrFormat("%s.s3.amazonaws.com", kBucket),
AwsCredentials{}, kAwsRegion, kEmptySha256,
absl::Now());
// localstack or other test service should accept s3.<region>.amazonaws.com
// as a signing string.
std::string my_host_header = absl::GetFlag(FLAGS_host_header);
if (my_host_header.empty()) {
my_host_header = absl::StrFormat("s3.%s.amazonaws.com", kAwsRegion);
}

auto request = S3RequestBuilder(
"PUT", absl::StrFormat("%s/%s", endpoint_url(), kBucket))
.BuildRequest(my_host_header, AwsCredentials{},
kAwsRegion, kEmptySha256, absl::Now());

::tensorstore::Future<HttpResponse> response;
for (auto deadline = absl::Now() + absl::Seconds(5);;) {
Expand All @@ -189,18 +215,6 @@ class LocalStackFixture : public ::testing::Test {
<< response.value();
}
}

static std::string endpoint_url() {
if (absl::GetFlag(FLAGS_localstack_endpoint).empty()) {
return process.endpoint_url();
}
return absl::GetFlag(FLAGS_localstack_endpoint);
}

static std::string host() {
return absl::StrFormat("%s.s3.%s.localstack.localhost.com", kBucket,
kAwsRegion);
}
};

LocalStackProcess LocalStackFixture::process;
Expand All @@ -216,25 +230,24 @@ Context DefaultTestContext() {

TEST_F(LocalStackFixture, Basic) {
auto context = DefaultTestContext();
TENSORSTORE_ASSERT_OK_AND_ASSIGN(
auto store, kvstore::Open({{"aws_region", kAwsRegion},
{"driver", "s3"},
{"bucket", kBucket},
{"endpoint", endpoint_url()},
{"host", host()},
{"path", "tensorstore/test/"}},
context)
.result());
::nlohmann::json json_spec{
{"aws_region", kAwsRegion}, //
{"driver", "s3"}, //
{"bucket", kBucket}, //
{"endpoint", endpoint_url()}, //
{"path", "tensorstore/test/"}, //
};

if (!absl::GetFlag(FLAGS_host_header).empty()) {
json_spec["host_header"] = absl::GetFlag(FLAGS_host_header);
}

TENSORSTORE_ASSERT_OK_AND_ASSIGN(auto store,
kvstore::Open(json_spec, context).result());

TENSORSTORE_ASSERT_OK_AND_ASSIGN(auto spec, store.spec());
EXPECT_THAT(
spec.ToJson(tensorstore::IncludeDefaults{false}),
::testing::Optional(MatchesJson({{"aws_region", kAwsRegion},
{"driver", "s3"},
{"bucket", kBucket},
{"endpoint", endpoint_url()},
{"host", host()},
{"path", "tensorstore/test/"}})));
EXPECT_THAT(spec.ToJson(tensorstore::IncludeDefaults{false}),
::testing::Optional(MatchesJson(json_spec)));

tensorstore::internal::TestKeyValueReadWriteOps(store);
}
Expand Down
Loading

0 comments on commit 1b7417d

Please sign in to comment.