Как обогатить данные потокового запроса и записать результат в Elasticsearch? - PullRequest
1 голос
/ 10 октября 2019

Для данного набора данных (originalData) мне необходимо отобразить значения, а затем подготовить новый набор данных, объединяющий результаты поиска изasticsearch.

Dataset<Row> orignalData = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers","test")
  .option("subscribe", "test")
  .option("startingOffsets", "latest")
  .load();

Dataset<Row> esData = JavaEsSparkSQL
  .esDF(spark.sqlContext(), "spark_correlation/doc");

esData.createOrReplaceTempView("es_correlation");
List<SGEvent> listSGEvent = new ArrayList<>();

originalData.foreach((ForeachFunction<Row>) row -> {
 SGEvent event = new SGEvent();
 String sourceKey=row.get(4).toString();
 String searchQuery = "select id from es_correlation where es_correlation.key='"+sourceKey+"'";
 Dataset<Row> result = spark.sqlContext().sql(searchQuery);
 String id = null;
 if (result != null) {
    result.show();
    id = result.first().toString();
  }
 event.setId(id);
 event.setKey(sourceKey);
 listSGEvent.add(event)
}
Encoder<SGEvent> eventEncoderSG = Encoders.bean(SGEvent.class);
Dataset<Row> finalData = spark.createDataset(listSGEvent, eventEncoderSG).toDF();

finalData
  .writeStream()
  .outputMode(OutputMode.Append())
  .format("org.elasticsearch.spark.sql")
  .option("es.mapping.id", "id")
  .option("es.write.operation", "upsert")
  .option("checkpointLocation","/tmp/checkpoint/sg_event")
  .start("spark_index/doc").awaitTermination();

Spark создает следующее исключение:

org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:389)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:38)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
  at scala.collection.immutable.List.foreach(List.scala:392)

Допустим ли мой подход к объединению значения эластичного поиска с набором данных? Есть ли другое лучшее решение для этого?

Ответы [ 2 ]

1 голос
/ 14 октября 2019

Здесь есть несколько проблем.

Как говорит исключение, orignalData - это потоковый запрос ( потоковый набор данных ), и единственный способ выполнить его - использовать * 1006. *. Это одна проблема.

Вы сделали writeStream.start(), но с другим запросом finalData, который не потоковый, а пакетный. Это еще одна проблема.

В таких «обогащенных» случаях, как ваш, вы можете использовать потоковое соединение (оператор Dataset.join) или один из DataStreamWriter.foreach и DataStreamWriter.foreachBatch . Я думаю, что DataStreamWriter.foreachBatch будет более эффективным.

public DataStreamWriter foreachBatch (VoidFunction2function)

(специфично для Java) Задает выходные данные потокового запроса, которые будут обработаны с использованием предоставленной функции. Это поддерживается только в режимах микропакета (то есть, когда триггер не является непрерывным). В каждой микропартии предоставленная функция будет вызываться в каждой микропартии с (i) выходными строками в виде набора данных и (ii) идентификатором партии. BatchId может использоваться для дедупликации и транзакционной записи выходных данных (то есть предоставленного набора данных) во внешние системы. Выходной набор данных гарантированно совпадает для одного и того же batchId (при условии, что все операции являются детерминированными в запросе).

Мало того, что вы получите все данные потокового микропакета за один раз (первый входной аргумент типа Dataset<T>), но также и способ отправки другого задания Spark (по исполнителям) на основе данных.

Псевдокод может выглядеть следующим образом (я использую Scala какМне больше нравится язык):

val dsWriter = originalData.foreachBatch { case (data, batchId) =>
  // make sure the data is small enough to collect on the driver
  // Otherwise expect OOME
  // It'd also be nice to have a Java bean to convert the rows to proper types and names
  val localData = data.collect

  // Please note that localData is no longer Spark's Dataset
  // It's a local Java collection

  // Use Java Collection API to work with the localData
  // e.g. using Scala
  // You're mapping over localData (for a single micro-batch)
  // And creating finalData
  // I'm using the same names as your code to be as close to your initial idea as possible
  val finalData = localData.map { row =>
    // row is the old row from your original code
    // do something with it
    // e.g. using Java
    String sourceKey=row.get(4).toString();
    ...
  }

  // Time to save the data processed to ES
  // finalData is a local Java/Scala collection not Spark's DataFrame!
  // Let's convert it to a DataFrame (and leverage the Spark distributed platform)

  // Note that I'm almost using your code, but it's a batch query not a streaming one
  // We're inside foreachBatch
  finalData
    .toDF // Convert a local collection to a Spark DataFrame
    .write  // this creates a batch query
    .format("org.elasticsearch.spark.sql")
    .option("es.mapping.id", "id")
    .option("es.write.operation", "upsert")
    .option("checkpointLocation","/tmp/checkpoint/sg_event")
    .save("spark_index/doc") // save (not start) as it's a batch query inside a streaming query
}

dsWriter - это DataStreamWriter, и теперь вы можете запустить его для запуска потокового запроса.

0 голосов
/ 15 октября 2019

Мне удалось достичь реального решения с помощью SQL-соединений. Пожалуйста, ознакомьтесь с кодом ниже.

Dataset<Row> orignalData = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers","test")
  .option("subscribe", "test")
  .option("startingOffsets", "latest")
  .load();

orignalData.createOrReplaceTempView("stream_data");

Dataset<Row> esData = JavaEsSparkSQL
  .esDF(spark.sqlContext(), "spark_correlation/doc");

esData.createOrReplaceTempView("es_correlation");

Dataset<Row> joinedData = spark.sqlContext().sql("select * from stream_data,es_correlation where es_correlation.key=stream_data.key");

// Or

/* By using Dataset Join Operator
 Dataset<Row> joinedData = orignalData.join(esFirst, "key");

*/

Encoder<SGEvent> eventEncoderSG = Encoders.bean(SGEvent.class);

Dataset<SGEvent> finalData = joinedData.map((MapFunction<Row, SGEvent>) row -> {
 SGEvent event = new SGEvent();
 event.setId(row.get(0).toString());
 event.setKey(row.get(3).toString());
 return event;
},eventEncoderSG);


finalData
  .writeStream()
  .outputMode(OutputMode.Append())
  .format("org.elasticsearch.spark.sql")
  .option("es.mapping.id", "id")
  .option("es.write.operation", "upsert")
  .option("checkpointLocation","/tmp/checkpoint/sg_event")
  .start("spark_index/doc").awaitTermination();
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...