Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix and implement tests for used to named unique now distincByDocId function #309

Merged
merged 4 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ lazy val ingestor = (project in file("modules/ingestor"))
log4Cats,
logback,
weaver,
weaverScalaCheck,
testContainers
),
Compile / run / fork := true
Expand Down
18 changes: 18 additions & 0 deletions modules/ingestor/src/main/scala/HasDocId.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package lila.search
package ingestor

trait HasDocId[A]:
extension (a: A) def docId: Option[String]
extension (xs: List[A])
/**
* Returns a list of distinct changes by their document id in the reverse order they appear in the input
* list. If a change has no document id, We ignore it.
*/
def distincByDocId: List[A] =
xs
.foldRight(List.empty[A] -> Set.empty[String]) { case (change, p @ (acc, ids)) =>
change.docId.fold(p) { id =>
ids.contains(id).fold(p, (change :: acc) -> (ids + id))
}
}
._1
2 changes: 1 addition & 1 deletion modules/ingestor/src/main/scala/ingestor.forum.scala
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ object ForumIngestor:
.drop(skip)
.groupWithin(config.batchSize, config.timeWindows.second)
.evalTap(_.traverse_(x => debug"received $x"))
.map(_.toList.unique)
.map(_.toList.distincByDocId)

private type SourceWithId = (String, ForumSource)

Expand Down
2 changes: 1 addition & 1 deletion modules/ingestor/src/main/scala/ingestor.game.scala
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ object GameIngestor:
info"Received $x without p0 or p1 fields".whenA(x.fullDocument.exists(_.shouldDebug))
)
)
.map(_.toList.unique)
.map(_.toList.distincByDocId)
.evalTap(_.traverse_(x => x.fullDocument.traverse_(x => debug"${x.debug}")))

private def saveLastIndexedTimestamp(time: Instant): IO[Unit] =
Expand Down
2 changes: 1 addition & 1 deletion modules/ingestor/src/main/scala/ingestor.team.scala
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ object TeamIngestor:
.drop(skip)
.evalTap(x => debug"Team change stream event: $x")
.groupWithin(config.batchSize, config.timeWindows.second)
.map(_.toList.unique)
.map(_.toList.distincByDocId)

extension (docs: List[Document])
private def toSources: List[(String, TeamSource)] =
Expand Down
19 changes: 4 additions & 15 deletions modules/ingestor/src/main/scala/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,10 @@ val _id = "_id"

type MongoCollection = GenericMongoCollection[IO, Document, [A] =>> fs2.Stream[IO, A]]

extension [A](change: ChangeStreamDocument[A]) def docId: Option[String] = change.documentKey.flatMap(_.id)

extension [A](changes: List[ChangeStreamDocument[A]])
/**
* Returns a list of distinct changes by their document id in the reverse order they appear in the input
* list. If a change has no document id, We ignore it.
*/
def unique: List[ChangeStreamDocument[A]] =
changes
.foldRight(List.empty[ChangeStreamDocument[A]] -> Set.empty) { case (change, p @ (acc, ids)) =>
if change.docId.exists(!ids.contains(_))
then (change :: acc) -> (ids + id)
else p
}
._1
given [A]: HasDocId[ChangeStreamDocument[A]] with
extension (change: ChangeStreamDocument[A])
def docId: Option[String] =
change.documentKey.flatMap(_.id)

extension (doc: Document)
private def id: Option[String] =
Expand Down
46 changes: 46 additions & 0 deletions modules/ingestor/src/test/scala/HasDocIdTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package lila.search
package ingestor

import cats.Show
import cats.effect.IO
import cats.syntax.all.*
import org.scalacheck.{ Arbitrary, Gen }
import weaver.*
import weaver.scalacheck.*

object HasDocIdTest extends SimpleIOSuite with Checkers:

case class Change(value: Int, docId: Option[String])
given HasDocId[Change] with
extension (a: Change) def docId: Option[String] = a.docId

given Show[Change] = Show.fromToString
given Arbitrary[Change] = Arbitrary:
for
value <- Gen.posNum[Int]
docId <- Gen.option(Gen.alphaNumStr)
yield Change(value, docId)

test("distincByDocId is empty when input is empty"):
val changes = List.empty[Change]
val result = changes.distincByDocId
IO(expect(List.empty[Option[String]] == result))

test("distincByDocId is empty when all docIds are none"):
forall: (changes: List[Change]) =>
val xs = changes.map(_.copy(docId = none))
expect(xs.distincByDocId.isEmpty)

test("distincByDocId contains only item with defined docId"):
forall: (changes: List[Change]) =>
expect(changes.distincByDocId.forall(_.docId.isDefined))

test("distincByDocId is idempotent"):
forall: (changes: List[Change]) =>
expect(changes.distincByDocId == changes.distincByDocId.distincByDocId)

test("distincByDocId == reverse.distincBy.reverse"):
forall: (changes: List[Change]) =>
val result = changes.distincByDocId
val doubleReversed = changes.reverse.filter(_.docId.isDefined).distinctBy(_.docId).reverse
expect(result == doubleReversed)
Loading