-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-37386] Emit CreateTableEvent only when met the related SourceRecord. #3932
base: master
Are you sure you want to change the base?
Conversation
Hi @yuxiqian. Could you help to review this? |
} else { | ||
if (isDataChangeRecord(element) || isSchemaChangeEvent(element)) { | ||
TableId tableId = getTableId(element); | ||
if (!alreadySendCreateTableTables.contains(tableId)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems alreadySendCreateTableTables
and createTableEventCache
are not being stored into MySQL source state persistently. Will that block users from recovering from an earlier binlog state where schemas are different from current state?
TableId tableId = getTableId(element); | ||
if (!alreadySendCreateTableTables.contains(tableId)) { | ||
CreateTableEvent createTableEvent = createTableEventCache.get(tableId); | ||
// New created table in binlog reading phase. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
--- Create Table A --> Table A Binlog --> Drop Table A --> Startup Offset --->
^
|
What if we start from here?
Seems testDanglingDropTableEventInBinlog
will fail.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is indeed a problem, but it is different from the one I hope to solve here.
For DataChange, perhaps we can derive the schema from SourceRecord
instead of using SQL to query the latest schema. But to make this problem more difficult, if we start reading from a position where SchemaChange happened, we cannot derive the original schema.
--- Create Table A --> Alter Table A Add a column--> Table A Binlog --> Startup Offset --->
^
|
What if we start from here?
So I think starting from a position where historical schema is different with the current schema is still an unresolved issue for us.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for clarifying this. Should we throw an exception for such dangling schema change events here?
The issue is related to #3912 (comment) @gongzexin as you may be interested about it. |
Emit CreateTableEvent only when met the related SourceRecord to avoid downstream backpressure.