Skip to content

Commit

Permalink
Delegate pool (#106)
Browse files Browse the repository at this point in the history
* add PoolingDelegateTest, indicates it does not work

* refactor delegates so they work with pooling
  • Loading branch information
kortemik authored Mar 15, 2024
1 parent 5a15485 commit 9a1a343
Show file tree
Hide file tree
Showing 15 changed files with 503 additions and 210 deletions.
51 changes: 1 addition & 50 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,12 @@ Dependencies for examples
</project>
----

Server with shared handler for all connections
Server with shared handler for all connections. See ExampleRelpClient.java for client.

[source, java]
----
package com.teragrep.rlp_03.readme;
import com.teragrep.rlp_01.RelpBatch;
import com.teragrep.rlp_01.RelpConnection;
import com.teragrep.rlp_03.FrameContext;
import com.teragrep.rlp_03.Server;
import com.teragrep.rlp_03.ServerFactory;
Expand All @@ -78,10 +76,6 @@ import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Supplier;
Expand Down Expand Up @@ -186,49 +180,6 @@ public class ReadmeTest {
throw new RuntimeException(e);
}
}
/**
* ExampleRelpClient using rlp_01 for demonstration
*/
private class ExampleRelpClient {
private final int port;
ExampleRelpClient(int port) {
this.port = port;
}
public void send(String record) {
RelpConnection relpConnection = new RelpConnection();
try {
relpConnection.connect("localhost", port);
}
catch (IOException | TimeoutException exception) {
throw new RuntimeException(exception);
}
RelpBatch relpBatch = new RelpBatch();
relpBatch.insert(record.getBytes(StandardCharsets.UTF_8));
while (!relpBatch.verifyTransactionAll()) {
relpBatch.retryAllFailed();
try {
relpConnection.commit(relpBatch);
}
catch (IOException | TimeoutException exception) {
throw new RuntimeException(exception);
}
}
try {
relpConnection.disconnect();
}
catch (IOException | TimeoutException exception) {
throw new RuntimeException(exception);
}
finally {
relpConnection.tearDown();
}
}
}
}
----

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,76 +49,43 @@

import com.teragrep.rlp_01.RelpCommand;
import com.teragrep.rlp_03.FrameContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.teragrep.rlp_03.delegate.event.*;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;


public class DefaultFrameDelegate implements FrameDelegate {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultFrameDelegate.class);

private final Map<String, RelpEvent> relpCommandConsumerMap;
private final RelpEvent relpEventServerClose;
private final AtomicInteger txId;
private final FrameDelegate frameDelegate;

public DefaultFrameDelegate(Consumer<FrameContext> cbFunction) {
this(new HashMap<>());
Map<String, RelpEvent> relpCommandConsumerMap = new HashMap<>();
relpCommandConsumerMap.put(RelpCommand.CLOSE, new RelpEventClose());
relpCommandConsumerMap.put(RelpCommand.OPEN, new RelpEventOpen());
relpCommandConsumerMap.put(RelpCommand.SYSLOG, new RelpEventSyslog(cbFunction));

this.frameDelegate = new SequencingDelegate(new EventDelegate(relpCommandConsumerMap));
}

public DefaultFrameDelegate(Map<String, RelpEvent> relpCommandConsumerMap) {
this.relpCommandConsumerMap = relpCommandConsumerMap;
this.relpEventServerClose = new RelpEventServerClose();
this.txId = new AtomicInteger();
this.frameDelegate = new SequencingDelegate(new EventDelegate(relpCommandConsumerMap));
}

@Override
public boolean accept(FrameContext frameContext) {
boolean rv = true;

int nextTxnId = txId.incrementAndGet();

if (nextTxnId == 999_999_999) {
// wraps around after 999999999
LOGGER.debug("txnId wrapped at <{}>", nextTxnId);
txId.set(0);
}

if (nextTxnId != frameContext.relpFrame().txn().toInt()) {
throw new IllegalArgumentException("frame txn not sequencing");
}

String relpCommand = frameContext.relpFrame().command().toString();

Consumer<FrameContext> commandConsumer = relpCommandConsumerMap.getOrDefault(relpCommand, relpEventServerClose);

commandConsumer.accept(frameContext);


if (RelpCommand.CLOSE.equals(relpCommand)) {
// TODO refactor commandConsumer to return indication of further reads
rv = false;
}

return rv;
return frameDelegate.accept(frameContext);
}

@Override
public void close() throws Exception {
for (AutoCloseable autoCloseable : relpCommandConsumerMap.values()) {
autoCloseable.close();
}
frameDelegate.close();
}

@Override
public boolean isStub() {
return false;
return frameDelegate.isStub();
}

}
Expand Down
94 changes: 94 additions & 0 deletions src/main/java/com/teragrep/rlp_03/delegate/EventDelegate.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Java Reliable Event Logging Protocol Library Server Implementation RLP-03
* Copyright (C) 2021,2024 Suomen Kanuuna Oy
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*
* Additional permission under GNU Affero General Public License version 3
* section 7
*
* If you modify this Program, or any covered work, by linking or combining it
* with other code, such other code is not for that reason alone subject to any
* of the requirements of the GNU Affero GPL version 3 as long as this Program
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
* modifications.
*
* Supplemented terms under GNU Affero General Public License version 3
* section 7
*
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
* versions must be marked as "Modified version of" The Program.
*
* Names of the licensors and authors may not be used for publicity purposes.
*
* No rights are granted for use of trade names, trademarks, or service marks
* which are in The Program if any.
*
* Licensee must indemnify licensors and authors for any liability that these
* contractual assumptions impose on licensors and authors.
*
* To the extent this program is licensed as part of the Commercial versions of
* Teragrep, the applicable Commercial License may apply to this file if you as
* a licensee so wish it.
*/
package com.teragrep.rlp_03.delegate;

