From dbaf63056603aae4543e986d58d92effc96a5da1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20Silv=C3=A9rio?= Date: Sun, 25 Feb 2024 18:52:10 +0000 Subject: [PATCH] Upgrade dependencies, apply code style, retry to contact seeds, record classes, move to Apache Fury and fix for duplicated entries in lww set (#36) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Applied Google/AOSP code style. (#35) Refactored the way seeds are contacted, so we can retry again after some backoff time up to a maximum number of attempts. Turned some classes into record classes. Moved from Kryo to Apache Fury. Fixed issue Entry::equals that would lead to unwanted data duplication as ConcurrentSet::addAll wouldn't recognize duplicated entries. * Bump org.mapstruct.version from 1.5.3.Final to 1.5.5.Final (#30) Bumps `org.mapstruct.version` from 1.5.3.Final to 1.5.5.Final. Updates `org.mapstruct:mapstruct` from 1.5.3.Final to 1.5.5.Final - [Release notes](/~https://github.com/mapstruct/mapstruct/releases) - [Commits](/~https://github.com/mapstruct/mapstruct/compare/1.5.3.Final...1.5.5.Final) Updates `org.mapstruct:mapstruct-processor` from 1.5.3.Final to 1.5.5.Final - [Release notes](/~https://github.com/mapstruct/mapstruct/releases) - [Commits](/~https://github.com/mapstruct/mapstruct/compare/1.5.3.Final...1.5.5.Final) --- updated-dependencies: - dependency-name: org.mapstruct:mapstruct dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: org.mapstruct:mapstruct-processor dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Rafael Silvério --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- README.md | 16 +- pom.xml | 12 +- stundb/acceptance-tests/pom.xml | 4 +- .../src/main/resources/application.yml | 18 +- .../test/java/com/stundb/modules/Module.java | 5 +- .../modules/providers/PropertiesProvider.java | 5 +- .../java/com/stundb/steps/CacheSteps.java | 236 ++++++++++-------- .../main/java/com/stundb/ApplicationMain.java | 1 - .../main/java/com/stundb/modules/Module.java | 28 +-- .../modules/providers/CacheProvider.java | 7 +- .../providers/CommandHandlerProvider.java | 7 +- .../providers/MessageDigestProvider.java | 4 +- .../modules/providers/UniqueIdProvider.java | 12 +- .../java/com/stundb/server/TcpServerImpl.java | 18 +- .../handlers/nodes/DeregisterHandler.java | 9 +- .../server/handlers/nodes/ElectedHandler.java | 9 +- .../server/handlers/nodes/ListHandler.java | 7 +- .../handlers/nodes/RegisterHandler.java | 7 +- .../handlers/nodes/StartElectionHandler.java | 9 +- .../handlers/nodes/SynchronizeHandler.java | 9 +- .../handlers/store/CapacityHandler.java | 7 +- .../server/handlers/store/ClearHandler.java | 9 +- .../server/handlers/store/DelHandler.java | 9 +- .../server/handlers/store/ExistsHandler.java | 7 +- .../server/handlers/store/GetHandler.java | 7 +- .../server/handlers/store/IsEmptyHandler.java | 7 +- .../server/handlers/store/SetHandler.java | 9 +- .../java/com/stundb/service/SeedService.java | 6 + .../service/impl/ElectionServiceImpl.java | 97 ++++--- .../stundb/service/impl/NodeServiceImpl.java | 83 +++--- .../service/impl/ReplicationServiceImpl.java | 83 +++--- .../stundb/service/impl/SeedServiceImpl.java | 148 +++++++++++ .../stundb/service/impl/StoreServiceImpl.java | 7 +- .../stundb/timers/CoordinatorTimerTask.java | 132 +++++----- .../main/java/com/stundb/utils/NodeUtils.java | 13 +- .../src/main/resources/application.yml | 14 +- .../src/main/resources/logback.xml | 1 + .../test/java/com/stundb/StunDBClient.java | 85 ++----- .../com/stundb/server/TcpServerImplTest.java | 18 +- .../handlers/nodes/DeregisterHandlerTest.java | 15 +- .../handlers/nodes/ElectedHandlerTest.java | 32 +-- .../handlers/nodes/ListHandlerTest.java | 12 +- .../handlers/nodes/NodeHandlerTest.java | 35 ++- .../handlers/nodes/RegisterHandlerTest.java | 14 +- .../nodes/StartElectionHandlerTest.java | 11 +- .../nodes/SynchronizeHandlerTest.java | 15 +- .../handlers/store/CapacityHandlerTest.java | 16 +- .../handlers/store/ClearHandlerTest.java | 16 +- .../server/handlers/store/DelHandlerTest.java | 16 +- .../handlers/store/ExistsHandlerTest.java | 16 +- .../server/handlers/store/GetHandlerTest.java | 17 +- .../handlers/store/IsEmptyHandlerTest.java | 16 +- .../server/handlers/store/SetHandlerTest.java | 15 +- .../handlers/store/StoreHandlerTest.java | 35 ++- .../stundb/service/ElectionServiceTest.java | 8 +- .../service/impl/ElectionServiceImplTest.java | 138 +++++----- .../service/impl/NodeServiceImplTest.java | 80 +++--- .../service/impl/StoreServiceImplTest.java | 26 +- .../java/com/stundb/utils/NodeUtilsTest.java | 42 ++-- .../src/test/resources/application.yml | 16 +- .../src/test/resources/logback.xml | 51 ++++ stundb/core/pom.xml | 12 - .../java/com/stundb/core/cache/Cache.java | 8 + .../java/com/stundb/core/cache/FIFOCache.java | 6 +- .../java/com/stundb/core/codecs/Codec.java | 15 -- .../core/codecs/kryo/DefaultKryoContext.java | 77 ------ .../codecs/kryo/KryoClassRegistrator.java | 9 - .../stundb/core/codecs/kryo/KryoContext.java | 7 - .../core/codecs/kryo/KryoFactoryImpl.java | 18 -- .../configuration/ApplicationProperties.java | 11 +- .../configuration/ConfigurationLoader.java | 17 +- .../main/java/com/stundb/core/crdt/Entry.java | 11 +- .../stundb/core/crdt/LastWriterWinsSet.java | 3 - .../com/stundb/core/logging/Loggable.java | 3 +- .../stundb/core/logging/RequestLogger.java | 5 +- .../core/mappers/ApplicationConfigMapper.java | 14 +- .../stundb/core/mappers/ExecutorsMapper.java | 38 +++ .../stundb/core/models/ApplicationConfig.java | 5 +- .../java/com/stundb/core/models/Capacity.java | 14 +- .../java/com/stundb/core/models/Executor.java | 3 + .../com/stundb/core/models/Executors.java | 7 + .../java/com/stundb/core/models/Status.java | 12 +- .../com/stundb/core/models/TcpClient.java | 10 - .../java/com/stundb/core/models/Timeouts.java | 16 +- .../java/com/stundb/core/models/UniqueId.java | 14 +- .../stundb/net/client/StunDBClientImpl.java | 41 +-- .../providers/ExecutorServiceProvider.java | 6 +- stundb/net/core/pom.xml | 5 + .../com/stundb/net/core/codecs/Codec.java | 13 + .../net/core/codecs/fury/FuryCodec.java | 31 +++ .../com/stundb/net/core/models/Command.java | 42 ++-- .../com/stundb/net/core/models/Version.java | 2 - .../net/core/models/requests/CRDTRequest.java | 3 +- .../net/core/models/requests/DelRequest.java | 3 +- .../models/requests/DeregisterRequest.java | 3 +- .../core/models/requests/ElectedRequest.java | 3 +- .../core/models/requests/ExistsRequest.java | 3 +- .../net/core/models/requests/GetRequest.java | 3 +- .../core/models/requests/RegisterRequest.java | 3 +- .../models/responses/CapacityResponse.java | 3 +- .../core/models/responses/DumpResponse.java | 3 +- .../core/models/responses/EmptyResponse.java | 3 +- .../core/models/responses/ErrorResponse.java | 3 +- .../core/models/responses/ExistsResponse.java | 3 +- .../core/models/responses/GetResponse.java | 3 +- .../models/responses/IsEmptyResponse.java | 3 +- .../models/responses/ListNodesResponse.java | 4 +- .../models/responses/RegisterResponse.java | 3 +- .../core/modules/providers/CodecProvider.java | 109 ++++---- .../java/com/stundb/net/server/TcpServer.java | 169 +++++-------- ...oObjectDecoder.java => ObjectDecoder.java} | 18 +- ...oObjectEncoder.java => ObjectEncoder.java} | 17 +- .../net/server/handlers/CommandHandler.java | 1 + .../handlers/DefaultCommandHandler.java | 13 +- .../net/server/handlers/RequestHandler.java | 17 +- 115 files changed, 1401 insertions(+), 1274 deletions(-) create mode 100644 stundb/application/src/main/java/com/stundb/service/SeedService.java create mode 100644 stundb/application/src/main/java/com/stundb/service/impl/SeedServiceImpl.java create mode 100644 stundb/application/src/test/resources/logback.xml delete mode 100644 stundb/core/src/main/java/com/stundb/core/codecs/Codec.java delete mode 100644 stundb/core/src/main/java/com/stundb/core/codecs/kryo/DefaultKryoContext.java delete mode 100644 stundb/core/src/main/java/com/stundb/core/codecs/kryo/KryoClassRegistrator.java delete mode 100644 stundb/core/src/main/java/com/stundb/core/codecs/kryo/KryoContext.java delete mode 100644 stundb/core/src/main/java/com/stundb/core/codecs/kryo/KryoFactoryImpl.java create mode 100644 stundb/core/src/main/java/com/stundb/core/mappers/ExecutorsMapper.java create mode 100644 stundb/core/src/main/java/com/stundb/core/models/Executor.java create mode 100644 stundb/core/src/main/java/com/stundb/core/models/Executors.java delete mode 100644 stundb/core/src/main/java/com/stundb/core/models/TcpClient.java create mode 100644 stundb/net/core/src/main/java/com/stundb/net/core/codecs/Codec.java create mode 100644 stundb/net/core/src/main/java/com/stundb/net/core/codecs/fury/FuryCodec.java rename stundb/net/server/src/main/java/com/stundb/net/server/codecs/{KryoObjectDecoder.java => ObjectDecoder.java} (71%) rename stundb/net/server/src/main/java/com/stundb/net/server/codecs/{KryoObjectEncoder.java => ObjectEncoder.java} (58%) diff --git a/README.md b/README.md index 55a3d6b..5fc92e0 100644 --- a/README.md +++ b/README.md @@ -2,11 +2,6 @@ ## Work in progress - Not ready for production yet -### Removed -- GRPC interface -- Async stubs for runners -- ~~Rename SyncService to something else (do we really need it?)~~ - we didn't - ### Done - FIFO cache - Locking @@ -15,18 +10,19 @@ - Leader election - Use Netty for TCP server - Use Java Sockets for TCP client +- Implement node statuses so that when a node becomes unreachable, it is ignored until it becomes available again and state is synchronized (remove leader status too) +- Retry contacting seeds until a node replies, up to a maximum number of attempts +- Move from Kryo to Apache Fury ### Doing -- Implement node statuses so that when a node becomes unreachable, it is ignored until it becomes available again and state is synchronized (remove leader status too) - TESTS (unit & acceptance)! ### TODO -- Retry contacting seeds until a node is retrieved - or should we just fail to initialize? -- LBing - Consistent hashing +- Deal with virtual clocks - TTL - Payload compression/decompression when talking between nodes - Authentication - Support multiple cache eviction policies - Change how synchronization works today - - +- Create a Spring Boot app to observe nodes and expose data through a REST API +- Create a node gateway app to receive requests from clients, and load balance requests between nodes diff --git a/pom.xml b/pom.xml index 610227d..aa73311 100644 --- a/pom.xml +++ b/pom.xml @@ -10,9 +10,11 @@ pom 17 - 1.5.3.Final + 1.5.5.Final jdt_apt + 5.10.2 + 3.2.5 1.18.30 0.8.10 false @@ -50,7 +52,7 @@ org.slf4j slf4j-api - 2.0.7 + 2.0.12 ch.qos.logback @@ -73,7 +75,7 @@ org.junit.jupiter junit-jupiter-engine - 5.10.1 + ${junit.version} test @@ -85,7 +87,7 @@ org.junit.jupiter junit-jupiter-params - 5.10.1 + ${junit.version} test @@ -150,7 +152,7 @@ org.apache.maven.plugins maven-surefire-plugin - 3.1.2 + ${surefire.version} ${skipTests} diff --git a/stundb/acceptance-tests/pom.xml b/stundb/acceptance-tests/pom.xml index 09f3426..9f181c3 100644 --- a/stundb/acceptance-tests/pom.xml +++ b/stundb/acceptance-tests/pom.xml @@ -17,7 +17,7 @@ org.junit junit-bom - 5.10.1 + ${junit.version} pom import @@ -147,7 +147,7 @@ org.apache.maven.plugins maven-surefire-plugin - 3.1.2 + ${surefire.version} [" + channel.id() + "] connected!"); } @@ -112,75 +98,38 @@ protected void initChannel(Channel channel) { private ChannelFutureListener onStartListener(ExecutorService executor) { return channelFuture -> { - initialize(executor) - .handle((response, err) -> { - if (!channelFuture.isSuccess()) { - logger.error("Server failed to start", channelFuture.cause()); - System.exit(1); - } else if (err != null) { - logger.error("Server failed to start", err); - System.exit(1); - } else if (internalCache.isEmpty() && !config.getSeeds().contains(config.getIp() + ":" + config.getPort())) { - logger.error("Node list is empty"); - System.exit(1); - } else if (internalCache.isEmpty() && config.getSeeds().contains(config.getIp() + ":" + config.getPort())) { - var myself = new Node( - config.getIp(), - config.getPort(), - uniqueId.getNumber(), - false, - com.stundb.core.models.Status.create(RUNNING)); - internalCache.put(uniqueId.getText(), myself); - } - - onStart(); - logger.info("Running {} on {}:{}", config.getName(), config.getIp(), config.getPort()); - return response; - }) - .get(); + if (!seedsExcludingCurrentNode().isEmpty()) { + CompletableFuture.runAsync(this::contactSeeds, executor); + } + + startServer(channelFuture); executor.shutdown(); }; } - private CompletableFuture initialize(ExecutorService executor) throws RuntimeException { - return CompletableFuture.runAsync(() -> { - List seeds = config.getSeeds(); - if (seeds.isEmpty()) { - throw new IllegalArgumentException("A list of seeds must be provided"); - } + private void startServer(ChannelFuture channelFuture) { + if (!channelFuture.isSuccess()) { + logger.error("Server failed to start", channelFuture.cause()); + System.exit(1); + } - seeds.stream() - .filter(seed -> !seed.equals(config.getIp() + ":" + config.getPort())) - .forEach(seed -> { - try { - var response = contactSeed(seed); - if (Status.ERROR.equals(response.status())) { - var code = ((ErrorResponse) response.payload()).code(); - throw new RuntimeException(format("Reply from seed was - %s", code)); - } - var data = (RegisterResponse) response.payload(); - data.nodes().forEach(entry -> internalCache.put(entry.uniqueId().toString(), entry)); - // TODO: perhaps we should change how synchronization works - synchronize(data.state()); - } catch (Exception e) { - logger.error("Failed to contact seed " + seed, e); - } - }); - }, executor); - } + var myself = + new Node( + config.getIp(), + config.getPort(), + uniqueId.number(), + false, + com.stundb.core.models.Status.create(RUNNING)); + internalCache.put(uniqueId.text(), myself); - private Response contactSeed(String seed) throws ExecutionException, InterruptedException { - var address = seed.split(":"); - if (address.length != 2) { - throw new IllegalArgumentException("Invalid seed address " + seed); - } + onStart(); + } - return client.requestAsync(getRegisterRequest(), address[0], Integer.parseInt(address[1])).get(); + private List seedsExcludingCurrentNode() { + return config.getSeeds().stream().filter(seed -> !seed.equals(serverAddress())).toList(); } - private Request getRegisterRequest() { - return Request.buildRequest( - Command.REGISTER, - new RegisterRequest(config.getIp(), config.getPort(), uniqueId.getNumber())); + protected String serverAddress() { + return config.getIp() + ":" + config.getPort(); } } diff --git a/stundb/net/server/src/main/java/com/stundb/net/server/codecs/KryoObjectDecoder.java b/stundb/net/server/src/main/java/com/stundb/net/server/codecs/ObjectDecoder.java similarity index 71% rename from stundb/net/server/src/main/java/com/stundb/net/server/codecs/KryoObjectDecoder.java rename to stundb/net/server/src/main/java/com/stundb/net/server/codecs/ObjectDecoder.java index 0def69e..2ea207d 100644 --- a/stundb/net/server/src/main/java/com/stundb/net/server/codecs/KryoObjectDecoder.java +++ b/stundb/net/server/src/main/java/com/stundb/net/server/codecs/ObjectDecoder.java @@ -1,30 +1,32 @@ package com.stundb.net.server.codecs; -import com.stundb.core.codecs.Codec; import com.stundb.core.logging.Loggable; -import com.stundb.net.core.models.requests.Request; +import com.stundb.net.core.codecs.Codec; + import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.List; -public class KryoObjectDecoder extends ByteToMessageDecoder { +public class ObjectDecoder extends ByteToMessageDecoder { private final Logger logger = LoggerFactory.getLogger(getClass()); - private final Codec codec; - public KryoObjectDecoder(Codec codec) { + public ObjectDecoder(Codec codec) { setSingleDecode(false); this.codec = codec; } @Loggable @Override - protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) { + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) + throws IOException { if (in == null || in.readableBytes() <= 0) { return; } @@ -33,12 +35,12 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) { in.readBytes(bytes); try { - var obj = codec.decode(bytes, Request.class); + var obj = codec.decode(bytes); if (obj != null) { out.add(obj); } } catch (RuntimeException e) { - logger.error("Error decoding request", e.getCause()); + logger.error("Error decoding request", e); } } } diff --git a/stundb/net/server/src/main/java/com/stundb/net/server/codecs/KryoObjectEncoder.java b/stundb/net/server/src/main/java/com/stundb/net/server/codecs/ObjectEncoder.java similarity index 58% rename from stundb/net/server/src/main/java/com/stundb/net/server/codecs/KryoObjectEncoder.java rename to stundb/net/server/src/main/java/com/stundb/net/server/codecs/ObjectEncoder.java index 3866723..96f9eae 100644 --- a/stundb/net/server/src/main/java/com/stundb/net/server/codecs/KryoObjectEncoder.java +++ b/stundb/net/server/src/main/java/com/stundb/net/server/codecs/ObjectEncoder.java @@ -1,29 +1,34 @@ package com.stundb.net.server.codecs; -import com.stundb.core.codecs.Codec; import com.stundb.core.logging.Loggable; +import com.stundb.net.core.codecs.Codec; +import com.stundb.net.core.models.responses.Response; + import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; + import lombok.AllArgsConstructor; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; + @AllArgsConstructor -public class KryoObjectEncoder extends MessageToByteEncoder { +public class ObjectEncoder extends MessageToByteEncoder { private final Logger logger = LoggerFactory.getLogger(getClass()); - private final Codec codec; @Loggable @Override - protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws RuntimeException { + protected void encode(ChannelHandlerContext ctx, Response msg, ByteBuf out) throws IOException { try { - byte[] encode = codec.encode(msg, out.capacity()); + byte[] encode = codec.encode(msg); out.writeBytes(encode); } catch (RuntimeException e) { - logger.error("Error encoding response", e.getCause()); + logger.error("Error encoding response", e); } } } diff --git a/stundb/net/server/src/main/java/com/stundb/net/server/handlers/CommandHandler.java b/stundb/net/server/src/main/java/com/stundb/net/server/handlers/CommandHandler.java index 4ee8e5a..0942625 100644 --- a/stundb/net/server/src/main/java/com/stundb/net/server/handlers/CommandHandler.java +++ b/stundb/net/server/src/main/java/com/stundb/net/server/handlers/CommandHandler.java @@ -3,6 +3,7 @@ import com.stundb.net.core.models.Status; import com.stundb.net.core.models.requests.Request; import com.stundb.net.core.models.responses.Response; + import io.netty.channel.Channel; public interface CommandHandler { diff --git a/stundb/net/server/src/main/java/com/stundb/net/server/handlers/DefaultCommandHandler.java b/stundb/net/server/src/main/java/com/stundb/net/server/handlers/DefaultCommandHandler.java index e8b4dc4..83a7bbf 100644 --- a/stundb/net/server/src/main/java/com/stundb/net/server/handlers/DefaultCommandHandler.java +++ b/stundb/net/server/src/main/java/com/stundb/net/server/handlers/DefaultCommandHandler.java @@ -4,13 +4,16 @@ import com.stundb.net.core.models.Status; import com.stundb.net.core.models.requests.Request; import com.stundb.net.core.models.responses.ErrorResponse; -import com.stundb.net.core.models.responses.Response; + import io.netty.channel.Channel; + +import jakarta.inject.Singleton; + import lombok.NoArgsConstructor; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import jakarta.inject.Singleton; import java.util.Arrays; @Singleton @@ -26,8 +29,10 @@ public boolean isSupported(Request request) { @Override public void execute(Request request, Channel channel) { - logger.error("No command handler found for: {} - If a new command handler has been introduced, " + - "make sure to update the SPI file.", request.command()); + logger.error( + "No command handler found for: {} - If a new command handler has been introduced, " + + "make sure to update the SPI file.", + request.command()); var result = new ErrorResponse("error.invalid.command"); channel.writeAndFlush(result); writeAndFlush(request, result, Status.ERROR, channel); diff --git a/stundb/net/server/src/main/java/com/stundb/net/server/handlers/RequestHandler.java b/stundb/net/server/src/main/java/com/stundb/net/server/handlers/RequestHandler.java index 1ad1d1b..ab45c31 100644 --- a/stundb/net/server/src/main/java/com/stundb/net/server/handlers/RequestHandler.java +++ b/stundb/net/server/src/main/java/com/stundb/net/server/handlers/RequestHandler.java @@ -2,8 +2,11 @@ import com.stundb.core.logging.Loggable; import com.stundb.net.core.models.requests.Request; + import io.netty.channel.*; + import lombok.AllArgsConstructor; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -16,7 +19,6 @@ public class RequestHandler extends SimpleChannelInboundHandler { private final Logger logger = LoggerFactory.getLogger(getClass()); private List handlers; - private DefaultCommandHandler defaultCommandHandler; @Loggable @@ -44,15 +46,18 @@ private void executeCommand(ChannelHandlerContext ctx, Request request, CommandH private void closeChannel(Channel channel) { logger.debug("----> [" + channel.id() + "] reply sent!"); - channel.close().addListener((ChannelFutureListener) future -> { - future.awaitUninterruptibly(); - logger.debug("----> [" + channel.id() + "] disconnected"); - }); + channel.close() + .addListener( + (ChannelFutureListener) + future -> { + future.awaitUninterruptibly(); + logger.debug("----> [" + channel.id() + "] disconnected"); + }); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - cause.printStackTrace(); + logger.warn(cause.getMessage(), cause); ctx.close(); } }