Как записать данные в Spark в Kinesis Stream? - PullRequest
0 голосов
/ 09 июля 2019

Я создаю Dataframe из темы kafka с использованием потокового воспроизведения.Я хочу записать Dataframe в Kinesis Producer.Я понимаю, что на данный момент нет официального API для этого.Но есть несколько API, доступных через Интернет, но, к сожалению, ни один из них не работал для меня.Версия Spark: 2.2 Scala: 2.11

Я пытался использовать https://github.com/awslabs/kinesis-kafka-connector и собрать банку.Но получаю ошибки из-за противоречивых имен пакетов между этим jar и spark API.Пожалуйста, помогите.

########### Вот код для других:
spark-shell --jars spark-sql-kinesis_2.11-2.2.0.jar,spark-sql-kafka-0-10_2.11-2.1.0.jar,spark-streaming-kafka-0-10-assembly_2.10-2.1.0.jar --files kafka_client_jaas_spark.conf --properties-file gobblin_migration.conf --conf spark.port.maxRetries=100 --driver-java-options "-Djava.security.auth.login.config=kafka_client_jaas_spark.conf" --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas_spark.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas_spark.conf"

import java.io.File
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.spark.SparkException
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import scala.sys.process._
import org.apache.log4j.{ Logger, Level, LogManager, PropertyConfigurator }
import org.apache.spark.sql.streaming.Trigger

val streamingInputDF =spark.readStream.format("kafka").option("kafka.bootstrap.servers","bootstrap server").option("subscribe", "<kafkatopic>").option("startingOffsets", "latest").option("failOnDataLoss", "false").option("kafka.security.protocol", "SASL_PLAINTEXT").load()

val xdf=streamingInputDF.select(col("partition").cast("String").alias("partitionKey"),col("value").alias("data"))

xdf.writeStream.format("kinesis").option("checkpointLocation", "<hdfspath>").outputMode("Append").option("streamName", "kinesisstreamname").option("endpointUrl","kinesisendpoint").option("awsAccessKeyId", "accesskey").option("awsSecretKey","secretkey").start().awaitTermination()

Для jar spark-sql-kinesis_2.11-2.2.0.jar, перейдите к quoble , загрузите пакет для своей версии spark, создайте jar.

Если вы находитесь в корпоративной сети, установите прокси-сервер перед запуском spark.export http_proxy = http://server -ip: порт / export https_proxy = https://server -ip: порт /

1 Ответ

1 голос
/ 09 июля 2019

Kafka Connect - это сервис, к которому вы можете POST свои спецификации разъемов (в данном случае kinesis), который затем заботится о работе разъема. Он также поддерживает довольно много преобразований при обработке записей. Плагины Kafka Connect не предназначены для использования с приложениями Spark.

Если ваш вариант использования требует от вас выполнения бизнес-логики при обработке записей, то вы можете использовать подход Spark Streaming или Structured Streaming.

Если вы хотите использовать подход, основанный на Spark, ниже приведены 2 варианта, которые я могу придумать.

  1. Использовать структурированную потоковую передачу. Вы можете использовать потоковый соединитель Strucuted для Kinesis. Вы можете найти один здесь . Там могут быть и другие. Это единственный стабильный и открытый исходный разъем, о котором я знаю. Вы можете найти пример использования Kinesis в качестве раковины здесь .

  2. Используйте Библиотека Kinesis Producer или aws-java-sdk-kinesis , чтобы публиковать записи из вашего приложения Spark Streaming. Использование KPL является предпочтительным подходом здесь. Вы можете сделать mapPartitions и создать клиент Kinesis для каждого раздела и опубликовать записи, используя эти библиотеки. В документах AWS есть множество примеров для этих двух библиотек.

...