// MockTwitterStreamRandom.java
package kafka_tweetoscope.tweetsProducer;
import java.time.OffsetDateTime;
import java.util.Random;
import org.apache.kafka.clients.producer.ProducerRecord;
import com.twitter.clientlib.model.Tweet;
/**
 * Mock tweet source that endlessly crafts random {@link Tweet}s and publishes
 * them to a Kafka topic.
 *
 * <p>Each tweet gets a sequential id, a text of the form
 * {@code "Tweet <n> #tag1 #tag2"} (zero to three hashtags), a random language
 * code and the current time as creation date.
 *
 * <p>The {@code producer} and {@code topicName} members used by {@link #run()}
 * are not declared in this class; presumably they are provided by
 * {@code AbstractTweetsProducer} (confirm against the superclass).
 */
public class MockTwitterStreamRandom extends AbstractTweetsProducer {

    /** Hashtag vocabulary the random tweets draw from. */
    private static final String[] HASHTAGS = { "fun", "bitCoin", "climate", "crypto", "CS", "Metz", "weather",
            "summer", "holidays", "health", "running", "sport" };

    /** Language codes assigned uniformly at random to the tweets. */
    private static final String[] LANGUAGES = { "fr", "en", "ru", "es", "it" };

    /** Exclusive upper bound on the number of hashtags per tweet (0 to 3 are generated). */
    private static final int HASHTAG_COUNT_BOUND = 4;

    /** Pause between two consecutive tweets, in milliseconds. */
    private static final long PAUSE_MILLIS = 10;

    /**
     * Creates the mock stream.
     *
     * @param bootstrapServer value forwarded to the superclass as its first argument
     * @param topicName       value forwarded to the superclass as its second argument
     */
    public MockTwitterStreamRandom(String bootstrapServer, String topicName) {
        super(bootstrapServer, topicName);
    }

    /**
     * Publishes one random tweet roughly every {@value #PAUSE_MILLIS} ms until
     * the running thread is interrupted.
     */
    @Override
    public void run() {
        Random random = new Random();
        int nb = 0;
        while (true) {
            // crafts a random Tweet
            nb++;
            Tweet tweet = new Tweet();
            tweet.setId("" + nb);
            tweet.setText(randomText(nb, random));
            tweet.setLang(LANGUAGES[random.nextInt(LANGUAGES.length)]);
            tweet.createdAt(OffsetDateTime.now());
            producer.send(new ProducerRecord<Void, Tweet>(topicName, null, tweet));

            // waits for a while
            try {
                Thread.sleep(PAUSE_MILLIS);
            } catch (InterruptedException e) {
                // Interruption is the only way to stop this loop: restore the
                // flag for the caller and stop producing instead of swallowing it.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Builds the text of tweet number {@code nb}: the prefix {@code "Tweet <nb>"}
     * followed by zero to three space-separated hashtags.
     */
    private static String randomText(int nb, Random random) {
        StringBuilder text = new StringBuilder("Tweet ").append(nb);
        // The count is drawn once; re-rolling it in the loop condition would
        // bias the number of hashtags toward small values.
        int hashtagCount = random.nextInt(HASHTAG_COUNT_BOUND);
        for (int i = 0; i < hashtagCount; i++) {
            // Leading space keeps the '#' detached from the preceding token.
            text.append(" #").append(HASHTAGS[randomHashtagIndex(random)]);
        }
        return text.toString();
    }

    /**
     * Picks a hashtag index from a Gaussian centred on the middle of
     * {@link #HASHTAGS} (scaled by half its length), re-drawing until the
     * truncated value is a valid index.
     */
    private static int randomHashtagIndex(Random random) {
        int index;
        do {
            index = (int) (HASHTAGS.length / 2 + random.nextGaussian() * HASHTAGS.length / 2);
        } while (index < 0 || index > HASHTAGS.length - 1);
        return index;
    }

    /**
     * Starts the mock stream against a broker at {@code localhost:9092},
     * publishing to the {@code raw-tweets} topic.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // Argument order follows the constructor: (bootstrapServer, topicName).
        MockTwitterStreamRandom mockStreamRandom = new MockTwitterStreamRandom("localhost:9092", "raw-tweets");
        mockStreamRandom.run();
    }
}