Unable to run JAR - Spark Twitter Streaming with Java
/ November 11, 2019

I am using Spark 2.4.3 in standalone mode on Ubuntu, and I use Maven to build the JAR file. Below is the code I am trying to run; it is meant to stream data from Twitter. Once Spark is started, the Spark Master will be at [...]. The Java version used is 1.8.

package SparkTwitter.SparkJavaTwitter;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.twitter.TwitterUtils;

import scala.Tuple2;
import twitter4j.Status;
import twitter4j.auth.Authorization;
import twitter4j.auth.OAuthAuthorization;
import twitter4j.conf.Configuration;
import twitter4j.conf.ConfigurationBuilder;

import com.google.common.collect.Iterables;

public class TwitterStream {

    public static void main(String[] args) {
        // Prepare the spark configuration by setting application name and master node "local" i.e. embedded mode
        final SparkConf sparkConf = new SparkConf().setAppName("Twitter Data Processing").setMaster("local[2]");
        // Create Streaming context using spark configuration and duration for which messages will be batched and fed to Spark Core
        final JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Duration.apply(10000));

        // Prepare configuration for Twitter authentication and authorization
        final Configuration conf = new ConfigurationBuilder().setDebugEnabled(false)
                                        .setOAuthConsumerKey("customer key")
                                        .setOAuthConsumerSecret("customer key secret")
                                        .setOAuthAccessToken("Access token")
                                        .setOAuthAccessTokenSecret("Access token secret")
                                        .build();
        // Create Twitter authorization object by passing prepared configuration containing consumer and access keys and tokens
        final Authorization twitterAuth = new OAuthAuthorization(conf);
        // Create a data stream using streaming context and Twitter authorization
        final JavaReceiverInputDStream<Status> inputDStream = TwitterUtils.createStream(streamingContext, twitterAuth, new String[]{});
        // Create a new stream by filtering the non english tweets from earlier streams
        final JavaDStream<Status> enTweetsDStream = inputDStream.filter((status) -> "en".equalsIgnoreCase(status.getLang()));
        // Convert stream to pair stream with key as user screen name and value as tweet text
        final JavaPairDStream<String, String> userTweetsStream =
                                    enTweetsDStream.mapToPair(
                                    (status) -> new Tuple2<String, String>(status.getUser().getScreenName(), status.getText())
                                    );

        // Group the tweets for each user
        final JavaPairDStream<String, Iterable<String>> tweetsReducedByUser = userTweetsStream.groupByKey();
        // Create a new pair stream by replacing iterable of tweets in older pair stream to number of tweets
        final JavaPairDStream<String, Integer> tweetsMappedByUser = tweetsReducedByUser.mapToPair(
                    userTweets -> new Tuple2<String, Integer>(userTweets._1, Iterables.size(userTweets._2))
        );
        // Iterate over the stream's RDDs and print each element on console
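        tweetsMappedByUser.foreachRDD((VoidFunction<JavaPairRDD<String, Integer>>)pairRDD -> {
            pairRDD.foreach(new VoidFunction<Tuple2<String,Integer>>() {

                @Override
                public void call(Tuple2<String, Integer> t) throws Exception {
                    System.out.println(t._1() + "," + t._2());
                }
            });
        });

        // Triggers the start of processing. Nothing happens if streaming context is not started
        streamingContext.start();
        // Keeps the processing live by halting here unless terminated manually
        try {
            streamingContext.awaitTermination();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and let main() return
            Thread.currentThread().interrupt();
        }
    }
}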



<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-twitter -->

</project>

To run the code, I use the following command:

./bin/spark-submit --class SparkTwitter.SparkJavaTwitter.TwitterStream /home/hadoop/eclipse-workspace/SparkJavaTwitter/target/SparkJavaTwitter-0.0.1-SNAPSHOT.jar

Below is the output I get.

19/11/10 22:17:58 WARN Utils: Your hostname, hadoop-VirtualBox resolves to a loopback address:; using instead (on interface enp0s3)
19/11/10 22:17:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
19/11/10 22:17:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Warning: Failed to load SparkTwitter.SparkJavaTwitter.TwitterStream: twitter4j/auth/Authorization
log4j:WARN No appenders could be found for logger (org.apache.spark.util.ShutdownHookManager).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

I ran a word count program the same way and it works fine. The JAR also builds successfully. Do I need to specify any additional parameters when running the JAR?
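
The warning `Failed to load SparkTwitter.SparkJavaTwitter.TwitterStream: twitter4j/auth/Authorization` suggests that the twitter4j classes are visible at compile time but not on the runtime classpath. One way to confirm this is a small check like the sketch below. `ClasspathCheck` and `isOnClasspath` are hypothetical names made up for illustration, not part of Spark or twitter4j; the two class names it probes are taken from the imports in the code above.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical diagnostic helper (not part of Spark or twitter4j).
final class ClasspathCheck {

    // Returns true if the named class can be located by this class's class loader.
    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Either the class itself or something it depends on is missing
            return false;
        }
    }

    public static void main(String[] args) {
        // Class names taken from the imports of TwitterStream
        List<String> names = Arrays.asList(
                "twitter4j.auth.Authorization",
                "org.apache.spark.streaming.twitter.TwitterUtils");
        for (String name : names) {
            System.out.println(name + " -> " + (isOnClasspath(name) ? "found" : "MISSING"));
        }
    }
}
```

If this class is packaged into the same JAR and submitted the same way, a `MISSING` line would indicate that the dependency is not shipped with the application. In that case it probably has to be bundled into the JAR or handed to `spark-submit` separately, for example through its `--jars` or `--packages` options.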
