diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index ff8adfceec0b..508ace390565 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -492,16 +492,33 @@ private void batch(TableName tableName, Collection> allRows, int batch } futures.addAll(batchRows.stream().map(table::batchAll).collect(Collectors.toList())); } + // Here we will always wait until all futures are finished, even if there are failures when + // getting from a future in the middle. This is because this method may be called in a rpc call, + // so the batch operations may reference some off heap cells(through CellScanner). If we return + // earlier here, the rpc call may be finished and they will release the off heap cells before + // some of the batch operations finish, and then cause corrupt data or even crash the region + // server. See HBASE-28584 and HBASE-28850 for more details. + IOException error = null; for (Future future : futures) { try { FutureUtils.get(future); } catch (RetriesExhaustedException e) { + IOException ioe; if (e.getCause() instanceof TableNotFoundException) { - throw new TableNotFoundException("'" + tableName + "'"); + ioe = new TableNotFoundException("'" + tableName + "'"); + } else { + ioe = e; + } + if (error == null) { + error = ioe; + } else { + error.addSuppressed(ioe); } - throw e; } } + if (error != null) { + throw error; + } } private AsyncClusterConnection getConnection() throws IOException {