diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TableMapEventDataDeserializer.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TableMapEventDataDeserializer.java
index 893bea21846..3b08e986825 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TableMapEventDataDeserializer.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TableMapEventDataDeserializer.java
@@ -77,6 +77,7 @@ private int numericColumnCount(byte[] types) {
case NEWDECIMAL:
case FLOAT:
case DOUBLE:
+ case YEAR:
count++;
break;
default:
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TableMapEventMetadataDeserializer.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TableMapEventMetadataDeserializer.java
new file mode 100644
index 00000000000..16701538b1d
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TableMapEventMetadataDeserializer.java
@@ -0,0 +1,226 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.github.shyiko.mysql.binlog.event.deserialization;
+
+import com.github.shyiko.mysql.binlog.event.TableMapEventMetadata;
+import com.github.shyiko.mysql.binlog.event.TableMapEventMetadata.DefaultCharset;
+import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Copied from mysql-binlog-connector 0.27.2 to fix table metadata deserialization issue #363.
+ *
+ *
Line 75: skip to end of the signedness block so that it won't affect parsing the next table
+ * metadata.
+ */
+public class TableMapEventMetadataDeserializer {
+
+ private final Logger logger = Logger.getLogger(getClass().getName());
+
+ public TableMapEventMetadata deserialize(
+ ByteArrayInputStream inputStream, int nColumns, int nNumericColumns)
+ throws IOException {
+ int remainingBytes = inputStream.available();
+ if (remainingBytes <= 0) {
+ return null;
+ }
+
+ TableMapEventMetadata result = new TableMapEventMetadata();
+
+ for (; remainingBytes > 0; inputStream.enterBlock(remainingBytes)) {
+ int code = inputStream.readInteger(1);
+
+ MetadataFieldType fieldType = MetadataFieldType.byCode(code);
+ if (fieldType == null) {
+ throw new IOException("Unsupported table metadata field type " + code);
+ }
+ if (MetadataFieldType.UNKNOWN_METADATA_FIELD_TYPE.equals(fieldType)) {
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine("Received metadata field of unknown type");
+ }
+ continue;
+ }
+
+ int fieldLength = inputStream.readPackedInteger();
+
+ remainingBytes = inputStream.available();
+ inputStream.enterBlock(fieldLength);
+
+ switch (fieldType) {
+ case SIGNEDNESS:
+ result.setSignedness(readBooleanList(inputStream, nNumericColumns));
+ inputStream.skipToTheEndOfTheBlock();
+ break;
+ case DEFAULT_CHARSET:
+ result.setDefaultCharset(readDefaultCharset(inputStream));
+ break;
+ case COLUMN_CHARSET:
+ result.setColumnCharsets(readIntegers(inputStream));
+ break;
+ case COLUMN_NAME:
+ result.setColumnNames(readColumnNames(inputStream));
+ break;
+ case SET_STR_VALUE:
+ result.setSetStrValues(readTypeValues(inputStream));
+ break;
+ case ENUM_STR_VALUE:
+ result.setEnumStrValues(readTypeValues(inputStream));
+ break;
+ case GEOMETRY_TYPE:
+ result.setGeometryTypes(readIntegers(inputStream));
+ break;
+ case SIMPLE_PRIMARY_KEY:
+ result.setSimplePrimaryKeys(readIntegers(inputStream));
+ break;
+ case PRIMARY_KEY_WITH_PREFIX:
+ result.setPrimaryKeysWithPrefix(readIntegerPairs(inputStream));
+ break;
+ case ENUM_AND_SET_DEFAULT_CHARSET:
+ result.setEnumAndSetDefaultCharset(readDefaultCharset(inputStream));
+ break;
+ case ENUM_AND_SET_COLUMN_CHARSET:
+ result.setEnumAndSetColumnCharsets(readIntegers(inputStream));
+ result.setVisibility(readBooleanList(inputStream, nColumns));
+ break;
+ case VISIBILITY:
+ result.setVisibility(readBooleanList(inputStream, nColumns));
+ break;
+ default:
+ inputStream.enterBlock(remainingBytes);
+ throw new IOException("Unsupported table metadata field type " + code);
+ }
+ remainingBytes -= fieldLength;
+ }
+ return result;
+ }
+
+ private static BitSet readBooleanList(ByteArrayInputStream inputStream, int length)
+ throws IOException {
+ BitSet result = new BitSet();
+ // according to MySQL internals the amount of storage required for N columns is INT((N+7)/8)
+ // bytes
+ byte[] bytes = inputStream.read((length + 7) >> 3);
+ for (int i = 0; i < length; ++i) {
+ if ((bytes[i >> 3] & (1 << (7 - (i % 8)))) != 0) {
+ result.set(i);
+ }
+ }
+ return result;
+ }
+
+ private static DefaultCharset readDefaultCharset(ByteArrayInputStream inputStream)
+ throws IOException {
+ TableMapEventMetadata.DefaultCharset result = new TableMapEventMetadata.DefaultCharset();
+ result.setDefaultCharsetCollation(inputStream.readPackedInteger());
+ Map charsetCollations = readIntegerPairs(inputStream);
+ if (!charsetCollations.isEmpty()) {
+ result.setCharsetCollations(charsetCollations);
+ }
+ return result;
+ }
+
+ private static List readIntegers(ByteArrayInputStream inputStream) throws IOException {
+ List result = new ArrayList();
+ while (inputStream.available() > 0) {
+ result.add(inputStream.readPackedInteger());
+ }
+ return result;
+ }
+
+ private static List readColumnNames(ByteArrayInputStream inputStream)
+ throws IOException {
+ List columnNames = new ArrayList();
+ while (inputStream.available() > 0) {
+ columnNames.add(inputStream.readLengthEncodedString());
+ }
+ return columnNames;
+ }
+
+ private static List readTypeValues(ByteArrayInputStream inputStream)
+ throws IOException {
+ List result = new ArrayList();
+ while (inputStream.available() > 0) {
+ List typeValues = new ArrayList();
+ int valuesCount = inputStream.readPackedInteger();
+ for (int i = 0; i < valuesCount; ++i) {
+ typeValues.add(inputStream.readLengthEncodedString());
+ }
+ result.add(typeValues.toArray(new String[typeValues.size()]));
+ }
+ return result;
+ }
+
+ private static Map readIntegerPairs(ByteArrayInputStream inputStream)
+ throws IOException {
+ Map result = new LinkedHashMap();
+ while (inputStream.available() > 0) {
+ int columnIndex = inputStream.readPackedInteger();
+ int columnCharset = inputStream.readPackedInteger();
+ result.put(columnIndex, columnCharset);
+ }
+ return result;
+ }
+
+ private enum MetadataFieldType {
+ SIGNEDNESS(1), // Signedness of numeric colums
+ DEFAULT_CHARSET(2), // Charsets of character columns
+ COLUMN_CHARSET(3), // Charsets of character columns
+ COLUMN_NAME(4), // Names of columns
+ SET_STR_VALUE(5), // The string values of SET columns
+ ENUM_STR_VALUE(6), // The string values is ENUM columns
+ GEOMETRY_TYPE(7), // The real type of geometry columns
+ SIMPLE_PRIMARY_KEY(8), // The primary key without any prefix
+ PRIMARY_KEY_WITH_PREFIX(9), // The primary key with some prefix
+ ENUM_AND_SET_DEFAULT_CHARSET(10), // Charsets of ENUM and SET columns
+ ENUM_AND_SET_COLUMN_CHARSET(11), // Charsets of ENUM and SET columns
+ VISIBILITY(12), // Column visibility (8.0.23 and newer)
+ UNKNOWN_METADATA_FIELD_TYPE(
+ 128); // Returned with binlog-row-metadata=FULL from MySQL 8.0 in some cases
+
+ private final int code;
+
+ private MetadataFieldType(int code) {
+ this.code = code;
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ private static final Map INDEX_BY_CODE;
+
+ static {
+ INDEX_BY_CODE = new HashMap();
+ for (MetadataFieldType fieldType : values()) {
+ INDEX_BY_CODE.put(fieldType.code, fieldType);
+ }
+ }
+
+ public static MetadataFieldType byCode(int code) {
+ return INDEX_BY_CODE.get(code);
+ }
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/TableMapEventDataDeserializerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/TableMapEventDataDeserializerTest.java
index dc48f4d8413..068b775010b 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/TableMapEventDataDeserializerTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/TableMapEventDataDeserializerTest.java
@@ -22,7 +22,9 @@
import org.junit.Test;
import java.io.IOException;
+import java.util.Arrays;
import java.util.BitSet;
+import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
@@ -95,6 +97,66 @@ public void testDeserialize() throws IOException {
assertThat(eventData.toString()).isEqualTo(getExpectedEventData().toString());
}
+ @Test
+ public void testDeserializeMetadata() throws IOException {
+ byte[] data = {
+ // SIGNEDNESS
+ 1,
+ // SIGNEDNESS length
+ 5,
+ // 5 bytes for SIGNEDNESS
+ -74,
+ -39,
+ -101,
+ 97,
+ 0,
+ // COLUMN_CHARSET
+ 3,
+ // COLUMN_CHARSET length
+ 32,
+ // remaining 32 bytes for COLUMN_CHARSET
+ 33,
+ 33,
+ 63,
+ 33,
+ 63,
+ 63,
+ 63,
+ 63,
+ -4,
+ -1,
+ 0,
+ -4,
+ -1,
+ 0,
+ -4,
+ -1,
+ 0,
+ -4,
+ -1,
+ 0,
+ -4,
+ -1,
+ 0,
+ -4,
+ -1,
+ 0,
+ -4,
+ -1,
+ 0,
+ -4,
+ -1,
+ 0
+ };
+
+ TableMapEventMetadataDeserializer deserializer = new TableMapEventMetadataDeserializer();
+
+ TableMapEventMetadata metadata =
+ deserializer.deserialize(new ByteArrayInputStream(data), 59, 32);
+
+ assertThat(metadata.toString()).isEqualTo(getExpectedTableMapEventMetaData().toString());
+ }
+
private TableMapEventData getExpectedEventData() {
TableMapEventData eventData = new TableMapEventData();
// table_id
@@ -123,4 +185,23 @@ private TableMapEventData getExpectedEventData() {
eventData.setEventMetadata(metadata);
return eventData;
}
+
+ private TableMapEventMetadata getExpectedTableMapEventMetaData() {
+ // optional metadata fields
+ TableMapEventMetadata metadata = new TableMapEventMetadata();
+ BitSet signedness = new BitSet();
+
+ List signedBits =
+ Arrays.asList(0, 2, 3, 5, 6, 8, 9, 11, 12, 15, 16, 19, 20, 22, 23, 25, 26, 31);
+
+ for (Integer signedBit : signedBits) {
+ signedness.set(signedBit);
+ }
+
+ metadata.setSignedness(signedness);
+ metadata.setColumnCharsets(
+ Arrays.asList(
+ 33, 33, 63, 33, 63, 63, 63, 63, 255, 255, 255, 255, 255, 255, 255, 255));
+ return metadata;
+ }
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java
index c89480fcc48..931d6734ce4 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java
@@ -106,6 +106,9 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
private final UniqueDatabase inventoryDatabase8 =
new UniqueDatabase(MYSQL8_CONTAINER, "inventory", TEST_USER, TEST_PASSWORD);
+ private final UniqueDatabase binlogDatabase =
+ new UniqueDatabase(MYSQL8_CONTAINER, "binlog_metadata_test", TEST_USER, TEST_PASSWORD);
+
private final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
private final StreamTableEnvironment tEnv =
@@ -2186,6 +2189,73 @@ public void testServerIdConflict() {
}
}
+ @Test
+ public void testBinlogTableMetadataDeserialization() throws Exception {
+ if (!incrementalSnapshot) {
+ return;
+ }
+ binlogDatabase.createAndInitialize();
+ String sourceDDL =
+ String.format(
+ "CREATE TABLE binlog_metadata (\n"
+ + " id BIGINT NOT NULL,\n"
+ + " tiny_c TINYINT,\n"
+ + " tiny_un_c SMALLINT ,\n"
+ + " tiny_un_z_c SMALLINT ,\n"
+ + " small_c SMALLINT,\n"
+ + " small_un_c INT,\n"
+ + " small_un_z_c INT,\n"
+ + " year_c INT,\n"
+ + " PRIMARY KEY(id) NOT ENFORCED"
+ + ") WITH ("
+ + " 'connector' = 'mysql-cdc',"
+ + " 'hostname' = '%s',"
+ + " 'port' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s',"
+ + " 'database-name' = '%s',"
+ + " 'table-name' = '%s',"
+ + " 'server-time-zone' = 'UTC',"
+ + " 'server-id' = '%s',"
+ + " 'scan.incremental.snapshot.chunk.size' = '%s'"
+ + ")",
+ MYSQL8_CONTAINER.getHost(),
+ MYSQL8_CONTAINER.getDatabasePort(),
+ TEST_USER,
+ TEST_PASSWORD,
+ binlogDatabase.getDatabaseName(),
+ "binlog_metadata",
+ getServerId(),
+ getSplitSize());
+ tEnv.executeSql(sourceDDL);
+
+ // async submit job
+ TableResult result = tEnv.executeSql("SELECT * FROM binlog_metadata");
+
+ // wait for the source startup, we don't have a better way to wait it, use sleep for now
+ do {
+ Thread.sleep(5000L);
+ } while (result.getJobClient().get().getJobStatus().get() != RUNNING);
+
+ CloseableIterator iterator = result.collect();
+
+ try (Connection connection = binlogDatabase.getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute(
+ "INSERT INTO binlog_metadata VALUES (2, 127, 255, 255, 32767, 65535, 65535, 2024),(3, 127, 255, 255, 32767, 65535, 65535, 2024);");
+ statement.execute("DELETE FROM binlog_metadata WHERE id=3;");
+ }
+
+ String[] expected =
+ new String[] {
+ // snapshot records
+ "+I[1, 127, 255, 255, 32767, 65535, 65535, 2023]",
+ "+I[2, 127, 255, 255, 32767, 65535, 65535, 2024]"
+ };
+ assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
+ result.getJobClient().get().cancel().get();
+ }
+
// ------------------------------------------------------------------------------------
private String getServerId() {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/binlog_metadata_test.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/binlog_metadata_test.sql
new file mode 100644
index 00000000000..f1e7dba0f44
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/binlog_metadata_test.sql
@@ -0,0 +1,36 @@
+-- Copyright 2023 Ververica Inc.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+-- http://www.apache.org/licenses/LICENSE-2.0
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- ----------------------------------------------------------------------------------------------------------------
+-- DATABASE: column_type_test
+-- ----------------------------------------------------------------------------------------------------------------
+
+-- create a table with that number of numeric columns x meets the condition 8|(x + 32), so the table contains
+-- 8 numeric columns. Which mean any miscalculation in numeric columns will result in the wrong number of bytes
+-- read when parsing the signedness table metadata.
+
+CREATE TABLE binlog_metadata
+(
+ id SERIAL,
+ tiny_c TINYINT,
+ tiny_un_c TINYINT UNSIGNED,
+ tiny_un_z_c TINYINT UNSIGNED ZEROFILL,
+ small_c SMALLINT,
+ small_un_c SMALLINT UNSIGNED,
+ small_un_z_c SMALLINT UNSIGNED ZEROFILL,
+ year_c YEAR,
+ PRIMARY KEY (id)
+) DEFAULT CHARSET=utf8;
+
+INSERT INTO binlog_metadata
+VALUES (DEFAULT, 127, 255, 255, 32767, 65535, 65535, 2023);
\ No newline at end of file