Ошибка в значительной степени объясняется в трассировке стека: Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
.
withRowMapper()
принимает сериализуемый RowMapper<>
функциональный интерфейс. И он сериализуется и десериализуется Beam, когда это необходимо. Однако в вашей лямбде вы также используете экземпляр Schema
, который вы определяете вне лямбды (замыкание). Поэтому при сериализации лямбда-кода Java также придется сериализовать schema
, потому что он там используется. Но Schema
не сериализуем, поэтому он терпит неудачу.
Есть несколько обходных путей, о которых я могу подумать:
создать схему внутри лямбды:
- в этом случае экземпляр схемы не будет сериализован;
- он будет создаваться каждый раз, когда вызывается лямбда;
сериализует схему (например, в строку Json) в сериализуемый объект за пределами лямбды, а затем десериализует ее внутри лямбды:
- это в основном то же самое, что и выше, но с дополнительным этапом сериализации;
- внутри лямбды, его все равно придется десериализовать при каждом вызове;
найти / написать сериализуемую Schema
реализацию:
- может быть невозможным или трудным для выполнения;
- , вероятно, будет иметь меньше накладных расходов, как в описанных выше подходах, поскольку десериализация произойдет только при создании экземпляра
RowMapper<>
;
Я думаю, что совершенно нормально создать новый экземпляр схемы в лямбде, если это не вызывает проблем.