Здесь есть несколько проблем.
Как говорит исключение, orignalData
- это потоковый запрос ( потоковый набор данных ), и единственный способ выполнить его - использовать * 1006. *. Это одна проблема.
Вы сделали writeStream.start()
, но с другим запросом finalData
, который не потоковый, а пакетный. Это еще одна проблема.
В таких «обогащенных» случаях, как ваш, вы можете использовать потоковое соединение (оператор Dataset.join
) или один из DataStreamWriter.foreach и DataStreamWriter.foreachBatch . Я думаю, что DataStreamWriter.foreachBatch
будет более эффективным.
public DataStreamWriter foreachBatch (VoidFunction2function)
(специфично для Java) Задает выходные данные потокового запроса, которые будут обработаны с использованием предоставленной функции. Это поддерживается только в режимах микропакета (то есть, когда триггер не является непрерывным). В каждой микропартии предоставленная функция будет вызываться в каждой микропартии с (i) выходными строками в виде набора данных и (ii) идентификатором партии. BatchId может использоваться для дедупликации и транзакционной записи выходных данных (то есть предоставленного набора данных) во внешние системы. Выходной набор данных гарантированно совпадает для одного и того же batchId (при условии, что все операции являются детерминированными в запросе).
Мало того, что вы получите все данные потокового микропакета за один раз (первый входной аргумент типа Dataset<T>
), но также и способ отправки другого задания Spark (по исполнителям) на основе данных.
Псевдокод может выглядеть следующим образом (я использую Scala какМне больше нравится язык):
val dsWriter = originalData.foreachBatch { case (data, batchId) =>
// make sure the data is small enough to collect on the driver
// Otherwise expect OOME
// It'd also be nice to have a Java bean to convert the rows to proper types and names
val localData = data.collect
// Please note that localData is no longer Spark's Dataset
// It's a local Java collection
// Use Java Collection API to work with the localData
// e.g. using Scala
// You're mapping over localData (for a single micro-batch)
// And creating finalData
// I'm using the same names as your code to be as close to your initial idea as possible
val finalData = localData.map { row =>
// row is the old row from your original code
// do something with it
// e.g. using Java
String sourceKey=row.get(4).toString();
...
}
// Time to save the data processed to ES
// finalData is a local Java/Scala collection not Spark's DataFrame!
// Let's convert it to a DataFrame (and leverage the Spark distributed platform)
// Note that I'm almost using your code, but it's a batch query not a streaming one
// We're inside foreachBatch
finalData
.toDF // Convert a local collection to a Spark DataFrame
.write // this creates a batch query
.format("org.elasticsearch.spark.sql")
.option("es.mapping.id", "id")
.option("es.write.operation", "upsert")
.option("checkpointLocation","/tmp/checkpoint/sg_event")
.save("spark_index/doc") // save (not start) as it's a batch query inside a streaming query
}
dsWriter
- это DataStreamWriter
, и теперь вы можете запустить его для запуска потокового запроса.