From 7c0862cd3ece1520b43f316be65a3443e92abaaf Mon Sep 17 00:00:00 2001 From: Thanh Le Date: Mon, 9 Sep 2024 12:26:20 +0200 Subject: [PATCH 1/4] Implement HasDocId type class Seperate functionality of unique function from ChangeStreamDocument --- .../ingestor/src/main/scala/HasDocId.scala | 18 ++++++++++++++++++ modules/ingestor/src/main/scala/package.scala | 19 ++++--------------- 2 files changed, 22 insertions(+), 15 deletions(-) create mode 100644 modules/ingestor/src/main/scala/HasDocId.scala diff --git a/modules/ingestor/src/main/scala/HasDocId.scala b/modules/ingestor/src/main/scala/HasDocId.scala new file mode 100644 index 00000000..52d9b2f3 --- /dev/null +++ b/modules/ingestor/src/main/scala/HasDocId.scala @@ -0,0 +1,18 @@ +package lila.search +package ingestor + +trait HasDocId[A]: + extension (a: A) def docId: Option[String] + extension (xs: List[A]) + /** + * Returns a list of distinct changes by their document id in the reverse order they appear in the input + * list. If a change has no document id, We ignore it. + */ + def unique: List[A] = + xs + .foldRight(List.empty[A] -> Set.empty) { case (change, p @ (acc, ids)) => + if change.docId.exists(!ids.contains(_)) + then (change :: acc) -> (ids + id) + else p + } + ._1 diff --git a/modules/ingestor/src/main/scala/package.scala b/modules/ingestor/src/main/scala/package.scala index 52418ded..44a86afb 100644 --- a/modules/ingestor/src/main/scala/package.scala +++ b/modules/ingestor/src/main/scala/package.scala @@ -22,21 +22,10 @@ val _id = "_id" type MongoCollection = GenericMongoCollection[IO, Document, [A] =>> fs2.Stream[IO, A]] -extension [A](change: ChangeStreamDocument[A]) def docId: Option[String] = change.documentKey.flatMap(_.id) - -extension [A](changes: List[ChangeStreamDocument[A]]) - /** - * Returns a list of distinct changes by their document id in the reverse order they appear in the input - * list. If a change has no document id, We ignore it. - */ - def unique: List[ChangeStreamDocument[A]] = - changes - .foldRight(List.empty[ChangeStreamDocument[A]] -> Set.empty) { case (change, p @ (acc, ids)) => - if change.docId.exists(!ids.contains(_)) - then (change :: acc) -> (ids + id) - else p - } - ._1 +given [A]: HasDocId[ChangeStreamDocument[A]] with + extension (change: ChangeStreamDocument[A]) + def docId: Option[String] = + change.documentKey.flatMap(_.id) extension (doc: Document) private def id: Option[String] = From 7df0cff6b0f6a1f8dadd99c6d2bbd72559f5616a Mon Sep 17 00:00:00 2001 From: Thanh Le Date: Mon, 9 Sep 2024 19:58:18 +0200 Subject: [PATCH 2/4] Fix and rename unique to distinctByDocId --- modules/ingestor/src/main/scala/HasDocId.scala | 10 +++++----- modules/ingestor/src/main/scala/ingestor.forum.scala | 2 +- modules/ingestor/src/main/scala/ingestor.game.scala | 2 +- modules/ingestor/src/main/scala/ingestor.team.scala | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/modules/ingestor/src/main/scala/HasDocId.scala b/modules/ingestor/src/main/scala/HasDocId.scala index 52d9b2f3..85ce140b 100644 --- a/modules/ingestor/src/main/scala/HasDocId.scala +++ b/modules/ingestor/src/main/scala/HasDocId.scala @@ -8,11 +8,11 @@ trait HasDocId[A]: * Returns a list of distinct changes by their document id in the reverse order they appear in the input * list. If a change has no document id, We ignore it. */ - def unique: List[A] = + def distincByDocId: List[A] = xs - .foldRight(List.empty[A] -> Set.empty) { case (change, p @ (acc, ids)) => - if change.docId.exists(!ids.contains(_)) - then (change :: acc) -> (ids + id) - else p + .foldRight(List.empty[A] -> Set.empty[String]) { case (change, p @ (acc, ids)) => + change.docId.fold(p) { id => + ids.contains(id).fold(p, (change :: acc) -> (ids + id)) + } } ._1 diff --git a/modules/ingestor/src/main/scala/ingestor.forum.scala b/modules/ingestor/src/main/scala/ingestor.forum.scala index 7cf2a176..d94092a6 100644 --- a/modules/ingestor/src/main/scala/ingestor.forum.scala +++ b/modules/ingestor/src/main/scala/ingestor.forum.scala @@ -124,7 +124,7 @@ object ForumIngestor: .drop(skip) .groupWithin(config.batchSize, config.timeWindows.second) .evalTap(_.traverse_(x => debug"received $x")) - .map(_.toList.unique) + .map(_.toList.distincByDocId) private type SourceWithId = (String, ForumSource) diff --git a/modules/ingestor/src/main/scala/ingestor.game.scala b/modules/ingestor/src/main/scala/ingestor.game.scala index 76afe8b8..f523ffda 100644 --- a/modules/ingestor/src/main/scala/ingestor.game.scala +++ b/modules/ingestor/src/main/scala/ingestor.game.scala @@ -131,7 +131,7 @@ object GameIngestor: info"Received $x without p0 or p1 fields".whenA(x.fullDocument.exists(_.shouldDebug)) ) ) - .map(_.toList.unique) + .map(_.toList.distincByDocId) .evalTap(_.traverse_(x => x.fullDocument.traverse_(x => debug"${x.debug}"))) private def saveLastIndexedTimestamp(time: Instant): IO[Unit] = diff --git a/modules/ingestor/src/main/scala/ingestor.team.scala b/modules/ingestor/src/main/scala/ingestor.team.scala index 2cb49f95..ce6a534c 100644 --- a/modules/ingestor/src/main/scala/ingestor.team.scala +++ b/modules/ingestor/src/main/scala/ingestor.team.scala @@ -87,7 +87,7 @@ object TeamIngestor: .drop(skip) .evalTap(x => debug"Team change stream event: $x") .groupWithin(config.batchSize, config.timeWindows.second) - .map(_.toList.unique) + .map(_.toList.distincByDocId) extension (docs: List[Document]) private def toSources: List[(String, TeamSource)] = From 38bdbba49cccbfe021cf230029ad731dca540d8d Mon Sep 17 00:00:00 2001 From: Thanh Le Date: Mon, 9 Sep 2024 19:58:40 +0200 Subject: [PATCH 3/4] Add tests for distinctByDocId --- build.sbt | 1 + .../src/test/scala/HasDocIdTest.scala | 51 +++++++++++++++++++ 2 files changed, 52 insertions(+) create mode 100644 modules/ingestor/src/test/scala/HasDocIdTest.scala diff --git a/build.sbt b/build.sbt index f8a5d853..e4b108f8 100644 --- a/build.sbt +++ b/build.sbt @@ -74,6 +74,7 @@ lazy val ingestor = (project in file("modules/ingestor")) log4Cats, logback, weaver, + weaverScalaCheck, testContainers ), Compile / run / fork := true diff --git a/modules/ingestor/src/test/scala/HasDocIdTest.scala b/modules/ingestor/src/test/scala/HasDocIdTest.scala new file mode 100644 index 00000000..4b100c8e --- /dev/null +++ b/modules/ingestor/src/test/scala/HasDocIdTest.scala @@ -0,0 +1,51 @@ +package lila.search +package ingestor + +import cats.Show +import cats.effect.IO +import cats.syntax.all.* +import org.scalacheck.{ Arbitrary, Gen } +import weaver.* +import weaver.scalacheck.* + +object HasDocIdTest extends SimpleIOSuite with Checkers: + + case class Change(value: Int, docId: Option[String]) + given HasDocId[Change] with + extension (a: Change) def docId: Option[String] = a.docId + + given Show[Change] = Show.fromToString + given Arbitrary[Change] = Arbitrary { + for + value <- Gen.posNum[Int] + docId <- Gen.option(Gen.alphaNumStr) + yield Change(value, docId) + } + + test("distincByDocId is empty when input is empty"): + val changes = List.empty[Change] + val result = changes.distincByDocId + IO(expect(List.empty[Option[String]] == result)) + + test("distincByDocId is empty when all docIds are none"): + forall { (changes: List[Change]) => + val xs = changes.map(_.copy(docId = none)) + IO(expect(xs.distincByDocId.isEmpty)) + } + + test("distincByDocId contains only item with defined docId"): + forall { (changes: List[Change]) => + IO(expect(changes.distincByDocId.forall(_.docId.isDefined))) + } + + test("distincByDocId is idempotent"): + forall { (changes: List[Change]) => + IO(expect(changes.distincByDocId == changes.distincByDocId.distincByDocId)) + } + + test("distincByDocId == reverse.distincBy.reverse"): + forall { (changes: List[Change]) => + val result = changes.distincByDocId + val doubleReversed = changes.reverse.filter(_.docId.isDefined).distinctBy(_.docId).reverse + IO(expect(result == doubleReversed)) + } From 14d90546340e0ed0fd7ffc67ec8df283f25df5de Mon Sep 17 00:00:00 2001 From: Thanh Le Date: Mon, 9 Sep 2024 23:25:44 +0200 Subject: [PATCH 4/4] fewer braces --- .../src/test/scala/HasDocIdTest.scala | 23 ++++++++----------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/modules/ingestor/src/test/scala/HasDocIdTest.scala b/modules/ingestor/src/test/scala/HasDocIdTest.scala index 4b100c8e..016dd6e8 100644 --- a/modules/ingestor/src/test/scala/HasDocIdTest.scala +++ b/modules/ingestor/src/test/scala/HasDocIdTest.scala @@ -15,12 +15,11 @@ object HasDocIdTest extends SimpleIOSuite with Checkers: extension (a: Change) def docId: Option[String] = a.docId given Show[Change] = Show.fromToString - given Arbitrary[Change] = Arbitrary { + given Arbitrary[Change] = Arbitrary: for value <- Gen.posNum[Int] docId <- Gen.option(Gen.alphaNumStr) yield Change(value, docId) - } test("distincByDocId is empty when input is empty"): val changes = List.empty[Change] @@ -28,24 +27,20 @@ object HasDocIdTest extends SimpleIOSuite with Checkers: IO(expect(List.empty[Option[String]] == result)) test("distincByDocId is empty when all docIds are none"): - forall { (changes: List[Change]) => + forall: (changes: List[Change]) => val xs = changes.map(_.copy(docId = none)) - IO(expect(xs.distincByDocId.isEmpty)) - } + expect(xs.distincByDocId.isEmpty) test("distincByDocId contains only item with defined docId"): - forall { (changes: List[Change]) => - IO(expect(changes.distincByDocId.forall(_.docId.isDefined))) - } + forall: (changes: List[Change]) => + expect(changes.distincByDocId.forall(_.docId.isDefined)) test("distincByDocId is idempotent"): - forall { (changes: List[Change]) => - IO(expect(changes.distincByDocId == changes.distincByDocId.distincByDocId)) - } + forall: (changes: List[Change]) => + expect(changes.distincByDocId == changes.distincByDocId.distincByDocId) test("distincByDocId == reverse.distincBy.reverse"): - forall { (changes: List[Change]) => + forall: (changes: List[Change]) => val result = changes.distincByDocId val doubleReversed = changes.reverse.filter(_.docId.isDefined).distinctBy(_.docId).reverse - IO(expect(result == doubleReversed)) - } + expect(result == doubleReversed)