Здравствуйте, я новичок в Spark. Я хотел бы сделать какой-нибудь проект Spark, который будет собирать и обрабатывать твиты из этой социальной сети с помощью модуля искровой потоковой передачи (для моего небольшого исследования в университете).Но у меня есть небольшая проблема, я не знаю, как теперь получать твиты только на английском языке. Может ли кто-нибудь помочь мне с этим? Я пытался выполнить операцию фильтрации с уже полученными данными, но у меня есть java.lang.NullPointerException в этой строке: "if (status.getPlace (). getCountryCode (). equals ("(us)")) ".Но это тоже плохое решение. Можно ли отфильтровать данные перед получением?Пожалуйста, помогите, я действительно этого не делаю сейчас. Я буду рад получить ваши подсказки.
package TwitterAnalysis;
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.twitter.*;
import twitter4j.GeoLocation;
import twitter4j.Status;
public class Twitter {
private static void setTwitterOAuth() {
System.setProperty("twitter4j.oauth.consumerKey", TwitterOAuthKey.consumerKey);
System.setProperty("twitter4j.oauth.consumerSecret", TwitterOAuthKey.consumerSecret);
System.setProperty("twitter4j.oauth.accessToken", TwitterOAuthKey.accessToken);
System.setProperty("twitter4j.oauth.accessTokenSecret", TwitterOAuthKey.accessTokenSecret);
}
public static void main(String [] args) {
setTwitterOAuth();
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkTwitter");
JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1000));
JavaReceiverInputDStream<Status> twitterStream = TwitterUtils.createStream(jssc);
//filtering already received tweets
JavaDStream<Status> englishTweets=twitterStream.filter(
new Function <Status, Boolean>(){
public Boolean call (Status status){
if (status.getPlace().getCountryCode().equals("(us)")){
return true;
}else
{return false;}
}
}
);
//Without filter: Output text of all tweets
JavaDStream<String> statuses = englishTweets.map(
new Function<Status, String>() {
public String call(Status status) { return status.getText(); }
}
);
statuses.print();
jssc.start();
}
}