Skip to content

Commit

Permalink
Use SharedExecutorService in more locations (#42468)
Browse files Browse the repository at this point in the history
Use SharedExecutorService in more locations
  • Loading branch information
alzimmermsft authored Oct 23, 2024
1 parent c9b6501 commit 8c639ed
Show file tree
Hide file tree
Showing 10 changed files with 78 additions and 204 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.azure.core.util.ClientOptions;
import com.azure.core.util.Configuration;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.SharedExecutorService;
import com.azure.core.util.UserAgentUtil;
import com.azure.core.util.builder.ClientBuilderUtil;
import com.azure.core.util.logging.ClientLogger;
Expand Down Expand Up @@ -280,6 +281,8 @@ ConfidentialClientApplication getConfidentialClient(boolean enableCae) {

if (options.getExecutorService() != null) {
applicationBuilder.executorService(options.getExecutorService());
} else {
applicationBuilder.executorService(SharedExecutorService.getInstance());
}

TokenCachePersistenceOptions tokenCachePersistenceOptions = options.getTokenCacheOptions();
Expand Down Expand Up @@ -341,6 +344,8 @@ PublicClientApplication getPublicClient(boolean sharedTokenCacheCredential, bool

if (options.getExecutorService() != null) {
builder.executorService(options.getExecutorService());
} else {
builder.executorService(SharedExecutorService.getInstance());
}

if (enableCae) {
Expand Down Expand Up @@ -457,6 +462,8 @@ ConfidentialClientApplication getManagedIdentityConfidentialClient() {

if (options.getExecutorService() != null) {
applicationBuilder.executorService(options.getExecutorService());
} else {
applicationBuilder.executorService(SharedExecutorService.getInstance());
}

return applicationBuilder.build();
Expand Down Expand Up @@ -495,6 +502,8 @@ ManagedIdentityApplication getManagedIdentityMsalApplication() {

if (options.getExecutorService() != null) {
miBuilder.executorService(options.getExecutorService());
} else {
miBuilder.executorService(SharedExecutorService.getInstance());
}

return miBuilder.build();
Expand Down Expand Up @@ -537,6 +546,8 @@ ConfidentialClientApplication getWorkloadIdentityConfidentialClient() {

if (options.getExecutorService() != null) {
applicationBuilder.executorService(options.getExecutorService());
} else {
applicationBuilder.executorService(SharedExecutorService.getInstance());
}

return applicationBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.azure.core.http.rest.Response;
import com.azure.core.util.BinaryData;
import com.azure.core.util.Context;
import com.azure.core.util.SharedExecutorService;
import com.azure.core.util.logging.ClientLogger;
import com.azure.monitor.ingestion.implementation.Batcher;
import com.azure.monitor.ingestion.implementation.IngestionUsingDataCollectionRulesClient;
Expand All @@ -29,16 +30,13 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.azure.monitor.ingestion.implementation.Utils.GZIP;
import static com.azure.monitor.ingestion.implementation.Utils.createThreadPool;
import static com.azure.monitor.ingestion.implementation.Utils.getConcurrency;
import static com.azure.monitor.ingestion.implementation.Utils.gzipRequest;
import static com.azure.monitor.ingestion.implementation.Utils.registerShutdownHook;

/**
* <p>This class provides a synchronous client for uploading custom logs to an Azure Monitor Log Analytics workspace.
Expand Down Expand Up @@ -99,19 +97,13 @@ public final class LogsIngestionClient implements AutoCloseable {
private static final ClientLogger LOGGER = new ClientLogger(LogsIngestionClient.class);
private final IngestionUsingDataCollectionRulesClient client;

// dynamic thread pool that scales up and down on demand.
private final ExecutorService threadPool;
private final Thread shutdownHook;

/**
* Creates a {@link LogsIngestionClient} that sends requests to the data collection endpoint.
*
* @param client The {@link IngestionUsingDataCollectionRulesClient} that the client routes its request through.
*/
LogsIngestionClient(IngestionUsingDataCollectionRulesClient client) {
this.client = client;
this.threadPool = createThreadPool();
this.shutdownHook = registerShutdownHook(this.threadPool, 5);
}

/**
Expand Down Expand Up @@ -251,7 +243,7 @@ private Stream<UploadLogsResponseHolder> submit(Stream<UploadLogsResponseHolder>
}

try {
return threadPool.submit(() -> responseStream).get();
return SharedExecutorService.getInstance().submit(() -> responseStream).get();
} catch (InterruptedException | ExecutionException e) {
throw LOGGER.logExceptionAsError(new RuntimeException(e));
}
Expand Down Expand Up @@ -335,7 +327,5 @@ public Response<Void> uploadWithResponse(String ruleId, String streamName, Binar

@Override
public void close() {
threadPool.shutdown();
Runtime.getRuntime().removeShutdownHook(shutdownHook);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,13 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPOutputStream;

public final class Utils {
public static final long MAX_REQUEST_PAYLOAD_SIZE = 1024 * 1024; // 1 MB
public static final String GZIP = "gzip";

private static final ClientLogger LOGGER = new ClientLogger(Utils.class);
// similarly to Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE, just puts a limit depending on logical processors count.
private static final int MAX_CONCURRENCY = 10 * Runtime.getRuntime().availableProcessors();

private Utils() {
}
Expand Down Expand Up @@ -50,41 +44,4 @@ public static int getConcurrency(LogsUploadOptions options) {

return 1;
}

/**
* Creates cached (that supports scaling) thread pool with shutdown hook to do best-effort graceful termination within timeout.
*
* @return {@link ExecutorService} instance.
*/
public static ExecutorService createThreadPool() {
return new ThreadPoolExecutor(0, MAX_CONCURRENCY, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
}

/**
* Registers {@link ExecutorService} shutdown hook which will be called when JVM terminates.
* First, stops accepting new tasks, then awaits their completion for
* half of timeout, cancels remaining tasks and waits another half of timeout for them to get cancelled.
*
* @param threadPool Thread pool to shut down.
* @param timeoutSec Timeout in seconds to wait for tasks to complete or terminate after JVM starting to shut down.
* @return hook thread instance that can be used to unregister hook.
*/
public static Thread registerShutdownHook(ExecutorService threadPool, int timeoutSec) {
// based on https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html
long halfTimeoutNanos = TimeUnit.SECONDS.toNanos(timeoutSec) / 2;
Thread hook = new Thread(() -> {
try {
threadPool.shutdown();
if (!threadPool.awaitTermination(halfTimeoutNanos, TimeUnit.NANOSECONDS)) {
threadPool.shutdownNow();
threadPool.awaitTermination(halfTimeoutNanos, TimeUnit.NANOSECONDS);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
threadPool.shutdownNow();
}
});
Runtime.getRuntime().addShutdownHook(hook);
return hook;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.monitor.ingestion.implementation;

import com.azure.core.util.SharedExecutorService;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
Expand All @@ -13,8 +14,6 @@
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
Expand All @@ -31,7 +30,6 @@
@Execution(ExecutionMode.SAME_THREAD)
public class ConcurrencyLimitingSpliteratorTest {
private static final int TEST_TIMEOUT_SEC = 30;
private static final ExecutorService TEST_THREAD_POOL = Executors.newCachedThreadPool();

@Test
public void invalidParams() {
Expand All @@ -53,7 +51,7 @@ public void concurrentCalls(int concurrency) throws ExecutionException, Interrup

int effectiveConcurrency = Math.min(list.size(), concurrency);
CountDownLatch latch = new CountDownLatch(effectiveConcurrency);
List<Integer> processed = TEST_THREAD_POOL.submit(() -> stream.map(r -> {
List<Integer> processed = SharedExecutorService.getInstance().submit(() -> stream.map(r -> {
latch.countDown();
try {
Thread.sleep(10);
Expand All @@ -78,7 +76,7 @@ public void concurrencyHigherThanItemsCount() throws ExecutionException, Interru

AtomicInteger parallel = new AtomicInteger(0);
AtomicInteger maxParallel = new AtomicInteger(0);
List<Integer> processed = TEST_THREAD_POOL.submit(() -> stream.map(r -> {
List<Integer> processed = SharedExecutorService.getInstance().submit(() -> stream.map(r -> {
int cur = parallel.incrementAndGet();
int curMax = maxParallel.get();
while (cur > curMax && !maxParallel.compareAndSet(curMax, cur)) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.azure.core.http.rest.Response;
import com.azure.core.util.Context;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.SharedExecutorService;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.serializer.JsonSerializer;
import com.azure.search.documents.implementation.SearchIndexClientImpl;
Expand All @@ -29,8 +30,6 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -54,7 +53,6 @@
*/
public final class SearchIndexingPublisher<T> {
private static final ClientLogger LOGGER = new ClientLogger(SearchIndexingPublisher.class);
private static final ExecutorService EXECUTOR = getThreadPoolWithShutdownHook();

private final SearchIndexClientImpl restClient;
private final JsonSerializer serializer;
Expand Down Expand Up @@ -154,7 +152,8 @@ public void flush(boolean awaitLock, boolean isClose, Duration timeout, Context
private void flushLoop(boolean isClosed, Duration timeout, Context context) {
if (timeout != null && !timeout.isNegative() && !timeout.isZero()) {
final AtomicReference<List<TryTrackingIndexAction<T>>> batchActions = new AtomicReference<>();
Future<?> future = EXECUTOR.submit(() -> flushLoopHelper(isClosed, context, batchActions));
Future<?> future = SharedExecutorService.getInstance()
.submit(() -> flushLoopHelper(isClosed, context, batchActions));

try {
CoreUtils.getResultWithTimeout(future, timeout);
Expand Down Expand Up @@ -361,8 +360,4 @@ private static void sleep(long millis) {
} catch (InterruptedException ignored) {
}
}

private static ExecutorService getThreadPoolWithShutdownHook() {
return CoreUtils.addShutdownHookSafely(Executors.newCachedThreadPool(), Duration.ofSeconds(5));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.azure.core.util.Context;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.HttpClientOptions;
import com.azure.core.util.SharedExecutorService;
import com.azure.core.util.UrlBuilder;
import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.blob.BlobClient;
Expand Down Expand Up @@ -49,8 +50,7 @@
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -130,8 +130,8 @@ public void downloadToFileWithFaultInjection() throws IOException, InterruptedEx
StandardOpenOption.TRUNCATE_EXISTING, // If the file already exists and it is opened for WRITE access, then its length is truncated to 0.
StandardOpenOption.READ, StandardOpenOption.WRITE));

ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
executorService.invokeAll(files.stream().map(it -> (Callable<Void>) () -> {
CountDownLatch countDownLatch = new CountDownLatch(500);
SharedExecutorService.getInstance().invokeAll(files.stream().map(it -> (Callable<Void>) () -> {
try {
downloadClient.downloadToFileWithResponse(new BlobDownloadToFileOptions(it.getAbsolutePath())
.setOpenOptions(overwriteOptions)
Expand All @@ -148,13 +148,14 @@ public void downloadToFileWithFaultInjection() throws IOException, InterruptedEx
LOGGER.atWarning()
.addKeyValue("downloadFile", it.getAbsolutePath())
.log("Failed to complete download.", ex);
} finally {
countDownLatch.countDown();
}

return null;
}).collect(Collectors.toList()));

executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.MINUTES);
countDownLatch.await(10, TimeUnit.MINUTES);

assertTrue(successCount.get() >= 450);
// cleanup
Expand Down
Loading

0 comments on commit 8c639ed

Please sign in to comment.