// AbstractTweetFilter.java

package kafka_tweetoscope.tweetsFilter;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.VoidDeserializer;
import org.apache.kafka.common.serialization.VoidSerializer;
import com.twitter.clientlib.model.Tweet;

import kafka_tweetoscope.tweetsSerializer.TweetDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Base class for filters that read {@link Tweet} values from one Kafka topic and
 * publish the text of every accepted tweet to another topic.
 *
 * <p>Subclasses supply the acceptance rule through {@link #filterTweet(Tweet)} and may
 * override {@link #configureConsumer()} / {@link #configureProducer()} to adjust the
 * client settings.
 */
public abstract class AbstractTweetFilter {
    protected KafkaConsumer<Void, Tweet> consumer;
    protected KafkaProducer<Void, String> producer;
    protected String inputTopic;
    protected String outputTopic;
    protected String bootstrapServer;

    /**
     * Creates the Kafka consumer and producer and subscribes the consumer to the input topic.
     *
     * <p>Note: {@link #configureConsumer()} and {@link #configureProducer()} are invoked from
     * this constructor, i.e. before any subclass field initializer or constructor body has
     * run. Overrides must not depend on subclass state.
     *
     * @param inputTopic      topic the tweets are consumed from
     * @param outputTopic     topic the text of accepted tweets is published to
     * @param bootstrapServer value used as {@code bootstrap.servers} for both clients
     */
    protected AbstractTweetFilter(String inputTopic, String outputTopic, String bootstrapServer) {
        this.inputTopic = inputTopic;
        this.outputTopic = outputTopic;
        this.bootstrapServer = bootstrapServer;

        this.consumer = new KafkaConsumer<Void, Tweet>(configureConsumer());
        try {
            this.producer = new KafkaProducer<Void, String>(configureProducer());
            this.consumer.subscribe(Collections.singletonList(inputTopic));
        } catch (RuntimeException e) {
            // A later construction step failed: release whatever was already opened
            // instead of leaking its sockets and threads, then report the original error.
            closeQuietly(this.consumer, "consumer");
            closeQuietly(this.producer, "producer");
            throw e;
        }
    }

    /**
     * Polls the input topic in an endless loop and forwards accepted tweets.
     *
     * <p>A failure while handling a single tweet is logged and the loop continues. Any
     * exception escaping the poll loop itself ends the loop; in every case the consumer
     * and the producer are then closed, each independently of the other.
     */
    public void run() {
        try {
            System.out.println("Starting to process tweets...");

            while (true) {
                ConsumerRecords<Void, Tweet> records = consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<Void, Tweet> consumed : records) {
                    try {
                        forwardIfAccepted(consumed.value());
                    } catch (Exception e) {
                        // One bad tweet must not stop the whole stream.
                        System.err.println("Error processing tweet: " + e.getMessage());
                    }
                }
            }
        } catch (Exception e) {
            System.err.println("Error in main processing loop: " + e.getMessage());
            e.printStackTrace();
        } finally {
            // Close both clients separately so that a failure closing the consumer
            // can no longer prevent the producer from being closed.
            boolean consumerClosed = closeQuietly(consumer, "consumer");
            boolean producerClosed = closeQuietly(producer, "producer");
            if (consumerClosed && producerClosed) {
                System.out.println("Closed consumer and producer");
            }
        }
    }

    // Abstract method to be implemented by specific filters
    /**
     * Decides whether a tweet is forwarded to the output topic.
     *
     * @param tweet the consumed tweet; {@link #run()} never passes {@code null}
     * @return {@code true} to publish the tweet's text, {@code false} to drop it
     */
    protected abstract boolean filterTweet(Tweet tweet);

    /**
     * Applies the filter to one consumed value and publishes its text when accepted.
     * Values that are {@code null}, or accepted tweets whose text is {@code null}, are skipped.
     */
    private void forwardIfAccepted(Tweet tweet) {
        if (tweet == null) {
            // Record without a value: nothing to filter or forward.
            return;
        }
        if (!filterTweet(tweet)) {
            return;
        }
        String text = tweet.getText();
        if (text == null) {
            // Publishing a null payload would only hand an empty record to downstream readers.
            return;
        }
        // send() is asynchronous: delivery failures are only reported through the
        // callback (or the returned future), so log them here instead of losing them.
        producer.send(new ProducerRecord<Void, String>(outputTopic, null, text), (metadata, exception) -> {
            if (exception != null) {
                System.err.println("Error sending tweet to " + outputTopic + ": " + exception.getMessage());
            }
        });
    }

    /**
     * Closes a client, logging instead of propagating any failure.
     *
     * @return {@code true} only if {@code close()} was called and completed normally
     */
    private static boolean closeQuietly(AutoCloseable client, String name) {
        if (client == null) {
            return false;
        }
        try {
            client.close();
            return true;
        } catch (Exception e) {
            System.err.println("Error closing " + name + ": " + e.getMessage());
            return false;
        }
    }

    /**
     * Builds the consumer settings: {@code Void} keys, {@link TweetDeserializer} values,
     * offset reset {@code earliest} and auto-commit enabled.
     *
     * <p>The group id is {@code "tweet-filter-group-"} followed by the simple name of the
     * concrete class. An anonymous subclass has an empty simple name, so all anonymous
     * filters would share the group id {@code "tweet-filter-group-"}.
     *
     * @return the properties used to create the consumer
     */
    protected Properties configureConsumer() {
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "tweet-filter-group-" + this.getClass().getSimpleName());
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, VoidDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, TweetDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        return consumerProps;
    }

    /**
     * Builds the producer settings: {@code Void} keys and {@code String} values.
     *
     * @return the properties used to create the producer
     */
    protected Properties configureProducer() {
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, VoidSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return producerProps;
    }
}