У меня есть сервер NodeJS, отвечающий за потоковую передачу данных из API и передачу данных на локальный TCP-порт 8080, на котором Apache Spark прослушивает.
const net = require('net');
const client = new net.Socket();
const axios = require('axios');
client.connect(8080, '127.0.0.1');
client.on('connect', async () => {
const res = await axios.get('https://api.co.za', {
responseType: 'stream',
});
res.data.on('data', chunk => {
client.write(chunk);
});
});
Затем Apache Spark пытается прочитать данные с этого порта.
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.{ SparkConf, SparkContext }
object DataStream {
def main(args: Array[String]) {
val sparkConfig = new SparkConf()
.setAppName("Data Stream")
.setMaster(sys.env.get("spark.master")
.getOrElse("local[*]"))
val sparkContext = new SparkContext(sparkConfig)
sparkContext.setLogLevel("ERROR")
val streamingContext = new StreamingContext(sparkContext, Seconds(1))
val data = streamingContext.socketTextStream("127.0.0.1", 8080)
data.print()
streamingContext.start()
streamingContext.awaitTermination()
}
}
Затем я открываю порт 8080 с помощью netcat: nc -l 8080
Вот моя проблема, если я сначала запускаю процесс Node, он отправляет данные в порт, но я не вижу, как Spark реагирует на данные. Если я сначала запускаю Spark, мой Node-процесс сообщает, что он пишет, но я не вижу данных, поступающих на порт 8080.
Если я отправляю данные напрямую через netcat после nc -l 8080
, Spark не испытывает проблем с их чтением.
Есть ли какая-то клиентская исключительность, происходящая с этими локальными портами? Есть ли альтернативный способ открытия порта, который будет использоваться таким образом?
ОС: Ubuntu 19.10