Skip to content

Commit

Permalink
[cdc-base] Release the scan fetcher thread resource after reading fin…
Browse files Browse the repository at this point in the history
…ished (#1619)
  • Loading branch information
ruanhang1993 authored Oct 18, 2022
1 parent 806a850 commit f44d209
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.ververica.cdc.connectors.base.utils.SourceRecordUtils.formatMessageTimestamp;
Expand All @@ -64,7 +65,7 @@ public class JdbcSourceScanFetcher implements Fetcher<SourceRecords, SourceSplit
public AtomicBoolean reachEnd;

private final JdbcSourceFetchTaskContext taskContext;
private final ExecutorService executor;
private final ExecutorService executorService;
private volatile ChangeEventQueue<DataChangeEvent> queue;
private volatile Throwable readException;

Expand All @@ -73,13 +74,15 @@ public class JdbcSourceScanFetcher implements Fetcher<SourceRecords, SourceSplit
private SnapshotSplit currentSnapshotSplit;
private SchemaNameAdjuster nameAdjuster;

private static final long READER_CLOSE_TIMEOUT_SECONDS = 30L;

public JdbcSourceScanFetcher(JdbcSourceFetchTaskContext taskContext, int subtaskId) {
this.taskContext = taskContext;
ThreadFactory threadFactory =
new ThreadFactoryBuilder()
.setNameFormat("debezium-snapshot-reader-" + subtaskId)
.build();
this.executor = Executors.newSingleThreadExecutor(threadFactory);
this.executorService = Executors.newSingleThreadExecutor(threadFactory);
this.hasNextElement = new AtomicBoolean(false);
this.reachEnd = new AtomicBoolean(false);
}
Expand All @@ -93,7 +96,7 @@ public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
this.queue = taskContext.getQueue();
this.hasNextElement.set(true);
this.reachEnd.set(false);
executor.submit(
executorService.submit(
() -> {
try {
snapshotSplitReadTask.execute(taskContext);
Expand Down Expand Up @@ -190,7 +193,21 @@ private void checkReadException() {
}

@Override
public void close() {}
public void close() {
try {
if (executorService != null) {
executorService.shutdown();
if (executorService.awaitTermination(
READER_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
LOG.warn(
"Failed to close the scan fetcher in {} seconds.",
READER_CLOSE_TIMEOUT_SECONDS);
}
}
} catch (Exception e) {
LOG.error("Close scan fetcher error", e);
}
}

private void assertLowWatermark(SourceRecord lowWatermark) {
checkState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import static com.ververica.cdc.connectors.base.utils.SourceRecordUtils.getTableId;
import static com.ververica.cdc.connectors.base.utils.SourceRecordUtils.isDataChangeRecord;
Expand All @@ -57,7 +58,7 @@ public class JdbcSourceStreamFetcher implements Fetcher<SourceRecords, SourceSpl
private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceStreamFetcher.class);

private final JdbcSourceFetchTaskContext taskContext;
private final ExecutorService executor;
private final ExecutorService executorService;
private final Set<TableId> pureBinlogPhaseTables;

private volatile ChangeEventQueue<DataChangeEvent> queue;
Expand All @@ -70,11 +71,13 @@ public class JdbcSourceStreamFetcher implements Fetcher<SourceRecords, SourceSpl
private Map<TableId, Offset> maxSplitHighWatermarkMap;
private Tables.TableFilter capturedTableFilter;

private static final long READER_CLOSE_TIMEOUT_SECONDS = 30L;

public JdbcSourceStreamFetcher(JdbcSourceFetchTaskContext taskContext, int subTaskId) {
this.taskContext = taskContext;
ThreadFactory threadFactory =
new ThreadFactoryBuilder().setNameFormat("debezium-reader-" + subTaskId).build();
this.executor = Executors.newSingleThreadExecutor(threadFactory);
this.executorService = Executors.newSingleThreadExecutor(threadFactory);
this.pureBinlogPhaseTables = new HashSet<>();
}

Expand All @@ -85,7 +88,7 @@ public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
configureFilter();
taskContext.configure(currentStreamSplit);
this.queue = taskContext.getQueue();
executor.submit(
executorService.submit(
() -> {
try {
streamFetchTask.execute(taskContext);
Expand Down Expand Up @@ -134,7 +137,21 @@ private void checkReadException() {
}

@Override
public void close() {}
public void close() {
try {
if (executorService != null) {
executorService.shutdown();
if (executorService.awaitTermination(
READER_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
LOG.warn(
"Failed to close the stream fetcher in {} seconds.",
READER_CLOSE_TIMEOUT_SECONDS);
}
}
} catch (Exception e) {
LOG.error("Close stream fetcher error", e);
}
}

/**
* Returns the record should emit or not.
Expand Down

0 comments on commit f44d209

Please sign in to comment.