Лучший подход для разбора большого структурированного файла с Apache spark - PullRequest
0 голосов
/ 31 октября 2018

У меня огромный текстовый файл (в ГБ) с плановыми текстовыми данными в каждой строке, который необходимо проанализировать и извлечь в структуру для дальнейшей обработки. В каждой строке есть текст длиной 200 символов, и у меня есть Регулярное выражение для разбора каждой строки и разделения на разные группы, которые впоследствии будут сохранены в данные плоского столбца

образец данных

1759387ACD06JAN1910MAR191234567ACRT

RegExp

(.{7})(.{3})(.{7})(.{7})(.{7})(.{4})

Структура данных

Customer ID, Code, From Date, To Date, TrasactionId, Product code
1759387,     ACD,  06JAN19,   10MAR19,  1234567,     ACRT

Пожалуйста, предложите ЛУЧШИЙ подход для анализа этих огромных данных и отправки в сетку In Memory, которая снова будет использоваться Spark Jobs для дальнейшей обработки при вызове соответствующих API.

Ответы [ 2 ]

0 голосов
/ 01 ноября 2018

Вы можете использовать подход DF. Скопируйте серийный файл в HDFS с помощью команды -copyFromLocal и используйте приведенный ниже код для разбора каждой записи

Я предполагаю пример записей в файле gireesh.txt, как показано ниже

1759387ACD06JAN1910MAR191234567ACRT
2759387ACD08JAN1910MAY191234567ACRY
3759387ACD03JAN1910FEB191234567ACRZ

Код искры

import org.apache.log4j.{Level, Logger}
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.Encoders._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object Gireesh {
  def main(args: Array[String]) {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val spark = SparkSession.builder().appName("Operations..").master("local[*]").getOrCreate()
    import spark.implicits._
    val pat="""(.{7})(.{3})(.{7})(.{7})(.{7})(.{4})""".r
    val headers = List("custid","code","fdate","tdate","tranid","prdcode")
    val rdd = spark.sparkContext.textFile("in/gireesh.txt")
      .map( x => {
              val y = scala.collection.mutable.ArrayBuffer[String]()
              pat.findAllIn(x).matchData.foreach( m=> y.appendAll(m.subgroups))
           (y(0).toLong,y(1),y(2),y(3),y(4).toLong,y(5))
          }
      )
    val df = rdd.toDF(headers:_*)
    df.printSchema()
    df.show(false)

  }
}

дает следующие результаты.

root
 |-- custid: long (nullable = false)
 |-- code: string (nullable = true)
 |-- fdate: string (nullable = true)
 |-- tdate: string (nullable = true)
 |-- tranid: long (nullable = false)
 |-- prdcode: string (nullable = true)

+-------+----+-------+-------+-------+-------+
|custid |code|fdate  |tdate  |tranid |prdcode|
+-------+----+-------+-------+-------+-------+
|1759387|ACD |06JAN19|10MAR19|1234567|ACRT   |
|2759387|ACD |08JAN19|10MAY19|1234567|ACRY   |
|3759387|ACD |03JAN19|10FEB19|1234567|ACRZ   |
+-------+----+-------+-------+-------+-------+

EDIT1:

Карта может быть преобразована в отдельную функцию, как показано ниже.

def parse(record:String) = {
  val y = scala.collection.mutable.ArrayBuffer[String]()
  pat.findAllIn(record).matchData.foreach( m=> y.appendAll(m.subgroups))
  (y(0).toLong,y(1),y(2),y(3),y(4).toLong,y(5))
}
val rdd = spark.sparkContext.textFile("in/gireesh.txt")
  .map( x =>  parse(x) )
val df = rdd.toDF(headers:_*)
df.printSchema()
0 голосов
/ 31 октября 2018

Вы должны указать спарку, какой файл читать и как обрабатывать содержимое во время чтения.

Вот пример:

val numberOfPartitions = 5 // this needs to be optimized based on the size of the file and the available resources (e.g. memory)
val someObjectsRDD: RDD[SomeObject] =
        sparkContext.textFile("/path/to/your/file", numberOfPartitions)
            .mapPartitions( 
                { stringsFromFileIterator =>
                  stringsFromFileIterator.map(stringFromFile => //here process the raw string and return the result)
                }
                , preservesPartitioning = true
              )

В фрагменте кода SomeObject - это объект со структурой данных из вопроса

...