// HashtagTopology.java

package kafka_tweetoscope;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.commons.cli.*;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;
import java.time.Duration;

/**
 * Kafka Streams application that maintains a running count per record key
 * read from the {@code hashtags} topic and forwards the count table's updates
 * to the {@code hashtag-counts} topic.
 *
 * <p>Run via {@link #main(String[])}; the bootstrap servers can be supplied
 * with {@code -b} / {@code --bootstrap-servers}.
 */
public class HashtagTopology {
    private static final String INPUT_TOPIC = "hashtags";
    private static final String OUTPUT_TOPIC = "hashtag-counts";
    private static final String APPLICATION_ID = "hashtag-counter-app";
    private static final String COUNT_STORE_NAME = "hashtag-counts-store";

    private final String bootstrapServer;

    /**
     * Creates a topology holder bound to the given Kafka cluster.
     *
     * @param bootstrapServer bootstrap server list that is placed into the
     *                        streams configuration (e.g. {@code localhost:9092})
     */
    public HashtagTopology(String bootstrapServer) {
        this.bootstrapServer = bootstrapServer;
    }

    /**
     * Builds the configuration handed to {@link KafkaStreams}.
     *
     * @return a fresh {@link Properties} instance with the application id,
     *         bootstrap servers, default String/Long serdes, a 100 ms commit
     *         interval and {@code auto.offset.reset=earliest}
     */
    public Properties getStreamsConfig() {
        Properties settings = new Properties();

        // Application identity and cluster location.
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);

        // Default serdes: String keys, Long values.
        settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());

        // Short commit interval so count updates are pushed downstream quickly.
        // CACHE_MAX_BYTES_BUFFERING_CONFIG is not overridden here.
        settings.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);

        // Consumer setting: start from the earliest offset when none is committed.
        settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        return settings;
    }

    /**
     * Assembles the processing topology: read {@code hashtags}, group by key,
     * count into the {@code hashtag-counts-store} state store, then print and
     * publish each update to {@code hashtag-counts}.
     *
     * @return the built {@link Topology}
     */
    public Topology createTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Source: String keys with Long values.
        KStream<String, Long> hashtags =
            builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.Long()));

        // Group on the existing key; no re-keying is performed.
        KGroupedStream<String, Long> groupedByHashtag =
            hashtags.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()));

        // Running count per key, materialized in a named state store.
        KTable<String, Long> runningCounts =
            groupedByHashtag.count(Materialized.as(COUNT_STORE_NAME));

        // Turn the table's changelog into a stream, echo it, and publish it.
        KStream<String, Long> countUpdates = runningCounts.toStream();
        countUpdates
            .peek((hashtag, total) -> System.out.printf("Hashtag: %s, Count: %d%n", hashtag, total))
            .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }

    /**
     * Parses the command line, which accepts a single optional
     * {@code -b} / {@code --bootstrap-servers} argument.
     *
     * <p>If parsing fails, the error message and usage help are printed and
     * the JVM exits with status 1.
     *
     * @param args raw command-line arguments
     * @return the parsed command line
     */
    private static CommandLine parseArguments(String[] args) {
        Option bootstrapServers = Option.builder("b")
            .longOpt("bootstrap-servers")
            .desc("Bootstrap servers (e.g., localhost:9092)")
            .hasArg()
            .build();
        Options options = new Options().addOption(bootstrapServers);

        try {
            return new DefaultParser().parse(options, args);
        } catch (ParseException e) {
            System.out.println(e.getMessage());
            new HelpFormatter().printHelp("HashtagTopology", options);
            System.exit(1);
            // Only present to satisfy the compiler's return-path analysis.
            return null;
        }
    }

    /**
     * Entry point: parses arguments, builds the configuration and topology,
     * wipes local state, registers a shutdown hook and starts the streams
     * instance.
     *
     * @param args command-line arguments; see {@code --bootstrap-servers}
     */
    public static void main(String[] args) {
        // Fall back to a local broker when no bootstrap servers are given.
        String servers = parseArguments(args).getOptionValue("bootstrap-servers", "localhost:9092");

        HashtagTopology app = new HashtagTopology(servers);
        Properties streamsConfig = app.getStreamsConfig();
        Topology processingTopology = app.createTopology();

        KafkaStreams streams = new KafkaStreams(processingTopology, streamsConfig);

        // Delete this application's local state directory before starting.
        streams.cleanUp();

        // Close the streams instance (waiting up to 5 seconds) on JVM shutdown.
        Thread shutdownHook = new Thread(() -> {
            System.out.println("Shutting down streams...");
            streams.close(Duration.ofSeconds(5));
        });
        Runtime.getRuntime().addShutdownHook(shutdownHook);

        System.out.println("Starting Kafka Streams...");
        streams.start();
    }
}