Если я правильно понимаю ваш вопрос, вы пытаетесь принять строки JSON и сохранить их в хранилище Cloud Storage, используя Avro в качестве кодера для данных при их перемещении по потоку данных. Из вашего кода нет ничего очевидного для меня. Я сделал это, включая сохранение данных в облачном хранилище и в BigQuery.
Возможно, вы решите использовать более простой и, вероятно, менее подверженный ошибкам подход: определите класс Java для своих данных и используйте аннотации Avro на это чтобы кодер работал правильно. Вот пример:
import org.apache.avro.reflect.Nullable;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
@DefaultCoder(AvroCoder.class)
public class Data {
public long nonNullableValue;
@Nullable public long nullableValue;
}
Затем используйте этот тип в ваших DnFn
реализациях, как вы, вероятно, уже сделали. Beam должен уметь правильно перемещать данные между работниками с помощью Avro, даже если поля, помеченные @Nullable
, имеют нулевое значение.