Skip to content

Commit

Permalink
Don't subscribe in AttestationTopicSubscriber if subscription is outd…
Browse files Browse the repository at this point in the history
…ated (Consensys#8837)
  • Loading branch information
zilm13 committed Dec 3, 2024
1 parent 247b0f7 commit bb5d070
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import it.unimi.dsi.fastutil.ints.IntSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.ethereum.events.SlotEventsChannel;
Expand All @@ -39,6 +40,7 @@ public class AttestationTopicSubscriber implements SlotEventsChannel {
private final Eth2P2PNetwork eth2P2PNetwork;
private final Spec spec;
private final SettableLabelledGauge subnetSubscriptionsGauge;
private final AtomicReference<UInt64> currentSlot = new AtomicReference<>(null);

public AttestationTopicSubscriber(
final Spec spec,
Expand All @@ -56,6 +58,15 @@ public synchronized void subscribeToCommitteeForAggregation(
aggregationSlot, UInt64.valueOf(committeeIndex), committeesAtSlot);
final UInt64 currentUnsubscriptionSlot = subnetIdToUnsubscribeSlot.getOrDefault(subnetId, ZERO);
final UInt64 unsubscribeSlot = currentUnsubscriptionSlot.max(aggregationSlot);
final UInt64 maybeCurrentSlot = currentSlot.get();
if (maybeCurrentSlot != null && unsubscribeSlot.isLessThan(maybeCurrentSlot)) {
LOG.trace(
"Skipping outdated aggregation subnet {} with unsubscribe due at slot {}",
subnetId,
unsubscribeSlot);
return;
}

if (currentUnsubscriptionSlot.equals(ZERO)) {
eth2P2PNetwork.subscribeToAttestationSubnetId(subnetId);
toggleAggregateSubscriptionMetric(subnetId, false);
Expand Down Expand Up @@ -96,15 +107,25 @@ public synchronized void subscribeToPersistentSubnets(
boolean shouldUpdateENR = false;

for (SubnetSubscription subnetSubscription : newSubscriptions) {
int subnetId = subnetSubscription.subnetId();
final int subnetId = subnetSubscription.subnetId();
final UInt64 maybeCurrentSlot = currentSlot.get();
if (maybeCurrentSlot != null
&& subnetSubscription.unsubscriptionSlot().isLessThan(maybeCurrentSlot)) {
LOG.trace(
"Skipping outdated persistent subnet {} with unsubscribe due at slot {}",
subnetId,
subnetSubscription.unsubscriptionSlot());
continue;
}

shouldUpdateENR = persistentSubnetIdSet.add(subnetId) || shouldUpdateENR;
LOG.trace(
"Subscribing to persistent subnet {} with unsubscribe due at slot {}",
subnetId,
subnetSubscription.unsubscriptionSlot());
if (subnetIdToUnsubscribeSlot.containsKey(subnetId)) {
UInt64 existingUnsubscriptionSlot = subnetIdToUnsubscribeSlot.get(subnetId);
UInt64 unsubscriptionSlot =
final UInt64 existingUnsubscriptionSlot = subnetIdToUnsubscribeSlot.get(subnetId);
final UInt64 unsubscriptionSlot =
existingUnsubscriptionSlot.max(subnetSubscription.unsubscriptionSlot());
LOG.trace(
"Already subscribed to subnet {}, updating unsubscription slot to {}",
Expand All @@ -127,14 +148,15 @@ public synchronized void subscribeToPersistentSubnets(

@Override
public synchronized void onSlot(final UInt64 slot) {
currentSlot.set(slot);
boolean shouldUpdateENR = false;

final Iterator<Int2ObjectMap.Entry<UInt64>> iterator =
subnetIdToUnsubscribeSlot.int2ObjectEntrySet().iterator();
while (iterator.hasNext()) {
final Int2ObjectMap.Entry<UInt64> entry = iterator.next();
if (entry.getValue().compareTo(slot) < 0) {
int subnetId = entry.getIntKey();
final int subnetId = entry.getIntKey();
LOG.trace("Unsubscribing from subnet {}", subnetId);
eth2P2PNetwork.unsubscribeFromAttestationSubnetId(subnetId);
if (persistentSubnetIdSet.contains(subnetId)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -47,6 +48,7 @@ public void shouldSubscribeToSubnet() {
final int committeeId = 10;
final int subnetId =
spec.computeSubnetForCommittee(ONE, UInt64.valueOf(committeeId), COMMITTEES_AT_SLOT);
subscriber.onSlot(ONE);
subscriber.subscribeToCommitteeForAggregation(committeeId, COMMITTEES_AT_SLOT, ONE);

verify(settableLabelledGaugeMock)
Expand Down Expand Up @@ -155,13 +157,52 @@ public void shouldPreserveLaterSubscriptionPeriodWhenEarlierSlotAdded() {
verify(eth2P2PNetwork).unsubscribeFromAttestationSubnetId(subnetId);
}

@Test
public void shouldNotSubscribeForExpiredAggregationSubnet() {
final int committeeId = 3;
final UInt64 slot = UInt64.valueOf(10);
final int subnetId =
spec.computeSubnetForCommittee(slot, UInt64.valueOf(committeeId), COMMITTEES_AT_SLOT);
// Sanity check second subscription is for the same subnet ID.
assertThat(subnetId)
.isEqualTo(
spec.computeSubnetForCommittee(slot, UInt64.valueOf(committeeId), COMMITTEES_AT_SLOT));

subscriber.onSlot(slot.plus(ONE));
subscriber.subscribeToCommitteeForAggregation(committeeId, COMMITTEES_AT_SLOT, slot);
verifyNoMoreInteractions(settableLabelledGaugeMock);
verify(eth2P2PNetwork, never()).subscribeToAttestationSubnetId(anyInt());
verify(eth2P2PNetwork, never()).unsubscribeFromAttestationSubnetId(anyInt());
}

@Test
public void shouldNotSubscribeForExpiredPersistentSubnet() {
Set<SubnetSubscription> subnetSubscriptions =
Set.of(
new SubnetSubscription(2, UInt64.valueOf(15)),
new SubnetSubscription(1, UInt64.valueOf(20)));

subscriber.onSlot(UInt64.valueOf(16));
subscriber.subscribeToPersistentSubnets(subnetSubscriptions);

verify(settableLabelledGaugeMock)
.set(1, String.format(AttestationTopicSubscriber.GAUGE_PERSISTENT_SUBNETS_LABEL, 1));
verifyNoMoreInteractions(settableLabelledGaugeMock);

verify(eth2P2PNetwork).setLongTermAttestationSubnetSubscriptions(IntSet.of(1));

verify(eth2P2PNetwork).subscribeToAttestationSubnetId(1);
verify(eth2P2PNetwork, never()).subscribeToAttestationSubnetId(eq(2));
}

@Test
public void shouldSubscribeToNewSubnetsAndUpdateENR_forPersistentSubscriptions() {
Set<SubnetSubscription> subnetSubscriptions =
Set.of(
new SubnetSubscription(1, UInt64.valueOf(20)),
new SubnetSubscription(2, UInt64.valueOf(15)));

subscriber.onSlot(UInt64.valueOf(15));
subscriber.subscribeToPersistentSubnets(subnetSubscriptions);

verify(settableLabelledGaugeMock)
Expand Down

0 comments on commit bb5d070

Please sign in to comment.