diff --git a/ballerina/Ballerina.toml b/ballerina/Ballerina.toml index e433aea..71e7017 100644 --- a/ballerina/Ballerina.toml +++ b/ballerina/Ballerina.toml @@ -1,7 +1,7 @@ [package] org = "ballerinax" name = "confluent.cregistry" -version = "0.3.0" +version = "0.4.0" authors = ["Ballerina"] export=["confluent.cregistry"] keywords = ["confluent", "schema_registry", "avro", "serdes"] @@ -15,26 +15,26 @@ graalvmCompatible = true [[platform.java21.dependency]] groupId = "io.ballerina.lib" artifactId = "schema-registry-native" -version = "0.3.0" -path = "../native/build/libs/confluent.cregistry-native-0.3.0.jar" +version = "0.4.0" +path = "../native/build/libs/confluent.cregistry-native-0.4.0.jar" [[platform.java21.dependency]] groupId = "io.confluent" artifactId = "kafka-schema-registry-client" -version = "5.3.2" -path = "./lib/kafka-schema-registry-client-5.3.2.jar" +version = "7.8.0" +path = "./lib/kafka-schema-registry-client-7.8.0.jar" [[platform.java21.dependency]] groupId = "io.confluent" artifactId = "common-config" -version = "5.3.0" -path = "./lib/common-config-5.3.0.jar" +version = "7.8.0" +path = "./lib/common-config-7.8.0.jar" [[platform.java21.dependency]] groupId = "io.confluent" artifactId = "common-utils" -version = "5.3.0" -path = "./lib/common-utils-5.3.0.jar" +version = "7.8.0" +path = "./lib/common-utils-7.8.0.jar" [[platform.java21.dependency]] groupId = "org.apache.kafka" @@ -51,17 +51,23 @@ path = "./lib/avro-1.11.4.jar" [[platform.java21.dependency]] groupId = "com.fasterxml.jackson.core" artifactId = "jackson-core" -version = "2.17.0" -path = "./lib/jackson-core-2.17.0.jar" +version = "2.18.1" +path = "./lib/jackson-core-2.18.1.jar" [[platform.java11.dependency]] groupId = "com.fasterxml.jackson.core" artifactId = "jackson-annotations" -version = "2.17.0" -path = "./lib/jackson-annotations-2.17.0.jar" +version = "2.18.1" +path = "./lib/jackson-annotations-2.18.1.jar" [[platform.java11.dependency]] groupId = "com.fasterxml.jackson.core" artifactId = "jackson-databind" -version = "2.17.0" -path = "./lib/jackson-databind-2.17.0.jar" +version = "2.18.1" +path = "./lib/jackson-databind-2.18.1.jar" + +[[platform.java11.dependency]] +groupId = "com.google.guava" +artifactId = "guava" +version = "32.0.1-jre" +path = "./lib/guava-32.0.1-jre.jar" diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index 2471b91..9f78456 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -76,7 +76,7 @@ dependencies = [ [[package]] org = "ballerina" name = "http" -version = "2.13.2" +version = "2.13.3" scope = "testOnly" dependencies = [ {org = "ballerina", name = "auth"}, @@ -335,7 +335,7 @@ dependencies = [ [[package]] org = "ballerinax" name = "confluent.cregistry" -version = "0.3.0" +version = "0.4.0" dependencies = [ {org = "ballerina", name = "http"}, {org = "ballerina", name = "jballerina.java"}, diff --git a/ballerina/build.gradle b/ballerina/build.gradle index 1265f1b..e627640 100644 --- a/ballerina/build.gradle +++ b/ballerina/build.gradle @@ -86,6 +86,9 @@ dependencies { externalJars(group: 'org.apache.avro', name: 'avro', version: "${avroVersion}") { transitive = false } + externalJars(group: 'com.google.guava', name: 'guava', version: '32.0.1-jre') { + transitive = false + } } task updateTomlFiles { diff --git a/ballerina/client.bal b/ballerina/client.bal index 384c37e..a0bd940 100644 --- a/ballerina/client.bal +++ b/ballerina/client.bal @@ -47,13 +47,4 @@ public isolated client class Client { remote isolated function getSchemaById(int id) returns string|Error = @java:Method { 'class: "io.ballerina.lib.confluent.registry.CustomSchemaRegistryClient" } external; - - # Retrieves the ID for the given subject and schema from the schema registry client. - # - # + subject - The subject of the schema - # + schema - The Avro schema - # + return - Returns the ID of the schema if found, or an `cregistry:Error` if an error occurs - remote isolated function getId(string subject, string schema) returns int|Error = @java:Method { - 'class: "io.ballerina.lib.confluent.registry.CustomSchemaRegistryClient" - } external; } diff --git a/ballerina/tests/test.bal b/ballerina/tests/test.bal index cfc63f9..133515e 100644 --- a/ballerina/tests/test.bal +++ b/ballerina/tests/test.bal @@ -85,24 +85,6 @@ public function testGetInvalidSchemaById() returns error? { } } -@test:Config {} -public function testGetId() returns error? { - string schema = string ` - { - "namespace": "example.avro", - "type": "record", - "name": "Student", - "fields": [ - {"name": "name", "type": "string"}, - {"name": "favorite_color", "type": ["string", "null"]} - ] - }`; - - int registerId = check schemaRegistryClient->register(subject, schema); - int schemaId = check schemaRegistryClient->getId(subject, schema); - test:assertEquals(registerId, schemaId); -} - @test:Config {} public function testInvalidClientInitiation() returns error? { map originals = {}; diff --git a/build-config/resources/Ballerina.toml b/build-config/resources/Ballerina.toml index 05be23f..50890b1 100644 --- a/build-config/resources/Ballerina.toml +++ b/build-config/resources/Ballerina.toml @@ -65,3 +65,9 @@ groupId = "com.fasterxml.jackson.core" artifactId = "jackson-databind" version = "@jackson.version@" path = "./lib/jackson-databind-@jackson.version@.jar" + +[[platform.java11.dependency]] +groupId = "com.google.guava" +artifactId = "guava" +version = "32.0.1-jre" +path = "./lib/guava-32.0.1-jre.jar" diff --git a/docs/spec/spec.md b/docs/spec/spec.md index 635f022..487276b 100644 --- a/docs/spec/spec.md +++ b/docs/spec/spec.md @@ -25,9 +25,7 @@ The conforming implementation of the specification is released and included in t * 3.1 [The `getSchemaById` API](#31-the-getschemabyid-api) 4. [Schema registration](#4-schema-registration) * 4.1 [The `register` API](#41-the-register-api) -5. [Fetch a schema ID](#5-fetch-a-schema-id) - * 5.1 [The `getId` API](#51-the-getid-api) -6. [The `cregistry:Error` Type](#6-the-cregistryerror-type) +5. [The `cregistry:Error` Type](#6-the-cregistryerror-type) ## 1. Overview @@ -83,19 +81,7 @@ The `register` method registers a new schema to the registry. int schemaId = check registry->register("subject", "schema"); ``` -## 5. Fetch a schema ID - -The Confluent Schema Registry module provides a method to fetch a schema ID from the registry. - -### 5.1 The `getId` API - -The `getId` method retrieves a schema ID from the registry. - -```ballerina -int schemaId = check registry.getId(subject, schema); -``` - -## 6. The `cregistry:Error` Type +## 5. The `cregistry:Error` Type The `cregistry:Error` type represents all the errors related to the Confluent Schema Registry module. This is a subtype of the Ballerina `error` type. diff --git a/examples/user-registration/main.bal b/examples/user-registration/main.bal index 4d851d7..226dfb0 100644 --- a/examples/user-registration/main.bal +++ b/examples/user-registration/main.bal @@ -45,7 +45,4 @@ public function main() returns error? { int registerId = check schemaRegistryClient->register(subject, schema); io:println("Registered Id for the schema: ", registerId); - - int schemaId = check schemaRegistryClient->getId(subject, schema); - io:println("Id for the given schema: ", schemaId); } diff --git a/gradle.properties b/gradle.properties index 88b4248..86d9d18 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ org.gradle.caching=true group=io.ballerina.lib -version=0.3.1-SNAPSHOT +version=0.4.1-SNAPSHOT ballerinaLangVersion=2201.11.0 checkstylePluginVersion=10.12.0 @@ -11,10 +11,11 @@ releasePluginVersion=2.8.0 ballerinaGradlePluginVersion=2.3.0 jacocoVersion=0.8.10 -jacksonVersion=2.17.0 +jacksonVersion=2.18.1 avroVersion=1.11.4 kafkaAvroVersion=5.3.0 kafkaClientVersion=3.7.2 -kafkaSchemaRegistryVersion=5.3.2 -commonConfigVersion=5.3.0 -commonUtilsVersion=5.3.0 +kafkaSchemaRegistryVersion=7.8.0 +commonConfigVersion=7.8.0 +commonUtilsVersion=7.8.0 + diff --git a/native/build.gradle b/native/build.gradle index 1e758c4..817a0e8 100644 --- a/native/build.gradle +++ b/native/build.gradle @@ -45,6 +45,7 @@ dependencies { implementation group: 'io.confluent', name: 'kafka-schema-registry-client', version: "${kafkaSchemaRegistryVersion}" implementation group: 'io.confluent', name: 'common-config', version: "${commonConfigVersion}" implementation group: 'io.confluent', name: 'common-utils', version: "${commonUtilsVersion}" + implementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre' } tasks.withType(JavaCompile) { diff --git a/native/src/main/java/io/ballerina/lib/confluent/registry/CustomSchemaRegistryClient.java b/native/src/main/java/io/ballerina/lib/confluent/registry/CustomSchemaRegistryClient.java index d917981..2b15f5e 100644 --- a/native/src/main/java/io/ballerina/lib/confluent/registry/CustomSchemaRegistryClient.java +++ b/native/src/main/java/io/ballerina/lib/confluent/registry/CustomSchemaRegistryClient.java @@ -22,6 +22,7 @@ import io.ballerina.runtime.api.values.BMap; import io.ballerina.runtime.api.values.BObject; import io.ballerina.runtime.api.values.BString; +import io.confluent.kafka.schemaregistry.ParsedSchema; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; @@ -30,12 +31,15 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import static io.ballerina.lib.confluent.registry.ModuleUtils.NATIVE_CLIENT; import static io.ballerina.lib.confluent.registry.Utils.CLIENT_INVOCATION_ERROR; public final class CustomSchemaRegistryClient { + public static final String AVRO = "AVRO"; + private CustomSchemaRegistryClient() {} public static void generateSchemaRegistryClient(BObject registryClient, BMap config) { @@ -66,29 +70,29 @@ public static Object register(BObject registryClient, BString subject, BString s try { Schema.Parser parser = new Schema.Parser(); Schema avroSchema = parser.parse(schema.getValue()); - return schemaRegistryClient.register(subject.getValue(), avroSchema); + ParsedSchema parsedSchema = getParsedSchema(schemaRegistryClient, avroSchema); + return schemaRegistryClient.register(subject.getValue(), parsedSchema); } catch (Exception e) { return Utils.createError(CLIENT_INVOCATION_ERROR, e); } } - public static Object getId(BObject registryClient, BString subject, BString schema) { + public static Object getSchemaById(BObject registryClient, int i) { SchemaRegistryClient schemaRegistryClient = (SchemaRegistryClient) registryClient.getNativeData(NATIVE_CLIENT); try { - Schema.Parser parser = new Schema.Parser(); - Schema avroSchema = parser.parse(schema.getValue()); - return schemaRegistryClient.getId(subject.getValue(), avroSchema); - } catch (Exception e) { + return StringUtils.fromString(schemaRegistryClient.getSchemaById(i).canonicalString()); + } catch (IOException | RestClientException e) { return Utils.createError(CLIENT_INVOCATION_ERROR, e); } } - public static Object getSchemaById(BObject registryClient, int i) { - SchemaRegistryClient schemaRegistryClient = (SchemaRegistryClient) registryClient.getNativeData(NATIVE_CLIENT); - try { - return StringUtils.fromString(schemaRegistryClient.getById(i).toString()); - } catch (IOException | RestClientException e) { - return Utils.createError(CLIENT_INVOCATION_ERROR, e); + private static ParsedSchema getParsedSchema(SchemaRegistryClient schemaRegistryClient, + Schema avroSchema) { + Optional parsedSchema = schemaRegistryClient.parseSchema(AVRO, + avroSchema.toString(), null); + if (parsedSchema.isEmpty()) { + throw Utils.createError(CLIENT_INVOCATION_ERROR, new Throwable("Error occurred while parsing the schema")); } + return parsedSchema.get(); } }