Когда я захотел загрузить свой несвязанный ввод из Kafka в BigQuery, я наткнулся на вариант .withMethod()
.Используя Method.FILE_LOAD, я также должен указать частоту запуска, а также ненулевое значение numFileShards.
Мои вопросы :
- Что контролирует количество файловых осколков?для чего он явно используется?По моим наблюдениям, это определенно не количество временных файлов, сгенерированных в моем временном местоположении GCS и видимых мне.Но мне интересно, какое число я должен выбрать, чтобы установить здесь?
- В соответствии с исходным кодом, который я цитирую ниже, значение по умолчанию должно быть 1000, но на самом деле это 0, поэтому я получил исключение, когда я не сделалустановите его явно, и когда я установлю его на 1, исключение пропало, но, опять же, я не понимаю, что это такое и для чего я настраиваюсь, lol
/**Control how many file shards are written when using BigQuery load jobs.
Applicable only when also setting {@link/#withTriggeringFrequency}.
The default value is 1000.*/
@Experimental
public Write<T> withNumFileShards(int numFileShards) {
checkArgument(numFileShards > 0, "numFileShards must be > 0, but was: %s", numFileShards);
return toBuilder().setNumFileShards(numFileShards).build();
}
Можно ли указать размер партии по количеству записей, а не по времени? Продолжительность?
Исключение, которое я получил, когда не установил NumFileShards:
Exception in thread "main" java.lang.IllegalArgumentException
at com.google.common.base.Preconditions.checkArgument(Preconditions.java:108)
at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expandTriggered(BatchLoads.java:212)
at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expand(BatchLoads.java:557)
at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expand(BatchLoads.java:79)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expandTyped(BigQueryIO.java:1656)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1602)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1068)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:338)
at come.geotab.bigdata.streaming.mapenrichedgps.MainApplication.main(MainApplication.java:119)