Извлечь значение столбца CSV для добавления атрибута - PullRequest
1 голос
/ 04 июня 2019

Я работаю с некоторыми CSV в NiFi, и мой конвейер производит несколько дубликатов. В результате я хотел бы использовать процессор DetectDuplicate, но для этого мне нужно иметь некоторый атрибут, с которым он может сравниваться для обнаружения дублирования. У меня есть процессор ExtractText, и я хотел бы использовать регулярное выражение для получения значения в столбце SHA1_BASE16.

Я попробовал следующую строку регулярных выражений (предложенную другом, я не совсем понимаю) в CSV ниже, но выделил неправильные поля и некоторые посторонние вещи. Как я могу заставить его захватывать только значение SHA1_BASE16?

RegEx

^[^,]*,[^,]*,[^,]*,[^,]*,[^,]*,([^,]*),[^,]*,[^,]*,[^,]*,[^,]*,[^,]*,([^,]*),[^,]*,[^,]*,([^,]*)\S*

CSV

"USER_JID","CREATED_AT","UPLOAD_TIME","SHA1_BASE32","SHA1_BASE16","HASH_SOURCE","MESSAGE_TYPE","IPV4"
"dreynolds","1932/04/01 20:23:35 UTC","2016/12/28 20:23:11 UTC","72F20077A79A0D4D90F4C0669FB6EA4BC5953293","FB1D928B83DEBCD2B2E53DF4C8D4C2AF99EB81E0","HOLLYWOOD","TWITTER","123.123.123.123"

Фактический объем производства

Match 1
Full match  0-291   "USER_JID","CREATED_AT","UPLOAD_TIME","SHA1_BASE32","SHA1_BASE16","HASH_SOURCE","MESSAGE_TYPE","IPV4...
Group 1.    66-79   "HASH_SOURCE"
Group 2.    209-251 "FB1D928B83DEBCD2B2E53DF4C8D4C2AF99EB81E0"
Group 3.    274-291 "123.123.123.123"

Ожидаемый результат

Match 1
Full match  0-291   "USER_JID","CREATED_AT","UPLOAD_TIME","SHA1_BASE32","SHA1_BASE16","HASH_SOURCE","MESSAGE_TYPE","IPV4...
Group 1.    209-251 "FB1D928B83DEBCD2B2E53DF4C8D4C2AF99EB81E0"

Ответы [ 2 ]

1 голос
/ 04 июня 2019

Я предполагаю, что у нас будет две строки по 40 символов, которые мы будем использовать в качестве первой левой границы, и применим это простое выражение:

.+"[A-Z0-9]{40}",("[A-Z0-9]{40}").+

, где наш желаемый результатнаходится в этой группе захвата:

("[A-Z0-9]{40}")

, которую мы можем использовать $1.

Демо

Тест

const regex = /.+"[A-Z0-9]{40}",("[A-Z0-9]{40}").+/gm;
const str = `"dreynolds","1932/04/01 20:23:35 UTC","2016/12/28 20:23:11 UTC","72F20077A79A0D4D90F4C0669FB6EA4BC5953293","FB1D928B83DEBCD2B2E53DF4C8D4C2AF99EB81E0","HOLLYWOOD","TWITTER","123.123.123.123"`;
let m;

while ((m = regex.exec(str)) !== null) {
    // This is necessary to avoid infinite loops with zero-width matches
    if (m.index === regex.lastIndex) {
        regex.lastIndex++;
    }
    
    // The result can be accessed through the `m`-variable.
    m.forEach((match, groupIndex) => {
        console.log(`Found match, group ${groupIndex}: ${match}`);
    });
}

RegEx Circuit

jex.im визуализирует регулярные выражения:

enter image description here

0 голосов
/ 04 июня 2019

В качестве альтернативы, вы можете использовать PartitionRecord для разделения записей на потоковые файлы, где каждая запись имеет одинаковое значение для поля раздела (в данном случае SHA1_BASE16). Он также установит атрибут в файле потока для значения раздела, который затем можно использовать в DetectDuplicate.

Для полей с большим количеством элементов (полей, в которых не будет много дубликатов), вы можете увидеть снижение производительности, поскольку в каждом файле исходящего потока может быть одна строка, поэтому для большого количества строк вы получите большое количество потоковых файлов. Сказав это, вместо DetectDuplicate downstream вы могли бы вместо RouteOnAttribute где record.count> 1. Это устраняет необходимость в DistributedMapCache.

Существует также вклад в добавление процессора DetectDuplicateRecord , который, я думаю, вам действительно нужен здесь. Этот вклад находится на рассмотрении, и я надеюсь, что он войдет в следующий выпуск NiFi.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...