Запускать локальное задание DynamoDB без EMR - PullRequest
0 голосов
/ 05 сентября 2018

Я хочу запустить локальное задание Dynamodb без использования кластера EMR, которые читают данные из некоторой таблицы и записывают их в файл parquet / CSV. Я не нашел ни одного спарк-динамо-разъема, который бы поддерживал это, может быть, у вас есть идеи?

Пример моего кода:

import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.SparkSession

object copyDynamoTable extends App {
  val spark = SparkSession
    .builder()
    .appName("test")
    .master("local")
    .getOrCreate()

  val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration)
  jobConf.set("dynamodb.servicename", "dynamodb")
  jobConf.set("dynamodb.input.tableName", "hen.poc.client") // Pointing to DynamoDB table
  jobConf.set("dynamodb.endpoint", "dynamodb.us-east-1.amazonaws.com")
  jobConf.set("dynamodb.regionid", "us-east-1")
  jobConf.set("dynamodb.throughput.read", "1")
  jobConf.set("dynamodb.throughput.read.percent", "1")
  jobConf.set("dynamodb.version", "2011-12-05")

  jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
  jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")

  val orders = spark.sparkContext.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])

  println(orders.count)

Я получил следующее исключение:

18/09/05 17:06:41 INFO util.TaskCalculator: Cluster has 1 active nodes.
18/09/05 17:06:41 WARN util.ClusterTopologyNodeCapacityProvider: Exception when trying to determine instance types
java.nio.file.NoSuchFileException: /mnt/var/lib/info/job-flow.json
    at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
    at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
    at java.nio.file.Files.newByteChannel(Files.java:361)
    at java.nio.file.Files.newByteChannel(Files.java:407)
    at java.nio.file.Files.readAllBytes(Files.java:3152)
    at org.apache.hadoop.dynamodb.util.ClusterTopologyNodeCapacityProvider.readJobFlowJsonString(ClusterTopologyNodeCapacityProvider.java:103)
    at org.apache.hadoop.dynamodb.util.ClusterTopologyNodeCapacityProvider.getCoreNodeMemoryMB(ClusterTopologyNodeCapacityProvider.java:42)
    at org.apache.hadoop.dynamodb.util.TaskCalculator.getMaxMapTasks(TaskCalculator.java:54)
    at org.apache.hadoop.dynamodb.DynamoDBUtil.calcMaxMapTasks(DynamoDBUtil.java:265)
    at org.apache.hadoop.dynamodb.read.AbstractDynamoDBInputFormat.getSplits(AbstractDynamoDBInputFormat.java:47)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:200)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
    at org.apache.spark.rdd.RDD.count(RDD.scala:1162)
    at com.data.spark.dynamodb.copyDynamoTable$.delayedEndpoint$com$riskified$data$spark$dynamodb$copyDynamoTable$1(copyDynamoTable.scala:30)
    at com.data.spark.dynamodb.copyDynamoTable$delayedInit$body.apply(copyDynamoTable.scala:9)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)
    at com.data.spark.dynamodb.copyDynamoTable$.main(copyDynamoTable.scala:9)
    at com.data.spark.dynamodb.copyDynamoTable.main(copyDynamoTable.scala)
Exception in thread "main" java.lang.ArithmeticException: / by zero

1 Ответ

0 голосов
/ 19 сентября 2018

Это файл, который присутствует в кластере EMR. Это делается для того, чтобы определить тип экземпляра, с которым он работает, чтобы определить некоторые параметры задания, такие как память. Очевидно, что при локальном запуске у вас не будет этого файла, так что ожидается.

Пожалуйста, следуйте нижеуказанной теме:

ЭМИ / github.com / вопросы / 50

...