pyspark. sql .utils.AnalysisException: не удалось найти источник данных: kafka - PullRequest
0 голосов
/ 14 февраля 2020

Я пытаюсь прочитать поток из Кафки, используя pyspark. Я использую spark версии 3.0.0-preview2 и spark-streaming-kafka-0-10_2.12 Перед этим я просто stat zookeeper, kafka и создаю новую топи c :

/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties 
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
/usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic data_wm

Это мой код:

import pandas as pd
import os
import findspark
findspark.init("/usr/local/spark")
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TestApp").getOrCreate()
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "data_wm") \
  .load() 
value = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") 

Вот как я запускаю свой скрипт:

sudo --preserve-env = pyspark / usr / local / spark / bin / pyspark --packages org. apache .spark: spark-streaming-kafka-0-10_2.12: 3.0.0-preview

Как результат для этой команды У меня есть это:

: resolving dependencies :: org.apache.spark#spark-submit-parent-0d7b2a8d-a860-4766-a4c7-141a902d8365;1.0
        confs: [default]
        found org.apache.spark#spark-streaming-kafka-0-10_2.12;3.0.0-preview in central
        found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0-preview in central
        found org.apache.kafka#kafka-clients;2.3.1 in central
        found com.github.luben#zstd-jni;1.4.3-1 in central
        found org.lz4#lz4-java;1.6.0 in central
        found org.xerial.snappy#snappy-java;1.1.7.3 in central
        found org.slf4j#slf4j-api;1.7.16 in central
        found org.spark-project.spark#unused;1.0.0 in central :: resolution report :: resolve 380ms :: artifacts dl 7ms
        :: modules in use:
        com.github.luben#zstd-jni;1.4.3-1 from central in [default]
        org.apache.kafka#kafka-clients;2.3.1 from central in [default]
        org.apache.spark#spark-streaming-kafka-0-10_2.12;3.0.0-preview from central in [default]
        org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0-preview from central in [default]
        org.lz4#lz4-java;1.6.0 from central in [default]
        org.slf4j#slf4j-api;1.7.16 from central in [default]
        org.spark-project.spark#unused;1.0.0 from central in [default]
        org.xerial.snappy#snappy-java;1.1.7.3 from central in [default]

Но у меня всегда есть эта ошибка:

d> f = spark \ ... .readStream \ ... .format ("kafka") \. ..

.option ("kafka. bootstrap .servers", "localhost: 9092") \ ...
.option ("subscribe", "data_wm") \. .. .load () Traceback (последний вызов был последним): Файл "", строка 5, в файле "/usr/local/spark/python/pyspark/sql/streaming.py", строка 406, при загрузке возвращает self. _df (self._jreader.load ()) Файл "/usr/local/spark/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", строка 1286, в вызов Файл " /usr/local/spark/python/pyspark/sql/utils.py ", строка 102, в формате deco повысить конвертированный pyspark. sql .utils.AnalysisException: не удалось найти источник данных: kafka. Пожалуйста, разверните приложение в соответствии с разделом развертывания «Структурированная потоковая передача + Руководство по интеграции Kafka ».;

Я не знаю причину этой ошибки, пожалуйста, помогите

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...