Skip to content

Commit

Permalink
[#3587] feat(trino-connector): Support create Internal connector like…
Browse files Browse the repository at this point in the history
… hive/iceberg/mysql/pg/memory (#3588)

### What changes were proposed in this pull request?

Support create Internal connector like hive/iceberg/mysql/pg/memory

### Why are the changes needed?

Fix: #3587

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

NO
  • Loading branch information
diqiu50 authored and jerryshao committed May 31, 2024
1 parent c487d51 commit 29827e5
Show file tree
Hide file tree
Showing 10 changed files with 228 additions and 54 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.trino.connector;

import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_CREATE_INTERNAL_CONNECTOR_ERROR;
import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_OPERATION_FAILED;
import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_RUNTIME_ERROR;

import com.google.common.collect.ImmutableList;
import io.trino.spi.Plugin;
import io.trino.spi.TrinoException;
import io.trino.spi.classloader.ThreadContextClassLoader;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorContext;
import io.trino.spi.connector.ConnectorFactory;
import java.io.File;
import java.lang.reflect.Constructor;
import java.net.URL;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** This class is mange the internal connector plugin and help to create the connector. */
public class GravitinoConnectorPluginManager {

private static final Logger LOG = LoggerFactory.getLogger(GravitinoConnectorPluginManager.class);

public static final String CONNECTOR_HIVE = "hive";
public static final String CONNECTOR_ICEBERG = "iceberg";
public static final String CONNECTOR_MYSQL = "mysql";
public static final String CONNECTOR_POSTGRESQL = "postgresql";
public static final String CONNECTOR_MEMORY = "memory";

private static final String PLUGIN_NAME_PREFIX = "gravitino-";
private static final String PLUGIN_CLASSLOADER_CLASS_NAME = "io.trino.server.PluginClassLoader";

private static volatile GravitinoConnectorPluginManager instance;

private final Class<?> pluginLoaderClass;

private static final Set<String> usePlugins =
Set.of(
CONNECTOR_HIVE,
CONNECTOR_ICEBERG,
CONNECTOR_MYSQL,
CONNECTOR_POSTGRESQL,
CONNECTOR_MEMORY);

private final Map<String, ClassLoader> pluginClassLoaders = new HashMap<>();
private final ClassLoader appClassloader;

public GravitinoConnectorPluginManager(ClassLoader classLoader) {
try {
// Retrieve plugin directory
// The Trino plugin director like:
// /data/trino/plugin/hive/**.jar
// /data/trino/plugin/gravitino/**.jar
// /data/trino/plugin/mysql/**.jar
String jarPath =
GravitinoConnectorPluginManager.class
.getProtectionDomain()
.getCodeSource()
.getLocation()
.toURI()
.getPath();
String pluginDir = Paths.get(jarPath).getParent().getParent().toString();

this.appClassloader = classLoader;
pluginLoaderClass = appClassloader.loadClass(PLUGIN_CLASSLOADER_CLASS_NAME);

// Load all plugins
for (String pluginName : usePlugins) {
loadPlugin(pluginDir, pluginName);
LOG.info("Load plugin {}/{} successful", pluginDir, pluginName);
}
} catch (Exception e) {
throw new TrinoException(GRAVITINO_RUNTIME_ERROR, "Error while loading plugins", e);
}
}

public static GravitinoConnectorPluginManager instance(ClassLoader classLoader) {
if (instance != null) {
return instance;
}
synchronized (GravitinoConnectorPluginManager.class) {
if (instance == null) {
instance = new GravitinoConnectorPluginManager(classLoader);
}
return instance;
}
}

private void loadPlugin(String pluginPath, String pluginName) {
String dirName = pluginPath + "." + pluginName;
File directory = new File(dirName);
if (!directory.exists()) {
throw new TrinoException(
GRAVITINO_RUNTIME_ERROR, "Can not found plugin directory " + pluginPath);
}

File[] pluginFiles = directory.listFiles();
if (pluginFiles == null || pluginFiles.length == 0) {
throw new TrinoException(
GRAVITINO_RUNTIME_ERROR, "Can not found any files plugin directory " + pluginPath);
}
List<URL> files =
Arrays.stream(pluginFiles)
.map(File::toURI)
.map(
uri -> {
try {
return uri.toURL();
} catch (Exception e) {
throw new RuntimeException(e);
}
})
.toList();

try {
Constructor<?> constructor =
pluginLoaderClass.getConstructor(String.class, List.class, ClassLoader.class, List.class);
String classLoaderName = PLUGIN_NAME_PREFIX + pluginName;
// Load Trino SPI package and other dependencies refer to io.trino.server.PluginClassLoader
Object pluginClassLoader =
constructor.newInstance(
classLoaderName,
files,
appClassloader,
List.of(
"io.trino.spi.",
"com.fasterxml.jackson.annotation.",
"io.airlift.slice.",
"org.openjdk.jol.",
"io.opentelemetry.api.",
"io.opentelemetry.context."));
pluginClassLoaders.put(pluginName, (ClassLoader) pluginClassLoader);
} catch (Exception e) {
throw new TrinoException(
GRAVITINO_RUNTIME_ERROR, "Failed to create Plugin class loader " + pluginName, e);
}
}

public Connector createConnector(
String connectorName, Map<String, String> config, ConnectorContext context) {
try {
ClassLoader pluginClassLoader = pluginClassLoaders.get(connectorName);
if (pluginClassLoader == null) {
throw new TrinoException(
GRAVITINO_OPERATION_FAILED,
"Gravitino connector does not support connector " + connectorName);
}

ServiceLoader<Plugin> serviceLoader = ServiceLoader.load(Plugin.class, pluginClassLoader);
List<Plugin> plugins = ImmutableList.copyOf(serviceLoader);
if (plugins.isEmpty()) {
throw new TrinoException(
GRAVITINO_CREATE_INTERNAL_CONNECTOR_ERROR,
String.format("The %s plugin does not found connector SIP interface", connectorName));
}
Plugin plugin = plugins.get(0);

try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(pluginClassLoader)) {
if (plugin.getConnectorFactories() == null
|| !plugin.getConnectorFactories().iterator().hasNext()) {
throw new TrinoException(
GRAVITINO_CREATE_INTERNAL_CONNECTOR_ERROR,
String.format(
"The %s plugin does not contains any ConnectorFactories", connectorName));
}
ConnectorFactory connectorFactory = plugin.getConnectorFactories().iterator().next();
Connector connector = connectorFactory.create(connectorName, config, context);
LOG.info("create connector {} with config {} successful", connectorName, config);
return connector;
}
} catch (Exception e) {
throw new TrinoException(
GRAVITINO_RUNTIME_ERROR, "Failed to create connector " + connectorName, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public enum GravitinoErrorCode implements ErrorCodeSupplier {
GRAVITINO_CATALOG_ALREADY_EXISTS(20, EXTERNAL),
GRAVITINO_METALAKE_ALREADY_EXISTS(21, EXTERNAL),
GRAVITINO_OPERATION_FAILED(22, EXTERNAL),
GRAVITINO_RUNTIME_ERROR(23, EXTERNAL),
;

// suppress ImmutableEnumChecker because ErrorCode is outside the project.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import static java.util.Collections.emptyList;

import com.datastrato.gravitino.trino.connector.metadata.GravitinoCatalog;
import io.trino.spi.connector.Connector;
import io.trino.spi.session.PropertyMetadata;
import java.util.List;
import java.util.Map;
Expand All @@ -25,8 +24,8 @@ default List<PropertyMetadata<?>> getTableProperties() {
/** @return Return internal connector config with Trino. */
Map<String, String> buildInternalConnectorConfig(GravitinoCatalog catalog) throws Exception;

/** @return Return internal connector with Trino. */
Connector buildInternalConnector(Map<String, String> config) throws Exception;
/** @return Return internal connector name with Trino. */
String internalConnectorName();

/** @return SchemaProperties list that used to validate schema properties. */
default List<PropertyMetadata<?>> getSchemaProperties() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@

import com.datastrato.gravitino.client.GravitinoMetalake;
import com.datastrato.gravitino.trino.connector.GravitinoConnector;
import com.datastrato.gravitino.trino.connector.GravitinoConnectorPluginManager;
import com.datastrato.gravitino.trino.connector.metadata.GravitinoCatalog;
import com.google.common.base.Preconditions;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorContext;
import io.trino.spi.session.PropertyMetadata;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -84,6 +86,7 @@ static class Builder {
private final CatalogConnectorAdapter connectorAdapter;
private GravitinoCatalog catalog;
private GravitinoMetalake metalake;
private ConnectorContext context;

Builder(CatalogConnectorAdapter connectorAdapter) {
this.connectorAdapter = connectorAdapter;
Expand All @@ -103,12 +106,21 @@ Builder withMetalake(GravitinoMetalake metalake) {
return this;
}

Builder withContext(ConnectorContext context) {
this.context = context;
return this;
}

CatalogConnectorContext build() throws Exception {
Preconditions.checkArgument(metalake != null, "metalake is not null");
Preconditions.checkArgument(catalog != null, "catalog is not null");
Preconditions.checkArgument(context != null, "context is not null");
Map<String, String> connectorConfig = connectorAdapter.buildInternalConnectorConfig(catalog);
Connector connector = connectorAdapter.buildInternalConnector(connectorConfig);
String internalConnectorName = connectorAdapter.internalConnectorName();

Connector connector =
GravitinoConnectorPluginManager.instance(context.getClass().getClassLoader())
.createConnector(internalConnectorName, connectorConfig, context);
return new CatalogConnectorContext(catalog, metalake, connector, connectorAdapter);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,8 +426,9 @@ public Connector createConnector(
GravitinoCatalog catalog = GravitinoCatalog.fromJson(catalogConfig);
CatalogConnectorContext.Builder builder =
catalogConnectorFactory.createCatalogConnectorContextBuilder(catalog);
builder.withMetalake(
metalakes.computeIfAbsent(catalog.getMetalake(), this::retrieveMetalake));
builder
.withMetalake(metalakes.computeIfAbsent(catalog.getMetalake(), this::retrieveMetalake))
.withContext(context);

CatalogConnectorContext connectorContext = builder.build();
catalogConnectors.put(connectorName, connectorContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@
*/
package com.datastrato.gravitino.trino.connector.catalog.hive;

import static com.datastrato.gravitino.trino.connector.GravitinoConnectorPluginManager.CONNECTOR_HIVE;

import com.datastrato.gravitino.catalog.property.PropertyConverter;
import com.datastrato.gravitino.trino.connector.catalog.CatalogConnectorAdapter;
import com.datastrato.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter;
import com.datastrato.gravitino.trino.connector.catalog.HasPropertyMeta;
import com.datastrato.gravitino.trino.connector.metadata.GravitinoCatalog;
import io.trino.spi.connector.Connector;
import io.trino.spi.session.PropertyMetadata;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.NotImplementedException;

/** Transforming Hive connector configuration and components into Gravitino connector. */
public class HiveConnectorAdapter implements CatalogConnectorAdapter {
Expand All @@ -41,8 +41,8 @@ public Map<String, String> buildInternalConnectorConfig(GravitinoCatalog catalog
}

@Override
public Connector buildInternalConnector(Map<String, String> config) throws Exception {
throw new NotImplementedException();
public String internalConnectorName() {
return CONNECTOR_HIVE;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,16 @@
*/
package com.datastrato.gravitino.trino.connector.catalog.iceberg;

import static com.datastrato.gravitino.trino.connector.GravitinoConnectorPluginManager.CONNECTOR_ICEBERG;
import static java.util.Collections.emptyList;

import com.datastrato.gravitino.catalog.property.PropertyConverter;
import com.datastrato.gravitino.trino.connector.catalog.CatalogConnectorAdapter;
import com.datastrato.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter;
import com.datastrato.gravitino.trino.connector.metadata.GravitinoCatalog;
import io.trino.spi.connector.Connector;
import io.trino.spi.session.PropertyMetadata;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.NotImplementedException;

/** Transforming Iceberg connector configuration and components into Gravitino connector. */
public class IcebergConnectorAdapter implements CatalogConnectorAdapter {
Expand All @@ -31,18 +29,12 @@ public IcebergConnectorAdapter() {
@Override
public Map<String, String> buildInternalConnectorConfig(GravitinoCatalog catalog)
throws Exception {
Map<String, String> config = new HashMap<>();
config.put("connector.name", "iceberg");

Map<String, String> properties =
catalogConverter.gravitinoToEngineProperties(catalog.getProperties());
config.putAll(properties);
return config;
return catalogConverter.gravitinoToEngineProperties(catalog.getProperties());
}

@Override
public Connector buildInternalConnector(Map<String, String> config) throws Exception {
throw new NotImplementedException();
public String internalConnectorName() {
return CONNECTOR_ICEBERG;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
package com.datastrato.gravitino.trino.connector.catalog.jdbc.mysql;

import static com.datastrato.gravitino.trino.connector.GravitinoConnectorPluginManager.CONNECTOR_MYSQL;
import static java.util.Collections.emptyList;

import com.datastrato.gravitino.catalog.property.PropertyConverter;
Expand All @@ -12,12 +13,9 @@
import com.datastrato.gravitino.trino.connector.catalog.HasPropertyMeta;
import com.datastrato.gravitino.trino.connector.catalog.jdbc.JDBCCatalogPropertyConverter;
import com.datastrato.gravitino.trino.connector.metadata.GravitinoCatalog;
import io.trino.spi.connector.Connector;
import io.trino.spi.session.PropertyMetadata;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.NotImplementedException;

/** Transforming MySQL connector configuration and components into Gravitino connector. */
public class MySQLConnectorAdapter implements CatalogConnectorAdapter {
Expand All @@ -33,18 +31,12 @@ public MySQLConnectorAdapter() {
@Override
public Map<String, String> buildInternalConnectorConfig(GravitinoCatalog catalog)
throws Exception {
Map<String, String> config = new HashMap<>();
config.put("connector.name", "mysql");

Map<String, String> properties =
catalogConverter.gravitinoToEngineProperties(catalog.getProperties());
config.putAll(properties);
return config;
return catalogConverter.gravitinoToEngineProperties(catalog.getProperties());
}

@Override
public Connector buildInternalConnector(Map<String, String> config) throws Exception {
throw new NotImplementedException();
public String internalConnectorName() {
return CONNECTOR_MYSQL;
}

@Override
Expand Down
Loading

0 comments on commit 29827e5

Please sign in to comment.