JSONDeserializationSchema () устарел во Flink? - PullRequest
0 голосов
/ 28 мая 2020

Я новичок во Flink и делаю что-то очень похожее на ссылку ниже.

Не вижу сообщения при опускании потока kafka и не вижу сообщения печати во flink 1.2

Я также пытаюсь добавить JSONDeserializationSchema () в качестве десериализатора для моего ввода Kafka JSON сообщение без ключа.

Но я обнаружил, что JSONDeserializationSchema () отсутствует.

Пожалуйста дайте мне знать, если я делаю что-то не так.

enter image description here

Ответы [ 2 ]

1 голос
/ 28 мая 2020

JSONDeserializationSchema был удален в Flink 1.8 после того, как ранее был объявлен устаревшим.

Рекомендуемый подход - написать десериализатор, реализующий DeserializationSchema<T>. Вот пример, который я скопировал из Flink Operations Playground :

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;

/**
 * A Kafka {@link DeserializationSchema} to deserialize {@link ClickEvent}s from JSON.
 *
 */
public class ClickEventDeserializationSchema implements DeserializationSchema<ClickEvent> {

    private static final long serialVersionUID = 1L;

    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public ClickEvent deserialize(byte[] message) throws IOException {
        return objectMapper.readValue(message, ClickEvent.class);
    }

    @Override
    public boolean isEndOfStream(ClickEvent nextElement) {
        return false;
    }

    @Override
    public TypeInformation<ClickEvent> getProducedType() {
        return TypeInformation.of(ClickEvent.class);
    }
}

Для производителя Kafka вы захотите реализовать KafkaSerializationSchema<T>, и вы найдете примеры этого в том же проекте.

0 голосов
/ 29 мая 2020

Чтобы решить проблему чтения неключевых JSON сообщений от Kafka, я использовал класс case и парсер JSON.

Следующий код создает класс case и анализирует поле JSON с помощью play API.

import play.api.libs.json.JsValue

object CustomerModel {

  def readElement(jsonElement: JsValue): Customer = {
    val id = (jsonElement \ "id").get.toString().toInt
    val name = (jsonElement \ "name").get.toString()
    Customer(id,name)
  }
case class Customer(id: Int, name: String)
}

def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "xxx.xxx.0.114:9092")
properties.setProperty("group.id", "test-grp")

val consumer = new FlinkKafkaConsumer[String]("customer", new SimpleStringSchema(), properties)
val stream1 = env.addSource(consumer).rebalance

val stream2:DataStream[Customer]= stream1.map( str =>{Try(CustomerModel.readElement(Json.parse(str))).getOrElse(Customer(0,Try(CustomerModel.readElement(Json.parse(str))).toString))
    })

stream2.print("stream2")
env.execute("This is Kafka+Flink")

}

Метод Try позволяет преодолеть исключение, возникшее при анализе данных, и возвращает исключение в одном из полей (если мы хотим), иначе он может просто вернуть объект класса case с любым заданные поля или поля по умолчанию.

Пример вывода кода:

stream2:1> Customer(1,"Thanh")
stream2:1> Customer(5,"Huy")
stream2:3> Customer(0,Failure(com.fasterxml.jackson.databind.JsonMappingException: No content to map due to end-of-input
 at [Source: ; line: 1, column: 0]))

Я не уверен, что это лучший подход, но на данный момент он работает для меня.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...