Несовпадение типов при вызове java generi c Java метода из Scala кода - PullRequest
0 голосов
/ 06 февраля 2020

У меня n-количество Java классов с одним суперклассом - модель данных. Список классов является входным параметром для метода Scala, в котором я хочу создать resultStreams и должен создать вызов Java generi c методов из метода процесса. Не могли бы вы написать, как это решить? Я пытался использовать [_ <: SpecificRecordBase], [SpecificRecordBase] при вызове метода, но результат был тот же. </p>

Ошибка

Error:(146, 88) type mismatch;
 found   : Class[_$3] where type _$3 <: org.apache.avro.specific.SpecificRecordBase
 required: Class[org.apache.avro.specific.SpecificRecordBase]
Note: _$3 <: org.apache.avro.specific.SpecificRecordBase, but Java-defined class Class is invariant in type T.
You may wish to investigate a wildcard type such as `_ <: org.apache.avro.specific.SpecificRecordBase`. (SLS 3.2.10)
                AvroHelper.deSerializeAvroObject(record.value, cl))(TypeInformation.of(cl)))

Scala код

object GenerickRunnerStackOverFlow  {
  def process(inputClasses :  List[Class[_ <: SpecificRecordBase]],): Unit = {
    val newStream: DataStream[KafkaSourceType] = env.addSource(....)).uid(...).filter(...)

    val resultStreams = inputClasses .map(
      cl => newStream.map(record =>
                AvroHelper.deSerializeAvroObject(record.value, cl))(TypeInformation.of(cl)))

        ...
  }

    def main(args: Array[String]): Unit = {
        val topicToClasses: List[Class[_ <: SpecificRecordBase]] = List(Types.RECORD_1.getClassType, Types.RECORD_1.getClassType.getClassType)
        process(topicToClasses)

    }
}

Java метод spe c

public static <A extends SpecificRecord> A deSerializeAvroObject(byte[] object, Class<A> clazz){ ...}

Модель

    public class Record1 extends SpecificRecordBase {}
    public class Record2 extends SpecificRecordBase {}
    ...
    public enum Types {
      RECORD_1(Record1.class),
      RECORD_2(Record2.class);
      ....

      private Class<? extends SpecificRecordBase> clazz;
      public Class<? extends SpecificRecordBase> getClassType() {return this.clazz;}
}

Также у меня та же ошибка сообщения с Scala метод addSink:

def addSink(sinkFunction : org.apache.flink.streaming.api.functions.sink.SinkFunction[T]) : org.apache.flink.streaming.api.datastream.DataStreamSink[T] = { /* compiled code */ }

Я пишу метод оболочки:

def addSinkWithSpecificRecordBase[A <: SpecificRecordBase](
    stream: DataStream[A],
    sink: BucketingSink[A]): DataStreamSink[A] = stream.addSink(sink)

В результате выполнения:

val result = topicToSinkStream.foreach { el =>
  val stream: DataStream[_ <: SpecificRecordBase] = el._2._1
  val sink: BucketingSink[_ <: SpecificRecordBase] = el._2._2
  addSinkWithSpecificRecordBase(stream, sink)
}

Произошла ошибка:

Error:(209, 37) type mismatch;
 found   : org.apache.flink.streaming.api.scala.DataStream[_$9] where type _$9 <: org.apache.avro.specific.SpecificRecordBase
 required: org.apache.flink.streaming.api.scala.DataStream[org.apache.avro.specific.SpecificRecordBase]
Note: _$9 <: org.apache.avro.specific.SpecificRecordBase, but class DataStream is invariant in type T.
You may wish to define T as +T instead. (SLS 4.5)
      addSinkWithSpecificRecordBase(stream, sink)

Где topicToSinkStream:

Map[String, (DataStream[_ <: SpecificRecordBase], BucketingSink[_ <: SpecificRecordBase])]

Я также попытался исключить SpecificRecordBase в методе gemeri c description и Добавьте + и - к описанию параметров метода. Но результата нет.

1 Ответ

0 голосов
/ 06 февраля 2020

Проблема в том, что тип AvroHelper.deSerializeAvroObject(record.value, cl) равен SpecificRecordBase (_ <: SpecificRecordBase допускается только в типе параметры , но не здесь). Исправление заключается в извлечении вспомогательной функции:

def processClass[A <: SpecificRecordBase](cl: Class[A], newStream: DataStream[KafkaSourceType]) = 
  newStream.map(record => AvroHelper.deSerializeAvroObject(record.value, cl))(TypeInformation.of(cl)))

(если вы определите ее локально, вы также можете использовать newStream без указания аргумента), а затем

val resultStreams = inputClasses.map(cl => processClass(cl, newStream))
...