Искробезопасная скала: как я могу передать массив строк в фильтр? - PullRequest
0 голосов
/ 25 января 2019

Я хочу заменить строку «a» для массива строк, делая .contains () для проверки каждой строки в массиве. Это возможно?

val filtered = stream.flatMap(status => status.getText.split(" ").filter(_.contains("a")))

Edit:

Также попробовал это (sc is sparkContext):

val ssc = new StreamingContext(sc, Seconds(15))
val stream = TwitterUtils.createStream(ssc, None)
val filtered = stream.flatMap(status => status.getText.split(" ").filter(a.contains(_)))

И получил следующую ошибку:

java.io.NotSerializableException: Объект org.apache.spark.streaming.twitter.TwitterInputDStream сериализуется, возможно, как часть закрытия операции RDD. Это связано с тем, что на объект DStream ссылаются из замыкания. Пожалуйста, перепишите операцию RDD внутри этого DStream, чтобы избежать этого. Это было применено, чтобы избежать раздувания задач Spark ненужными объектами.

Затем я попытался передать массив перед использованием:

val aBroadcast = sc.broadcast(a)
val filtered = stream.flatMap(status => status.getText.split(" ").filter(aBroadcast.value.contains(_)))

И получил ту же ошибку.

Спасибо

1 Ответ

0 голосов
/ 26 января 2019

Как я понимаю, вопрос, который вы хотите увидеть, содержит ли текст состояния после разделения список слов, который является подмножеством a:

val a = Array("a1", "a2")
val filtered = stream.flatMap(status => status.getText.split(" ").filter(_.forall(a contains))
...