// HashtagExtractor.java

package kafka_tweetoscope;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.VoidDeserializer;

import com.twitter.twittertext.Extractor;

/**
 * Reads tweet text from an input Kafka topic, extracts the hashtags it contains with twitter-text's
 * {@link Extractor}, and publishes one record per hashtag occurrence to an output topic.
 *
 * <p>Each output record is keyed by the hashtag lowercased with {@link Locale#ROOT} and carries the
 * value {@code 1L}. Presumably a downstream stage sums these values per key to obtain counts.
 *
 * <p>The underlying {@link KafkaConsumer} is not thread-safe, so {@link #run()} must be invoked
 * from a single thread.
 */
public class HashtagExtractor {
    /** Bootstrap server used when none is given on the command line. */
    private static final String DEFAULT_BOOTSTRAP_SERVER = "localhost:9092";
    /** Topic read by {@link #main(String[])}. */
    private static final String DEFAULT_INPUT_TOPIC = "filtered-tweets";
    /** Topic written by {@link #main(String[])}. */
    private static final String DEFAULT_OUTPUT_TOPIC = "hashtags";
    /** Consumer group id, so that committed offsets are shared across restarts. */
    private static final String CONSUMER_GROUP_ID = "hashtags-extractor";
    /** Maximum time a single {@code poll} blocks waiting for records. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(100);
    /** Value emitted for every single hashtag occurrence. */
    private static final Long ONE_OCCURRENCE = 1L;

    private final String bootstrapServer;
    private final KafkaConsumer<Void, String> consumer;
    private final KafkaProducer<String, Long> producer;
    private final String inputTopic;
    private final String outputTopic;

    /**
     * Creates the Kafka clients and subscribes the consumer to {@code inputTopic}.
     *
     * @param bootstrapServer Kafka bootstrap server(s), e.g. {@code localhost:9092}
     * @param inputTopic topic whose record values are the tweet texts to scan
     * @param outputTopic topic that receives one {@code (hashtag, 1L)} record per hashtag occurrence
     */
    public HashtagExtractor(String bootstrapServer, String inputTopic, String outputTopic) {
        this.bootstrapServer = bootstrapServer;
        this.inputTopic = inputTopic;
        this.outputTopic = outputTopic;
        this.consumer = new KafkaConsumer<>(configureConsumer());
        try {
            this.producer = new KafkaProducer<>(configureProducer());
        } catch (RuntimeException e) {
            // Don't leak the already-created consumer if the producer cannot be built.
            consumer.close();
            throw e;
        }
        this.consumer.subscribe(Collections.singletonList(this.inputTopic));
    }

    /**
     * Polls the input topic forever and forwards every extracted hashtag to the output topic.
     *
     * <p>Failures while processing an individual record are reported and skipped. Any other
     * exception ends the loop, after which both Kafka clients are closed. This method only returns
     * after such an exception.
     */
    public void run() {
        try {
            System.out.println("Starting to process tweets...");
            Extractor twitterTextExtractor = new Extractor();

            while (true) {
                ConsumerRecords<Void, String> records = consumer.poll(POLL_TIMEOUT);
                for (ConsumerRecord<Void, String> record : records) {
                    processRecord(twitterTextExtractor, record);
                }
                if (!records.isEmpty()) {
                    // Wait for this batch's sends to complete before polling again. With auto-commit
                    // enabled, a later poll may commit the offsets of the records just processed.
                    producer.flush();
                }
            }
        } catch (Exception e) {
            System.err.println("Error in main processing loop: " + e.getMessage());
            e.printStackTrace();
        } finally {
            closeClients();
        }
    }