import com.teragrep.rlp_01.RelpCommand;
import com.teragrep.rlp_03.FrameContext;
import com.teragrep.rlp_03.delegate.event.RelpEvent;
import com.teragrep.rlp_03.delegate.event.RelpEventServerClose;

import java.util.Map;
import java.util.function.Consumer;

public final class EventDelegate implements FrameDelegate {

private final Map<String, RelpEvent> relpEventMap;
private final RelpEvent relpEventServerClose;

public EventDelegate(Map<String, RelpEvent> relpEventMap) {
this.relpEventMap = relpEventMap;
this.relpEventServerClose = new RelpEventServerClose();
}

@Override
public boolean accept(FrameContext frameContext) {
boolean rv = true;
String relpCommand = frameContext.relpFrame().command().toString();

Consumer<FrameContext> commandConsumer = relpEventMap.getOrDefault(relpCommand, relpEventServerClose);

commandConsumer.accept(frameContext);


if (RelpCommand.CLOSE.equals(relpCommand)) {
// TODO refactor commandConsumer to return indication of further reads
rv = false;
}
return rv;
}

@Override
public void close() throws Exception {
for (AutoCloseable autoCloseable : relpEventMap.values()) {
autoCloseable.close();
}
}

@Override
public boolean isStub() {
return false;
}
}
103 changes: 103 additions & 0 deletions src/main/java/com/teragrep/rlp_03/delegate/SequencingDelegate.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Java Reliable Event Logging Protocol Library Server Implementation RLP-03
* Copyright (C) 2021,2024 Suomen Kanuuna Oy
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*
* Additional permission under GNU Affero General Public License version 3
* section 7
*
* If you modify this Program, or any covered work, by linking or combining it
* with other code, such other code is not for that reason alone subject to any
* of the requirements of the GNU Affero GPL version 3 as long as this Program
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
* modifications.
*
* Supplemented terms under GNU Affero General Public License version 3
* section 7
*
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
* versions must be marked as "Modified version of" The Program.
*
* Names of the licensors and authors may not be used for publicity purposes.
*
* No rights are granted for use of trade names, trademarks, or service marks
* which are in The Program if any.
*
* Licensee must indemnify licensors and authors for any liability that these
* contractual assumptions impose on licensors and authors.
*
* To the extent this program is licensed as part of the Commercial versions of
* Teragrep, the applicable Commercial License may apply to this file if you as
* a licensee so wish it.
*/
package com.teragrep.rlp_03.delegate;

import com.teragrep.rlp_03.FrameContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicInteger;

/**
* Decorator for FrameDelegate that ensures relp frame transaction numbers increment
*/
public final class SequencingDelegate implements FrameDelegate {

private static final Logger LOGGER = LoggerFactory.getLogger(SequencingDelegate.class);

private final FrameDelegate frameDelegate;
private final AtomicInteger txId;

public SequencingDelegate(FrameDelegate frameDelegate) {
this.frameDelegate = frameDelegate;

this.txId = new AtomicInteger();
}


@Override
public boolean accept(FrameContext frameContext) {

final int frameTxnId = frameContext.relpFrame().txn().toInt();

// zero id is ignored, it is special by relp specification
if (frameTxnId != 0) {
int nextTxnId = txId.incrementAndGet();

if (nextTxnId == 999_999_999) {
// wraps around after 999999999
LOGGER.debug("txnId wrapped at <{}>", nextTxnId);
txId.set(0);
}

if (nextTxnId != frameTxnId) {
throw new IllegalArgumentException("frame txn not sequencing");
}
}

return frameDelegate.accept(frameContext);
}

@Override
public void close() throws Exception {
frameDelegate.close();
}

@Override
public boolean isStub() {
return frameDelegate.isStub();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
* Teragrep, the applicable Commercial License may apply to this file if you as
* a licensee so wish it.
*/
package com.teragrep.rlp_03.delegate;
package com.teragrep.rlp_03.delegate.event;

import com.teragrep.rlp_01.RelpFrameTX;
import com.teragrep.rlp_03.FrameContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
* Teragrep, the applicable Commercial License may apply to this file if you as
* a licensee so wish it.
*/
package com.teragrep.rlp_03.delegate;
package com.teragrep.rlp_03.delegate.event;

import com.teragrep.rlp_01.RelpCommand;
import com.teragrep.rlp_01.RelpFrameTX;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
* Teragrep, the applicable Commercial License may apply to this file if you as
* a licensee so wish it.
*/
package com.teragrep.rlp_03.delegate;
package com.teragrep.rlp_03.delegate.event;

import com.teragrep.rlp_01.RelpCommand;
import com.teragrep.rlp_01.RelpFrameTX;
Expand Down
Loading

0 comments on commit 9a1a343

Please sign in to comment.