TweetFilter.java
- package kafka_tweetoscope.tweetsFilter;
- import org.apache.commons.cli.*;
- public class TweetFilter {
- private AbstractTweetFilter filter;
- private String filterCase;
- // Constructor with filterCase and additional parameters
- public TweetFilter(String bootstrapServer, String inputTopic, String outputTopic,
- String filterCase, String... additionalParams) {
- this.filterCase = filterCase;
- initializeFilter(bootstrapServer, inputTopic, outputTopic, additionalParams);
- }
- private void initializeFilter(String bootstrapServer, String inputTopic, String outputTopic,
- String... additionalParams) {
- switch (filterCase.toLowerCase()) {
- case "lang":
- // Use the provided language parameter or default to "en"
- String language = (additionalParams != null && additionalParams.length > 0)
- ? additionalParams[0] : "en";
- this.filter = new LanguageFilter(inputTopic, outputTopic, bootstrapServer, language);
- break;
- case "size":
- // Use the provided size parameter or default to 10
- int minSize = (additionalParams != null && additionalParams.length > 0)
- ? Integer.parseInt(additionalParams[0]) : 10;
- this.filter = new MinimalSizeTweetFilter(inputTopic, outputTopic, bootstrapServer, minSize);
- break;
- default:
- this.filter = new EmptyFilter(inputTopic, outputTopic, bootstrapServer);
- break;
- }
- }
- public void start() {
- try {
- filter.run();
- } catch (Exception e) {
- System.err.println("TweetFilter Starting Error");
- System.err.println(e.getMessage());
- e.printStackTrace();
- }
- }
- private static void printUsage() {
- System.out.println("Usage: java TweetsFilter [options]");
- System.out.println("Options:");
- System.out.println(" -b, --bootstrapServer <kafka bootstrapServer> Specify the name of the bootstrapServer (default: localhost:9092)");
- System.out.println(" -i, --inputTopic <topic> Specify the input topic name");
- System.out.println(" -o, --outputTopic <topic> Specify the output topic name");
- System.out.println(" -f, --filter <filterType> Specify the filter type (lang, size, etc.)");
- System.out.println(" -l, --language <language> Specify the language code for lang filter (default: en)");
- System.out.println(" -s, --size <minSize> Specify the minimal size for size filter (default: 10)");
- System.out.println(" -h, --help Show this help message");
- }
- public static void main(String[] args) {
- // Default values
- String bootstrapServer = "localhost:9092";
- String inputTopic = "raw-tweets";
- String outputTopic = "filtered-tweets";
- String filterCase = "empty";
- String filterParam = null;
- // Create command line options
- Options options = new Options();
- options.addOption("b", "bootstrapServer", true, "Kafka bootstrap server");
- options.addOption("i", "inputTopic", true, "Input topic name");
- options.addOption("o", "outputTopic", true, "Output topic name");
- options.addOption("f", "filter", true, "Filter type");
- options.addOption("l", "language", true, "Language code for lang filter");
- options.addOption("s", "size", true, "Minimal size for size filter");
- options.addOption("h", "help", false, "Show help message");
- CommandLineParser parser = new DefaultParser();
- try {
- CommandLine cmd = parser.parse(options, args);
- if (cmd.hasOption("h")) {
- printUsage();
- System.exit(0);
- }
- // Parse command line arguments
- if (cmd.hasOption("b")) {
- bootstrapServer = cmd.getOptionValue("b");
- }
- if (cmd.hasOption("i")) {
- inputTopic = cmd.getOptionValue("i");
- }
- if (cmd.hasOption("o")) {
- outputTopic = cmd.getOptionValue("o");
- }
- if (cmd.hasOption("f")) {
- filterCase = cmd.getOptionValue("f");
- }
- // Handle filter-specific parameters
- switch (filterCase.toLowerCase()) {
- case "lang":
- if (cmd.hasOption("l")) {
- filterParam = cmd.getOptionValue("l");
- }
- break;
- case "size":
- if (cmd.hasOption("s")) {
- filterParam = cmd.getOptionValue("s");
- }
- break;
- }
- // Create and start the filter
- TweetFilter filter = new TweetFilter(bootstrapServer, inputTopic, outputTopic,
- filterCase, filterParam);
- filter.start();
- } catch (ParseException e) {
- System.err.println("Error parsing command line arguments: " + e.getMessage());
- printUsage();
- System.exit(1);
- } catch (NumberFormatException e) {
- System.err.println("Error: Size parameter must be a valid integer");
- printUsage();
- System.exit(1);
- }
- }
- }