Skip to content

Commit

Permalink
Auth Manager API part 5: SigV4 Auth Manager
Browse files Browse the repository at this point in the history
  • Loading branch information
adutra committed Jan 17, 2025
1 parent 4d0f40c commit 372fd66
Show file tree
Hide file tree
Showing 6 changed files with 743 additions and 16 deletions.
83 changes: 83 additions & 0 deletions aws/src/main/java/org/apache/iceberg/aws/RESTSigV4AuthManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.aws;

import java.util.Map;
import org.apache.iceberg.catalog.SessionCatalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.RESTClient;
import org.apache.iceberg.rest.RESTUtil;
import org.apache.iceberg.rest.auth.AuthManager;
import org.apache.iceberg.rest.auth.AuthSession;
import software.amazon.awssdk.auth.signer.Aws4Signer;

/**
* An AuthManager that authenticates requests with SigV4.
*
* <p>It takes a delegate AuthManager to handle double authentication cases, e.g. on top of OAuth2.
*/
@SuppressWarnings("unused") // loaded by reflection
public class RESTSigV4AuthManager implements AuthManager {

private final Aws4Signer signer = Aws4Signer.create();
private final AuthManager delegate;

private Map<String, String> catalogProperties = Map.of();

public RESTSigV4AuthManager(String name, AuthManager delegate) {
this.delegate = delegate;
}

@Override
public RESTSigV4AuthSession initSession(RESTClient initClient, Map<String, String> properties) {
return new RESTSigV4AuthSession(
signer, delegate.initSession(initClient, properties), new AwsProperties(properties));
}

@Override
public RESTSigV4AuthSession catalogSession(
RESTClient sharedClient, Map<String, String> properties) {
catalogProperties = properties;
AwsProperties awsProperties = new AwsProperties(catalogProperties);
return new RESTSigV4AuthSession(
signer, delegate.catalogSession(sharedClient, catalogProperties), awsProperties);
}

@Override
public AuthSession contextualSession(SessionCatalog.SessionContext context, AuthSession parent) {
AwsProperties contextProperties =
new AwsProperties(RESTUtil.merge(catalogProperties, context.properties()));
return new RESTSigV4AuthSession(
signer, delegate.contextualSession(context, parent), contextProperties);
}

@Override
public AuthSession tableSession(
TableIdentifier table, Map<String, String> properties, AuthSession parent) {
AwsProperties tableProperties =
new AwsProperties(RESTUtil.merge(catalogProperties, properties));
return new RESTSigV4AuthSession(
signer, delegate.tableSession(table, properties, parent), tableProperties);
}

@Override
public void close() {
delegate.close();
}
}
154 changes: 154 additions & 0 deletions aws/src/main/java/org/apache/iceberg/aws/RESTSigV4AuthSession.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.aws;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.iceberg.rest.HTTPHeaders;
import org.apache.iceberg.rest.HTTPHeaders.HTTPHeader;
import org.apache.iceberg.rest.HTTPRequest;
import org.apache.iceberg.rest.ImmutableHTTPHeaders;
import org.apache.iceberg.rest.ImmutableHTTPRequest;
import org.apache.iceberg.rest.auth.AuthSession;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.signer.Aws4Signer;
import software.amazon.awssdk.auth.signer.internal.SignerConstant;
import software.amazon.awssdk.auth.signer.params.Aws4SignerParams;
import software.amazon.awssdk.auth.signer.params.SignerChecksumParams;
import software.amazon.awssdk.core.checksums.Algorithm;
import software.amazon.awssdk.http.SdkHttpFullRequest;
import software.amazon.awssdk.http.SdkHttpMethod;
import software.amazon.awssdk.regions.Region;

