Skip to content

Commit

Permalink
Operator with KafkaTopic controller
Browse files Browse the repository at this point in the history
  • Loading branch information
ryannedolan committed Apr 12, 2023
1 parent cfec185 commit fa33662
Show file tree
Hide file tree
Showing 21 changed files with 1,570 additions and 7 deletions.
12 changes: 12 additions & 0 deletions bin/hoptimator-operator
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!/bin/sh

JAR="./hoptimator-operator/build/libs/hoptimator-operator-all.jar"

if [[ -f "$JAR" ]]; then
java \
--add-opens java.base/java.lang=ALL-UNNAMED \
--add-opens java.base/java.util=ALL-UNNAMED \
-jar $JAR -n "" -p "" -u "jdbc:calcite:model=./models/test-model.yaml"
else
echo "jar file not found; maybe forgot to build?"
fi
85 changes: 85 additions & 0 deletions deploy/kafkatopics.crd.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: kafkatopics.hoptimator.linkedin.com
spec:
group: hoptimator.linkedin.com
names:
kind: KafkaTopic
listKind: KafkaTopicList
plural: kafkatopics
singular: kafkatopic
shortNames:
- kt
preserveUnknownFields: false
scope: Namespaced
versions:
- name: v1alpha1
served: true
storage: true
schema:
openAPIV3Schema:
description: Kafka Topic
type: object
properties:
apiVersion:
type: string
kind:
type: string
metadata:
type: object
spec:
description: Desired Kafka topic configuration.
type: object
properties:
name:
description: The topic name.
type: string
numPartitions:
description: Number of partitions the topic should have. By default, the cluster decides.
type: integer
replicationFactor:
description: The replication factor the topic should have. By default, the cluster decides.
type: integer
configs:
description: Topic configurations.
type: object
additionalProperties:
type: string
clientConfigs:
description: ConfigMaps for AdminClient configuration.
type: array
items:
type: object
properties:
configMapRef:
description: Reference to a ConfigMap to use for AdminClient configuration.
type: object
properties:
name:
description: Name of ConfigMap to use for AdminClient configuration.
type: string
required:
- name
clientOverrides:
description: AdminClient overrides.
type: object
additionalProperties:
type: string
required:
- name
status:
description: Current state of the topic.
type: object
properties:
numPartitions:
description: Actual number of partitions the topic has when last checked.
type: integer
ready:
description: Whether the requested topic has been created.
type: boolean
message:
description: Error or success message, for information only.
type: string
subresources:
status: {}
8 changes: 8 additions & 0 deletions deploy/samples/kafkatopic.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
apiVersion: hoptimator.linkedin.com/v1alpha1
kind: KafkaTopic
metadata:
name: sample-kafka-topic-1
spec:
name: sample-1
clientOverrides:
bootstrap.servers: localhost:9092
5 changes: 2 additions & 3 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ jackson = "com.fasterxml.jackson.core:jackson-core:2.13.2"
jacksonYaml = "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.13.2"
javaxAnnotationApi = "javax.annotation:javax.annotation-api:1.3.2"
junit = "junit:junit:4.12"
kafkaClients = "org.apache.kafka:kafka-clients:3.1.1"
kubernetesClient = "io.kubernetes:client-java:16.0.2"
kubernetesExtendedClient = "io.kubernetes:client-java-extended:16.0.2"
log4jApi = "org.apache.logging.log4j:log4j-api:2.17.1"
log4jCore = "org.apache.logging.log4j:log4j-core:2.17.1"
log4jSlf4jImpl = "org.apache.logging.log4j:log4j-slf4j-impl:2.17.1"
slf4jLog4j = "org.slf4j:slf4j-log4j12:1.7.25"
sqlline = "sqlline:sqlline:1.12.0"
2 changes: 1 addition & 1 deletion hoptimator-cli/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ dependencies {
// implementation libs.jlineTerminal
// implementation libs.jlineReader
// implementation libs.jlineBuiltins
implementation libs.log4jSlf4jImpl
implementation libs.slf4jLog4j
implementation libs.flinkClients
implementation libs.flinkTableRuntime
implementation libs.flinkTablePlanner
Expand Down
27 changes: 27 additions & 0 deletions hoptimator-operator/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
plugins {
id 'com.github.johnrengelman.shadow' version '8.1.1'
id 'java'
id 'application'
}

dependencies {
implementation project(':hoptimator-planner')
implementation project(':hoptimator-catalog')
implementation project(':models')

implementation libs.kafkaClients
implementation libs.kubernetesClient
implementation libs.kubernetesExtendedClient
implementation libs.slf4jLog4j

testImplementation libs.junit
testImplementation libs.assertj
}

shadowJar {
exclude 'META-INF/*.RSA', 'META-INF/*.SF','META-INF/*.DSA'
zip64 true
manifest.attributes 'Main-Class': 'com.linkedin.hoptimator.HoptimatorOperatorApp'
mainClassName = 'com.linkedin.hoptimator.HoptimatorOperatorApp'
mergeServiceFiles()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.linkedin.hoptimator;

import io.kubernetes.client.openapi.models.V1ConfigMap;

import java.util.ArrayList;
import java.util.Properties;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConfigAssembler {
private final Operator operator;
private final List<ConfigMapRef> refs = new ArrayList<>();
private final Map<String, String> overrides = new HashMap<>();

public ConfigAssembler(Operator operator) {
this.operator = operator;
}

public void addOverride(String key, String value) {
overrides.put(key, value);
}

public void addRef(String namespace, String name) {
refs.add(new ConfigMapRef(namespace, name));
}

public Map<String, String> assemble() {
Map<String, String> combined = new HashMap<>();
refs.forEach(x -> combined.putAll(x.fetch(operator)));
overrides.forEach((k, v) -> combined.put(k, v));
return combined;
}

public Properties assembleProperties() {
Properties properties = new Properties();
assemble().forEach((k, v) -> properties.put(k, v));
return properties;
}

private static class ConfigMapRef {
private final String namespace;
private final String name;

ConfigMapRef(String namespace, String name) {
this.namespace = namespace;
this.name = name;
}

Map<String, String> fetch(Operator operator) {
return operator.<V1ConfigMap>fetch("configmap", namespace, name).getData();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.linkedin.hoptimator;

import io.kubernetes.client.informer.SharedInformerFactory;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.util.Config;
import io.kubernetes.client.extended.controller.Controller;
import io.kubernetes.client.extended.controller.ControllerManager;

import com.linkedin.hoptimator.models.V1alpha1KafkaTopic;
import com.linkedin.hoptimator.models.V1alpha1KafkaTopicList;
import com.linkedin.hoptimator.kafka.KafkaTopicControllerFactory;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class HoptimatorOperatorApp {

public static void main(String[] args) throws IOException {

ApiClient apiClient = Config.defaultClient();
apiClient.setHttpClient(apiClient.getHttpClient().newBuilder()
.readTimeout(0, TimeUnit.SECONDS).build());
SharedInformerFactory informerFactory = new SharedInformerFactory(apiClient);
Operator operator = new Operator(apiClient, informerFactory);

operator.registerApi("kafkatopic", "kafkatopics", "hoptimator.linkedin.com", "v1alpha1",
V1alpha1KafkaTopic.class, V1alpha1KafkaTopicList.class);

ControllerManager controllerManager = new ControllerManager(operator.informerFactory(),
KafkaTopicControllerFactory.create(operator));

controllerManager.run();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package com.linkedin.hoptimator;

import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.informer.SharedInformerFactory;
import io.kubernetes.client.informer.SharedIndexInformer;
import io.kubernetes.client.informer.cache.Indexer;
import io.kubernetes.client.informer.cache.Lister;
import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.common.KubernetesListObject;
import io.kubernetes.client.util.generic.GenericKubernetesApi;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

/** Single handle to all the clients required by all the controllers. */
public class Operator {
private final ApiClient apiClient;
private final SharedInformerFactory informerFactory;
private final Map<String, ApiInfo<?, ?>> apiInfo = new HashMap<>();

public Operator(ApiClient apiClient, SharedInformerFactory informerFactory) {
this.apiClient = apiClient;
this.informerFactory = informerFactory;
}

public Operator(ApiClient apiClient) {
this(apiClient, new SharedInformerFactory(apiClient));
}

public <T extends KubernetesObject, L extends KubernetesListObject> void registerApi(String singular,
String plural, String group, String version, Class<T> type, Class<L> list) {
registerApi(new ApiInfo<T, L>(singular, plural, group, version, type, list));
}

public <T extends KubernetesObject, L extends KubernetesListObject> void registerApi(ApiInfo<T, L> api) {
this.apiInfo.put(api.singular(), api);
// side-effect: register shared informer
informerFactory.sharedIndexInformerFor(api.generic(apiClient), api.type(), resyncPeriod().toMillis());
}

public void start() {
informerFactory.startAllRegisteredInformers();
}

public <T extends KubernetesObject, L extends KubernetesListObject> ApiInfo<T, L> apiInfo(String singular) {
if (!this.apiInfo.containsKey(singular)) {
throw new IllegalArgumentException("No API for '" + singular + "' registered!");
}
return (ApiInfo<T, L>) this.apiInfo.get(singular);
}

public <T extends KubernetesObject> SharedIndexInformer<T> informer(String singular) {
ApiInfo<T, ?> api = apiInfo(singular);
return informerFactory.getExistingSharedIndexInformer(api.type());
}

public SharedInformerFactory informerFactory() {
return informerFactory;
}

public <T extends KubernetesObject> T fetch(String singular, String namespace, String name) {
return (T) lister(singular).namespace(namespace).get(name);
}

public <T extends KubernetesObject> Lister<T> lister(String singular) {
return new Lister<T>((Indexer<T>) informer(singular).getIndexer());
}

public Duration failureRetryDuration() {
return Duration.ofMinutes(5);
}

public Duration resyncPeriod() {
return Duration.ofMinutes(1);
}

public static class ApiInfo<T extends KubernetesObject, L extends KubernetesListObject> {
private final String singular;
private final String plural;
private final String group;
private final String version;
private final Class<T> type;
private final Class<L> list;

public ApiInfo(String singular, String plural, String group, String version, Class<T> type, Class<L> list) {
this.singular = singular;
this.plural = plural;
this.group = group;
this.version = version;
this.type = type;
this.list = list;
}

public GenericKubernetesApi<T, L> generic(ApiClient apiClient) {
return new GenericKubernetesApi<T, L>(type(), list(), group(), version(), plural(), apiClient);
}

public String singular() {
return singular;
}

public String plural() {
return plural;
}

public String group() {
return group;
}

public String version() {
return version;
}

public Class<T> type() {
return type;
}

public Class<L> list() {
return list;
}
}
}
Loading

0 comments on commit fa33662

Please sign in to comment.