Skip to content

Commit

Permalink
chore: blob write session cleanup (#2280)
Browse files Browse the repository at this point in the history
1. Add TransportCompatibility annotations to factory methods in BlobWriteSessionConfigs
2. Add BlobWriteSessionConfig.GrpcCompatible marker interface for validation in GrpcStorageOptions
3. Add BlobWriteSessionConfig.GrpcCompatible to all existing BlobWriteSessionConfigs
4. Cleanup todo in DefaultBlobWriteSessionConfig that was reaching down into the GrpcBlobWriteChannel. Now, the WritableByteChannelSession will be constructed directly.
5. Add @CrossRun annotations to BlobWriteSession integration tests
  • Loading branch information
BenWhitehead authored Nov 14, 2023
1 parent d3b7bb3 commit c069aca
Show file tree
Hide file tree
Showing 12 changed files with 158 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@
package com.google.cloud.storage;

import com.google.api.core.InternalApi;
import com.google.cloud.storage.Conversions.Decoder;
import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
import com.google.cloud.storage.UnifiedOpts.Opts;
import com.google.storage.v2.WriteObjectResponse;
import java.io.IOException;
import java.io.Serializable;
import java.time.Clock;
Expand Down Expand Up @@ -52,9 +50,15 @@ public abstract class BlobWriteSessionConfig implements Serializable {
interface WriterFactory {
@InternalApi
WritableByteChannelSession<?, BlobInfo> writeSession(
StorageInternal s,
BlobInfo info,
Opts<ObjectTargetOpt> opts,
Decoder<WriteObjectResponse, BlobInfo> d);
StorageInternal s, BlobInfo info, Opts<ObjectTargetOpt> opts);
}

/**
* Internal marker interface to signify an implementation of {@link BlobWriteSessionConfig} is
* compatible with {@link com.google.cloud.storage.TransportCompatibility.Transport#GRPC}
*
* <p>We could evaluate the annotations, but the code for that is more complicated and probably
* not worth the effort.
*/
interface GrpcCompatible {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartCleanupStrategy;
import com.google.cloud.storage.Storage.BlobTargetOption;
import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.cloud.storage.TransportCompatibility.Transport;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.nio.channels.WritableByteChannel;
Expand Down Expand Up @@ -240,6 +241,7 @@ private BlobWriteSessionConfigs() {}
* @since 2.26.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
@TransportCompatibility({Transport.GRPC})
public static DefaultBlobWriteSessionConfig getDefault() {
return new DefaultBlobWriteSessionConfig(ByteSizeConstants._16MiB);
}
Expand All @@ -255,6 +257,7 @@ public static DefaultBlobWriteSessionConfig getDefault() {
* @since 2.26.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
@TransportCompatibility({Transport.GRPC})
public static BlobWriteSessionConfig bufferToTempDirThenUpload() throws IOException {
return bufferToDiskThenUpload(
Paths.get(System.getProperty("java.io.tmpdir"), "google-cloud-storage"));
Expand All @@ -271,6 +274,7 @@ public static BlobWriteSessionConfig bufferToTempDirThenUpload() throws IOExcept
* @since 2.26.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
@TransportCompatibility({Transport.GRPC})
public static BufferToDiskThenUpload bufferToDiskThenUpload(Path path) throws IOException {
return bufferToDiskThenUpload(ImmutableList.of(path));
}
Expand All @@ -289,6 +293,7 @@ public static BufferToDiskThenUpload bufferToDiskThenUpload(Path path) throws IO
* @since 2.26.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
@TransportCompatibility({Transport.GRPC})
public static BufferToDiskThenUpload bufferToDiskThenUpload(Collection<Path> paths)
throws IOException {
return new BufferToDiskThenUpload(ImmutableList.copyOf(paths), false);
Expand All @@ -306,6 +311,7 @@ public static BufferToDiskThenUpload bufferToDiskThenUpload(Collection<Path> pat
* @since 2.27.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
@TransportCompatibility(Transport.GRPC)
public static JournalingBlobWriteSessionConfig journaling(Collection<Path> paths) {
return new JournalingBlobWriteSessionConfig(ImmutableList.copyOf(paths), false);
}
Expand All @@ -321,6 +327,7 @@ public static JournalingBlobWriteSessionConfig journaling(Collection<Path> paths
* @since 2.28.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
@TransportCompatibility({Transport.GRPC})
public static ParallelCompositeUploadBlobWriteSessionConfig parallelCompositeUpload() {
return ParallelCompositeUploadBlobWriteSessionConfig.withDefaults();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@
import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.storage.Conversions.Decoder;
import com.google.cloud.storage.RecoveryFileManager.RecoveryVolumeSinkFactory;
import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.cloud.storage.TransportCompatibility.Transport;
import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
import com.google.cloud.storage.UnifiedOpts.Opts;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.storage.v2.WriteObjectResponse;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
Expand Down Expand Up @@ -60,7 +59,9 @@
*/
@Immutable
@BetaApi
public final class BufferToDiskThenUpload extends BlobWriteSessionConfig {
@TransportCompatibility({Transport.GRPC})
public final class BufferToDiskThenUpload extends BlobWriteSessionConfig
implements BlobWriteSessionConfig.GrpcCompatible {
private static final long serialVersionUID = 9059242302276891867L;

/**
Expand Down Expand Up @@ -151,10 +152,7 @@ private Factory(RecoveryFileManager recoveryFileManager, Clock clock, Throughput
@InternalApi
@Override
public WritableByteChannelSession<?, BlobInfo> writeSession(
StorageInternal storage,
BlobInfo info,
Opts<ObjectTargetOpt> opts,
Decoder<WriteObjectResponse, BlobInfo> d) {
StorageInternal storage, BlobInfo info, Opts<ObjectTargetOpt> opts) {
return new Factory.WriteToFileThenUpload(
storage, info, opts, recoveryFileManager.newRecoveryFile(info));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
import com.google.api.core.ApiFutures;
import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel;
import com.google.cloud.storage.Conversions.Decoder;
import com.google.cloud.storage.TransportCompatibility.Transport;
import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
import com.google.cloud.storage.UnifiedOpts.Opts;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.storage.v2.WriteObjectRequest;
import com.google.storage.v2.WriteObjectResponse;
import java.nio.channels.WritableByteChannel;
import java.time.Clock;
Expand All @@ -52,7 +55,9 @@
*/
@Immutable
@BetaApi
public final class DefaultBlobWriteSessionConfig extends BlobWriteSessionConfig {
@TransportCompatibility({Transport.GRPC})
public final class DefaultBlobWriteSessionConfig extends BlobWriteSessionConfig
implements BlobWriteSessionConfig.GrpcCompatible {
private static final long serialVersionUID = -6873740918589930633L;

private final int chunkSize;
Expand Down Expand Up @@ -102,6 +107,9 @@ WriterFactory createFactory(Clock clock) {

@InternalApi
private static final class Factory implements WriterFactory {
private static final Decoder<WriteObjectResponse, BlobInfo>
WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER =
Conversions.grpc().blobInfo().compose(WriteObjectResponse::getResource);

private final int chunkSize;

Expand All @@ -112,21 +120,36 @@ private Factory(int chunkSize) {
@InternalApi
@Override
public WritableByteChannelSession<?, BlobInfo> writeSession(
StorageInternal s,
BlobInfo info,
Opts<ObjectTargetOpt> opts,
Decoder<WriteObjectResponse, BlobInfo> d) {
// todo: invert this
// make GrpcBlobWriteChannel use this factory to produce its WriteSession
StorageInternal s, BlobInfo info, Opts<ObjectTargetOpt> opts) {
if (s instanceof GrpcStorageImpl) {
GrpcStorageImpl g = (GrpcStorageImpl) s;
GrpcBlobWriteChannel writer = g.internalWriter(info, opts);
writer.setChunkSize(chunkSize);
WritableByteChannelSession<BufferedWritableByteChannel, WriteObjectResponse> session =
writer.newLazyWriteChannel().getSession();
return new DecoratedWritableByteChannelSession<>(session, d);
return new DecoratedWritableByteChannelSession<>(
new LazySession<>(
new LazyWriteChannel<>(
() -> {
GrpcStorageImpl grpc = (GrpcStorageImpl) s;
GrpcCallContext grpcCallContext =
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
WriteObjectRequest req = grpc.getWriteObjectRequest(info, opts);

ApiFuture<ResumableWrite> startResumableWrite =
grpc.startResumableWrite(grpcCallContext, req);
return ResumableMedia.gapic()
.write()
.byteChannel(grpc.storageClient.writeObjectCallable())
.setHasher(Hasher.noop())
.setByteStringStrategy(ByteStringStrategy.copy())
.resumable()
.withRetryConfig(
grpc.getOptions(), grpc.retryAlgorithmManager.idempotent())
.buffered(BufferHandle.allocate(chunkSize))
.setStartAsync(startResumableWrite)
.build();
})),
WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER);
} else {
throw new IllegalStateException(
"Unknown Storage implementation: " + s.getClass().getName());
}
return CrossTransportUtils.throwGrpcOnly(DefaultBlobWriteSessionConfig.class, "");
}
}

Expand Down Expand Up @@ -162,4 +185,23 @@ public ApiFuture<BlobInfo> getResult() {
delegate.getResult(), decoder::decode, MoreExecutors.directExecutor());
}
}

private static final class LazySession<R>
implements WritableByteChannelSession<BufferedWritableByteChannel, R> {
private final LazyWriteChannel<R> lazy;

private LazySession(LazyWriteChannel<R> lazy) {
this.lazy = lazy;
}

@Override
public ApiFuture<BufferedWritableByteChannel> openAsync() {
return lazy.getSession().openAsync();
}

@Override
public ApiFuture<R> getResult() {
return lazy.getSession().getResult();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ final class GrpcStorageImpl extends BaseService<StorageOptions>
final GrpcConversions codecs;
final GrpcRetryAlgorithmManager retryAlgorithmManager;
final SyntaxDecoders syntaxDecoders;
private final Decoder<WriteObjectResponse, BlobInfo> writeObjectResponseBlobInfoDecoder;

// workaround for /~https://github.com/googleapis/java-storage/issues/1736
private final Opts<UserProject> defaultOpts;
Expand All @@ -194,8 +193,6 @@ final class GrpcStorageImpl extends BaseService<StorageOptions>
this.codecs = Conversions.grpc();
this.retryAlgorithmManager = options.getRetryAlgorithmManager();
this.syntaxDecoders = new SyntaxDecoders();
this.writeObjectResponseBlobInfoDecoder =
codecs.blobInfo().compose(WriteObjectResponse::getResource);
this.defaultProjectId = UnifiedOpts.projectId(options.getProjectId());
}

Expand Down Expand Up @@ -719,14 +716,9 @@ public void downloadTo(BlobId blob, OutputStream outputStream, BlobSourceOption.
@Override
public GrpcBlobWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) {
Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts);
return internalWriter(blobInfo, opts);
}

@Override
public GrpcBlobWriteChannel internalWriter(BlobInfo info, Opts<ObjectTargetOpt> opts) {
GrpcCallContext grpcCallContext =
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
WriteObjectRequest req = getWriteObjectRequest(info, opts);
WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts);
Hasher hasher = Hasher.noop();
return new GrpcBlobWriteChannel(
storageClient.writeObjectCallable(),
Expand Down Expand Up @@ -1534,7 +1526,7 @@ public boolean deleteNotification(String bucket, String notificationId) {
public BlobWriteSession blobWriteSession(BlobInfo info, BlobWriteOption... options) {
Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(info);
WritableByteChannelSession<?, BlobInfo> writableByteChannelSession =
writerFactory.writeSession(this, info, opts, writeObjectResponseBlobInfoDecoder);
writerFactory.writeSession(this, info, opts);
return BlobWriteSessions.of(writableByteChannelSession);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.storage;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

import com.google.api.core.ApiClock;
Expand Down Expand Up @@ -533,6 +534,9 @@ public GrpcStorageOptions.Builder setGrpcInterceptorProvider(
public GrpcStorageOptions.Builder setBlobWriteSessionConfig(
@NonNull BlobWriteSessionConfig blobWriteSessionConfig) {
requireNonNull(blobWriteSessionConfig, "blobWriteSessionConfig must be non null");
checkArgument(
blobWriteSessionConfig instanceof BlobWriteSessionConfig.GrpcCompatible,
"The provided instance of BlobWriteSessionConfig is not compatible with gRPC transport.");
this.blobWriteSessionConfig = blobWriteSessionConfig;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@
@Immutable
@BetaApi
@TransportCompatibility(Transport.GRPC)
public final class JournalingBlobWriteSessionConfig extends BlobWriteSessionConfig {
public final class JournalingBlobWriteSessionConfig extends BlobWriteSessionConfig
implements BlobWriteSessionConfig.GrpcCompatible {
private static final long serialVersionUID = 9059242302276891867L;

/**
Expand Down Expand Up @@ -159,10 +160,7 @@ private Factory(RecoveryFileManager recoveryFileManager, Clock clock, Throughput
@InternalApi
@Override
public WritableByteChannelSession<?, BlobInfo> writeSession(
StorageInternal storage,
BlobInfo info,
Opts<ObjectTargetOpt> opts,
Decoder<WriteObjectResponse, BlobInfo> d) {
StorageInternal storage, BlobInfo info, Opts<ObjectTargetOpt> opts) {
if (storage instanceof GrpcStorageImpl) {
GrpcStorageImpl grpcStorage = (GrpcStorageImpl) storage;
RecoveryFile recoveryFile = recoveryFileManager.newRecoveryFile(info);
Expand All @@ -189,7 +187,7 @@ public WritableByteChannelSession<?, BlobInfo> writeSession(
.setStartAsync(start)
.build();

return new JournalingUpload<>(session, start, d);
return new JournalingUpload<>(session, start);
} else {
return CrossTransportUtils.throwHttpJsonOnly(BlobWriteSessionConfigs.class, "journaling");
}
Expand All @@ -204,11 +202,10 @@ private final class JournalingUpload<WBC extends WritableByteChannel>

public JournalingUpload(
WritableByteChannelSession<WBC, WriteObjectResponse> session,
ApiFuture<WriteCtx<ResumableWrite>> start,
Decoder<WriteObjectResponse, BlobInfo> decoder) {
ApiFuture<WriteCtx<ResumableWrite>> start) {
this.session = session;
this.start = start;
this.decoder = decoder;
this.decoder = Conversions.grpc().blobInfo().compose(WriteObjectResponse::getResource);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,16 @@
import com.google.api.core.InternalApi;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel;
import com.google.cloud.storage.Conversions.Decoder;
import com.google.cloud.storage.MetadataField.PartRange;
import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.cloud.storage.TransportCompatibility.Transport;
import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
import com.google.cloud.storage.UnifiedOpts.Opts;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.storage.v2.WriteObjectResponse;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
Expand Down Expand Up @@ -116,7 +115,9 @@
*/
@Immutable
@BetaApi
public final class ParallelCompositeUploadBlobWriteSessionConfig extends BlobWriteSessionConfig {
@TransportCompatibility({Transport.GRPC})
public final class ParallelCompositeUploadBlobWriteSessionConfig extends BlobWriteSessionConfig
implements BlobWriteSessionConfig.GrpcCompatible {

private static final int MAX_PARTS_PER_COMPOSE = 32;
private final int maxPartsPerCompose;
Expand Down Expand Up @@ -669,10 +670,7 @@ private ParallelCompositeUploadWriterFactory(

@Override
public WritableByteChannelSession<?, BlobInfo> writeSession(
StorageInternal s,
BlobInfo info,
Opts<ObjectTargetOpt> opts,
Decoder<WriteObjectResponse, BlobInfo> d) {
StorageInternal s, BlobInfo info, Opts<ObjectTargetOpt> opts) {
return new PCUSession(s, info, opts);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ default BlobInfo internalCreateFrom(Path path, BlobInfo info, Opts<ObjectTargetO
throw new UnsupportedOperationException("not implemented");
}

default StorageWriteChannel internalWriter(BlobInfo info, Opts<ObjectTargetOpt> opts) {
throw new UnsupportedOperationException("not implemented");
}

default BlobInfo internalDirectUpload(BlobInfo info, Opts<ObjectTargetOpt> opts, ByteBuffer buf) {
throw new UnsupportedOperationException("not implemented");
}
Expand Down
Loading

0 comments on commit c069aca

Please sign in to comment.