Невозможно просмотреть потребительский вывод kafka при выполнении в ECLIPSE: PySpark - PullRequest
0 голосов
/ 07 октября 2018

Я установил kafka и zookeeper в системе Windows.я запустил серверы kafka и zookeeper, создал тему "javainuse-topic", запустил производителя и потребителя с помощью следующих команд

. \ bin \ windows \ zookeeper-server-start.bat. \ config \zookeeper.properties

. \ bin \ windows \ kafka-server-start.bat. \ config \ server.properties

. \ bin \ windows \ kafka-themes.bat --create --zookeeper localhost: 2181 --replication-factor 1 --partitions 1 --topic javainuse-topic

. \ bin \ windows \ kafka-console -roduction.bat --broker-list localhost: 9092 -topic javainuse-topic

. \ bin \ windows \ kafka-console-consumer.bat --bootstrap-server localhost: 9092 --topic javainuse-topic --from-begin

Я могу успешно передавать данные от производителя к потребителю.Итак, я написал ниже код в Eclipse и попытался выполнить его в локальном.но я не могу просмотреть данные о потребителях в моей консоли Eclipse.

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'

import sys
import time
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


n_secs = 1
topic = "javainuse-topic"

conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[*]")
sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, n_secs)

kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {
                        'bootstrap.servers':'localhost:9092', 
                        'group.id':'javainuse-topic', 
                        'fetch.message.max.bytes':'15728640',
                        'auto.offset.reset':'largest'})
                        # Group ID is completely arbitrary

lines = kafkaStream.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
counts.pprint()

ssc.start()
time.sleep(6) # Run stream for 10 minutes just in case no detection of producer
# ssc.awaitTermination()
ssc.stop(stopSparkContext=True,stopGraceFully=True)

1 Ответ

0 голосов
/ 07 октября 2018

Вы можете попробовать еще раз, но на этот раз установите auto.offset.reset в 'earliest' (или 'smallest', если вы используете старого потребителя).

kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {
                        'bootstrap.servers':'localhost:9092', 
                        'group.id':'javainuse-topic', 
                        'fetch.message.max.bytes':'15728640',
                        'auto.offset.reset':'earliest'})
                        # Group ID is completely arbitrary
...