// HashtagTopology.java
package kafka_tweetoscope;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.commons.cli.*;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;
import java.time.Duration;
/**
 * Kafka Streams application that maintains running hashtag counts.
 *
 * <p>Reads (hashtag, count) records from the {@code hashtags} topic, keeps a running
 * count per hashtag in a local state store, and publishes every updated count to the
 * {@code hashtag-counts} topic.
 */
public class HashtagTopology {
    /** Topic carrying incoming hashtag records (String key, Long value). */
    private static final String INPUT_TOPIC = "hashtags";
    /** Topic receiving the running per-hashtag counts. */
    private static final String OUTPUT_TOPIC = "hashtag-counts";

    private final String bootstrapServer;

    /**
     * @param bootstrapServer Kafka bootstrap servers, e.g. {@code localhost:9092}
     */
    public HashtagTopology(String bootstrapServer) {
        this.bootstrapServer = bootstrapServer;
    }

    /**
     * Builds the Streams configuration.
     *
     * @return properties with application id, bootstrap servers, String/Long default
     *     serdes, and a short commit interval so count updates are emitted promptly
     */
    public Properties getStreamsConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "hashtag-counter-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
        // Commit frequently so downstream consumers see count updates with low latency.
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
        // Start from the beginning of the input topic when no committed offset exists.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    /**
     * Builds the processing topology: input stream, grouped by hashtag key, folded
     * into a running count, then streamed out to the output topic.
     *
     * @return the topology to run with {@link KafkaStreams}
     */
    public Topology createTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, Long> inputStream =
                builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.Long()));

        // Running count per hashtag, materialized in a named local state store.
        KTable<String, Long> countTable = inputStream
                .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
                .count(Materialized.as("hashtag-counts-store"));

        // Forward every count update to the output topic; peek is debug visibility only.
        countTable
                .toStream()
                .peek((key, value) -> System.out.printf("Hashtag: %s, Count: %d%n", key, value))
                .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }

    /**
     * Parses command-line options.
     *
     * <p>On invalid input, prints the error and usage help, then terminates the JVM
     * with exit status 1.
     *
     * @param args raw command-line arguments
     * @return the parsed command line (never {@code null} on a normal return)
     */
    private static CommandLine parseArguments(String[] args) {
        Options options = new Options();
        options.addOption(Option.builder("b")
                .longOpt("bootstrap-servers")
                .hasArg()
                .desc("Bootstrap servers (e.g., localhost:9092)")
                .build());
        try {
            return new DefaultParser().parse(options, args);
        } catch (ParseException e) {
            System.out.println(e.getMessage());
            // Build the help formatter only on the failure path where it is needed.
            new HelpFormatter().printHelp("HashtagTopology", options);
            System.exit(1);
            return null; // unreachable: System.exit does not return; satisfies the compiler
        }
    }

    /**
     * Entry point: parses arguments, builds the topology, and starts the streams
     * application with a graceful-shutdown hook.
     */
    public static void main(String[] args) {
        CommandLine cmd = parseArguments(args);
        String bootstrapServer = cmd.getOptionValue("bootstrap-servers", "localhost:9092");

        HashtagTopology topology = new HashtagTopology(bootstrapServer);
        Properties config = topology.getStreamsConfig();
        Topology kafkaTopology = topology.createTopology();

        KafkaStreams streams = new KafkaStreams(kafkaTopology, config);
        // NOTE(review): cleanUp() wipes the local state directory on every start, which
        // combined with auto.offset.reset=earliest forces a full reprocess of the input
        // topic — confirm this is intended outside of development.
        streams.cleanUp();

        // Close gracefully on JVM shutdown, waiting at most 5 seconds for in-flight work.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Shutting down streams...");
            streams.close(Duration.ofSeconds(5));
        }));

        System.out.println("Starting Kafka Streams...");
        streams.start();
    }
}