Я собрал клиента Kafka, который извлекает данные из записи Kafka в Elasticsearch, программа работает, как ожидается, в течение дня или двух, а затем Spark прекращает сбор данных. Журналы Kafka генерируются, и поток искры работает, но данные не собираются. Ниже используется код:
# For Spark
from pyspark import SparkContext,SparkConf
from pyspark.streaming import StreamingContext
# For Kafka
from pyspark.streaming.kafka import KafkaUtils
# Name of Spark App
conf = SparkConf().setAppName("test_topic")
# Spark and Spark streaming configuration
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)
# Kafka Enpoints
zkQuorum = '192.0.23.1:2181'
topic = 'test_topic'
# Elastic Search write endpoint
es_write_conf = {
"es.nodes" : "192.000.0.1",
"es.port" : "9200",
"es.resource" : "test_index/test_type",
"es.input.json": "true",
"es.nodes.ingest.only": "true"
}
# Create a kafka Stream
kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "cyd-pcs-bro-streaming-consumer", {topic: 1})
# Print stream to console
kafkaStream_json = kafkaStream.map(lambda x: x[1])
kafkaStream_json.pprint()
#Write Stream to ElasticSearch
kafkaStream.foreachRDD(lambda rdd: rdd.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_write_conf)
)
# Start the stream and keep it running unless terminated
ssc.start()
ssc.awaitTermination()
- Есть ли что-то еще, что мой код должен делать, или способ углубиться в проблему (журналы ничего не указывают)?
- Кроме того, если у меня все в порядке, чтобы иметь одно приложение Spark для каждой темы, есть ли другая причина, по которой я бы хотел использовать
KafkaUtils.createDirectStream
. Так как я не хочу иметь накладные расходы на управление смещениями.
Используемый язык: Pyspark
Выполнение кода:
sudo $SPARK_HOME/spark-submit --master local[2] --jars /home/user/jars/elasticsearch-hadoop-6.3.2.jar,/home/user/jars/spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar /home/user/code/test_stream.py
Это вывод потока, когда данные не собираются:
-------------------------------------------
Time: 2018-08-29 12:23:46
-------------------------------------------
18/08/29 12:23:46 INFO JobScheduler: Finished job streaming job 1535525626000 ms.0 from job set of time 1535525626000 ms
18/08/29 12:23:46 INFO JobScheduler: Total delay: 0.030 s for time 1535525626000 ms (execution: 0.007 s)
18/08/29 12:23:46 INFO PythonRDD: Removing RDD 115 from persistence list
18/08/29 12:23:46 INFO BlockManager: Removing RDD 115
18/08/29 12:23:46 INFO BlockRDD: Removing RDD 114 from persistence list
18/08/29 12:23:46 INFO BlockManager: Removing RDD 114
18/08/29 12:23:46 INFO KafkaInputDStream: Removing blocks of RDD BlockRDD[114] at createStream at NativeMethodAccessorImpl.java:0 of time 1535525626000 ms
18/08/29 12:23:46 INFO ReceivedBlockTracker: Deleting batches: 1535525624000 ms
18/08/29 12:23:46 INFO InputInfoTracker: remove old batch metadata: 1535525624000 ms
18/08/29 12:23:47 INFO JobScheduler: Added jobs for time 1535525627000 ms
18/08/29 12:23:47 INFO JobScheduler: Starting job streaming job 1535525627000 ms.0 from job set of time 1535525627000 ms
-------------------------------------------
Time: 2018-08-29 12:23:47
-------------------------------------------
18/08/29 12:23:47 INFO JobScheduler: Finished job streaming job 1535525627000 ms.0 from job set of time 1535525627000 ms
18/08/29 12:23:47 INFO JobScheduler: Total delay: 0.025 s for time 1535525627000 ms (execution: 0.005 s)
18/08/29 12:23:47 INFO PythonRDD: Removing RDD 117 from persistence list
18/08/29 12:23:47 INFO BlockRDD: Removing RDD 116 from persistence list
18/08/29 12:23:47 INFO BlockManager: Removing RDD 117
18/08/29 12:23:47 INFO BlockManager: Removing RDD 116
18/08/29 12:23:47 INFO KafkaInputDStream: Removing blocks of RDD BlockRDD[116] at createStream at NativeMethodAccessorImpl.java:0 of time 1535525627000 ms
18/08/29 12:23:47 INFO ReceivedBlockTracker: Deleting batches: 1535525625000 ms
18/08/29 12:23:47 INFO InputInfoTracker: remove old batch metadata: 1535525625000 ms
18/08/29 12:23:48 INFO JobScheduler: Added jobs for time 1535525628000 ms
18/08/29 12:23:48 INFO JobScheduler: Starting job streaming job 1535525628000 ms.0 from job set of time 1535525628000 ms