Какой быстрый способ создания файлов данных паркета с помощью Spark для тестирования Hive / Presto / Drill / и т. Д.? - PullRequest
0 голосов
/ 06 июня 2019

Мне часто приходится создавать файлы паркета для тестирования компонентов инфраструктуры, таких как Hive, Presto, Drill и т. Д.

В сети есть на удивление несколько примеров наборов данных о паркете, и один из тех, с которыми я сталкиваюсьздесь https://github.com/Teradata/kylo/tree/master/samples/sample-data/parquet - это фиктивные данные для номеров кредитных карт, доходов и т. д. Мне не нравится иметь их в своих хранилищах данных, если кто-то считает их реальными.

Каков наилучший способ полученияфайлы данных паркета, когда вам нужно проверить?У меня обычно есть искра вокруг и в конечном итоге использую это;и я опубликую свое решение как ответ, так как, кажется, здесь не существует.Но мне любопытно, какие лучшие решения люди используют, используя искру или другие технологии.

Ответы [ 3 ]

0 голосов
/ 07 июня 2019

Полагаю, основная цель - генерировать данные, а не записывать их в определенном формате.

Давайте начнем с очень простого примера.

Для генерации произвольного DataFrame первое, что вам нужно, это его схема. В дальнейшем я буду использовать очень простую схему, моделирующую некоторые пользовательские транзакции.

val transactionsSchema: StructType = new StructType()
    .add("user_id", IntegerType)
    .add("ts", TimestampType)
    .add("amount", DoubleType)

Пакет com.holdenkarau.spark.testing имеет объект DataframeGenerator. У этого объекта есть два метода: два генерируют DataFrames: .arbitraryDataFrame (полностью случайный результат) и .arbitraryDataFrameWithCustomFields (где вы можете устанавливать собственные генераторы для данных атрибутов, другие будут генерироваться автоматически).

Генератор DataFrame получает sqlContext и схему в качестве входных данных.

val transactionsDFGenerator: Arbitrary[DataFrame] =
    DataframeGenerator.arbitraryDataFrame(spark.sqlContext, transactionsSchema)

И функция для получения случайного DataFrame.

def generateTransactionsDF(): DataFrame =
    transactionsDFGenerator
      .arbitrary(Gen.Parameters.default, Seed(100), 10)
      .get

И вот результирующий набор данных:

+-----------+------------------------------+-----------------------+
|user_id    |ts                            |amount                 |
+-----------+------------------------------+-----------------------+
|-375726664 |1970-01-01 03:00:00.001       |-2.9945060451319086E271|
|0          |1970-01-01 02:59:59.999       |-4.774320614638788E-237|
|1          |215666-12-06 17:54:3333.972832|8.78381185978856E96    |
|-2147483648|1970-01-01 03:00:00.001       |1.6036825986813454E58  |
|568605722  |219978-07-03 23:47:3737.050592|6.632020739877623E-165 |
|-989197852 |1970-01-01 03:00:00.001       |8.92083260179676E233   |
|-2147483648|264209-01-26 00:54:2525.980256|-7.986228470636884E-216|
|0          |145365-06-27 03:25:5656.721168|-5.607570396263688E-45 |
|-1         |1970-01-01 02:59:59.999       |2.4723152616146036E-227|
|-2147483648|4961-05-03 05:19:42.439408    |1.9109576041021605E83  |
+-----------+------------------------------+-----------------------+

Полный код:

import co.featr.sia.utils.spark.getSparkSession
import com.holdenkarau.spark.testing.DataframeGenerator
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.scalacheck.rng.Seed
import org.scalacheck.{Arbitrary, Gen}

object GenerateData {
  Logger.getLogger("org").setLevel(Level.OFF)
  def main(args: Array[String]): Unit = {
    val spark = spark.builder.master("local").getOrCreate()
    val runner = new GenerateData(spark)
    runner.run()
  }
}

class GenerateData(spark: SparkSession) {

  def run(): Unit = {
    val df: DataFrame = generateTransactionsDF()
    df.show(10, false)
  }

  def generateTransactionsDF(): DataFrame =
    transactionsDFGenerator
      .arbitrary(Gen.Parameters.default, Seed(100))
      .get

  lazy val transactionsDFGenerator: Arbitrary[DataFrame] =
    DataframeGenerator.arbitraryDataFrame(spark.sqlContext, transactionsSchema, 10)

  lazy val transactionsSchema: StructType = new StructType()
    .add("user_id", IntegerType)
    .add("ts", TimestampType)
    .add("amount", DoubleType)
}
0 голосов
/ 08 июня 2019

Библиотека pyarrow для Python позволяет писать паркет из DataFrame от pandas с помощью всего нескольких строк кода.

https://arrow.apache.org/docs/python/parquet.html

0 голосов
/ 06 июня 2019

Мое обычное решение этой проблемы - использовать Spark и изменяемый список в Scala для создания простых примеров данных. Я привожу даты и другие типы данных по мере необходимости, но так я обычно и поступаю.

По сути, я просто превращаю изменяемый список во фрейм данных и объединяю количество целевых файлов, которые мне нужны в выводе, а затем сохраняю в паркет.

//Create a mutable list buffer based on a loop.
import scala.collection.mutable.ListBuffer
var lb = ListBuffer[(Int, Int, String)]()
for (i <- 1 to 5000) {
  lb += ((i, i*i, "Number is " + i + "."))
}

//Convert it to a data frame.
import spark.implicits._
val df = lb.toDF("value", "square", "description")

df.coalesce(5).write.mode(SaveMode.Overwrite).parquet("<your-hdfs-path>/name.parquet")

Было бы очень хорошо иметь способ сделать это без искры. Кроме того, если бы я хотел гораздо большие наборы данных, мне пришлось бы изменить это, чтобы избежать генерации записей в драйвере; это больше для небольших и средних наборов данных.

...