Flink Сериализация ZonedDateTime - PullRequest
0 голосов
/ 01 мая 2018

Мне нужно работать с часовыми поясами и нано вторым временным разрешением. Поэтому я использую ZonedDateTime. Очевидно, Apache Flink не сериализует ZonedDateTime должным образом. Он сериализует часть LocalDateTime, как и ожидалось, однако он забывает обрабатывать часовой пояс.

Когда я, например, регистрирую зонированную дату внутри функции карты потока Flink, я всегда получаю что-то вроде

 2018-03-01T04:10:30.773471918null

Принимая во внимание, что при получении данных я получаю соответствующую зону

 2018-03-01T04:10:30.773471918-05:00

Нуль относится к зоне. Позже, конечно, я получаю исключение нулевого указателя, так как я должен использовать правильное сравнение времени, которому нужна зона.

Как я могу это исправить? Спасибо за ответ.

1 Ответ

0 голосов
/ 01 мая 2018

Я не до конца понимаю, почему он не подхватывает сериализатор. Это решение по крайней мере работает: я реализовал сериализатор Kryo для ZonedDateTime

import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.markatta.timeforscala.ZonedDateTime

class ZonedDateTimeSerializer extends Serializer[ZonedDateTime] {
  setImmutable(true)

  override def write(kryo: Kryo, out: Output, obj: ZonedDateTime): Unit = {
    ZonedDateTimeSerializer.write(out, obj)
  }

  override def read(kryo: Kryo, in: Input, `type`: Class[ZonedDateTime]): ZonedDateTime = {
    ZonedDateTimeSerializer.read(in)
  }
}

object ZonedDateTimeSerializer {
  def write(out: Output, obj: ZonedDateTime): Unit = {
    LocalDateSerializer.write(out, obj.toLocalDate)
    LocalTimeSerializer.write(out, obj.toLocalTime)
    ZoneIdSerializer.write(out, obj.getZone)
  }

  def  read(in: Input): ZonedDateTime = {
    val date = LocalDateSerializer.read(in)
    val time = LocalTimeSerializer.read(in)
    val zone = ZoneIdSerializer.read(in)
    ZonedDateTime(date, time, zone)
  }
}

Я взял реализацию из новейшей реализации Kyro . Тогда я зарегистрировал это следующим образом:

    env.getConfig.registerTypeWithKryoSerializer(classOf[ZonedDateTime], classOf[ZonedDateTimeSerializer])

Кажется, это решает проблему. Не уверен, что это связано с тем, что я использую timesforscala, но я хочу использовать эту библиотеку, потому что она добавляет важные дополнения, от которых я зависел. Комментарии приветствуются.

...