AbstractTweetsProducer.java

package kafka_tweetoscope.tweetsProducer;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.VoidSerializer;

import kafka_tweetoscope.tweetsSerializer.TweetSerializer;

import com.twitter.clientlib.model.Tweet;

public abstract class AbstractTweetsProducer {
    protected String bootstrapServer;
    protected String topicName;
    protected KafkaProducer<Void, Tweet> producer;


    public AbstractTweetsProducer(String bootstrapServer, String topicName) {
        this.bootstrapServer = bootstrapServer;
        this.topicName = topicName;
        this.producer = new KafkaProducer<Void, Tweet>(configureKafkaProducer());
    }

    public abstract void run();

    private Properties configureKafkaProducer() {
        Properties producerProperties = new Properties();
        producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                VoidSerializer.class.getName());
        producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                TweetSerializer.class.getName());
        return producerProperties;
    }
}