// TweetsProducer.java
package kafka_tweetoscope.tweetsProducer;
/**
 * Command-line launcher that selects one of the mock tweet producers
 * ({@code random}, {@code recorded} or {@code scenario}) and runs it.
 *
 * <p>The selected producer is configured with a Kafka bootstrap server and a
 * topic name; the {@code recorded} producer additionally receives a file name.
 * What the producer does when run is defined by the
 * {@link AbstractTweetsProducer} implementation, not by this class.
 */
public class TweetsProducer {
    /** Bootstrap server used when {@code -b} is not given. */
    private static final String DEFAULT_BOOTSTRAP_SERVER = "localhost:9092";
    /** Tweet source used when {@code -s} is not given. */
    private static final String DEFAULT_SOURCE = "scenario";
    /** Topic name handed to the producer; there is no command-line option for it. */
    private static final String DEFAULT_TOPIC_NAME = "raw-tweets";
    /** File name used for the recorded source when {@code -f} is not given. */
    private static final String DEFAULT_FILE_NAME = "mini";

    private final String source;
    private final String topicName;
    private final String boostrapServer;
    private final AbstractTweetsProducer tweetsProducer;
    private final String fileName;

    /**
     * Creates a launcher bound to the producer matching {@code source}.
     *
     * @param source         one of {@code "random"}, {@code "recorded"}, {@code "scenario"}
     * @param boostrapServer bootstrap server passed to the selected producer
     * @param topicName      topic name passed to the selected producer
     * @param fileName       file name passed to the {@code recorded} producer only
     * @throws IllegalArgumentException if {@code source} is not one of the supported names
     * @throws NullPointerException     if {@code source} is {@code null} (switching on a
     *                                  null string throws)
     */
    public TweetsProducer(String source, String boostrapServer, String topicName, String fileName) {
        this.source = source;
        this.boostrapServer = boostrapServer;
        this.topicName = topicName;
        this.fileName = fileName;
        this.tweetsProducer = createProducer(source, boostrapServer, topicName, fileName);
    }

    /** Maps a source name to the concrete producer implementation. */
    private static AbstractTweetsProducer createProducer(
            String source, String boostrapServer, String topicName, String fileName) {
        switch (source) {
            case "random":
                return new MockTwitterStreamRandom(boostrapServer, topicName);
            case "recorded":
                return new MockTwitterStreamRecorded(boostrapServer, topicName, fileName);
            case "scenario":
                return new MockTwitterStreamScenario(boostrapServer, topicName);
            default:
                throw new IllegalArgumentException(
                    "Invalid Tweet Source must be in random, recorded, scenario (got: " + source + ")"
                );
        }
    }

    /**
     * Runs the selected producer.
     *
     * <p>Any exception raised by the producer is reported on standard error and
     * not rethrown, so this method always returns normally.
     */
    public void start() {
        try {
            tweetsProducer.run();
        } catch (Exception e) {
            System.err.println("Error while running");
            // getMessage() alone can be null and hides the exception type and
            // origin, so report the full stack trace instead.
            e.printStackTrace(System.err);
        }
    }

    /** Prints the command-line help to standard output. */
    private static void printUsage() {
        System.out.println("Usage: java TweetsProducer [options]");
        System.out.println("Options:");
        System.out.println(" -b, --bootstrapServer <kafka boostrapServer> Specify the name of the boostrapServer (default: "
            + DEFAULT_BOOTSTRAP_SERVER + ")");
        System.out.println(" -s, --source <random|recorded|scenario> Source for tweets (default: "
            + DEFAULT_SOURCE + ")");
        System.out.println(" -f, --file <filename> Filename for recorded source (default: "
            + DEFAULT_FILE_NAME + ")");
        System.out.println(" -h, --help Show this help message");
    }

    /**
     * Returns the value following the option at {@code optionIndex}.
     *
     * <p>If the option is the last argument, reports the missing value, prints
     * the usage and terminates the JVM with status 1.
     */
    private static String requireValue(String[] args, int optionIndex) {
        if (optionIndex + 1 >= args.length) {
            System.err.println("Missing value for option " + args[optionIndex]);
            printUsage();
            System.exit(1);
        }
        return args[optionIndex + 1];
    }

    /**
     * Parses the command line, builds the launcher and starts it.
     *
     * <p>Unknown options and options without a value print the usage and exit
     * with status 1; {@code -h}/{@code --help} prints the usage and exits with
     * status 0.
     *
     * @param args command-line arguments, see {@link #printUsage()}
     */
    public static void main(String[] args) {
        String boostrapServer = DEFAULT_BOOTSTRAP_SERVER;
        String source = DEFAULT_SOURCE;
        String topicName = DEFAULT_TOPIC_NAME;
        String fileName = DEFAULT_FILE_NAME;

        for (int i = 0; i < args.length; i++) {
            switch (args[i]) {
                case "-b":
                case "--bootstrapServer":
                // Historical misspelling, still accepted so existing scripts keep working.
                case "--boostrapServer":
                    boostrapServer = requireValue(args, i);
                    i++; // skip the consumed value
                    break;
                case "-s":
                case "--source":
                    source = requireValue(args, i);
                    i++;
                    break;
                case "-f":
                case "--file":
                    fileName = requireValue(args, i);
                    i++;
                    break;
                case "-h":
                case "--help":
                    printUsage();
                    System.exit(0);
                    return; // not reached; prevents fall-through into default
                default:
                    printUsage();
                    System.exit(1);
                    return; // not reached
            }
        }

        TweetsProducer tweetsProducer = new TweetsProducer(source, boostrapServer, topicName, fileName);
        tweetsProducer.start();
    }
}