Я написал этот код для отправки потоковых данных, поступающих из twitter в elasticsearch, я добавил все необходимые зависимости, но у меня проблема с двумя функциями toDS и toES, пожалуйста, помогите мне решить эту проблему, это мой код: `package org .lansrod.visualisation
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
import org.apache.spark.sql.{Row, SparkSession}
object twitter {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[*]").setAppName("twitter")
val ssc = new StreamingContext(conf, Seconds(5)) // spark streaming context
val ACCESS_TOKEN = "my access token"
val ACCESS_SECRET = "my access secret"
val CONSUMER_KEY = "my consumer key"
val CONSUMER_SECRET = "my consumer secret"
val cb = new ConfigurationBuilder
cb.setDebugEnabled(true).setOAuthConsumerKey(CONSUMER_KEY)
.setOAuthConsumerSecret(CONSUMER_SECRET)
.setOAuthAccessToken(ACCESS_TOKEN)
.setOAuthAccessTokenSecret(ACCESS_SECRET)
val auth = new OAuthAuthorization(cb.build) //avoir l'authorisation
val tweets = TwitterUtils.createStream(ssc, Some(auth))
ssc.start()
ssc.awaitTermination()
tweets.foreachRDD { rdd =>
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
val caseClassDS = rdd.toDS()
caseClassDS.saveToEs("spark/docs")
}
}
}
`
и мой build.sbt выглядит следующим образом:
scalaVersion := "2.11.0"
val sparkVersion = "2.4.6"
`libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion % "provided", //provided
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"org.elasticsearch" %% "elasticsearch-spark-20" % "7.6.1" ,
"org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
"org.apache.spark" % "spark-streaming-twitter_2.11" % "1.6.1" exclude("org.twitter4j",
"twitter4j"),
"org.twitter4j" % "twitter4j-core" % "2.2.0",
"org.twitter4j" % "twitter4j-stream" % "2.2.0",
"org.apache.spark`enter code here`" %% "spark-mllib" % sparkVersion)