Pyspark: Dstream для выбора столбцов - PullRequest
0 голосов
/ 11 июня 2018

У меня есть Dstream от kafka, я хочу выбрать из него столбцы.Ниже приведен код, который я реализую, но он дает мне ошибки.

import os
import findspark
findspark.init('/home/gsingh/anaconda2/lib/python2.7/site-packages/pyspark/')
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from kafka import KafkaProducer
from pyspark.sql.types import Row
from pyspark.sql import SQLContext
from pyspark import sql
import json
import requests


def result(y):
    return y


def handler(rdd):
    records = rdd.collect()

    for record in records:
        producer.send('opportunityProcessed', bytes(record))
        producer.flush()


if __name__ == '__main__':

    findspark.add_packages("org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.0")
    producer = KafkaProducer(bootstrap_servers='localhost:9092')

    # os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.0 pyspark-shell'
    sc = SparkContext(appName="PythonSparkStreamingKafka")
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc, 1)
    sqlContext = sql.SQLContext(sc)

    print('ssc =================== {} {}')
    kafkaStream = KafkaUtils.createStream(ssc, 'localhost:2181', "spark-streaming-opportunity", {'testing': 1})
    print('contexts =================== {} {}')

    lines = kafkaStream.map(lambda x: x[1])

    lines.foreachRDD(handler)

    ssc.start()
    ssc.awaitTermination()

Прямо сейчас он читает из kafka, а затем пишет в другую тему.Моя цель - выбрать столбцы из DStream и затем выполнить некоторые манипуляции с ним.Итак, моя проблема в том, как выбрать столбцы из DStream?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...