diff --git a/src/main/java/com/teragrep/rlp_03/FrameContext.java b/src/main/java/com/teragrep/rlp_03/FrameContext.java index 3e5d0c03..32a4b440 100644 --- a/src/main/java/com/teragrep/rlp_03/FrameContext.java +++ b/src/main/java/com/teragrep/rlp_03/FrameContext.java @@ -52,6 +52,7 @@ public class FrameContext { private final ConnectionContext connectionContext; private final RelpFrame relpFrame; + public FrameContext(ConnectionContext connectionContext, RelpFrame relpFrame) { this.connectionContext = connectionContext; this.relpFrame = relpFrame; @@ -64,4 +65,6 @@ public ConnectionContext connectionContext() { public RelpFrame relpFrame() { return relpFrame; } + + } diff --git a/src/main/java/com/teragrep/rlp_03/context/RelpReadImpl.java b/src/main/java/com/teragrep/rlp_03/context/RelpReadImpl.java index 1462ea77..d745b0ac 100644 --- a/src/main/java/com/teragrep/rlp_03/context/RelpReadImpl.java +++ b/src/main/java/com/teragrep/rlp_03/context/RelpReadImpl.java @@ -111,7 +111,7 @@ public void run() { // FIXME this is quite stateful RelpFrameLeaseful relpFrame; if (relpFrames.isEmpty()) { - relpFrame = new RelpFrameLeaseful(new RelpFrameImpl()); + relpFrame = new RelpFrameLeaseful(bufferLeasePool, new RelpFrameImpl()); } else { relpFrame = relpFrames.remove(0); } @@ -221,13 +221,12 @@ private void processFrame(RelpFrameLeaseful relpFrame) { LOGGER.debug("close requested, not submitting next read runnable"); } - RelpFrameAccess frameAccess = new RelpFrameAccess(relpFrame); - + RelpFrameAccess relpFrameAccess = new RelpFrameAccess(relpFrame); + FrameContext frameContext = new FrameContext(connectionContext, relpFrameAccess); FrameProcessor frameProcessor = frameProcessorPool.take(); // FIXME should this be locked to ensure visibility - if (!frameProcessor.isStub()) { - frameProcessor.accept(new FrameContext(connectionContext, frameAccess)); // this thread goes there + frameProcessor.accept(frameContext); // this thread goes there frameProcessorPool.offer(frameProcessor); } else { // TODO should this be IllegalState or should it just '0 serverclose 0' ? @@ -235,17 +234,9 @@ private void processFrame(RelpFrameLeaseful relpFrame) { connectionContext.close(); } - // terminate access - frameAccess.access().terminate(); - - // return buffers - List leases = relpFrame.release(); - for (BufferLease bufferLease : leases) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("releasing id <{}> with refs <{}>", bufferLease.id(), bufferLease.refs()); - } - bufferLeasePool.offer(bufferLease); - } + // TODO make relpFrame declare close() -> all envelopes close sub-envelope, when outer is closed + relpFrameAccess.close(); + relpFrame.close(); LOGGER.debug("processed txFrame. End of thread's processing."); } diff --git a/src/main/java/com/teragrep/rlp_03/context/frame/RelpFrameAccess.java b/src/main/java/com/teragrep/rlp_03/context/frame/RelpFrameAccess.java index 9766dafe..bf912df3 100644 --- a/src/main/java/com/teragrep/rlp_03/context/frame/RelpFrameAccess.java +++ b/src/main/java/com/teragrep/rlp_03/context/frame/RelpFrameAccess.java @@ -50,7 +50,7 @@ import com.teragrep.rlp_03.context.frame.fragment.Fragment; import com.teragrep.rlp_03.context.frame.fragment.FragmentAccess; -public class RelpFrameAccess implements RelpFrame { +public class RelpFrameAccess implements RelpFrame, AutoCloseable { private final Fragment txn; private final Fragment command; @@ -112,7 +112,8 @@ public String toString() { '}'; } - public Access access() { - return access; + @Override + public void close() { + access.terminate(); } } diff --git a/src/main/java/com/teragrep/rlp_03/context/frame/RelpFrameLeaseful.java b/src/main/java/com/teragrep/rlp_03/context/frame/RelpFrameLeaseful.java index ade15039..85d90a4c 100644 --- a/src/main/java/com/teragrep/rlp_03/context/frame/RelpFrameLeaseful.java +++ b/src/main/java/com/teragrep/rlp_03/context/frame/RelpFrameLeaseful.java @@ -47,18 +47,25 @@ package com.teragrep.rlp_03.context.frame; import com.teragrep.rlp_03.context.buffer.BufferLease; +import com.teragrep.rlp_03.context.buffer.BufferLeasePool; import com.teragrep.rlp_03.context.frame.fragment.Fragment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.LinkedList; import java.util.List; -public class RelpFrameLeaseful implements RelpFrame { +public class RelpFrameLeaseful implements RelpFrame, AutoCloseable { + private static final Logger LOGGER = LoggerFactory.getLogger(RelpFrameLeaseful.class); + + private final BufferLeasePool bufferLeasePool; private final RelpFrameImpl relpFrame; private final List leases; - public RelpFrameLeaseful(RelpFrameImpl relpFrame) { + public RelpFrameLeaseful(BufferLeasePool bufferLeasePool, RelpFrameImpl relpFrame) { + this.bufferLeasePool = bufferLeasePool; this.relpFrame = relpFrame; this.leases = new LinkedList<>(); } @@ -98,8 +105,15 @@ public boolean submit(BufferLease bufferLease) { return relpFrame.submit(bufferLease.buffer()); } - public List release() { - return leases; + @Override + public void close() { + // return buffers + for (BufferLease bufferLease : leases) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("releasing id <{}> with refs <{}>", bufferLease.id(), bufferLease.refs()); + } + bufferLeasePool.offer(bufferLease); + } } @Override diff --git a/src/test/java/com/teragrep/rlp_03/context/frame/RelpFrameTest.java b/src/test/java/com/teragrep/rlp_03/context/frame/RelpFrameTest.java index af0753ce..fa717e80 100644 --- a/src/test/java/com/teragrep/rlp_03/context/frame/RelpFrameTest.java +++ b/src/test/java/com/teragrep/rlp_03/context/frame/RelpFrameTest.java @@ -73,7 +73,7 @@ public void testRelpFrameAssembly() { Assertions.assertArrayEquals(relpFrame.endOfTransfer().toBytes(), new byte[]{'\n'}); RelpFrameAccess relpFrameAccess = new RelpFrameAccess(relpFrame); - relpFrameAccess.access().terminate(); + relpFrameAccess.close(); Assertions.assertThrows(IllegalStateException.class, relpFrameAccess::toString); }