This repository has been archived by the owner on Dec 14, 2022. It is now read-only.

catalogTable support default properties #247

Open
wants to merge 3 commits into base: master
Changes from 1 commit
@@ -30,6 +30,8 @@
import java.util.List;
import java.util.Map;

import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.TOPIC;

/**
* catalog support.
*/
@@ -97,6 +99,20 @@ public void putSchema(ObjectPath tablePath, CatalogBaseTable table, String format
        pulsarMetadataReader.putSchema(topicName, tableSchemaToPulsarSchema(format, schema));
    }

    /**
     * Derives default table properties for a topic from the Pulsar schema registry.
     *
     * @param objectPath the table path to resolve to a Pulsar topic
     * @return the default table properties
     * @throws IncompatibleSchemaException if the topic schema cannot be mapped to table properties
     */
    public Map<String, String> getCatalogTableDefaultProperties(ObjectPath objectPath) throws IncompatibleSchemaException {
        String topicName = objectPath2TopicName(objectPath);
        final SchemaInfo pulsarSchema = pulsarMetadataReader.getPulsarSchema(topicName);
        Map<String, String> properties = schemaTranslator.schemaInfo2TableProperties(pulsarSchema);
        properties.put(TOPIC.key(), topicName);
        return properties;
    }

    private SchemaInfo tableSchemaToPulsarSchema(String format, TableSchema schema) throws IncompatibleSchemaException {
        // The key-exclusion logic is not handled correctly here when the user sets key-related fields via Pulsar.
        final DataType physicalRowDataType = schema.toPhysicalRowDataType();
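For orientation, a minimal usage sketch of the new method follows (not part of the diff). The class name PulsarCatalogSupport, the table path, and the expected property values are illustrative assumptions; the keys come from the schema-to-properties mapping added later in this PR.

import org.apache.flink.table.catalog.ObjectPath;

import java.util.Map;

// Illustrative sketch only: assumes an initialized catalog-support instance (the class shown
// above; its name is not visible in this extract) and a topic carrying a plain JSON schema.
public class DefaultPropertiesExample {
    public static void printDefaults(PulsarCatalogSupport catalogSupport) throws Exception {
        ObjectPath tablePath = new ObjectPath("public/default", "user_events"); // hypothetical database/table
        Map<String, String> defaults = catalogSupport.getCatalogTableDefaultProperties(tablePath);
        // For a non-KeyValue JSON schema the map is expected to contain:
        //   "format" -> "json"
        //   "topic"  -> the topic name resolved from the table path
        defaults.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}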
@@ -34,6 +34,7 @@
import org.apache.pulsar.common.schema.SchemaInfo;

import java.io.Serializable;
import java.util.Map;

/**
* schema translator.
@@ -49,6 +50,8 @@ public abstract FieldsDataType pulsarSchemaToFieldsDataType(SchemaInfo pulsarSch

    public abstract DataType schemaInfo2SqlType(SchemaInfo si) throws IncompatibleSchemaException;

    public abstract Map<String, String> schemaInfo2TableProperties(SchemaInfo si) throws IncompatibleSchemaException;

    public static Schema atomicType2PulsarSchema(DataType flinkType) throws IncompatibleSchemaException {
        LogicalTypeRoot type = flinkType.getLogicalType().getTypeRoot();
        switch (type) {
@@ -14,8 +14,11 @@

package org.apache.flink.streaming.connectors.pulsar.internal;

import org.apache.flink.formats.avro.AvroFormatFactory;
import org.apache.flink.formats.json.JsonFormatFactory;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.formats.raw.RawFormatFactory;
import org.apache.flink.table.types.AtomicDataType;
import org.apache.flink.table.types.CollectionDataType;
import org.apache.flink.table.types.DataType;
@@ -26,8 +29,13 @@
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;

import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.shade.com.google.common.collect.ImmutableList;
@@ -40,7 +48,9 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

@@ -49,6 +59,10 @@
import static org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions.MESSAGE_ID_NAME;
import static org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions.PUBLISH_TIME_NAME;
import static org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions.TOPIC_ATTRIBUTE_NAME;
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.KEY_FIELDS;
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.KEY_FORMAT;
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.VALUE_FORMAT;
import static org.apache.flink.table.factories.FactoryUtil.FORMAT;

/**
* flink 1.11 schema translator.
@@ -263,6 +277,64 @@ public FieldsDataType pulsarSchemaToFieldsDataType(SchemaInfo schemaInfo)
        return (FieldsDataType) DataTypes.ROW(mainSchema.toArray(new DataTypes.Field[0]));
    }

    @Override
    public Map<String, String> schemaInfo2TableProperties(SchemaInfo si) throws IncompatibleSchemaException {
        Map<String, String> properties = new HashMap<>();

        if (si.getType().equals(SchemaType.KEY_VALUE)) {
            KeyValue<SchemaInfo, SchemaInfo> keyValue = KeyValueSchemaInfo.decodeKeyValueSchemaInfo(si);
            String keyFormat = nonKeyValueSchema2Format(keyValue.getKey().getType());
            if (StringUtils.isNotBlank(keyFormat)) {
                properties.put(KEY_FORMAT.key(), keyFormat);
                String keyFields = getSchemaKeyFieldNames(keyValue.getKey());
                if (StringUtils.isNotBlank(keyFields)) {
                    properties.put(KEY_FIELDS.key(), keyFields);
                } else {
                    throw new IncompatibleSchemaException(String.format(
                            "A key format is defined for schema %s, so the 'key.fields' option is required.",
                            si.getName()));
                }
            }

            properties.put(VALUE_FORMAT.key(), nonKeyValueSchema2Format(keyValue.getValue().getType()));
        } else {
            String format = nonKeyValueSchema2Format(si.getType());
            if (StringUtils.isNotBlank(format)) {
                properties.put(FORMAT.key(), format);
            }
        }
        return properties;
    }

    private static String getSchemaKeyFieldNames(SchemaInfo si) {
        if (si.getType().equals(SchemaType.NONE)) {
            return null;
        } else if (si.getType().isPrimitive()) {
            return "key";
        } else if (si.getType().equals(SchemaType.JSON) || si.getType().equals(SchemaType.AVRO)) {
            return GenericJsonSchema.of(si).getFields().stream().map(Field::getName).collect(Collectors.joining(","));
        } else {
            throw new UnsupportedOperationException(String.format("We do not support %s currently.", si.getType()));
        }
    }

    private static String nonKeyValueSchema2Format(SchemaType schemaType) {
        if (schemaType.equals(SchemaType.NONE)) {
            return null;
        } else if (schemaType.isPrimitive()) {
            return RawFormatFactory.IDENTIFIER;
        } else if (schemaType.equals(SchemaType.JSON)) {
            return JsonFormatFactory.IDENTIFIER;
        } else if (schemaType.equals(SchemaType.AVRO)) {
            return AvroFormatFactory.IDENTIFIER;
        } else {
            throw new UnsupportedOperationException(String.format("We do not support %s currently.", schemaType));
        }
    }

    public static final List<DataTypes.Field> METADATA_FIELDS = ImmutableList.of(
            DataTypes.FIELD(
                    KEY_ATTRIBUTE_NAME,
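To make the mapping above concrete, the sketch below shows the options expected for a KEY_VALUE schema with a primitive key and an Avro value. It is not part of the diff: the abstract translator's class name, the translator instance, and UserPojo are assumptions.

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;

import java.util.Map;

public class KeyValueMappingExample {
    // 'SchemaTranslator' refers to the abstract translator shown earlier in this diff
    // (name assumed); 'UserPojo' stands in for any Avro-compatible POJO and is hypothetical.
    public static void printOptions(SchemaTranslator translator) throws Exception {
        SchemaInfo kv = KeyValueSchemaInfo.encodeKeyValueSchemaInfo(
                Schema.STRING, Schema.AVRO(UserPojo.class), KeyValueEncodingType.SEPARATED);
        Map<String, String> options = translator.schemaInfo2TableProperties(kv);
        // Expected entries, following the mapping above:
        //   "key.format"   -> "raw"   (primitive key schema)
        //   "key.fields"   -> "key"   (primitive keys surface as a single field named "key")
        //   "value.format" -> "avro"
        options.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}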
@@ -159,7 +159,11 @@ public List<String> listTables(String databaseName) throws DatabaseNotExistExcep
    @Override
    public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
        try {
-           return new CatalogTableImpl(catalogSupport.getTableSchema(tablePath), properties, "");
+           // copy the catalog-level properties and overlay the schema-derived defaults
+           Map<String, String> catalogTableProperties = new HashMap<>();
+           catalogTableProperties.putAll(properties);
+           catalogTableProperties.putAll(catalogSupport.getCatalogTableDefaultProperties(tablePath));
+           return new CatalogTableImpl(catalogSupport.getTableSchema(tablePath), catalogTableProperties, "");
        } catch (PulsarAdminException.NotFoundException e) {
            throw new TableNotExistException(getName(), tablePath, e);
        } catch (PulsarAdminException | IncompatibleSchemaException e) {
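The practical effect of merging schema-derived defaults into the table options is that a topic registered in Pulsar can be queried through the catalog without declaring a format by hand. A hedged sketch follows; the catalog factory identifier, option keys, addresses, and table path are assumptions and not taken from this diff.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CatalogQueryExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // Hypothetical catalog definition; the actual option keys depend on the catalog factory.
        tEnv.executeSql(
                "CREATE CATALOG pulsar WITH (\n" +
                "  'type' = 'pulsar',\n" +
                "  'catalog-admin-url' = 'http://localhost:8080',\n" +
                "  'catalog-service-url' = 'pulsar://localhost:6650'\n" +
                ")");
        tEnv.executeSql("USE CATALOG pulsar");
        // No explicit 'format' is declared for the table; with this PR the format,
        // key.fields and related options default to values derived from the topic schema.
        tEnv.executeSql("SELECT * FROM `public/default`.`user_events`").print();
    }
}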
@@ -44,7 +44,7 @@ public void validate(DescriptorProperties properties) {
        properties.validateString(CATALOG_SERVICE_URL, false, 1);
        properties.validateString(CATALOG_ADMIN_URL, false, 1);
        properties.validateInt(CATALOG_DEFAULT_PARTITIONS, true, 1);
-       properties.validateString(FormatDescriptorValidator.FORMAT, false);
+       properties.validateString(FormatDescriptorValidator.FORMAT, true);
        validateStartingOffsets(properties);
}

Expand Down