// HashtagVisualizer.java

package kafka_tweetoscope;

import java.awt.Color;
import java.awt.Dimension;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javax.swing.BorderFactory;
import javax.swing.JFrame;
import javax.swing.SwingUtilities;

import org.apache.commons.cli.*;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import org.jfree.chart.ChartFactory;
import org.jfree.chart.ChartPanel;
import org.jfree.chart.JFreeChart;
import org.jfree.chart.axis.AxisLocation;
import org.jfree.chart.plot.PlotOrientation;
import org.jfree.data.category.DefaultCategoryDataset;

/**
 * Swing window showing a live horizontal bar chart of the most frequent
 * hashtags read from a Kafka topic.
 *
 * <p>Constructing an instance builds and shows the window, subscribes a Kafka
 * consumer (String keys, Long values) to the given topic and starts a
 * background thread that polls the topic and refreshes the chart.
 *
 * <p>Threading: {@code hashtagCounts} and the consumer are only used by the
 * consumer thread (except {@link KafkaConsumer#wakeup()}, which is called from
 * {@link #shutdown()}); {@code dataset} is only modified on the Swing event
 * dispatch thread after construction.
 */
public class HashtagVisualizer extends JFrame implements Runnable {
    /** Single row key used for every bar of the chart. */
    private static final String ROW_KEY = "hashtag";
    /** Maximum time a single {@code poll} call blocks waiting for records. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(100);
    /** Upper bound on how long {@link #shutdown()} waits for the consumer thread. */
    private static final long SHUTDOWN_JOIN_TIMEOUT_MS = 5_000L;

    private final DefaultCategoryDataset dataset;
    private final int nbLeaders;
    private final String bootstrapServer;
    private final String topic;
    private final AtomicBoolean running;
    private final Map<String, Long> hashtagCounts;
    private KafkaConsumer<String, Long> consumer;
    private Thread consumerThread;

    /**
     * Creates the window, subscribes to the topic and starts consuming.
     *
     * @param bootstrapServer Kafka bootstrap servers, e.g. {@code localhost:9092}
     * @param topic           topic whose records are (hashtag, count) pairs
     * @param nbLeaders       number of bars shown in the chart; must not be negative
     * @throws IllegalArgumentException if {@code nbLeaders} is negative
     */
    public HashtagVisualizer(String bootstrapServer, String topic, int nbLeaders) {
        // Fail fast: a negative value would otherwise only blow up later, on
        // the event dispatch thread, each time the chart is refreshed.
        if (nbLeaders < 0) {
            throw new IllegalArgumentException("nbLeaders must be >= 0, got " + nbLeaders);
        }
        this.bootstrapServer = bootstrapServer;
        this.topic = topic;
        this.nbLeaders = nbLeaders;
        this.running = new AtomicBoolean(true);
        this.hashtagCounts = new HashMap<>();
        this.dataset = new DefaultCategoryDataset();

        // Setup the chart
        setupChart();

        // Initialize and start the consumer
        initializeConsumer();
        startConsumerThread();
    }

    /** Builds the bar chart around {@code dataset}, adds it to the frame and shows the frame. */
    private void setupChart() {
        JFreeChart chart = ChartFactory.createBarChart(
            "Most Popular Hashtags",    // title
            "",                         // category axis label
            "Number of occurrences",    // value axis label
            dataset,                    // dataset
            PlotOrientation.HORIZONTAL, // orientation
            false,                      // legend
            true,                       // tooltips
            false                       // urls
        );

        chart.getCategoryPlot().setRangeAxisLocation(AxisLocation.BOTTOM_OR_RIGHT);
        chart.getCategoryPlot().setDomainAxisLocation(AxisLocation.BOTTOM_OR_RIGHT);

        ChartPanel chartPanel = new ChartPanel(chart);
        chartPanel.setBorder(BorderFactory.createEmptyBorder(5, 5, 5, 5));
        chartPanel.setBackground(Color.white);
        chartPanel.setPreferredSize(new Dimension(500, 300));

        this.add(chartPanel);
        this.pack();
        this.setTitle("Kafka Hashtag Visualizer");
        this.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
        this.setVisible(true);
    }

