Skip to content

Commit

Permalink
Merge pull request #2892 from Opetushallitus/tor-2141-jonojarru
Browse files Browse the repository at this point in the history
TOR-2141: Pysäytä jonon käsittely, jos tietokanta tukehtumassa
  • Loading branch information
ilkkahanninen authored Mar 18, 2024
2 parents 61afa60 + 0d065b0 commit 7c1fefc
Show file tree
Hide file tree
Showing 11 changed files with 170 additions and 41 deletions.
8 changes: 8 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,14 @@ kyselyt = {
cluster = "koski-cluster"
service = "koski"
}
backpressureLimits = {
duration = "2m"
maxDatabaseReplayLag = "5m"
ebsByteBalance = {
stopAt = 50
continueAt = 90
}
}
}

# kela = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ import com.typesafe.config.Config
import fi.oph.koski.config.Environment
import fi.oph.koski.log.Logging
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient
import software.amazon.awssdk.services.cloudwatch.model.{Dimension, MetricDatum, PutMetricDataRequest, StandardUnit}
import software.amazon.awssdk.services.cloudwatch.model._

import java.sql.Timestamp
import java.time.Instant
import scala.collection.JavaConverters._
import scala.util.Try

object CloudWatchMetricsService {
def apply(config: Config): CloudWatchMetricsService = {
Expand All @@ -24,6 +27,8 @@ trait CloudWatchMetricsService {

def putQueuedQueryMetric(queryState: String): Unit

def getEbsByteBalance(databaseId: String): Option[Double]

protected def durationInSeconds(start: Timestamp, end: Timestamp): Double = (end.getTime - start.getTime) / 1000.0
}

Expand All @@ -36,6 +41,11 @@ class MockCloudWatchMetricsService extends CloudWatchMetricsService with Logging
def putQueuedQueryMetric(queryState: String): Unit = {
logger.debug(s"Mocking cloudwatch metric: Queries -> State -> ${queryState.capitalize} with value 1.0 sent")
}

def getEbsByteBalance(databaseId: String): Option[Double] = {
logger.debug("getEbsByteBalance mock")
None
}
}

class AwsCloudWatchMetricsService extends CloudWatchMetricsService with Logging {
Expand Down Expand Up @@ -106,4 +116,40 @@ class AwsCloudWatchMetricsService extends CloudWatchMetricsService with Logging
client.putMetricData(request)
}

def getEbsByteBalance(databaseId: String): Option[Double] = {
val endTime = Instant.now()
val startTime = endTime.minusSeconds(600)

val dimension = Dimension.builder()
.name("DBInstanceIdentifier")
.value(databaseId)
.build()

val metric = Metric.builder()
.namespace("AWS/RDS")
.metricName("EBSByteBalance%")
.dimensions(dimension)
.build()

val metricStat = MetricStat.builder()
.metric(metric)
.stat("Average")
.period(60)
.unit(StandardUnit.PERCENT)
.build()

val query = MetricDataQuery.builder()
.id("byteBalance")
.metricStat(metricStat)
.build()

val request = GetMetricDataRequest.builder()
.metricDataQueries(query)
.startTime(startTime)
.endTime(endTime)
.build()

val result = Try { client.getMetricData(request).metricDataResults().get(0) }
result.toOption.flatMap(_.values().asScala.headOption.map(_.doubleValue()))
}
}
12 changes: 12 additions & 0 deletions src/main/scala/fi/oph/koski/db/KoskiDatabase.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package fi.oph.koski.db

import com.typesafe.config.Config
import slick.jdbc.GetResult
import slick.jdbc.PostgresProfile.api._

import scala.concurrent.duration.{Duration, DurationInt}


