У меня есть этот код:
import org.apache.spark.sql.SparkSession
object TopicIngester {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[*]") // remove this later
.appName("Ingester")
.getOrCreate()
spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "sandbox-hdp.hortonworks.com:6667" /*my.cluster.com:9092 in case of cloudera*/)
.option("subscribe", "test")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
.write
.parquet("/user/maria_dev/test")
spark.stop()
}
}
Когда я запускаю его в песочнице hortonworks, все работает нормально. Все доступные данные считываются из темы test
и сохраняются в папку /user/maria_dev/test
.
У меня также есть тема с тем же именем в моем кластере cloudera, и по какой-то причине она застревает на .parquet("/path/to/folder")
и никогда не заканчивается, как если бы она ждала больше данных навсегда или что-то в этом роде.
В чем может быть проблема?