Scala / Spark Streaming: store converted Kafka messages in Hive

As a data source I am using a Kafka stream to ingest tweets.

I wrote a simple Spark Structured Streaming application.

I can consume the tweets and convert the records into my own case class.

But I cannot write into the Hive table, which lives in the same Docker setup as Spark.

Can you give me a hint on how to solve it?
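
For context, the record that ends up being written (the output of TweetToTwitterDbRecord) looks roughly like this. This is only a simplified sketch; the field names and types are inferred from the table definition and the Catalyst schema further down, and the real classes live in the com.fhjoanneum.swd18.grp3.bigdata packages imported below:

// Hypothetical sketch of the record written to Hive; field names/types
// inferred from the grp3.tweets DDL and the logged Catalyst schema.
case class TwitterDbRecord(
  id: Option[Long],          // tweet id (BIGINT, nullable in the logged schema)
  createdAt: String,
  text: String,
  userId: Int,
  geo: String,
  coordinates: String,
  place: String,
  quoteCount: Int,
  replyCount: Int,
  retweetCount: Int,
  favoriteCount: Int,
  timestampMs: Option[Long]  // epoch millis (BIGINT, nullable)
)

Here is the streaming application itself: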

import com.fhjoanneum.swd18.grp3.bigdata.convert.TweetToTwitterDbRecord
import com.fhjoanneum.swd18.grp3.bigdata.domain.Tweet
import com.typesafe.scalalogging.LazyLogging
import org.apache.spark.sql._
import org.json4s.jackson.JsonMethods.parse

import org.json4s.DefaultFormats



case object TwitterInputStream extends App with LazyLogging {


  val spark = SparkSession
    .builder()
    .appName(s"TestApp")
    .master("local[*]")
    .config("hive.metastore.uris", "thrift://0.0.0.0:9083")
    .enableHiveSupport()
    .getOrCreate()


  spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
  spark.sql("SET hive.exec.parallel=true")
  spark.sql("SET hive.exec.parallel.thread.number=16")


  val df = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "192.168.1.156:9092")
    .option("subscribe", "twitter-status")
    .option("startingOffsets", "latest") // From starting
    .load()

  import spark.implicits._

  val testerDF = df.selectExpr("CAST(value AS STRING)").as[String]

  val parsedMsgs = testerDF.map(value => {

    implicit val formats = DefaultFormats
    val tweet = parse(value).extract[Tweet]

    tweet
  })


// the following part causes my problems:

  val query = parsedMsgs.map(TweetToTwitterDbRecord)
    .writeStream.foreachBatch((batchDs: Dataset[_], batchId: Long) =>

    batchDs.write
      .format("parquet")
      .mode(SaveMode.Append)
      .insertInto("grp3.tweets")

  ).start().awaitTermination()


// the commented part works:

//  parsedMsgs.writeStream
//    .format("console")
//    .outputMode("append")
//    .start()
//    .awaitTermination()

}
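
To narrow down whether the problem is specific to insertInto / Hive or already on the HDFS side, a variant I want to try writes each micro-batch as plain Parquet files directly into the table's location. This is only a sketch; the hdfs://nodemaster:9000 prefix and the Append mode are assumptions from my setup, not necessarily the right way to feed the external table:

  // Sketch: bypass insertInto and write plain Parquet files straight into
  // the external table's location. Assumes the NameNode is reachable as
  // hdfs://nodemaster:9000 from where the driver runs.
  val debugQuery = parsedMsgs.map(TweetToTwitterDbRecord)
    .writeStream
    .foreachBatch { (batchDs: Dataset[_], batchId: Long) =>
      batchDs.write
        .mode(SaveMode.Append)
        .parquet("hdfs://nodemaster:9000/data/bigdata/tweets")
    }
    .start()

  debugQuery.awaitTermination()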

The table I want to write into was created with this statement:

CREATE EXTERNAL TABLE `tweets`(
 `id` BigInt,
 `createdAt` String,
 `text` String,
 `userId` Int,
 `geo` String,
 `coordinates` String,
 `place` String,
 `quoteCount` Int,
 `replyCount` Int,
 `retweetCount` Int,
 `favoriteCount` Int,
 `timestampMs` BigInt
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET LOCATION '/data/bigdata/tweets'
TBLPROPERTIES ("parquet.compression"="SNAPPY");
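
For debugging, a quick way to check whether the metastore connection and the table definition are actually visible from the Spark session is something like this (a sketch, reusing the spark session from the application above):

// Sketch: sanity checks against the metastore from the same SparkSession.
spark.sql("SHOW TABLES IN grp3").show(truncate = false)
spark.sql("DESCRIBE FORMATTED grp3.tweets").show(100, truncate = false)
// A plain batch read should also succeed if HDFS itself is reachable:
spark.table("grp3.tweets").printSchema()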

It does not break. I had to switch my logger to DEBUG to see what is going on.

Below are the last lines of my logger output:

2020-01-28 21:38:53 INFO  ParquetWriteSupport:54 - Initialized Parquet WriteSupport with Catalyst schema:
{
  "type" : "struct",
  "fields" : [ {
    "name" : "id",
    "type" : "long",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "createdat",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "text",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "userid",
    "type" : "integer",
    "nullable" : false,
    "metadata" : { }
  }, {
    "name" : "geo",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "coordinates",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "place",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "quotecount",
    "type" : "integer",
    "nullable" : false,
    "metadata" : { }
  }, {
    "name" : "replycount",
    "type" : "integer",
    "nullable" : false,
    "metadata" : { }
  }, {
    "name" : "retweetcount",
    "type" : "integer",
    "nullable" : false,
    "metadata" : { }
  }, {
    "name" : "favoritecount",
    "type" : "integer",
    "nullable" : false,
    "metadata" : { }
  }, {
    "name" : "timestampms",
    "type" : "long",
    "nullable" : true,
    "metadata" : { }
  } ]
}
and corresponding Parquet message type:
message spark_schema {
  optional int64 id;
  optional binary createdat (UTF8);
  optional binary text (UTF8);
  required int32 userid;
  optional binary geo (UTF8);
  optional binary coordinates (UTF8);
  optional binary place (UTF8);
  required int32 quotecount;
  required int32 replycount;
  required int32 retweetcount;
  required int32 favoritecount;
  optional int64 timestampms;
}


2020-01-28 21:38:53 DEBUG DFSClient:1646 - /data/bigdata/tweets/_temporary/0/_temporary/attempt_20200128213853_0001_m_000000_1/part-00000-72cb3fde-ecbe-4969-aac5-becbd65a147d-c000.snappy.parquet: masked=rw-r--r--
2020-01-28 21:38:53 DEBUG Client:1026 - IPC Client (1473539708) connection to nodemaster/0.0.0.0:9000 from andreas sending #7
2020-01-28 21:38:53 DEBUG Client:1083 - IPC Client (1473539708) connection to nodemaster/0.0.0.0:9000 from andreas got value #7
2020-01-28 21:38:53 DEBUG ProtobufRpcEngine:253 - Call: create took 4ms
2020-01-28 21:38:53 DEBUG DFSClient:1802 - computePacketChunkSize: src=/data/bigdata/tweets/_temporary/0/_temporary/attempt_20200128213853_0001_m_000000_1/part-00000-72cb3fde-ecbe-4969-aac5-becbd65a147d-c000.snappy.parquet, chunkSize=516, chunksPerPacket=127, packetSize=65532
2020-01-28 21:38:53 DEBUG LeaseRenewer:301 - Lease renewer daemon for [DFSClient_NONMAPREDUCE_469557971_72] with renew id 1 started
2020-01-28 21:38:53 DEBUG ParquetFileWriter:281 - 0: start
2020-01-28 21:38:53 DEBUG MemoryManager:63 - Allocated total memory pool is: 3626971910
2020-01-28 21:38:53 INFO  CodecPool:151 - Got brand-new compressor [.snappy]
2020-01-28 21:38:53 DEBUG RunLengthBitPackingHybridEncoder:119 - Encoding: RunLengthBitPackingHybridEncoder with bithWidth: 1 initialCapacity 64
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 64
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 1024
2020-01-28 21:38:53 DEBUG RunLengthBitPackingHybridEncoder:119 - Encoding: RunLengthBitPackingHybridEncoder with bithWidth: 1 initialCapacity 64
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 64
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 1024
2020-01-28 21:38:53 DEBUG RunLengthBitPackingHybridEncoder:119 - Encoding: RunLengthBitPackingHybridEncoder with bithWidth: 1 initialCapacity 64
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 64
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 1024
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 1024
2020-01-28 21:38:53 DEBUG RunLengthBitPackingHybridEncoder:119 - Encoding: RunLengthBitPackingHybridEncoder with bithWidth: 1 initialCapacity 64
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 64
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 1024
2020-01-28 21:38:53 DEBUG RunLengthBitPackingHybridEncoder:119 - Encoding: RunLengthBitPackingHybridEncoder with bithWidth: 1 initialCapacity 64
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 64
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 1024
2020-01-28 21:38:53 DEBUG RunLengthBitPackingHybridEncoder:119 - Encoding: RunLengthBitPackingHybridEncoder with bithWidth: 1 initialCapacity 64
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 64
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 1024
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 1024
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 1024
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 1024
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 1024
2020-01-28 21:38:53 DEBUG RunLengthBitPackingHybridEncoder:119 - Encoding: RunLengthBitPackingHybridEncoder with bithWidth: 1 initialCapacity 64
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 64
2020-01-28 21:38:53 DEBUG CapacityByteArrayOutputStream:276 - initial slab of size 1024
2020-01-28 21:38:53 DEBUG MemoryManager:138 - Adjust block size from 134,217,728 to 134,217,728 for writer: org.apache.parquet.hadoop.InternalParquetRecordWriter@61628754
2020-01-28 21:38:53 DEBUG RecordConsumerLoggingWrapper:69 - <!-- flush -->
2020-01-28 21:38:53 INFO  InternalParquetRecordWriter:165 - Flushing mem columnStore to file. allocated memory: 0
2020-01-28 21:38:53 DEBUG ParquetFileWriter:682 - 4: end
2020-01-28 21:38:54 DEBUG ParquetFileWriter:692 - 1209: footer length = 1205
2020-01-28 21:38:54 DEBUG BytesUtils:159 - write le int: 1205 => 181 4 0 0
2020-01-28 21:38:54 DEBUG DFSClient:1869 - DFSClient writeChunk allocating new packet seqno=0, src=/data/bigdata/tweets/_temporary/0/_temporary/attempt_20200128213853_0001_m_000000_1/part-00000-72cb3fde-ecbe-4969-aac5-becbd65a147d-c000.snappy.parquet, packetSize=65532, chunksPerPacket=127, bytesCurBlock=0
2020-01-28 21:38:54 DEBUG DFSClient:1815 - Queued packet 0
2020-01-28 21:38:54 DEBUG DFSClient:1815 - Queued packet 1
2020-01-28 21:38:54 DEBUG DFSClient:2133 - Waiting for ack for: 1
2020-01-28 21:38:54 DEBUG DFSClient:585 - Allocating new block
2020-01-28 21:38:54 DEBUG Client:1026 - IPC Client (1473539708) connection to nodemaster/0.0.0.0:9000 from andreas sending #8
2020-01-28 21:38:54 DEBUG Client:1083 - IPC Client (1473539708) connection to nodemaster/0.0.0.0:9000 from andreas got value #8
2020-01-28 21:38:54 DEBUG ProtobufRpcEngine:253 - Call: addBlock took 6ms
2020-01-28 21:38:54 DEBUG DFSClient:1390 - pipeline = 172.18.1.3:9866
2020-01-28 21:38:54 DEBUG DFSClient:1390 - pipeline = 172.18.1.2:9866
2020-01-28 21:38:54 DEBUG DFSClient:1601 - Connecting to datanode 172.18.1.3:9866
2020-01-28 21:38:54 DEBUG AbstractCoordinator:833 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-d50ae41c-0b12-45c2-838f-c83c7a7e856d-1198433466-driver-0] Sending Heartbeat request to coordinator 192.168.1.156:9092 (id: 2147482646 rack: null)
2020-01-28 21:38:54 DEBUG AbstractCoordinator:846 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-d50ae41c-0b12-45c2-838f-c83c7a7e856d-1198433466-driver-0] Received successful Heartbeat response
2020-01-28 21:38:57 DEBUG AbstractCoordinator:833 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-d50ae41c-0b12-45c2-838f-c83c7a7e856d-1198433466-driver-0] Sending Heartbeat request to coordinator 192.168.1.156:9092 (id: 2147482646 rack: null)
2020-01-28 21:38:57 DEBUG AbstractCoordinator:846 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-d50ae41c-0b12-45c2-838f-c83c7a7e856d-1198433466-driver-0] Received successful Heartbeat response
2020-01-28 21:39:00 DEBUG AbstractCoordinator:833 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-d50ae41c-0b12-45c2-838f-c83c7a7e856d-1198433466-driver-0] Sending Heartbeat request to coordinator 192.168.1.156:9092 (id: 2147482646 rack: null)
2020-01-28 21:39:00 DEBUG AbstractCoordinator:846 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-d50ae41c-0b12-45c2-838f-c83c7a7e856d-1198433466-driver-0] Received successful Heartbeat response
2020-01-28 21:39:03 DEBUG AbstractCoordinator:833 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-d50ae41c-0b12-45c2-838f-c83c7a7e856d-1198433466-driver-0] Sending Heartbeat request to coordinator 192.168.1.156:9092 (id: 2147482646 rack: null)
2020-01-28 21:39:03 DEBUG AbstractCoordinator:846 - [Consumer clientId=consumer-1, groupId=spark-kafka-source-d50ae41c-0b12-45c2-838f-c83c7a7e856d-1198433466-driver-0] Received successful Heartbeat response

I am really stuck. I would be grateful for any hint.

Thanks.

Andreas

EDIT

OK, after a while it does break, with the following message:

2020-01-28 22:20:21 INFO  ShutdownHookManager:54 - Deleting directory /private/var/folders/9g/24386ccd2lg11pqzxj2w5f0r0000gn/T/spark-f5efe1d0-c8d1-4b6b-bb60-352114a9cf2d
2020-01-28 22:20:21 INFO  ShutdownHookManager:54 - Deleting directory /private/var/folders/9g/24386ccd2lg11pqzxj2w5f0r0000gn/T/temporaryReader-3bdb4248-1e34-460c-b0d0-78c01460ff63
2020-01-28 22:20:21 INFO  ShutdownHookManager:54 - Deleting directory /private/var/folders/9g/24386ccd2lg11pqzxj2w5f0r0000gn/T/temporary-d59aa3d8-d255-4134-9793-20a892abaf38
2020-01-28 22:20:21 ERROR DFSClient:930 - Failed to close inode 16620
org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /data/bigdata/tweets/_temporary/0/_temporary/attempt_20200128221820_0001_m_000000_1/part-00000-3e620934-eae5-4235-8a7b-2de07b269e8e-c000.snappy.parquet could only be written to 0 of the 1 minReplication nodes. There are 2 datanode(s) running and 2 node(s) are excluded in this operation.
    at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:2135)
    at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.chooseTargetForNewBlock(FSDirWriteFileOp.java:294)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2771)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:876)

So is this a Docker problem? How can I bind my containers to my host?
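
Based on the "could only be written to 0 of the 1 minReplication nodes ... 2 node(s) are excluded" part, my guess is that the driver on my host reaches the NameNode through the published port 9000, but cannot reach the datanodes on their internal 172.18.x.x addresses. One thing I am considering (a sketch, not sure it is the right fix) is telling the HDFS client to address datanodes by hostname instead of IP, and then making those hostnames and the datanode port (9866 in the log above) reachable from the host:

// Sketch: extra Hadoop client setting on the SparkSession so the HDFS client
// connects to datanodes by hostname (node2, node3) instead of the
// container-internal IPs. The hostnames would additionally have to resolve
// from my host (e.g. /etc/hosts entries) and port 9866 would have to be
// published by the containers -- both of which are assumptions on my side.
val spark = SparkSession
  .builder()
  .appName("TestApp")
  .master("local[*]")
  .config("hive.metastore.uris", "thrift://0.0.0.0:9083")
  .config("spark.hadoop.dfs.client.use.datanode.hostname", "true")
  .enableHiveSupport()
  .getOrCreate()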

My startup script:

#!/bin/bash

# Bring the services up
function startServices {
  docker start nodemaster node2 node3
  sleep 5
  echo ">> Starting hdfs ..."
  docker exec -u hadoop -it nodemaster start-dfs.sh
  sleep 5
  echo ">> Starting yarn ..."
  docker exec -u hadoop -d nodemaster start-yarn.sh
  sleep 5
  echo ">> Starting MR-JobHistory Server ..."
  docker exec -u hadoop -d nodemaster mr-jobhistory-daemon.sh start historyserver
  sleep 5
  echo ">> Starting Spark ..."
  docker exec -u hadoop -d nodemaster start-master.sh
  docker exec -u hadoop -d node2 start-slave.sh nodemaster:7077
  docker exec -u hadoop -d node3 start-slave.sh nodemaster:7077
  sleep 5
  echo ">> Starting Spark History Server ..."
  docker exec -u hadoop nodemaster start-history-server.sh
  sleep 5
  echo ">> Preparing hdfs for hive ..."
  docker exec -u hadoop -it nodemaster hdfs dfs -mkdir -p /tmp
  docker exec -u hadoop -it nodemaster hdfs dfs -mkdir -p /user/hive/warehouse
  docker exec -u hadoop -it nodemaster hdfs dfs -chmod g+w /tmp
  docker exec -u hadoop -it nodemaster hdfs dfs -chmod g+w /user/hive/warehouse
  sleep 5
  echo ">> Starting Hive Metastore ..."
  docker exec -u hadoop -d nodemaster hive --service metastore
  echo "Hadoop info @ nodemaster: http://172.18.1.1:8088/cluster"
  echo "DFS Health @ nodemaster : http://172.18.1.1:50070/dfshealth"
  echo "MR-JobHistory Server @ nodemaster : http://172.18.1.1:19888"
  echo "Spark info @ nodemaster  : http://172.18.1.1:8080"
  echo "Spark History Server @ nodemaster : http://172.18.1.1:18080"
}

function stopServices {
  echo ">> Stopping Spark Master and slaves ..."
  docker exec -u hadoop -d nodemaster stop-master.sh
  docker exec -u hadoop -d node2 stop-slave.sh
  docker exec -u hadoop -d node3 stop-slave.sh
  echo ">> Stopping containers ..."
  docker stop nodemaster node2 node3 psqlhms
}

if [[ $1 = "start" ]]; then
  docker network create --subnet=172.18.0.0/16 hadoopnet # create custom network

  # Starting PostgreSQL Hive metastore
  echo ">> Starting postgresql hive metastore ..."
  docker run -d --net hadoopnet --ip 172.18.1.4 --hostname psqlhms --name psqlhms -it postgresql-hms
  sleep 5

  # 3 nodes
  echo ">> Starting nodes master and worker nodes ..."
  docker run -d --net hadoopnet --ip 172.18.1.1 --hostname nodemaster -p 9083:9083 -p 9000:9000 -p 7077:7077 -p 8080:8080 -p 8088:8088 -p 50070:50070 -p 6066:6066 -p 4040:4040 -p 20002:20002 --add-host node2:172.18.1.2 --add-host node3:172.18.1.3 --name nodemaster -it hive
  docker run -d --net hadoopnet --ip 172.18.1.2 --hostname node2 -p 8081:8081 --add-host nodemaster:172.18.1.1 --add-host node3:172.18.1.3 --name node2 -it spark
  docker run -d --net hadoopnet --ip 172.18.1.3 --hostname node3 -p 8082:8081 --add-host nodemaster:172.18.1.1 --add-host node2:172.18.1.2 --name node3 -it spark

  # Format nodemaster
  echo ">> Formatting hdfs ..."
  docker exec -u hadoop -it nodemaster hdfs namenode -format
  startServices
  exit
fi


if [[ $1 = "stop" ]]; then
  stopServices
  docker rm nodemaster node2 node3 psqlhms
  docker network rm hadoopnet
  exit
fi


if [[ $1 = "uninstall" ]]; then
  stopServices
  docker rmi hadoop spark hive postgresql-hms -f
  docker network rm hadoopnet
  docker system prune -f
  exit
fi

echo "Usage: cluster.sh start|stop|uninstall"
echo "                 start  - start existing containers"
echo "                 stop   - stop running processes"
echo "                 uninstall - remove all docker images"
...