Я попытался использовать примеры документации соединителя MongoDB-Spark, однако они не работают.В результате получается следующее исключение:
MongoTimeoutException: Timed out after 30000 ms while waiting for a server that matches WritableServerSelector. Client view of cluster state is {type=UNKNOWN, servers=[{address=localhost:8081, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}]
Код в моем объекте Scala:
object SparkTest {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.input.uri", "mongodb://localhost:8081/admin.new_col")
.config("spark.mongodb.output.uri", "mongodb://localhost:8081/admin.new_col")
.getOrCreate()
val sparkContext = spark.sparkContext
sparkContext.setLogLevel("ERROR")
val writeConfig = WriteConfig(Map("uri" -> "mongodb://localhost:8081/admin.new_col"))
val documents = sparkContext.parallelize((1 to 10).map(i => Document.parse(s"{test: $i}")))
MongoSpark.save(documents, writeConfig)
}
}
Я не устанавливал Kafka и MongoDB, но использовал их в docker-compose:
version: '3.3'
services:
kafka:
image: spotify/kafka
ports:
- "9092:9092"
environment:
- ADVERTISED_HOST=localhost
mongo:
image: mongo
restart: always
environment:
MONGO_INITDB_ROOT_USERNAME: user
MONGO_INITDB_ROOT_PASSWORD: password
mongo-express:
image: mongo-express
restart: always
ports:
- 8081:8081
environment:
ME_CONFIG_MONGODB_ADMINUSERNAME: user
ME_CONFIG_MONGODB_ADMINPASSWORD: password
Что может вызвать такое исключение?