diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfig.java index a67d5a20e6..b52e9bd4f7 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfig.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfig.java @@ -17,11 +17,9 @@ package com.google.cloud.storage; import com.google.api.core.InternalApi; -import com.google.cloud.storage.Conversions.Decoder; import com.google.cloud.storage.Storage.BlobWriteOption; import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt; import com.google.cloud.storage.UnifiedOpts.Opts; -import com.google.storage.v2.WriteObjectResponse; import java.io.IOException; import java.io.Serializable; import java.time.Clock; @@ -52,9 +50,15 @@ public abstract class BlobWriteSessionConfig implements Serializable { interface WriterFactory { @InternalApi WritableByteChannelSession writeSession( - StorageInternal s, - BlobInfo info, - Opts opts, - Decoder d); + StorageInternal s, BlobInfo info, Opts opts); } + + /** + * Internal marker interface to signify an implementation of {@link BlobWriteSessionConfig} is + * compatible with {@link com.google.cloud.storage.TransportCompatibility.Transport#GRPC} + * + *

We could evaluate the annotations, but the code for that is more complicated and probably + * not worth the effort. + */ + interface GrpcCompatible {} } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java index e3ca3390db..0f11cb19a4 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java @@ -21,6 +21,7 @@ import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartCleanupStrategy; import com.google.cloud.storage.Storage.BlobTargetOption; import com.google.cloud.storage.Storage.BlobWriteOption; +import com.google.cloud.storage.TransportCompatibility.Transport; import com.google.common.collect.ImmutableList; import java.io.IOException; import java.nio.channels.WritableByteChannel; @@ -240,6 +241,7 @@ private BlobWriteSessionConfigs() {} * @since 2.26.0 This new api is in preview and is subject to breaking changes. */ @BetaApi + @TransportCompatibility({Transport.GRPC}) public static DefaultBlobWriteSessionConfig getDefault() { return new DefaultBlobWriteSessionConfig(ByteSizeConstants._16MiB); } @@ -255,6 +257,7 @@ public static DefaultBlobWriteSessionConfig getDefault() { * @since 2.26.0 This new api is in preview and is subject to breaking changes. */ @BetaApi + @TransportCompatibility({Transport.GRPC}) public static BlobWriteSessionConfig bufferToTempDirThenUpload() throws IOException { return bufferToDiskThenUpload( Paths.get(System.getProperty("java.io.tmpdir"), "google-cloud-storage")); @@ -271,6 +274,7 @@ public static BlobWriteSessionConfig bufferToTempDirThenUpload() throws IOExcept * @since 2.26.0 This new api is in preview and is subject to breaking changes. */ @BetaApi + @TransportCompatibility({Transport.GRPC}) public static BufferToDiskThenUpload bufferToDiskThenUpload(Path path) throws IOException { return bufferToDiskThenUpload(ImmutableList.of(path)); } @@ -289,6 +293,7 @@ public static BufferToDiskThenUpload bufferToDiskThenUpload(Path path) throws IO * @since 2.26.0 This new api is in preview and is subject to breaking changes. */ @BetaApi + @TransportCompatibility({Transport.GRPC}) public static BufferToDiskThenUpload bufferToDiskThenUpload(Collection paths) throws IOException { return new BufferToDiskThenUpload(ImmutableList.copyOf(paths), false); @@ -306,6 +311,7 @@ public static BufferToDiskThenUpload bufferToDiskThenUpload(Collection pat * @since 2.27.0 This new api is in preview and is subject to breaking changes. */ @BetaApi + @TransportCompatibility(Transport.GRPC) public static JournalingBlobWriteSessionConfig journaling(Collection paths) { return new JournalingBlobWriteSessionConfig(ImmutableList.copyOf(paths), false); } @@ -321,6 +327,7 @@ public static JournalingBlobWriteSessionConfig journaling(Collection paths * @since 2.28.0 This new api is in preview and is subject to breaking changes. */ @BetaApi + @TransportCompatibility({Transport.GRPC}) public static ParallelCompositeUploadBlobWriteSessionConfig parallelCompositeUpload() { return ParallelCompositeUploadBlobWriteSessionConfig.withDefaults(); } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BufferToDiskThenUpload.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BufferToDiskThenUpload.java index 430f30bcd8..8c9cb7c107 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BufferToDiskThenUpload.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BufferToDiskThenUpload.java @@ -21,15 +21,14 @@ import com.google.api.core.BetaApi; import com.google.api.core.InternalApi; import com.google.api.core.SettableApiFuture; -import com.google.cloud.storage.Conversions.Decoder; import com.google.cloud.storage.RecoveryFileManager.RecoveryVolumeSinkFactory; import com.google.cloud.storage.Storage.BlobWriteOption; +import com.google.cloud.storage.TransportCompatibility.Transport; import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt; import com.google.cloud.storage.UnifiedOpts.Opts; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.MoreExecutors; -import com.google.storage.v2.WriteObjectResponse; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; @@ -60,7 +59,9 @@ */ @Immutable @BetaApi -public final class BufferToDiskThenUpload extends BlobWriteSessionConfig { +@TransportCompatibility({Transport.GRPC}) +public final class BufferToDiskThenUpload extends BlobWriteSessionConfig + implements BlobWriteSessionConfig.GrpcCompatible { private static final long serialVersionUID = 9059242302276891867L; /** @@ -151,10 +152,7 @@ private Factory(RecoveryFileManager recoveryFileManager, Clock clock, Throughput @InternalApi @Override public WritableByteChannelSession writeSession( - StorageInternal storage, - BlobInfo info, - Opts opts, - Decoder d) { + StorageInternal storage, BlobInfo info, Opts opts) { return new Factory.WriteToFileThenUpload( storage, info, opts, recoveryFileManager.newRecoveryFile(info)); } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java index 79e3759eaa..090f929e7e 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java @@ -20,12 +20,15 @@ import com.google.api.core.ApiFutures; import com.google.api.core.BetaApi; import com.google.api.core.InternalApi; +import com.google.api.gax.grpc.GrpcCallContext; import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel; import com.google.cloud.storage.Conversions.Decoder; +import com.google.cloud.storage.TransportCompatibility.Transport; import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt; import com.google.cloud.storage.UnifiedOpts.Opts; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.MoreExecutors; +import com.google.storage.v2.WriteObjectRequest; import com.google.storage.v2.WriteObjectResponse; import java.nio.channels.WritableByteChannel; import java.time.Clock; @@ -52,7 +55,9 @@ */ @Immutable @BetaApi -public final class DefaultBlobWriteSessionConfig extends BlobWriteSessionConfig { +@TransportCompatibility({Transport.GRPC}) +public final class DefaultBlobWriteSessionConfig extends BlobWriteSessionConfig + implements BlobWriteSessionConfig.GrpcCompatible { private static final long serialVersionUID = -6873740918589930633L; private final int chunkSize; @@ -102,6 +107,9 @@ WriterFactory createFactory(Clock clock) { @InternalApi private static final class Factory implements WriterFactory { + private static final Decoder + WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER = + Conversions.grpc().blobInfo().compose(WriteObjectResponse::getResource); private final int chunkSize; @@ -112,21 +120,36 @@ private Factory(int chunkSize) { @InternalApi @Override public WritableByteChannelSession writeSession( - StorageInternal s, - BlobInfo info, - Opts opts, - Decoder d) { - // todo: invert this - // make GrpcBlobWriteChannel use this factory to produce its WriteSession + StorageInternal s, BlobInfo info, Opts opts) { if (s instanceof GrpcStorageImpl) { - GrpcStorageImpl g = (GrpcStorageImpl) s; - GrpcBlobWriteChannel writer = g.internalWriter(info, opts); - writer.setChunkSize(chunkSize); - WritableByteChannelSession session = - writer.newLazyWriteChannel().getSession(); - return new DecoratedWritableByteChannelSession<>(session, d); + return new DecoratedWritableByteChannelSession<>( + new LazySession<>( + new LazyWriteChannel<>( + () -> { + GrpcStorageImpl grpc = (GrpcStorageImpl) s; + GrpcCallContext grpcCallContext = + opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); + WriteObjectRequest req = grpc.getWriteObjectRequest(info, opts); + + ApiFuture startResumableWrite = + grpc.startResumableWrite(grpcCallContext, req); + return ResumableMedia.gapic() + .write() + .byteChannel(grpc.storageClient.writeObjectCallable()) + .setHasher(Hasher.noop()) + .setByteStringStrategy(ByteStringStrategy.copy()) + .resumable() + .withRetryConfig( + grpc.getOptions(), grpc.retryAlgorithmManager.idempotent()) + .buffered(BufferHandle.allocate(chunkSize)) + .setStartAsync(startResumableWrite) + .build(); + })), + WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER); + } else { + throw new IllegalStateException( + "Unknown Storage implementation: " + s.getClass().getName()); } - return CrossTransportUtils.throwGrpcOnly(DefaultBlobWriteSessionConfig.class, ""); } } @@ -162,4 +185,23 @@ public ApiFuture getResult() { delegate.getResult(), decoder::decode, MoreExecutors.directExecutor()); } } + + private static final class LazySession + implements WritableByteChannelSession { + private final LazyWriteChannel lazy; + + private LazySession(LazyWriteChannel lazy) { + this.lazy = lazy; + } + + @Override + public ApiFuture openAsync() { + return lazy.getSession().openAsync(); + } + + @Override + public ApiFuture getResult() { + return lazy.getSession().getResult(); + } + } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java index 16963e4d16..d4c80596ba 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java @@ -176,7 +176,6 @@ final class GrpcStorageImpl extends BaseService final GrpcConversions codecs; final GrpcRetryAlgorithmManager retryAlgorithmManager; final SyntaxDecoders syntaxDecoders; - private final Decoder writeObjectResponseBlobInfoDecoder; // workaround for /~https://github.com/googleapis/java-storage/issues/1736 private final Opts defaultOpts; @@ -194,8 +193,6 @@ final class GrpcStorageImpl extends BaseService this.codecs = Conversions.grpc(); this.retryAlgorithmManager = options.getRetryAlgorithmManager(); this.syntaxDecoders = new SyntaxDecoders(); - this.writeObjectResponseBlobInfoDecoder = - codecs.blobInfo().compose(WriteObjectResponse::getResource); this.defaultProjectId = UnifiedOpts.projectId(options.getProjectId()); } @@ -719,14 +716,9 @@ public void downloadTo(BlobId blob, OutputStream outputStream, BlobSourceOption. @Override public GrpcBlobWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) { Opts opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts); - return internalWriter(blobInfo, opts); - } - - @Override - public GrpcBlobWriteChannel internalWriter(BlobInfo info, Opts opts) { GrpcCallContext grpcCallContext = opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); - WriteObjectRequest req = getWriteObjectRequest(info, opts); + WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts); Hasher hasher = Hasher.noop(); return new GrpcBlobWriteChannel( storageClient.writeObjectCallable(), @@ -1534,7 +1526,7 @@ public boolean deleteNotification(String bucket, String notificationId) { public BlobWriteSession blobWriteSession(BlobInfo info, BlobWriteOption... options) { Opts opts = Opts.unwrap(options).resolveFrom(info); WritableByteChannelSession writableByteChannelSession = - writerFactory.writeSession(this, info, opts, writeObjectResponseBlobInfoDecoder); + writerFactory.writeSession(this, info, opts); return BlobWriteSessions.of(writableByteChannelSession); } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java index bee342f994..7f41bc9691 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java @@ -16,6 +16,7 @@ package com.google.cloud.storage; +import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; import com.google.api.core.ApiClock; @@ -533,6 +534,9 @@ public GrpcStorageOptions.Builder setGrpcInterceptorProvider( public GrpcStorageOptions.Builder setBlobWriteSessionConfig( @NonNull BlobWriteSessionConfig blobWriteSessionConfig) { requireNonNull(blobWriteSessionConfig, "blobWriteSessionConfig must be non null"); + checkArgument( + blobWriteSessionConfig instanceof BlobWriteSessionConfig.GrpcCompatible, + "The provided instance of BlobWriteSessionConfig is not compatible with gRPC transport."); this.blobWriteSessionConfig = blobWriteSessionConfig; return this; } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/JournalingBlobWriteSessionConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/JournalingBlobWriteSessionConfig.java index c8cc0e3dab..7978dd2b8b 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/JournalingBlobWriteSessionConfig.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/JournalingBlobWriteSessionConfig.java @@ -68,7 +68,8 @@ @Immutable @BetaApi @TransportCompatibility(Transport.GRPC) -public final class JournalingBlobWriteSessionConfig extends BlobWriteSessionConfig { +public final class JournalingBlobWriteSessionConfig extends BlobWriteSessionConfig + implements BlobWriteSessionConfig.GrpcCompatible { private static final long serialVersionUID = 9059242302276891867L; /** @@ -159,10 +160,7 @@ private Factory(RecoveryFileManager recoveryFileManager, Clock clock, Throughput @InternalApi @Override public WritableByteChannelSession writeSession( - StorageInternal storage, - BlobInfo info, - Opts opts, - Decoder d) { + StorageInternal storage, BlobInfo info, Opts opts) { if (storage instanceof GrpcStorageImpl) { GrpcStorageImpl grpcStorage = (GrpcStorageImpl) storage; RecoveryFile recoveryFile = recoveryFileManager.newRecoveryFile(info); @@ -189,7 +187,7 @@ public WritableByteChannelSession writeSession( .setStartAsync(start) .build(); - return new JournalingUpload<>(session, start, d); + return new JournalingUpload<>(session, start); } else { return CrossTransportUtils.throwHttpJsonOnly(BlobWriteSessionConfigs.class, "journaling"); } @@ -204,11 +202,10 @@ private final class JournalingUpload public JournalingUpload( WritableByteChannelSession session, - ApiFuture> start, - Decoder decoder) { + ApiFuture> start) { this.session = session; this.start = start; - this.decoder = decoder; + this.decoder = Conversions.grpc().blobInfo().compose(WriteObjectResponse::getResource); } @Override diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadBlobWriteSessionConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadBlobWriteSessionConfig.java index 7b9c78f58d..d024066908 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadBlobWriteSessionConfig.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadBlobWriteSessionConfig.java @@ -26,9 +26,9 @@ import com.google.api.core.InternalApi; import com.google.api.core.SettableApiFuture; import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel; -import com.google.cloud.storage.Conversions.Decoder; import com.google.cloud.storage.MetadataField.PartRange; import com.google.cloud.storage.Storage.BlobWriteOption; +import com.google.cloud.storage.TransportCompatibility.Transport; import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt; import com.google.cloud.storage.UnifiedOpts.Opts; import com.google.common.annotations.VisibleForTesting; @@ -36,7 +36,6 @@ import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.google.storage.v2.WriteObjectResponse; import java.io.IOException; import java.io.ObjectOutputStream; import java.io.Serializable; @@ -116,7 +115,9 @@ */ @Immutable @BetaApi -public final class ParallelCompositeUploadBlobWriteSessionConfig extends BlobWriteSessionConfig { +@TransportCompatibility({Transport.GRPC}) +public final class ParallelCompositeUploadBlobWriteSessionConfig extends BlobWriteSessionConfig + implements BlobWriteSessionConfig.GrpcCompatible { private static final int MAX_PARTS_PER_COMPOSE = 32; private final int maxPartsPerCompose; @@ -669,10 +670,7 @@ private ParallelCompositeUploadWriterFactory( @Override public WritableByteChannelSession writeSession( - StorageInternal s, - BlobInfo info, - Opts opts, - Decoder d) { + StorageInternal s, BlobInfo info, Opts opts) { return new PCUSession(s, info, opts); } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageInternal.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageInternal.java index 3f632aab15..0d700c46df 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageInternal.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageInternal.java @@ -31,10 +31,6 @@ default BlobInfo internalCreateFrom(Path path, BlobInfo info, Opts opts) { - throw new UnsupportedOperationException("not implemented"); - } - default BlobInfo internalDirectUpload(BlobInfo info, Opts opts, ByteBuffer buf) { throw new UnsupportedOperationException("not implemented"); } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/BufferToDiskThenUploadTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/BufferToDiskThenUploadTest.java index 60272fec44..1c9635227a 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/BufferToDiskThenUploadTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/BufferToDiskThenUploadTest.java @@ -20,10 +20,8 @@ import static com.google.common.truth.Truth.assertThat; import com.google.cloud.storage.BlobWriteSessionConfig.WriterFactory; -import com.google.cloud.storage.Conversions.Decoder; import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt; import com.google.cloud.storage.UnifiedOpts.Opts; -import com.google.storage.v2.WriteObjectResponse; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; @@ -39,9 +37,6 @@ public final class BufferToDiskThenUploadTest { - private static final Decoder - WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER = - Conversions.grpc().blobInfo().compose(WriteObjectResponse::getResource); @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); @Rule public final TestName testName = new TestName(); @@ -68,8 +63,7 @@ public BlobInfo internalCreateFrom( } }, blobInfo, - Opts.empty(), - WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER); + Opts.empty()); byte[] bytes = DataGenerator.base64Characters().genBytes(128); try (WritableByteChannel open = writeSession.open()) { diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobWriteSessionTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobWriteSessionTest.java index 1113d0231a..bf8132b318 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobWriteSessionTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobWriteSessionTest.java @@ -17,6 +17,7 @@ package com.google.cloud.storage.it; import static com.google.common.truth.Truth.assertThat; +import static com.google.common.truth.Truth.assertWithMessage; import static org.junit.Assert.assertThrows; import com.google.cloud.storage.BlobInfo; @@ -28,12 +29,12 @@ import com.google.cloud.storage.Storage; import com.google.cloud.storage.Storage.BlobWriteOption; import com.google.cloud.storage.StorageException; +import com.google.cloud.storage.StorageOptions; import com.google.cloud.storage.TransportCompatibility.Transport; import com.google.cloud.storage.it.runner.StorageITRunner; import com.google.cloud.storage.it.runner.annotations.Backend; +import com.google.cloud.storage.it.runner.annotations.CrossRun; import com.google.cloud.storage.it.runner.annotations.Inject; -import com.google.cloud.storage.it.runner.annotations.SingleBackend; -import com.google.cloud.storage.it.runner.annotations.StorageFixture; import com.google.cloud.storage.it.runner.registry.Generator; import java.io.IOException; import java.nio.ByteBuffer; @@ -41,16 +42,21 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; @RunWith(StorageITRunner.class) -@SingleBackend(Backend.PROD) +@CrossRun( + transports = {Transport.GRPC}, + backends = {Backend.PROD}) public final class ITBlobWriteSessionTest { - @Inject - @StorageFixture(Transport.GRPC) - public Storage storage; + @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Inject public Storage storage; + @Inject public Transport transport; @Inject public BucketInfo bucket; @@ -63,11 +69,16 @@ public void allDefaults() throws Exception { @Test public void bufferToTempDirThenUpload() throws Exception { - GrpcStorageOptions options = - ((GrpcStorageOptions) storage.getOptions()) - .toBuilder() - .setBlobWriteSessionConfig(BlobWriteSessionConfigs.bufferToTempDirThenUpload()) - .build(); + StorageOptions options = null; + if (transport == Transport.GRPC) { + options = + ((GrpcStorageOptions) storage.getOptions()) + .toBuilder() + .setBlobWriteSessionConfig(BlobWriteSessionConfigs.bufferToTempDirThenUpload()) + .build(); + } + assertWithMessage("unable to resolve options").that(options).isNotNull(); + //noinspection DataFlowIssue try (Storage s = options.getService()) { doTest(s); } @@ -75,12 +86,17 @@ public void bufferToTempDirThenUpload() throws Exception { @Test public void overrideDefaultBufferSize() throws Exception { - GrpcStorageOptions options = - ((GrpcStorageOptions) storage.getOptions()) - .toBuilder() - .setBlobWriteSessionConfig( - BlobWriteSessionConfigs.getDefault().withChunkSize(256 * 1024)) - .build(); + StorageOptions options = null; + if (transport == Transport.GRPC) { + options = + ((GrpcStorageOptions) storage.getOptions()) + .toBuilder() + .setBlobWriteSessionConfig( + BlobWriteSessionConfigs.getDefault().withChunkSize(256 * 1024)) + .build(); + } + assertWithMessage("unable to resolve options").that(options).isNotNull(); + //noinspection DataFlowIssue try (Storage s = options.getService()) { doTest(s); } @@ -95,7 +111,6 @@ public void closingAnOpenedSessionWithoutCallingWriteShouldMakeAnEmptyObject() WritableByteChannel open = session.open(); open.close(); BlobInfo gen1 = session.getResult().get(1, TimeUnit.SECONDS); - System.out.println("gen1 = " + gen1); assertThat(gen1.getSize()).isEqualTo(0); } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITParallelCompositeUploadBlobWriteSessionConfigTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITParallelCompositeUploadBlobWriteSessionConfigTest.java index 6d3157c024..012a700f6e 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITParallelCompositeUploadBlobWriteSessionConfigTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITParallelCompositeUploadBlobWriteSessionConfigTest.java @@ -18,6 +18,7 @@ import static com.google.cloud.storage.TestUtils.xxd; import static com.google.common.truth.Truth.assertThat; +import static com.google.common.truth.Truth.assertWithMessage; import static org.junit.Assert.assertThrows; import com.google.api.gax.rpc.ApiExceptions; @@ -29,6 +30,7 @@ import com.google.cloud.storage.BucketInfo; import com.google.cloud.storage.DataGenerator; import com.google.cloud.storage.GrpcStorageOptions; +import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig; import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.BufferAllocationStrategy; import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.ExecutorSupplier; import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartCleanupStrategy; @@ -38,14 +40,14 @@ import com.google.cloud.storage.Storage.BlobTargetOption; import com.google.cloud.storage.Storage.BlobWriteOption; import com.google.cloud.storage.StorageException; +import com.google.cloud.storage.StorageOptions; import com.google.cloud.storage.TransportCompatibility.Transport; import com.google.cloud.storage.it.runner.StorageITRunner; import com.google.cloud.storage.it.runner.annotations.Backend; import com.google.cloud.storage.it.runner.annotations.BucketFixture; import com.google.cloud.storage.it.runner.annotations.BucketType; +import com.google.cloud.storage.it.runner.annotations.CrossRun; import com.google.cloud.storage.it.runner.annotations.Inject; -import com.google.cloud.storage.it.runner.annotations.SingleBackend; -import com.google.cloud.storage.it.runner.annotations.StorageFixture; import com.google.cloud.storage.it.runner.registry.Generator; import com.google.cloud.storage.it.runner.registry.KmsFixture; import com.google.common.collect.ImmutableList; @@ -67,7 +69,9 @@ import org.junit.runner.RunWith; @RunWith(StorageITRunner.class) -@SingleBackend(Backend.PROD) +@CrossRun( + transports = {Transport.GRPC}, + backends = {Backend.PROD}) public final class ITParallelCompositeUploadBlobWriteSessionConfigTest { private static final int _1MiB = 1024 * 1024; @@ -79,9 +83,9 @@ public final class ITParallelCompositeUploadBlobWriteSessionConfigTest { @BucketFixture(BucketType.REQUESTER_PAYS) public BucketInfo rpBucket; - @Inject - @StorageFixture(Transport.GRPC) - public Storage injectedStorage; + @Inject public Storage injectedStorage; + + @Inject public Transport transport; @Inject public Generator generator; @Inject public KmsFixture kmsFixture; @@ -99,18 +103,25 @@ public static void beforeClass() { @Before public void setUp() throws Exception { - GrpcStorageOptions storageOptions = - ((GrpcStorageOptions) injectedStorage.getOptions()) - .toBuilder() - .setBlobWriteSessionConfig( - BlobWriteSessionConfigs.parallelCompositeUpload() - .withExecutorSupplier(ExecutorSupplier.useExecutor(exec)) - // define a max part size that is fairly small to aid in test speed - .withBufferAllocationStrategy(BufferAllocationStrategy.simple(_1MiB)) - .withPartNamingStrategy(PartNamingStrategy.prefix("prefix-a")) - // let our fixtures take care of cleaning things up - .withPartCleanupStrategy(PartCleanupStrategy.never())) - .build(); + ParallelCompositeUploadBlobWriteSessionConfig pcu = + BlobWriteSessionConfigs.parallelCompositeUpload() + .withExecutorSupplier(ExecutorSupplier.useExecutor(exec)) + // define a max part size that is fairly small to aid in test speed + .withBufferAllocationStrategy(BufferAllocationStrategy.simple(_1MiB)) + .withPartNamingStrategy(PartNamingStrategy.prefix("prefix-a")) + // let our fixtures take care of cleaning things up if an upload fails + .withPartCleanupStrategy(PartCleanupStrategy.onlyOnSuccess()); + + StorageOptions storageOptions = null; + if (transport == Transport.GRPC) { + storageOptions = + ((GrpcStorageOptions) injectedStorage.getOptions()) + .toBuilder() + .setBlobWriteSessionConfig(pcu) + .build(); + } + assertWithMessage("unable to resolve options").that(storageOptions).isNotNull(); + //noinspection DataFlowIssue storage = storageOptions.getService(); rand = new Random(); } @@ -144,7 +155,6 @@ public void errorRaisedByMethodAndFutureResult() throws IOException { // it is okay if the exception is raised during write itself or close, if it happens during // close we should get an AsynchronousCloseException } catch (AsynchronousCloseException ace) { - assertThat(ace).hasCauseThat().hasMessageThat().contains("NOT_FOUND"); assertThat(((StorageException) ace.getCause()).getCode()).isEqualTo(404); } catch (StorageException se) { assertThat(se.getCode()).isEqualTo(404);