Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create State implementation with registering all of the specific ReadableKVStates #9700

Merged
merged 13 commits into from
Nov 15, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.mirror.web3.state;

import static com.swirlds.state.StateChangeListener.StateType.MAP;
import static com.swirlds.state.StateChangeListener.StateType.QUEUE;
import static com.swirlds.state.StateChangeListener.StateType.SINGLETON;
import static java.util.Objects.requireNonNull;

import com.google.common.annotations.VisibleForTesting;
import com.hedera.mirror.web3.state.core.ListReadableQueueState;
import com.hedera.mirror.web3.state.core.ListWritableQueueState;
import com.hedera.mirror.web3.state.core.MapReadableKVState;
import com.hedera.mirror.web3.state.core.MapReadableStates;
import com.hedera.mirror.web3.state.core.MapWritableKVState;
import com.hedera.mirror.web3.state.core.MapWritableStates;
import com.swirlds.state.State;
import com.swirlds.state.StateChangeListener;
import com.swirlds.state.spi.EmptyWritableStates;
import com.swirlds.state.spi.KVChangeListener;
import com.swirlds.state.spi.QueueChangeListener;
import com.swirlds.state.spi.ReadableSingletonStateBase;
import com.swirlds.state.spi.ReadableStates;
import com.swirlds.state.spi.WritableKVStateBase;
import com.swirlds.state.spi.WritableQueueStateBase;
import com.swirlds.state.spi.WritableSingletonStateBase;
import com.swirlds.state.spi.WritableStates;
import edu.umd.cs.findbugs.annotations.NonNull;
import jakarta.annotation.Nonnull;
import jakarta.inject.Named;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

