Skip to content

Commit

Permalink
Merge pull request #318 from LossyDragon/websocket-ktor
Browse files Browse the repository at this point in the history
Replace WebSocket with Ktor
  • Loading branch information
LossyDragon authored Feb 5, 2025
2 parents 4ba8443 + e3502d2 commit c44c708
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 170 deletions.
3 changes: 2 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ plugins {

allprojects {
group = "in.dragonbra"
version = "1.6.0-SNAPSHOT"
version = "1.6.0"
}

repositories {
Expand Down Expand Up @@ -122,6 +122,7 @@ dependencies {
implementation(libs.okHttp)
implementation(libs.xz)
implementation(libs.protobuf.java)
implementation(libs.bundles.ktor)

testImplementation(libs.bundles.testing)
}
Expand Down
9 changes: 9 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ protobuf-java = { module = "com.google.protobuf:protobuf-java", version.ref = "p
protobuf-protoc = { module = "com.google.protobuf:protoc", version.ref = "protobuf" }
qrCode = { module = "pro.leaco.qrcode:console-qrcode", version.ref = "qrCode" }
xz = { module = "org.tukaani:xz", version.ref = "xz" }
ktor-client-core = { module = "io.ktor:ktor-client-core", version = "3.0.3" }
ktor-client-cio = { module = "io.ktor:ktor-client-cio", version = "3.0.3" }
ktor-client-websocket = { module = "io.ktor:ktor-client-websockets", version = "3.0.3" }

test-commons-codec = { module = "commons-codec:commons-codec", version.ref = "commonsCodec" }
test-jupiter-api = { module = "org.junit.jupiter:junit-jupiter-api", version.ref = "junit5" }
Expand Down Expand Up @@ -71,3 +74,9 @@ testing = [
"test-mockito-core",
"test-mockito-jupiter",
]

ktor = [
"ktor-client-core",
"ktor-client-cio",
"ktor-client-websocket",
]

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,105 +1,175 @@
package `in`.dragonbra.javasteam.networking.steam3

import `in`.dragonbra.javasteam.util.log.LogManager
import `in`.dragonbra.javasteam.util.log.Logger
import okhttp3.Response
import io.ktor.client.HttpClient
import io.ktor.client.engine.cio.CIO
import io.ktor.client.plugins.websocket.WebSockets
import io.ktor.client.plugins.websocket.pingInterval
import io.ktor.client.plugins.websocket.webSocketSession
import io.ktor.http.URLProtocol
import io.ktor.http.path
import io.ktor.websocket.Frame
import io.ktor.websocket.WebSocketSession
import io.ktor.websocket.close
import io.ktor.websocket.readBytes
import io.ktor.websocket.readText
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import java.net.InetAddress
import java.net.InetSocketAddress
import java.net.URI
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.CoroutineContext
import kotlin.time.DurationUnit
import kotlin.time.toDuration

class WebSocketConnection :
Connection(),
WebSocketCMClient.WSListener {
CoroutineScope {

companion object {
private val logger: Logger = LogManager.getLogger(WebSocketConnection::class.java)

private fun constructUri(address: InetSocketAddress): URI =
URI.create("wss://${address.hostString}:${address.port}/cmsocket/")
private val logger = LogManager.getLogger(WebSocketConnection::class.java)
}

private val client = AtomicReference<WebSocketCMClient?>(null)
private val job: Job = SupervisorJob()

private var socketEndPoint: InetSocketAddress? = null
private var client: HttpClient? = null

override fun connect(endPoint: InetSocketAddress, timeout: Int) {
logger.debug("Connecting to $endPoint...")
private var session: WebSocketSession? = null

val serverUri = constructUri(endPoint)
val newClient = WebSocketCMClient(timeout, serverUri, this)
val oldClient = client.getAndSet(newClient)
private var endpoint: InetSocketAddress? = null

oldClient?.let { oldClient ->
logger.debug("Attempted to connect while already connected. Closing old connection...")
oldClient.close()
onDisconnected(false)
}
private var lastFrameTime = System.currentTimeMillis()

socketEndPoint = endPoint
override val coroutineContext: CoroutineContext = Dispatchers.IO + job

newClient.connect()
override fun connect(endPoint: InetSocketAddress, timeout: Int) {
launch {
logger.debug("Trying connection to ${endPoint.hostName}:${endPoint.port}")

try {
endpoint = endPoint

client = HttpClient(CIO) {
install(WebSockets) {
pingInterval = timeout.toDuration(DurationUnit.SECONDS)
}
}

val session = client?.webSocketSession {
url {
host = endPoint.hostName
port = endPoint.port
protocol = URLProtocol.WSS
path("cmsocket/")
}
}

this@WebSocketConnection.session = session

startConnectionMonitoring()

launch {
try {
session?.incoming?.consumeEach { frame ->
when (frame) {
is Frame.Binary -> {
// logger.debug("on Binary ${frame.data.size}")
lastFrameTime = System.currentTimeMillis()
onNetMsgReceived(NetMsgEventArgs(frame.readBytes(), currentEndPoint))
}

is Frame.Close -> disconnect(false)
is Frame.Ping -> logger.debug("Received pong")
is Frame.Pong -> logger.debug("Received pong")
is Frame.Text -> logger.debug("Received plain text ${frame.readText()}")
}
}
} catch (e: Exception) {
logger.error("An error occurred while receiving data", e)
disconnect(false)
}
}

logger.debug("Connected to ${endPoint.hostName}:${endPoint.port}")
onConnected()
} catch (e: Exception) {
logger.error("An error occurred setting up the web socket client", e)
disconnect(false)
}
}
}

override fun disconnect(userInitiated: Boolean) {
disconnectCore(userInitiated)
logger.debug("Disconnect called: $userInitiated")
launch {
try {
session?.close()
client?.close()
} finally {
session = null
client = null

job.cancelChildren()
}
}

onDisconnected(userInitiated)
}

override fun send(data: ByteArray) {
try {
client.get()?.send(data)
} catch (e: Exception) {
logger.debug("Exception while sending data", e)
disconnectCore(false)
launch {
try {
val frame = Frame.Binary(true, data)
session?.send(frame)
} catch (e: Exception) {
logger.error("An error occurred while sending data", e)
disconnect(false)
}
}
}

override fun getLocalIP(): InetAddress? = InetAddress.getByAddress(byteArrayOf(0, 0, 0, 0))
override fun getLocalIP(): InetAddress = InetAddress.getLocalHost()

override fun getCurrentEndPoint(): InetSocketAddress? = socketEndPoint
override fun getCurrentEndPoint(): InetSocketAddress? = endpoint

override fun getProtocolTypes(): ProtocolTypes = ProtocolTypes.WEB_SOCKET

private fun disconnectCore(userInitiated: Boolean) {
logger.debug("User initiated disconnection: $userInitiated")

val oldClient = client.getAndSet(null)
oldClient?.close()

onDisconnected(userInitiated)
/**
* Rudimentary watchdog
*/
private fun startConnectionMonitoring() {
launch {
while (isActive) {
if (client?.isActive == false || session?.isActive == false) {
logger.error("Client or Session is no longer active")
disconnect(userInitiated = false)
}

socketEndPoint = null
}
val timeSinceLastFrame = System.currentTimeMillis() - lastFrameTime

override fun onTextData(data: String) {
// Ignore string messages
logger.debug("Got string message: $data")
}

override fun onData(data: ByteArray) {
if (data.isNotEmpty()) {
onNetMsgReceived(NetMsgEventArgs(data, getCurrentEndPoint()))
}
}
// logger.debug("Watchdog status: $timeSinceLastFrame")
when {
timeSinceLastFrame > 30000 -> {
logger.error("Watchdog: No response for 30 seconds. Disconnecting from steam")
disconnect(userInitiated = false)
break
}

override fun onClose(code: Int, reason: String) {
logger.debug("Connection closed")
}
timeSinceLastFrame > 25000 -> logger.debug("Watchdog: No response for 25 seconds")

override fun onClosing(code: Int, reason: String) {
logger.debug("Closing connection: $code, reason: ${reason.ifEmpty { "No reason given" }}")
// Steam can close a connection if there is nothing else it wants to send.
// For example: AccountLoginDeniedNeedTwoFactor, InvalidPassword, etc.
disconnectCore(code == 1000)
}
timeSinceLastFrame > 20000 -> logger.debug("Watchdog: No response for 20 seconds")

override fun onError(t: Throwable) {
logger.error("Error in websocket", t)
disconnectCore(false)
}
timeSinceLastFrame > 15000 -> logger.debug("Watchdog: No response for 15 seconds")
}

override fun onOpen(response: Response) {
logger.debug("WebSocket connected to $socketEndPoint using TLS: ${response.handshake?.tlsVersion}")
onConnected()
delay(5000)
}
}
}
}
7 changes: 6 additions & 1 deletion src/main/java/in/dragonbra/javasteam/steam/CMClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,12 @@ public CMClient(SteamConfiguration configuration) {

this.configuration = configuration;

heartBeatFunc = new ScheduledFunction(() -> send(new ClientMsgProtobuf<CMsgClientHeartBeat.Builder>(CMsgClientHeartBeat.class, EMsg.ClientHeartBeat)), 5000);
heartBeatFunc = new ScheduledFunction(() -> {
var heartbeat = new ClientMsgProtobuf<CMsgClientHeartBeat.Builder>(
CMsgClientHeartBeat.class, EMsg.ClientHeartBeat);
heartbeat.getBody().setSendReply(true); // Ping Pong
send(heartbeat);
}, 5000);
}

/**
Expand Down
Loading

0 comments on commit c44c708

Please sign in to comment.