TweetFilter.java

  1. package kafka_tweetoscope.tweetsFilter;

  2. import org.apache.commons.cli.*;

  3. public class TweetFilter {
  4.     private AbstractTweetFilter filter;
  5.     private String filterCase;

  6.     // Constructor with filterCase and additional parameters
  7.     public TweetFilter(String bootstrapServer, String inputTopic, String outputTopic,
  8.                       String filterCase, String... additionalParams) {
  9.         this.filterCase = filterCase;
  10.         initializeFilter(bootstrapServer, inputTopic, outputTopic, additionalParams);
  11.     }

  12.     private void initializeFilter(String bootstrapServer, String inputTopic, String outputTopic,
  13.                                 String... additionalParams) {
  14.         switch (filterCase.toLowerCase()) {
  15.             case "lang":
  16.                 // Use the provided language parameter or default to "en"
  17.                 String language = (additionalParams != null && additionalParams.length > 0)
  18.                                 ? additionalParams[0] : "en";
  19.                 this.filter = new LanguageFilter(inputTopic, outputTopic, bootstrapServer, language);
  20.                 break;
  21.             case "size":
  22.                 // Use the provided size parameter or default to 10
  23.                 int minSize = (additionalParams != null && additionalParams.length > 0)
  24.                              ? Integer.parseInt(additionalParams[0]) : 10;
  25.                 this.filter = new MinimalSizeTweetFilter(inputTopic, outputTopic, bootstrapServer, minSize);
  26.                 break;
  27.             default:
  28.                 this.filter = new EmptyFilter(inputTopic, outputTopic, bootstrapServer);
  29.                 break;
  30.         }
  31.     }

  32.     public void start() {
  33.         try {
  34.             filter.run();
  35.         } catch (Exception e) {
  36.             System.err.println("TweetFilter Starting Error");
  37.             System.err.println(e.getMessage());
  38.             e.printStackTrace();
  39.         }
  40.     }

  41.     private static void printUsage() {
  42.         System.out.println("Usage: java TweetsFilter [options]");
  43.         System.out.println("Options:");
  44.         System.out.println(" -b, --bootstrapServer <kafka bootstrapServer> Specify the name of the bootstrapServer (default: localhost:9092)");
  45.         System.out.println(" -i, --inputTopic <topic> Specify the input topic name");
  46.         System.out.println(" -o, --outputTopic <topic> Specify the output topic name");
  47.         System.out.println(" -f, --filter <filterType> Specify the filter type (lang, size, etc.)");
  48.         System.out.println(" -l, --language <language> Specify the language code for lang filter (default: en)");
  49.         System.out.println(" -s, --size <minSize> Specify the minimal size for size filter (default: 10)");
  50.         System.out.println(" -h, --help Show this help message");
  51.     }

  52.     public static void main(String[] args) {
  53.         // Default values
  54.         String bootstrapServer = "localhost:9092";
  55.         String inputTopic = "raw-tweets";
  56.         String outputTopic = "filtered-tweets";
  57.         String filterCase = "empty";
  58.         String filterParam = null;

  59.         // Create command line options
  60.         Options options = new Options();
  61.         options.addOption("b", "bootstrapServer", true, "Kafka bootstrap server");
  62.         options.addOption("i", "inputTopic", true, "Input topic name");
  63.         options.addOption("o", "outputTopic", true, "Output topic name");
  64.         options.addOption("f", "filter", true, "Filter type");
  65.         options.addOption("l", "language", true, "Language code for lang filter");
  66.         options.addOption("s", "size", true, "Minimal size for size filter");
  67.         options.addOption("h", "help", false, "Show help message");

  68.         CommandLineParser parser = new DefaultParser();
  69.         try {
  70.             CommandLine cmd = parser.parse(options, args);

  71.             if (cmd.hasOption("h")) {
  72.                 printUsage();
  73.                 System.exit(0);
  74.             }

  75.             // Parse command line arguments
  76.             if (cmd.hasOption("b")) {
  77.                 bootstrapServer = cmd.getOptionValue("b");
  78.             }
  79.             if (cmd.hasOption("i")) {
  80.                 inputTopic = cmd.getOptionValue("i");
  81.             }
  82.             if (cmd.hasOption("o")) {
  83.                 outputTopic = cmd.getOptionValue("o");
  84.             }
  85.             if (cmd.hasOption("f")) {
  86.                 filterCase = cmd.getOptionValue("f");
  87.             }

  88.             // Handle filter-specific parameters
  89.             switch (filterCase.toLowerCase()) {
  90.                 case "lang":
  91.                     if (cmd.hasOption("l")) {
  92.                         filterParam = cmd.getOptionValue("l");
  93.                     }
  94.                     break;
  95.                 case "size":
  96.                     if (cmd.hasOption("s")) {
  97.                         filterParam = cmd.getOptionValue("s");
  98.                     }
  99.                     break;
  100.             }

  101.             // Create and start the filter
  102.             TweetFilter filter = new TweetFilter(bootstrapServer, inputTopic, outputTopic,
  103.                                                filterCase, filterParam);
  104.             filter.start();

  105.         } catch (ParseException e) {
  106.             System.err.println("Error parsing command line arguments: " + e.getMessage());
  107.             printUsage();
  108.             System.exit(1);
  109.         } catch (NumberFormatException e) {
  110.             System.err.println("Error: Size parameter must be a valid integer");
  111.             printUsage();
  112.             System.exit(1);
  113.         }
  114.     }
  115. }