Как читать из нескольких тем с помощью spark readStream (), которая
разные схемы и writeStream () для одной темы с помощью Spark
StructedSchema?
Я даю общую идею или указатели здесь ... может подойти вашему делу.
Я предполагаю, что вы используете сообщения avro, есть две темы, одна для сообщения, а другая для схемы, которую я называю темой сообщения и темой схемы.
Теперь подготовьте общую схему оболочки строк, скажем avro_yourrow_wrapper.avsc
, которая содержит различные сообщения схемы (поскольку вы сказали, что каждое сообщение имеет другую схему).
Например: измените этот образец в соответствии с вашими требованиями.
{
"type" : "record",
"name" : "generic_schema",
"namespace" : "yournamespace",
"fields" : [ {
"name" : "messagenameOrTableNames",
"type" : "string"
}, {
"name" : "schema",
"type" : "long"
}, {
"name" : "payload",
"type" : "bytes"
} ]
}
сохранить его в файл с именем avro_yourrow_wrapper.avsc, поскольку он статический ...
// Read the wrapper schema in your consumer.
val inputStream = getClass.getResourceAsStream("avro_yourrow_wrapper.avsc")
val source = scala.io.Source.fromInputStream(inputStream)
val wrapperSchema = try source.mkString finally source.close()
из искрового структурированного потока вы получите информационный кадр. Прочитайте схему оболочки, основанную на типе сообщения, примените специфичную для записи схему, прочитав тему схемы и тему сообщения, прочитав сообщение avro.
Теперь с помощью API-интерфейса twitter bijection (с GenericRecord
) вы можете декодировать сообщение в читаемый формат.
пример фрагмента псевдокода:
import com.twitter.bijection.Injection
import com.twitter.bijection.avro.GenericAvroCodecs
import org.apache.avro.generic.GenericRecord
val schema = new Schema.Parser().parse(localschema.get( recordlevelschema).get)
val recordInjection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
val record: GenericRecord = recordInjection.invert(bytes).get
log.info("record.getSchema" +record.getSchema)
record.getSchema.getFields.toArray().foreach(x =>log.info(x.toString))
А потом вы можете писать в отдельную тему по своему желанию.