Json Проверка в Apache луче с использованием облачного потока данных Google - PullRequest
0 голосов
/ 12 февраля 2020

Я пытаюсь написать преобразование фильтра, используя Apache beam Java SDK, и мне нужно отфильтровать недействительные Json сообщения.

Если я создаю новый объект Gson для каждой проверки элемента, реализация работает отлично. Однако я хочу избежать создания объектов Gson для каждого элемента (пропускная способность составляет 1 КБ / сек) и проверить json.

Я создаю постоянный объект Gson в начале и инициализирую его в блоке stati c. Этот подход не работает. Не знаете, почему один и тот же объект нельзя использовать для анализа нескольких элементов, поскольку мы не меняем состояние объекта во время обработки?

// Gson object declared as constant
private static final Gson gsonObj=new Gson();

// Initialized GSon object during class loading before main method invocation
static {
    gsonObj = new Gson();
}

....

/*
enum to validate json messages.
 */
enum InputValidation implements SerializableFunction<String, Boolean> {
    VALID {
        @Override
        public Boolean apply(String input) {
            try {
                gsonObj.fromJson(input, Object.class);
                return true;
            } catch(com.google.gson.JsonSyntaxException ex) {
                return false;
            }
        }
    }
}

1 Ответ

0 голосов
/ 13 февраля 2020

Используйте TupleTag для фильтрации записи вместо 'enum InputValidation Implements'. Используйте приведенный ниже код для фильтрации не разбираемой строки json.

Pipeline p = Pipeline.create(options);

TupleTag<String> successParse = new TupleTag<String>();
TupleTag<String> failParse = new TupleTag<String>();

private static final Gson gsonObj=new Gson();

PCollectionTuple = input.apply(ParDo.of(new DoFn<String, String>(){
    @ProcessElement
    public void processElement(ProcessContext c) {
        try {
            gsonObj.fromJson(c.element(), Object.class);
            c.output(successParse,c.element());
        } catch {
            c.output(failParse,c.element());
        }
    }
}).withOutputTags(successParse, TupleTagList.of(failParse)));

Вышеупомянутый фрагмент кода работал в моем случае и является оптимальным решением для фильтрации записей.

...