Skip to content

Commit

Permalink
reassign responsibilities of releasing acquired BufferLeases to RelpF…
Browse files Browse the repository at this point in the history
…rameLeaseful, rename terminate to close (#88)
  • Loading branch information
kortemik authored Mar 7, 2024
1 parent a5c744e commit 0805648
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 24 deletions.
3 changes: 3 additions & 0 deletions src/main/java/com/teragrep/rlp_03/FrameContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
public class FrameContext {
private final ConnectionContext connectionContext;
private final RelpFrame relpFrame;

public FrameContext(ConnectionContext connectionContext, RelpFrame relpFrame) {
this.connectionContext = connectionContext;
this.relpFrame = relpFrame;
Expand All @@ -64,4 +65,6 @@ public ConnectionContext connectionContext() {
public RelpFrame relpFrame() {
return relpFrame;
}


}
23 changes: 7 additions & 16 deletions src/main/java/com/teragrep/rlp_03/context/RelpReadImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void run() {
// FIXME this is quite stateful
RelpFrameLeaseful relpFrame;
if (relpFrames.isEmpty()) {
relpFrame = new RelpFrameLeaseful(new RelpFrameImpl());
relpFrame = new RelpFrameLeaseful(bufferLeasePool, new RelpFrameImpl());
} else {
relpFrame = relpFrames.remove(0);
}
Expand Down Expand Up @@ -221,31 +221,22 @@ private void processFrame(RelpFrameLeaseful relpFrame) {
LOGGER.debug("close requested, not submitting next read runnable");
}

RelpFrameAccess frameAccess = new RelpFrameAccess(relpFrame);

RelpFrameAccess relpFrameAccess = new RelpFrameAccess(relpFrame);
FrameContext frameContext = new FrameContext(connectionContext, relpFrameAccess);
FrameProcessor frameProcessor = frameProcessorPool.take(); // FIXME should this be locked to ensure visibility


if (!frameProcessor.isStub()) {
frameProcessor.accept(new FrameContext(connectionContext, frameAccess)); // this thread goes there
frameProcessor.accept(frameContext); // this thread goes there
frameProcessorPool.offer(frameProcessor);
} else {
// TODO should this be IllegalState or should it just '0 serverclose 0' ?
LOGGER.warn("FrameProcessorPool closing, rejecting frame and closing connection for PeerAddress <{}> PeerPort <{}>", connectionContext.socket().getTransportInfo().getPeerAddress(), connectionContext.socket().getTransportInfo().getPeerPort());
connectionContext.close();
}

// terminate access
frameAccess.access().terminate();

// return buffers
List<BufferLease> leases = relpFrame.release();
for (BufferLease bufferLease : leases) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("releasing id <{}> with refs <{}>", bufferLease.id(), bufferLease.refs());
}
bufferLeasePool.offer(bufferLease);
}
// TODO make relpFrame declare close() -> all envelopes close sub-envelope, when outer is closed
relpFrameAccess.close();
relpFrame.close();

LOGGER.debug("processed txFrame. End of thread's processing.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import com.teragrep.rlp_03.context.frame.fragment.Fragment;
import com.teragrep.rlp_03.context.frame.fragment.FragmentAccess;

public class RelpFrameAccess implements RelpFrame {
public class RelpFrameAccess implements RelpFrame, AutoCloseable {

private final Fragment txn;
private final Fragment command;
Expand Down Expand Up @@ -112,7 +112,8 @@ public String toString() {
'}';
}

public Access access() {
return access;
@Override
public void close() {
access.terminate();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,25 @@
package com.teragrep.rlp_03.context.frame;

import com.teragrep.rlp_03.context.buffer.BufferLease;
import com.teragrep.rlp_03.context.buffer.BufferLeasePool;
import com.teragrep.rlp_03.context.frame.fragment.Fragment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedList;
import java.util.List;

public class RelpFrameLeaseful implements RelpFrame {
public class RelpFrameLeaseful implements RelpFrame, AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(RelpFrameLeaseful.class);

private final BufferLeasePool bufferLeasePool;
private final RelpFrameImpl relpFrame;

private final List<BufferLease> leases;

public RelpFrameLeaseful(RelpFrameImpl relpFrame) {
public RelpFrameLeaseful(BufferLeasePool bufferLeasePool, RelpFrameImpl relpFrame) {
this.bufferLeasePool = bufferLeasePool;
this.relpFrame = relpFrame;
this.leases = new LinkedList<>();
}
Expand Down Expand Up @@ -98,8 +105,15 @@ public boolean submit(BufferLease bufferLease) {
return relpFrame.submit(bufferLease.buffer());
}

public List<BufferLease> release() {
return leases;
@Override
public void close() {
// return buffers
for (BufferLease bufferLease : leases) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("releasing id <{}> with refs <{}>", bufferLease.id(), bufferLease.refs());
}
bufferLeasePool.offer(bufferLease);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void testRelpFrameAssembly() {
Assertions.assertArrayEquals(relpFrame.endOfTransfer().toBytes(), new byte[]{'\n'});

RelpFrameAccess relpFrameAccess = new RelpFrameAccess(relpFrame);
relpFrameAccess.access().terminate();
relpFrameAccess.close();
Assertions.assertThrows(IllegalStateException.class, relpFrameAccess::toString);
}

Expand Down

0 comments on commit 0805648

Please sign in to comment.