Как загрузить данные Kafka topi c в Spark Dstream в Python - PullRequest
0 голосов
/ 06 августа 2020

Я использую Spark 3.0.0 с Python. У меня есть test_topic в Kafka, который создается из csv.

Приведенный ниже код потребляет из этого topi c в Spark, но я где-то читал, что он должен быть в DStream, прежде чем я смогу сделайте на нем какой-либо ML.

import json
from json import loads
from kafka import KafkaConsumer
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "test")
ssc = StreamingContext(sc, 1)

consumer = KafkaConsumer('test_topic',
                    bootstrap_servers =['localhost:9092'],
                    api_version=(0, 10))

Потребитель возвращает <kafka.consumer.group.KafkaConsumer at 0x13bf55b0>

Как мне отредактировать приведенный выше код, чтобы получить DStream?

Я новичок Пожалуйста, укажите на любые допущенные глупые ошибки.

РЕДАКТИРОВАТЬ: Ниже мой код производителя:

import json
import csv
from json import dumps
from kafka import KafkaProducer
from time import sleep

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
value_serializer=lambda x:dumps(x)

with open('test_data.csv') as file:
reader = csv.DictReader(file, delimiter=';')
for row in reader:
    producer.send('test_topic', json.dumps(row).encode('utf=8'))
    sleep(2)
    print ('Message sent ', row)

Ответы [ 2 ]

1 голос
/ 06 августа 2020

Давно я не делал немного Spark, но позвольте мне помочь вам!

Во-первых, поскольку вы используете Spark 3.0.0, вы можете использовать Spark Structured Streaming, API будет намного проще в использовании, поскольку он основан на фреймах данных. Как вы можете видеть здесь, в ссылке на документы , есть руководство по интеграции для kafka с PySpark в режиме структурированной потоковой передачи.

Это было бы так же просто, как этот запрос:

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "test_topic") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

Затем вы можете поиграть с этим фреймом данных, используя конвейеры ML, чтобы применить некоторые методы и модели ML, которые вам нужны. Как вы можете видеть в этой записной книжке DataBricks , у них есть несколько примеров структурированной потоковой передачи с помощью машинного обучения. Это написано в формате Scala, но это будет хорошим источником вдохновения. Вы можете объединить его с ML PySpark docs , чтобы перевести его в Python

РЕДАКТИРОВАТЬ: Фактические ШАГИ, которым нужно следовать, чтобы заставить его работать между PySpark и Kafka

1 - Настройка Kafka

Итак, сначала я настраиваю свою локальную Kafka:

wget https://archive.apache.org/dist/kafka/0.10.2.2/kafka_2.12-0.10.2.2.tgz
tar -xzf kafka_2.11-0.10.2.0.tgz

Я открываю 4 оболочки, чтобы запустить zookeeper / server / create_topi c / write_topi c скрипты:

  • Zookeeper
cd kafka_2.11-0.10.2.0
bin/zookeeper-server-start.sh config/zookeeper.properties
  • Сервер
cd kafka_2.11-0.10.2.0
bin/kafka-server-start.sh config/server.properties
  • Создать топи c и проверить создание
cd kafka_2.11-0.10.2.0
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --list --zookeeper localhost:2181
  • Тестовое сообщение в топах c (напишите их интерактивно в оболочке для целей тестирования):
cd kafka_2.11-0.10.2.0
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

2 - Настройка PySpark

Получение дополнительных jar-файлов

Теперь, когда мы настроили нашу Kafka, мы настроим наш PySpark с указанием c загрузок jar-файлов:

  • spark -streaming-kafka-0-10-assembly_2.12-3.0.0.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10-assembly_2.12/3.0.0/spark-streaming-kafka-0-10-assembly_2.12-3.0.0.jar
  • spark- sql -kafka-0-10_2.12-3.0.0 .jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.0.0/spark-sql-kafka-0-10_2.12-3.0.0.jar
  • co mmons-pool2-2.8.0.jar
wget https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.8.0/commons-pool2-2.8.0.jar
  • kafka-clients-0.10.2.2.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/0.10.2.2/kafka-clients-0.10.2.2.jar

Запустите команду оболочки PySpark

Не забудьте указать путь к папке для каждого jar-файла, если вы не находитесь в папке jars при выполнении команды pyspark.

PYSPARK_PYTHON=python3 $SPARK_HOME/bin/pyspark --jars spark-sql-kafka-0-10_2.12-3.0.0.jar,spark-streaming-kafka-0-10-assembly_2.12-3.0.0.jar,kafka-clients-0.10.2.2.jar,commons-pool2-2.8.0.jar

3 - Запустите код PySpark

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "test") \
  .load()

query = df \
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .start()

Ура

0 голосов
/ 06 августа 2020

Вам необходимо использовать метод KafkaUtils createDirectStream.

Вот пример кода из официальной документации Spark :

from pyspark.streaming.kafka import KafkaUtils
 directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
...