// TweetsProducer.java

package kafka_tweetoscope.tweetsProducer;

/**
 * Command-line launcher that selects a tweet source by name and runs the
 * corresponding {@code AbstractTweetsProducer} implementation.
 *
 * <p>Supported source names are {@code "random"}, {@code "recorded"} and
 * {@code "scenario"}. Each one is mapped to a {@code MockTwitterStream*}
 * producer that is constructed with the bootstrap server and topic name
 * (and, for {@code "recorded"}, the file name).
 */
public class TweetsProducer {
    private final String source;
    private final String topicName;
    private final String boostrapServer;
    private final AbstractTweetsProducer tweetsProducer;
    private final String fileName;

    /**
     * Builds the producer matching {@code source}.
     *
     * @param source         one of {@code "random"}, {@code "recorded"}, {@code "scenario"}
     * @param boostrapServer bootstrap server passed to the selected producer
     * @param topicName      topic name passed to the selected producer
     * @param fileName       file name passed to the producer; only used when
     *                       {@code source} is {@code "recorded"}
     * @throws IllegalArgumentException if {@code source} is {@code null} or is not
     *                                  one of the supported names
     */
    public TweetsProducer(String source, String boostrapServer, String topicName, String fileName) {
        // A switch on a null String throws a bare NullPointerException; report it
        // the same way as any other unsupported source instead.
        if (source == null) {
            throw new IllegalArgumentException(
                "Invalid Tweet Source must be in random, recorded, scenario"
                );
        }
        this.source = source;
        this.boostrapServer = boostrapServer;
        this.topicName = topicName;
        this.fileName = fileName;
        switch (this.source) {
            case "random":
                tweetsProducer = new MockTwitterStreamRandom(this.boostrapServer, this.topicName);
                break;
            case "recorded":
                tweetsProducer = new MockTwitterStreamRecorded(this.boostrapServer, this.topicName, this.fileName);
                break;
            case "scenario":
                tweetsProducer = new MockTwitterStreamScenario(this.boostrapServer, this.topicName);
                break;
            default:
                throw new IllegalArgumentException(
                    "Invalid Tweet Source must be in random, recorded, scenario"
                    );
        }
    }

    /**
     * Runs the selected producer. Any exception raised by {@code run()} is
     * reported on standard error and not rethrown.
     */
    public void start() {
        try {
            tweetsProducer.run();
        } catch (Exception e) {
            System.err.println("Error while running");
            // getMessage() may be null (e.g. for a NullPointerException); fall back
            // to toString() so the report always identifies the failure.
            System.err.println(e.getMessage() != null ? e.getMessage() : e.toString());
        }
    }

    /** Prints the command-line help text to standard output. */
    private static void printUsage() {
        System.out.println("Usage: java TweetsProducer [options]");
        System.out.println("Options:");
        System.out.println("  -b, --bootstrapServer     <kafka boostrapServer>  Specify the name of the boostrapServer (default: localhost:9092)");
        System.out.println("  -s, --source                                      Source for tweets");
        System.out.println("  -f  --file                                        Filename for recorded source");
        System.out.println("  -h, --help                                        Show this help message");
    }

    /**
     * Returns the value following the option at {@code index}. If the option is
     * the last argument, reports the missing value, prints the usage text and
     * exits with status 1.
     *
     * @param args  the full command-line argument array
     * @param index position of the option whose value is requested
     * @return the argument at {@code index + 1}
     */
    private static String optionValue(String[] args, int index) {
        if (index + 1 >= args.length) {
            System.err.println("Missing value for option " + args[index]);
            printUsage();
            System.exit(1);
        }
        return args[index + 1];
    }

    /**
     * Parses the command-line options, then builds and starts a
     * {@code TweetsProducer}. Defaults: bootstrap server {@code localhost:9092},
     * source {@code scenario}, topic {@code raw-tweets}, file {@code mini}.
     *
     * @param args command-line arguments; see {@link #printUsage()}
     */
    public static void main(String[] args) {
        String boostrapServer = "localhost:9092";
        String source = "scenario";
        String topicName = "raw-tweets";
        String fileName = "mini";

        for (int i = 0; i < args.length; i++) {
            switch (args[i]) {
                case "-b":
                case "--bootstrapServer":
                // Historical misspelling, kept so existing invocations keep working.
                case "--boostrapServer":
                    boostrapServer = optionValue(args, i);
                    i++; // skip the value just consumed
                    break;
                case "-s":
                case "--source":
                    source = optionValue(args, i);
                    i++; // skip the value just consumed
                    break;
                case "-f":
                case "--file":
                    fileName = optionValue(args, i);
                    i++; // skip the value just consumed
                    break;
                case "-h":
                case "--help":
                    printUsage();
                    System.exit(0);
                    break; // explicit: do not fall through into the error branch
                default:
                    printUsage();
                    System.exit(1);
                    break;
            }
        }
        TweetsProducer tweetsProducer = new TweetsProducer(source, boostrapServer, topicName, fileName);
        tweetsProducer.start();
    }
}