    /**
     * Extracts the hashtags from one tweet and sends a {@code (lowercased hashtag, 1L)} record for
     * each of them. Errors are reported and swallowed so that one bad record does not stop the loop.
     *
     * @param extractor twitter-text extractor used to find hashtags
     * @param record consumed record whose value is the tweet text
     */
    private void processRecord(Extractor extractor, ConsumerRecord<Void, String> record) {
        String text = record.value();
        if (text == null) {
            // A record without a value has no text to extract hashtags from.
            return;
        }
        try {
            List<String> hashtags = extractor.extractHashtags(text);
            for (String hashtag : hashtags) {
                // Locale.ROOT: default-locale casing (e.g. Turkish dotless i) would map the same
                // tag to different keys depending on the JVM's locale.
                String key = hashtag.toLowerCase(Locale.ROOT);
                System.out.printf("Sending: Key='%s', Value=%d%n", key, ONE_OCCURRENCE);
                // send() is asynchronous and flush() does not report failures; only the callback
                // (or the returned future) sees them.
                producer.send(
                    new ProducerRecord<>(outputTopic, key, ONE_OCCURRENCE),
                    (metadata, exception) -> {
                        if (exception != null) {
                            System.err.println(
                                "Failed to send hashtag '" + key + "': " + exception.getMessage());
                        }
                    });
            }
        } catch (Exception e) {
            System.err.println("Error processing tweet: " + e.getMessage());
        }
    }

    /**
     * Closes the producer and then the consumer. Each is closed independently, so a failure closing
     * one does not prevent closing the other.
     */
    private void closeClients() {
        boolean closedCleanly = true;
        // Producer first: close() waits for pending sends, so output is delivered before the
        // consumer's close() commits its offsets (auto-commit is enabled).
        try {
            producer.close();
        } catch (Exception e) {
            closedCleanly = false;
            System.err.println("Error closing producer: " + e.getMessage());
        }
        try {
            consumer.close();
        } catch (Exception e) {
            closedCleanly = false;
            System.err.println("Error closing consumer: " + e.getMessage());
        }
        if (closedCleanly) {
            System.out.println("Closed consumer and producer");
        }
    }

    /** Prints the command-line help to standard output. */
    private static void printUsage() {
        System.out.println("Usage: java HashtagExtractor [options]");
        System.out.println("Options:");
        System.out.println("  -b, --bootstrapServer    <kafka bootstrapServer>  Specify the name of the bootstrapServer (default: " + DEFAULT_BOOTSTRAP_SERVER + ")");
        System.out.println("  -h, --help                                        Show this help message");
    }

    /**
     * Command-line entry point. Parses the options, then runs the extractor from
     * {@code filtered-tweets} to {@code hashtags}.
     *
     * <p>Exits with status 0 after printing help. Exits with status 1 on an unknown option or when
     * an option's value is missing.
     *
     * @param args command-line options, see {@link #printUsage()}
     */
    public static void main(String[] args) {
        String bootstrapServer = DEFAULT_BOOTSTRAP_SERVER;
        for (int i = 0; i < args.length; i++) {
            String arg = args[i];
            switch (arg) {
                case "-b":
                case "--bootstrapServer":
                case "--bootsrapServer": // historical misspelling, still accepted for existing scripts
                    if (i + 1 >= args.length) {
                        System.err.println("Missing value for option " + arg);
                        printUsage();
                        System.exit(1);
                        return;
                    }
                    bootstrapServer = args[++i];
                    break;
                case "-h":
                case "--help":
                    printUsage();
                    System.exit(0);
                    return;
                default:
                    System.err.println("Unknown option: " + arg);
                    printUsage();
                    System.exit(1);
                    return;
            }
        }
        HashtagExtractor extractor =
            new HashtagExtractor(bootstrapServer, DEFAULT_INPUT_TOPIC, DEFAULT_OUTPUT_TOPIC);
        extractor.run();
    }

    /** Builds the consumer configuration: Void keys, String values, auto-commit, earliest reset. */
    private Properties configureConsumer() {
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, VoidDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        return consumerProps;
    }

    /** Builds the producer configuration: String keys (hashtags) and Long values (occurrences). */
    private Properties configureProducer() {
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        return producerProps;
    }
}