// MockTwitterStreamScenario.java
package kafka_tweetoscope.tweetsProducer;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import org.apache.kafka.clients.producer.ProducerRecord;
import com.twitter.clientlib.model.Tweet;
/**
 * Tweets producer that publishes a fixed, hand-written scenario of ten tweets
 * instead of reading from a live source.
 *
 * <p>The tweets are sent, in order, to the topic given at construction time
 * through the {@code producer} and {@code topicName} members inherited from
 * {@link AbstractTweetsProducer}. Records are sent with a {@code null} key.
 */
public class MockTwitterStreamScenario extends AbstractTweetsProducer {

    /**
     * Creates the mock producer.
     *
     * @param bootstrapServer address forwarded to {@link AbstractTweetsProducer}
     * @param topicName       name of the topic the scenario is published to
     */
    public MockTwitterStreamScenario(String bootstrapServer, String topicName) {
        super(bootstrapServer, topicName);
    }

    /**
     * Sends every tweet of the scenario to the topic, flushes the producer and
     * closes it.
     *
     * <p>The producer is closed even when sending or flushing fails, so that a
     * failure does not leave the producer open; the exception itself still
     * propagates to the caller.
     */
    @Override
    public void run() {
        try {
            for (Tweet tweet : buildScenario()) {
                producer.send(new ProducerRecord<Void, Tweet>(topicName, null, tweet));
            }
            producer.flush(); // Ensure all messages are sent
        } finally {
            producer.close(); // Close the producer properly, on success and on failure
        }
    }

    /**
     * Builds the ten tweets of the scenario, in the order they are published.
     *
     * @return a new array holding the scenario tweets
     */
    private static Tweet[] buildScenario() {
        return new Tweet[] {
            newTweet("001", utc(2022, 6, 20, 11, 46, 23),
                    "Choisissez un #travail que vous aimez et vous n'aurez pas à travailler un seul jour de votre vie",
                    "31", "01", "fr"),
            newTweet("002", utc(2022, 6, 20, 11, 48, 20),
                    "Si on travaille pour gagner sa vie, pourquoi se tuer au #travail ?",
                    "31", "01", "fr"),
            newTweet("003", utc(2022, 6, 24, 6, 6, 13),
                    "#Failure is not the opposite of #success: its part of success.",
                    "32", "01", "en"),
            newTweet("004", utc(2022, 7, 2, 4, 44, 17),
                    "You are not your resume, you are your #work.",
                    "34", "01", "en"),
            newTweet("005", utc(2022, 8, 20, 10, 49, 23),
                    "People who wonder if the glass is half empty or half full miss the point. The glass is refillable.",
                    "35", "03", "en"),
            newTweet("006", utc(2022, 9, 20, 11, 23, 31),
                    "If you think you are too small to make a difference, try sleeping with a mosquito.",
                    "33", "04", "en"),
            newTweet("007", utc(2022, 11, 30, 2, 15, 0),
                    "Nothing will #work unless you do.",
                    "34", "03", "en"),
            newTweet("008", utc(2022, 12, 1, 8, 30, 20),
                    "If you get tired, learn to rest, not to quit.",
                    "35", "05", "en"),
            newTweet("009", utc(2023, 1, 2, 7, 55, 56),
                    "#Failure is #success in progress.",
                    "33", "02", "en"),
            newTweet("010", utc(2023, 8, 3, 12, 40, 25),
                    "The only place #success comes before #work is in the dictionary.",
                    "36", "07", "en"),
        };
    }

    /**
     * Creates a tweet with the given fields set. The geo field is left unset.
     *
     * @param id             tweet identifier
     * @param createdAt      creation timestamp
     * @param text           tweet text
     * @param authorId       identifier of the author
     * @param conversationId identifier of the conversation
     * @param lang           language code of the text
     * @return the populated tweet
     */
    private static Tweet newTweet(String id, OffsetDateTime createdAt, String text,
            String authorId, String conversationId, String lang) {
        Tweet tweet = new Tweet();
        tweet.setId(id);
        tweet.setCreatedAt(createdAt);
        tweet.setText(text);
        tweet.setAuthorId(authorId);
        tweet.setConversationId(conversationId);
        tweet.setLang(lang);
        return tweet;
    }

    /** Returns the given date and time (zero nanoseconds) at the UTC offset. */
    private static OffsetDateTime utc(int year, int month, int day, int hour, int minute, int second) {
        return OffsetDateTime.of(year, month, day, hour, minute, second, 0, ZoneOffset.UTC);
    }

    /**
     * Runs the scenario against {@code localhost:9092}, publishing to the
     * {@code raw-tweets} topic. Any exception is reported on standard error.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            MockTwitterStreamScenario scenario = new MockTwitterStreamScenario("localhost:9092", "raw-tweets");
            scenario.run();
        } catch (Exception e) {
            // Top-level boundary of this command-line entry point: report and exit normally.
            System.err.println("Error in main: " + e.getMessage());
            e.printStackTrace();
        }
    }
}