Полагаю, основная цель - генерировать данные, а не записывать их в определенном формате.
Давайте начнем с очень простого примера.
Для генерации произвольного DataFrame первое, что вам нужно, это его схема.
В дальнейшем я буду использовать очень простую схему, моделирующую некоторые пользовательские транзакции.
val transactionsSchema: StructType = new StructType()
.add("user_id", IntegerType)
.add("ts", TimestampType)
.add("amount", DoubleType)
Пакет com.holdenkarau.spark.testing
имеет объект DataframeGenerator
.
У этого объекта есть два метода: два генерируют DataFrames: .arbitraryDataFrame
(полностью случайный результат) и .arbitraryDataFrameWithCustomFields
(где вы можете устанавливать собственные генераторы для данных атрибутов, другие будут генерироваться автоматически).
Генератор DataFrame получает sqlContext и схему в качестве входных данных.
val transactionsDFGenerator: Arbitrary[DataFrame] =
DataframeGenerator.arbitraryDataFrame(spark.sqlContext, transactionsSchema)
И функция для получения случайного DataFrame.
def generateTransactionsDF(): DataFrame =
transactionsDFGenerator
.arbitrary(Gen.Parameters.default, Seed(100), 10)
.get
И вот результирующий набор данных:
+-----------+------------------------------+-----------------------+
|user_id |ts |amount |
+-----------+------------------------------+-----------------------+
|-375726664 |1970-01-01 03:00:00.001 |-2.9945060451319086E271|
|0 |1970-01-01 02:59:59.999 |-4.774320614638788E-237|
|1 |215666-12-06 17:54:3333.972832|8.78381185978856E96 |
|-2147483648|1970-01-01 03:00:00.001 |1.6036825986813454E58 |
|568605722 |219978-07-03 23:47:3737.050592|6.632020739877623E-165 |
|-989197852 |1970-01-01 03:00:00.001 |8.92083260179676E233 |
|-2147483648|264209-01-26 00:54:2525.980256|-7.986228470636884E-216|
|0 |145365-06-27 03:25:5656.721168|-5.607570396263688E-45 |
|-1 |1970-01-01 02:59:59.999 |2.4723152616146036E-227|
|-2147483648|4961-05-03 05:19:42.439408 |1.9109576041021605E83 |
+-----------+------------------------------+-----------------------+
Полный код:
import co.featr.sia.utils.spark.getSparkSession
import com.holdenkarau.spark.testing.DataframeGenerator
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.scalacheck.rng.Seed
import org.scalacheck.{Arbitrary, Gen}
object GenerateData {
Logger.getLogger("org").setLevel(Level.OFF)
def main(args: Array[String]): Unit = {
val spark = spark.builder.master("local").getOrCreate()
val runner = new GenerateData(spark)
runner.run()
}
}
class GenerateData(spark: SparkSession) {
def run(): Unit = {
val df: DataFrame = generateTransactionsDF()
df.show(10, false)
}
def generateTransactionsDF(): DataFrame =
transactionsDFGenerator
.arbitrary(Gen.Parameters.default, Seed(100))
.get
lazy val transactionsDFGenerator: Arbitrary[DataFrame] =
DataframeGenerator.arbitraryDataFrame(spark.sqlContext, transactionsSchema, 10)
lazy val transactionsSchema: StructType = new StructType()
.add("user_id", IntegerType)
.add("ts", TimestampType)
.add("amount", DoubleType)
}