object KoskiDatabase {
def master(config: Config): KoskiDatabase =
Expand All @@ -18,4 +21,13 @@ class KoskiDatabase(protected val config: DatabaseConfig, isReplica: Boolean) ex
override protected lazy val dbSizeQuery = KoskiTables.KoskiOpiskeluOikeudet.length.result

val isLocal: Boolean = config.isLocal

def replayLag: Duration =
QueryMethods.runDbSync(
db,
sql"select extract(epoch from replay_lag) as replay_lag from pg_stat_replication".as[Double](GetResult(_.nextDouble))
).headOption
.map(_.toInt)
.getOrElse(0)
.seconds
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@ class QueryCleanupScheduler(application: KoskiApplication) extends Logging {
}

private def runNextQuery(_ignore: Option[JValue]): Option[JValue] = {
kyselyt.cleanup()
val instances = application.ecsMetadata.currentlyRunningKoskiInstances

if (kyselyt.queueStalledFor(Duration.ofMinutes(1)) && !queryWorkerIsAlive) {
logger.warn("Query worker is missing. Promoting this instance to process the queue.")
takeover()
}
fixSchedulerLock(instances)
kyselyt.cleanup(instances)
takeoverIfQueryWorkerIsMissing()

None
}
Expand All @@ -38,8 +37,19 @@ class QueryCleanupScheduler(application: KoskiApplication) extends Logging {
context.map(_.workerId).exists(instances.contains)
}

private def takeover(): Unit = {
application.kyselyScheduler.promote(true)
application.scheduledTasks.restartKyselyScheduler()
private def takeoverIfQueryWorkerIsMissing(): Unit = {
if (!queryWorkerIsAlive) {
logger.warn("Query worker is missing. Promoting this instance to process the queue.")
application.kyselyScheduler.promote(true)
application.scheduledTasks.restartKyselyScheduler()
}
}

private def fixSchedulerLock(instances: Seq[String]): Unit = {
val context = application.kyselyScheduler.getContext
if (!context.exists(ctx => instances.contains(ctx.workerId))) {
logger.info("Query worker in scheduler does not exist. Releasing the scheduler lock.")
application.kyselyScheduler.resolveLock()
}
}
}
22 changes: 9 additions & 13 deletions src/main/scala/fi/oph/koski/queuedqueries/QueryRepository.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ class QueryRepository(
AND worker = $workerId
""".as[Int]).head

def numberOfPendingQueries: Int =
runDbSync(sql"""
SELECT count(*)
FROM kysely
WHERE state = ${QueryState.pending}
""".as[Int]).head

def takeNext: Option[RunningQuery] =
runDbSync(sql"""
UPDATE kysely
Expand Down Expand Up @@ -153,7 +160,7 @@ class QueryRepository(
SELECT *
FROM kysely
WHERE state = ${QueryState.running}
AND worker <> any($koskiInstances)
AND NOT worker = any($koskiInstances)
""".as[Query])
.collect { case q: RunningQuery => q }

Expand All @@ -166,18 +173,7 @@ class QueryRepository(
RETURNING meta
""".as[QueryMeta]).head
}

def queueStalledFor(duration: Duration): Boolean = {
val runningTasks = runDbSync(sql"SELECT COUNT(*) FROM kysely WHERE state = ${QueryState.running}".as[Int]).head
if (runningTasks > 0) {
false
} else {
val timeLimit = Timestamp.valueOf(LocalDateTime.now().minus(duration))
val pendingTasks = runDbSync(sql"SELECT COUNT(*) FROM kysely WHERE state = ${QueryState.pending} AND created_at < $timeLimit".as[Int]).head
pendingTasks > 0
}
}


implicit private val getQueryResult: GetResult[Query] = GetResult[Query] { r =>
val id = r.rs.getString("id")
val userOid = r.rs.getString("user_oid")
Expand Down
12 changes: 10 additions & 2 deletions src/main/scala/fi/oph/koski/queuedqueries/QueryScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class QueryScheduler(application: KoskiApplication) extends Logging {
val schedulerName = "kysely"
val schedulerDb = application.masterDatabase.db
val concurrency: Int = application.config.getInt("kyselyt.concurrency")
val backpressureDuration = application.config.getDuration("kyselyt.backpressureLimits.duration")
val kyselyt: QueryService = application.kyselyService
private var context: QuerySchedulerContext = QuerySchedulerContext(
workerId = kyselyt.workerId,
Expand Down Expand Up @@ -50,14 +51,21 @@ class QueryScheduler(application: KoskiApplication) extends Logging {
overrideContext()
}

def resolveLock(): Boolean = Scheduler.resolveLock(schedulerDb, schedulerName)

private def overrideContext(): Unit = {
Scheduler.setContext(schedulerDb, schedulerName, Some(context.asJson))
}

private def runNextQuery(context: Option[JValue]): Option[JValue] = {
if (context.flatMap(parseContext).exists(isQueryWorker)) {
if (kyselyt.numberOfRunningQueries < concurrency) {
kyselyt.runNext()
if (kyselyt.hasNext) {
if (kyselyt.systemIsOverloaded) {
logger.info(s"System is overloaded. Postponing running the next query for $backpressureDuration")
Scheduler.pauseForDuration(schedulerDb, schedulerName, backpressureDuration)
} else if (kyselyt.numberOfRunningQueries < concurrency) {
kyselyt.runNext()
}
}
}
None // QueryScheduler päivitä kontekstia vain käynnistyessään
Expand Down
55 changes: 48 additions & 7 deletions src/main/scala/fi/oph/koski/queuedqueries/QueryService.scala
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
package fi.oph.koski.queuedqueries

import fi.oph.koski.cache.GlobalCacheManager._
import fi.oph.koski.cache.{RefreshingCache, SingleValueCache}
import fi.oph.koski.cloudwatch.CloudWatchMetricsService
import fi.oph.koski.config.KoskiApplication
import fi.oph.koski.http.{HttpStatus, KoskiErrorCategory}
import fi.oph.koski.koskiuser.KoskiSpecificSession
import fi.oph.koski.log.Logging

import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter
import java.time.{Duration, LocalDateTime}
import java.util.UUID
import scala.concurrent.duration.DurationInt

class QueryService(application: KoskiApplication) extends Logging {
val workerId: String = application.ecsMetadata.taskARN.getOrElse("local")
val metrics: CloudWatchMetricsService = CloudWatchMetricsService(application.config)
private val maxAllowedDatabaseReplayLag: Duration = application.config.getDuration("kyselyt.backpressureLimits.maxDatabaseReplayLag")
private val readDatabaseId = QueryUtils.readDatabaseId(application.config)
private val databaseLoadLimiter = new DatabaseLoadLimiter(application, metrics, readDatabaseId)

private val queries = new QueryRepository(
db = application.masterDatabase.db,
Expand Down Expand Up @@ -40,6 +46,8 @@ class QueryService(application: KoskiApplication) extends Logging {

def numberOfRunningQueries: Int = queries.numberOfRunningQueries

def hasNext: Boolean = queries.numberOfPendingQueries > 0

def runNext(): Unit = {
queries.takeNext.foreach { query =>
query.getSession(application.käyttöoikeusRepository).fold {
Expand Down Expand Up @@ -77,18 +85,18 @@ class QueryService(application: KoskiApplication) extends Logging {
}
}

def cleanup(): Unit = {
def cleanup(koskiInstances: Seq[String]): Unit = {
val timeout = application.config.getDuration("kyselyt.timeout")

queries
.findOrphanedQueries(application.ecsMetadata.currentlyRunningKoskiInstances)
.findOrphanedQueries(koskiInstances)
.foreach { query =>
if (query.restartCount >= 3) {
queries.setFailed(query.queryId, "Orphaned")
logger.warn(s"Orphaned query (${query.name}) detected and cancelled after ${query.restartCount} restarts")
} else {
if (queries.restart(query, s"Orphaned ${LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)}")) {
logger.warn(s"Orphaned query (${query.name}) detected and it has been set back to pending state")
logger.warn(s"Orphaned query (${query.name}) detected and it has been set back to pending state $query")
}
}
}
Expand All @@ -98,9 +106,10 @@ class QueryService(application: KoskiApplication) extends Logging {
.foreach(query => logger.error(s"${query.name} timeouted after $timeout"))
}

def queueStalledFor(duration: Duration): Boolean = queries.queueStalledFor(duration)
def systemIsOverloaded: Boolean =
(application.replicaDatabase.replayLag.toSeconds > maxAllowedDatabaseReplayLag.toSeconds) || databaseLoadLimiter.checkOverloading

def cancelAllTasks(reason: String) = queries.setRunningTasksFailed(reason)
def cancelAllTasks(reason: String): Boolean = queries.setRunningTasksFailed(reason)

private def logStart(query: RunningQuery): Unit = {
logger.info(s"Starting new ${query.name} as user ${query.userOid}")
Expand All @@ -113,8 +122,40 @@ class QueryService(application: KoskiApplication) extends Logging {
metrics.putQueuedQueryMetric(QueryState.failed)
}

def logCompletedQuery(query: RunningQuery, fileCount: Int): Unit = {
private def logCompletedQuery(query: RunningQuery, fileCount: Int): Unit = {
logger.info(s"${query.name} completed with $fileCount result files")
metrics.putQueuedQueryMetric(QueryState.complete)
}
}

class DatabaseLoadLimiter(
application: KoskiApplication,
metrics: CloudWatchMetricsService,
readDatabaseId: String,
) {
private val stopAt: Double = application.config.getDouble("kyselyt.backpressureLimits.ebsByteBalance.stopAt")
private val continueAt: Double = application.config.getDouble("kyselyt.backpressureLimits.ebsByteBalance.continueAt")
var limiterActive: Boolean = false

def checkOverloading: Boolean = {
synchronized {
ebsByteBalance.apply.foreach { balance =>
if (limiterActive) {
if (balance >= continueAt) {
limiterActive = false
}
} else {
if (balance <= stopAt) {
limiterActive = true
}
}
}
limiterActive
}
}

private val ebsByteBalance = SingleValueCache(
RefreshingCache(name = "DatabaseLoadLimiter.ebsByteBalance", duration = 1.minutes, maxSize = 2),
() => metrics.getEbsByteBalance(readDatabaseId)
)
}
5 changes: 4 additions & 1 deletion src/main/scala/fi/oph/koski/queuedqueries/QueryUtils.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package fi.oph.koski.queuedqueries

import com.typesafe.config.Config
import fi.oph.koski.config.{Environment, KoskiApplication}
import fi.oph.koski.http.{HttpStatus, KoskiErrorCategory}
import fi.oph.koski.koskiuser.KoskiSpecificSession
Expand All @@ -13,9 +14,11 @@ import scala.jdk.CollectionConverters._
import scala.util.Using

object QueryUtils {
def readDatabaseId(config: Config): String = config.getString("kyselyt.readDatabase")

def isQueryWorker(application: KoskiApplication): Boolean = {
val instanceAz = application.ecsMetadata.availabilityZone
val databaseAz = getDatabaseAz(application, application.config.getString("kyselyt.readDatabase"))
val databaseAz = getDatabaseAz(application, readDatabaseId(application.config))

(instanceAz, databaseAz) match {
case (None, None) => true // Lokaali devausinstanssi
Expand Down
13 changes: 13 additions & 0 deletions src/main/scala/fi/oph/koski/schedule/Scheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,19 @@ object Scheduler {

def setContext(db: DB, name: String, context: Option[JValue]): Boolean =
QueryMethods.runDbSync(db, KoskiTables.Scheduler.filter(_.name === name).map(_.context).update(context)) > 0

def pauseForDuration(db: DB, name: String, duration: Duration): Boolean = {
val currentNextFireTime = QueryMethods.runDbSync(db, KoskiTables.Scheduler.filter(_.name === name).map(_.nextFireTime).result.headOption)
val postponedFireTime = Timestamp.valueOf(LocalDateTime.now().plus(duration))
if (currentNextFireTime.isEmpty || currentNextFireTime.get.getTime < postponedFireTime.getTime) {
QueryMethods.runDbSync(db, KoskiTables.Scheduler.filter(_.name === name).map(_.nextFireTime).update(postponedFireTime)) > 0
} else {
false
}
}

def resolveLock(db: DB, name: String): Boolean =
QueryMethods.runDbSync(db, KoskiTables.Scheduler.filter(_.name === name).map(_.status).update(ScheduledTaskStatus.scheduled)) > 0
}

trait Schedule {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class YtrDownloadService(
var tooMuchLagOnLastCheck = false
val task = new java.util.TimerTask {
def run() = {
val replayLag = status.getReplayLagSeconds
val replayLag = application.replicaDatabase.replayLag.toSeconds
if (replayLag > maxAllowedLagInSeconds) {
logger.warn(s"Replay lag (${replayLag} s) is above threshold - will sleep ${longerSleepPerStudentInMs} ms between oppijas")
extraSleepPerStudentInMs = longerSleepPerStudentInMs
Expand Down
Loading

0 comments on commit 7c1fefc

Please sign in to comment.