Skip to content

Commit

Permalink
[cdc-base] Optimize pure binlog phase check logic to improve performa…
Browse files Browse the repository at this point in the history
…nce (#1620)

This closes #1620.
  • Loading branch information
ruanhang1993 authored Oct 18, 2022
1 parent 8df5b11 commit 806a850
Showing 1 changed file with 9 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
Expand All @@ -56,6 +58,7 @@ public class JdbcSourceStreamFetcher implements Fetcher<SourceRecords, SourceSpl

private final JdbcSourceFetchTaskContext taskContext;
private final ExecutorService executor;
private final Set<TableId> pureBinlogPhaseTables;

private volatile ChangeEventQueue<DataChangeEvent> queue;
private volatile Throwable readException;
Expand All @@ -72,6 +75,7 @@ public JdbcSourceStreamFetcher(JdbcSourceFetchTaskContext taskContext, int subTa
ThreadFactory threadFactory =
new ThreadFactoryBuilder().setNameFormat("debezium-reader-" + subTaskId).build();
this.executor = Executors.newSingleThreadExecutor(threadFactory);
this.pureBinlogPhaseTables = new HashSet<>();
}

@Override
Expand Down Expand Up @@ -178,9 +182,13 @@ private boolean shouldEmit(SourceRecord sourceRecord) {
}

private boolean hasEnterPureBinlogPhase(TableId tableId, Offset position) {
if (pureBinlogPhaseTables.contains(tableId)) {
return true;
}
// the existed tables those have finished snapshot reading
if (maxSplitHighWatermarkMap.containsKey(tableId)
&& position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) {
pureBinlogPhaseTables.add(tableId);
return true;
}

Expand Down Expand Up @@ -217,5 +225,6 @@ private void configureFilter() {
}
this.finishedSplitsInfo = splitsInfoMap;
this.maxSplitHighWatermarkMap = tableIdBinlogPositionMap;
this.pureBinlogPhaseTables.clear();
}
}

0 comments on commit 806a850

Please sign in to comment.