// AbstractTweetFilter.java
package kafka_tweetoscope.tweetsFilter;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.VoidDeserializer;
import org.apache.kafka.common.serialization.VoidSerializer;
import com.twitter.clientlib.model.Tweet;
import kafka_tweetoscope.tweetsSerializer.TweetDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
 * Base class for filters that read tweets from one Kafka topic and forward the
 * text of every accepted tweet to another topic.
 *
 * <p>Subclasses decide which tweets are accepted by implementing
 * {@link #filterTweet(Tweet)}. They may also override {@link #configureConsumer()}
 * and {@link #configureProducer()} to adjust the Kafka client settings.
 */
public abstract class AbstractTweetFilter {
    /** Consumer subscribed to {@link #inputTopic}. */
    protected KafkaConsumer<Void, Tweet> consumer;
    /** Producer used to publish the text of accepted tweets to {@link #outputTopic}. */
    protected KafkaProducer<Void, String> producer;
    /** Name of the topic tweets are read from. */
    protected String inputTopic;
    /** Name of the topic the text of accepted tweets is written to. */
    protected String outputTopic;
    /** Value used for the {@code bootstrap.servers} setting of both clients. */
    protected String bootstrapServer;

    /**
     * Creates the Kafka consumer and producer and subscribes the consumer to the
     * input topic.
     *
     * <p>Note: {@link #configureConsumer()} and {@link #configureProducer()} are
     * invoked from this constructor, i.e. before any subclass field initializer or
     * subclass constructor body has run. Overrides must not depend on subclass state.
     *
     * @param inputTopic      topic to consume tweets from
     * @param outputTopic     topic to publish the text of accepted tweets to
     * @param bootstrapServer Kafka bootstrap server address used by both clients
     */
    protected AbstractTweetFilter(String inputTopic, String outputTopic, String bootstrapServer) {
        this.inputTopic = inputTopic;
        this.outputTopic = outputTopic;
        this.bootstrapServer = bootstrapServer;
        this.consumer = new KafkaConsumer<>(configureConsumer());
        try {
            this.producer = new KafkaProducer<>(configureProducer());
            this.consumer.subscribe(Collections.singletonList(inputTopic));
        } catch (RuntimeException e) {
            // Do not leak already-created clients when construction fails half-way.
            closeQuietly(this.producer);
            closeQuietly(this.consumer);
            throw e;
        }
    }

    /**
     * Polls the input topic in an endless loop and forwards the text of every tweet
     * accepted by {@link #filterTweet(Tweet)} to the output topic.
     *
     * <p>A failure while handling a single record is logged and does not stop the
     * loop. Any exception escaping the poll itself ends the loop; the consumer and
     * the producer are then both closed before the method returns.
     */
    public void run() {
        try {
            System.out.println("Starting to process tweets...");
            while (true) {
                ConsumerRecords<Void, Tweet> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<Void, Tweet> record : records) {
                    try {
                        Tweet tweet = record.value();
                        if (tweet == null) {
                            // Nothing to filter or forward for a record without a value.
                            continue;
                        }
                        if (filterTweet(tweet)) {
                            // send() is asynchronous: without a callback a failed
                            // delivery would go completely unnoticed.
                            producer.send(
                                    new ProducerRecord<>(outputTopic, null, tweet.getText()),
                                    (metadata, exception) -> {
                                        if (exception != null) {
                                            System.err.println("Error processing tweet: " + exception.getMessage());
                                        }
                                    });
                        }
                    } catch (Exception e) {
                        System.err.println("Error processing tweet: " + e.getMessage());
                    }
                }
            }
        } catch (Exception e) {
            System.err.println("Error in main processing loop: " + e.getMessage());
            e.printStackTrace();
        } finally {
            // Close each client independently so that a failure while closing the
            // consumer can no longer prevent the producer from being closed.
            boolean consumerClosed = closeQuietly(consumer);
            boolean producerClosed = closeQuietly(producer);
            if (consumerClosed && producerClosed) {
                System.out.println("Closed consumer and producer");
            }
        }
    }

    /**
     * Decides whether a tweet is forwarded to the output topic.
     *
     * @param tweet the tweet read from the input topic; never {@code null}
     * @return {@code true} to forward the tweet's text, {@code false} to drop it
     */
    protected abstract boolean filterTweet(Tweet tweet);

    /**
     * Builds the consumer configuration: void keys, {@link TweetDeserializer} values,
     * a consumer group named after the concrete filter class, reading from the
     * earliest offset when no committed offset exists, with auto-commit enabled.
     *
     * @return the properties used to create the consumer
     */
    protected Properties configureConsumer() {
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        // One group per concrete filter class.
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "tweet-filter-group-" + this.getClass().getSimpleName());
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, VoidDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, TweetDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        return consumerProps;
    }

    /**
     * Builds the producer configuration: void keys and string values.
     *
     * @return the properties used to create the producer
     */
    protected Properties configureProducer() {
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, VoidSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return producerProps;
    }

    /**
     * Closes a client, logging instead of propagating any failure.
     *
     * @param resource the client to close; may be {@code null}, in which case nothing happens
     * @return {@code true} if there was nothing to close or closing succeeded,
     *         {@code false} if closing threw
     */
    private static boolean closeQuietly(AutoCloseable resource) {
        if (resource == null) {
            return true;
        }
        try {
            resource.close();
            return true;
        } catch (Exception e) {
            System.err.println("Error closing consumer/producer: " + e.getMessage());
            return false;
        }
    }
}