KAFKA-8410: KTableProcessor migration groundwork (apache#10744)
* Lay the groundwork for migrating KTable Processors to the new PAPI.
* Migrate the KTableFilter processor to prove that the groundwork works.

This is an effort to help break up apache#10507 into multiple PRs.

Reviewers: Boyang Chen <[email protected]>
vvcephei authored May 28, 2021
1 parent 588ea44 commit f207bac
Showing 70 changed files with 514 additions and 221 deletions.
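For orientation before the diff: the migration moves KTable processors from the old Processor API (org.apache.kafka.streams.processor.Processor&lt;K, V&gt;, typically via AbstractProcessor) to the new one (org.apache.kafka.streams.processor.api.Processor&lt;KIn, VIn, KOut, VOut&gt;), which declares input and output types separately and hands process() a Record instead of a (key, value) pair. The sketch below is illustrative only and is not taken from the diff; the class name is made up.

// Illustrative sketch only; not part of this commit. A trivial processor written
// against the new PAPI. The old-API equivalent would extend AbstractProcessor<String, String>,
// implement process(key, value), and forward via context().forward(key, value).
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

class UpperCaseProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        // the new API has no AbstractProcessor base class, so keep the context ourselves
        this.context = context;
    }

    @Override
    public void process(final Record<String, String> record) {
        // key, value, timestamp, and headers travel together in the Record
        final String value = record.value();
        context.forward(record.withValue(value == null ? null : value.toUpperCase()));
    }
}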
org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -17,23 +17,23 @@
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;

class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
private final KTableImpl<K, ?, V> parent;
private final Predicate<? super K, ? super V> predicate;
class KTableFilter<KIn, VIn> implements KTableNewProcessorSupplier<KIn, VIn, KIn, VIn> {
private final KTableImpl<KIn, ?, VIn> parent;
private final Predicate<? super KIn, ? super VIn> predicate;
private final boolean filterNot;
private final String queryableName;
private boolean sendOldValues;

KTableFilter(final KTableImpl<K, ?, V> parent,
final Predicate<? super K, ? super V> predicate,
KTableFilter(final KTableImpl<KIn, ?, VIn> parent,
final Predicate<? super KIn, ? super VIn> predicate,
final boolean filterNot,
final String queryableName) {
this.parent = parent;
@@ -45,7 +45,7 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
}

@Override
public Processor<K, Change<V>> get() {
public Processor<KIn, Change<VIn>, KIn, Change<VIn>> get() {
return new KTableFilterProcessor();
}

@@ -62,8 +62,8 @@ public boolean enableSendingOldValues(final boolean forceMaterialization) {
return sendOldValues;
}

private V computeValue(final K key, final V value) {
V newValue = null;
private VIn computeValue(final KIn key, final VIn value) {
VIn newValue = null;

if (value != null && (filterNot ^ predicate.test(key, value))) {
newValue = value;
@@ -72,11 +72,11 @@ private V computeValue(final K key, final V value) {
return newValue;
}

private ValueAndTimestamp<V> computeValue(final K key, final ValueAndTimestamp<V> valueAndTimestamp) {
ValueAndTimestamp<V> newValueAndTimestamp = null;
private ValueAndTimestamp<VIn> computeValue(final KIn key, final ValueAndTimestamp<VIn> valueAndTimestamp) {
ValueAndTimestamp<VIn> newValueAndTimestamp = null;

if (valueAndTimestamp != null) {
final V value = valueAndTimestamp.value();
final VIn value = valueAndTimestamp.value();
if (filterNot ^ predicate.test(key, value)) {
newValueAndTimestamp = valueAndTimestamp;
}
@@ -86,13 +86,14 @@ private ValueAndTimestamp<V> computeValue(final K key, final ValueAndTimestamp<V
}


private class KTableFilterProcessor extends AbstractProcessor<K, Change<V>> {
private TimestampedKeyValueStore<K, V> store;
private TimestampedTupleForwarder<K, V> tupleForwarder;
private class KTableFilterProcessor implements Processor<KIn, Change<VIn>, KIn, Change<VIn>> {
private ProcessorContext<KIn, Change<VIn>> context;
private TimestampedKeyValueStore<KIn, VIn> store;
private TimestampedTupleForwarder<KIn, VIn> tupleForwarder;

@Override
public void init(final ProcessorContext context) {
super.init(context);
public void init(final ProcessorContext<KIn, Change<VIn>> context) {
this.context = context;
if (queryableName != null) {
store = context.getStateStore(queryableName);
tupleForwarder = new TimestampedTupleForwarder<>(
@@ -104,23 +105,26 @@ public void init(final ProcessorContext context) {
}

@Override
public void process(final K key, final Change<V> change) {
final V newValue = computeValue(key, change.newValue);
final V oldValue = computeOldValue(key, change);
public void process(final Record<KIn, Change<VIn>> record) {
final KIn key = record.key();
final Change<VIn> change = record.value();

final VIn newValue = computeValue(key, change.newValue);
final VIn oldValue = computeOldValue(key, change);

if (sendOldValues && oldValue == null && newValue == null) {
return; // unnecessary to forward here.
}

if (queryableName != null) {
store.put(key, ValueAndTimestamp.make(newValue, context().timestamp()));
tupleForwarder.maybeForward(key, newValue, oldValue);
store.put(key, ValueAndTimestamp.make(newValue, record.timestamp()));
tupleForwarder.maybeForward(record.withValue(new Change<>(newValue, oldValue)));
} else {
context().forward(key, new Change<>(newValue, oldValue));
context.forward(record.withValue(new Change<>(newValue, oldValue)));
}
}

private V computeOldValue(final K key, final Change<V> change) {
private VIn computeOldValue(final KIn key, final Change<VIn> change) {
if (!sendOldValues) {
return null;
}
@@ -132,16 +136,16 @@ private V computeOldValue(final K key, final Change<V> change) {
}

@Override
public KTableValueGetterSupplier<K, V> view() {
public KTableValueGetterSupplier<KIn, VIn> view() {
// if the KTable is materialized, use the materialized store to return getter value;
// otherwise rely on the parent getter and apply filter on-the-fly
if (queryableName != null) {
return new KTableMaterializedValueGetterSupplier<>(queryableName);
} else {
return new KTableValueGetterSupplier<K, V>() {
final KTableValueGetterSupplier<K, V> parentValueGetterSupplier = parent.valueGetterSupplier();
return new KTableValueGetterSupplier<KIn, VIn>() {
final KTableValueGetterSupplier<KIn, VIn> parentValueGetterSupplier = parent.valueGetterSupplier();

public KTableValueGetter<K, V> get() {
public KTableValueGetter<KIn, VIn> get() {
return new KTableFilterValueGetter(parentValueGetterSupplier.get());
}

@@ -154,20 +158,22 @@ public String[] storeNames() {
}


private class KTableFilterValueGetter implements KTableValueGetter<K, V> {
private final KTableValueGetter<K, V> parentGetter;
private class KTableFilterValueGetter implements KTableValueGetter<KIn, VIn> {
private final KTableValueGetter<KIn, VIn> parentGetter;

KTableFilterValueGetter(final KTableValueGetter<K, V> parentGetter) {
KTableFilterValueGetter(final KTableValueGetter<KIn, VIn> parentGetter) {
this.parentGetter = parentGetter;
}

@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
// This is the old processor context for compatibility with the other KTable processors.
// Once we migrate them all, we can swap this out.
parentGetter.init(context);
}

@Override
public ValueAndTimestamp<V> get(final K key) {
public ValueAndTimestamp<VIn> get(final KIn key) {
return computeValue(key, parentGetter.get(key));
}

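The filter()/filterNot() duality in the processor above comes down to the filterNot ^ predicate.test(key, value) expression in computeValue(). A standalone, hypothetical helper that isolates that rule (names are made up; only the XOR logic is taken from the diff):

// Hypothetical helper (not in the codebase) isolating the XOR rule from
// KTableFilter.computeValue():
//   filter():    filterNot == false, keep the value when the predicate passes
//   filterNot(): filterNot == true,  keep the value when the predicate fails
// Returning null produces a tombstone for the downstream KTable.
import org.apache.kafka.streams.kstream.Predicate;

final class FilterRuleSketch {
    static <K, V> V apply(final boolean filterNot,
                          final Predicate<? super K, ? super V> predicate,
                          final K key,
                          final V value) {
        if (value != null && (filterNot ^ predicate.test(key, value))) {
            return value;
        }
        return null;
    }
}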
org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -124,7 +124,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
private static final String TOPIC_SUFFIX = "-topic";
private static final String SINK_NAME = "KTABLE-SINK-";

private final ProcessorSupplier<?, ?> processorSupplier;
// Temporarily setting the processorSupplier to type Object so that we can transition from the
// old ProcessorSupplier to the new api.ProcessorSupplier. This works because all accesses to
// this field are guarded by typechecks anyway.
private final Object processorSupplier;

private final String queryableStoreName;

@@ -143,6 +146,19 @@ public KTableImpl(final String name,
this.queryableStoreName = queryableStoreName;
}

public KTableImpl(final String name,
final Serde<K> keySerde,
final Serde<V> valueSerde,
final Set<String> subTopologySourceNodes,
final String queryableStoreName,
final org.apache.kafka.streams.processor.api.ProcessorSupplier<?, ?, ?, ?> newProcessorSupplier,
final GraphNode graphNode,
final InternalStreamsBuilder builder) {
super(name, keySerde, valueSerde, subTopologySourceNodes, graphNode, builder);
this.processorSupplier = newProcessorSupplier;
this.queryableStoreName = queryableStoreName;
}

@Override
public String queryableStoreName() {
return queryableStoreName;
@@ -179,7 +195,7 @@ private KTable<K, V> doFilter(final Predicate<? super K, ? super V> predicate,
}
final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FILTER_NAME);

final KTableProcessorSupplier<K, V, V> processorSupplier =
final KTableNewProcessorSupplier<K, V, K, V> processorSupplier =
new KTableFilter<>(this, predicate, filterNot, queryableStoreName);

final ProcessorParameters<K, V, ?, ?> processorParameters = unsafeCastProcessorParametersToCompletelyDifferentType(
@@ -194,7 +210,7 @@ private KTable<K, V> doFilter(final Predicate<? super K, ? super V> predicate,

builder.addGraphNode(this.graphNode, tableNode);

return new KTableImpl<>(
return new KTableImpl<K, V, V>(
name,
keySerde,
valueSerde,
@@ -816,6 +832,8 @@ public KTableValueGetterSupplier<K, V> valueGetterSupplier() {
return new KTableSourceValueGetterSupplier<>(source.queryableName());
} else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
return ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).view();
} else if (processorSupplier instanceof KTableNewProcessorSupplier) {
return ((KTableNewProcessorSupplier<?, ?, K, V>) processorSupplier).view();
} else {
return ((KTableProcessorSupplier<K, S, V>) processorSupplier).view();
}
@@ -832,6 +850,12 @@ public boolean enableSendingOldValues(final boolean forceMaterialization) {
source.enableSendingOldValues();
} else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).enableSendingOldValues();
} else if (processorSupplier instanceof KTableNewProcessorSupplier) {
final KTableNewProcessorSupplier<?, ?, ?, ?> tableProcessorSupplier =
(KTableNewProcessorSupplier<?, ?, ?, ?>) processorSupplier;
if (!tableProcessorSupplier.enableSendingOldValues(forceMaterialization)) {
return false;
}
} else {
final KTableProcessorSupplier<K, S, V> tableProcessorSupplier = (KTableProcessorSupplier<K, S, V>) processorSupplier;
if (!tableProcessorSupplier.enableSendingOldValues(forceMaterialization)) {
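As the comment on the Object-typed field explains, every access to processorSupplier is guarded by an instanceof check while old- and new-style suppliers coexist. A compressed, hypothetical sketch of that dispatch, under the assumption that it sits in the same internals package as the supplier types:

// Hypothetical simplification of the type-guarded access pattern; assumes it lives in
// org.apache.kafka.streams.kstream.internals, where these supplier types are defined.
// The real valueGetterSupplier() also handles KTableSource and KStreamAggProcessorSupplier.
final class SupplierDispatchSketch {
    @SuppressWarnings("unchecked")
    static <K, V> KTableValueGetterSupplier<K, V> viewOf(final Object processorSupplier) {
        if (processorSupplier instanceof KTableNewProcessorSupplier) {
            // already migrated to the new PAPI (e.g. KTableFilter after this commit)
            return ((KTableNewProcessorSupplier<?, ?, K, V>) processorSupplier).view();
        } else if (processorSupplier instanceof KTableProcessorSupplier) {
            // not yet migrated: fall back to the old supplier interface
            return ((KTableProcessorSupplier<K, ?, V>) processorSupplier).view();
        } else {
            throw new IllegalArgumentException("unexpected supplier type: " + processorSupplier.getClass());
        }
    }
}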
org/apache/kafka/streams/kstream/internals/KTableNewProcessorSupplier.java
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.streams.processor.api.ProcessorSupplier;

public interface KTableNewProcessorSupplier<KIn, VIn, KOut, VOut> extends ProcessorSupplier<KIn, Change<VIn>, KOut, Change<VOut>> {

KTableValueGetterSupplier<KOut, VOut> view();

/**
* Potentially enables sending old values.
* <p>
* If {@code forceMaterialization} is {@code true}, the method will force the materialization of upstream nodes to
* enable sending old values.
* <p>
* If {@code forceMaterialization} is {@code false}, the method will only enable the sending of old values <i>if</i>
* an upstream node is already materialized.
*
* @param forceMaterialization indicates if an upstream node should be forced to materialize to enable sending old
* values.
* @return {@code true} if sending old values is enabled, i.e. either because {@code forceMaterialization} was
* {@code true} or some upstream node is materialized.
*/
boolean enableSendingOldValues(boolean forceMaterialization);
}
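To make the contract concrete, here is a minimal, hypothetical implementation (not part of this commit): a pass-through supplier that materializes nothing itself, reads through its parent for view(), and reports old-value support only if its parent can provide it.

// Hypothetical pass-through supplier, assuming the org.apache.kafka.streams.kstream.internals
// package so Change, KTableImpl, and the getter types are in scope.
import org.apache.kafka.streams.processor.api.Processor;

class PassThroughSupplier<K, V> implements KTableNewProcessorSupplier<K, V, K, V> {
    private final KTableImpl<K, ?, V> parent;

    PassThroughSupplier(final KTableImpl<K, ?, V> parent) {
        this.parent = parent;
    }

    @Override
    public Processor<K, Change<V>, K, Change<V>> get() {
        // process(Record) is the only abstract method on the new Processor interface,
        // so a no-op lambda is enough for this sketch; a real supplier returns its logic here.
        return record -> { };
    }

    @Override
    public KTableValueGetterSupplier<K, V> view() {
        // not materialized itself, so read through the parent's getter
        return parent.valueGetterSupplier();
    }

    @Override
    public boolean enableSendingOldValues(final boolean forceMaterialization) {
        // delegate upstream; report success only if an upstream node is (or becomes) materialized
        return parent.enableSendingOldValues(forceMaterialization);
    }
}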
org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
@@ -19,30 +19,45 @@
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.state.internals.CacheFlushListener;

class SessionCacheFlushListener<K, V> implements CacheFlushListener<Windowed<K>, V> {
private final InternalProcessorContext context;
class SessionCacheFlushListener<KOut, VOut> implements CacheFlushListener<Windowed<KOut>, VOut> {
private final InternalProcessorContext<Windowed<KOut>, Change<VOut>> context;

@SuppressWarnings("rawtypes")
private final ProcessorNode myNode;

@SuppressWarnings("unchecked")
SessionCacheFlushListener(final ProcessorContext context) {
this.context = (InternalProcessorContext) context;
this.context = (InternalProcessorContext<Windowed<KOut>, Change<VOut>>) context;
myNode = this.context.currentNode();
}

@Override
public void apply(final Windowed<K> key,
final V newValue,
final V oldValue,
public void apply(final Windowed<KOut> key,
final VOut newValue,
final VOut oldValue,
final long timestamp) {
final ProcessorNode prev = context.currentNode();
@SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode();
context.setCurrentNode(myNode);
try {
context.forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(key.window().end()));
} finally {
context.setCurrentNode(prev);
}
}

@Override
public void apply(final Record<Windowed<KOut>, Change<VOut>> record) {
@SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode();
context.setCurrentNode(myNode);
try {
context.forward(record.withTimestamp(record.key().window().end()));
} finally {
context.setCurrentNode(prev);
}
}
}
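Both overloads forward flush results with the session window's end as the output timestamp rather than the cached entry's own timestamp; the new overload expresses that by rewriting the Record. A hypothetical, standalone illustration of that rewrite:

// Hypothetical helper (not in the codebase): stamp a flushed session-window result with
// its window end before forwarding, mirroring the new apply(Record) overload above.
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.api.Record;

final class SessionTimestampSketch {
    static <K, V> Record<Windowed<K>, V> atWindowEnd(final Record<Windowed<K>, V> record) {
        return record.withTimestamp(record.key().window().end());
    }
}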