В качестве источника данных я использую топик Kafka, из которого читаю твиты.
Я написал простое приложение на Spark Structured Streaming.
Я могу читать твиты и преобразовывать записи в свой собственный case class.
Но записать их в таблицу Hive, которая находится в том же Docker-окружении, что и Spark, у меня не получается.
Подскажите, пожалуйста, как решить эту проблему.
import com.fhjoanneum.swd18.grp3.bigdata.convert.TweetToTwitterDbRecord
import com.fhjoanneum.swd18.grp3.bigdata.domain.Tweet
import com.typesafe.scalalogging.LazyLogging
import org.apache.spark.sql._
import org.json4s.jackson.JsonMethods.parse
import org.json4s.{DefaultFormats}
/**
 * Reads tweets as JSON from the Kafka topic `twitter-status` and appends them to the
 * Hive table `grp3.tweets` using Spark Structured Streaming.
 *
 * An explicit `main` is used instead of `extends App`: Spark's quick-start guide warns that
 * subclasses of `scala.App` may not work correctly, because `App` defers initialisation of
 * the object's fields (DelayedInit) and closures shipped to executors can then observe
 * uninitialised fields.
 */
case object TwitterInputStream extends LazyLogging {

  /** Starts the streaming query and blocks until it terminates or fails. */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("TestApp")
      .master("local[*]")
      .config("hive.metastore.uris", "thrift://0.0.0.0:9083")
      // For each new block the NameNode tells the client which DataNodes to write to. In this
      // setup those are the docker-internal IPs 172.18.1.2/172.18.1.3:9866, which the driver
      // on the host apparently cannot reach ("2 node(s) are excluded in this operation").
      // With this flag the HDFS client connects to DataNodes by hostname instead of by IP.
      // NOTE(review): this only helps if node2/node3 resolve from the driver's machine and
      // their DataNode port is reachable from there; the alternative is to run the driver
      // inside the `hadoopnet` docker network.
      .config("spark.hadoop.dfs.client.use.datanode.hostname", "true")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    spark.sql("SET hive.exec.parallel=true")
    spark.sql("SET hive.exec.parallel.thread.number=16")

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.1.156:9092")
      .option("subscribe", "twitter-status")
      // "latest": only messages produced after the query starts are consumed.
      .option("startingOffsets", "latest")
      .load()

    import spark.implicits._

    // Kafka's `value` column is binary; each value is expected to hold one tweet as JSON.
    val testerDF = df.selectExpr("CAST(value AS STRING)").as[String]

    // `extract` throws when a value is not valid JSON or does not match `Tweet`. Such a
    // record would fail the entire streaming query, so it is logged and skipped instead.
    val parsedMsgs = testerDF.flatMap { value =>
      implicit val formats = DefaultFormats
      scala.util.Try(parse(value).extract[Tweet]) match {
        case scala.util.Success(tweet) => List(tweet)
        case scala.util.Failure(e) =>
          logger.warn(s"Skipping Kafka record that could not be parsed as Tweet: ${e.getMessage}")
          Nil
      }
    }

    // Appends one micro-batch to the Hive table. `insertInto` matches columns by position
    // and writes using the table's own definition, so a `.format(...)` call has no effect here.
    // The explicitly typed function value also keeps `foreachBatch` from being ambiguous
    // with its Java `VoidFunction2` overload.
    val writeBatch: (Dataset[_], Long) => Unit = (batchDs, _) =>
      batchDs.write
        .mode(SaveMode.Append)
        .insertInto("grp3.tweets")

    // NOTE(review): no checkpointLocation is set, so Spark uses a temporary checkpoint
    // directory that is deleted at shutdown; consumed Kafka offsets are therefore not kept
    // across restarts. Consider .option("checkpointLocation", ...) on a durable path.
    val query = parsedMsgs
      .map(TweetToTwitterDbRecord)
      .writeStream
      .foreachBatch(writeBatch)
      .start()

    query.awaitTermination()

    // Writing to the console instead of Hive works:
    // parsedMsgs.writeStream
    //   .format("console")
    //   .outputMode("append")
    //   .start()
    //   .awaitTermination()
  }
}
Таблица, в которую я хочу записывать данные, была создана следующим оператором:
-- Parquet-backed external table that the streaming job appends tweets to.
-- Column order matters: the job writes with Spark's position-based insertInto.
-- The former `ROW FORMAT DELIMITED FIELDS TERMINATED BY ','` clause was removed: it describes
-- delimited text files, has no meaning for Parquet, and Spark's SQL parser rejects it
-- together with STORED AS PARQUET.
-- NOTE(review): the job writes to grp3.tweets; this assumes grp3 is the current database.
CREATE EXTERNAL TABLE `tweets`(
  `id` BIGINT,
  `createdAt` STRING,
  `text` STRING,
  -- NOTE(review): if this holds Twitter user ids, values may exceed the INT range; BIGINT
  -- would need the writer's record field to change as well — confirm.
  `userId` INT,
  `geo` STRING,
  `coordinates` STRING,
  `place` STRING,
  `quoteCount` INT,
  `replyCount` INT,
  `retweetCount` INT,
  `favoriteCount` INT,
  `timestampMs` BIGINT
)
STORED AS PARQUET
LOCATION '/data/bigdata/tweets'
TBLPROPERTIES ("parquet.compression"="SNAPPY");
Приложение не падает. Чтобы увидеть, что происходит, мне пришлось переключить логгер на уровень DEBUG.
Ниже приведены последние строки вывода логгера:
2020-01-28 21:38:53 INFO ParquetWriteSupport:54 - Initialized Parquet WriteSupport with Catalyst schema:
{
"type" : "struct",
"fields" : [ {
"name" : "id",
"type" : "long",
"nullable" : true,
"metadata" : { }
}, {
"name" : "createdat",
"type" : "string",
"nullable" : true,
"metadata" : { }
}, {
"name" : "text",
"type" : "string",
"nullable" : true,
"metadata" : { }
}, {
"name" : "userid",
"type" : "integer",
"nullable" : false,
"metadata" : { }
}, {
"name" : "geo",
"type" : "string",
"nullable" : true,
"metadata" : { }
}, {
"name" : "coordinates",
"type" : "string",
"nullable" : true,
"metadata" : { }
}, {
"name" : "place",
"type" : "string",
"nullable" : true,
"metadata" : { }
}, {
"name" : "quotecount",
"type" : "integer",
"nullable" : false,
"metadata" : { }
}, {
"name" : "replycount",
"type" : "integer",
"nullable" : false,
"metadata" : { }
}, {
"name" : "retweetcount",
"type" : "integer",
"nullable" : false,
"metadata" : { }
}, {
"name" : "favoritecount",
"type" : "integer",
"nullable" : false,
"metadata" : { }
}, {
"name" : "timestampms",
"type" : "long",
"nullable" : true,
"metadata" : { }
} ]
}
and corresponding Parquet message type:
message spark_schema {
optional int64 id;
optional binary createdat (UTF8);
optional binary text (UTF8);
required int32 userid;
optional binary geo (UTF8);
optional binary coordinates (UTF8);
optional binary place (UTF8);
required int32 quotecount;
required int32 replycount;
required int32 retweetcount;
required int32 favoritecount;
optional int64 timestampms;
}
2020-01-28 21:38:53 DEBUG DFSClient:1646 - /data/bigdata/tweets/_temporary/0/_temporary/attempt_20200128213853_0001_m_000000_1/part-00000-72cb3fde-ecbe-4969-aac5-becbd65a147d-c000.snappy.parquet: masked=rw-r--r--
2020-01-28 21:38:53 DEBUG Client:1026 - IPC Client (1473539708) connection to nodemaster/0.0.0.0:9000 from andreas sending #7
2020-01-28 21:38:53 DEBUG Client:1083 - IPC Client (1473539708) connection to nodemaster/0.0.0.0:9000 from andreas got value #7
2020-01-28 21:38:53 DEBUG ProtobufRpcEngine:253 - Call: create took 4ms
2020-01-28 21:38:53 DEBUG DFSClient:1802 - computePacketChunkSize: src=/data/bigdata/tweets/_temporary/0/_temporary/attempt_20200128213853_0001_m_000000_1/part-00000-72cb3fde-ecbe-4969-aac5-becbd65a147d-c000.snappy.parquet, chunkSize=516, chunksPerPacket=127, packetSize=65532
2020-01-28 21:38:53 DEBUG LeaseRenewer:301 - Lease renewer daemon for [DFSClient_NONMAPREDUCE_469557971_72] with renew id 1 started
2020-01-28 21:38:53 DEBUG ParquetFileWriter:281 - 0: start
2020-01-28 21:38:53 DEBUG MemoryManager:63 - Allocated total memory pool is: 3626971910
2020-01-28 21:38:53 INFO CodecPool:151 - Got brand-new compressor [.snappy]
2020-01-28 21:38:53 DEBUG RunLengthBitPackingHybridEncoder:119 - Encoding: RunLengthBitPackingHybridEncoder with bithWidth: 1 initialCapacity 64
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 64
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 1024
2020-01-28 21:38:53 DEBUG RunLengthBitPackingHybridEncoder:119 - Encoding: RunLengthBitPackingHybridEncoder with bithWidth: 1 initialCapacity 64
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 64
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 1024
2020-01-28 21:38:53 DEBUG RunLengthBitPackingHybridEncoder:119 - Encoding: RunLengthBitPackingHybridEncoder with bithWidth: 1 initialCapacity 64
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 64
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 1024
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 1024
2020-01-28 21:38:53 DEBUG RunLengthBitPackingHybridEncoder:119 - Encoding: RunLengthBitPackingHybridEncoder with bithWidth: 1 initialCapacity 64
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 64
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 1024
2020-01-28 21:38:53 DEBUG RunLengthBitPackingHybridEncoder:119 - Encoding: RunLengthBitPackingHybridEncoder with bithWidth: 1 initialCapacity 64
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 64
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 1024
2020-01-28 21:38:53 DEBUG RunLengthBitPackingHybridEncoder:119 - Encoding: RunLengthBitPackingHybridEncoder with bithWidth: 1 initialCapacity 64
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 64
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 1024
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 1024
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 1024
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 1024
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 1024
2020-01-28 21:38:53 DEBUG RunLengthBitPackingHybridEncoder:119 - Encoding: RunLengthBitPackingHybridEncoder with bithWidth: 1 initialCapacity 64
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 64
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 1024
2020-01-28 21:38:53 DEBUG MemoryManager:138 - Adjust block size from 134,217,728 to 134,217,728 for writer: org.apache.parquet.hadoop.InternalParquetRecordWriter@61628754
2020-01-28 21:38:53 DEBUG RecordConsumerLoggingWrapper:69 - <!-- flush -->
2020-01-28 21:38:53 INFO InternalParquetRecordWriter:165 - Flushing mem columnStore to file. allocated memory: 0
2020-01-28 21:38:53 DEBUG ParquetFileWriter:682 - 4: end
2020-01-28 21:38:54 DEBUG ParquetFileWriter:692 - 1209: footer length = 1205
2020-01-28 21:38:54 DEBUG BytesUtils:159 - write le int: 1205 => 181 4 0 0
2020-01-28 21:38:54 DEBUG DFSClient:1869 - DFSClient writeChunk allocating new packet seqno=0, src=/data/bigdata/tweets/_temporary/0/_temporary/attempt_20200128213853_0001_m_000000_1/part-00000-72cb3fde-ecbe-4969-aac5-becbd65a147d-c000.snappy.parquet, packetSize=65532, chunksPerPacket=127, bytesCurBlock=0
2020-01-28 21:38:54 DEBUG DFSClient:1815 - Queued packet 0
2020-01-28 21:38:54 DEBUG DFSClient:1815 - Queued packet 1
2020-01-28 21:38:54 DEBUG DFSClient:2133 - Waiting for ack for: 1
2020-01-28 21:38:54 DEBUG DFSClient:585 - Allocating new block
2020-01-28 21:38:54 DEBUG Client:1026 - IPC Client (1473539708) connection to nodemaster/0.0.0.0:9000 from andreas sending #8
2020-01-28 21:38:54 DEBUG Client:1083 - IPC Client (1473539708) connection to nodemaster/0.0.0.0:9000 from andreas got value #8
2020-01-28 21:38:54 DEBUG ProtobufRpcEngine:253 - Call: addBlock took 6ms
2020-01-28 21:38:54 DEBUG DFSClient:1390 - pipeline = 172.18.1.3:9866
2020-01-28 21:38:54 DEBUG DFSClient:1390 - pipeline = 172.18.1.2:9866
2020-01-28 21:38:54 DEBUG DFSClient:1601 - Connecting to datanode 172.18.1.3:9866
2020-01-28 21:38:54 DEBUG AbstractCoordinator:833 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-d50ae41c-0b12-45c2-838f-c83c7a7e856d-1198433466-driver-0] Sending Heartbeat request to coordinator 192.168.1.156:9092 (id: 2147482646 rack: null)
2020-01-28 21:38:54 DEBUG AbstractCoordinator:846 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-d50ae41c-0b12-45c2-838f-c83c7a7e856d-1198433466-driver-0] Received successful Heartbeat response
2020-01-28 21:38:57 DEBUG AbstractCoordinator:833 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-d50ae41c-0b12-45c2-838f-c83c7a7e856d-1198433466-driver-0] Sending Heartbeat request to coordinator 192.168.1.156:9092 (id: 2147482646 rack: null)
2020-01-28 21:38:57 DEBUG AbstractCoordinator:846 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-d50ae41c-0b12-45c2-838f-c83c7a7e856d-1198433466-driver-0] Received successful Heartbeat response
2020-01-28 21:39:00 DEBUG AbstractCoordinator:833 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-d50ae41c-0b12-45c2-838f-c83c7a7e856d-1198433466-driver-0] Sending Heartbeat request to coordinator 192.168.1.156:9092 (id: 2147482646 rack: null)
2020-01-28 21:39:00 DEBUG AbstractCoordinator:846 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-d50ae41c-0b12-45c2-838f-c83c7a7e856d-1198433466-driver-0] Received successful Heartbeat response
2020-01-28 21:39:03 DEBUG AbstractCoordinator:833 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-d50ae41c-0b12-45c2-838f-c83c7a7e856d-1198433466-driver-0] Sending Heartbeat request to coordinator 192.168.1.156:9092 (id: 2147482646 rack: null)
2020-01-28 21:39:03 DEBUG AbstractCoordinator:846 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-d50ae41c-0b12-45c2-838f-c83c7a7e856d-1198433466-driver-0] Received successful Heartbeat response
Я совсем застрял и буду благодарен за любую подсказку.
Спасибо.
Андреас
Правка (edit)
Через некоторое время приложение всё-таки падает со следующим сообщением:
2020-01-28 22:20:21 INFO ShutdownHookManager:54 - Deleting directory /private/var/folders/9g/24386ccd2lg11pqzxj2w5f0r0000gn/T/spark-f5efe1d0-c8d1-4b6b-bb60-352114a9cf2d
2020-01-28 22:20:21 INFO ShutdownHookManager:54 - Deleting directory /private/var/folders/9g/24386ccd2lg11pqzxj2w5f0r0000gn/T/temporaryReader-3bdb4248-1e34-460c-b0d0-78c01460ff63
2020-01-28 22:20:21 INFO ShutdownHookManager:54 - Deleting directory /private/var/folders/9g/24386ccd2lg11pqzxj2w5f0r0000gn/T/temporary-d59aa3d8-d255-4134-9793-20a892abaf38
2020-01-28 22:20:21 ERROR DFSClient:930 - Failed to close inode 16620
org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /data/bigdata/tweets/_temporary/0/_temporary/attempt_20200128221820_0001_m_000000_1/part-00000-3e620934-eae5-4235-8a7b-2de07b269e8e-c000.snappy.parquet could only be written to 0 of the 1 minReplication nodes. There are 2 datanode(s) running and 2 node(s) are excluded in this operation.
at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:2135)
at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.chooseTargetForNewBlock(FSDirWriteFileOp.java:294)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2771)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:876)
Значит, проблема в Docker? Как сделать так, чтобы мои контейнеры были доступны с моего хоста?
Мой скрипт запуска:
#!/bin/bash
# Bring the services up.
# Starts the nodemaster/node2/node3 containers, then in this order: HDFS, YARN, the
# MapReduce job history server, the Spark standalone master and its two workers, the
# Spark history server, the HDFS directories Hive uses, and the Hive metastore.
function startServices {
  # Allocate a TTY for `docker exec` only when this script itself runs on a terminal:
  # `-t` without one makes docker fail with "the input device is not a TTY"
  # (e.g. when run from cron or CI).
  local tty_flags=""
  if [[ -t 0 ]]; then
    tty_flags="-it"
  fi

  docker start nodemaster node2 node3
  # Fixed pauses presumably give each daemon time to come up before the next step.
  sleep 5
  echo ">> Starting hdfs ..."
  # $tty_flags is deliberately unquoted so that it disappears entirely when empty.
  docker exec -u hadoop $tty_flags nodemaster start-dfs.sh
  sleep 5
  echo ">> Starting yarn ..."
  docker exec -u hadoop -d nodemaster start-yarn.sh
  sleep 5
  echo ">> Starting MR-JobHistory Server ..."
  docker exec -u hadoop -d nodemaster mr-jobhistory-daemon.sh start historyserver
  sleep 5
  echo ">> Starting Spark ..."
  docker exec -u hadoop -d nodemaster start-master.sh
  docker exec -u hadoop -d node2 start-slave.sh nodemaster:7077
  docker exec -u hadoop -d node3 start-slave.sh nodemaster:7077
  sleep 5
  echo ">> Starting Spark History Server ..."
  docker exec -u hadoop nodemaster start-history-server.sh
  sleep 5
  echo ">> Preparing hdfs for hive ..."
  docker exec -u hadoop $tty_flags nodemaster hdfs dfs -mkdir -p /tmp
  docker exec -u hadoop $tty_flags nodemaster hdfs dfs -mkdir -p /user/hive/warehouse
  docker exec -u hadoop $tty_flags nodemaster hdfs dfs -chmod g+w /tmp
  docker exec -u hadoop $tty_flags nodemaster hdfs dfs -chmod g+w /user/hive/warehouse
  sleep 5
  echo ">> Starting Hive Metastore ..."
  docker exec -u hadoop -d nodemaster hive --service metastore
  # NOTE(review): 172.18.1.1 is nodemaster's address on the hadoopnet docker network. It is
  # reachable from a Linux docker host but usually not from Docker Desktop on macOS, where
  # the published ports (8088, 50070, 8080) are reached via localhost instead. Ports 19888
  # and 18080 are not published by the nodemaster `docker run`. If this is Hadoop 3 (the
  # DataNode port 9866 suggests so), the default NameNode web UI port is 9870, not 50070 — confirm.
  echo "Hadoop info @ nodemaster: http://172.18.1.1:8088/cluster"
  echo "DFS Health @ nodemaster : http://172.18.1.1:50070/dfshealth"
  echo "MR-JobHistory Server @ nodemaster : http://172.18.1.1:19888"
  echo "Spark info @ nodemaster : http://172.18.1.1:8080"
  echo "Spark History Server @ nodemaster : http://172.18.1.1:18080"
}
# Stops the Spark master and both workers, then stops all cluster containers
# (including the PostgreSQL metastore database container psqlhms).
# The Spark stop scripts run in the foreground (no -d) so they finish before
# `docker stop` tears the containers down; detached, they raced that stop.
function stopServices {
  echo ">> Stopping Spark Master and slaves ..."
  docker exec -u hadoop nodemaster stop-master.sh
  docker exec -u hadoop node2 stop-slave.sh
  docker exec -u hadoop node3 stop-slave.sh
  echo ">> Stopping containers ..."
  docker stop nodemaster node2 node3 psqlhms
}
# Command dispatch. An unknown or missing command prints usage and exits with status 1.
case "$1" in
  start)
    # Allocate a TTY for `docker exec` only when this script itself runs on a terminal:
    # `-t` without one makes docker fail with "the input device is not a TTY".
    tty_flags=""
    if [[ -t 0 ]]; then
      tty_flags="-it"
    fi
    docker network create --subnet=172.18.0.0/16 hadoopnet # create custom network
    # Starting PostgreSQL Hive metastore database
    echo ">> Starting postgresql hive metastore ..."
    docker run -d --net hadoopnet --ip 172.18.1.4 --hostname psqlhms --name psqlhms -it postgresql-hms
    sleep 5
    # 3 nodes: nodemaster plus the Spark workers node2 and node3.
    # NOTE(review): only nodemaster's ports are published to the host. The HDFS DataNodes
    # on node2/node3 (172.18.1.2/172.18.1.3, data port 9866 in the client log) are usually
    # addressable only inside hadoopnet (Docker Desktop on macOS does not route to container
    # IPs), so an HDFS client outside it — e.g. a Spark driver started on the host — cannot
    # write blocks. Run such a driver inside hadoopnet or make the DataNodes reachable.
    echo ">> Starting nodes master and worker nodes ..."
    docker run -d --net hadoopnet --ip 172.18.1.1 --hostname nodemaster -p 9083:9083 -p 9000:9000 -p 7077:7077 -p 8080:8080 -p 8088:8088 -p 50070:50070 -p 6066:6066 -p 4040:4040 -p 20002:20002 --add-host node2:172.18.1.2 --add-host node3:172.18.1.3 --name nodemaster -it hive
    docker run -d --net hadoopnet --ip 172.18.1.2 --hostname node2 -p 8081:8081 --add-host nodemaster:172.18.1.1 --add-host node3:172.18.1.3 --name node2 -it spark
    docker run -d --net hadoopnet --ip 172.18.1.3 --hostname node3 -p 8082:8081 --add-host nodemaster:172.18.1.1 --add-host node2:172.18.1.2 --name node3 -it spark
    # Format nodemaster
    echo ">> Formatting hdfs ..."
    # $tty_flags is deliberately unquoted so that it disappears entirely when empty.
    docker exec -u hadoop $tty_flags nodemaster hdfs namenode -format
    startServices
    exit
    ;;
  stop)
    stopServices
    docker rm nodemaster node2 node3 psqlhms
    docker network rm hadoopnet
    exit
    ;;
  uninstall)
    stopServices
    docker rmi hadoop spark hive postgresql-hms -f
    docker network rm hadoopnet
    docker system prune -f
    exit
    ;;
  *)
    echo "Usage: cluster.sh start|stop|uninstall"
    echo " start - create network and containers, format HDFS and start all services"
    echo " stop - stop services and remove containers and network"
    echo " uninstall - stop services and remove all docker images"
    exit 1
    ;;
esac