Skip to content

Commit

Permalink
[cdc-source-connector][mysql] fix deserialization issue on table meta…
Browse files Browse the repository at this point in the history
…data binlog event (#2682)


Co-authored-by: EchoLee5 <39044001+EchoLee5@users.noreply.github.com>
  • Loading branch information
qidian99 and EchoLee5 authored Dec 5, 2023
1 parent 7d7c1af commit f970444
Show file tree
Hide file tree
Showing 5 changed files with 414 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ private int numericColumnCount(byte[] types) {
case NEWDECIMAL:
case FLOAT:
case DOUBLE:
case YEAR:
count++;
break;
default:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.shyiko.mysql.binlog.event.deserialization;

import com.github.shyiko.mysql.binlog.event.TableMapEventMetadata;
import com.github.shyiko.mysql.binlog.event.TableMapEventMetadata.DefaultCharset;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;

import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* Copied from mysql-binlog-connector 0.27.2 to fix table metadata deserialization issue #363.
*
* <p>Line 75: skip to end of the signedness block so that it won't affect parsing the next table
* metadata.
*/
public class TableMapEventMetadataDeserializer {

private final Logger logger = Logger.getLogger(getClass().getName());

public TableMapEventMetadata deserialize(
ByteArrayInputStream inputStream, int nColumns, int nNumericColumns)
throws IOException {
int remainingBytes = inputStream.available();
if (remainingBytes <= 0) {
return null;
}

TableMapEventMetadata result = new TableMapEventMetadata();

for (; remainingBytes > 0; inputStream.enterBlock(remainingBytes)) {
int code = inputStream.readInteger(1);

MetadataFieldType fieldType = MetadataFieldType.byCode(code);
if (fieldType == null) {
throw new IOException("Unsupported table metadata field type " + code);
}
if (MetadataFieldType.UNKNOWN_METADATA_FIELD_TYPE.equals(fieldType)) {
if (logger.isLoggable(Level.FINE)) {
logger.fine("Received metadata field of unknown type");
}
continue;
}

int fieldLength = inputStream.readPackedInteger();

remainingBytes = inputStream.available();
inputStream.enterBlock(fieldLength);

switch (fieldType) {
case SIGNEDNESS:
result.setSignedness(readBooleanList(inputStream, nNumericColumns));
inputStream.skipToTheEndOfTheBlock();
break;
case DEFAULT_CHARSET:
result.setDefaultCharset(readDefaultCharset(inputStream));
break;
case COLUMN_CHARSET:
result.setColumnCharsets(readIntegers(inputStream));
break;
case COLUMN_NAME:
result.setColumnNames(readColumnNames(inputStream));
break;
case SET_STR_VALUE:
result.setSetStrValues(readTypeValues(inputStream));
break;
case ENUM_STR_VALUE:
result.setEnumStrValues(readTypeValues(inputStream));
break;
case GEOMETRY_TYPE:
result.setGeometryTypes(readIntegers(inputStream));
break;
case SIMPLE_PRIMARY_KEY:
result.setSimplePrimaryKeys(readIntegers(inputStream));
break;
case PRIMARY_KEY_WITH_PREFIX:
result.setPrimaryKeysWithPrefix(readIntegerPairs(inputStream));
break;
case ENUM_AND_SET_DEFAULT_CHARSET:
result.setEnumAndSetDefaultCharset(readDefaultCharset(inputStream));
break;
case ENUM_AND_SET_COLUMN_CHARSET:
result.setEnumAndSetColumnCharsets(readIntegers(inputStream));
result.setVisibility(readBooleanList(inputStream, nColumns));
break;
case VISIBILITY:
result.setVisibility(readBooleanList(inputStream, nColumns));
break;
default:
inputStream.enterBlock(remainingBytes);
throw new IOException("Unsupported table metadata field type " + code);
}
remainingBytes -= fieldLength;
}
return result;
}

private static BitSet readBooleanList(ByteArrayInputStream inputStream, int length)
throws IOException {
BitSet result = new BitSet();
// according to MySQL internals the amount of storage required for N columns is INT((N+7)/8)
// bytes
byte[] bytes = inputStream.read((length + 7) >> 3);
for (int i = 0; i < length; ++i) {
if ((bytes[i >> 3] & (1 << (7 - (i % 8)))) != 0) {
result.set(i);
}
}
return result;
}

private static DefaultCharset readDefaultCharset(ByteArrayInputStream inputStream)
throws IOException {
TableMapEventMetadata.DefaultCharset result = new TableMapEventMetadata.DefaultCharset();
result.setDefaultCharsetCollation(inputStream.readPackedInteger());
Map<Integer, Integer> charsetCollations = readIntegerPairs(inputStream);
if (!charsetCollations.isEmpty()) {
result.setCharsetCollations(charsetCollations);
}
return result;
}

private static List<Integer> readIntegers(ByteArrayInputStream inputStream) throws IOException {
List<Integer> result = new ArrayList<Integer>();
while (inputStream.available() > 0) {
result.add(inputStream.readPackedInteger());
}
return result;
}

private static List<String> readColumnNames(ByteArrayInputStream inputStream)
throws IOException {
List<String> columnNames = new ArrayList<String>();
while (inputStream.available() > 0) {
columnNames.add(inputStream.readLengthEncodedString());
}
return columnNames;
}

private static List<String[]> readTypeValues(ByteArrayInputStream inputStream)
throws IOException {
List<String[]> result = new ArrayList<String[]>();
while (inputStream.available() > 0) {
List<String> typeValues = new ArrayList<String>();
int valuesCount = inputStream.readPackedInteger();
for (int i = 0; i < valuesCount; ++i) {
typeValues.add(inputStream.readLengthEncodedString());
}
result.add(typeValues.toArray(new String[typeValues.size()]));
}
return result;
}

private static Map<Integer, Integer> readIntegerPairs(ByteArrayInputStream inputStream)
throws IOException {
Map<Integer, Integer> result = new LinkedHashMap<Integer, Integer>();
while (inputStream.available() > 0) {
int columnIndex = inputStream.readPackedInteger();
int columnCharset = inputStream.readPackedInteger();
result.put(columnIndex, columnCharset);
}
return result;
}

private enum MetadataFieldType {
SIGNEDNESS(1), // Signedness of numeric colums
DEFAULT_CHARSET(2), // Charsets of character columns
COLUMN_CHARSET(3), // Charsets of character columns
COLUMN_NAME(4), // Names of columns
SET_STR_VALUE(5), // The string values of SET columns
ENUM_STR_VALUE(6), // The string values is ENUM columns
GEOMETRY_TYPE(7), // The real type of geometry columns
SIMPLE_PRIMARY_KEY(8), // The primary key without any prefix
PRIMARY_KEY_WITH_PREFIX(9), // The primary key with some prefix
ENUM_AND_SET_DEFAULT_CHARSET(10), // Charsets of ENUM and SET columns
ENUM_AND_SET_COLUMN_CHARSET(11), // Charsets of ENUM and SET columns
VISIBILITY(12), // Column visibility (8.0.23 and newer)
UNKNOWN_METADATA_FIELD_TYPE(
128); // Returned with binlog-row-metadata=FULL from MySQL 8.0 in some cases

private final int code;

private MetadataFieldType(int code) {
this.code = code;
}

public int getCode() {
return code;
}

private static final Map<Integer, MetadataFieldType> INDEX_BY_CODE;

static {
INDEX_BY_CODE = new HashMap<Integer, MetadataFieldType>();
for (MetadataFieldType fieldType : values()) {
INDEX_BY_CODE.put(fieldType.code, fieldType);
}
}

public static MetadataFieldType byCode(int code) {
return INDEX_BY_CODE.get(code);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.junit.Test;

import java.io.IOException;
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -95,6 +97,66 @@ public void testDeserialize() throws IOException {
assertThat(eventData.toString()).isEqualTo(getExpectedEventData().toString());
}

@Test
public void testDeserializeMetadata() throws IOException {
byte[] data = {
// SIGNEDNESS
1,
// SIGNEDNESS length
5,
// 5 bytes for SIGNEDNESS
-74,
-39,
-101,
97,
0,
// COLUMN_CHARSET
3,
// COLUMN_CHARSET length
32,
// remaining 32 bytes for COLUMN_CHARSET
33,
33,
63,
33,
63,
63,
63,
63,
-4,
-1,
0,
-4,
-1,
0,
-4,
-1,
0,
-4,
-1,
0,
-4,
-1,
0,
-4,
-1,
0,
-4,
-1,
0,
-4,
-1,
0
};

TableMapEventMetadataDeserializer deserializer = new TableMapEventMetadataDeserializer();

TableMapEventMetadata metadata =
deserializer.deserialize(new ByteArrayInputStream(data), 59, 32);

assertThat(metadata.toString()).isEqualTo(getExpectedTableMapEventMetaData().toString());
}

private TableMapEventData getExpectedEventData() {
TableMapEventData eventData = new TableMapEventData();
// table_id
Expand Down Expand Up @@ -123,4 +185,23 @@ private TableMapEventData getExpectedEventData() {
eventData.setEventMetadata(metadata);
return eventData;
}

private TableMapEventMetadata getExpectedTableMapEventMetaData() {
// optional metadata fields
TableMapEventMetadata metadata = new TableMapEventMetadata();
BitSet signedness = new BitSet();

List<Integer> signedBits =
Arrays.asList(0, 2, 3, 5, 6, 8, 9, 11, 12, 15, 16, 19, 20, 22, 23, 25, 26, 31);

for (Integer signedBit : signedBits) {
signedness.set(signedBit);
}

metadata.setSignedness(signedness);
metadata.setColumnCharsets(
Arrays.asList(
33, 33, 63, 33, 63, 63, 63, 63, 255, 255, 255, 255, 255, 255, 255, 255));
return metadata;
}
}
Loading

0 comments on commit f970444

Please sign in to comment.