Skip to content

Commit

Permalink
[mysql] Use the right column charset in the snapshot phase (apache#1166)
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanhang1993 committed Aug 8, 2022
1 parent 9e6549c commit ddf0706
Show file tree
Hide file tree
Showing 4 changed files with 393 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -305,14 +305,6 @@ else if (actualColumn.jdbcType() == Types.TINYINT
// We thus need to use getObject() to identify if the value was provided and if yes then
// read it again to get correct scale
return rs.getObject(fieldNo) == null ? null : rs.getInt(fieldNo);
}
// DBZ-2673
// It is necessary to check the type names as types like ENUM and SET are
// also reported as JDBC type char
else if ("CHAR".equals(actualColumn.typeName())
|| "VARCHAR".equals(actualColumn.typeName())
|| "TEXT".equals(actualColumn.typeName())) {
return rs.getBytes(fieldNo);
} else {
return rs.getObject(fieldNo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
private final UniqueDatabase userDatabase2 =
new UniqueDatabase(MYSQL_CONTAINER, "user_2", TEST_USER, TEST_PASSWORD);

private final UniqueDatabase charsetTestDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "charset_test", TEST_USER, TEST_PASSWORD);

private final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
private final StreamTableEnvironment tEnv =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ververica.cdc.connectors.mysql.table;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import com.ververica.cdc.connectors.mysql.source.MySqlSourceTestBase;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

/** Integration tests for MySQL Table source. */
@RunWith(Parameterized.class)
public class MysqlConnectorCharsetITCase extends MySqlSourceTestBase {

private static final String TEST_USER = "mysqluser";
private static final String TEST_PASSWORD = "mysqlpw";

private static final UniqueDatabase charsetTestDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "charset_test", TEST_USER, TEST_PASSWORD);

private final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

private final StreamTableEnvironment tEnv =
StreamTableEnvironment.create(
env,
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

private final String testName;
private final String[] expected;

public MysqlConnectorCharsetITCase(String testName, String[] expected) {
this.testName = testName;
this.expected = expected;
}

@Parameterized.Parameters(name = "Test column charset: {0}")
public static Object[] parameters() {
return new Object[][] {
new Object[] {
"ucs2_test", new String[] {"+I[1, 测试数据]", "+I[2, Craig Marshall]", "+I[3, 另一个测试数据]"}
},
new Object[] {
"utf8_test", new String[] {"+I[1, 测试数据]", "+I[2, Craig Marshall]", "+I[3, 另一个测试数据]"}
},
new Object[] {
"ascii_test",
new String[] {"+I[1, ascii test!?]", "+I[2, Craig Marshall]", "+I[3, {test}]"}
},
new Object[] {
"sjis_test", new String[] {"+I[1, ひびぴ]", "+I[2, Craig Marshall]", "+I[3, フブプ]"}
},
new Object[] {
"gbk_test", new String[] {"+I[1, 测试数据]", "+I[2, Craig Marshall]", "+I[3, 另一个测试数据]"}
},
new Object[] {
"cp932_test", new String[] {"+I[1, ひびぴ]", "+I[2, Craig Marshall]", "+I[3, フブプ]"}
},
new Object[] {
"gb2312_test",
new String[] {"+I[1, 测试数据]", "+I[2, Craig Marshall]", "+I[3, 另一个测试数据]"}
},
new Object[] {
"ujis_test", new String[] {"+I[1, ひびぴ]", "+I[2, Craig Marshall]", "+I[3, フブプ]"}
},
new Object[] {
"euckr_test", new String[] {"+I[1, 죠주쥬]", "+I[2, Craig Marshall]", "+I[3, 한국어]"}
},
new Object[] {
"latin1_test", new String[] {"+I[1, ÀÆÉ]", "+I[2, Craig Marshall]", "+I[3, Üæû]"}
},
new Object[] {
"latin2_test", new String[] {"+I[1, ÓÔŐÖ]", "+I[2, Craig Marshall]", "+I[3, ŠŞŤŹ]"}
},
new Object[] {
"greek_test", new String[] {"+I[1, αβγδε]", "+I[2, Craig Marshall]", "+I[3, θικλ]"}
},
new Object[] {
"hebrew_test", new String[] {"+I[1, בבקשה]", "+I[2, Craig Marshall]", "+I[3, שרפה]"}
},
new Object[] {
"cp866_test", new String[] {"+I[1, твой]", "+I[2, Craig Marshall]", "+I[3, любой]"}
},
new Object[] {
"tis620_test",
new String[] {"+I[1, ภาษาไทย]", "+I[2, Craig Marshall]", "+I[3, ฆงจฉ]"}
},
new Object[] {
"cp1250_test", new String[] {"+I[1, ÓÔŐÖ]", "+I[2, Craig Marshall]", "+I[3, ŠŞŤŹ]"}
},
new Object[] {
"cp1251_test", new String[] {"+I[1, твой]", "+I[2, Craig Marshall]", "+I[3, любой]"}
},
new Object[] {
"cp1257_test",
new String[] {
"+I[1, piedzimst brīvi]", "+I[2, Craig Marshall]", "+I[3, apveltīti ar saprātu]"
}
},
new Object[] {
"macroman_test", new String[] {"+I[1, ÀÆÉ]", "+I[2, Craig Marshall]", "+I[3, Üæû]"}
},
new Object[] {
"macce_test", new String[] {"+I[1, ÓÔŐÖ]", "+I[2, Craig Marshall]", "+I[3, ŮÚŰÜ]"}
},
new Object[] {
"big5_test", new String[] {"+I[1, 大五]", "+I[2, Craig Marshall]", "+I[3, 丹店]"}
}
};
}

@BeforeClass
public static void beforeClass() {
charsetTestDatabase.createAndInitialize();
}

@Before
public void before() {
TestValuesTableFactory.clearAllData();
env.setParallelism(DEFAULT_PARALLELISM);
env.enableCheckpointing(200);
}

@Test
public void testCharset() throws Exception {
String sourceDDL =
String.format(
"CREATE TABLE %s (\n"
+ " table_id BIGINT,\n"
+ " table_name STRING,\n"
+ " primary key(table_id) not enforced"
+ ") WITH ("
+ " 'connector' = 'mysql-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'server-id' = '%s',"
+ " 'scan.incremental.snapshot.chunk.size' = '%s'"
+ ")",
testName,
MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(),
charsetTestDatabase.getUsername(),
charsetTestDatabase.getPassword(),
charsetTestDatabase.getDatabaseName(),
testName,
true,
getServerId(),
4);
tEnv.executeSql(sourceDDL);
// async submit job
TableResult result =
tEnv.executeSql(String.format("SELECT table_id,table_name FROM %s", testName));

CloseableIterator<Row> iterator = result.collect();
waitForSnapshotStarted(iterator);
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
result.getJobClient().ifPresent(client -> client.cancel());
}

private String getServerId() {
final Random random = new Random();
int serverId = random.nextInt(100) + 5400;
return serverId + "-" + (serverId + env.getParallelism());
}

private static List<String> fetchRows(Iterator<Row> iter, int size) {
List<String> rows = new ArrayList<>(size);
while (size > 0 && iter.hasNext()) {
Row row = iter.next();
rows.add(row.toString());
size--;
}
return rows;
}

private static void waitForSnapshotStarted(CloseableIterator<Row> iterator) throws Exception {
while (!iterator.hasNext()) {
Thread.sleep(100);
}
}
}
Loading

0 comments on commit ddf0706

Please sign in to comment.