/**
* An AuthSession that signs requests with SigV4.
*
* <p>The request is first authenticated by the delegate AuthSession, then signed with SigV4. In
* case of conflicting headers, the Authorization header set by delegate AuthSession will be
* relocated, then included in the canonical headers to sign.
*
* <p>See <a href="https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_sigv.html">Signing AWS
* API requests</a> for details about the SigV4 protocol.
*/
public class RESTSigV4AuthSession implements AuthSession {

static final String EMPTY_BODY_SHA256 =
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
static final String RELOCATED_HEADER_PREFIX = "Original-";

private final Aws4Signer signer;
private final AuthSession delegate;
private final Region signingRegion;
private final String signingName;
private final AwsCredentialsProvider credentialsProvider;

public RESTSigV4AuthSession(
Aws4Signer aws4Signer, AuthSession delegateAuthSession, AwsProperties awsProperties) {
this.signer = aws4Signer;
this.delegate = delegateAuthSession;
this.signingRegion = awsProperties.restSigningRegion();
this.signingName = awsProperties.restSigningName();
this.credentialsProvider = awsProperties.restCredentialsProvider();
}

@Override
public HTTPRequest authenticate(HTTPRequest request) {
return sign(delegate.authenticate(request));
}

@Override
public void close() {
delegate.close();
}

private HTTPRequest sign(HTTPRequest request) {
Aws4SignerParams params =
Aws4SignerParams.builder()
.signingName(signingName)
.signingRegion(signingRegion)
.awsCredentials(credentialsProvider.resolveCredentials())
.checksumParams(
SignerChecksumParams.builder()
.algorithm(Algorithm.SHA256)
.isStreamingRequest(false)
.checksumHeaderName(SignerConstant.X_AMZ_CONTENT_SHA256)
.build())
.build();

SdkHttpFullRequest.Builder sdkRequestBuilder = SdkHttpFullRequest.builder();

URI uri = request.requestUri();
sdkRequestBuilder
.method(SdkHttpMethod.fromValue(request.method().name()))
.protocol(uri.getScheme())
.uri(uri)
.headers(convertHeaders(request.headers()));

String body = request.encodedBody();
if (body == null) {
// This is a workaround for the signer implementation incorrectly producing
// an invalid content checksum for empty body requests.
sdkRequestBuilder.putHeader(SignerConstant.X_AMZ_CONTENT_SHA256, EMPTY_BODY_SHA256);
} else {
sdkRequestBuilder.contentStreamProvider(
() -> IOUtils.toInputStream(body, StandardCharsets.UTF_8));
}

SdkHttpFullRequest signedSdkRequest = signer.sign(sdkRequestBuilder.build(), params);
HTTPHeaders newHeaders = updateRequestHeaders(request.headers(), signedSdkRequest.headers());
return ImmutableHTTPRequest.builder().from(request).headers(newHeaders).build();
}

private Map<String, List<String>> convertHeaders(HTTPHeaders headers) {
return headers.entries().stream()
.collect(
Collectors.groupingBy(
// Relocate Authorization header as SigV4 takes precedence
header ->
header.name().equalsIgnoreCase("Authorization")
? RELOCATED_HEADER_PREFIX + header.name()
: header.name(),
Collectors.mapping(HTTPHeader::value, Collectors.toList())));
}

private HTTPHeaders updateRequestHeaders(
HTTPHeaders originalHeaders, Map<String, List<String>> signedHeaders) {
ImmutableHTTPHeaders.Builder newHeaders = ImmutableHTTPHeaders.builder();
signedHeaders.forEach(
(name, signedValues) -> {
if (originalHeaders.contains(name)) {
for (HTTPHeader originalHeader : originalHeaders.entries(name)) {
// Relocate headers if there is a conflict with signed headers
if (!signedValues.contains(originalHeader.value())) {
newHeaders.addEntry(
HTTPHeader.of(RELOCATED_HEADER_PREFIX + name, originalHeader.value()));
}
}
}

signedValues.forEach(value -> newHeaders.addEntry(HTTPHeader.of(name, value)));
});

return newHeaders.build();
}
}
159 changes: 159 additions & 0 deletions aws/src/test/java/org/apache/iceberg/aws/TestRESTSigV4AuthManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.aws;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

import java.util.Map;
import org.apache.iceberg.catalog.SessionCatalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.RESTClient;
import org.apache.iceberg.rest.auth.AuthManager;
import org.apache.iceberg.rest.auth.AuthManagers;
import org.apache.iceberg.rest.auth.AuthProperties;
import org.apache.iceberg.rest.auth.AuthSession;
import org.apache.iceberg.rest.auth.NoopAuthManager;
import org.apache.iceberg.rest.auth.OAuth2Manager;
import org.apache.iceberg.rest.auth.OAuth2Util;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

