Ошибка при отправке сообщения Кафкой в ​​Spark Streaming - PullRequest
0 голосов
/ 06 апреля 2019

Я отправляю файл на потоковое воспроизведение с использованием kafka и spark.Искра является потребителем.Я отправляю данные примерно так: «cat ~ / WISDM_ar_v1.1_raw.txt | bin / kafka-console-producer.sh --broker-list localhost: 9092 --topic test».Затем пишите в консоль ">>>>>>>>>>>>>>>>>>>".Затем, когда спарк работает с данными и если Кафка заканчивает посылать сообщение, спарк останавливается, потому что кафка останавливается раньше.

Я использую Spark 2.4.0 и Кафка 2.1

отправка данных производителю кафки

cat ~/WISDM_ar_v1.1_raw.txt | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test"

начать потоковую передачу с использованием jar

./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0,org.mongodb:mongo-java-driver:3.10.0 --class org.apache.spark.spark_streaming_kafka_0_10_2.App /home/mustafa/eclipse-workspace/sparkJava.jar

sparkJava

        BasicConfigurator.configure();

        mongoClient = new MongoClient(new ServerAddress("localhost", 27017));
        db = mongoClient.getDatabase("people"); 
        collection  =  db.getCollection("persondetails");
        Document document = new Document();



        SparkConf conf=new SparkConf().setAppName("kafka-sandbox").setMaster("local[*]");
        JavaSparkContext sc=new JavaSparkContext(conf);
        JavaStreamingContext ssc=new JavaStreamingContext(sc,new Duration(1000l));
        Map<String, Object> kafkaParams = new HashMap<String, Object>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);
        kafkaParams.put("auto.offset.reset", "latest");

        kafkaParams.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG,"0");       

        Collection<String> topics = Arrays.asList("bigdata" );

        JavaInputDStream<ConsumerRecord<String, String>> stream =
          KafkaUtils.createDirectStream(
                  ssc,
            LocationStrategies.PreferBrokers(),
            ConsumerStrategies.Subscribe(topics, kafkaParams)

          );


        stream.foreachRDD((rdd -> {
            System.out.println("new rdd "+rdd.partitions().size());
            rdd.foreach(record -> {


                ArrayList<String> list = new ArrayList<String>(Arrays.asList(record.value().split(",")));



                document.append("user", list.get(0))
                .append("activity", list.get(1))
                .append("timestamp", list.get(2))
                .append("x-acceleration", list.get(3))
                .append("y-accel", list.get(4))
                .append("z-accel", list.get(5).replace(";",""));


                collection.insertOne(document);
                document.clear();

            });
            }));



        ssc.start();
        ssc.awaitTermination();

        mongoClient.close();

Исключение:


Exception in thread "streaming-job-executor-0" java.lang.Error: java.lang.InterruptedException
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:206)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:157)
    at org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:243)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:728)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:925)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:925)
    at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:351)
    at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:45)
    at org.apache.spark.spark_streaming_kafka_0_10_2.App.lambda$main$74bb78aa$1(App.java:65)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    ... 2 more
2019-04-06 01:59:12 INFO  JobScheduler:54 - Stopped JobScheduler
73427 [Thread-1] INFO org.apache.spark.streaming.scheduler.JobScheduler  - Stopped JobScheduler
2019-04-06 01:59:12 INFO  ContextHandler:910 - Stopped o.s.j.s.ServletContextHandler@124d02b2{/streaming,null,UNAVAILABLE,@Spark}
73431 [Thread-1] INFO org.spark_project.jetty.server.handler.ContextHandler  - Stopped o.s.j.s.ServletContextHandler@124d02b2{/streaming,null,UNAVAILABLE,@Spark}

Я ожидаю, что все данные отправят push наМонго, но просто получи 310.000 данных из файла.

Ответы [ 2 ]

0 голосов
/ 06 апреля 2019

Я нашел ответ.Когда я читаю файл с кодом Java, он берет строки, которые имеют меньше параметра, чем 6, и возникает ошибка.Я исправил код, теперь он работает.

0 голосов
/ 06 апреля 2019

Вы используете kafkaParams.put("auto.offset.reset", "latest");, что означает, что вы читаете Spark с конца темы.

Если вы хотите, чтобы Spark прочитал все данные, которые в настоящее время находятся в созданной теме, вам нужно установить для этого значение "earliest"

Непонятно, что вы подразумеваете под Кафкой ".ранее "... Если процесс Kafka фактически останавливается, то проблема не в вашем Spark-коде


FWIW, нет необходимости использовать cat, поскольку Spark может читать и анализировать файлы CSVсам.Поэтому вообще не обязательно использовать Kafka, если у вас нет других потребителей

...