// HashtagVisualizer.java
package kafka_tweetoscope;
import java.awt.Color;
import java.awt.Dimension;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.swing.BorderFactory;
import javax.swing.JFrame;
import javax.swing.SwingUtilities;
import org.apache.commons.cli.*;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.jfree.chart.ChartFactory;
import org.jfree.chart.ChartPanel;
import org.jfree.chart.JFreeChart;
import org.jfree.chart.axis.AxisLocation;
import org.jfree.chart.plot.PlotOrientation;
import org.jfree.data.category.DefaultCategoryDataset;
/**
 * Swing window showing a live horizontal bar chart of the most frequent hashtags.
 *
 * <p>The constructor builds and shows the chart, creates a Kafka consumer subscribed to the
 * given topic and starts a dedicated consumer thread (this object is that thread's
 * {@link Runnable}). The consumer thread accumulates per-hashtag counts and, whenever a poll
 * delivered at least one usable record, schedules a chart refresh on the Swing event thread.
 *
 * <p>Threading: {@code hashtagCounts} is only touched by the consumer thread; the chart
 * dataset is only modified on the Swing event thread, which receives an independent snapshot
 * of the current leaders. {@link #shutdown()} may be called from any other thread.
 */
public class HashtagVisualizer extends JFrame implements Runnable {
    /** Single row key used for every value in the category dataset. */
    private static final String ROW_KEY = "hashtag";
    /** Maximum time one {@code poll} call blocks waiting for records. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(100);
    /** Upper bound on how long {@link #shutdown()} waits for the consumer thread to finish. */
    private static final long SHUTDOWN_JOIN_TIMEOUT_MS = 5_000L;

    private final DefaultCategoryDataset dataset;
    private final int nbLeaders;
    private final String bootstrapServer;
    private final String topic;
    private final AtomicBoolean running;
    private final Map<String, Long> hashtagCounts;
    private KafkaConsumer<String, Long> consumer;
    private Thread consumerThread;

    /**
     * Creates the window, shows it, and starts consuming hashtag counts.
     *
     * @param bootstrapServer Kafka bootstrap server list passed to the consumer configuration
     * @param topic           topic to subscribe to; keys are read as strings, values as longs
     * @param nbLeaders       maximum number of hashtags displayed in the chart
     */
    public HashtagVisualizer(String bootstrapServer, String topic, int nbLeaders) {
        this.bootstrapServer = bootstrapServer;
        this.topic = topic;
        this.nbLeaders = nbLeaders;
        this.running = new AtomicBoolean(true);
        this.hashtagCounts = new HashMap<>();
        this.dataset = new DefaultCategoryDataset();
        // Setup the chart
        setupChart();
        // Initialize and start the consumer
        initializeConsumer();
        startConsumerThread();
    }

    /** Builds the bar chart backed by {@code dataset}, adds it to this frame and shows the frame. */
    private void setupChart() {
        JFreeChart chart = ChartFactory.createBarChart(
                "Most Popular Hashtags", // title
                "", // category axis label
                "Number of occurrences", // value axis label
                dataset, // dataset
                PlotOrientation.HORIZONTAL, // orientation
                false, // legend
                true, // tooltips
                false // urls
        );
        chart.getCategoryPlot().setRangeAxisLocation(AxisLocation.BOTTOM_OR_RIGHT);
        chart.getCategoryPlot().setDomainAxisLocation(AxisLocation.BOTTOM_OR_RIGHT);
        ChartPanel chartPanel = new ChartPanel(chart);
        chartPanel.setBorder(BorderFactory.createEmptyBorder(5, 5, 5, 5));
        chartPanel.setBackground(Color.white);
        chartPanel.setPreferredSize(new Dimension(500, 300));
        this.add(chartPanel);
        this.pack();
        this.setTitle("Kafka Hashtag Visualizer");
        this.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
        this.setVisible(true);
    }

