Выходные данные Apache Nifi MergeContent противоречивы? - PullRequest
0 голосов
/ 05 июня 2018

Довольно новый для использования нифи.Нужна помощь с дизайном.Я пытаюсь создать простой поток с фиктивными CSV-файлами (на данный момент) в директории HDFS и добавить некоторые текстовые данные к каждой записи в каждом потоковом файле.

Входящие файлы:

dummy1.csv
dummy2.csv
dummy3.csv

содержимое:

"Eldon Base for stackable storage shelf, platinum",Muhammed MacIntyre,3,-213.25,38.94,35,Nunavut,Storage & Organization,0.8
"1.7 Cubic Foot Compact ""Cube"" Office Refrigerators",BarryFrench,293,457.81,208.16,68.02,Nunavut,Appliances,0.58
"Cardinal Slant-D Ring Binder, Heavy Gauge Vinyl",Barry French,293,46.71,8.69,2.99,Nunavut,Binders and Binder Accessories,0.39
...

Желаемый вывод:

d17a3259-0718-4c7b-bee8-924266aebcc7,Mon Jun 04 16:36:56 EDT 2018,Fellowes Recycled Storage Drawers,Allen Rosenblatt,11137,395.12,111.03,8.64,Northwest Territories,Storage & Organization,0.78
25f17667-9216-4f1d-b69c-23403cd13464,Mon Jun 04 16:36:56 EDT 2018,Satellite Sectional Post Binders,Barry Weirich,11202,79.59,43.41,2.99,Northwest Territories,Binders and Binder Accessories,0.39
ce0b569f-5d93-4a54-b55e-09c18705f973,Mon Jun 04 16:36:56 EDT 2018,Deflect-o DuraMat Antistatic Studded Beveled Mat for Medium Pile Carpeting,Doug Bickford,11456,399.37,105.34,24.49,Northwest Territories,Office Furnishings,0.61

поток enter image description here splitText- enter image description here ReplaceText- enter image description here MergeContent-

enter image description here enter image description here

(это может быть плохой способ достичь того, что я пытаюсь получить, но я где-то виделэтот uuid лучше всего подходит для генерации уникального идентификатора сеанса. Поэтому подумайте о том, чтобы извлечь каждую строку из входящих данных в потоковый файл и сгенерировать uuid)

Но как-то, как вы можете видеть, порядок данных портится.Первые 3 строки не совпадают в выводе.Однако используемые мной тестовые данные (50000 записей), похоже, содержат данные в какой-то другой строке.Множественные тесты обычно показывают изменения порядка данных после 2001-й строки.

И да, я выполнил поиск похожих проблем здесь и попытался использовать метод дефрагментации при слиянии, но это не сработало.Я был бы признателен, если кто-то может объяснить, что здесь происходит и как я могу получить данные таким же образом с уникальным идентификатором session_id, отметкой времени для каждой записи.Есть ли какой-то параметр, который мне нужно изменить или изменить, чтобы получить правильный вывод?Я открыт для предложений, если есть лучший способ.

1 Ответ

0 голосов
/ 05 июня 2018

Прежде всего, спасибо за такой продуманный и подробный ответ.Я думаю, вы избавились от многих сомнений в том, как работает процессор!

Порядок слияния гарантирован только в режиме дефрагментации, поскольку он упорядочит файлы потоков в соответствии с их индексом фрагмента.Я не уверен, почему это не сработает, но если бы вы могли создать шаблон потока с примерами данных, которые показали проблему, было бы полезно отладить.

Я постараюсьповторить этот метод, используя чистый шаблон снова.Может быть проблема с каким-то параметром, и пишущий HDFS не может записать.

Я не уверен, что целью вашего потока является просто объединить исходный CSV, который был разделен, или объединить несколько разных CSV.Режим дефрагментации будет только повторно объединять исходный CSV, поэтому, если ListHDFS выбрал 10 CSV, после разделения и повторного объединения у вас снова должно быть 10 CSV.

Да, это именно то, что мне нужно,Разделите и объедините данные в соответствующие им файлы.У меня нет особой (пока) необходимости снова соединять выходные данные.

Подход разделения CSV до 1 строки на файл потока для управления каждой строкой является обычным подходом, однако он не будет работатьочень хорошо, если у вас много больших файлов CSV.Более эффективный подход состоит в том, чтобы пытаться манипулировать данными на месте, не разбивая .Обычно это можно сделать с помощью процессоров, ориентированных на записи.

  1. Я использовал этот подход исключительно инстинктивно и не осознавал, что это распространенный метод.Иногда файл данных может быть очень большим, что означает более миллиона записей в одном файле.Разве это не проблема с вводом / выводом в кластере?потому что это будет означать, что каждая запись = один файл потока = один уникальный UUID.Какое удобное количество потоковых файлов может обработать nifi?(я знаю, что это зависит от конфигурации кластера и постарается получить больше информации о кластере от администратора hdp)
  2. Что вы предлагаете, "пытаясь манипулировать данными на месте без разделения"?Можете ли вы привести пример или шаблон или процессор для использования?

В этом случае вам нужно будет определить схему для вашего CSV, которая включает все столбцы в ваших данных, плюс идентификатор сеансаи отметка времени.Затем, используя процессор UpdateRecord, вы будете использовать выражения пути записи, такие как / session_id = $ {UUID ()} и / timestamp = $ {now ()}.Это будет выполнять построчную передачу контента, обновлять каждую запись и записывать ее обратно, сохраняя все как один файл потока.

Это выглядит многообещающе.Можете ли вы поделиться простым шаблоном, извлекающим файлы из hdfs> processing> write hdfs files, но без разделения?

Я не хочу делиться шаблоном из-за ограничений.Но позвольте мне посмотреть, смогу ли я создать общий шаблон, и я поделюсь

Спасибо за вашу мудрость!:)

...