// HashtagExtractor.java
package kafka_tweetoscope;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.VoidDeserializer;
import com.twitter.twittertext.Extractor;
/**
 * Kafka pipeline stage that reads tweet texts from an input topic, extracts the
 * hashtags of each tweet, and publishes one record per hashtag occurrence to an
 * output topic.
 *
 * <p>Each output record is keyed by the lower-cased hashtag and carries the value
 * {@code 1L}.
 */
public class HashtagExtractor {
    /** Value attached to every emitted hashtag record; boxed once instead of per send. */
    private static final Long SINGLE_OCCURRENCE = 1L;

    private static final String DEFAULT_BOOTSTRAP_SERVER = "localhost:9092";

    private final String bootstrapServer;
    private final KafkaConsumer<Void, String> consumer;
    private final KafkaProducer<String, Long> producer;
    private final String inputTopic;
    private final String outputTopic;

    /**
     * Creates the consumer and producer and subscribes the consumer to the input topic.
     *
     * @param bootstrapServer Kafka bootstrap server list, e.g. {@code localhost:9092}
     * @param inputTopic      topic the tweet texts are read from
     * @param outputTopic     topic the hashtag records are written to
     */
    public HashtagExtractor(String bootstrapServer, String inputTopic, String outputTopic) {
        this.bootstrapServer = bootstrapServer;
        this.inputTopic = inputTopic;
        this.outputTopic = outputTopic;
        // The configure* methods read this.bootstrapServer, so it must be assigned first.
        this.consumer = new KafkaConsumer<>(configureConsumer());
        this.producer = new KafkaProducer<>(configureProducer());
        this.consumer.subscribe(Collections.singletonList(this.inputTopic));
    }

    /**
     * Polls the input topic in an endless loop and forwards the hashtags of every
     * received tweet to the output topic.
     *
     * <p>A failure while handling a single record is logged and the loop moves on to
     * the next record. An exception raised by the poll or flush itself ends the loop;
     * in every case the consumer and the producer are closed before this method returns.
     */
    public void run() {
        try {
            System.out.println("Starting to process tweets...");
            Extractor twitterTextExtractor = new Extractor();
            while (true) {
                ConsumerRecords<Void, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<Void, String> record : records) {
                    try {
                        publishHashtags(twitterTextExtractor, record.value());
                    } catch (Exception e) {
                        System.err.println("Error processing tweet: " + e.getMessage());
                    }
                }
                // Ensure messages are sent
                producer.flush();
            }
        } catch (Exception e) {
            System.err.println("Error in main processing loop: " + e.getMessage());
            e.printStackTrace();
        } finally {
            closeClients();
        }
    }

    /** Extracts the hashtags of one tweet text and sends a record for each of them. */
    private void publishHashtags(Extractor extractor, String text) {
        List<String> hashtags = extractor.extractHashtags(text);
        for (String hashtag : hashtags) {
            // Locale.ROOT keeps the key independent of the JVM's default locale.
            String key = hashtag.toLowerCase(Locale.ROOT);
            System.out.printf("Sending: Key='%s', Value=%d%n", key, SINGLE_OCCURRENCE);
            producer.send(
                new ProducerRecord<>(outputTopic, key, SINGLE_OCCURRENCE),
                (metadata, exception) -> {
                    // send() is asynchronous: delivery failures are only reported here.
                    if (exception != null) {
                        System.err.println(
                            "Error sending hashtag '" + key + "': " + exception.getMessage());
                    }
                });
        }
    }

    /**
     * Closes the consumer and the producer independently, so that a failure while
     * closing one of them does not leave the other one open.
     */
    private void closeClients() {
        boolean allClosed = true;
        try {
            consumer.close();
        } catch (Exception e) {
            allClosed = false;
            System.err.println("Error closing consumer/producer: " + e.getMessage());
        }
        try {
            producer.close();
        } catch (Exception e) {
            allClosed = false;
            System.err.println("Error closing consumer/producer: " + e.getMessage());
        }
        if (allClosed) {
            System.out.println("Closed consumer and producer");
        }
    }

    /** Prints the command-line help to standard output. */
    private static void printUsage() {
        System.out.println("Usage: java HashtagExtractor [options]");
        System.out.println("Options:");
        System.out.println(" -b, --bootstrapServer <kafka bootstrapServer> Specify the name of the bootstrapServer (default: localhost:9092)");
        System.out.println(" -h, --help Show this help message");
    }

    /**
     * Command-line entry point: parses the options, then runs an extractor from the
     * {@code filtered-tweets} topic to the {@code hashtags} topic.
     *
     * <p>Exits with status 0 after {@code --help}, and with status 1 on an unknown
     * option or when {@code -b} is given without a value.
     *
     * @param args command-line arguments, see {@link #printUsage()}
     */
    public static void main(String[] args) {
        String bootstrapServer = DEFAULT_BOOTSTRAP_SERVER;
        for (int i = 0; i < args.length; i++) {
            switch (args[i]) {
                case "-b":
                case "--bootstrapServer":
                case "--bootsrapServer": // misspelled form accepted by earlier versions
                    if (i + 1 >= args.length) {
                        System.err.println("Missing value for option " + args[i]);
                        printUsage();
                        System.exit(1);
                    }
                    bootstrapServer = args[++i];
                    break;
                case "-h":
                case "--help":
                    printUsage();
                    System.exit(0);
                    break;
                default:
                    System.err.println("Unknown option: " + args[i]);
                    printUsage();
                    System.exit(1);
            }
        }
        HashtagExtractor extractor =
            new HashtagExtractor(bootstrapServer, "filtered-tweets", "hashtags");
        extractor.run();
    }

    /**
     * Builds the consumer configuration: group {@code hashtags-extractor}, void keys,
     * string values, reading from the earliest offset when none is committed, with
     * auto-commit enabled.
     */
    private Properties configureConsumer() {
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "hashtags-extractor");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, VoidDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        return consumerProps;
    }

    /** Builds the producer configuration: string keys and long values. */
    private Properties configureProducer() {
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        return producerProps;
    }
}