Как передать потоковые данные из искры в Mongodb? - PullRequest
0 голосов
/ 04 июня 2018

Я использую pyspark для чтения потоковых данных из Kafka, а затем хочу передать эти данные в mongodb.Я включил все необходимые пакеты, но выдает ошибку, что
UnsupportedOperationException: источник данных com.mongodb.spark.sql.DefaultSource не поддерживает потоковую запись

Следующие ссылкине относятся к моему вопросу

Запись в mongoDB из Spark

Spark в MongoDB через Mesos

Вот полныйтрассировка стека ошибок

Трассировка (последний последний вызов): файл "/home/b3ds/kafka-spark.py", строка 85, в .option ("com.mongodb.spark.sql.DefaultSource "," mongodb: // localhost: 27017 / twitter.test ") \ File" /home/b3ds/hdp/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py ", строка 827, встартовый файл "/home/b3ds/hdp/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", строка 1133, в вызов файл "/ home / b3ds/hdp/spark/python/lib/pyspark.zip/pyspark/sql/utils.py ", строка 63, в файле deco" /home/b3ds/hdp/spark/python/lib/py4j-0.10.4-src.zip / py4j / protocol.py ", строка 319, в get_return_value py4j.protocol.Py4JJavaError: Произошла ошибка при вызове o122.start.: java.lang.UnsupportedOperationException: источник данных com.mongodb.spark.sql.DefaultSource не поддерживает потоковую запись в org.apache.spark.sql.execution.datasources.DataSource.createSink (DataSource.scala: 287) в org.apache.spark.sql..invoke (DelegatingMethodAccessorImpl.java:43) в java.lang.reflect.Method.invoke (Method.java:498) в py4j.reflection.MethodInvoker.invoke (MethodInvoker.java:244) в py4j.reflection.inkeEReflectionEngine.java:357) в py4j.Gateway.invoke (Gateway.java:280) в py4j.commands.AbstractCommand.invokeMethod (AbstractCommand.java:132) в py4j.commands.CallCommand.execute (CallCommand.java:79) вpy4j.GatewayConnection.run (GatewayConnection.java:214) в java.lang.Thread.run (Thread.java:748)

Вот мой код pyspark

from __future__ import print_function
import sys
from pyspark.sql.functions import udf
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.types import StructType
from pyspark.sql.types import *
import json
from pyspark.sql.functions import struct
from pyspark.sql.functions import *
import datetime

json_schema = StructType([
  StructField("twitterid", StringType(), True),
  StructField("created_at", StringType(), True),
  StructField("tweet", StringType(), True),
  StructField("screen_name", StringType(), True)
])

def parse_json(df):
    twitterid   = json.loads(df[0])['id']
    created_at  = json.loads(df[0])['created_at']
    tweet       = json.loads(df[0])['text']
    tweet       = json.loads(df[0])['text']
    screen_name = json.loads(df[0])['user']['screen_name']
    return [twitterid, created_at, tweet, screen_name]

def convert_twitter_date(timestamp_str):
    output_ts = datetime.datetime.strptime(timestamp_str.replace('+0000 ',''), '%a %b %d %H:%M:%S %Y')
    return output_ts

if __name__ == "__main__":

        spark = SparkSession\
                        .builder\
                        .appName("StructuredNetworkWordCount")\
                        .config("spark.mongodb.input.uri","mongodb://192.168.1.16:27017/twitter.test")\
                        .config("spark.mongodb.output.uri","mongodb://192.168.1.16:27017/twitter.test")\
                        .getOrCreate()
        events = spark\
                        .readStream\
                        .format("kafka")\
                        .option("kafka.bootstrap.servers", "localhost:9092")\
                        .option("subscribe", "twitter")\
                        .load()
        events = events.selectExpr("CAST(value as String)")

        udf_parse_json = udf(parse_json , json_schema)
        udf_convert_twitter_date = udf(convert_twitter_date, TimestampType())
        jsonoutput = events.withColumn("parsed_field", udf_parse_json(struct([events[x] for x in events.columns]))) \
                                        .where(col("parsed_field").isNotNull()) \
                                        .withColumn("created_at", col("parsed_field.created_at")) \
                                        .withColumn("screen_name", col("parsed_field.screen_name")) \
                                        .withColumn("tweet", col("parsed_field.tweet")) \
                                        .withColumn("created_at_ts", udf_convert_twitter_date(col("parsed_field.created_at")))

        windowedCounts = jsonoutput.groupBy(window(jsonoutput.created_at_ts, "1 minutes", "15 seconds"),jsonoutput.screen_name)$

        mongooutput = jsonoutput \
                        .writeStream \
                        .format("com.mongodb.spark.sql.DefaultSource")\
                        .option("com.mongodb.spark.sql.DefaultSource","mongodb://localhost:27017/twitter.test")\
                        .start()
        mongooutput.awaitTermination()

Я видел документацию mongodb, в которой говорится, что он поддерживает спарк к сливу монго

https://docs.mongodb.com/spark-connector/master/scala/streaming/

1 Ответ

0 голосов
/ 04 июня 2018

Я видел документацию mongodb, в которой говорится, что он поддерживает приемник от искры к монго

Что касается документации, так это то, что вы можете использовать стандартный RDD API для записи каждого RDD с использованием устаревшей потоковой передачи.(DStream) API.

Он не предполагает, что MongoDB поддерживает структурированную потоковую передачу, и не поддерживает.Поскольку вы используете PySpark, где forEach writer недоступен, вам придется подождать, пока (если вообще когда-либо) пакет MongoDB не будет обновлен для поддержки потоковых операций.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...