pyspark потоковое DStreams в тему кафки - PullRequest
0 голосов
/ 08 июня 2018

Как это просто, возможно ли Stream Dtream в тему Kafka?

У меня есть потоковое задание Spark, которое выполняет всю обработку данных, теперь я хочу перенести данные в тему Kafka.Возможно ли это сделать в pyspark?

Ответы [ 2 ]

0 голосов
/ 18 января 2019

Если ваше сообщение в формате AVRO, мы можем serazlie сообщений и писать в kafka напрямую.

from pyspark import SparkConf, SparkContext
from kafka import KafkaProducer
from kafka.errors import KafkaError
from pyspark.sql import SQLContext, SparkSession

    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils
    import json
    from kafka import SimpleProducer, KafkaClient
    from kafka import KafkaProducer
    from pyspark.streaming.kafka import KafkaUtils, OffsetRange, TopicAndPartition
    import avro.schema
    from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
    from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
    import pandas as pd


    ssc = StreamingContext(sc, 2)
    ssc = StreamingContext(sc, 2)
    topic = "test"
    brokers = "localhost:9092"
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    kvs.foreachRDD(handler)
    def handler(message):
        records = message.collect()
        for record in records:
             <Data processing whatever you want and creating the var_val_value,var_val_key pair >


               var_kafka_parms_tgt = {'bootstrap.servers': var_bootstrap_servr,'schema.registry.url': var_schema_url} 
               avroProducer = AvroProducer(var_kafka_parms_tgt,default_key_schema=key_schema, default_value_schema=value_schema)
               avroProducer.produce(topic=var_topic_tgt_name, value=var_val_value, key=var_val_key)
               avroProducer.flush()
0 голосов
/ 11 июня 2018

лучше преобразовать в json перед записью в kafka, в противном случае укажите столбцы ключа и значения, которые записываются в kafka.

    query = jdf.selectExpr("to_json(struct(*)) AS value")\
  .writeStream\
  .format("kafka")\
  .option("zookeeper.connect", "localhost:2181")\
  .option("kafka.bootstrap.servers", "localhost:9092")\
  .option("topic", "test-spark")\
  .option("checkpointLocation", "/root/")\
  .outputMode("append")\
  .start()
...