Skip to content

Commit

Permalink
Kafka adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
ryannedolan committed Apr 14, 2023
1 parent 2e493c0 commit b2ef1d5
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 0 deletions.
1 change: 1 addition & 0 deletions hoptimator-cli/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ dependencies {
implementation project(':hoptimator-planner')
implementation project(':hoptimator-catalog')
implementation project(':hoptimator-flink-iterator')
implementation project(':hoptimator-kafka-adapter')

implementation libs.avro
implementation libs.sqlline
Expand Down
26 changes: 26 additions & 0 deletions hoptimator-kafka-adapter/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
plugins {
id 'java'
id 'idea'
}

dependencies {
implementation project(':hoptimator-catalog')
implementation libs.kafkaClients
implementation libs.calciteCore
implementation libs.flinkConnectorKafka

testImplementation libs.junit
testImplementation libs.assertj
}

tasks.withType(JavaCompile) {
options.compilerArgs << '-Xlint:deprecation'
options.compilerArgs << '-Xlint:unchecked'
}

idea {
module {
downloadJavadoc = true
downloadSources = true
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.linkedin.hoptimator.catalog.kafka;

import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaFactory;
import org.apache.calcite.schema.SchemaPlus;

import org.apache.kafka.clients.admin.AdminClient;

import com.linkedin.hoptimator.catalog.ConfigProvider;
import com.linkedin.hoptimator.catalog.DataType;
import com.linkedin.hoptimator.catalog.Database;
import com.linkedin.hoptimator.catalog.DatabaseSchema;
import com.linkedin.hoptimator.catalog.TableLister;
import com.linkedin.hoptimator.catalog.TableResolver;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

public class RawKafkaSchemaFactory implements SchemaFactory {

@Override
public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) {
DataType.Struct rowType = DataType.struct()
.with("PAYLOAD", DataType.VARCHAR_NULL);
ConfigProvider configProvider = ConfigProvider.from(operand).withPrefix("properties.")
.with("connector", "kafka")
.with("format", "raw")
.with("topic", x -> x);
TableLister tableLister = () -> {
AdminClient client = AdminClient.create(operand);
Collection<String> topics = client.listTopics().names().get();
client.close();
return topics;
};
TableResolver resolver = x -> rowType.rel();
Database database = new Database(name, tableLister, resolver, configProvider);
return new DatabaseSchema(database);
}
}
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ include 'hoptimator-catalog'
include 'hoptimator-cli'
include 'hoptimator-flink-iterator'
include 'hoptimator-flink-runner'
include 'hoptimator-kafka-adapter'
include 'hoptimator-operator'
include 'hoptimator-planner'
include 'models'
Expand Down

0 comments on commit b2ef1d5

Please sign in to comment.