Spark работа чтения отсортированных файлов AVRO в dataframe, но запись в кафку без заказа - PullRequest
0 голосов
/ 20 сентября 2019

У меня есть файлы AVRO, отсортированные по ID, и у каждого ID есть папка с именем «ID = 234», а данные внутри папки имеют формат AVRO и отсортированы по дате.Я запускаю искровое задание, которое принимает входной путь и читает avro в датафрейме.Этот фрейм данных затем записывает в кафку тему с 5 разделами.

val properties: Properties = getProperties(args)


val spark = SparkSession.builder().master(properties.getProperty("master"))
  .appName(properties.getProperty("appName")).getOrCreate()
val sqlContext = spark.sqlContext

val sourcePath = properties.getProperty("sourcePath")

val dataDF = sqlContext.read.avro(sourcePath).as("data")
val count = dataDF.count();
val schemaRegAdd = properties.getProperty("schemaRegistry")

val schemaRegistryConfs = Map(
  SchemaManager.PARAM_SCHEMA_REGISTRY_URL          -> schemaRegAdd,
  SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY -> SchemaManager.SchemaStorageNamingStrategies.TOPIC_NAME
)
val start = Instant.now

dataDF.select(functions.struct(properties.getProperty("message.key.name")).alias("key"), functions.struct("*").alias("value"))
  .toConfluentAvroWithPlainKey(properties.getProperty("topic"), properties.getProperty("schemaName"),
  properties.getProperty("schemaNamespace"))(schemaRegistryConfs)
  .write.format("kafka")
  .option("kafka.bootstrap.servers",properties.getProperty("kafka.brokers"))
  .option("topic",properties.getProperty("topic")).save()

}

Мой вариант использования заключается в последовательной записи всех сообщений от каждого идентификатора (отсортированных по дате), например, сначала следует добавить все отсортированные данные из одного идентификатора 1, а затем из идентификатора 2и так далее.Kafka сообщение имеет ключ в качестве идентификатора.

1 Ответ

0 голосов
/ 21 сентября 2019

Не забыл, что данные внутри RDD / набора данных случайны, когда вы выполняете преобразования, поэтому вы теряете порядок.

лучший способ добиться этого - прочитать файл один за другим и отправить его в kafka вместо чтения полного каталога в вашем val sourcePath = properties.getProperty("sourcePath")

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...