Kafka stream is created and runs, but does not print processed output from the Kafka topic in PySpark - PullRequest
0 votes
/ January 22, 2020

I am using Kafka version 2.0, Spark version 2.2.0.2.6.4.0-91, and Python version 2.7.5. I run the code below and it executes without errors, but the counts are never printed to the output.

import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import os

if __name__ == "__main__":

    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 60)
    print("spark context set")

    zkQuorum, topic = 'master.hdp:2181','streamit'
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "console-consumer-68081", {topic: 1})
    print("connection set")
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
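
The transformation chain itself is a standard word count; the same flatMap / map / reduceByKey logic can be sanity-checked in plain Python without a cluster (a sketch for illustration only — `word_count` and the sample `batch` are made up here, not part of the Spark job):

```python
# Plain-Python equivalent of the streaming pipeline:
# flatMap(split) -> map(word, 1) -> reduceByKey(a + b)
def word_count(batch):
    counts = {}
    for line in batch:
        for word in line.split(" "):
            counts[word] = counts.get(word, 0) + 1
    return counts

batch = ["hello spark", "hello kafka"]
print(word_count(batch))  # {'hello': 2, 'spark': 1, 'kafka': 1}
```

If this logic is correct (it is), empty `pprint()` batches mean the DStream itself is receiving no records, which points at the Kafka connection rather than the transformations.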

My spark-submit command:

/usr/hdp/current/spark2-client/bin/spark-submit --principal hdfs-ivory@KDCAUTH.COM --keytab /etc/security/keytabs/hdfs.headless.keytab --master yarn --deploy-mode client --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 kstream.py

The last part of my output log is shown below. It receives the stream but does not show the desired processed output.

-------------------------------------------
Time: 2020-01-22 19:29:00
-------------------------------------------

20/01/22 19:29:00 INFO JobScheduler: Finished job streaming job 1579701540000 ms.0 from job set of time 1579701540000 ms
20/01/22 19:29:00 INFO JobScheduler: Starting job streaming job 1579701540000 ms.1 from job set of time 1579701540000 ms
20/01/22 19:29:00 INFO SparkContext: Starting job: runJob at PythonRDD.scala:455
20/01/22 19:29:00 INFO DAGScheduler: Registering RDD 7 (call at /usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py:2230)
20/01/22 19:29:00 INFO DAGScheduler: Got job 2 (runJob at PythonRDD.scala:455) with 1 output partitions
20/01/22 19:29:00 INFO DAGScheduler: Final stage: ResultStage 4 (runJob at PythonRDD.scala:455)
20/01/22 19:29:00 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 3)
20/01/22 19:29:00 INFO DAGScheduler: Missing parents: List()
20/01/22 19:29:00 INFO DAGScheduler: Submitting ResultStage 4 (PythonRDD[11] at RDD at PythonRDD.scala:48), which has no missing parents
20/01/22 19:29:00 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 8.1 KB, free 366.2 MB)
20/01/22 19:29:00 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 4.4 KB, free 366.2 MB)
20/01/22 19:29:00 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 172.16.0.21:40801 (size: 4.4 KB, free: 366.3 MB)
20/01/22 19:29:00 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1006
20/01/22 19:29:00 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 4 (PythonRDD[11] at RDD at PythonRDD.scala:48) (first 15 tasks are for partitions Vector(0))
20/01/22 19:29:00 INFO YarnScheduler: Adding task set 4.0 with 1 tasks
20/01/22 19:29:00 INFO TaskSetManager: Starting task 0.0 in stage 4.0 (TID 71, master.hdp, executor 1, partition 0, PROCESS_LOCAL, 4632 bytes)
20/01/22 19:29:00 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on master.hdp:41184 (size: 4.4 KB, free: 366.3 MB)
20/01/22 19:29:00 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 1 to 172.16.0.21:51120
20/01/22 19:29:00 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 83 bytes
20/01/22 19:29:00 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 71) in 473 ms on master.hdp (executor 1) (1/1)
20/01/22 19:29:00 INFO YarnScheduler: Removed TaskSet 4.0, whose tasks have all completed, from pool
20/01/22 19:29:00 INFO DAGScheduler: ResultStage 4 (runJob at PythonRDD.scala:455) finished in 0.476 s
20/01/22 19:29:00 INFO DAGScheduler: Job 2 finished: runJob at PythonRDD.scala:455, took 0.497775 s
20/01/22 19:29:00 INFO SparkContext: Starting job: runJob at PythonRDD.scala:455
20/01/22 19:29:00 INFO DAGScheduler: Got job 3 (runJob at PythonRDD.scala:455) with 1 output partitions
20/01/22 19:29:00 INFO DAGScheduler: Final stage: ResultStage 6 (runJob at PythonRDD.scala:455)
20/01/22 19:29:00 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 5)
20/01/22 19:29:00 INFO DAGScheduler: Missing parents: List()
20/01/22 19:29:00 INFO DAGScheduler: Submitting ResultStage 6 (PythonRDD[12] at RDD at PythonRDD.scala:48), which has no missing parents
20/01/22 19:29:00 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 8.1 KB, free 366.1 MB)
20/01/22 19:29:00 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 4.4 KB, free 366.1 MB)
20/01/22 19:29:00 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on 172.16.0.21:40801 (size: 4.4 KB, free: 366.3 MB)
20/01/22 19:29:00 INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:1006
20/01/22 19:29:00 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 6 (PythonRDD[12] at RDD at PythonRDD.scala:48) (first 15 tasks are for partitions Vector(1))
20/01/22 19:29:00 INFO YarnScheduler: Adding task set 6.0 with 1 tasks
20/01/22 19:29:00 INFO TaskSetManager: Starting task 0.0 in stage 6.0 (TID 72, master.hdp, executor 1, partition 1, PROCESS_LOCAL, 4632 bytes)
20/01/22 19:29:00 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on master.hdp:41184 (size: 4.4 KB, free: 366.3 MB)
20/01/22 19:29:00 INFO TaskSetManager: Finished task 0.0 in stage 6.0 (TID 72) in 123 ms on master.hdp (executor 1) (1/1)
20/01/22 19:29:00 INFO YarnScheduler: Removed TaskSet 6.0, whose tasks have all completed, from pool
20/01/22 19:29:00 INFO DAGScheduler: ResultStage 6 (runJob at PythonRDD.scala:455) finished in 0.125 s
20/01/22 19:29:00 INFO DAGScheduler: Job 3 finished: runJob at PythonRDD.scala:455, took 0.136936 s
-------------------------------------------
Time: 2020-01-22 19:29:00
-------------------------------------------

20/01/22 19:29:00 INFO JobScheduler: Finished job streaming job 1579701540000 ms.1 from job set of time 1579701540000 ms
20/01/22 19:29:00 INFO JobScheduler: Total delay: 0.811 s for time 1579701540000 ms (execution: 0.684 s)
20/01/22 19:29:00 INFO ReceivedBlockTracker: Deleting batches:
20/01/22 19:29:00 INFO InputInfoTracker: remove old batch metadata:
20/01/22 19:30:00 INFO JobScheduler: Added jobs for time 1579701600000 ms
20/01/22 19:30:00 INFO JobScheduler: Starting job streaming job 1579701600000 ms.0 from job set of time 1579701600000 ms
-------------------------------------------
Time: 2020-01-22 19:30:00
-------------------------------------------

20/01/22 19:30:00 INFO JobScheduler: Finished job streaming job 1579701600000 ms.0 from job set of time 1579701600000 ms
20/01/22 19:30:00 INFO JobScheduler: Starting job streaming job 1579701600000 ms.1 from job set of time 1579701600000 ms
20/01/22 19:30:00 INFO SparkContext: Starting job: runJob at PythonRDD.scala:455
20/01/22 19:30:00 INFO DAGScheduler: Registering RDD 16 (call at /usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py:2230)
20/01/22 19:30:00 INFO DAGScheduler: Got job 4 (runJob at PythonRDD.scala:455) with 1 output partitions
20/01/22 19:30:00 INFO DAGScheduler: Final stage: ResultStage 8 (runJob at PythonRDD.scala:455)
20/01/22 19:30:00 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 7)
20/01/22 19:30:00 INFO DAGScheduler: Missing parents: List()
20/01/22 19:30:00 INFO DAGScheduler: Submitting ResultStage 8 (PythonRDD[20] at RDD at PythonRDD.scala:48), which has no missing parents
20/01/22 19:30:00 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 8.1 KB, free 366.1 MB)
20/01/22 19:30:00 INFO MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 4.4 KB, free 366.1 MB)
20/01/22 19:30:00 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on 172.16.0.21:40801 (size: 4.4 KB, free: 366.2 MB)
20/01/22 19:30:00 INFO SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:1006
20/01/22 19:30:00 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 8 (PythonRDD[20] at RDD at PythonRDD.scala:48) (first 15 tasks are for partitions Vector(0))
20/01/22 19:30:00 INFO YarnScheduler: Adding task set 8.0 with 1 tasks
20/01/22 19:30:00 INFO TaskSetManager: Starting task 0.0 in stage 8.0 (TID 73, master.hdp, executor 1, partition 0, PROCESS_LOCAL, 4632 bytes)
20/01/22 19:30:00 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on master.hdp:41184 (size: 4.4 KB, free: 366.3 MB)
20/01/22 19:30:00 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 2 to 172.16.0.21:51120
20/01/22 19:30:00 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 2 is 83 bytes
20/01/22 19:30:00 INFO TaskSetManager: Finished task 0.0 in stage 8.0 (TID 73) in 120 ms on master.hdp (executor 1) (1/1)
20/01/22 19:30:00 INFO YarnScheduler: Removed TaskSet 8.0, whose tasks have all completed, from pool
20/01/22 19:30:00 INFO DAGScheduler: ResultStage 8 (runJob at PythonRDD.scala:455) finished in 0.121 s
20/01/22 19:30:00 INFO DAGScheduler: Job 4 finished: runJob at PythonRDD.scala:455, took 0.134627 s
20/01/22 19:30:00 INFO SparkContext: Starting job: runJob at PythonRDD.scala:455
20/01/22 19:30:00 INFO DAGScheduler: Got job 5 (runJob at PythonRDD.scala:455) with 1 output partitions
20/01/22 19:30:00 INFO DAGScheduler: Final stage: ResultStage 10 (runJob at PythonRDD.scala:455)
20/01/22 19:30:00 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 9)
20/01/22 19:30:00 INFO DAGScheduler: Missing parents: List()
20/01/22 19:30:00 INFO DAGScheduler: Submitting ResultStage 10 (PythonRDD[21] at RDD at PythonRDD.scala:48), which has no missing parents
20/01/22 19:30:00 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 8.1 KB, free 366.1 MB)
20/01/22 19:30:00 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 4.4 KB, free 366.1 MB)
20/01/22 19:30:00 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on 172.16.0.21:40801 (size: 4.4 KB, free: 366.2 MB)
20/01/22 19:30:00 INFO SparkContext: Created broadcast 6 from broadcast at DAGScheduler.scala:1006
20/01/22 19:30:00 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 10 (PythonRDD[21] at RDD at PythonRDD.scala:48) (first 15 tasks are for partitions Vector(1))
20/01/22 19:30:00 INFO YarnScheduler: Adding task set 10.0 with 1 tasks
20/01/22 19:30:00 INFO TaskSetManager: Starting task 0.0 in stage 10.0 (TID 74, master.hdp, executor 1, partition 1, PROCESS_LOCAL, 4632 bytes)
20/01/22 19:30:00 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on master.hdp:41184 (size: 4.4 KB, free: 366.3 MB)
20/01/22 19:30:00 INFO TaskSetManager: Finished task 0.0 in stage 10.0 (TID 74) in 132 ms on master.hdp (executor 1) (1/1)
20/01/22 19:30:00 INFO YarnScheduler: Removed TaskSet 10.0, whose tasks have all completed, from pool
20/01/22 19:30:00 INFO DAGScheduler: ResultStage 10 (runJob at PythonRDD.scala:455) finished in 0.133 s
20/01/22 19:30:00 INFO DAGScheduler: Job 5 finished: runJob at PythonRDD.scala:455, took 0.143611 s

Answers [ 2 ]

1 vote
/ February 29, 2020

Provide key.deserializer and value.deserializer in kafkaParams and use createDirectStream:

# utf8_decoder is provided by pyspark.streaming.kafka
from pyspark.streaming.kafka import KafkaUtils, utf8_decoder

# Note: the Kafka config key is "auto.offset.reset" (dots, not underscores)
kafkaParams = {
    "metadata.broker.list": config['kstream']['broker'],
    "auto.offset.reset": "earliest",
    "auto.create.topics.enable": "true",
    "key.deserializer": "org.springframework.kafka.support.serializer.JsonDeserializer",
    "value.deserializer": "org.springframework.kafka.support.serializer.JsonDeserializer",
}
kvs = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams,
                                    fromOffsets=None,
                                    messageHandler=None,
                                    keyDecoder=utf8_decoder,
                                    valueDecoder=utf8_decoder)
0 votes
/ January 22, 2020

It looks like you are not using a logger, but are instead print()-ing to standard output. To get messages into your logs, you need to configure a logger. For example, the following obtains a logger from the SparkContext:

log4jLogger = sc._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)
LOGGER.info("Example info")
LOGGER.error("Example error")