    /**
     * Creates the Kafka consumer (group {@code hashtag-visualizer}, reading
     * from the earliest offset when no committed offset exists, auto-commit
     * enabled) and subscribes it to {@code topic}.
     */
    private void initializeConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "hashtag-visualizer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topic));
    }

    /** Starts the background thread that executes {@link #run()}. */
    private void startConsumerThread() {
        consumerThread = new Thread(this);
        consumerThread.setName("HashtagVisualizer-Consumer");
        consumerThread.start();
    }

    /**
     * Consumer loop: polls the topic until {@link #shutdown()} is called,
     * accumulates the received counts per hashtag and refreshes the chart
     * whenever a poll delivered at least one usable record. The consumer is
     * always closed when the loop ends.
     */
    @Override
    public void run() {
        try {
            while (running.get()) {
                ConsumerRecords<String, Long> records = consumer.poll(POLL_TIMEOUT);
                boolean needsUpdate = false;

                for (ConsumerRecord<String, Long> record : records) {
                    String hashtag = record.key();
                    Long count = record.value();

                    if (hashtag != null && count != null) {
                        // NOTE(review): this adds every received value to the stored
                        // one, i.e. it presumes records carry increments. If the topic
                        // carries running totals, this should be a put - confirm
                        // against the producer.
                        hashtagCounts.merge(hashtag, count, Long::sum);
                        needsUpdate = true;
                    }
                }

                if (needsUpdate) {
                    updateVisualization();
                }
            }
        } catch (WakeupException e) {
            // wakeup() is how shutdown() aborts a blocking poll, so this is the
            // normal exit path; it is only an error if nobody asked us to stop.
            if (running.get()) {
                reportConsumerError(e);
            }
        } catch (Exception e) {
            reportConsumerError(e);
        } finally {
            consumer.close();
        }
    }

    /** Prints a consumer-thread failure and its stack trace to standard error. */
    private static void reportConsumerError(Exception e) {
        System.err.println("Error in consumer thread: " + e.getMessage());
        e.printStackTrace();
    }

    /**
     * Schedules a chart refresh showing the {@code nbLeaders} hashtags with
     * the highest counts, padded with empty rows when fewer are known.
     *
     * <p>Must be called from the consumer thread. The ranking is computed
     * here, into an immutable snapshot, so that the event dispatch thread
     * never reads {@code hashtagCounts} while this thread keeps updating it.
     */
    private void updateVisualization() {
        // Copy the entries: the map's own entries keep changing under merge().
        final List<Map.Entry<String, Long>> leaders = hashtagCounts.entrySet().stream()
            .sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
            .limit(nbLeaders)
            .<Map.Entry<String, Long>>map(entry ->
                new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), entry.getValue()))
            .collect(Collectors.toList());

        SwingUtilities.invokeLater(() -> {
            dataset.clear();

            for (Map.Entry<String, Long> leader : leaders) {
                dataset.setValue(leader.getValue(), ROW_KEY, leader.getKey());
            }

            // Pad up to nbLeaders rows. Each pad row needs its own column key
            // (setting the same key again only overwrites the same cell), so
            // use "", " ", "  ", ... which all render as blank labels.
            StringBuilder padKey = new StringBuilder();
            for (int i = leaders.size(); i < nbLeaders; i++) {
                dataset.setValue(0, ROW_KEY, padKey.toString());
                padKey.append(' ');
            }
        });
    }

    /**
     * Stops the consumer loop and waits (for a bounded time) until the
     * consumer thread has closed the Kafka consumer. Safe to call more than
     * once and from any thread.
     */
    public void shutdown() {
        running.set(false);
        if (consumer != null) {
            // Aborts a poll() in progress with a WakeupException. Interrupting
            // the thread instead would surface as an error and leave the
            // interrupt flag set while the consumer is being closed.
            consumer.wakeup();
        }
        Thread worker = consumerThread;
        if (worker != null && worker != Thread.currentThread()) {
            try {
                worker.join(SHUTDOWN_JOIN_TIMEOUT_MS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Parses the command line. Recognized option: {@code -b} /
     * {@code --bootstrap-servers <servers>}. On a parse error the message and
     * usage help are printed and the JVM exits with status 1.
     *
     * @param args raw command-line arguments
     * @return the parsed command line
     */
    private static CommandLine parseArguments(String[] args) {
        Options options = new Options();
        Option bootstrapOpt = Option.builder("b")
            .longOpt("bootstrap-servers")
            .hasArg()
            .desc("Bootstrap servers (e.g., localhost:9092)")
            .build();

        options.addOption(bootstrapOpt);
        CommandLineParser parser = new DefaultParser();
        HelpFormatter formatter = new HelpFormatter();

        try {
            return parser.parse(options, args);
        } catch (ParseException e) {
            System.out.println(e.getMessage());
            formatter.printHelp("HashtagVisualizer", options);
            System.exit(1);
            return null; // not reached; required by the compiler
        }
    }

    /**
     * Entry point: shows the top 10 hashtags of topic {@code hashtag-counts},
     * using the bootstrap servers given with {@code --bootstrap-servers}
     * (default {@code localhost:9092}).
     *
     * @param args command-line arguments
     */
    public static void main(String[] args) {
        CommandLine cmd = parseArguments(args);
        String bootstrapServer = cmd.getOptionValue("bootstrap-servers", "localhost:9092");
        String topic = "hashtag-counts";
        int nbLeaders = 10;  // Show top 10 hashtags

        HashtagVisualizer visualizer = new HashtagVisualizer(bootstrapServer, topic, nbLeaders);

        // Add shutdown hook for graceful shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Shutting down visualizer...");
            visualizer.shutdown();
        }));
    }
}