Как мне написать в Кафку, используя pyspark? - PullRequest
0 голосов
/ 07 мая 2018

Я пытаюсь написать Кафке, используя PySpark.
Я застрял на нулевой стадии:

[Stage 0:>                                                          (0 + 8) / 9]

Тогда я получаю сообщение об ошибке:

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

Код:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages
 org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 pyspark-shell'

from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import *

def main():
    spark = SparkSession.builder.master("local").appName("Spark CSV Reader")
     .getOrCreate();

    dirpath =  os.path.abspath(sys.argv[1])
    os.chdir(dirpath)

    mySchema = StructType([
     StructField("id", IntegerType()),StructField("name", StringType()),\
     StructField("year", IntegerType()),StructField("rating", DoubleType()),\
     StructField("duration", IntegerType())   ])
    streamingDataFrame = spark.readStream.schema(mySchema)
     .csv('file://' + dirpath + "/" )

    streamingDataFrame.selectExpr("CAST(id AS STRING) AS key",
     "to_json(struct(*)) AS value").\
      writeStream.format("kafka").option("topic", "topicName")\
      .option("kafka.bootstrap.servers", "localhost:9092")\
      .option("checkpointLocation", "./chkpt").start()

Я использую HDP 2.6.

1 Ответ

0 голосов
/ 09 мая 2018

Как я уже упоминал в комментариях, Spark работает на нескольких машинах, и очень маловероятно, что все эти машины будут брокерами Kafka.

Использовать внешние адреса для кластера Kafka

.option("kafka.bootstrap.servers", "<kafka-broker-1>:9092,<kafka-broker-2>:9092")\  
...