Потеря данных при записи Spark Streaming в Кассандре - PullRequest
0 голосов
/ 07 ноября 2019

Я записываю данные на кассандру и в текстовый файл. Через несколько минут я прекращаю процесс. Затем, например, у меня есть 82035 строк в Кассандре, и у меня есть 96749 строк в файле.

Я нашел, по-видимому, действительные данные в текстовом файле, которого нет в базе данных. Например:

promerar|082220.80|4158.5985417|00506.7613786
MOLIIUO|082220|4107.4749|00444.2117
josehrin|082220|4159.1124|00455.1298

Это код:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--conf spark.ui.port=4040 --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0,com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3 pyspark-shell'
import time
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row

conf = SparkConf() \
    .setAppName("Streaming test") \
    .setMaster("local[2]") \
    .set("spark.cassandra.connection.host", "127.0.0.1")
sc = SparkContext(conf=conf) 
sqlContext=SQLContext(sc)
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
count =0

def saveToCassandra(rows):
    if not rows.isEmpty():
        sqlContext.createDataFrame(rows).write\
        .format("org.apache.spark.sql.cassandra")\
        .mode('append')\
        .options(table="puntos", keyspace="test_rtk")\
        .save()


def saveCoord(rdd):
    rdd.foreach(lambda x: open("/tmp/p1", "a").write(x[0]+"|"+x[2]+"|"+x[3]+"|"+x[5]+"\n"))


ssc = StreamingContext(sc, 1)
kvs = KafkaUtils.createStream(ssc, "127.0.0.1:2181", "spark-streaming-consumer", {'test': 2})
data = kvs.map(lambda x: x[1].split(","))
rows= data.map(lambda x:Row(long=x[5],lat=x[3],date=time.strftime("%Y-%m-%d"),time=x[2],user=x[0]))
rows.foreachRDD(saveToCassandra)
data.foreachRDD(saveCoord)

ssc.start()

Это создание таблицы в Кассандре:

CREATE KEYSPACE test_rtk WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};

CREATE TABLE test_rtk.puntos(
 long text,
 lat text,
 time text,
 date text,
 user TEXT,
PRIMARY KEY (time,long,lat)
);

Можете ли вы помочь моей?

1 Ответ

0 голосов
/ 07 ноября 2019

Проверьте здесь, чтобы найти лучшие решения

Руководство по программированию Spark Streaming

...