generated from SAP/repository-template
-
Notifications
You must be signed in to change notification settings - Fork 8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Orchestration streaming #227
Merged
Merged
Changes from all commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
40e56b3
Orchestration streaming first version
CharlesDuboisSAP bc899a1
Added unit tests
CharlesDuboisSAP 6abcd59
Formatting
bot-sdk-js 61198e1
Added documentation
CharlesDuboisSAP 837b23e
Added tests
CharlesDuboisSAP 21cace4
Merge branch 'refs/heads/main' into orchestration-streaming
CharlesDuboisSAP 33a92d5
Release notes
CharlesDuboisSAP 144c53b
Merge branch 'main' into orchestration-streaming
CharlesDuboisSAP c3c3938
Merge branch 'main' into orchestration-streaming
CharlesDuboisSAP 39642e5
Applied Alex's review comments
CharlesDuboisSAP 192db45
Merge branch 'main' into orchestration-streaming
CharlesDuboisSAP File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
122 changes: 122 additions & 0 deletions
122
orchestration/src/main/java/com/sap/ai/sdk/orchestration/IterableStreamConverter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
package com.sap.ai.sdk.orchestration; | ||
|
||
import static java.nio.charset.StandardCharsets.UTF_8; | ||
import static java.util.Spliterator.NONNULL; | ||
import static java.util.Spliterator.ORDERED; | ||
|
||
import io.vavr.control.Try; | ||
import java.io.BufferedReader; | ||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.io.InputStreamReader; | ||
import java.util.Iterator; | ||
import java.util.NoSuchElementException; | ||
import java.util.Spliterators; | ||
import java.util.concurrent.Callable; | ||
import java.util.function.Function; | ||
import java.util.stream.Stream; | ||
import java.util.stream.StreamSupport; | ||
import javax.annotation.Nonnull; | ||
import javax.annotation.Nullable; | ||
import lombok.AccessLevel; | ||
import lombok.RequiredArgsConstructor; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.apache.hc.core5.http.HttpEntity; | ||
|
||
/** | ||
* Internal utility class to convert from a reading handler to {@link Iterable} and {@link Stream}. | ||
* | ||
* <p><strong>Note:</strong> All operations are sequential in nature. Thread safety is not | ||
* guaranteed. | ||
* | ||
* @param <T> Iterated item type. | ||
*/ | ||
@Slf4j | ||
@RequiredArgsConstructor(access = AccessLevel.PRIVATE) | ||
class IterableStreamConverter<T> implements Iterator<T> { | ||
/** see DEFAULT_CHAR_BUFFER_SIZE in {@link BufferedReader} * */ | ||
static final int BUFFER_SIZE = 8192; | ||
|
||
/** Read next entry for Stream or {@code null} when no further entry can be read. */ | ||
private final Callable<T> readHandler; | ||
|
||
/** Close handler to be called when Stream terminated. */ | ||
private final Runnable stopHandler; | ||
|
||
/** Error handler to be called when Stream is interrupted. */ | ||
private final Function<Exception, RuntimeException> errorHandler; | ||
|
||
private boolean isDone = false; | ||
private boolean isNextFetched = false; | ||
private T next = null; | ||
|
||
@SuppressWarnings("checkstyle:IllegalCatch") | ||
@Override | ||
public boolean hasNext() { | ||
if (isDone) { | ||
return false; | ||
} | ||
if (isNextFetched) { | ||
return true; | ||
} | ||
try { | ||
next = readHandler.call(); | ||
isNextFetched = true; | ||
if (next == null) { | ||
isDone = true; | ||
stopHandler.run(); | ||
} | ||
} catch (final Exception e) { | ||
isDone = true; | ||
stopHandler.run(); | ||
log.debug("Error while reading next element.", e); | ||
throw errorHandler.apply(e); | ||
} | ||
return !isDone; | ||
} | ||
|
||
@Override | ||
public T next() { | ||
if (next == null && !hasNext()) { | ||
throw new NoSuchElementException(); // normally not reached with Stream API | ||
} | ||
isNextFetched = false; | ||
return next; | ||
} | ||
|
||
/** | ||
* Create a sequential Stream of lines from an HTTP response string (UTF-8). The underlying {@link | ||
* InputStream} is closed, when the resulting Stream is closed (e.g. via try-with-resources) or | ||
* when an exception occurred. | ||
* | ||
* @param entity The HTTP entity object. | ||
* @return A sequential Stream object. | ||
* @throws OrchestrationClientException if the provided HTTP entity object is {@code null} or | ||
* empty. | ||
*/ | ||
@SuppressWarnings("PMD.CloseResource") // Stream is closed automatically when consumed | ||
@Nonnull | ||
static Stream<String> lines(@Nullable final HttpEntity entity) | ||
throws OrchestrationClientException { | ||
if (entity == null) { | ||
throw new OrchestrationClientException("Orchestration service response was empty."); | ||
} | ||
|
||
final InputStream inputStream; | ||
try { | ||
inputStream = entity.getContent(); | ||
} catch (final IOException e) { | ||
throw new OrchestrationClientException("Failed to read response content.", e); | ||
} | ||
|
||
final var reader = new BufferedReader(new InputStreamReader(inputStream, UTF_8), BUFFER_SIZE); | ||
final Runnable closeHandler = | ||
() -> Try.run(reader::close).onFailure(e -> log.error("Could not close input stream", e)); | ||
final Function<Exception, RuntimeException> errHandler = | ||
e -> new OrchestrationClientException("Parsing response content was interrupted.", e); | ||
|
||
final var iterator = new IterableStreamConverter<>(reader::readLine, closeHandler, errHandler); | ||
final var spliterator = Spliterators.spliteratorUnknownSize(iterator, ORDERED | NONNULL); | ||
return StreamSupport.stream(spliterator, /* NOT PARALLEL */ false).onClose(closeHandler); | ||
} | ||
} |
44 changes: 44 additions & 0 deletions
44
...stration/src/main/java/com/sap/ai/sdk/orchestration/OrchestrationChatCompletionDelta.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
package com.sap.ai.sdk.orchestration; | ||
|
||
import com.sap.ai.sdk.orchestration.model.CompletionPostResponse; | ||
import com.sap.ai.sdk.orchestration.model.LLMModuleResultSynchronous; | ||
import java.util.Map; | ||
import javax.annotation.Nonnull; | ||
import javax.annotation.Nullable; | ||
import lombok.val; | ||
|
||
/** Orchestration chat completion output delta for streaming. */ | ||
public class OrchestrationChatCompletionDelta extends CompletionPostResponse | ||
implements StreamedDelta { | ||
|
||
@Nonnull | ||
@Override | ||
// will be fixed once the generated code add a discriminator which will allow this class to extend | ||
// CompletionPostResponseStreaming | ||
@SuppressWarnings("unchecked") | ||
public String getDeltaContent() { | ||
val choices = ((LLMModuleResultSynchronous) getOrchestrationResult()).getChoices(); | ||
// Avoid the first delta: "choices":[] | ||
if (!choices.isEmpty() | ||
// Multiple choices are spread out on multiple deltas | ||
// A delta only contains one choice with a variable index | ||
&& choices.get(0).getIndex() == 0) { | ||
|
||
final var message = (Map<String, Object>) choices.get(0).getCustomField("delta"); | ||
// Avoid the second delta: "choices":[{"delta":{"content":"","role":"assistant"}}] | ||
if (message != null && message.get("content") != null) { | ||
return message.get("content").toString(); | ||
} | ||
} | ||
return ""; | ||
} | ||
|
||
@Nullable | ||
@Override | ||
public String getFinishReason() { | ||
return ((LLMModuleResultSynchronous) getOrchestrationResult()) | ||
.getChoices() | ||
.get(0) | ||
.getFinishReason(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
55 changes: 55 additions & 0 deletions
55
orchestration/src/main/java/com/sap/ai/sdk/orchestration/OrchestrationStreamingHandler.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
package com.sap.ai.sdk.orchestration; | ||
|
||
import static com.sap.ai.sdk.orchestration.OrchestrationClient.JACKSON; | ||
import static com.sap.ai.sdk.orchestration.OrchestrationResponseHandler.buildExceptionAndThrow; | ||
import static com.sap.ai.sdk.orchestration.OrchestrationResponseHandler.parseErrorAndThrow; | ||
|
||
import java.io.IOException; | ||
import java.util.stream.Stream; | ||
import javax.annotation.Nonnull; | ||
import lombok.RequiredArgsConstructor; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.apache.hc.core5.http.ClassicHttpResponse; | ||
|
||
@Slf4j | ||
@RequiredArgsConstructor | ||
class OrchestrationStreamingHandler<D extends StreamedDelta> { | ||
|
||
@Nonnull private final Class<D> deltaType; | ||
|
||
/** | ||
* @param response The response to process | ||
* @return A {@link Stream} of a model class instantiated from the response | ||
*/ | ||
@SuppressWarnings("PMD.CloseResource") // Stream is closed automatically when consumed | ||
@Nonnull | ||
Stream<D> handleResponse(@Nonnull final ClassicHttpResponse response) | ||
throws OrchestrationClientException { | ||
if (response.getCode() >= 300) { | ||
buildExceptionAndThrow(response); | ||
} | ||
return IterableStreamConverter.lines(response.getEntity()) | ||
// half of the lines are empty newlines, the last line is "data: [DONE]" | ||
.peek(line -> log.info("Handler: {}", line)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (Comment) I'm surprised we have so many (no action required.) |
||
.filter(line -> !line.isEmpty() && !"data: [DONE]".equals(line.trim())) | ||
.peek( | ||
line -> { | ||
if (!line.startsWith("data: ")) { | ||
final String msg = "Failed to parse response from the Orchestration service"; | ||
parseErrorAndThrow(line, new OrchestrationClientException(msg)); | ||
} | ||
}) | ||
.map( | ||
line -> { | ||
final String data = line.substring(5); // remove "data: " | ||
try { | ||
return JACKSON.readValue(data, deltaType); | ||
} catch (final IOException e) { // exception message e gets lost | ||
log.error( | ||
"Failed to parse the following response from the Orchestration service: {}", | ||
line); | ||
throw new OrchestrationClientException("Failed to parse delta message: " + line, e); | ||
} | ||
}); | ||
} | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(Comment)
New method names match the existing
OpenAiClient
counter-part inopenai
module.