    /** Creates the Kafka consumer (String keys, Long values) and subscribes it to {@code topic}. */
    private void initializeConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "hashtag-visualizer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topic));
    }

    /** Starts the named thread that executes {@link #run()}. */
    private void startConsumerThread() {
        consumerThread = new Thread(this);
        consumerThread.setName("HashtagVisualizer-Consumer");
        consumerThread.start();
    }

    /**
     * Consumer loop: polls the topic until {@link #shutdown()} is requested, adds every
     * non-null (hashtag, count) pair to the running totals and refreshes the chart after each
     * poll that changed them. The consumer is always closed when the loop ends.
     */
    @Override
    public void run() {
        try {
            while (running.get()) {
                ConsumerRecords<String, Long> records = consumer.poll(POLL_TIMEOUT);
                boolean needsUpdate = false;
                for (ConsumerRecord<String, Long> record : records) {
                    String hashtag = record.key();
                    Long count = record.value();
                    if (hashtag != null && count != null) {
                        // NOTE(review): summing assumes each record carries an increment; if the
                        // topic carries running totals this should overwrite instead — confirm
                        // against the producer.
                        hashtagCounts.merge(hashtag, count, Long::sum);
                        needsUpdate = true;
                    }
                }
                if (needsUpdate) {
                    updateVisualization();
                }
            }
        } catch (WakeupException e) {
            // wakeup() is how shutdown() unblocks poll(); only report it when nobody asked to stop.
            if (running.get()) {
                reportConsumerError(e);
            }
        } catch (Exception e) {
            reportConsumerError(e);
        } finally {
            consumer.close();
        }
    }

    /** Prints a consumer-thread failure and its stack trace to standard error. */
    private static void reportConsumerError(Exception e) {
        System.err.println("Error in consumer thread: " + e.getMessage());
        e.printStackTrace();
    }

    /**
     * Schedules a chart refresh showing the current top {@code nbLeaders} hashtags.
     *
     * <p>Must be called from the consumer thread: the leaders are copied here, before handing
     * over to the Swing event thread, so that the event thread never reads
     * {@code hashtagCounts} while the consumer thread is modifying it.
     */
    private void updateVisualization() {
        // Ordered copy (highest count first) that is independent of the live map.
        final Map<String, Long> leaders = hashtagCounts.entrySet().stream()
                .sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
                .limit(nbLeaders)
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        Map.Entry::getValue,
                        (first, second) -> first,
                        LinkedHashMap::new));

        SwingUtilities.invokeLater(() -> {
            dataset.clear();
            // Update the dataset with new values
            leaders.forEach((hashtag, count) -> dataset.setValue(count, ROW_KEY, hashtag));
            // Pad with empty bars up to nbLeaders rows. Each padding row needs its own column
            // key (a growing run of spaces); reusing one key would just overwrite the same row.
            StringBuilder blankKey = new StringBuilder();
            for (int i = leaders.size(); i < nbLeaders; i++) {
                blankKey.append(' ');
                dataset.setValue(0, ROW_KEY, blankKey.toString());
            }
        });
    }

    /**
     * Requests the consumer loop to stop and waits (bounded) for it to close the consumer.
     *
     * <p>Uses {@code KafkaConsumer.wakeup()} to abort a blocking poll. The wait matters when
     * this is called from a JVM shutdown hook: once the hooks return the JVM halts, so
     * returning immediately could cut off {@code consumer.close()}.
     */
    public void shutdown() {
        running.set(false);
        if (consumer != null) {
            consumer.wakeup();
        }
        Thread worker = consumerThread;
        if (worker != null && worker != Thread.currentThread()) {
            try {
                worker.join(SHUTDOWN_JOIN_TIMEOUT_MS);
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller; stop waiting.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Parses the command line ({@code -b} / {@code --bootstrap-servers <servers>}).
     * On a parse error the message and usage help are printed and the JVM exits with status 1.
     *
     * @param args raw command-line arguments
     * @return the parsed command line
     */
    private static CommandLine parseArguments(String[] args) {
        Options options = new Options();
        Option bootstrapOpt = Option.builder("b")
                .longOpt("bootstrap-servers")
                .hasArg()
                .desc("Bootstrap servers (e.g., localhost:9092)")
                .build();
        options.addOption(bootstrapOpt);
        CommandLineParser parser = new DefaultParser();
        HelpFormatter formatter = new HelpFormatter();
        try {
            return parser.parse(options, args);
        } catch (ParseException e) {
            System.out.println(e.getMessage());
            formatter.printHelp("HashtagVisualizer", options);
            System.exit(1);
            return null; // unreachable; required by the compiler
        }
    }

    /**
     * Entry point: opens a visualizer for the top 10 hashtags of topic {@code hashtag-counts},
     * using the bootstrap servers from the command line (default {@code localhost:9092}).
     *
     * @param args command-line arguments
     */
    public static void main(String[] args) {
        CommandLine cmd = parseArguments(args);
        String bootstrapServer = cmd.getOptionValue("bootstrap-servers", "localhost:9092");
        String topic = "hashtag-counts";
        int nbLeaders = 10; // Show top 10 hashtags
        HashtagVisualizer visualizer = new HashtagVisualizer(bootstrapServer, topic, nbLeaders);
        // Add shutdown hook for graceful shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Shutting down visualizer...");
            visualizer.shutdown();
        }));
    }
}