Как я могу разделить двухфазный большой файл Json на NiFi - PullRequest
1 голос
/ 23 октября 2019

Я использую NiFi для восстановления и помещаю в Kafka много данных. Я на самом деле на стадии тестирования, и я использую большой файл Json.

Мой файл Json содержит 500K записей.

На самом деле, у меня есть процессор getFile для получения файла иa SplitJson.

Выражение JsonPath: $..posts.*

Эта конфигурация работает с небольшим файлом, который записывает 50K записей, но для больших файлов она падает.

Мой файл JsonПохоже, что с 500K регистраций в "posts":[]

{ 
    "meta":{ 
        "requestid":"request1000",
        "http_code":200,
        "network":"twitter",
        "query_type":"realtime",
        "limit":10,
        "page":0
    },
    "posts":[ 
        { 
            "network":"twitter",
            "posted":"posted1",
            "postid":"id1",
            "text":"text1",
            "lang":"lang1",
            "type":"type1",
            "sentiment":"sentiment1",
            "url":"url1"
        },
        { 
            "network":"twitter",
            "posted":"posted2",
            "postid":"id2",
            "text":"text2",
            "lang":"lang2",
            "type":"type2",
            "sentiment":"sentiment2",
            "url":"url2"
        }
    ]
}

Я прочитал некоторые документы по этой проблеме, но, темы для текстового файла, и ораторы предлагают связать много SplitText для постепенного разделения файла. С такой жесткой структурой, как мой Json, я не понимаю, как я могу это сделать.

Я ищу решение, которое бы она хорошо выполнила при записи 500K.

Ответы [ 3 ]

1 голос
/ 23 октября 2019

К сожалению, я думаю, что этот случай (большой массив внутри записи) сейчас не очень хорошо обрабатывается ...

SplitJson требует, чтобы весь файл потока был считан в память, и он также не имеетисходящий размер сплита. Так что это не сработает.

Обычно SplitRecord будет правильным решением, но в настоящее время есть два устройства чтения записей JSON - JsonTreeReader и JsonPathReader. Обе эти записи потока, но проблема здесь в том, что существует только одна огромная запись, поэтому каждая из них будет считывать весь документ в память.

Было предпринято несколько попыток решить эту конкретную проблему, но, к сожалению, ни однаиз них сделали это в выпуске.

В этом PR, который сейчас закрыт, добавлен новый считыватель записей JSON, который может передавать записи, начиная с пути JSON, который в вашем случае может быть $ .posts:

https://github.com/apache/nifi/pull/3222

С этим считывателем вы даже не делите разделение, вы просто отправляете файл потока в PublishKafkaRecord_2_0 (или любую подходящую версию PublishKafkaRecord), и он будет читать каждую запись и публиковать ее в Kafka.

Существует также открытый PR для нового процессора SelectJson, который выглядит так, как будто он может помочь:

https://github.com/apache/nifi/pull/3455

1 голос
/ 23 октября 2019

Попробуйте использовать SplitRecord процессор в NiFi.

Определить запись Контроллер устройства чтения / записи служб в процессоре SplitRecord.

Затем настройте Records Per Splitна 1 и используйте отношение Splits для дальнейшей обработки.

(OR)

, если вы хотите сгладить и разветвить запись, используйте ForkRecord процессор в NiFi.

Для использования см. по этой ссылке.

0 голосов
/ 30 октября 2019

У меня была та же проблема с json, и я использовал для записи потокового парсера

Использование ExeuteGroovyScript процессора со следующим кодом.

Он должен разбивать большой входящий файл на маленькие:

@Grab(group='acme.groovy', module='acmejson', version='20191029')
import groovyx.acme.json.AcmeJsonParser
import groovyx.acme.json.AcmeJsonOutput

def ff=session.get()
if(!ff)return

def objMeta=null
def count=0


ff.read().withReader("UTF-8"){reader->
    new AcmeJsonParser().withFilter{
        onValue('$.meta'){ 
            //just remember it to use later
            objMeta=it 
        }
        onValue('$.posts.[*]'){objPost->
            def ffOut = ff.clone(false) //clone without content
            ffOut.post_index=count      //add attribite with index
            //write small json
            ffOut.write("UTF-8"){writer->
                AcmeJsonOutput.writeJson([meta:objMeta, post:objPost], writer, true)
            }
            REL_SUCCESS << ffOut        //transfer to success
            count++
        }
    }.parse(reader)
}
ff.remove()

пример выходного файла:

{
  "meta": {
    "requestid": "request1000",
    "http_code": 200,
    "network": "twitter",
    "query_type": "realtime",
    "limit": 10,
    "page": 0
  },
  "post": {
    "network": "twitter",
    "posted": "posted11",
    "postid": "id11",
    "text": "text11",
    "lang": "lang11",
    "type": "type11",
    "sentiment": "sentiment11",
    "url": "url11"
  }
}
...