LanguageFilter.java

package kafka_tweetoscope.tweetsFilter;

import com.twitter.clientlib.model.Tweet;

/**
 * 
 * LanguageFilter
 * 
 */
public class LanguageFilter extends AbstractTweetFilter {
    private final String targetLanguage;

    public LanguageFilter(String inputTopic, String outputTopic, String bootstrapServer, String targetLanguage) {
        super(inputTopic, outputTopic, bootstrapServer);
        this.targetLanguage = targetLanguage;
    }

    @Override
    protected boolean filterTweet(Tweet tweet) {
        return tweet.getLang().equals(targetLanguage);
    }

    public static void main(String[] args) {
        // String language = args.length > 0 ? args[0] : "en";
        LanguageFilter filter = new LanguageFilter("raw-tweets", "filtered-tweets", "localhost:9092", "en");
        filter.run();
    }
}