Skip to content

Commit

Permalink
Tap bus/channel reports
Browse files Browse the repository at this point in the history
  • Loading branch information
lennartkoopmann committed May 1, 2022
1 parent 4a92924 commit 4815d97
Show file tree
Hide file tree
Showing 6 changed files with 320 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,24 @@ public class StatusResource {
@Inject
private NzymeLeader nzyme;

/*
* TODO: auth system
* reject if tap name > 50chars (do the same in tap config loading)
*/

@POST
public Response status(StatusReport report) {
LOG.debug("Received status from tap [{}]: memory_total: {}, memory_free: {}, memory_used: {}, cpu: {}%, bytes processed: {}, avg bytes processed: {}",
LOG.debug("Received status from tap [{}]: memory_total: {}, memory_free: {}, memory_used: {}, cpu: {}%, bytes processed: {}, avg bytes processed: {}, bus: {}",
report.tapName(),
report.systemMetrics().memoryTotal(),
report.systemMetrics().memoryFree(),
report.systemMetrics().memoryTotal()-report.systemMetrics().memoryFree(),
report.systemMetrics().cpuLoad(),
report.processedBytes().total(),
report.processedBytes().average()
report.processedBytes().average(),
report.bus()
);

if (report.tapName().length() > 50) {
LOG.debug("Tap name [{}] exceeds maximum length of 50 characters.", report.tapName());
return Response.status(Response.Status.FORBIDDEN).build();
}

nzyme.getTapManager().registerTapStatus(report);

return Response.status(Response.Status.CREATED).build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* This file is part of nzyme.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/

package horse.wtf.nzyme.rest.resources.taps.reports;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.auto.value.AutoValue;

import java.util.List;

@AutoValue
public abstract class BusReport {

public abstract List<ChannelReport> channels();

@JsonCreator
public static BusReport create(@JsonProperty("channels") List<ChannelReport> channels) {
return builder()
.channels(channels)
.build();
}

public static Builder builder() {
return new AutoValue_BusReport.Builder();
}

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder channels(List<ChannelReport> channels);

public abstract BusReport build();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* This file is part of nzyme.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/

package horse.wtf.nzyme.rest.resources.taps.reports;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.auto.value.AutoValue;

@AutoValue
public abstract class ChannelReport {

public abstract String name();
public abstract Long capacity();
public abstract Long watermark();
public abstract TotalWithAverage errors();
public abstract TotalWithAverage throughputBytes();
public abstract TotalWithAverage throughputMessages();

@JsonCreator
public static ChannelReport create(@JsonProperty("name") String name,
@JsonProperty("capacity") Long capacity,
@JsonProperty("watermark") Long watermark,
@JsonProperty("errors") TotalWithAverage errors,
@JsonProperty("throughput_bytes") TotalWithAverage throughputBytes,
@JsonProperty("throughput_messages") TotalWithAverage throughputMessages) {
return builder()
.name(name)
.capacity(capacity)
.watermark(watermark)
.errors(errors)
.throughputBytes(throughputBytes)
.throughputMessages(throughputMessages)
.build();
}

public static Builder builder() {
return new AutoValue_ChannelReport.Builder();
}

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder name(String name);

public abstract Builder capacity(Long capacity);

public abstract Builder watermark(Long watermark);

public abstract Builder errors(TotalWithAverage errors);

public abstract Builder throughputBytes(TotalWithAverage throughputBytes);

public abstract Builder throughputMessages(TotalWithAverage throughputMessages);

public abstract ChannelReport build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,21 @@ public abstract class StatusReport {
public abstract String tapName();
public abstract DateTime timestamp();
public abstract TotalWithAverage processedBytes();
public abstract BusReport bus();
public abstract SystemMetrics systemMetrics();

@JsonCreator
public static StatusReport create(@JsonProperty("tap_name") String tapName,
@JsonProperty("timestamp") DateTime timestamp,
@JsonProperty("processed_bytes") TotalWithAverage processedBytes,
@JsonProperty("bus") BusReport bus,
@JsonProperty("system_metrics") SystemMetrics systemMetrics) {
return builder()
.tapName(tapName)
.timestamp(timestamp)
.processedBytes(processedBytes)
.systemMetrics(systemMetrics)
.bus(bus)
.build();
}

Expand All @@ -55,6 +58,8 @@ public abstract static class Builder {

public abstract Builder processedBytes(TotalWithAverage processedBytes);

public abstract Builder bus(BusReport bus);

public abstract Builder systemMetrics(SystemMetrics systemMetrics);

public abstract StatusReport build();
Expand Down
100 changes: 93 additions & 7 deletions src/main/java/horse/wtf/nzyme/taps/TapManager.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package horse.wtf.nzyme.taps;

import horse.wtf.nzyme.NzymeLeader;
import horse.wtf.nzyme.rest.resources.taps.reports.ChannelReport;
import horse.wtf.nzyme.rest.resources.taps.reports.StatusReport;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -20,7 +21,7 @@ public TapManager(NzymeLeader nzyme) {
}

public void registerTapStatus(StatusReport report) {
long count = nzyme.getDatabase().withHandle(handle ->
long tapCount = nzyme.getDatabase().withHandle(handle ->
handle.createQuery("SELECT COUNT(*) AS count FROM taps WHERE name = :name")
.bind("name", report.tapName())
.mapTo(Long.class)
Expand All @@ -29,7 +30,7 @@ public void registerTapStatus(StatusReport report) {

DateTime now = DateTime.now();

if (count == 0) {
if (tapCount == 0) {
LOG.info("Registering first report from new tap [{}].", report.tapName());

nzyme.getDatabase().useHandle(handle ->
Expand All @@ -50,11 +51,6 @@ public void registerTapStatus(StatusReport report) {
.execute()
);
} else {
if (count > 1) {
LOG.warn("Found multiple tap status entries for tap [{}]. This should never happen and can lead " +
"to inconsistencies.", report.tapName());
}

LOG.debug("Registering report from existing tap [{}].", report.tapName());

nzyme.getDatabase().useHandle(handle ->
Expand All @@ -76,6 +72,96 @@ public void registerTapStatus(StatusReport report) {
);
}

// Register bus.
long busCount = nzyme.getDatabase().withHandle(handle ->
handle.createQuery("SELECT COUNT(*) AS count FROM tap_buses WHERE tap_name = :tap_name")
.bind("tap_name", report.tapName())
.mapTo(Long.class)
.one()
);

if (busCount == 0) {
nzyme.getDatabase().useHandle(handle ->
handle.createUpdate("INSERT INTO tap_buses(tap_name, created_at, updated_at) " +
"VALUES(:tap_name, :created_at, :updated_at)")
.bind("tap_name", report.tapName())
.bind("created_at", now)
.bind("updated_at", now)
.execute()
);
} else {
nzyme.getDatabase().useHandle(handle ->
handle.createUpdate("UPDATE tap_buses SET updated_at = :updated_at WHERE tap_name = :tap_name")
.bind("tap_name", report.tapName())
.bind("updated_at", now)
.execute()
);
}

Long busId = nzyme.getDatabase().withHandle(handle ->
handle.createQuery("SELECT id FROM tap_buses WHERE tap_name = :tap_name")
.bind("tap_name", report.tapName())
.mapTo(Long.class)
.one()
);

// Register bus channels.
for (ChannelReport channel : report.bus().channels()) {
long channelCount = nzyme.getDatabase().withHandle(handle ->
handle.createQuery("SELECT COUNT(*) AS count FROM bus_channels " +
"WHERE bus_id = :bus_id AND name = :channel_name")
.bind("bus_id", busId)
.bind("channel_name", channel.name())
.mapTo(Long.class)
.one()
);

if (channelCount == 0) {
nzyme.getDatabase().withHandle(handle ->
handle.createUpdate("INSERT INTO bus_channels(name, bus_id, capacity, watermark, errors_total, " +
"errors_average, throughput_bytes_total, throughput_bytes_average, " +
"throughput_messages_total, throughput_messages_average, created_at, updated_at) " +
"VALUES(:name, :bus_id, :capacity, :watermark, :errors_total, :errors_average, " +
":throughput_bytes_total, :throughput_bytes_average, :throughput_messages_total, " +
":throughput_messages_average, :created_at, :updated_at)")
.bind("name", channel.name())
.bind("bus_id", busId)
.bind("capacity", channel.capacity())
.bind("watermark", channel.watermark())
.bind("errors_total", channel.errors().total())
.bind("errors_average", channel.errors().average())
.bind("throughput_bytes_total", channel.throughputBytes().total())
.bind("throughput_bytes_average", channel.throughputBytes().average())
.bind("throughput_messages_total", channel.throughputMessages().total())
.bind("throughput_messages_average", channel.throughputMessages().average())
.bind("created_at", now)
.bind("updated_at", now)
.execute()
);
} else {
nzyme.getDatabase().withHandle(handle ->
handle.createUpdate("UPDATE bus_channels SET capacity = :capacity, watermark = :watermark, " +
"errors_total = :errors_total, errors_average = :errors_average, " +
"throughput_bytes_total = :throughput_bytes_total, " +
"throughput_bytes_average = :throughput_bytes_average, " +
"throughput_messages_total = :throughput_messages_total, " +
"throughput_messages_average = :throughput_messages_average, " +
"updated_at = :updated_at WHERE bus_id = :bus_id AND name = :name")
.bind("name", channel.name())
.bind("bus_id", busId)
.bind("capacity", channel.capacity())
.bind("watermark", channel.watermark())
.bind("errors_total", channel.errors().total())
.bind("errors_average", channel.errors().average())
.bind("throughput_bytes_total", channel.throughputBytes().total())
.bind("throughput_bytes_average", channel.throughputBytes().average())
.bind("throughput_messages_total", channel.throughputMessages().total())
.bind("throughput_messages_average", channel.throughputMessages().average())
.bind("updated_at", now)
.execute()
);
}
}
}

public List<Tap> findAllTaps() {
Expand Down
Loading

0 comments on commit 4815d97

Please sign in to comment.