// TweetFilter.java

package kafka_tweetoscope.tweetsFilter;

import java.util.Locale;
import org.apache.commons.cli.*;

/**
 * Command-line launcher that selects one {@link AbstractTweetFilter}
 * implementation from a filter-type keyword and runs it.
 *
 * <p>Recognised keywords (matched case-insensitively):
 * <ul>
 *   <li>{@code lang} builds a {@code LanguageFilter};</li>
 *   <li>{@code size} builds a {@code MinimalSizeTweetFilter};</li>
 *   <li>anything else (including {@code null}) builds an {@code EmptyFilter}.</li>
 * </ul>
 */
public class TweetFilter {
    /** Language code handed to the {@code lang} filter when none is supplied. */
    private static final String DEFAULT_LANGUAGE = "en";

    /** Minimal size handed to the {@code size} filter when none is supplied. */
    private static final int DEFAULT_MIN_SIZE = 10;

    private final AbstractTweetFilter filter;
    private final String filterCase;

    /**
     * Builds the concrete filter selected by {@code filterCase}.
     *
     * @param bootstrapServer  bootstrap server address forwarded to the concrete filter
     * @param inputTopic       input topic name forwarded to the concrete filter
     * @param outputTopic      output topic name forwarded to the concrete filter
     * @param filterCase       filter-type keyword; unknown or {@code null} values
     *                         select the {@code EmptyFilter}
     * @param additionalParams optional filter-specific parameter; only the first
     *                         element is read (language code for {@code lang},
     *                         integer minimal size for {@code size}). A missing or
     *                         {@code null} first element selects the default.
     * @throws NumberFormatException if the {@code size} parameter is present but is
     *                               not a valid integer
     */
    public TweetFilter(String bootstrapServer, String inputTopic, String outputTopic,
                      String filterCase, String... additionalParams) {
        this.filterCase = filterCase;
        this.filter = createFilter(bootstrapServer, inputTopic, outputTopic, additionalParams);
    }

    /** Instantiates the filter implementation that matches {@link #filterCase}. */
    private AbstractTweetFilter createFilter(String bootstrapServer, String inputTopic,
                                             String outputTopic, String... additionalParams) {
        String param = firstParamOrNull(additionalParams);
        switch (normalize(filterCase)) {
            case "lang":
                String language = (param != null) ? param : DEFAULT_LANGUAGE;
                return new LanguageFilter(inputTopic, outputTopic, bootstrapServer, language);
            case "size":
                int minSize = (param != null) ? Integer.parseInt(param) : DEFAULT_MIN_SIZE;
                return new MinimalSizeTweetFilter(inputTopic, outputTopic, bootstrapServer, minSize);
            default:
                return new EmptyFilter(inputTopic, outputTopic, bootstrapServer);
        }
    }

    /**
     * Returns the first extra parameter, or {@code null} when there is none.
     *
     * <p>Passing a {@code null} {@code String} variable to a {@code String...}
     * parameter produces a one-element array holding {@code null}, so checking the
     * array length alone is not enough to detect "no parameter given".
     */
    private static String firstParamOrNull(String... additionalParams) {
        if (additionalParams == null || additionalParams.length == 0) {
            return null;
        }
        return additionalParams[0];
    }

    /**
     * Lower-cases the filter keyword independently of the default locale, so that
     * e.g. {@code "SIZE"} still matches under a Turkish locale. A {@code null}
     * keyword becomes the empty string and therefore hits the default branch.
     */
    private static String normalize(String filterCase) {
        return (filterCase == null) ? "" : filterCase.toLowerCase(Locale.ROOT);
    }

    /**
     * Runs the selected filter. Any exception raised by {@code run()} is reported
     * on standard error and not rethrown.
     */
    public void start() {
        try {
            filter.run();
        } catch (Exception e) {
            System.err.println("TweetFilter Starting Error");
            System.err.println(e.getMessage());
            e.printStackTrace();
        }
    }

    /** Prints the command-line help text to standard output. */
    private static void printUsage() {
        System.out.println("Usage: java TweetsFilter [options]");
        System.out.println("Options:");
        System.out.println(" -b, --bootstrapServer <kafka bootstrapServer> Specify the name of the bootstrapServer (default: localhost:9092)");
        System.out.println(" -i, --inputTopic <topic> Specify the input topic name");
        System.out.println(" -o, --outputTopic <topic> Specify the output topic name");
        System.out.println(" -f, --filter <filterType> Specify the filter type (lang, size, etc.)");
        System.out.println(" -l, --language <language> Specify the language code for lang filter (default: en)");
        System.out.println(" -s, --size <minSize> Specify the minimal size for size filter (default: 10)");
        System.out.println(" -h, --help Show this help message");
    }

    /**
     * Parses the command line, builds the requested filter and starts it.
     *
     * <p>Exits with status 0 after printing help, and with status 1 when the
     * arguments cannot be parsed or the size value is not an integer.
     *
     * @param args command-line arguments, see {@link #printUsage()}
     */
    public static void main(String[] args) {
        Options options = new Options();
        options.addOption("b", "bootstrapServer", true, "Kafka bootstrap server");
        options.addOption("i", "inputTopic", true, "Input topic name");
        options.addOption("o", "outputTopic", true, "Output topic name");
        options.addOption("f", "filter", true, "Filter type");
        options.addOption("l", "language", true, "Language code for lang filter");
        options.addOption("s", "size", true, "Minimal size for size filter");
        options.addOption("h", "help", false, "Show help message");

        CommandLineParser parser = new DefaultParser();
        try {
            CommandLine cmd = parser.parse(options, args);

            if (cmd.hasOption("h")) {
                printUsage();
                System.exit(0);
            }

            // Each option falls back to its default when absent from the command line.
            String bootstrapServer = cmd.getOptionValue("b", "localhost:9092");
            String inputTopic = cmd.getOptionValue("i", "raw-tweets");
            String outputTopic = cmd.getOptionValue("o", "filtered-tweets");
            String filterCase = cmd.getOptionValue("f", "empty");

            // Pick the option that carries the parameter of the chosen filter;
            // getOptionValue yields null when that option was not given.
            String filterParam = null;
            switch (normalize(filterCase)) {
                case "lang":
                    filterParam = cmd.getOptionValue("l");
                    break;
                case "size":
                    filterParam = cmd.getOptionValue("s");
                    break;
                default:
                    break;
            }

            // Only forward the parameter when one was supplied, so the filter's
            // own default applies otherwise.
            TweetFilter filter = (filterParam == null)
                    ? new TweetFilter(bootstrapServer, inputTopic, outputTopic, filterCase)
                    : new TweetFilter(bootstrapServer, inputTopic, outputTopic,
                                      filterCase, filterParam);
            filter.start();

        } catch (ParseException e) {
            System.err.println("Error parsing command line arguments: " + e.getMessage());
            printUsage();
            System.exit(1);
        } catch (NumberFormatException e) {
            System.err.println("Error: Size parameter must be a valid integer");
            printUsage();
            System.exit(1);
        }
    }
}