AbstractTweetsProducer.java
package kafka_tweetoscope.tweetsProducer;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.VoidSerializer;
import kafka_tweetoscope.tweetsSerializer.TweetSerializer;
import com.twitter.clientlib.model.Tweet;
public abstract class AbstractTweetsProducer {
protected String bootstrapServer;
protected String topicName;
protected KafkaProducer<Void, Tweet> producer;
public AbstractTweetsProducer(String bootstrapServer, String topicName) {
this.bootstrapServer = bootstrapServer;
this.topicName = topicName;
this.producer = new KafkaProducer<Void, Tweet>(configureKafkaProducer());
}
public abstract void run();
private Properties configureKafkaProducer() {
Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
VoidSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
TweetSerializer.class.getName());
return producerProperties;
}
}