Искровой поток не может использовать искра SQL - PullRequest
0 голосов
/ 20 декабря 2018

У меня возникла проблема во время потоковой передачи искры.Я получаю пустые записи после того, как они передаются в потоковом режиме и передаются методу parse.

Мой код:

import spark.implicits._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Encoders
import org.apache.spark.streaming._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import spark.implicits._
import org.apache.spark.sql.types.{StructType, StructField, StringType, 
IntegerType}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import spark.implicits._
import org.apache.spark.sql.types.{StructType, StructField, StringType, 
IntegerType}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import java.util.regex.Pattern
import java.util.regex.Matcher
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql._

val conf = new SparkConf().setAppName("streamHive").setMaster("local[*]").set("spark.driver.allowMultipleContexts", "true")

val ssc = new StreamingContext(conf, Seconds(5))    

val sc=ssc.sparkContext

val lines = ssc.textFileStream("file:///home/sadr/testHive")

case class Prices(name: String, age: String,sex: String, location: String)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

def parse (rdd : org.apache.spark.rdd.RDD[String] ) = 
{
var l = rdd.map(_.split(","))
val prices = l.map(p => Prices(p(0),p(1),p(2),p(3)))
val pricesDf = sqlContext.createDataFrame(prices)
pricesDf.registerTempTable("prices")
pricesDf.show()
var x = sqlContext.sql("select count(*) from prices")
x.show()}
lines.foreachRDD { rdd => parse(rdd)} 
lines.print()
ssc.start()

Мой входной файл:

cat test1.csv

Riaz,32,M,uk
tony,23,M,india
manu,33,M,china
imart,34,F,AUS

Iполучаю этот вывод:

lines.foreachRDD { rdd => parse(rdd)}

lines.print()

ssc.start()

scala> +----+---+---+--------+
|name|age|sex|location|
+----+---+---+--------+
+----+---+---+--------+

Я использую Spark версии 2.3 .... Я ПОЛУЧАЮ СЛЕДУЮЩУЮ ОШИБКУ ПОСЛЕ ДОБАВЛЕНИЯ X.SHOW ()

I AM GETTING FOLLOWING ERROR AFTER ADDING X.SHOW()

1 Ответ

0 голосов
/ 20 декабря 2018

Не уверен, что вы действительно можете читать потоки.

textFileStream читает только новые файлы, добавленные в каталог после запуска программы, а не существующие.Был ли файл уже там?Если да, удалите его из каталога, запустите программу и снова скопируйте файл?

...