// TweetFilter.java
package kafka_tweetoscope.tweetsFilter;
import org.apache.commons.cli.*;
/**
 * Command-line launcher that selects one {@link AbstractTweetFilter} implementation by name
 * and runs it.
 *
 * <p>Recognized filter names (matched case-insensitively): {@code "lang"} builds a
 * {@code LanguageFilter}, {@code "size"} builds a {@code MinimalSizeTweetFilter}; any other
 * name (including {@code null}) builds an {@code EmptyFilter}.
 */
public class TweetFilter {
    /** Language handed to the {@code LanguageFilter} when none is supplied. */
    private static final String DEFAULT_LANGUAGE = "en";

    /** Minimal size handed to the {@code MinimalSizeTweetFilter} when none is supplied. */
    private static final int DEFAULT_MIN_SIZE = 10;

    private AbstractTweetFilter filter;
    private String filterCase;

    /**
     * Builds the filter designated by {@code filterCase}.
     *
     * @param bootstrapServer  Kafka bootstrap server address, forwarded to the filter
     * @param inputTopic       input topic name, forwarded to the filter
     * @param outputTopic      output topic name, forwarded to the filter
     * @param filterCase       filter name ({@code "lang"}, {@code "size"}, anything else selects
     *                         the empty filter); {@code null} is treated as "anything else"
     * @param additionalParams optional filter-specific parameter; only the first element is
     *                         read. For {@code "lang"} it is the language code (default
     *                         {@code "en"}), for {@code "size"} the minimal size (default
     *                         {@code 10}). A missing, empty, or {@code null} first element
     *                         selects the default.
     * @throws NumberFormatException if the {@code "size"} parameter is supplied but is not a
     *                               valid integer
     */
    public TweetFilter(String bootstrapServer, String inputTopic, String outputTopic,
                       String filterCase, String... additionalParams) {
        this.filterCase = filterCase;
        initializeFilter(bootstrapServer, inputTopic, outputTopic, additionalParams);
    }

    /**
     * Returns the first additional parameter, or {@code null} when there is none.
     *
     * <p>A caller that passes a single {@code null} {@code String} to a varargs parameter
     * produces a one-element array containing {@code null}; that case must be treated as
     * "no parameter" so the defaults still apply.
     */
    private static String firstParam(String... additionalParams) {
        if (additionalParams == null || additionalParams.length == 0) {
            return null;
        }
        return additionalParams[0];
    }

    /** Instantiates {@link #filter} according to {@link #filterCase}. */
    private void initializeFilter(String bootstrapServer, String inputTopic, String outputTopic,
                                  String... additionalParams) {
        String param = firstParam(additionalParams);
        // Locale.ROOT keeps the match independent of the JVM default locale
        // (e.g. "SIZE" must not become "sıze" under a Turkish locale).
        String normalizedCase =
                (filterCase == null) ? "" : filterCase.toLowerCase(java.util.Locale.ROOT);

        switch (normalizedCase) {
            case "lang":
                // Use the provided language parameter or default to "en"
                String language = (param != null) ? param : DEFAULT_LANGUAGE;
                this.filter = new LanguageFilter(inputTopic, outputTopic, bootstrapServer, language);
                break;
            case "size":
                // Use the provided size parameter or default to 10
                int minSize = (param != null) ? Integer.parseInt(param) : DEFAULT_MIN_SIZE;
                this.filter = new MinimalSizeTweetFilter(inputTopic, outputTopic, bootstrapServer, minSize);
                break;
            default:
                this.filter = new EmptyFilter(inputTopic, outputTopic, bootstrapServer);
                break;
        }
    }

    /**
     * Runs the configured filter. Any exception raised by {@code filter.run()} is reported on
     * standard error (message and stack trace) and not rethrown.
     */
    public void start() {
        try {
            filter.run();
        } catch (Exception e) {
            System.err.println("TweetFilter Starting Error");
            System.err.println(e.getMessage());
            e.printStackTrace();
        }
    }

    /** Prints the command-line help to standard output. */
    private static void printUsage() {
        System.out.println("Usage: java TweetsFilter [options]");
        System.out.println("Options:");
        System.out.println(" -b, --bootstrapServer <kafka bootstrapServer> Specify the name of the bootstrapServer (default: localhost:9092)");
        System.out.println(" -i, --inputTopic <topic> Specify the input topic name");
        System.out.println(" -o, --outputTopic <topic> Specify the output topic name");
        System.out.println(" -f, --filter <filterType> Specify the filter type (lang, size, etc.)");
        System.out.println(" -l, --language <language> Specify the language code for lang filter (default: en)");
        System.out.println(" -s, --size <minSize> Specify the minimal size for size filter (default: 10)");
        System.out.println(" -h, --help Show this help message");
    }

    /**
     * Parses the command line, builds the requested filter and starts it.
     *
     * <p>Exits with status 0 after printing help, and with status 1 when the arguments cannot
     * be parsed or the size parameter is not an integer.
     *
     * @param args command-line arguments, see {@link #printUsage()}
     */
    public static void main(String[] args) {
        // Default values
        String bootstrapServer = "localhost:9092";
        String inputTopic = "raw-tweets";
        String outputTopic = "filtered-tweets";
        String filterCase = "empty";
        String filterParam = null;

        // Create command line options
        Options options = new Options();
        options.addOption("b", "bootstrapServer", true, "Kafka bootstrap server");
        options.addOption("i", "inputTopic", true, "Input topic name");
        options.addOption("o", "outputTopic", true, "Output topic name");
        options.addOption("f", "filter", true, "Filter type");
        options.addOption("l", "language", true, "Language code for lang filter");
        options.addOption("s", "size", true, "Minimal size for size filter");
        options.addOption("h", "help", false, "Show help message");

        CommandLineParser parser = new DefaultParser();
        try {
            CommandLine cmd = parser.parse(options, args);
            if (cmd.hasOption("h")) {
                printUsage();
                System.exit(0);
            }

            // Parse command line arguments
            if (cmd.hasOption("b")) {
                bootstrapServer = cmd.getOptionValue("b");
            }
            if (cmd.hasOption("i")) {
                inputTopic = cmd.getOptionValue("i");
            }
            if (cmd.hasOption("o")) {
                outputTopic = cmd.getOptionValue("o");
            }
            if (cmd.hasOption("f")) {
                filterCase = cmd.getOptionValue("f");
            }

            // Handle filter-specific parameters
            switch (filterCase.toLowerCase(java.util.Locale.ROOT)) {
                case "lang":
                    if (cmd.hasOption("l")) {
                        filterParam = cmd.getOptionValue("l");
                    }
                    break;
                case "size":
                    if (cmd.hasOption("s")) {
                        filterParam = cmd.getOptionValue("s");
                    }
                    break;
                default:
                    // No filter-specific parameter for other filter types.
                    break;
            }

            // Pass an empty array rather than a null element when no parameter was given,
            // so the filter's documented default is used.
            String[] filterParams =
                    (filterParam == null) ? new String[0] : new String[] {filterParam};

            // Create and start the filter
            TweetFilter filter = new TweetFilter(bootstrapServer, inputTopic, outputTopic,
                    filterCase, filterParams);
            filter.start();
        } catch (ParseException e) {
            System.err.println("Error parsing command line arguments: " + e.getMessage());
            printUsage();
            System.exit(1);
        } catch (NumberFormatException e) {
            System.err.println("Error: Size parameter must be a valid integer");
            printUsage();
            System.exit(1);
        }
    }
}