Я пытаюсь использовать поток кинезиса и выполнять преобразования на нем с помощью потоковой искры, но не получаю результатов.
Теперь приведенный ниже код в условии if работает нормально. когда я получаю ненулевые записи в потоке, он печатает счет.
Если я использую logger.warn ("") в предложении if в случае операторов печати, я просто продолжаю получать строки ниже в моем журнале, а не фактическое количество:
INFO BlockManagerInfo: добавлен ввод-0-1547121901568 в памяти на ip-10-225-172-215.ec2.internal: 37820
Не знаю, почему spark не может выполнить count () при использовании логгера.
Также до сих пор я могу сохранять str rdd в формате hdf.
Мой поток выводит записи в формате JSON.
Теперь, если я попытаюсь проанализировать мой поток как json и загрузить, как показано ниже, снова я получаю только информацию ниже в моем журнале:
INFO BlockManagerInfo: добавлен ввод-0-1547121901577 в памяти на ip-10-225-172-70.ec2.internal: 39438 (размер: 6,4 МБ, бесплатно: 1238,7 МБ)
ИНФОРМАЦИЯ BlockManagerInfo: добавлен input-0-1547121901578 в памяти на ip-10-225-172-215.ec2.internal: 37820 (размер: 5,1 МБ, бесплатно: 1233,6 МБ)
ИНФОРМАЦИЯ BlockManagerInfo: добавлен input-0-1547121901578 в памяти на ip-10-225-172-70.ec2.internal: 39438 (размер: 5,1 МБ, бесплатно: 1233,6 МБ)
ИНФОРМАЦИЯ JobScheduler: добавлены задания на время 1547121755000 мс
Вот код, который я использую:
object KinesisWatch {
val logger: Logger = Logger.getLogger("##### Kinesis-Logs #####")
Logger.getLogger("org.apache.spark").setLevel(Level.INFO)
Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.INFO)
logger.setLevel(Level.INFO)
// Kinesis, AWS and Spark streaming configurations
val kinesisConfig = Map(
("appName", "spark-kinesis"),
("streamName", "xxxx"),
("endpointUrl", "https://kinesis.us-east-1.amazonaws.com"),
("numStreams", "2"),
("regionName", "us-east-1"),
("awsAccessKey", "xxxxx"),
("awsSecretKey", "xxxxxx"),
("sparkStreamingBatchInterval", "5"),
("kinesisCheckpointInterval", "5"))
logger.warn("Setting up SparkConfig and StreamingContext")
val sparkSession = new SparkSessionWrapper().sparkSession
val sc = sparkSession.sparkContext
val sqlContext = sparkSession.sqlContext
val ssc = new StreamingContext(sc, Seconds(kinesisConfig("sparkStreamingBatchInterval").toInt))
val awsCredentials = SparkAWSCredentials.builder.basicCredentials(kinesisConfig("awsAccessKey"), kinesisConfig("awsSecretKey"))
val kinesisStreams = (0 until kinesisConfig("numStreams").toInt).map { i =>
KinesisInputDStream.builder
.streamingContext(ssc)
.endpointUrl(kinesisConfig("endpointUrl"))
.regionName(kinesisConfig("regionName"))
.streamName(kinesisConfig("streamName"))
.initialPositionInStream(InitialPositionInStream.LATEST)
.checkpointAppName(kinesisConfig("appName"))
.checkpointInterval(Seconds(kinesisConfig("kinesisCheckpointInterval").toInt))
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
.kinesisCredentials(awsCredentials.build())
.build()
}
val unionStreams = ssc.union(kinesisStreams)
unionStreams.foreachRDD(rdd => {
if
(rdd.count()>0){
println("New records found\nmetrics count in the batch: %s".format(rdd.count()))
println("performing transformations")
val str = rdd.map(str => new String(str))
str.saveAsTextFile("/home/aman/EDA_BP_Kinesis")
}
else
println("No new record found")
})
ssc.start()
ssc.awaitTermination()
}
Не работает:
unionStreams.foreachRDD(rdd => {
if
(rdd.count()>0){
println("New records found\nmetrics count in the batch: %s".format(rdd.count()))
println("performing transformations")
val str = rdd.map(str => new String(str))
str.saveAsTextFile("/home/aman/EDA_BP_Kinesis")
import sparkSession.implicits._
val records = str.toString()
val df = sparkSession.read.json(Seq(records).toDS)
df.count()
df.show()
}
else
println("No new record found")
})
Я указал количество шардов 2, потоков 2 и ядер 4.