Scala - Как отфильтровать KStream (Kafka Streams) - PullRequest
0 голосов
/ 27 марта 2020

Я новичок в Scala и пытаюсь отфильтровать KStream [String, JsonNode] на основе полей второго компонента.

Например, рабочий код Java такой :

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
...
import com.fasterxml.jackson.databind.JsonNode;
...
...
final KStream<String, JsonNode> source = streamsBuilder.stream(inputTopic,
                    Consumed.with(Serdes.String(), jsonSerde));


// filter and producer preprocessed
source.filter((k, v) -> v.get("total_cost").asDouble() > 0 && v.get("num_items").asInt() > 0)
    .to(outputTopic, Produced.with(Serdes.String(), jsonSerde));

Я пробовал это:

import org.apache.kafka.streams.kstream.{Produced,Consumed,KStream};
import org.apache.kafka.streams.StreamsBuilder;
...
import com.fasterxml.jackson.databind.JsonNode;
...
var source:KStream[String, JsonNode] = streamsBuilder.stream(inputTopic, Consumed.`with`(Serdes.String(), jsonSerde));

source.filter({
  case (k:String ,v:JsonNode) => 
    (v.get("total_cost").asDouble() > 0 && v.get("num_items").asInt() > 0)
  })
  .to(outputTopic, Produced.`with`(Serdes.String(), jsonSerde));

В вышеупомянутой попытке я получил:

Описание Resource Path Location Type отсутствует тип параметра для расширенного функция Типы аргументов анонимной функции должны быть полностью известны. (SLS 8.5) Ожидаемый тип: org. apache .kafka.streams.kstream.Predicate [? >: Строка,? >: com.faster xml .jackson.databind.JsonNode]

Я также попытался это сделать:

source.filter((_._2.get("total_cost").asDouble() > 0 && _._2.get("num_items").asInt() > 0))
  .to(outputTopic, Produced.`with`(Serdes.String(), jsonSerde));

Как я могу отфильтровать этот объект в Scala? Заранее спасибо.

1 Ответ

3 голосов
/ 27 марта 2020

org.apache.kafka.streams.kstream.Predicate происходит из JavaAPI, в Scala 2.11 (я полагаю, вы используете его) вы должны явно реализовать интерфейс, таким образом:

source.filter(new Predicate[String, JsonNode]() {
  override def test(k: String, v: JsonNode): Boolean = {
    v.get("total_cost").asDouble() > 0 && v.get("num_items").asInt() > 0
  }
}).to(outputTopic, Produced.`with`(Serdes.String(), jsonSerde));

должно работать.

Подробнее о SAM s (Single Abstract Methods) можно найти здесь

Обратите внимание, что вам не нужно использовать Java API - есть первоклассный Scala API .

...