Как динамически генерировать наборы данных на основе схемы? - PullRequest
0 голосов
/ 30 ноября 2018

У меня есть несколько схем, как показано ниже, с разными именами столбцов и типами данных.Я хочу сгенерировать тестовые / смоделированные данные, используя DataFrame с Scala для каждой схемы, и сохранить их в файле паркета.

Ниже приведен пример схемы (из образца json) для динамического генерирования данных с использованием фиктивных значений в нем..

val schema1 = StructType(
  List(
    StructField("a", DoubleType, true),
    StructField("aa", StringType, true)
    StructField("p", LongType, true),
    StructField("pp", StringType, true)
  )
)

Мне нужен rdd / dataframe, подобный этому, с 1000 строками, каждая из которых основана на количестве столбцов в приведенной выше схеме.

val data = Seq(
  Row(1d, "happy", 1L, "Iam"),
  Row(2d, "sad", 2L, "Iam"),
  Row(3d, "glad", 3L, "Iam")
)

В основном .. как и эти 200 наборов данных существуют длячто мне нужно генерировать данные динамически, написание отдельных программ для каждой схемы просто невозможно для меня.

Pls.помогите мне с вашими идеями или импл.как я новичок в спарке.

Можно ли генерировать динамические данные на основе схем разных типов?

Ответы [ 3 ]

0 голосов
/ 15 декабря 2018

Вы могли бы сделать что-то вроде этого

import org.apache.spark.SparkConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.json4s
import org.json4s.JsonAST._
import org.json4s.jackson.JsonMethods._

import scala.util.Random

object Test extends App {

  val structType: StructType = StructType(
    List(
      StructField("a", DoubleType, true),
      StructField("aa", StringType, true),
      StructField("p", LongType, true),
      StructField("pp", StringType, true)
    )
  )

  val spark = SparkSession
    .builder()
    .master("local[*]")
    .config(new SparkConf())
    .getOrCreate()

  import spark.implicits._

  val df = createRandomDF(structType, 1000)

  def createRandomDF(structType: StructType, size: Int, rnd: Random = new Random()): DataFrame ={
    spark.read.schema(structType).json((0 to size).map { _ => compact(randomJson(rnd, structType))}.toDS())
  }

  def randomJson(rnd: Random, dataType: DataType): JValue = {

    dataType match {
      case v: DoubleType =>
        json4s.JDouble(rnd.nextDouble())
      case v: StringType =>
        JString(rnd.nextString(10))
      case v: IntegerType =>
        JInt(rnd.nextInt())
      case v: LongType =>
        JInt(rnd.nextLong())
      case v: FloatType =>
        JDouble(rnd.nextFloat())
      case v: BooleanType =>
        JBool(rnd.nextBoolean())
      case v: ArrayType =>
        val size = rnd.nextInt(10)
        JArray(
          (0 to size).map(_ => randomJson(rnd, v.elementType)).toList
        )
      case v: StructType =>
        JObject(
          v.fields.flatMap {
            f =>
              if (f.nullable && rnd.nextBoolean())
                None
              else
                Some(JField(f.name, randomJson(rnd, f.dataType)))
          }.toList
        )
    }
  }
}
0 голосов
/ 17 декабря 2018

Используя совет @ JacekLaskowski, вы можете генерировать динамические данные, используя генераторы с ScalaCheck (Gen) на основе ожидаемых вами полей / типов.

Это может выглядеть так:

import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SaveMode}
import org.scalacheck._

import scala.collection.JavaConverters._

val dynamicValues: Map[(String, DataType), Gen[Any]] = Map(
  ("a", DoubleType) -> Gen.choose(0.0, 100.0),
  ("aa", StringType) -> Gen.oneOf("happy", "sad", "glad"),
  ("p", LongType) -> Gen.choose(0L, 10L),
  ("pp", StringType) -> Gen.oneOf("Iam", "You're")
)

val schemas = Map(
  "schema1" -> StructType(
    List(
      StructField("a", DoubleType, true),
      StructField("aa", StringType, true),
      StructField("p", LongType, true),
      StructField("pp", StringType, true)
    )),
  "schema2" -> StructType(
    List(
      StructField("a", DoubleType, true),
      StructField("pp", StringType, true),
      StructField("p", LongType, true)
    )
  )
)

val numRecords = 1000

schemas.foreach {
  case (name, schema) =>
    // create a data frame
    spark.createDataFrame(
      // of #numRecords records
      (0 until numRecords).map { _ =>
        // each of them a row
        Row.fromSeq(schema.fields.map(field => {
          // with fields based on the schema's fieldname & type else null
          dynamicValues.get((field.name, field.dataType)).flatMap(_.sample).orNull
        }))
      }.asJava, schema)
      // store to parquet
      .write.mode(SaveMode.Overwrite).parquet(name)
}
0 голосов
/ 14 декабря 2018

ScalaCheck - это среда для генерации данных, вы генерируете необработанные данные на основе схемы, используя ваши собственные генераторы.

Посетите Документация ScalaCheck .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...