Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-37386] Emit CreateTableEvent only when met the related SourceRecord. #3932

Open
wants to merge 6 commits into
base: master
Choose a base branch
from

Conversation

lvyanquan
Copy link
Contributor

Emit CreateTableEvent only when met the related SourceRecord to avoid downstream backpressure.

@lvyanquan lvyanquan changed the title [FLINK-37278] Emit CreateTableEvent only when met the related SourceRecord. [FLINK-37386] Emit CreateTableEvent only when met the related SourceRecord. Feb 26, 2025
@lvyanquan
Copy link
Contributor Author

Hi @yuxiqian.

Could you help to review this?

} else {
if (isDataChangeRecord(element) || isSchemaChangeEvent(element)) {
TableId tableId = getTableId(element);
if (!alreadySendCreateTableTables.contains(tableId)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems alreadySendCreateTableTables and createTableEventCache are not being stored into MySQL source state persistently. Will that block users from recovering from an earlier binlog state where schemas are different from current state?

TableId tableId = getTableId(element);
if (!alreadySendCreateTableTables.contains(tableId)) {
CreateTableEvent createTableEvent = createTableEventCache.get(tableId);
// New created table in binlog reading phase.
Copy link
Contributor

@yuxiqian yuxiqian Feb 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

--- Create Table A --> Table A Binlog --> Drop Table A --> Startup Offset --->

                    ^
                    |
       What if we start from here?

Seems testDanglingDropTableEventInBinlog will fail.

Copy link
Contributor Author

@lvyanquan lvyanquan Feb 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is indeed a problem, but it is different from the one I hope to solve here.

For DataChange, perhaps we can derive the schema from SourceRecord instead of using SQL to query the latest schema. But to make this problem more difficult, if we start reading from a position where SchemaChange happened, we cannot derive the original schema.

--- Create Table A --> Alter Table A Add a column--> Table A Binlog --> Startup Offset --->

                     ^
                     |
        What if we start from here?

So I think starting from a position where historical schema is different with the current schema is still an unresolved issue for us.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for clarifying this. Should we throw an exception for such dangling schema change events here?

@lvyanquan
Copy link
Contributor Author

The issue is related to #3912 (comment) @gongzexin as you may be interested about it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants