Skip to content

Commit

Permalink
implement a fix for 198 (#201)
Browse files Browse the repository at this point in the history
* implement a fix for 198

* clarify errors when NeedsReadException or NeedsWriteException interest op changes throw unexpectedly
  • Loading branch information
kortemik authored Jun 24, 2024
1 parent da8347f commit 85d71ae
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 78 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Java Reliable Event Logging Protocol Library Server Implementation RLP-03
* Copyright (C) 2021-2024 Suomen Kanuuna Oy
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*
* Additional permission under GNU Affero General Public License version 3
* section 7
*
* If you modify this Program, or any covered work, by linking or combining it
* with other code, such other code is not for that reason alone subject to any
* of the requirements of the GNU Affero GPL version 3 as long as this Program
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
* modifications.
*
* Supplemented terms under GNU Affero General Public License version 3
* section 7
*
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
* versions must be marked as "Modified version of" The Program.
*
* Names of the licensors and authors may not be used for publicity purposes.
*
* No rights are granted for use of trade names, trademarks, or service marks
* which are in The Program if any.
*
* Licensee must indemnify licensors and authors for any liability that these
* contractual assumptions impose on licensors and authors.
*
* To the extent this program is licensed as part of the Commercial versions of
* Teragrep, the applicable Commercial License may apply to this file if you as
* a licensee so wish it.
*/
package com.teragrep.rlp_03.channel.context;

import java.io.IOException;

public class EndOfStreamException extends IOException {

public EndOfStreamException() {
super();
}

public EndOfStreamException(String message) {
super(message);
}

public EndOfStreamException(String message, Throwable cause) {
super(message, cause);
}

public EndOfStreamException(Throwable cause) {
super(cause);
}
}
145 changes: 67 additions & 78 deletions src/main/java/com/teragrep/rlp_03/channel/context/RelpReadImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ public void run() {
// fill buffers for read
long readBytes = readData();

if (readBytesToOperation(readBytes)) { // TODO should this just throw?
LOGGER.debug("readBytesToOperation(readBytes) forces return");
if (!isDataAvailable(readBytes)) {
break;
}

Expand Down Expand Up @@ -139,6 +138,50 @@ public void run() {
}
}
}
catch (NeedsReadException nre) {
LOGGER.debug("need read", nre);
try {
establishedContext.interestOps().add(OP_READ);
}
catch (CancelledKeyException cke) {
LOGGER.debug("Connection already closed for need read.", cke);
establishedContext.close();
}
catch (Throwable t) {
LOGGER.error("unexpected error while changing socket interest operations to OP_READ", t);
}
}
catch (NeedsWriteException nwe) {
LOGGER.debug("need write", nwe);
needWrite.set(true);
try {
establishedContext.interestOps().add(OP_WRITE);
}
catch (CancelledKeyException cke) {
LOGGER.debug("Connection already closed for need write.", cke);
establishedContext.close();
}
catch (Throwable t) {
LOGGER.error("unexpected error while changing socket interest operations to OP_WRITE", t);
}
}
catch (EndOfStreamException eose) {
// close connection
try {
LOGGER
.warn(
"End of stream for PeerAddress <{}> PeerPort <{}>. Closing Connection.",
establishedContext.socket().getTransportInfo().getPeerAddress(),
establishedContext.socket().getTransportInfo().getPeerPort()
);
}
catch (Exception ignored) {

}
finally {
establishedContext.close();
}
}
catch (Throwable t) {
LOGGER.error("run() threw", t);
establishedContext.close();
Expand All @@ -151,96 +194,42 @@ public void run() {
}
}

private boolean readBytesToOperation(long readBytes) {
private boolean isDataAvailable(long readBytes) throws IOException {
boolean rv;
if (readBytes == 0) {
// socket needs to read more
try {
establishedContext.interestOps().add(OP_READ);
}
catch (CancelledKeyException cke) {
LOGGER
.warn(
"CancelledKeyException <{}>. Closing connection for PeerAddress <{}> PeerPort <{}>",
cke.getMessage(), establishedContext.socket().getTransportInfo().getPeerAddress(),
establishedContext.socket().getTransportInfo().getPeerPort()
);
establishedContext.close();
}
establishedContext.interestOps().add(OP_READ);
LOGGER.debug("more bytes requested from socket");
return true;
rv = false;
}
else if (readBytes < 0) {
LOGGER
.warn(
"socket.read returned <{}>. Closing connection for PeerAddress <{}> PeerPort <{}>",
readBytes, establishedContext.socket().getTransportInfo().getPeerAddress(),
establishedContext.socket().getTransportInfo().getPeerPort()
);
// close connection
establishedContext.close();
return true;
throw new EndOfStreamException("negative readBytes <" + readBytes + ">");
}
else {
rv = true;
}
return false;
return rv;
}

private long readData() {
private long readData() throws IOException {
long readBytes = 0;
try {
List<BufferLease> bufferLeases = bufferLeasePool.take(4);

List<ByteBuffer> byteBufferList = new LinkedList<>();
for (BufferLease bufferLease : bufferLeases) {
if (bufferLease.isStub()) {
continue;
}
byteBufferList.add(bufferLease.buffer());
List<BufferLease> bufferLeases = bufferLeasePool.take(4);

List<ByteBuffer> byteBufferList = new LinkedList<>();
for (BufferLease bufferLease : bufferLeases) {
if (bufferLease.isStub()) {
continue;
}
ByteBuffer[] byteBufferArray = byteBufferList.toArray(new ByteBuffer[0]);
byteBufferList.add(bufferLease.buffer());
}
ByteBuffer[] byteBufferArray = byteBufferList.toArray(new ByteBuffer[0]);

readBytes = establishedContext.socket().read(byteBufferArray);
readBytes = establishedContext.socket().read(byteBufferArray);

activateBuffers(bufferLeases);
activateBuffers(bufferLeases);

LOGGER.debug("establishedContext.read got <{}> bytes from socket", readBytes);
}
catch (NeedsReadException nre) {
try {
establishedContext.interestOps().add(OP_READ);
}
catch (CancelledKeyException cke) {
LOGGER
.warn(
"CancelledKeyException <{}>. Closing connection for PeerAddress <{}> PeerPort <{}>",
cke.getMessage(), establishedContext.socket().getTransportInfo().getPeerAddress(),
establishedContext.socket().getTransportInfo().getPeerPort()
);
establishedContext.close();
}
}
catch (NeedsWriteException nwe) {
needWrite.set(true);
try {
establishedContext.interestOps().add(OP_WRITE);
}
catch (CancelledKeyException cke) {
LOGGER
.warn(
"CancelledKeyException <{}>. Closing connection for PeerAddress <{}> PeerPort <{}>",
cke.getMessage(), establishedContext.socket().getTransportInfo().getPeerAddress(),
establishedContext.socket().getTransportInfo().getPeerPort()
);
establishedContext.close();
}
}
catch (IOException ioException) {
LOGGER
.error(
"IOException <{}> while reading from socket. Closing establishedContext PeerAddress <{}> PeerPort <{}>.",
ioException, establishedContext.socket().getTransportInfo().getPeerAddress(),
establishedContext.socket().getTransportInfo().getPeerPort()
);
establishedContext.close();
}
LOGGER.debug("establishedContext.read got <{}> bytes from socket", readBytes);

return readBytes;
}
Expand Down

0 comments on commit 85d71ae

Please sign in to comment.