KubeVisualiser.java

package kafka_tweetoscope;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.*;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;

public class KubeVisualiser {
    private static final String HASHTAG_COUNTS_TOPIC = "hashtag-counts";
    private final Map<String, Long> hashtagCounts = new ConcurrentHashMap<>();
    private final int numLeaders;
    private final int reportIntervalSeconds;
    private final Properties props;
    private volatile boolean running = true;

    public KubeVisualiser(String bootstrapServers, int numLeaders, int reportIntervalSeconds) {
        this.numLeaders = numLeaders;
        this.reportIntervalSeconds = reportIntervalSeconds;
        
        // Configure Kafka Consumer
        this.props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "text-visualiser-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    }

    private void printLeaderboard() {
        // Convert map to sorted list
        List<Map.Entry<String, Long>> sorted = new ArrayList<>(hashtagCounts.entrySet());
        sorted.sort((a, b) -> b.getValue().compareTo(a.getValue()));
        
        // Take top N
        List<Map.Entry<String, Long>> leaders = 
            sorted.subList(0, Math.min(numLeaders, sorted.size()));

        // Print the leaderboard
        System.out.println("\n=== Top " + numLeaders + " Hashtags ===");
        System.out.println("Timestamp: " + new Date());
        System.out.println("------------------------");
        for (int i = 0; i < leaders.size(); i++) {
            Map.Entry<String, Long> entry = leaders.get(i);
            System.out.printf("%d. %-20s %d%n", 
                i + 1, entry.getKey(), entry.getValue());
        }
        System.out.println("------------------------\n");
    }

    public void start() {
        // Create consumer
        try (KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props)) {
            // Subscribe to topic
            consumer.subscribe(Collections.singletonList(HASHTAG_COUNTS_TOPIC));

            // Start reporting thread
            ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
            executor.scheduleAtFixedRate(
                this::printLeaderboard, 
                reportIntervalSeconds, 
                reportIntervalSeconds, 
                TimeUnit.SECONDS
            );

            // Consume messages
            while (running) {
                ConsumerRecords<String, Long> records = 
                    consumer.poll(Duration.ofMillis(100));
                
                for (ConsumerRecord<String, Long> record : records) {
                    hashtagCounts.put(record.key(), record.value());
                }
            }

            // Cleanup
            executor.shutdown();
        }
    }

    public void stop() {
        running = false;
    }

    public static void main(String[] args) {
        if (args.length < 3) {
            System.out.println("Usage: KubeVisualiser <bootstrap-servers> <num-leaders> <report-interval-seconds>");
            System.exit(1);
        }

        String bootstrapServers = args[0];
        int numLeaders = Integer.parseInt(args[1]);
        int reportInterval = Integer.parseInt(args[2]);

        KubeVisualiser visualiser = 
            new KubeVisualiser(bootstrapServers, numLeaders, reportInterval);
        
        // Add shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(visualiser::stop));

        // Start visualiser
        visualiser.start();
    }
}