Как извлечь значения из строки kafka через spark при структурированной потоковой передаче? - PullRequest
2 голосов
/ 30 апреля 2019

Учитывая данные, которые я извлек из Kafka. Как извлечь из него значения с помощью сопоставления с образцом?

Дата кадра:

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .option("startingOffsets", "earliest") \
  .load()

Моя проблема в том, что схема выглядит следующим образом:

df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

Этот двоичный тип - это то, что я не могу сопоставить с шаблоном. Как мне извлечь это значение, а затем проанализировать его?

1 Ответ

2 голосов
/ 30 апреля 2019

Вопрос: Как мне извлечь это значение, а затем проанализировать его?

Я предполагаю, что вы используете сообщение avro и можете попробовать, как показано ниже: (я не знаю, что выздесь выполняется попытка сопоставления с шаблоном) decodeAndParseObject функция использует биение твиттеров api с зависимостью ниже

<!-- https://mvnrepository.com/artifact/com.twitter/bijection-avro -->
<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>bijection-avro_2.10</artifactId>
    <version>0.7.0</version>
</dependency>

val ds = df.select("value").as[Array[Byte]].map(x=>decodeAndParseObject(x))

где

import org.apache.avro.generic.GenericRecord
import com.twitter.bijection.Injection
import com.twitter.bijection.avro.GenericAvroCodecs
/**
* decode and parse binary based on your schema... your logic goes here
*/
def decodeAndParseObject(message: Array[Byte]) =  {

val schema = new Schema.Parser().parse("yourschemahere")

val recordInjection: Injection[GenericRecord, Array[Byte]] = 

GenericAvroCodecs.toBinary(schema)

val record: GenericRecord = recordInjection.invert(message).get
println(record.getSchema)
record.getSchema.getFields.toArray().foreach(println)
println("\n\n\n\n\n\n Record " + record.toString.replaceAll(",", "\n"))
//get the column and do pattern matching....
// Prepare another generic record .... I'm leaving it as blank here...

record   

}

Обновление: вы можете использовать описанную выше обобщенную запись и получить нужный столбец, используя record.get("yourcolumn"), и выполнить для этого случай сопоставления с шаблоном scala.

...