MockTwitterStreamScenario.java

package kafka_tweetoscope.tweetsProducer;

import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.twitter.clientlib.model.Tweet;

public class MockTwitterStreamScenario extends AbstractTweetsProducer {

    public MockTwitterStreamScenario(String bootstrapServer, String topicName) {
		super(bootstrapServer, topicName);
    }
    
	@Override
	public void run() {
		Tweet[] tweets = new Tweet[10];
		
		tweets[0] = new Tweet();
		tweets[0].setId("001");
		tweets[0].setCreatedAt(OffsetDateTime.of(2022, 6, 20, 11, 46, 23, 0, ZoneOffset.UTC));
		tweets[0].setText("Choisissez un #travail que vous aimez et vous n'aurez pas à travailler un seul jour de votre vie");
		tweets[0].setAuthorId("31");
		tweets[0].setConversationId("01");
		tweets[0].getGeo();
		tweets[0].setLang("fr");
		
		tweets[1] = new Tweet();
		tweets[1].setId("002");
		tweets[1].setCreatedAt(OffsetDateTime.of(2022, 6, 20, 11, 48, 20, 0, ZoneOffset.UTC));
		tweets[1].setText("Si on travaille pour gagner sa vie, pourquoi se tuer au #travail ?");
		tweets[1].setAuthorId("31");
		tweets[1].setConversationId("01");
		tweets[1].getGeo();
		tweets[1].setLang("fr");
		
		tweets[2] = new Tweet();
		tweets[2].setId("003");
		tweets[2].setCreatedAt(OffsetDateTime.of(2022, 6, 24, 6, 6, 13, 0, ZoneOffset.UTC));
		tweets[2].setText("#Failure is not the opposite of #success: its part of success.");
		tweets[2].setAuthorId("32");
		tweets[2].setConversationId("01");
		tweets[2].getGeo();
		tweets[2].setLang("en");

		tweets[3] = new Tweet();
		tweets[3].setId("004");
		tweets[3].setCreatedAt(OffsetDateTime.of(2022, 7, 2, 4, 44, 17, 0, ZoneOffset.UTC));
		tweets[3].setText("You are not your resume, you are your #work.");
		tweets[3].setAuthorId("34");
		tweets[3].setConversationId("01");
		tweets[3].getGeo();
		tweets[3].setLang("en");

		tweets[4] = new Tweet();
		tweets[4].setId("005");
		tweets[4].setCreatedAt(OffsetDateTime.of(2022, 8, 20, 10, 49, 23, 0, ZoneOffset.UTC));
		tweets[4].setText("People who wonder if the glass is half empty or half full miss the point. The glass is refillable.");
		tweets[4].setAuthorId("35");
		tweets[4].setConversationId("03");
		tweets[4].getGeo();
		tweets[4].setLang("en");

		tweets[5] = new Tweet();
		tweets[5].setId("006");
		tweets[5].setCreatedAt(OffsetDateTime.of(2022, 9, 20, 11, 23, 31, 0, ZoneOffset.UTC));
		tweets[5].setText("If you think you are too small to make a difference, try sleeping with a mosquito.");
		tweets[5].setAuthorId("33");
		tweets[5].setConversationId("04");
		tweets[5].getGeo();
		tweets[5].setLang("en");

		tweets[6] = new Tweet();
		tweets[6].setId("007");
		tweets[6].setCreatedAt(OffsetDateTime.of(2022, 11, 30, 2, 15, 0, 0, ZoneOffset.UTC));
		tweets[6].setText("Nothing will #work unless you do.");
		tweets[6].setAuthorId("34");
		tweets[6].setConversationId("03");
		tweets[6].getGeo();
		tweets[6].setLang("en");

		tweets[7] = new Tweet();
		tweets[7].setId("008");
		tweets[7].setCreatedAt(OffsetDateTime.of(2022, 12, 1, 8, 30, 20, 0, ZoneOffset.UTC));
		tweets[7].setText("If you get tired, learn to rest, not to quit.");
		tweets[7].setAuthorId("35");
		tweets[7].setConversationId("05");
		tweets[7].getGeo();
		tweets[7].setLang("en");

		tweets[8] = new Tweet();
		tweets[8].setId("009");
		tweets[8].setCreatedAt(OffsetDateTime.of(2023, 1, 2, 7, 55, 56, 0, ZoneOffset.UTC));
		tweets[8].setText("#Failure is #success in progress.");
		tweets[8].setAuthorId("33");
		tweets[8].setConversationId("02");
		tweets[8].getGeo();
		tweets[8].setLang("en");

		tweets[9] = new Tweet();
		tweets[9].setId("010");
		tweets[9].setCreatedAt(OffsetDateTime.of(2023, 8, 3, 12, 40, 25, 0, ZoneOffset.UTC));
		tweets[9].setText("The only place #success comes before #work is in the dictionary.");
		tweets[9].setAuthorId("36");
		tweets[9].setConversationId("07");
		tweets[9].getGeo();
		tweets[9].setLang("en");

		for (int i = 0; i < tweets.length; i++) {
            Tweet tweet = tweets[i];
            producer.send(new ProducerRecord<Void, Tweet>(topicName, null, tweet));
		}
        producer.flush(); // Ensure all messages are sent
        producer.close(); // Close the producer properly
	}

    public static void main(String[] args) {
        try {
            MockTwitterStreamScenario scenario = new MockTwitterStreamScenario("localhost:9092", "raw-tweets");
            scenario.run();
        } catch (Exception e) {
            System.err.println("Error in main: " + e.getMessage());
            e.printStackTrace();
        }
    }
}