Skip to content

Commit

Permalink
[fix][broker] Add expire check for replicator (#23975)
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
  • Loading branch information
nodece authored Feb 28, 2025
1 parent 689c16a commit d0025e7
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,18 @@ protected boolean replicateEntries(List<Entry> entries) {
continue;
}

if (msg.isExpired(messageTTLInSeconds)) {
msgExpired.recordEvent(0 /* no value stat */);
if (log.isDebugEnabled()) {
log.debug("[{}] Discarding expired message at position {}, replicateTo {}",
replicatorId, entry.getPosition(), msg.getReplicateTo());
}
cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
entry.release();
msg.recycle();
continue;
}

if (STATE_UPDATER.get(this) != State.Started || isLocalMessageSkippedOnce) {
// The producer is not ready yet after having stopped/restarted. Drop the message because it will
// recovered when the producer is ready
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2009,6 +2009,10 @@ public void checkMessageExpiry() {
sub.expireMessages(messageTtlInSeconds);
}
});
replicators.forEach((__, replicator)
-> ((PersistentReplicator) replicator).expireMessages(messageTtlInSeconds));
shadowReplicators.forEach((__, replicator)
-> ((PersistentReplicator) replicator).expireMessages(messageTtlInSeconds));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,18 @@ protected boolean replicateEntries(List<Entry> entries) {
continue;
}

if (msg.isExpired(messageTTLInSeconds)) {
msgExpired.recordEvent(0 /* no value stat */);
if (log.isDebugEnabled()) {
log.debug("[{}] Discarding expired message at position {}, replicateTo {}",
replicatorId, entry.getPosition(), msg.getReplicateTo());
}
cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
entry.release();
msg.recycle();
continue;
}

if (STATE_UPDATER.get(this) != State.Started || isLocalMessageSkippedOnce) {
// The producer is not ready yet after having stopped/restarted. Drop the message because it will
// recovered when the producer is ready
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1781,36 +1781,36 @@ public void testReplicatorWithTTL() throws Exception {

@Cleanup
Producer<byte[]> persistentProducer1 = client1.newProducer().topic(topic.toString()).create();
// Send V1 message, which will be replicated to the remote cluster by the replicator.
persistentProducer1.send("V1".getBytes());

waitReplicateFinish(topic, admin1);

// Pause replicator
PersistentTopic persistentTopic =
(PersistentTopic) pulsar1.getBrokerService().getTopicReference(topic.toString()).get();
persistentTopic.getReplicators().forEach((cluster, replicator) -> {
PersistentReplicator persistentReplicator = (PersistentReplicator) replicator;
// Pause replicator
pauseReplicator(persistentReplicator);
});

// Send V2 and V3 messages, then let them expire. These messages will not be replicated to the remote cluster.
persistentProducer1.send("V2".getBytes());
persistentProducer1.send("V3".getBytes());

Thread.sleep(1000);

admin1.topics().expireMessagesForAllSubscriptions(topic.toString(), 1);

// Start replicator
persistentTopic.getReplicators().forEach((cluster, replicator) -> {
PersistentReplicator persistentReplicator = (PersistentReplicator) replicator;
persistentReplicator.startProducer();
});

waitReplicateFinish(topic, admin1);

// Send V4 message, which will be replicated to the remote cluster.
persistentProducer1.send("V4".getBytes());

waitReplicateFinish(topic, admin1);

// Receive messages from the remote cluster: only V1 and V4 messages should be received.
@Cleanup
PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();
Expand All @@ -1828,7 +1828,7 @@ public void testReplicatorWithTTL() throws Exception {
result.add(new String(receive.getValue()));
}

assertEquals(result, Lists.newArrayList("V1", "V2", "V3", "V4"));
assertEquals(result, Lists.newArrayList("V1", "V4"));
}

@Test
Expand Down

0 comments on commit d0025e7

Please sign in to comment.