
Get garbled codes in the snapshot phase with MySql CDC #1166

Closed
lknan opened this issue May 10, 2022 · 8 comments
Labels: bug (Something isn't working), good first issue (Good for newcomers)

Comments

lknan commented May 10, 2022

Describe the bug

Environment:

  • Flink version: 1.13.1
  • Flink CDC version: 2.2.1
  • Database and version: MySQL 8.0.28

To Reproduce
Steps to reproduce the behavior:
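The MySQL database uses UTF-8, but column aa1 of table a1 in database A uses the ucs2 charset. When the data is read with Flink CDC, the existing (snapshot) data comes out garbled, while new rows read from the binlog are not garbled.
I checked: when reading the binlog, the aa1 field is encoded as UTF-16 and also decoded as UTF-16. When reading the snapshot, the ucs2 field is encoded as UTF-8 (probably the default) but decoded as UTF-16, which produces the garbled text.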
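The reporter's analysis (binlog path: UTF-16 encode and UTF-16 decode; snapshot path: UTF-8 encode but UTF-16 decode) can be reproduced in plain Java without Flink or MySQL. This is a minimal sketch, not connector code: it assumes the ucs2 value can be modeled with Java's UTF-16BE (MySQL stores ucs2 big-endian), and the class and method names are hypothetical.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class SnapshotMojibakeDemo {
    /** Hypothetical helper: encode a value with one charset, then decode the bytes with another. */
    public static String roundTrip(String value, Charset encodeWith, Charset decodeWith) {
        byte[] bytes = value.getBytes(encodeWith);
        return new String(bytes, decodeWith);
    }

    public static void main(String[] args) {
        // "测试数据", the test value used in this issue, written as escapes
        // so the example does not depend on the source file encoding.
        String original = "\u6d4b\u8bd5\u6570\u636e";

        // Binlog path as described: UTF-16 on both sides, so the value survives.
        String binlogValue =
                roundTrip(original, StandardCharsets.UTF_16BE, StandardCharsets.UTF_16BE);

        // Snapshot path as described: UTF-8 bytes decoded as UTF-16, so the value is garbled.
        String snapshotValue =
                roundTrip(original, StandardCharsets.UTF_8, StandardCharsets.UTF_16BE);

        System.out.println("binlog ok:   " + binlogValue.equals(original));
        System.out.println("snapshot ok: " + snapshotValue.equals(original));
        System.out.println("snapshot chars: " + snapshotValue.length());
    }
}
```

Running it prints `binlog ok:   true`, `snapshot ok: false` and `snapshot chars: 6`: the 12 UTF-8 bytes of the four characters are reinterpreted as six unrelated 16-bit characters.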


lknan added the bug (Something isn't working) label May 10, 2022
fsk119 (Member) commented May 10, 2022

Please use English in the open source community.

fsk119 (Member) commented May 10, 2022

To reproduce the problem, add the following SQL to customer.sql:

CREATE TABLE `gen_table_copy1` (
  `table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
  `table_name` varchar(200) CHARACTER SET ucs2 COLLATE ucs2_general_ci DEFAULT '' COMMENT '表名称',
  PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=ucs2;

INSERT INTO gen_table_copy1 VALUES (101, '测试数据'), (2, 'Craig Marshall');

Then add the following test to MySqlConnectorITCase:

@Test
public void testEncoding() {
    customerDatabase.createAndInitialize();
    String sourceDDL =
            String.format(
                    "CREATE TABLE gen_table_copy1 (\n"
                            + "  table_id BIGINT,\n"
                            + "  table_name STRING,\n"
                            + "  primary key(table_id) not enforced"
                            + ") WITH ("
                            + " 'connector' = 'mysql-cdc',"
                            + " 'hostname' = '%s',"
                            + " 'port' = '%s',"
                            + " 'username' = '%s',"
                            + " 'password' = '%s',"
                            + " 'database-name' = '%s',"
                            + " 'table-name' = '%s',"
                            + " 'scan.incremental.snapshot.enabled' = '%s',"
                            + " 'server-id' = '%s',"
                            + " 'scan.incremental.snapshot.chunk.size' = '%s'\n"
                            // Disabled: forward charset settings to the JDBC connection.
                            // + ", 'jdbc.properties.characterEncoding' = 'UnicodeBig',\n"
                            // + " 'jdbc.properties.characterSetResults' = 'UnicodeBig'"
                            + ")",
                    MYSQL_CONTAINER.getHost(),
                    MYSQL_CONTAINER.getDatabasePort(),
                    customerDatabase.getUsername(),
                    customerDatabase.getPassword(),
                    customerDatabase.getDatabaseName(),
                    "gen_table_copy1",
                    incrementalSnapshot,
                    getServerId(),
                    getSplitSize());
    tEnv.executeSql(sourceDDL);

    // Print the snapshot rows; table_name (the ucs2 column) comes out garbled.
    tEnv.executeSql("SELECT * FROM gen_table_copy1 LIMIT 2").print();
}

As a quick fix, you can apply the change shown in the attached screenshot.


fsk119 (Member) commented May 10, 2022

The bug occurs because the JDBC connection uses UTF-8 encoding, so the values returned by the JDBC connection have already been encoded in UTF-8. However, when the object array is converted to a SourceRecord, the converter decodes the bytes with the charset defined in the DDL (ucs2 in this case). See MySqlValueConverters#converter for more details.
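One reading of this explanation is that bytes reaching the converter must be decoded with the charset that produced them, not reinterpreted with the column charset from the DDL. The sketch below is hypothetical: neither the class nor the method exists in Debezium or Flink CDC, and it assumes the JDBC connection delivers column values either as already-decoded Strings or as UTF-8 bytes.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class SnapshotStringDecoder {
    /**
     * Hypothetical conversion for a string column read during the snapshot phase.
     * A String from the driver is already decoded and is returned unchanged;
     * raw bytes are decoded with the connection charset (assumed UTF-8 here),
     * never with the column charset declared in the DDL.
     */
    public static String decodeSnapshotValue(Object data, Charset connectionCharset) {
        if (data == null) {
            return null;
        }
        if (data instanceof String) {
            return (String) data;
        }
        if (data instanceof byte[]) {
            return new String((byte[]) data, connectionCharset);
        }
        throw new IllegalArgumentException(
                "Unsupported value type: " + data.getClass().getName());
    }

    public static void main(String[] args) {
        String original = "\u6d4b\u8bd5\u6570\u636e"; // "测试数据"
        byte[] fromJdbc = original.getBytes(StandardCharsets.UTF_8);

        // Behavior described in this issue: decode with the ucs2 column charset.
        String wrong = new String(fromJdbc, StandardCharsets.UTF_16BE);
        // Decoding with the charset that produced the bytes recovers the value.
        String right = decodeSnapshotValue(fromJdbc, StandardCharsets.UTF_8);

        System.out.println(wrong.equals(original)); // false
        System.out.println(right.equals(original)); // true
    }
}
```

This only covers the string case; as noted later in the thread, a complete fix would also need the full MySQL-to-Java charset mapping.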


fsk119 added the good first issue (Good for newcomers) label May 10, 2022
lknan (Author) commented May 11, 2022

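This change only handles ucs2; the same problem also exists in other engines. Debezium parses the charset of every table column and encodes/decodes accordingly, whereas the snapshot read in Flink presumably always uses the default UTF-8. Converting every type here doesn't seem like the best approach to me.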

fsk119 (Member) commented May 11, 2022

Please use English...

fsk119 (Member) commented May 11, 2022

It's just a quick fix. Please refer to the documentation [1], which shows the mapping between MySQL charsets and Java charsets. I think we should support all the mappings in that doc.

Maybe we should also test how Debezium works in the snapshot phase.

[1] https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-charsets.html
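A sketch of what "doing all the mapping" could look like, covering only a handful of charsets. The class and method names are hypothetical, the fallback to UTF-8 for unknown names is an assumption, and ucs2/utf16 are mapped to UTF_16BE on the assumption that MySQL stores them big-endian without a byte-order mark (the commented-out properties in the test above use the Java name UnicodeBig instead). The remaining rows would come from the Connector/J table linked in [1].

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

public class MySqlCharsetMapping {
    // Hypothetical lookup table: a small subset of the MySQL-to-Java charset mapping.
    private static final Map<String, Charset> MYSQL_TO_JAVA = new HashMap<>();

    static {
        MYSQL_TO_JAVA.put("ascii", StandardCharsets.US_ASCII);
        MYSQL_TO_JAVA.put("latin1", Charset.forName("windows-1252")); // MySQL latin1 is cp1252
        MYSQL_TO_JAVA.put("utf8", StandardCharsets.UTF_8);
        MYSQL_TO_JAVA.put("utf8mb3", StandardCharsets.UTF_8);
        MYSQL_TO_JAVA.put("utf8mb4", StandardCharsets.UTF_8);
        // Assumed: ucs2 and utf16 are stored big-endian without a byte-order mark.
        MYSQL_TO_JAVA.put("ucs2", StandardCharsets.UTF_16BE);
        MYSQL_TO_JAVA.put("utf16", StandardCharsets.UTF_16BE);
        MYSQL_TO_JAVA.put("utf16le", StandardCharsets.UTF_16LE);
    }

    /** Returns the Java charset for a MySQL charset name, falling back to UTF-8 (an assumption). */
    public static Charset toJavaCharset(String mysqlCharset) {
        if (mysqlCharset == null) {
            return StandardCharsets.UTF_8;
        }
        Charset charset = MYSQL_TO_JAVA.get(mysqlCharset.toLowerCase(Locale.ROOT));
        return charset != null ? charset : StandardCharsets.UTF_8;
    }

    public static void main(String[] args) {
        String value = "\u6d4b\u8bd5\u6570\u636e"; // "测试数据"
        Charset ucs2 = toJavaCharset("ucs2");
        // Encoding and decoding with the same mapped charset round-trips the value.
        String decoded = new String(value.getBytes(ucs2), ucs2);
        System.out.println(ucs2 + " " + decoded.equals(value));
    }
}
```

A table keyed by charset name keeps the snapshot and binlog paths consistent as long as both look up the same entry for a column.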

fsk119 changed the title from 编码问题 (Encoding problem) to Get garbled codes in the snapshot phase with MySql CDC May 11, 2022
ruanhang1993 (Contributor) commented:

I am interested in this ticket. I could help to fix it.

fsk119 (Member) commented May 16, 2022

Thanks for your help. Please ping me when you finish your PR.

ruanhang1993 added commits to ruanhang1993/flink-cdc-connectors that referenced this issue on May 17, May 18, Jun 13, Jul 5, Jul 29, Aug 3, Aug 8, Aug 9 and Aug 10, 2022
ChaomingZhangCN pushed a commit to ChaomingZhangCN/flink-cdc that referenced this issue Jan 13, 2025
Development

No branches or pull requests

3 participants