Skip to content
This repository has been archived by the owner on Jun 21, 2020. It is now read-only.

Commit

Permalink
default schemas exists always;
Browse files Browse the repository at this point in the history
updated dependencies for Presto 0.203;
0.203.1-SNAPSHOT
  • Loading branch information
MartinWeindel committed Jun 11, 2018
1 parent 44a8b9e commit 629d1e4
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 48 deletions.
12 changes: 1 addition & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ The [Presto](https://prestodb.io/) Kudu connector allows querying, inserting and
| Apache Kudu 1.5.0 | yes | by full API- and ABI-compatibility of Kudu Java Client 1.7.0 |
| Apache Kudu 1.4.0 | yes | by full API- and ABI-compatibility of Kudu Java Client 1.7.0 |
| | | | |
| Presto 0.198 | yes | tests ok |
| Presto 0.203 | yes | tests ok |

Support for older Presto versions see [release history](/~https://github.com/MartinWeindel/presto-kudu/releases)

Expand Down Expand Up @@ -82,16 +82,6 @@ A Kudu table containing a dot is considered as a schema/table combination, e.g.
`dev.mytable` is mapped to the Presto table `kudu.dev.mytable.
Only Kudu table names in lower case are currently supported.

Before using any tablets, it is needed to create the default schema, e.g.
```sql
CREATE SCHEMA default;
```

### Example
- Create default schema if needed:
```sql
CREATE SCHEMA IF NOT EXISTS default;
```

- Now you can use any Kudu table, if it is lower case and contains no dots.
- Alternatively you can create a users table with
Expand Down
21 changes: 14 additions & 7 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,17 @@
<groupId>ml.littlebulb.presto.kudu</groupId>
<artifactId>presto-kudu</artifactId>
<name>Kudu Connector for Presto</name>
<version>0.198.2-SNAPSHOT</version>
<version>0.203.1-SNAPSHOT</version>
<packaging>presto-plugin</packaging>

<properties>
<presto.version>0.198</presto.version>
<presto.version>0.203</presto.version>
<kudu.version>1.7.0</kudu.version>
<dep.airlift.version>0.165</dep.airlift.version>
<dep.airlift.version>0.169</dep.airlift.version>
<dep.testng.version>6.10</dep.testng.version>
<dep.slice.version>0.33</dep.slice.version>
<dep.guice.version>4.2.0</dep.guice.version>
<dep.guava.version>24.1-jre</dep.guava.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -67,13 +69,19 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>21.0</version>
<version>${dep.guava.version}</version>
</dependency>

<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
<version>4.0</version>
<version>${dep.guice.version}</version>
</dependency>

<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-multibindings</artifactId>
<version>${dep.guice.version}</version>
</dependency>

<!--presto integrated-->
Expand Down Expand Up @@ -101,11 +109,10 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.8.1</version>
<version>2.8.9</version>
<scope>provided</scope>
</dependency>


<!-- for testing -->
<dependency>
<groupId>org.testng</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@
import java.util.stream.IntStream;

import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_USER_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.QUERY_REJECTED;
import static com.google.common.collect.ImmutableList.toImmutableList;


public class NativeKuduClientSession implements KuduClientSession {
public static final String NULL_SCHEMA = "default";
public static final String DEFAULT_SCHEMA = "default";
private final Logger log = Logger.get(getClass());
private final KuduConnectorId connectorId;
private final String tenantPrefix;
Expand Down Expand Up @@ -103,13 +104,12 @@ private List<String> listSchemaNamesFromTablets() {
final String prefix = tenantPrefix;
List<String> tables = internalListTables(prefix);
LinkedHashSet<String> schemas = new LinkedHashSet<>();
schemas.add(DEFAULT_SCHEMA);
for (String table : tables) {
int index = table.indexOf('.', prefix.length());
if (index > prefix.length()) {
String schema = table.substring(prefix.length(), index);
schemas.add(schema);
} else {
schemas.add(NULL_SCHEMA);
}
}
return ImmutableList.copyOf(schemas);
Expand All @@ -134,7 +134,7 @@ private List<String> internalListTables(String prefix) {
public List<SchemaTableName> listTables(String schemaNameOrNull) {
final int offset = tenantPrefix.length();
final String prefix;
if (schemaNameOrNull == null || schemaNameOrNull.equals(NULL_SCHEMA)) {
if (schemaNameOrNull == null || schemaNameOrNull.equals(DEFAULT_SCHEMA)) {
prefix = tenantPrefix;
} else {
prefix = tenantPrefix + schemaNameOrNull + ".";
Expand All @@ -147,7 +147,7 @@ public List<SchemaTableName> listTables(String schemaNameOrNull) {
String table = name.substring(index + 1);
return new SchemaTableName(schema, table);
} else {
String schema = NULL_SCHEMA;
String schema = DEFAULT_SCHEMA;
String table = name.substring(offset);
return new SchemaTableName(schema, table);
}
Expand Down Expand Up @@ -253,18 +253,25 @@ public KuduSession newSession() {

@Override
public void createSchema(String schemaName) {
try {
KuduTable schemasTable = getSchemasTable();
KuduSession session = client.newSession();
if (DEFAULT_SCHEMA.equals(schemaName)) {
throw new SchemaAlreadyExistsException(schemaName);
}
else {
try {
Upsert upsert = schemasTable.newUpsert();
fillSchemaRow(upsert.getRow(), schemaName);
session.apply(upsert);
} finally {
session.close();
KuduTable schemasTable = getSchemasTable();
KuduSession session = client.newSession();
try {
Upsert upsert = schemasTable.newUpsert();
fillSchemaRow(upsert.getRow(), schemaName);
session.apply(upsert);
}
finally {
session.close();
}
}
catch (KuduException e) {
throw new PrestoException(GENERIC_INTERNAL_ERROR, e);
}
} catch (KuduException e) {
throw new PrestoException(GENERIC_INTERNAL_ERROR, e);
}
}

Expand All @@ -275,21 +282,28 @@ private void fillSchemaRow(PartialRow row, String schemaName) {

@Override
public void dropSchema(String schemaName) {
try {
for (SchemaTableName table: listTables(schemaName)) {
dropTable(table);
}
KuduTable schemasTable = getSchemasTable();
KuduSession session = client.newSession();
if (DEFAULT_SCHEMA.equals(schemaName)) {
throw new PrestoException(GENERIC_USER_ERROR, "Deleting default schema not allowed.");
}
else {
try {
Delete delete = schemasTable.newDelete();
fillSchemaRow(delete.getRow(), schemaName);
session.apply(delete);
} finally {
session.close();
for (SchemaTableName table : listTables(schemaName)) {
dropTable(table);
}
KuduTable schemasTable = getSchemasTable();
KuduSession session = client.newSession();
try {
Delete delete = schemasTable.newDelete();
fillSchemaRow(delete.getRow(), schemaName);
session.apply(delete);
}
finally {
session.close();
}
}
catch (KuduException e) {
throw new PrestoException(GENERIC_INTERNAL_ERROR, e);
}
} catch (KuduException e) {
throw new PrestoException(GENERIC_INTERNAL_ERROR, e);
}
}

Expand Down Expand Up @@ -374,7 +388,7 @@ public void renameColumn(SchemaTableName schemaTableName, String oldName, String
}
}

private static enum RangePartitionChange {
private enum RangePartitionChange {
ADD, DROP
}

Expand Down Expand Up @@ -620,7 +634,7 @@ private KuduSplit toKuduSplit(KuduTableHandle tableHandle, KuduScanToken token,

private String toRawName(SchemaTableName schemaTableName) {
String rawName;
if (schemaTableName.getSchemaName().equals(NULL_SCHEMA)) {
if (schemaTableName.getSchemaName().equals(DEFAULT_SCHEMA)) {
rawName = tenantPrefix + schemaTableName.getTableName();
} else {
rawName = tenantPrefix + schemaTableName.getSchemaName() + "." + schemaTableName.getTableName();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ml.littlebulb.presto.kudu;

import com.facebook.presto.spi.PrestoException;

import static com.facebook.presto.spi.StandardErrorCode.ALREADY_EXISTS;
import static java.lang.String.format;

public class SchemaAlreadyExistsException
extends PrestoException
{
private final String schemaName;

public SchemaAlreadyExistsException(String schemaName)
{
this(schemaName, format("Schema already exists: '%s'", schemaName));
}

public SchemaAlreadyExistsException(String schemaName, String message)
{
super(ALREADY_EXISTS, message);
this.schemaName = schemaName;
}

public String getSchemaName()
{
return schemaName;
}
}

0 comments on commit 629d1e4

Please sign in to comment.