Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize Chunk.traverse take 2 #1957

Merged
merged 1 commit into from
Jul 14, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 76 additions & 64 deletions core/shared/src/main/scala/fs2/Chunk.scala
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,17 @@ object Chunk extends CollectorK[Chunk] with ChunkCompanionPlatform {

/** Creates a chunk backed by a `Chain`. */
def chain[O](c: Chain[O]): Chunk[O] =
seq(c.toList)
if (c.isEmpty) empty
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the generic Iterable implementation, and it seems strictly better than converting to a List first, which is what we were doing before.

else {
val itr = c.iterator
val head = itr.next
if (itr.hasNext) {
val bldr = collection.mutable.Buffer.newBuilder[O]
bldr += head
bldr ++= itr
buffer(bldr.result)
} else singleton(head)
}

/**
* Creates a chunk backed by a mutable buffer. The underlying buffer must not be modified after
Expand Down Expand Up @@ -1669,87 +1679,89 @@ object Chunk extends CollectorK[Chunk] with ChunkCompanionPlatform {
)(f: A => F[B])(implicit F: Applicative[F]): F[Chunk[B]] =
if (fa.isEmpty) F.pure(Chunk.empty[B])
else {
val applied: collection.mutable.Buffer[F[B]] = {
val size = fa.size
val b = collection.mutable.Buffer.newBuilder[F[B]]
b.sizeHint(size)
var idx = 0
while (idx < size) {
b += f(fa(idx))
idx = idx + 1
}
b.result()
}

// we branch out by this factor
val width = 128
// By making a tree here we don't blow the stack
// even if the Chunk is very long
// by construction, this is never called with start == end
def loop(start: Int, end: Int): F[Chain[B]] =
if (start >= (end - 2))
// Here we are at the leaves of the tree: either a single element or a pair
if (start == (end - 2))
F.map2(applied(start), applied(start + 1)) { (a, b) =>
Chain.one(a).concat(Chain.one(b))
}
else
F.map(applied(start))(Chain.one)
else {
// we have 3 or more nodes left
val mid = start + ((end - start) / 2)
val left = loop(start, mid)
val right = loop(mid, end)
F.map2(left, right)(_.concat(_))
def loop(start: Int, end: Int): Eval[F[Chain[B]]] =
if (end - start <= width) {
// Here we are at the leaves of the tree.
// We don't use map2Eval here because a leaf always
// spans at most `width` elements.
var flist = f(fa(end - 1)).map(_ :: Nil)
var idx = end - 2
while (start <= idx) {
flist = F.map2(f(fa(idx)), flist)(_ :: _)
idx = idx - 1
}
Eval.now(flist.map(Chain.fromSeq(_)))
} else {
// we have width + 1 or more nodes left
val step = (end - start) / width

var fchain = Eval.defer(loop(start, start + step))
var start0 = start + step
var end0 = start0 + step

while (start0 < end) {
val end1 = math.min(end, end0)
fchain = fchain.flatMap(F.map2Eval(_, Eval.defer(loop(start0, end1)))(_.concat(_)))
start0 = start0 + step
end0 = end0 + step
}
fchain
}

F.map(loop(0, fa.size))(Chunk.chain)
F.map(loop(0, fa.size).value)(Chunk.chain)
}

/**
 * Applies `f` to the elements of `fa` inside `F` and keeps only the values for which
 * `f` yields `Some`, producing the result as a `Chunk[B]`. An empty chunk
 * short-circuits to `F.pure(Chunk.empty[B])`.
 *
 * NOTE(review): the text of this definition appears to be a rendered diff. Lines from
 * the removed implementation (eager `applied` buffer, binary-split `F[Chain[B]]` loop)
 * and from the added one (`Eval`-returning loop with leaves of at most `width`
 * elements) are interleaved without +/- markers. Confirm against the actual file
 * before relying on the exact statement order shown here.
 */
override def traverseFilter[F[_], A, B](
fa: Chunk[A]
)(f: A => F[Option[B]])(implicit F: Applicative[F]): F[Chunk[B]] =
if (fa.isEmpty) F.pure(Chunk.empty[B])
else {
// Applies `f` to every element up front and stores the resulting effects in a buffer.
val applied: collection.mutable.Buffer[F[Option[B]]] = {
val size = fa.size
val b = collection.mutable.Buffer.newBuilder[F[Option[B]]]
b.sizeHint(size)
var idx = 0
while (idx < size) {
b += f(fa(idx))
idx = idx + 1
}
b.result()
}

// Shared empty chain, returned wherever an element maps to None.
val empty = Chain.empty[B]
// We branch out by this factor.
val width = 128
// By making a tree here we don't blow the stack
// even if the Chunk is very long.
// By construction, this is never called with start == end.
def loop(start: Int, end: Int): F[Chain[B]] =
if (start >= (end - 2))
// Here we are at the leaves of the tree: either a single element or a pair.
if (start == (end - 2))
F.map2(applied(start), applied(start + 1)) { (a, b) =>
if (a.nonEmpty)
if (b.nonEmpty) Chain.one(a.get).concat(Chain.one(b.get))
else Chain.one(a.get)
else if (b.nonEmpty) Chain.one(b.get)
else empty
}
else
F.map(applied(start)) { opt =>
if (opt.isEmpty) empty
else Chain.one(opt.get)
// Eval-returning variant of the loop; its result is forced with `.value` in the final line.
def loop(start: Int, end: Int): Eval[F[Chain[B]]] =
if (end - start <= width) {
// Here we are at the leaves of the tree.
// We don't use map2Eval here because a leaf always
// spans at most `width` elements.
var flist = f(fa(end - 1)).map {
case Some(a) => a :: Nil
case None => Nil
}
// Walk backwards from end - 2 down to start, prepending each defined value.
var idx = end - 2
while (start <= idx) {
flist = F.map2(f(fa(idx)), flist) { (optB, list) =>
if (optB.isDefined) optB.get :: list
else list
}
else {
// We have 3 or more nodes left.
val mid = start + ((end - start) / 2)
val left = loop(start, mid)
val right = loop(mid, end)
F.map2(left, right)(_.concat(_))
idx = idx - 1
}
Eval.now(flist.map(Chain.fromSeq(_)))
} else {
// We have width + 1 or more nodes left.
// `step` is the size of each sub-range handed to a recursive call.
val step = (end - start) / width

var fchain = Eval.defer(loop(start, start + step))
var start0 = start + step
var end0 = start0 + step

// Combine the remaining sub-ranges from left to right; the upper bound of each
// sub-range is clamped to `end`.
while (start0 < end) {
val end1 = math.min(end, end0)
fchain = fchain.flatMap(F.map2Eval(_, Eval.defer(loop(start0, end1)))(_.concat(_)))
start0 = start0 + step
end0 = end0 + step
}
fchain
}

F.map(loop(0, fa.size))(Chunk.chain)
F.map(loop(0, fa.size).value)(Chunk.chain)
}

override def mapFilter[A, B](fa: Chunk[A])(f: A => Option[B]): Chunk[B] = {
Expand Down