
StreamSourceRecordReader should support AckPolicy.MANUAL #17

Closed
ahawtho opened this issue Oct 13, 2022 · 8 comments
Labels: enhancement (New feature or request), released (Issue has been released)

Comments

ahawtho (Contributor) commented Oct 13, 2022

StreamSourceRecordReader uses the default AckPolicy on RedisItemReader, which is AUTO. If the connector fails to write to the topic after the reader has received a message, and the task is stopped/restarted, that message will be lost (at-most-once processing).

The connector should also support at-least-once processing by XACK'ing each message after it has been committed, as indicated by AckPolicy.MANUAL. The connector should support a property such as:

redis.stream.delivery.type: at-least-once | at-most-once

and I'd argue the default should be at-least-once. I believe I can put together a PR for this.
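
To make the intent concrete, here is a minimal sketch of the at-least-once flow in a Connect source task, talking to Redis directly through Lettuce; the stream/group/consumer names and the task wiring are illustrative, not this connector's actual implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import io.lettuce.core.Consumer;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.XReadArgs.StreamOffset;
import io.lettuce.core.api.sync.RedisCommands;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class AtLeastOnceStreamTask extends SourceTask {

    private RedisCommands<String, String> redis; // wired up in start()

    @Override
    public List<SourceRecord> poll() {
        // lastConsumed (">") returns only never-delivered messages; each one
        // stays in the consumer group's PEL until it is XACK'ed.
        List<SourceRecord> records = new ArrayList<>();
        for (StreamMessage<String, String> m : redis.xreadgroup(
                Consumer.from("my-group", "my-consumer"),
                StreamOffset.lastConsumed("my-stream"))) {
            records.add(new SourceRecord(
                    Map.of("stream", m.getStream()), // source partition
                    Map.of("offset", m.getId()),     // source offset = stream ID
                    "my-topic", null, m.getBody().toString()));
        }
        return records;
    }

    @Override
    public void commitRecord(SourceRecord record, RecordMetadata metadata) {
        // Only after Connect has durably written the record to the topic is it
        // safe to XACK; a crash before this point re-delivers rather than loses.
        redis.xack("my-stream", "my-group",
                (String) record.sourceOffset().get("offset"));
    }

    @Override public void start(Map<String, String> props) { /* connect to Redis */ }
    @Override public void stop() { /* close the connection */ }
    @Override public String version() { return "0"; }
}
```

With the current AUTO policy, by contrast, messages are acknowledged at read time, so a crash between poll() and the Kafka write loses them.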

jruaux added the enhancement (New feature or request) label Oct 21, 2022
@jruaux
Copy link
Collaborator

jruaux commented Oct 21, 2022

Thank you. I added an option to set the ack policy (auto or explicit).

jruaux closed this as completed in ba014f0 Oct 21, 2022
ahawtho (Contributor, Author) commented Oct 21, 2022

Hi @jruaux, thanks for putting together a change for this. I've been working on a PR of my own, but there are some differences between our approaches:

I was hoping to define the behavior in terms of the semantics of the delivery policy (at-least-once vs. at-most-once) rather than the Redis behavior of XACK. To me as a connector user, this is a more meaningful guarantee. I also believe the current implementation, after these changes, may still be missing part of that guarantee. Read on for details:

If the connector fails between reading messages from the stream and writing the messages to Kafka, the default behavior of the StreamItemReader will drop all of the messages that remain in Redis's "pending entries list", or PEL, described in the docs. This breaks the promise of at-least-once delivery. Even if the messages are written to Kafka, the dropped messages will remain in the PEL indefinitely, which essentially leaks memory in Redis.
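
Concretely, recovery means replaying the PEL before reading anything new. Continuing the sketch above (again with illustrative names, not the reader's actual API):

```java
// XREADGROUP with an explicit ID ("0") replays this consumer's pending
// entries from the PEL; only ">" asks Redis for never-delivered messages.
List<StreamMessage<String, String>> pending = redis.xreadgroup(
        Consumer.from("my-group", "my-consumer"),
        StreamOffset.from("my-stream", "0"));
for (StreamMessage<String, String> m : pending) {
    // Re-emit to Kafka (duplicates are possible; that's at-least-once),
    // then XACK via commitRecord() as usual.
}
// Once the PEL is drained, resume normal reads with lastConsumed (">").
```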

One last comment: the new changes refer to a method StreamItemReader.ack(String[]) rather than StreamItemReader.ack(StreamMessage). The new signature doesn't seem to be available in the previous version of spring-batch-redis (3.0.4), so this change introduces a breaking API change in a patch version (3.0.5), which breaks semantic versioning. I'd suggest restoring the old ack(StreamMessage) method to avoid the breaking change.

I'll push my changes and post my PR so you can review the spirit of my approach. The changes are complete, but I was writing more unit tests to try to verify the behavior. I hadn't noticed the availability of the integration tests, but I'd be happy to pivot to writing more of those instead to demonstrate some of the potential issues with recovery.

Edit: One other thing I forgot to mention: relying solely on the Kafka Connect source offset for stream recovery provides the framework necessary to support the "exactly-once" processing for source connectors that was just released in Kafka 3.3.
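
For what it's worth, under KIP-618 (Kafka 3.3) a source connector whose restart position comes entirely from those stored offsets can advertise that capability. A sketch; the class name is hypothetical, and it's left abstract so the remaining SourceConnector methods can be omitted:

```java
import java.util.Map;

import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.kafka.connect.source.SourceConnector;

public abstract class RedisStreamSourceConnector extends SourceConnector {

    @Override
    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
        // Valid only because recovery derives purely from the Connect-stored
        // source offsets, not from un-acked PEL state.
        return ExactlyOnceSupport.SUPPORTED;
    }
}
```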

ahawtho (Contributor, Author) commented Oct 21, 2022

Here's a link to the commit with all the changes. The master branch here has diverged too much for a direct merge, but it's probably not too much work to incorporate the idea of the StreamMessageRecovery into the current master:

coursehero@88f49eb

jruaux (Collaborator) commented Oct 24, 2022

Thanks for your input and contribution. I agree with exposing the semantics to the user (vs. the Redis implementation). I will also incorporate that message recovery mechanism, though I'm thinking it might belong in Spring Batch Redis, as it could benefit other projects.
Good catch regarding the breaking API change in Spring Batch Redis. I'll restore that method and push a new version shortly.

Thanks again!

ahawtho (Contributor, Author) commented Oct 26, 2022

@jruaux - I pushed another commit on that branch that fixes issues with StreamMessageRecovery and adds integration tests. I know you won't be incorporating these changes as-is, but I'd appreciate you taking a look. I'm not sure of the best way to provide the integration in Spring Batch Redis to ack the messages that were previously committed by Connect but not yet acked manually (e.g. if we lost the connection to Redis between reading a batch and attempting to commit messages). This behavior is necessary to support "exactly-once" connector processing. The test RedisSourceTaskIT.pollStreamAtLeastOnceRecoverFromOffset(RedisTestContext) mocks restoring from a connector offset. A sketch of the idea follows.
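
The reconciliation I have in mind looks roughly like this, as two methods added to the earlier task sketch (names and helpers are hypothetical, not Spring Batch Redis's API):

```java
// On task start, reconcile the Connect-stored offset with the PEL: entries at
// or below the committed stream ID were already written to Kafka, so ack them;
// newer entries stay pending for re-delivery.
void reconcilePendingEntries() {
    Map<String, Object> stored = context.offsetStorageReader()
            .offset(Map.of("stream", "my-stream"));
    String committedId = stored == null ? "0-0" : (String) stored.get("offset");

    for (StreamMessage<String, String> m : redis.xreadgroup(
            Consumer.from("my-group", "my-consumer"),
            StreamOffset.from("my-stream", "0"))) {
        if (compareIds(m.getId(), committedId) <= 0) {
            redis.xack("my-stream", "my-group", m.getId()); // committed, never acked
        }
    }
}

// Stream IDs are "ms-seq"; compare the two parts numerically (lexicographic
// comparison would order "9-1" after "10-1").
static int compareIds(String a, String b) {
    String[] pa = a.split("-"), pb = b.split("-");
    int byMs = Long.compare(Long.parseLong(pa[0]), Long.parseLong(pb[0]));
    return byMs != 0 ? byMs : Long.compare(Long.parseLong(pa[1]), Long.parseLong(pb[1]));
}
```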

jruaux (Collaborator) commented Nov 9, 2022

@ahawtho I included your message recovery logic in Spring Batch Redis v3.0.7, and this project's early access release leverages it. Can you let me know what you think? If all looks good, I'll cut a new version.

ahawtho (Contributor, Author) commented Nov 16, 2022

@jruaux I will try to take a look at this tomorrow or Friday. Thanks for putting it all together!

jruaux added the released (Issue has been released) label Jun 21, 2023
jruaux (Collaborator) commented Jun 21, 2023

🎉 This issue has been resolved in v0.7.5 (Release Notes)
