KubeVisualiser.java
package kafka_tweetoscope;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.*;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
public class KubeVisualiser {
private static final String HASHTAG_COUNTS_TOPIC = "hashtag-counts";
private final Map<String, Long> hashtagCounts = new ConcurrentHashMap<>();
private final int numLeaders;
private final int reportIntervalSeconds;
private final Properties props;
private volatile boolean running = true;
public KubeVisualiser(String bootstrapServers, int numLeaders, int reportIntervalSeconds) {
this.numLeaders = numLeaders;
this.reportIntervalSeconds = reportIntervalSeconds;
// Configure Kafka Consumer
this.props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "text-visualiser-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
}
private void printLeaderboard() {
// Convert map to sorted list
List<Map.Entry<String, Long>> sorted = new ArrayList<>(hashtagCounts.entrySet());
sorted.sort((a, b) -> b.getValue().compareTo(a.getValue()));
// Take top N
List<Map.Entry<String, Long>> leaders =
sorted.subList(0, Math.min(numLeaders, sorted.size()));
// Print the leaderboard
System.out.println("\n=== Top " + numLeaders + " Hashtags ===");
System.out.println("Timestamp: " + new Date());
System.out.println("------------------------");
for (int i = 0; i < leaders.size(); i++) {
Map.Entry<String, Long> entry = leaders.get(i);
System.out.printf("%d. %-20s %d%n",
i + 1, entry.getKey(), entry.getValue());
}
System.out.println("------------------------\n");
}
public void start() {
// Create consumer
try (KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props)) {
// Subscribe to topic
consumer.subscribe(Collections.singletonList(HASHTAG_COUNTS_TOPIC));
// Start reporting thread
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
executor.scheduleAtFixedRate(
this::printLeaderboard,
reportIntervalSeconds,
reportIntervalSeconds,
TimeUnit.SECONDS
);
// Consume messages
while (running) {
ConsumerRecords<String, Long> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, Long> record : records) {
hashtagCounts.put(record.key(), record.value());
}
}
// Cleanup
executor.shutdown();
}
}
public void stop() {
running = false;
}
public static void main(String[] args) {
if (args.length < 3) {
System.out.println("Usage: KubeVisualiser <bootstrap-servers> <num-leaders> <report-interval-seconds>");
System.exit(1);
}
String bootstrapServers = args[0];
int numLeaders = Integer.parseInt(args[1]);
int reportInterval = Integer.parseInt(args[2]);
KubeVisualiser visualiser =
new KubeVisualiser(bootstrapServers, numLeaders, reportInterval);
// Add shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(visualiser::stop));
// Start visualiser
visualiser.start();
}
}