Фильтр луча по преобразованию: перегруженное значение метода не может быть применено к SimpleFunction - PullRequest
1 голос
/ 11 марта 2019

В следующем коде я пытаюсь прочитать файл CSV, расположенный в dataFile, используя TextIO Beam и отфильтровать его строку заголовка, но я получаю сообщение об ошибке компиляции с этим сообщением:

Error:(ROW, COLUMN) overloaded method value by with alternatives:
  [T, PredicateT <: org.apache.beam.sdk.transforms.SerializableFunction[T,Boolean]](x$1: PredicateT)org.apache.beam.sdk.transforms.Filter[T] <and>
  [T, PredicateT <: org.apache.beam.sdk.transforms.ProcessFunction[T,Boolean]](x$1: PredicateT)org.apache.beam.sdk.transforms.Filter[T]
 cannot be applied to (org.apache.beam.sdk.transforms.SimpleFunction[String,Boolean])
              .by(nonHeaderFilter))

Код:

val nonHeaderFilter: SimpleFunction[String, Boolean] = new SimpleFunction[String, Boolean]() {
    override def apply(input: String): Boolean = {
        input != MyClass.CsvHeader
    }
}

def readDataFile(input: PBegin, dataFile: String): PCollection[String] = {
    input
    .apply("Read Data File", TextIO.read().from(dataFile))
    .apply("Filter Header Line", Filter.by(nonHeaderFilter))
}

Я думаю, что проблема связана с тем фактом, что SerializableFunction является ProcessFunction, а SimpleFunction является SerializableFunction.Почему-то это неправильно обрабатывается в Scala.

Есть какие-либо рекомендации, чтобы избежать этой проблемы, или я что-то неправильно понял?

Редактировать (обходной путь):

Чтобы временно решить проблему, на случай, если кто-то еще столкнулся с этой проблемой, я создал статический метод Java для предоставления необходимого фильтра:

import org.apache.beam.sdk.transforms.Filter;

public class BeamTransformProvider {

    public static Filter<String> notEqualFilter(String value) {
        return Filter.by(input -> !input.equals(value));
    }

}

, который можно использовать, как показано ниже:

def readDataFile(input: PBegin, dataFile: String): PCollection[String] = {
    input
    .apply("Read Data File", TextIO.read().from(dataFile))
    .apply("Filter Header Line", BeamTransformProvider.notEqualFilter(MyClass.CsvHeader))
}
...