Прочитать файл JSON-newline с HCFS - PullRequest
1 голос
/ 30 апреля 2019

Как сделать так, чтобы соединитель Flink HCFS читался из Google Cloud Storage с шаблоном, подобным **/*S0.json, где файлы содержат данные JSON с разделителями новой строки?

Файлы содержат содержимое, например

{"message": "Hello world", "timestamp": 1556655155}
{"message": "Goodbye world", "timestamp": 1556655170}

В пользовательском интерфейсе GCS это выглядит следующим образом:

Sample files

Продолжение из Использование файлов GCS на основе шаблона из Flink

1 Ответ

1 голос
/ 02 мая 2019

После чтения файла JSON из HCFS в виде простого текста вы можете отобразить его на JSONObject, используя настраиваемый преобразователь :

import org.apache.flink.api.java.DataSet;
import org.apache.sling.commons.json.JSONObject;

DataSet<JSONObject> jsonInput = 
    input
        .map(record -> record.f1.toString())
        .map(StringToJsonObject::new);

JSONObject код сопоставителя на основе приведенного выше примера:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.sling.commons.json.JSONObject;

public class StringToJsonObject implements MapFunction<String, JSONObject> {
    private static final long serialVersionUID = 4573928723585302447L;

    public JSONObject map(String content) throws Exception {
        return new JSONObject(content);
    }
}

При необходимости вы можете сопоставить String с POJO вместо универсального JSONObject, используя сопоставитель, аналогичный this .

...