Возникло исключение: pyspark.sql.utils.AnalysisException 'Запросы с потоковыми источниками должны выполняться с writeStream.start () ;; \ nkafka' - PullRequest
0 голосов
/ 01 февраля 2019

в коде, если не df.head (1) .isEmpty: я получил исключение,

Exception has occurred: pyspark.sql.utils.AnalysisException 'Queries with streaming sources must be executed with writeStream.start();;\nkafka'

Я не знаю, как использовать при потоковой передаче данных.когда я использую jupyter, чтобы выполнить каждую строку, код в порядке, и я могу получить свой результат.но использовать .py это не хорошо.

моя цель заключается в следующем: я хочу использовать потоковую передачу, чтобы получать данные от kafka каждую секунду, затем я преобразую каждую партию данных обработки паром (одна партия означает данные за одну секунду, которую я получаю) в pandas dataframe, а затем я используюФункция панды, чтобы сделать что-то с данными, наконец, я отправляю результат в другую тему Кафки.

Пожалуйста, помогите мне и простите мой пул на английском, Большое спасибо.

sc = SparkContext("local[2]", "OdometryConsumer")
spark = SparkSession(sparkContext=sc) \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")


df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "data") \
  .load()
ds = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
print(type(ds))

if not df.head(1).isEmpty:
  alertQuery = ds \
          .writeStream \
          .queryName("qalerts")\
          .format("memory")\
          .start()

  alerts = spark.sql("select * from qalerts")
  pdAlerts = alerts.toPandas()
  a = pdAlerts['value'].tolist()

  d = []
  for i in a:
      x = json.loads(i)
      d.append(x)

  df = pd.DataFrame(d)
  print(df)
  ds = df['jobID'].unique().tolist()


  dics = {}
  for source in ds:
      ids = df.loc[df['jobID'] == source, 'id'].tolist()
      dics[source]=ids

  print(dics)  
query = ds \
  .writeStream \
  .queryName("tableName") \
  .format("console") \
  .start()

query.awaitTermination()

1 Ответ

0 голосов
/ 02 февраля 2019

Удалите if not df.head(1).isEmpty: и все будет в порядке.

Причина исключения проста, т. Е. Потоковый запрос - это структурированный запрос, который никогда не заканчивается и выполняется постоянно.Просто невозможно посмотреть на один элемент, так как нет «единого элемента», но (возможно) тысяч элементов, и было бы трудно сказать, когда именно вы хотите заглянуть под обложки и увидеть толькоодин элемент.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...