Pyspark: spark-submit не работает как CLI - PullRequest
0 голосов
/ 22 мая 2018

У меня есть pyspark для загрузки данных из файла TSV и сохранения его в виде файла паркета, а также сохранения в виде постоянной таблицы SQL.

Когда я запускаю его построчно через CLI pyspark, он работает точнокак и ожидалось.Когда я запускаю его как приложение с использованием spark-submit, оно запускается без каких-либо ошибок, но я получаю странные результаты: 1. данные перезаписываются, а не добавляются.2. Когда я запускаю SQL-запросы к нему, я не получаю никаких данных, даже если размер файлов паркетных файлов составляет несколько гигабайт (что я и ожидаю).Есть предложения?

Код:

from pyspark import SparkContext, SparkConf
from pyspark.sql.types import *
from pyspark.sql.functions import *

csv_file = '/srv/spark/data/input/ipfixminute2018-03-28.tsv'
parquet_dir = '/srv/spark/data/parquet/ipfixminute'

sc = SparkContext(appName='import-ipfixminute')
spark = SQLContext(sc)

fields = [StructField('time_stamp', TimestampType(), True),
                StructField('subscriberId', StringType(), True),
                StructField('sourceIPv4Address', StringType(), True),
                StructField('destinationIPv4Address', StringType(), True),
                StructField('service',StringType(), True),
                StructField('baseService',StringType(), True),
                StructField('serverHostname', StringType(), True),
                StructField('rat', StringType(), True),
                StructField('userAgent', StringType(), True),
                StructField('accessPoint', StringType(), True),
                StructField('station', StringType(), True),
                StructField('device', StringType(), True),
                StructField('contentCategories', StringType(), True),
                StructField('incomingOctets', LongType(), True),
                StructField('outgoingOctets', LongType(), True),
                StructField('incomingShapingDrops', IntegerType(), True),
                StructField('outgoingShapingDrops', IntegerType(), True),
                StructField('qoeIncomingInternal', DoubleType(), True),
                StructField('qoeIncomingExternal', DoubleType(), True),
                StructField('qoeOutgoingInternal', DoubleType(), True),
                StructField('qoeOutgoingExternal', DoubleType(), True),
                StructField('incomingShapingLatency', DoubleType(), True),
                StructField('outgoingShapingLatency', DoubleType(), True),
                StructField('internalRtt', DoubleType(), True),
                StructField('externalRtt', DoubleType(), True),
                StructField('HttpUrl',StringType(), True)]

schema = StructType(fields)
df = spark.read.load(csv_file, format='csv',sep='\t',header=True,schema=schema,timestampFormat='yyyy-MM-dd HH:mm:ss')
df = df.drop('all')
df = df.withColumn('date',to_date('time_stamp'))
df.write.saveAsTable('test2',mode='append',partitionBy='date',path=parquet_dir)

1 Ответ

0 голосов
/ 30 мая 2018

Как @ user8371915 предположил, что это похоже на это:

Spark может получить доступ к таблице Hive из pyspark, но не из spark-submit

Мне нужно было заменить

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

с

from pyspark.sql import HiveContext

sqlContext = HiveContext(sc)

Это решило эту проблему.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...