Я хочу локально (без кластера EMR) запустить задание Spark,
которое читает данные из таблицы DynamoDB и записывает их в файл Parquet/CSV.
Я не нашёл ни одного коннектора Spark для DynamoDB, который бы это поддерживал. Может быть, у вас есть идеи?
Пример моего кода:
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.SparkSession
/**
 * Scans the DynamoDB table `hen.poc.client` through the EMR DynamoDB Hadoop
 * connector inside a local Spark session and prints the number of items read.
 *
 * Uses an explicit `main` instead of `extends App`: the Spark documentation
 * warns that subclasses of `scala.App` may not work correctly with Spark.
 */
object copyDynamoTable {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("test")
      .master("local")
      .getOrCreate()

    try {
      val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration)
      jobConf.set("dynamodb.servicename", "dynamodb")
      jobConf.set("dynamodb.input.tableName", "hen.poc.client") // DynamoDB table to scan
      jobConf.set("dynamodb.endpoint", "dynamodb.us-east-1.amazonaws.com")
      jobConf.set("dynamodb.regionid", "us-east-1")
      jobConf.set("dynamodb.throughput.read", "1")
      jobConf.set("dynamodb.throughput.read.percent", "1")
      jobConf.set("dynamodb.version", "2011-12-05")
      // Same class that is passed to hadoopRDD below; kept for consumers of the old mapred API.
      // No output format is configured: this job only reads, it never writes back to DynamoDB.
      jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")

      // NOTE(review): per the stack trace, DynamoDBInputFormat.getSplits sizes the scan via
      // ClusterTopologyNodeCapacityProvider, which reads EMR's /mnt/var/lib/info/job-flow.json.
      // Outside EMR that file does not exist, and the job then dies with
      // "ArithmeticException: / by zero". This is presumably a consequence of the missing
      // cluster metadata; confirm against the emr-dynamodb-hadoop version in use. Local runs
      // may need a connector that does not depend on EMR metadata.
      val orders = spark.sparkContext.hadoopRDD(
        jobConf,
        classOf[DynamoDBInputFormat],
        classOf[Text],
        classOf[DynamoDBItemWritable])

      println(orders.count())
    } finally {
      // Release the local Spark context even when the scan fails.
      spark.stop()
    }
  }
}
Я получил следующее исключение:
18/09/05 17:06:41 INFO util.TaskCalculator: Cluster has 1 active nodes.
18/09/05 17:06:41 WARN util.ClusterTopologyNodeCapacityProvider: Exception when trying to determine instance types
java.nio.file.NoSuchFileException: /mnt/var/lib/info/job-flow.json
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
at java.nio.file.Files.newByteChannel(Files.java:361)
at java.nio.file.Files.newByteChannel(Files.java:407)
at java.nio.file.Files.readAllBytes(Files.java:3152)
at org.apache.hadoop.dynamodb.util.ClusterTopologyNodeCapacityProvider.readJobFlowJsonString(ClusterTopologyNodeCapacityProvider.java:103)
at org.apache.hadoop.dynamodb.util.ClusterTopologyNodeCapacityProvider.getCoreNodeMemoryMB(ClusterTopologyNodeCapacityProvider.java:42)
at org.apache.hadoop.dynamodb.util.TaskCalculator.getMaxMapTasks(TaskCalculator.java:54)
at org.apache.hadoop.dynamodb.DynamoDBUtil.calcMaxMapTasks(DynamoDBUtil.java:265)
at org.apache.hadoop.dynamodb.read.AbstractDynamoDBInputFormat.getSplits(AbstractDynamoDBInputFormat.java:47)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:200)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
at org.apache.spark.rdd.RDD.count(RDD.scala:1162)
at com.data.spark.dynamodb.copyDynamoTable$.delayedEndpoint$com$riskified$data$spark$dynamodb$copyDynamoTable$1(copyDynamoTable.scala:30)
at com.data.spark.dynamodb.copyDynamoTable$delayedInit$body.apply(copyDynamoTable.scala:9)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.data.spark.dynamodb.copyDynamoTable$.main(copyDynamoTable.scala:9)
at com.data.spark.dynamodb.copyDynamoTable.main(copyDynamoTable.scala)
Exception in thread "main" java.lang.ArithmeticException: / by zero