Я пытаюсь использовать SparkStreaming для чтения из топика Kafka c, обработки данных и отправки их в другой топик Kafka c, например, topi c -in, topi c -out
Я использую Spark 2.3.1 и Kafka 1.1.0
Я могу обрабатывать данные (с некоторыми предупреждениями, которые не влияют на конечные результаты) ...
Журнал :
INFO BlockManager: 54 - Использование org. apache .spark.storage.RandomBlockReplicationPolicy для политики репликации блоков
INFO BlockManagerMaster: 54 - Регистрация BlockManager BlockManagerId (драйвер, spark-master.had oop, 37981, нет)
INFO BlockManagerMasterEndpoint: 54 - Регистрация диспетчера блоков spark-master.had oop: 37981 с ОЗУ 366,3 МБ, BlockManagerId (драйвер, spark-master.had oop, 37981, нет) )
INFO BlockManagerMaster: 54 - зарегистрированный BlockManager BlockManagerId (драйвер, spark-master.had oop, 37981, отсутствует)
INFO BlockManager: 54 - инициализированный блок-менеджер: BlockManagerId (драйвер, spark-master.had * 1063) *, 37981, нет)
INFO Cont extHandler: 781 - Запущен osjs ServletContextHandler@4bb8855f {/ metrics / json,null,AVAILABLE,@Spark}
WARN Использовано: 66 - усечено строковое представление плана, поскольку он был слишком большим. Это поведение можно настроить, установив «spark.debug.maxToStringFields» в SparkEnv.conf.
WARN InternalKafkaConsumer: 66 - Пропустить отсутствующие записи в [477, 478) (GroupId: spark-kafka-source-e2c16752- 6e63-4cfd-95a6-9066e09eb699--491593354-executor, TopicPartition: topic_in-0).
WARN InternalKafkaConsumer: 66 - Пропускать отсутствующие записи в [958, 959) (GroupId: spark-kafka-source-e2c16752 -6e63-4cfd-95a6-9066e09eb699--491593354-executor, TopicPartition: topic_in-0).
WARN InternalKafkaConsumer: 66 - Пропускать отсутствующие записи в [1442, 1443) (GroupId: spark-kafka-source- e2c16752-6e63-4cfd-95a6-9066e09eb699--491593354-executor, TopicPartition: topic_in-0).
WARN InternalKafkaConsumer: 66 - Пропускать отсутствующие записи в [1919, 1920) (GroupId: spark-kafka-source -e2c16752-6e63-4cfd-95a6-9066e09eb699--491593354-executor, TopicPartition: topic_in-0).
+ 2.000-2.500 похожих предупреждений ...
Но, в некоторые казни, появляется эта ошибка в середине выполнение:
WARN Fetcher: 594 - Неизвестная ошибка выборки данных для topi c -partition topic_in-0
WARN Fetcher: 594 - Неизвестная ошибка выборки данных для topi c -partition topic_in -0
WARN Fetcher: 594 - неизвестная ошибка выборки данных для topi c -partition topic_in-0
WARN Fetcher: 594 - неизвестная ошибка выборки данных для topi c -partition topic_in-0
WARN Сборщик: 594 - Неизвестная ошибка выборки данных для topi c -partition topic_in-0
WARN Сборщик: 594 - Неизвестная ошибка выборки данных для topi c -partition topic_in-0
WARN Сборщик: 594 - Неизвестная ошибка выборка данных для topi c -partition topic_in-0
WARN Fetcher: 594 - неизвестная ошибка выборки данных для topi c -partition topic_in-0
WARN Fetcher: 594 - неизвестная ошибка выборки данных для topi c -part topic_in-0
WARN Fetcher: 594 - Неизвестная ошибка выборки данных для topi c -partition topic_in-0
WARN Fetcher: 594 - Неизвестная ошибка выборки данных для topi c -partition topic_in -0
Это происходит после того, как я запускаю выполнение и обрабатываю 100 000/300 000 записей, поэтому я не могу применить это решение , и не во всех выполнениях ... Это прерывает выполнение, Spark пытается обработать данные, но ничего не проходит через него, что вызывает у меня ужасную головную боль ...
Это мой код ...
import spark.implicits._
val raw_data = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafka_servers)
.option("group.id", "spark_kafka_consumer")
.option("startingOffsets", "latest")
.option("enable.auto.commit", true)
.option("failOnDataLoss", false)
.option("subscribe", "topic_in")
.load()
val schema = StructType(List(StructField("col1", StringType, false),
StructField("col2", StringType, true),
StructField("col3", StringType, true),
...))
var parsed_data = raw_data
.select(from_json(col("value").cast("string"), schema).alias("data"))
.select("data.*")
// Some filters...
parsed_data = parsed_data.where($"col_12".contains("x12") || $"col_13".contains("x13"))
.withColumn("new_col",lit("1"))
// Some transformations
parsed_data = parsed_data.withColumn("col1", $"col1".cast(IntegerType))
.withColumn("col2", $"col2".cast(IntegerType))
.withColumn("col3", unix_timestamp($"col3", "yyyyMMddHHmmss"))...
val schema_out = struct($"col1", $"col2", $"col3"...)
var query = parsed_data.
select(to_json(schema_out).alias("value")).
writeStream.
format("kafka").
option("kafka.bootstrap.servers", kafka_servers).
option("zookeeper.connect", zookeeper_servers).
option("checkpointLocation", checkpoint_location).
option("topic", "topic_out").
start()
query.awaitTermination()