class TestRESTSigV4AuthManager {

@Test
void create() {
AuthManager manager =
AuthManagers.loadAuthManager(
"test", Map.of(AuthProperties.AUTH_TYPE, AuthProperties.AUTH_TYPE_SIGV4));
assertThat(manager)
.isInstanceOf(RESTSigV4AuthManager.class)
.extracting("delegate")
.isInstanceOf(OAuth2Manager.class);
}

@Test
void createLegacy() {
AuthManager manager =
AuthManagers.loadAuthManager("test", Map.of("rest.sigv4-enabled", "true"));
assertThat(manager)
.isInstanceOf(RESTSigV4AuthManager.class)
.extracting("delegate")
.isInstanceOf(OAuth2Manager.class);
}

@Test
void createCustomDelegate() {
AuthManager manager =
AuthManagers.loadAuthManager(
"test",
Map.of(
AuthProperties.AUTH_TYPE,
AuthProperties.AUTH_TYPE_SIGV4,
AuthProperties.SIGV4_DELEGATE_AUTH_TYPE,
AuthProperties.AUTH_TYPE_NONE));
assertThat(manager)
.isInstanceOf(RESTSigV4AuthManager.class)
.extracting("delegate")
.isInstanceOf(NoopAuthManager.class);
}

@Test
@SuppressWarnings("resource")
void createInvalidCustomDelegate() {
assertThatThrownBy(
() ->
AuthManagers.loadAuthManager(
"test",
Map.of(
AuthProperties.AUTH_TYPE,
AuthProperties.AUTH_TYPE_SIGV4,
AuthProperties.SIGV4_DELEGATE_AUTH_TYPE,
AuthProperties.AUTH_TYPE_SIGV4)))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot delegate a SigV4 auth manager to another SigV4 auth manager");
}

@Test
void initSession() {
AuthManager delegate = Mockito.mock(AuthManager.class);
when(delegate.initSession(any(), any())).thenReturn(Mockito.mock(OAuth2Util.AuthSession.class));
RESTClient client = Mockito.mock(RESTClient.class);
AuthManager manager = new RESTSigV4AuthManager("test", delegate);
AuthSession authSession = manager.initSession(client, Map.of());
assertThat(authSession)
.isInstanceOf(RESTSigV4AuthSession.class)
.extracting("delegate")
.isInstanceOf(OAuth2Util.AuthSession.class);
}

@Test
void catalogSession() {
AuthManager delegate = Mockito.mock(AuthManager.class);
when(delegate.catalogSession(any(), any()))
.thenReturn(Mockito.mock(OAuth2Util.AuthSession.class));
RESTClient client = Mockito.mock(RESTClient.class);
AuthManager manager = new RESTSigV4AuthManager("test", delegate);
AuthSession authSession = manager.catalogSession(client, Map.of());
assertThat(authSession)
.isInstanceOf(RESTSigV4AuthSession.class)
.extracting("delegate")
.isInstanceOf(OAuth2Util.AuthSession.class);
}

@Test
void contextualSession() {
AuthManager delegate = Mockito.mock(AuthManager.class);
when(delegate.contextualSession(any(), any()))
.thenReturn(Mockito.mock(OAuth2Util.AuthSession.class));
AuthManager manager = new RESTSigV4AuthManager("test", delegate);
AuthSession authSession =
manager.contextualSession(
Mockito.mock(SessionCatalog.SessionContext.class), Mockito.mock(AuthSession.class));
assertThat(authSession)
.isInstanceOf(RESTSigV4AuthSession.class)
.extracting("delegate")
.isInstanceOf(OAuth2Util.AuthSession.class);
}

@Test
void tableSession() {
AuthManager delegate = Mockito.mock(AuthManager.class);
when(delegate.tableSession(any(), any(), any()))
.thenReturn(Mockito.mock(OAuth2Util.AuthSession.class));
AuthManager manager = new RESTSigV4AuthManager("test", delegate);
AuthSession authSession =
manager.tableSession(
Mockito.mock(TableIdentifier.class), Map.of(), Mockito.mock(AuthSession.class));
assertThat(authSession)
.isInstanceOf(RESTSigV4AuthSession.class)
.extracting("delegate")
.isInstanceOf(OAuth2Util.AuthSession.class);
}

@Test
void close() {
AuthManager delegate = Mockito.mock(AuthManager.class);
AuthManager manager = new RESTSigV4AuthManager("test", delegate);
manager.close();
Mockito.verify(delegate).close();
}
}
Loading

0 comments on commit 372fd66

Please sign in to comment.