KafkaUtils.createStream прекращает сбор данных через некоторое время - PullRequest
0 голосов
/ 29 августа 2018

Я собрал клиента Kafka, который извлекает данные из записи Kafka в Elasticsearch, программа работает, как ожидается, в течение дня или двух, а затем Spark прекращает сбор данных. Журналы Kafka генерируются, и поток искры работает, но данные не собираются. Ниже используется код:

# For Spark
from pyspark import SparkContext,SparkConf
from pyspark.streaming import StreamingContext

# For Kafka
from pyspark.streaming.kafka import KafkaUtils

# Name of Spark App
conf = SparkConf().setAppName("test_topic")

# Spark and Spark streaming configuration
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)

# Kafka Enpoints
zkQuorum = '192.0.23.1:2181'
topic = 'test_topic'


# Elastic Search write endpoint
es_write_conf = {
    "es.nodes" : "192.000.0.1",
    "es.port" : "9200",
    "es.resource" : "test_index/test_type",
    "es.input.json": "true",
    "es.nodes.ingest.only": "true"
}


# Create a kafka Stream
kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "cyd-pcs-bro-streaming-consumer", {topic: 1})

# Print stream to console
kafkaStream_json = kafkaStream.map(lambda x: x[1])
kafkaStream_json.pprint()

#Write Stream to ElasticSearch
kafkaStream.foreachRDD(lambda rdd: rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_write_conf)
)


# Start the stream and keep it running unless terminated
ssc.start()
ssc.awaitTermination()
  1. Есть ли что-то еще, что мой код должен делать, или способ углубиться в проблему (журналы ничего не указывают)?
  2. Кроме того, если у меня все в порядке, чтобы иметь одно приложение Spark для каждой темы, есть ли другая причина, по которой я бы хотел использовать KafkaUtils.createDirectStream. Так как я не хочу иметь накладные расходы на управление смещениями.

Используемый язык: Pyspark

Выполнение кода:

sudo $SPARK_HOME/spark-submit --master local[2] --jars /home/user/jars/elasticsearch-hadoop-6.3.2.jar,/home/user/jars/spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar /home/user/code/test_stream.py

Это вывод потока, когда данные не собираются:

-------------------------------------------
Time: 2018-08-29 12:23:46
-------------------------------------------

18/08/29 12:23:46 INFO JobScheduler: Finished job streaming job 1535525626000 ms.0 from job set of time 1535525626000 ms
18/08/29 12:23:46 INFO JobScheduler: Total delay: 0.030 s for time 1535525626000 ms (execution: 0.007 s)
18/08/29 12:23:46 INFO PythonRDD: Removing RDD 115 from persistence list
18/08/29 12:23:46 INFO BlockManager: Removing RDD 115
18/08/29 12:23:46 INFO BlockRDD: Removing RDD 114 from persistence list
18/08/29 12:23:46 INFO BlockManager: Removing RDD 114
18/08/29 12:23:46 INFO KafkaInputDStream: Removing blocks of RDD BlockRDD[114] at createStream at NativeMethodAccessorImpl.java:0 of time 1535525626000 ms
18/08/29 12:23:46 INFO ReceivedBlockTracker: Deleting batches: 1535525624000 ms
18/08/29 12:23:46 INFO InputInfoTracker: remove old batch metadata: 1535525624000 ms
18/08/29 12:23:47 INFO JobScheduler: Added jobs for time 1535525627000 ms
18/08/29 12:23:47 INFO JobScheduler: Starting job streaming job 1535525627000 ms.0 from job set of time 1535525627000 ms
-------------------------------------------
Time: 2018-08-29 12:23:47
-------------------------------------------

18/08/29 12:23:47 INFO JobScheduler: Finished job streaming job 1535525627000 ms.0 from job set of time 1535525627000 ms
18/08/29 12:23:47 INFO JobScheduler: Total delay: 0.025 s for time 1535525627000 ms (execution: 0.005 s)
18/08/29 12:23:47 INFO PythonRDD: Removing RDD 117 from persistence list
18/08/29 12:23:47 INFO BlockRDD: Removing RDD 116 from persistence list
18/08/29 12:23:47 INFO BlockManager: Removing RDD 117
18/08/29 12:23:47 INFO BlockManager: Removing RDD 116
18/08/29 12:23:47 INFO KafkaInputDStream: Removing blocks of RDD BlockRDD[116] at createStream at NativeMethodAccessorImpl.java:0 of time 1535525627000 ms
18/08/29 12:23:47 INFO ReceivedBlockTracker: Deleting batches: 1535525625000 ms
18/08/29 12:23:47 INFO InputInfoTracker: remove old batch metadata: 1535525625000 ms
18/08/29 12:23:48 INFO JobScheduler: Added jobs for time 1535525628000 ms
18/08/29 12:23:48 INFO JobScheduler: Starting job streaming job 1535525628000 ms.0 from job set of time 1535525628000 ms
...