@SuppressWarnings({"rawtypes", "unchecked"})
@Named
public class MirrorNodeState implements State {

private final Map<String, ReadableStates> readableStates = new ConcurrentHashMap<>();
private final Map<String, WritableStates> writableStates = new ConcurrentHashMap<>();

// Key is Service, value is Map of state name to state datasource
private final Map<String, Map<String, Object>> states = new ConcurrentHashMap<>();
private final List<StateChangeListener> listeners = new ArrayList<>();

public MirrorNodeState addService(@NonNull final String serviceName, @NonNull final Map<String, ?> dataSources) {
steven-sheehy marked this conversation as resolved.
Show resolved Hide resolved
final var serviceStates = this.states.computeIfAbsent(serviceName, k -> new ConcurrentHashMap<>());
dataSources.forEach((k, b) -> {
if (!serviceStates.containsKey(k)) {
serviceStates.put(k, b);
}
});

// Purge any readable states whose state definitions are now stale,
// since they don't include the new data sources we just added
readableStates.remove(serviceName);
writableStates.remove(serviceName);
return this;
}

/**
* Removes the state with the given key for the service with the given name.
*
* @param serviceName the name of the service
* @param stateKey the key of the state
*/
public void removeServiceState(@NonNull final String serviceName, @NonNull final String stateKey) {
requireNonNull(serviceName);
requireNonNull(stateKey);
this.states.computeIfPresent(serviceName, (k, v) -> {
v.remove(stateKey);
// Purge any readable states whose state definitions are now stale,
// since they still include the data sources we just removed
readableStates.remove(serviceName);
writableStates.remove(serviceName);
return v;
});
}

@Nonnull
@Override
public ReadableStates getReadableStates(@Nonnull String serviceName) {
return readableStates.computeIfAbsent(serviceName, s -> {
final var serviceStates = this.states.get(s);
if (serviceStates == null) {
return new MapReadableStates(new HashMap<>());
}
final Map<String, Object> data = new ConcurrentHashMap<>();
for (final var entry : serviceStates.entrySet()) {
final var stateName = entry.getKey();
final var state = entry.getValue();
if (state instanceof Queue queue) {
data.put(stateName, new ListReadableQueueState(stateName, queue));
} else if (state instanceof Map map) {
data.put(stateName, new MapReadableKVState(stateName, map));
} else if (state instanceof AtomicReference ref) {
data.put(stateName, new ReadableSingletonStateBase<>(stateName, ref::get));
}
}
return new MapReadableStates(data);
});
}

@Nonnull
@Override
public WritableStates getWritableStates(@Nonnull String serviceName) {
return writableStates.computeIfAbsent(serviceName, s -> {
final var serviceStates = states.get(s);
if (serviceStates == null) {
return new EmptyWritableStates();
}
final Map<String, Object> data = new ConcurrentHashMap<>();
for (final var entry : serviceStates.entrySet()) {
final var stateName = entry.getKey();
final var state = entry.getValue();
if (state instanceof Queue<?> queue) {
data.put(
stateName,
withAnyRegisteredListeners(serviceName, new ListWritableQueueState<>(stateName, queue)));
} else if (state instanceof Map<?, ?> map) {
data.put(
stateName,
withAnyRegisteredListeners(serviceName, new MapWritableKVState<>(stateName, map)));
} else if (state instanceof AtomicReference<?> ref) {
data.put(stateName, withAnyRegisteredListeners(serviceName, stateName, ref));
}
}
return new MapWritableStates(data, () -> readableStates.remove(serviceName));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this cause a commit to flush the readable state and cause it to get re-created for every request?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it will. I tested it without the runnable and it seems to work fine for 1 request. This was added in the example code probably for a reason. I will remove it for now and we will investigate the behaviour on parallel requests later.

});
}

@Override
public void registerCommitListener(@NonNull final StateChangeListener listener) {
requireNonNull(listener);
listeners.add(listener);
}

@Override
public void unregisterCommitListener(@NonNull final StateChangeListener listener) {
requireNonNull(listener);
listeners.remove(listener);
}

public void commit() {
writableStates.values().forEach(writableStatesValue -> {
if (writableStatesValue instanceof MapWritableStates mapWritableStates) {
mapWritableStates.commit();
}
});
}

private <V> WritableSingletonStateBase<V> withAnyRegisteredListeners(
@NonNull final String serviceName, @NonNull final String stateKey, @NonNull final AtomicReference<V> ref) {
final var state = new WritableSingletonStateBase<>(stateKey, ref::get, ref::set);
listeners.forEach(listener -> {
if (listener.stateTypes().contains(SINGLETON)) {
registerSingletonListener(serviceName, state, listener);
}
});
return state;
}

private <K, V> MapWritableKVState<K, V> withAnyRegisteredListeners(
@NonNull final String serviceName, @NonNull final MapWritableKVState<K, V> state) {
listeners.forEach(listener -> {
if (listener.stateTypes().contains(MAP)) {
registerKVListener(serviceName, state, listener);
}
});
return state;
}

private <T> ListWritableQueueState<T> withAnyRegisteredListeners(
@NonNull final String serviceName, @NonNull final ListWritableQueueState<T> state) {
listeners.forEach(listener -> {
if (listener.stateTypes().contains(QUEUE)) {
registerQueueListener(serviceName, state, listener);
}
});
return state;
}

private <V> void registerSingletonListener(
@NonNull final String serviceName,
@NonNull final WritableSingletonStateBase<V> singletonState,
@NonNull final StateChangeListener listener) {
final var stateId = listener.stateIdFor(serviceName, singletonState.getStateKey());
singletonState.registerListener(value -> listener.singletonUpdateChange(stateId, value));
}

private <V> void registerQueueListener(
@NonNull final String serviceName,
@NonNull final WritableQueueStateBase<V> queueState,
@NonNull final StateChangeListener listener) {
final var stateId = listener.stateIdFor(serviceName, queueState.getStateKey());
queueState.registerListener(new QueueChangeListener<>() {
@Override
public void queuePushChange(@NonNull final V value) {
listener.queuePushChange(stateId, value);
}

@Override
public void queuePopChange() {
listener.queuePopChange(stateId);
}
});
}

private <K, V> void registerKVListener(
@NonNull final String serviceName, WritableKVStateBase<K, V> state, StateChangeListener listener) {
final var stateId = listener.stateIdFor(serviceName, state.getStateKey());
state.registerListener(new KVChangeListener<>() {
@Override
public void mapUpdateChange(@NonNull final K key, @NonNull final V value) {
listener.mapUpdateChange(stateId, key, value);
}

@Override
public void mapDeleteChange(@NonNull final K key) {
listener.mapDeleteChange(stateId, key);
}
});
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
MirrorNodeState that = (MirrorNodeState) o;
return Objects.equals(readableStates, that.readableStates)
&& Objects.equals(writableStates, that.writableStates)
&& Objects.equals(states, that.states)
&& Objects.equals(listeners, that.listeners);
}

@Override
public int hashCode() {
return Objects.hash(readableStates, writableStates, states, listeners);
}
steven-sheehy marked this conversation as resolved.
Show resolved Hide resolved

@VisibleForTesting
void setWritableStates(final Map<String, WritableStates> writableStates) {
this.writableStates.putAll(writableStates);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.mirror.web3.state.core;

import com.swirlds.state.spi.ReadableStates;
import jakarta.annotation.Nonnull;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

abstract class AbstractMapReadableState implements ReadableStates {

protected final Map<String, ?> states;

protected AbstractMapReadableState(@Nonnull final Map<String, ?> states) {
this.states = Objects.requireNonNull(states);
}

@Override
public boolean contains(@Nonnull String stateKey) {
return states.containsKey(stateKey);
}

@Nonnull
@Override
public Set<String> stateKeys() {
return Collections.unmodifiableSet(states.keySet());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.mirror.web3.state.core;

import com.swirlds.state.spi.ReadableQueueStateBase;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.util.Iterator;
import java.util.Objects;
import java.util.Queue;

public class ListReadableQueueState<E> extends ReadableQueueStateBase<E> {

/** Represents the backing storage for this state */
private final Queue<E> backingStore;

/**
* Create an instance using the given Queue as the backing store. This is useful when you want to
* pre-populate the queue, or if you want to use Mockito to mock it or cause it to throw
* exceptions when certain keys are accessed, etc.
*
* @param stateKey The state key for this state
* @param backingStore The backing store to use
*/
public ListReadableQueueState(@Nonnull final String stateKey, @Nonnull final Queue<E> backingStore) {
super(stateKey);
this.backingStore = Objects.requireNonNull(backingStore);
}

@Nullable
@Override
protected E peekOnDataSource() {
return backingStore.peek();
}

@Nonnull
@Override
protected Iterator<E> iterateOnDataSource() {
return backingStore.iterator();
}
}
Loading
Loading