Читайте из нескольких тем и пишите в одну тему - Spark Streaming - PullRequest
0 голосов
/ 12 июня 2019

Как читать из нескольких тем, используя spark readStream() с разными схемами, и writeStream() в одну тему, используя Spark StructedSchema.

Примечание. Каждая тема ввода имеет свою схему

1 Ответ

1 голос
/ 12 июня 2019

Как читать из нескольких тем с помощью spark readStream (), которая разные схемы и writeStream () для одной темы с помощью Spark StructedSchema?

Я даю общую идею или указатели здесь ... может подойти вашему делу.

Я предполагаю, что вы используете сообщения avro, есть две темы, одна для сообщения, а другая для схемы, которую я называю темой сообщения и темой схемы.

Теперь подготовьте общую схему оболочки строк, скажем avro_yourrow_wrapper.avsc, которая содержит различные сообщения схемы (поскольку вы сказали, что каждое сообщение имеет другую схему).

Например: измените этот образец в соответствии с вашими требованиями.

{
  "type" : "record",
  "name" : "generic_schema",
  "namespace" : "yournamespace",
  "fields" : [ {
    "name" : "messagenameOrTableNames",
    "type" : "string"
  }, {
    "name" : "schema",
    "type" : "long"
  }, {
    "name" : "payload",
    "type" : "bytes"
  } ]
}

сохранить его в файл с именем avro_yourrow_wrapper.avsc, поскольку он статический ...

// Read the wrapper schema in your consumer.
    val inputStream = getClass.getResourceAsStream("avro_yourrow_wrapper.avsc")
    val source = scala.io.Source.fromInputStream(inputStream)
    val wrapperSchema = try source.mkString finally source.close()

из искрового структурированного потока вы получите информационный кадр. Прочитайте схему оболочки, основанную на типе сообщения, примените специфичную для записи схему, прочитав тему схемы и тему сообщения, прочитав сообщение avro.

Теперь с помощью API-интерфейса twitter bijection (с GenericRecord) вы можете декодировать сообщение в читаемый формат.

пример фрагмента псевдокода:

import com.twitter.bijection.Injection
        import com.twitter.bijection.avro.GenericAvroCodecs
        import org.apache.avro.generic.GenericRecord
        val schema = new Schema.Parser().parse(localschema.get( recordlevelschema).get)
        val recordInjection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
        val record: GenericRecord = recordInjection.invert(bytes).get
        log.info("record.getSchema" +record.getSchema)
        record.getSchema.getFields.toArray().foreach(x =>log.info(x.toString))

А потом вы можете писать в отдельную тему по своему желанию.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...