Skip to content

Commit

Permalink
Add support for JSON streams to Kotlin Serialization
Browse files Browse the repository at this point in the history
Closes gh-32074
  • Loading branch information
rotilho authored and sdeleuze committed Jan 24, 2024
1 parent 4d7da00 commit 11898da
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@

package org.springframework.http.codec;

import java.util.List;
import java.util.Map;
import java.util.*;

import kotlinx.serialization.KSerializer;
import kotlinx.serialization.StringFormat;
Expand Down Expand Up @@ -95,13 +94,20 @@ public List<MimeType> getDecodableMimeTypes(ResolvableType targetType) {
public Flux<Object> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
@Nullable MimeType mimeType,
@Nullable Map<String, Object> hints) {
return Flux.error(new UnsupportedOperationException());
return Flux.defer(() -> {
KSerializer<Object> serializer = serializer(elementType);
if (serializer == null) {
return Mono.error(new DecodingException("Could not find KSerializer for " + elementType));
}
return this.stringDecoder
.decode(inputStream, elementType, mimeType, hints)
.map(string -> format().decodeFromString(serializer, string));
});
}

@Override
public Mono<Object> decodeToMono(Publisher<DataBuffer> inputStream, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {

return Mono.defer(() -> {
KSerializer<Object> serializer = serializer(elementType);
if (serializer == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@

package org.springframework.http.codec;

import java.util.List;
import java.util.Map;
import java.util.*;

import kotlin.text.Charsets;
import kotlinx.serialization.KSerializer;
import kotlinx.serialization.StringFormat;
import org.reactivestreams.Publisher;
import org.springframework.http.MediaType;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -49,11 +50,17 @@ public abstract class KotlinSerializationStringEncoder<T extends StringFormat> e

// CharSequence encoding needed for now, see /~https://github.com/Kotlin/kotlinx.serialization/issues/204 for more details
private final CharSequenceEncoder charSequenceEncoder = CharSequenceEncoder.allMimeTypes();
private final Set<MimeType> streamingMediaTypes = new HashSet<>();

protected KotlinSerializationStringEncoder(T format, MimeType... supportedMimeTypes) {
super(format, supportedMimeTypes);
}

public void setStreamingMediaTypes(Collection<MediaType> streamingMediaTypes) {
this.streamingMediaTypes.clear();
this.streamingMediaTypes.addAll(streamingMediaTypes);
}

@Override
public boolean canEncode(ResolvableType elementType, @Nullable MimeType mimeType) {
return canSerialize(elementType, mimeType);
Expand All @@ -79,13 +86,17 @@ public Flux<DataBuffer> encode(Publisher<?> inputStream, DataBufferFactory buffe
.map(value -> encodeValue(value, bufferFactory, elementType, mimeType, hints))
.flux();
}
else {
ResolvableType listType = ResolvableType.forClassWithGenerics(List.class, elementType);

if (mimeType != null && streamingMediaTypes.contains(mimeType)) {
return Flux.from(inputStream)
.collectList()
.map(list -> encodeValue(list, bufferFactory, listType, mimeType, hints))
.flux();
.map(list -> encodeValue(list, bufferFactory, elementType, mimeType, hints).write("\n", Charsets.UTF_8));
}

ResolvableType listType = ResolvableType.forClassWithGenerics(List.class, elementType);
return Flux.from(inputStream)
.collectList()
.map(list -> encodeValue(list, bufferFactory, listType, mimeType, hints))
.flux();
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public KotlinSerializationJsonDecoder() {
}

public KotlinSerializationJsonDecoder(Json json) {
super(json, MediaType.APPLICATION_JSON, new MediaType("application", "*+json"));
super(json, MediaType.APPLICATION_JSON, new MediaType("application", "*+json"),
MediaType.APPLICATION_NDJSON);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.springframework.http.MediaType;
import org.springframework.http.codec.KotlinSerializationStringEncoder;

import java.util.List;

/**
* Encode from an {@code Object} stream to a byte stream of JSON objects using
* <a href="/~https://github.com/Kotlin/kotlinx.serialization">kotlinx.serialization</a>.
Expand All @@ -42,7 +44,9 @@ public KotlinSerializationJsonEncoder() {
}

public KotlinSerializationJsonEncoder(Json json) {
super(json, MediaType.APPLICATION_JSON, new MediaType("application", "*+json"));
super(json, MediaType.APPLICATION_JSON, new MediaType("application", "*+json"),
MediaType.APPLICATION_NDJSON);
setStreamingMediaTypes(List.of(MediaType.APPLICATION_NDJSON));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.springframework.core.io.buffer.DataBuffer
import org.springframework.core.testfixture.codec.AbstractDecoderTests
import org.springframework.http.MediaType
import org.springframework.http.customJson
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.test.StepVerifier
import java.math.BigDecimal
Expand All @@ -45,12 +46,16 @@ class CustomKotlinSerializationJsonDecoderTests :

@Test
override fun decode() {
val output = decoder.decode(Mono.empty(),
ResolvableType.forClass(KotlinSerializationJsonDecoderTests.Pojo::class.java), null, emptyMap())
val input = Flux.concat(
stringBuffer("1.0\n"),
stringBuffer("2.0\n")
)
val output = decoder.decode(input, ResolvableType.forClass(BigDecimal::class.java), null, emptyMap())
StepVerifier
.create(output)
.expectError(UnsupportedOperationException::class.java)
.verify()
.expectNext(BigDecimal.valueOf(1.0))
.expectNext(BigDecimal.valueOf(2.0))
.verifyComplete()
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ class KotlinSerializationJsonDecoderTests : AbstractDecoderTests<KotlinSerializa
assertThat(decoder.canDecode(ResolvableType.forClass(Ordered::class.java), MediaType.APPLICATION_JSON)).isFalse()
assertThat(decoder.canDecode(ResolvableType.NONE, MediaType.APPLICATION_JSON)).isFalse()
assertThat(decoder.canDecode(ResolvableType.forClass(BigDecimal::class.java), MediaType.APPLICATION_JSON)).isFalse()

assertThat(decoder.canDecode(ResolvableType.forClass(Pojo::class.java), MediaType.APPLICATION_NDJSON)).isTrue()
}

@Test
Expand All @@ -73,6 +75,23 @@ class KotlinSerializationJsonDecoderTests : AbstractDecoderTests<KotlinSerializa
.expectError(UnsupportedOperationException::class.java)
}


@Test
fun decodeStream() {
val input = Flux.concat(
stringBuffer("{\"bar\":\"b1\",\"foo\":\"f1\"}\n"),
stringBuffer("{\"bar\":\"b2\",\"foo\":\"f2\"}\n")
)

testDecodeAll(input, ResolvableType.forClass(Pojo::class.java), { step: FirstStep<Any> ->
step
.expectNext(Pojo("f1", "b1"))
.expectNext(Pojo("f2", "b2"))
.expectComplete()
.verify()
}, null, null)
}

@Test
override fun decodeToMono() {
val input = Flux.concat(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,16 @@ class KotlinSerializationJsonEncoderTests : AbstractEncoderTests<KotlinSerializa
assertThat(encoder.canEncode(ResolvableType.forClassWithGenerics(ArrayList::class.java, Int::class.java), MediaType.APPLICATION_JSON)).isTrue()
assertThat(encoder.canEncode(ResolvableType.forClassWithGenerics(ArrayList::class.java, Int::class.java), MediaType.APPLICATION_PDF)).isFalse()
assertThat(encoder.canEncode(ResolvableType.NONE, MediaType.APPLICATION_JSON)).isFalse()

assertThat(encoder.canEncode(pojoType, MediaType.APPLICATION_NDJSON)).isTrue()
}

@Test
override fun encode() {
val input = Flux.just(
Pojo("foo", "bar"),
Pojo("foofoo", "barbar"),
Pojo("foofoofoo", "barbarbar")
Pojo("foo", "bar"),
Pojo("foofoo", "barbar"),
Pojo("foofoofoo", "barbarbar")
)
testEncode(input, Pojo::class.java, { step: FirstStep<DataBuffer?> -> step
.consumeNextWith(expectString("[" +
Expand All @@ -76,6 +78,26 @@ class KotlinSerializationJsonEncoderTests : AbstractEncoderTests<KotlinSerializa
.verifyComplete()
})
}
@Test
fun encodeStream() {
val input = Flux.just(
Pojo("foo", "bar"),
Pojo("foofoo", "barbar"),
Pojo("foofoofoo", "barbarbar")
)
testEncodeAll(
input,
ResolvableType.forClass(Pojo::class.java),
MediaType.APPLICATION_NDJSON,
null
) { step: FirstStep<DataBuffer?> ->
step
.consumeNextWith(expectString("{\"foo\":\"foo\",\"bar\":\"bar\"}\n"))
.consumeNextWith(expectString("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}\n"))
.consumeNextWith(expectString("{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}\n"))
.verifyComplete()
}
}

@Test
fun encodeMono() {
Expand Down

0 comments on commit 11898da

Please sign in to comment.