From 11c7c18a0ae775a47fad2c742947480439ae6e77 Mon Sep 17 00:00:00 2001
From: Ilkka Hänninen
Date: Fri, 15 Mar 2024 08:55:12 +0200
Subject: [PATCH 1/7] Move database lag retrieval to a reusable place
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
 src/main/scala/fi/oph/koski/db/KoskiDatabase.scala   | 12 ++++++++++++
 .../oph/koski/ytr/download/YtrDownloadService.scala  |  2 +-
 .../oph/koski/ytr/download/YtrDownloadStatus.scala   |  8 --------
 3 files changed, 13 insertions(+), 9 deletions(-)
diff --git a/src/main/scala/fi/oph/koski/db/KoskiDatabase.scala b/src/main/scala/fi/oph/koski/db/KoskiDatabase.scala
index 3dde73b4ef..9d686920c0 100644
--- a/src/main/scala/fi/oph/koski/db/KoskiDatabase.scala
+++ b/src/main/scala/fi/oph/koski/db/KoskiDatabase.scala
@@ -1,8 +1,11 @@
 package fi.oph.koski.db
 import com.typesafe.config.Config
+import slick.jdbc.GetResult
 import slick.jdbc.PostgresProfile.api._
+import scala.concurrent.duration.{Duration, DurationInt}
+
 object KoskiDatabase {
   def master(config: Config): KoskiDatabase =
@@ -18,4 +21,13 @@ class KoskiDatabase(protected val config: DatabaseConfig, isReplica: Boolean) ex
   override protected lazy val dbSizeQuery = KoskiTables.KoskiOpiskeluOikeudet.length.result
   val isLocal: Boolean = config.isLocal
+
+  def replayLag: Duration =
+    QueryMethods.runDbSync(
+      db,
+      sql"select extract(epoch from replay_lag) as replay_lag from pg_stat_replication".as[Double](GetResult(_.nextDouble))
+    ).headOption
+      .map(_.toInt)
+      .getOrElse(0)
+      .seconds
 }
diff --git a/src/main/scala/fi/oph/koski/ytr/download/YtrDownloadService.scala b/src/main/scala/fi/oph/koski/ytr/download/YtrDownloadService.scala
index 65dfd40665..a24ddec361 100644
--- a/src/main/scala/fi/oph/koski/ytr/download/YtrDownloadService.scala
+++ b/src/main/scala/fi/oph/koski/ytr/download/YtrDownloadService.scala
@@ -50,7 +50,7 @@ class YtrDownloadService(
     var tooMuchLagOnLastCheck = false
     val task = new java.util.TimerTask {
       def run() = {
-        val replayLag = status.getReplayLagSeconds
+        val replayLag = application.replicaDatabase.replayLag.toSeconds
         if (replayLag > maxAllowedLagInSeconds) {
           logger.warn(s"Replay lag (${replayLag} s) is above threshold - will sleep ${longerSleepPerStudentInMs} ms between oppijas")
           extraSleepPerStudentInMs = longerSleepPerStudentInMs
diff --git a/src/main/scala/fi/oph/koski/ytr/download/YtrDownloadStatus.scala b/src/main/scala/fi/oph/koski/ytr/download/YtrDownloadStatus.scala
index 6abded85e8..88c2fe4a08 100644
--- a/src/main/scala/fi/oph/koski/ytr/download/YtrDownloadStatus.scala
+++ b/src/main/scala/fi/oph/koski/ytr/download/YtrDownloadStatus.scala
@@ -54,14 +54,6 @@ class YtrDownloadStatus(val db: DB) extends QueryMethods with Logging with Datab
       .getOrElse(constructStatusJson("idle", None, 0, 0))
   }
-  def getReplayLagSeconds: Int = {
-    runDbSync(
-      sql"""
-        select extract(epoch from replay_lag) as replay_lag from pg_stat_replication;
-      """.as[Double](GetResult(_.nextDouble))
-    ).headOption.map(_.toInt).getOrElse(0)
-  }
-
   private def rowById(id: Int): Option[YtrDownloadStatusRow] = {
     runDbSync(KoskiTables.YtrDownloadStatus.filter(_.id === id).result).headOption
   }
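[Editor's note - illustrative addition, not part of the patch series] Patch 1 moves the replay-lag lookup into KoskiDatabase.replayLag: the query reads replay_lag from pg_stat_replication as fractional epoch seconds, and the first row (if any) is truncated to whole seconds and turned into a Scala Duration, defaulting to zero when no replication row exists. A minimal standalone sketch of just that conversion step, with the Slick call replaced by a plain Seq[Double] and an invented object name:

```scala
import scala.concurrent.duration.{Duration, DurationInt}

// Standalone sketch of the folding done in KoskiDatabase.replayLag:
// pg_stat_replication.replay_lag arrives as fractional epoch seconds
// (or no row at all when there are no replicas) and becomes a Duration.
object ReplayLagConversionSketch {
  def toReplayLag(rows: Seq[Double]): Duration =
    rows.headOption
      .map(_.toInt)   // truncate fractional seconds, as the patch does
      .getOrElse(0)   // no replication rows -> treat lag as zero
      .seconds

  def main(args: Array[String]): Unit = {
    println(toReplayLag(Seq(312.4)))  // 312 seconds
    println(toReplayLag(Seq.empty))   // 0 seconds
  }
}
```

Only the data source is faked here; the folding itself mirrors the patch.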
From 9db9d7b6ebca796bbaf6fab56cc4f59e6e6e3d74 Mon Sep 17 00:00:00 2001
From: Ilkka Hänninen
Date: Fri, 15 Mar 2024 09:51:14 +0200
Subject: [PATCH 2/7] Pause starting new queries for a moment if the replica is too far behind the primary database
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
 src/main/resources/reference.conf                     |  4 ++++
 .../fi/oph/koski/queuedqueries/QueryScheduler.scala   |  6 +++++-
 .../fi/oph/koski/queuedqueries/QueryService.scala     | 11 ++++++++---
 src/main/scala/fi/oph/koski/schedule/Scheduler.scala  | 10 ++++++++++
 4 files changed, 27 insertions(+), 4 deletions(-)
diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf
index 5d0e0c100b..cc22c9c631 100644
--- a/src/main/resources/reference.conf
+++ b/src/main/resources/reference.conf
@@ -260,6 +260,10 @@ kyselyt = {
     cluster = "koski-cluster"
     service = "koski"
   }
+  backpressureLimits = {
+    duration = "2m"
+    maxDatabaseReplayLag = "5m"
+  }
 }
 # kela = {
diff --git a/src/main/scala/fi/oph/koski/queuedqueries/QueryScheduler.scala b/src/main/scala/fi/oph/koski/queuedqueries/QueryScheduler.scala
index 5d0c43ca49..6865bb3e9c 100644
--- a/src/main/scala/fi/oph/koski/queuedqueries/QueryScheduler.scala
+++ b/src/main/scala/fi/oph/koski/queuedqueries/QueryScheduler.scala
@@ -11,6 +11,7 @@ class QueryScheduler(application: KoskiApplication) extends Logging {
   val schedulerName = "kysely"
   val schedulerDb = application.masterDatabase.db
   val concurrency: Int = application.config.getInt("kyselyt.concurrency")
+  val backpressureDuration = application.config.getDuration("kyselyt.backpressureLimits.duration")
   val kyselyt: QueryService = application.kyselyService
   private var context: QuerySchedulerContext = QuerySchedulerContext(
     workerId = kyselyt.workerId,
@@ -56,7 +57,10 @@ class QueryScheduler(application: KoskiApplication) extends Logging {
   private def runNextQuery(context: Option[JValue]): Option[JValue] = {
     if (context.flatMap(parseContext).exists(isQueryWorker)) {
-      if (kyselyt.numberOfRunningQueries < concurrency) {
+      if (kyselyt.systemIsOverloaded) {
+        logger.info(s"System is overloaded. Postponing running the next query for $backpressureDuration")
+        Scheduler.pauseForDuration(schedulerDb, schedulerName, backpressureDuration)
+      } else if (kyselyt.numberOfRunningQueries < concurrency) {
         kyselyt.runNext()
       }
     }
diff --git a/src/main/scala/fi/oph/koski/queuedqueries/QueryService.scala b/src/main/scala/fi/oph/koski/queuedqueries/QueryService.scala
index 19da89b666..67b67e87b8 100644
--- a/src/main/scala/fi/oph/koski/queuedqueries/QueryService.scala
+++ b/src/main/scala/fi/oph/koski/queuedqueries/QueryService.scala
@@ -6,13 +6,14 @@
 import fi.oph.koski.http.{HttpStatus, KoskiErrorCategory}
 import fi.oph.koski.koskiuser.KoskiSpecificSession
 import fi.oph.koski.log.Logging
-import java.time.{Duration, LocalDateTime}
 import java.time.format.DateTimeFormatter
+import java.time.{Duration, LocalDateTime}
 import java.util.UUID
 class QueryService(application: KoskiApplication) extends Logging {
   val workerId: String = application.ecsMetadata.taskARN.getOrElse("local")
   val metrics: CloudWatchMetricsService = CloudWatchMetricsService(application.config)
+  private val maxAllowedDatabaseReplayLag: Duration = application.config.getDuration("kyselyt.backpressureLimits.maxDatabaseReplayLag")
   private val queries = new QueryRepository(
     db = application.masterDatabase.db,
@@ -100,7 +101,11 @@ class QueryService(application: KoskiApplication) extends Logging {
   def queueStalledFor(duration: Duration): Boolean = queries.queueStalledFor(duration)
-  def cancelAllTasks(reason: String) = queries.setRunningTasksFailed(reason)
+  def systemIsOverloaded: Boolean = {
+    application.replicaDatabase.replayLag.toSeconds > maxAllowedDatabaseReplayLag.toSeconds
+  }
+
+  def cancelAllTasks(reason: String): Boolean = queries.setRunningTasksFailed(reason)
   private def logStart(query: RunningQuery): Unit = {
     logger.info(s"Starting new ${query.name} as user ${query.userOid}")
@@ -113,7 +118,7 @@
     metrics.putQueuedQueryMetric(QueryState.failed)
   }
-  def logCompletedQuery(query: RunningQuery, fileCount: Int): Unit = {
+  private def logCompletedQuery(query: RunningQuery, fileCount: Int): Unit = {
     logger.info(s"${query.name} completed with $fileCount result files")
     metrics.putQueuedQueryMetric(QueryState.complete)
   }
 }
diff --git a/src/main/scala/fi/oph/koski/schedule/Scheduler.scala b/src/main/scala/fi/oph/koski/schedule/Scheduler.scala
index c3115f44fb..324d3637cd 100644
--- a/src/main/scala/fi/oph/koski/schedule/Scheduler.scala
+++ b/src/main/scala/fi/oph/koski/schedule/Scheduler.scala
@@ -128,6 +128,16 @@
   def setContext(db: DB, name: String, context: Option[JValue]): Boolean =
     QueryMethods.runDbSync(db, KoskiTables.Scheduler.filter(_.name === name).map(_.context).update(context)) > 0
+
+  def pauseForDuration(db: DB, name: String, duration: Duration): Boolean = {
+    val currentNextFireTime = QueryMethods.runDbSync(db, KoskiTables.Scheduler.filter(_.name === name).map(_.nextFireTime).result.headOption)
+    val postponedFireTime = Timestamp.valueOf(LocalDateTime.now().plus(duration))
+    if (currentNextFireTime.isEmpty || currentNextFireTime.get.getTime < postponedFireTime.getTime) {
+      QueryMethods.runDbSync(db, KoskiTables.Scheduler.filter(_.name === name).map(_.nextFireTime).update(postponedFireTime)) > 0
+    } else {
+      false
+    }
+  }
 }
 trait Schedule {
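[Editor's note - illustrative addition, not part of the patch series] Patch 2 introduces Scheduler.pauseForDuration, whose key property is that it only ever pushes nextFireTime forward: a short backpressure pause can never shorten a postponement that is already in place. A standalone sketch of that guard with plain timestamps (the postponedFireTime helper and object name are invented for the example; the real method reads and updates the scheduler table):

```scala
import java.sql.Timestamp
import java.time.{Duration, LocalDateTime}

// Sketch of the guard in Scheduler.pauseForDuration: return the new fire time
// only when it is later than the one currently stored, otherwise keep the old one.
object PauseForDurationSketch {
  def postponedFireTime(current: Option[Timestamp], pause: Duration): Option[Timestamp] = {
    val candidate = Timestamp.valueOf(LocalDateTime.now().plus(pause))
    if (current.forall(_.getTime < candidate.getTime)) Some(candidate) else None
  }

  def main(args: Array[String]): Unit = {
    val twoMinutes = Duration.ofMinutes(2)
    println(postponedFireTime(None, twoMinutes))            // always postponed when nothing is stored
    val farFuture = Timestamp.valueOf(LocalDateTime.now().plusHours(1))
    println(postponedFireTime(Some(farFuture), twoMinutes)) // None: the later stored time wins
  }
}
```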
From e60d2f5e79cf1aab8e045b9d4cf4b56006507364 Mon Sep 17 00:00:00 2001
From: Ilkka Hänninen
Date: Fri, 15 Mar 2024 13:48:40 +0200
Subject: [PATCH 3/7] Do not start a new query if the database EBS byte balance is below 50
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
 src/main/resources/reference.conf                  |  4 ++
 .../cloudwatch/CloudWatchMetricsService.scala      | 48 ++++++++++++++++++-
 .../koski/queuedqueries/QueryService.scala         | 42 ++++++++++++++--
 .../oph/koski/queuedqueries/QueryUtils.scala       |  5 +-
 4 files changed, 94 insertions(+), 5 deletions(-)
diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf
index cc22c9c631..cf23cc6cb4 100644
--- a/src/main/resources/reference.conf
+++ b/src/main/resources/reference.conf
@@ -263,6 +263,10 @@ kyselyt = {
   backpressureLimits = {
     duration = "2m"
     maxDatabaseReplayLag = "5m"
+    ebsByteBalance = {
+      stopAt = 50
+      continueAt = 90
+    }
   }
 }
diff --git a/src/main/scala/fi/oph/koski/cloudwatch/CloudWatchMetricsService.scala b/src/main/scala/fi/oph/koski/cloudwatch/CloudWatchMetricsService.scala
index edef173cb6..a399deb88c 100644
--- a/src/main/scala/fi/oph/koski/cloudwatch/CloudWatchMetricsService.scala
+++ b/src/main/scala/fi/oph/koski/cloudwatch/CloudWatchMetricsService.scala
@@ -5,9 +5,12 @@
 import com.typesafe.config.Config
 import fi.oph.koski.config.Environment
 import fi.oph.koski.log.Logging
 import software.amazon.awssdk.services.cloudwatch.CloudWatchClient
-import software.amazon.awssdk.services.cloudwatch.model.{Dimension, MetricDatum, PutMetricDataRequest, StandardUnit}
+import software.amazon.awssdk.services.cloudwatch.model._
 import java.sql.Timestamp
+import java.time.Instant
+import scala.collection.JavaConverters._
+import scala.util.Try
 object CloudWatchMetricsService {
   def apply(config: Config): CloudWatchMetricsService = {
@@ -24,6 +27,8 @@ trait CloudWatchMetricsService {
   def putQueuedQueryMetric(queryState: String): Unit
+  def getEbsByteBalance(databaseId: String): Option[Double]
+
   protected def durationInSeconds(start: Timestamp, end: Timestamp): Double = (end.getTime - start.getTime) / 1000.0
 }
@@ -36,6 +41,11 @@ class MockCloudWatchMetricsService extends CloudWatchMetricsService with Logging
   def putQueuedQueryMetric(queryState: String): Unit = {
     logger.debug(s"Mocking cloudwatch metric: Queries -> State -> ${queryState.capitalize} with value 1.0 sent")
   }
+
+  def getEbsByteBalance(databaseId: String): Option[Double] = {
+    logger.debug("getEbsByteBalance mock")
+    None
+  }
 }
 class AwsCloudWatchMetricsService extends CloudWatchMetricsService with Logging {
@@ -106,4 +116,40 @@ class AwsCloudWatchMetricsService extends CloudWatchMetricsService with Logging
     client.putMetricData(request)
   }
+  def getEbsByteBalance(databaseId: String): Option[Double] = {
+    val endTime = Instant.now()
+    val startTime = endTime.minusSeconds(600)
+
+    val dimension = Dimension.builder()
+      .name("DBInstanceIdentifier")
+      .value(databaseId)
+      .build()
+
+    val metric = Metric.builder()
+      .namespace("AWS/RDS")
+      .metricName("EBSByteBalance%")
+      .dimensions(dimension)
+      .build()
+
+    val metricStat = MetricStat.builder()
+      .metric(metric)
+      .stat("Average")
+      .period(60)
+      .unit(StandardUnit.PERCENT)
+      .build()
+
+    val query = MetricDataQuery.builder()
+      .id("byteBalance")
+      .metricStat(metricStat)
+      .build()
+
+    val request = GetMetricDataRequest.builder()
+      .metricDataQueries(query)
+      .startTime(startTime)
+      .endTime(endTime)
+      .build()
+
+    val result = Try { client.getMetricData(request).metricDataResults().get(0) }
+    result.toOption.flatMap(_.values().asScala.headOption.map(_.doubleValue()))
+  }
 }
diff --git a/src/main/scala/fi/oph/koski/queuedqueries/QueryService.scala b/src/main/scala/fi/oph/koski/queuedqueries/QueryService.scala
index 67b67e87b8..c00d71b0d7 100644
--- a/src/main/scala/fi/oph/koski/queuedqueries/QueryService.scala
+++ b/src/main/scala/fi/oph/koski/queuedqueries/QueryService.scala
@@ -1,5 +1,7 @@
 package fi.oph.koski.queuedqueries
+import fi.oph.koski.cache.GlobalCacheManager._
+import fi.oph.koski.cache.{RefreshingCache, SingleValueCache}
 import fi.oph.koski.cloudwatch.CloudWatchMetricsService
 import fi.oph.koski.config.KoskiApplication
 import fi.oph.koski.http.{HttpStatus, KoskiErrorCategory}
@@ -9,11 +11,14 @@
 import fi.oph.koski.log.Logging
 import java.time.format.DateTimeFormatter
 import java.time.{Duration, LocalDateTime}
 import java.util.UUID
+import scala.concurrent.duration.DurationInt
 class QueryService(application: KoskiApplication) extends Logging {
   val workerId: String = application.ecsMetadata.taskARN.getOrElse("local")
   val metrics: CloudWatchMetricsService = CloudWatchMetricsService(application.config)
   private val maxAllowedDatabaseReplayLag: Duration = application.config.getDuration("kyselyt.backpressureLimits.maxDatabaseReplayLag")
+  private val readDatabaseId = QueryUtils.readDatabaseId(application.config)
+  private val databaseLoadLimiter = new DatabaseLoadLimiter(application, metrics, readDatabaseId)
   private val queries = new QueryRepository(
     db = application.masterDatabase.db,
@@ -101,9 +106,8 @@ class QueryService(application: KoskiApplication) extends Logging {
   def queueStalledFor(duration: Duration): Boolean = queries.queueStalledFor(duration)
-  def systemIsOverloaded: Boolean = {
-    application.replicaDatabase.replayLag.toSeconds > maxAllowedDatabaseReplayLag.toSeconds
-  }
+  def systemIsOverloaded: Boolean =
+    (application.replicaDatabase.replayLag.toSeconds > maxAllowedDatabaseReplayLag.toSeconds) || databaseLoadLimiter.checkOverloading
   def cancelAllTasks(reason: String): Boolean = queries.setRunningTasksFailed(reason)
@@ -123,3 +127,35 @@ class QueryService(application: KoskiApplication) extends Logging {
     metrics.putQueuedQueryMetric(QueryState.complete)
   }
 }
+
+class DatabaseLoadLimiter(
+  application: KoskiApplication,
+  metrics: CloudWatchMetricsService,
+  readDatabaseId: String,
+) {
+  private val stopAt: Double = application.config.getDouble("kyselyt.backpressureLimits.ebsByteBalance.stopAt")
+  private val continueAt: Double = application.config.getDouble("kyselyt.backpressureLimits.ebsByteBalance.continueAt")
+  var limiterActive: Boolean = false
+
+  def checkOverloading: Boolean = {
+    synchronized {
+      ebsByteBalance.apply.foreach { balance =>
+        if (limiterActive) {
+          if (balance >= continueAt) {
+            limiterActive = false
+          }
+        } else {
+          if (balance <= stopAt) {
+            limiterActive = true
+          }
+        }
+      }
+      limiterActive
+    }
+  }
+
+  private val ebsByteBalance = SingleValueCache(
+    RefreshingCache(name = "DatabaseLoadLimiter.ebsByteBalance", duration = 1.minutes, maxSize = 2),
+    () => metrics.getEbsByteBalance(readDatabaseId)
+  )
+}
diff --git a/src/main/scala/fi/oph/koski/queuedqueries/QueryUtils.scala b/src/main/scala/fi/oph/koski/queuedqueries/QueryUtils.scala
index 8a8732c225..445186d49f 100644
--- a/src/main/scala/fi/oph/koski/queuedqueries/QueryUtils.scala
+++ b/src/main/scala/fi/oph/koski/queuedqueries/QueryUtils.scala
@@ -1,5 +1,6 @@
 package fi.oph.koski.queuedqueries
+import com.typesafe.config.Config
 import fi.oph.koski.config.{Environment, KoskiApplication}
 import fi.oph.koski.http.{HttpStatus, KoskiErrorCategory}
 import fi.oph.koski.koskiuser.KoskiSpecificSession
@@ -13,9 +14,11 @@
 import scala.jdk.CollectionConverters._
 import scala.util.Using
 object QueryUtils {
+  def readDatabaseId(config: Config): String = config.getString("kyselyt.readDatabase")
+
   def isQueryWorker(application: KoskiApplication): Boolean = {
     val instanceAz = application.ecsMetadata.availabilityZone
-    val databaseAz = getDatabaseAz(application, application.config.getString("kyselyt.readDatabase"))
+    val databaseAz = getDatabaseAz(application, readDatabaseId(application.config))
     (instanceAz, databaseAz) match {
       case (None, None) => true // Lokaali devausinstanssi
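[Editor's note - illustrative addition, not part of the patch series] Patch 3's DatabaseLoadLimiter applies hysteresis to the EBSByteBalance% metric: query starts are blocked once the balance drops to stopAt (50 in reference.conf) and released only after it has recovered to continueAt (90), so the limiter does not flap around a single threshold. A standalone sketch of that state machine with invented class and object names (the real limiter reads the metric through a refreshing cache and CloudWatch):

```scala
// Sketch of the hysteresis in DatabaseLoadLimiter: once active, the limiter
// stays active until the balance has recovered past the higher threshold.
class ByteBalanceLimiterSketch(stopAt: Double, continueAt: Double) {
  private var limiterActive = false

  def checkOverloading(balance: Option[Double]): Boolean = synchronized {
    balance.foreach { b =>
      if (limiterActive) { if (b >= continueAt) limiterActive = false }
      else if (b <= stopAt) limiterActive = true
    }
    limiterActive // a missing reading keeps the previous state
  }
}

object ByteBalanceLimiterSketchApp {
  def main(args: Array[String]): Unit = {
    val limiter = new ByteBalanceLimiterSketch(stopAt = 50, continueAt = 90)
    val readings = Seq(Some(95.0), Some(48.0), Some(70.0), Some(92.0), None)
    println(readings.map(limiter.checkOverloading)) // List(false, true, true, false, false)
  }
}
```

The printed trace shows the limiter staying active at 70% even though 70 is well above the stop threshold, which is the point of the two-threshold design.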
From 4ce6a9cac5ad968e3ac859f8de2affcaa1ee8c3f Mon Sep 17 00:00:00 2001
From: Ilkka Hänninen
Date: Fri, 15 Mar 2024 13:49:13 +0200
Subject: [PATCH 4/7] Do not check the system overload state if there is no new query in the queue
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
 .../fi/oph/koski/queuedqueries/QueryRepository.scala |  7 +++++++
 .../fi/oph/koski/queuedqueries/QueryScheduler.scala  | 12 +++++++-----
 .../fi/oph/koski/queuedqueries/QueryService.scala    |  2 ++
 3 files changed, 16 insertions(+), 5 deletions(-)
diff --git a/src/main/scala/fi/oph/koski/queuedqueries/QueryRepository.scala b/src/main/scala/fi/oph/koski/queuedqueries/QueryRepository.scala
index 79bba5580c..321ecc678c 100644
--- a/src/main/scala/fi/oph/koski/queuedqueries/QueryRepository.scala
+++ b/src/main/scala/fi/oph/koski/queuedqueries/QueryRepository.scala
@@ -66,6 +66,13 @@ class QueryRepository(
       AND worker = $workerId
     """.as[Int]).head
+  def numberOfPendingQueries: Int =
+    runDbSync(sql"""
+      SELECT count(*)
+      FROM kysely
+      WHERE state = ${QueryState.pending}
+    """.as[Int]).head
+
   def takeNext: Option[RunningQuery] =
     runDbSync(sql"""
       UPDATE kysely
diff --git a/src/main/scala/fi/oph/koski/queuedqueries/QueryScheduler.scala b/src/main/scala/fi/oph/koski/queuedqueries/QueryScheduler.scala
index 6865bb3e9c..579c79085b 100644
--- a/src/main/scala/fi/oph/koski/queuedqueries/QueryScheduler.scala
+++ b/src/main/scala/fi/oph/koski/queuedqueries/QueryScheduler.scala
@@ -57,11 +57,13 @@ class QueryScheduler(application: KoskiApplication) extends Logging {
   private def runNextQuery(context: Option[JValue]): Option[JValue] = {
     if (context.flatMap(parseContext).exists(isQueryWorker)) {
-      if (kyselyt.systemIsOverloaded) {
-        logger.info(s"System is overloaded. Postponing running the next query for $backpressureDuration")
-        Scheduler.pauseForDuration(schedulerDb, schedulerName, backpressureDuration)
-      } else if (kyselyt.numberOfRunningQueries < concurrency) {
-        kyselyt.runNext()
+      if (kyselyt.hasNext) {
+        if (kyselyt.systemIsOverloaded) {
+          logger.info(s"System is overloaded. Postponing running the next query for $backpressureDuration")
+          Scheduler.pauseForDuration(schedulerDb, schedulerName, backpressureDuration)
+        } else if (kyselyt.numberOfRunningQueries < concurrency) {
+          kyselyt.runNext()
+        }
       }
     }
     None // QueryScheduler päivitä kontekstia vain käynnistyessään
diff --git a/src/main/scala/fi/oph/koski/queuedqueries/QueryService.scala b/src/main/scala/fi/oph/koski/queuedqueries/QueryService.scala
index c00d71b0d7..5241b152ec 100644
--- a/src/main/scala/fi/oph/koski/queuedqueries/QueryService.scala
+++ b/src/main/scala/fi/oph/koski/queuedqueries/QueryService.scala
@@ -46,6 +46,8 @@ class QueryService(application: KoskiApplication) extends Logging {
   def numberOfRunningQueries: Int = queries.numberOfRunningQueries
+  def hasNext: Boolean = queries.numberOfPendingQueries > 0
+
   def runNext(): Unit = {
     queries.takeNext.foreach { query =>
       query.getSession(application.käyttöoikeusRepository).fold {
From 75b8142bf56893c49c43f4ee210030d1cefe2f64 Mon Sep 17 00:00:00 2001
From: Ilkka Hänninen
Date: Fri, 15 Mar 2024 13:51:29 +0200
Subject: [PATCH 5/7] Refactor to be cleaner
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
 .../scala/fi/oph/koski/queuedqueries/QueryRepository.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
diff --git a/src/main/scala/fi/oph/koski/queuedqueries/QueryRepository.scala b/src/main/scala/fi/oph/koski/queuedqueries/QueryRepository.scala
index 321ecc678c..6301a761ef 100644
--- a/src/main/scala/fi/oph/koski/queuedqueries/QueryRepository.scala
+++ b/src/main/scala/fi/oph/koski/queuedqueries/QueryRepository.scala
@@ -175,8 +175,7 @@ class QueryRepository(
   def queueStalledFor(duration: Duration): Boolean = {
-    val runningTasks = runDbSync(sql"SELECT COUNT(*) FROM kysely WHERE state = ${QueryState.running}".as[Int]).head
-    if (runningTasks > 0) {
+    if (numberOfRunningQueries > 0) {
       false
     } else {
       val timeLimit = Timestamp.valueOf(LocalDateTime.now().minus(duration))
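[Editor's note - illustrative addition, not part of the patch series] Patch 4 reorders the scheduler's checks so that the overload probe (replica lag plus the CloudWatch byte-balance lookup) is only evaluated when the queue actually has pending work, and patch 5 merely reuses numberOfRunningQueries inside queueStalledFor. A sketch of the resulting decision order, with the overload check passed in lazily as a function; all names here are invented, the real code reads these values from QueryService and the scheduler tables:

```scala
// Sketch of the decision order in QueryScheduler.runNextQuery after patch 4:
// the (comparatively expensive) overload check is not consulted when idle.
object RunNextDecisionSketch {
  sealed trait Decision
  case object Idle extends Decision
  case object Postpone extends Decision
  case object RunNext extends Decision
  case object AtConcurrencyLimit extends Decision

  def decide(hasPending: Boolean, systemIsOverloaded: () => Boolean, runningQueries: Int, concurrency: Int): Decision =
    if (!hasPending) Idle
    else if (systemIsOverloaded()) Postpone
    else if (runningQueries < concurrency) RunNext
    else AtConcurrencyLimit

  def main(args: Array[String]): Unit = {
    // The overload thunk is never evaluated when the queue is empty.
    println(decide(hasPending = false, systemIsOverloaded = () => sys.error("never evaluated"), runningQueries = 0, concurrency = 2)) // Idle
    println(decide(hasPending = true, systemIsOverloaded = () => true, runningQueries = 0, concurrency = 2))                          // Postpone
    println(decide(hasPending = true, systemIsOverloaded = () => false, runningQueries = 1, concurrency = 2))                         // RunNext
  }
}
```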
logger.info(s"Check orphaned using instance list: $instances") + queries .findOrphanedQueries(application.ecsMetadata.currentlyRunningKoskiInstances) .foreach { query => @@ -96,7 +99,7 @@ class QueryService(application: KoskiApplication) extends Logging { logger.warn(s"Orphaned query (${query.name}) detected and cancelled after ${query.restartCount} restarts") } else { if (queries.restart(query, s"Orphaned ${LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)}")) { - logger.warn(s"Orphaned query (${query.name}) detected and it has been set back to pending state") + logger.warn(s"Orphaned query (${query.name}) detected and it has been set back to pending state $query") } } } From 0d065b0c5995456d74bd83016e56f94feee3bb97 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ilkka=20H=C3=A4nninen?= Date: Fri, 15 Mar 2024 15:41:54 +0200 Subject: [PATCH 7/7] Vapauta skeduleri lukosta --- .../queuedqueries/QueryCleanupScheduler.scala | 26 +++++++++++++------ .../koski/queuedqueries/QueryRepository.scala | 12 +-------- .../koski/queuedqueries/QueryScheduler.scala | 2 ++ .../koski/queuedqueries/QueryService.scala | 9 ++----- .../fi/oph/koski/schedule/Scheduler.scala | 3 +++ 5 files changed, 26 insertions(+), 26 deletions(-) diff --git a/src/main/scala/fi/oph/koski/queuedqueries/QueryCleanupScheduler.scala b/src/main/scala/fi/oph/koski/queuedqueries/QueryCleanupScheduler.scala index c630f5183e..38cd700964 100644 --- a/src/main/scala/fi/oph/koski/queuedqueries/QueryCleanupScheduler.scala +++ b/src/main/scala/fi/oph/koski/queuedqueries/QueryCleanupScheduler.scala @@ -22,12 +22,11 @@ class QueryCleanupScheduler(application: KoskiApplication) extends Logging { } private def runNextQuery(_ignore: Option[JValue]): Option[JValue] = { - kyselyt.cleanup() + val instances = application.ecsMetadata.currentlyRunningKoskiInstances - if (kyselyt.queueStalledFor(Duration.ofMinutes(1)) && !queryWorkerIsAlive) { - logger.warn("Query worker is missing. Promoting this instance to process the queue.") - takeover() - } + fixSchedulerLock(instances) + kyselyt.cleanup(instances) + takeoverIfQueryWorkerIsMissing() None } @@ -38,8 +37,19 @@ class QueryCleanupScheduler(application: KoskiApplication) extends Logging { context.map(_.workerId).exists(instances.contains) } - private def takeover(): Unit = { - application.kyselyScheduler.promote(true) - application.scheduledTasks.restartKyselyScheduler() + private def takeoverIfQueryWorkerIsMissing(): Unit = { + if (!queryWorkerIsAlive) { + logger.warn("Query worker is missing. Promoting this instance to process the queue.") + application.kyselyScheduler.promote(true) + application.scheduledTasks.restartKyselyScheduler() + } + } + + private def fixSchedulerLock(instances: Seq[String]): Unit = { + val context = application.kyselyScheduler.getContext + if (!context.exists(ctx => instances.contains(ctx.workerId))) { + logger.info("Query worker in scheduler does not exist. 
From 0d065b0c5995456d74bd83016e56f94feee3bb97 Mon Sep 17 00:00:00 2001
From: Ilkka Hänninen
Date: Fri, 15 Mar 2024 15:41:54 +0200
Subject: [PATCH 7/7] Release the scheduler from its lock
---
 .../queuedqueries/QueryCleanupScheduler.scala | 26 +++++++++++++------
 .../koski/queuedqueries/QueryRepository.scala | 12 +--------
 .../koski/queuedqueries/QueryScheduler.scala  |  2 ++
 .../koski/queuedqueries/QueryService.scala    |  9 ++-----
 .../fi/oph/koski/schedule/Scheduler.scala     |  3 +++
 5 files changed, 26 insertions(+), 26 deletions(-)
diff --git a/src/main/scala/fi/oph/koski/queuedqueries/QueryCleanupScheduler.scala b/src/main/scala/fi/oph/koski/queuedqueries/QueryCleanupScheduler.scala
index c630f5183e..38cd700964 100644
--- a/src/main/scala/fi/oph/koski/queuedqueries/QueryCleanupScheduler.scala
+++ b/src/main/scala/fi/oph/koski/queuedqueries/QueryCleanupScheduler.scala
@@ -22,12 +22,11 @@ class QueryCleanupScheduler(application: KoskiApplication) extends Logging {
   }
   private def runNextQuery(_ignore: Option[JValue]): Option[JValue] = {
-    kyselyt.cleanup()
+    val instances = application.ecsMetadata.currentlyRunningKoskiInstances
-    if (kyselyt.queueStalledFor(Duration.ofMinutes(1)) && !queryWorkerIsAlive) {
-      logger.warn("Query worker is missing. Promoting this instance to process the queue.")
-      takeover()
-    }
+    fixSchedulerLock(instances)
+    kyselyt.cleanup(instances)
+    takeoverIfQueryWorkerIsMissing()
     None
   }
@@ -38,8 +37,19 @@
     context.map(_.workerId).exists(instances.contains)
   }
-  private def takeover(): Unit = {
-    application.kyselyScheduler.promote(true)
-    application.scheduledTasks.restartKyselyScheduler()
+  private def takeoverIfQueryWorkerIsMissing(): Unit = {
+    if (!queryWorkerIsAlive) {
+      logger.warn("Query worker is missing. Promoting this instance to process the queue.")
+      application.kyselyScheduler.promote(true)
+      application.scheduledTasks.restartKyselyScheduler()
+    }
+  }
+
+  private def fixSchedulerLock(instances: Seq[String]): Unit = {
+    val context = application.kyselyScheduler.getContext
+    if (!context.exists(ctx => instances.contains(ctx.workerId))) {
+      logger.info("Query worker in scheduler does not exist. Releasing the scheduler lock.")
+      application.kyselyScheduler.resolveLock()
+    }
   }
 }
diff --git a/src/main/scala/fi/oph/koski/queuedqueries/QueryRepository.scala b/src/main/scala/fi/oph/koski/queuedqueries/QueryRepository.scala
index c3e767b650..122cceb8ac 100644
--- a/src/main/scala/fi/oph/koski/queuedqueries/QueryRepository.scala
+++ b/src/main/scala/fi/oph/koski/queuedqueries/QueryRepository.scala
@@ -173,17 +173,7 @@ class QueryRepository(
       RETURNING meta
     """.as[QueryMeta]).head
   }
-
-  def queueStalledFor(duration: Duration): Boolean = {
-    if (numberOfRunningQueries > 0) {
-      false
-    } else {
-      val timeLimit = Timestamp.valueOf(LocalDateTime.now().minus(duration))
-      val pendingTasks = runDbSync(sql"SELECT COUNT(*) FROM kysely WHERE state = ${QueryState.pending} AND created_at < $timeLimit".as[Int]).head
-      pendingTasks > 0
-    }
-  }
-
+
   implicit private val getQueryResult: GetResult[Query] = GetResult[Query] { r =>
     val id = r.rs.getString("id")
     val userOid = r.rs.getString("user_oid")
diff --git a/src/main/scala/fi/oph/koski/queuedqueries/QueryScheduler.scala b/src/main/scala/fi/oph/koski/queuedqueries/QueryScheduler.scala
index 579c79085b..f71d9b2d3c 100644
--- a/src/main/scala/fi/oph/koski/queuedqueries/QueryScheduler.scala
+++ b/src/main/scala/fi/oph/koski/queuedqueries/QueryScheduler.scala
@@ -51,6 +51,8 @@ class QueryScheduler(application: KoskiApplication) extends Logging {
     overrideContext()
   }
+  def resolveLock(): Boolean = Scheduler.resolveLock(schedulerDb, schedulerName)
+
   private def overrideContext(): Unit = {
     Scheduler.setContext(schedulerDb, schedulerName, Some(context.asJson))
   }
diff --git a/src/main/scala/fi/oph/koski/queuedqueries/QueryService.scala b/src/main/scala/fi/oph/koski/queuedqueries/QueryService.scala
index db69089199..2f57358d01 100644
--- a/src/main/scala/fi/oph/koski/queuedqueries/QueryService.scala
+++ b/src/main/scala/fi/oph/koski/queuedqueries/QueryService.scala
@@ -85,14 +85,11 @@ class QueryService(application: KoskiApplication) extends Logging {
     }
   }
-  def cleanup(): Unit = {
+  def cleanup(koskiInstances: Seq[String]): Unit = {
     val timeout = application.config.getDuration("kyselyt.timeout")
-    val instances = application.ecsMetadata.currentlyRunningKoskiInstances
-    logger.info(s"Check orphaned using instance list: $instances")
-
     queries
-      .findOrphanedQueries(application.ecsMetadata.currentlyRunningKoskiInstances)
+      .findOrphanedQueries(koskiInstances)
       .foreach { query =>
         if (query.restartCount >= 3) {
           queries.setFailed(query.queryId, "Orphaned")
@@ -109,8 +106,6 @@ class QueryService(application: KoskiApplication) extends Logging {
       .foreach(query => logger.error(s"${query.name} timeouted after $timeout"))
   }
-  def queueStalledFor(duration: Duration): Boolean = queries.queueStalledFor(duration)
-
   def systemIsOverloaded: Boolean =
     (application.replicaDatabase.replayLag.toSeconds > maxAllowedDatabaseReplayLag.toSeconds) || databaseLoadLimiter.checkOverloading
diff --git a/src/main/scala/fi/oph/koski/schedule/Scheduler.scala b/src/main/scala/fi/oph/koski/schedule/Scheduler.scala
index 324d3637cd..7f28efc728 100644
--- a/src/main/scala/fi/oph/koski/schedule/Scheduler.scala
+++ b/src/main/scala/fi/oph/koski/schedule/Scheduler.scala
@@ -138,6 +138,9 @@
       false
     }
   }
+
+  def resolveLock(db: DB, name: String): Boolean =
+    QueryMethods.runDbSync(db, KoskiTables.Scheduler.filter(_.name === name).map(_.status).update(ScheduledTaskStatus.scheduled)) > 0
 }
 trait Schedule {
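[Editor's note - illustrative addition, not part of the patch series] Patch 7 lets QueryCleanupScheduler release the kysely scheduler's lock when the worker recorded in the scheduler context is no longer among the currently running Koski instances; Scheduler.resolveLock then simply flips the scheduler row's status back to scheduled so another instance can take over. A standalone sketch of the release decision itself (SchedulerContext and the helper name are invented; the real code reads the context from the scheduler table and the instance list from ECS metadata):

```scala
// Sketch of the check added in QueryCleanupScheduler.fixSchedulerLock:
// the lock should be released when no running instance matches the recorded worker.
object SchedulerLockCleanupSketch {
  final case class SchedulerContext(workerId: String)

  def shouldReleaseLock(context: Option[SchedulerContext], instances: Seq[String]): Boolean =
    !context.exists(ctx => instances.contains(ctx.workerId))

  def main(args: Array[String]): Unit = {
    val instances = Seq("task-arn-1", "task-arn-2")
    println(shouldReleaseLock(Some(SchedulerContext("task-arn-1")), instances)) // false: worker is alive
    println(shouldReleaseLock(Some(SchedulerContext("task-arn-9")), instances)) // true: stale lock, release it
    println(shouldReleaseLock(None, instances))                                 // true: no recorded worker at all
  }
}
```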