Прежде всего, я не думаю, что возможно (или, по крайней мере, распространено) обрабатывать «хорошо напечатанный» JSON.Вместо этого данные JSON обычно поступают из JSON , разделенного символом новой строки, поэтому ваш входной файл должен выглядеть следующим образом:
{"testdata":{"siteOwner":"xxx","siteInfo":{"siteID":"id_member","siteplatform":"web","siteType":"soap","siteURL":"www,}}}
{"testdata":{"siteOwner":"yyy","siteInfo":{"siteID":"id_member2","siteplatform":"web","siteType":"soap","siteURL":"www,}}}
После этого с вашим кодом в lines
у вас будет потоклиний ".Далее, вы можете map
этот "поток строк" в "поток JSON", применяя функцию разбора в ParDo
:
static class ParseJsonFn extends DoFn<String, Json> {
@ProcessElement
public void processElement(ProcessContext c) {
// element here is your line, you can whatever you want, parse, print, etc
// this function will be simply applied to all elements in your stream
c.output(parseJson(c.element()))
}
}
PCollection<Json> jsons = lines.apply(ParDo.of(new ParseJsonFn())) // now you have a "stream of JSONs"