Поток данных в BigQuery и системы хранения очень большой - PullRequest
0 голосов
/ 12 июня 2018

Мы создаем конвейер данных в GCP и сталкиваемся с некоторыми проблемами во время тестирования.Наша текущая архитектура основана на AWS, чтобы проверить, что мы отправляем одну копию данных в pubsub из Lambda в режиме реального времени.

  • Перед лицом проблемы задержки из pubsub в BigQuery и хранения через поток данных (Есть ли способ сделать массовуюзагрузка в соответствии с таблицей вместо вставки одного события за раз) У нас есть окно 5 минут, а через 5 минут мы группируем данные по ключу события для целей хранения и записываем все события за эту продолжительность в один файл, можем ли мы сделать что-то подобное в BigQuery иОпределите схему только один раз для одного типа события вместо всех событий.
  • Автоматическое масштабирование работника не происходит, мин 2, и дается максимум 10
  • Все используемые сервисы находятся в asia-northeast1
  • Как правило, мы получаем 3 миллиона записей в день, что будет наилучшей конфигурацией сервера для потока данных.

    package purplle.datapipeline;
    import static java.nio.charset.StandardCharsets.UTF_8;
    
    import java.net.SocketTimeoutException;
    import java.time.LocalDateTime;
    import java.time.ZoneId;
    
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
    import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
    import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
    import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.SimpleFunction;
    import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.ValueInSingleWindow;
    import org.joda.time.Duration;
    import org.json.JSONException;
    import org.json.JSONObject;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.google.api.services.bigquery.Bigquery;
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;
    import com.google.cloud.storage.Blob;
    import com.google.cloud.storage.BlobId;
    import com.google.cloud.storage.BlobInfo;
    import com.google.cloud.storage.Storage;
    import com.google.cloud.storage.StorageOptions;
    
    import purplle.datapipeline.buisness.EventSchemaBuilder;
    import purplle.datapipeline.buisness.Ordering;
    import purplle.datapipeline.common.Constants;
    import purplle.datapipeline.helpers.Event_ordering;
    import purplle.datapipeline.helpers.Event_schema;
    import purplle.datapipeline.helpers.JSON_helper;
    
    public class StarterPipeline {
    
    
    public interface StarterPipelineOption extends PipelineOptions {
    
        /**
         * Set this required option to specify where to read the input.
         */
        @Description("Path of the file to read from")
        @Default.String(Constants.pubsub_event_pipeline_url)
        String getInputFile();
    
        void setInputFile(String value);
    
    }
    
    @SuppressWarnings("serial")
    static class ParseJsonData_storage extends DoFn<String, KV<String, String>> {
    
        @ProcessElement
        public void processElement(ProcessContext c) throws JSONException {
            Logger log = LoggerFactory.getLogger(StarterPipeline.class);
    
            if (c.element().length() > 0 && JSON_helper.isJSONValid(c.element())) {
                JSONObject event_obj = new JSONObject(c.element());
                if (event_obj.length() > 0 && event_obj.has("event")) {
                    JSONObject ob2 = JSON_helper.flatJsonConvertKeyToLower(event_obj);
                    if (ob2.length() > 0 && ob2.has("event")) {
                        // Reorder the json object then pass to create pipe saperated string.
                        KV<String, String> event_kv_pair = Event_ordering.order_event_columns(ob2, "storage");
                        if (!event_kv_pair.getKey().isEmpty() && event_kv_pair.getKey().length() > 0) {
                            c.output(event_kv_pair);
                        } else {
                            log = LoggerFactory.getLogger(StarterPipeline.class);
                            log.error("Storage string empty = " + c.element());
                        }
                    } else {
                        log = LoggerFactory.getLogger(StarterPipeline.class);
                        log.error("Storage object error = " + c.element());
                    }
                } else {
                    log = LoggerFactory.getLogger(StarterPipeline.class);
                    log.error("Storage object error = " + c.element());
                }
            } else {
                log = LoggerFactory.getLogger(StarterPipeline.class);
                log.error("Storage empty element = " + c.element());
            }
        }
    }
    
    @SuppressWarnings("serial")
    static class ParseJsonData_bigquery extends DoFn<String, TableRow> {
        @ProcessElement
        public void processElement(ProcessContext c) throws JSONException {
            Logger log = LoggerFactory.getLogger(StarterPipeline.class);
            log.info("Event json = " + c.element());
            if (!c.element().isEmpty() && JSON_helper.isJSONValid(c.element())) {
                JSONObject event_obj = new JSONObject(c.element());
                if (event_obj.length() > 0 && event_obj.has("event")) {
                    JSONObject ob2 = JSON_helper.flatJsonConvertKeyToLower(event_obj);
                    if (ob2.length() > 0 && ob2.has("event")) {
                        TableRow event_row = EventSchemaBuilder.get_event_row(ob2, "bigquery");
                        if (!event_row.isEmpty()) {
                            c.output(event_row);
                        } else {
                            log = LoggerFactory.getLogger(StarterPipeline.class);
                            log.error("Bigquery set event ordering schema error = " + c.element());
                        }
                    } else {
                        log = LoggerFactory.getLogger(StarterPipeline.class);
                        log.error("Bigquery set event ordering object error = " + c.element());
                    }
                } else {
                    log = LoggerFactory.getLogger(StarterPipeline.class);
                    log.error("Bigquery event item object error = " + c.element());
                }
            } else {
                log = LoggerFactory.getLogger(StarterPipeline.class);
                log.error("Bigquery event item error = " + c.element());
            }
        }
    }
    
    @SuppressWarnings("serial")
    static class Write_to_GCS extends DoFn<KV<String, String>, TextIO.Write> {
        @ProcessElement
        public void processElement(ProcessContext c) throws JSONException {
    
            String event_string = c.element().getValue();
            String event_name = c.element().getKey();
    
            LocalDateTime now = LocalDateTime.now(ZoneId.of("Asia/Kolkata"));
            int year = now.getYear();
            int month = now.getMonthValue();
            int day = now.getDayOfMonth();
            int hour = now.getHour();
            int minute = now.getMinute();
            int second = now.getSecond();
    
            String storage_file_path = event_name + "/" + year + "/" + month + "/" + day + "/" + hour + "/" + event_name
            + "-" + year + "-" + month + "-" + day + "-" + hour + "-" + minute + "-" + second + ".txt";
    
            Logger log = LoggerFactory.getLogger(StarterPipeline.class);
            log.info("Writing file to location = " + storage_file_path);
    
            // Create your service object
            Storage storage = StorageOptions.getDefaultInstance().getService();
    
            // Upload a blob to the newly created bucket
            BlobId blobId = BlobId.of(Constants.gcp_events_bucket_name, storage_file_path);
            BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
            @SuppressWarnings("unused")
            Blob blob = storage.create(blobInfo, event_string.getBytes(UTF_8));
    
        }
    }
    
    @SuppressWarnings("serial")
    public static class ReadEventJson_storage extends PTransform<PCollection<String>, PCollection<KV<String, String>>> {
        @Override
        public PCollection<KV<String, String>> expand(PCollection<String> lines) {
    
            Logger log = LoggerFactory.getLogger(StarterPipeline.class);
            log.info("Storage workflow started");
    
            @SuppressWarnings("unused")
            Boolean tempbool = Event_ordering.setEventsOrdering();
            // Convert lines of text into individual words.
            PCollection<KV<String, String>> words = lines.apply(ParDo.of(new ParseJsonData_storage()));
    
            return words;
        }
    }
    
    @SuppressWarnings("serial")
    public static class ReadEventJson_bigquery extends PTransform<PCollection<String>, PCollection<TableRow>> {
        @Override
        public PCollection<TableRow> expand(PCollection<String> lines) {
    
            Logger log = LoggerFactory.getLogger(StarterPipeline.class);
            log.info("Bigquery workflow started");
    
            @SuppressWarnings("unused")
            Boolean tempbool = Event_ordering.setEventsOrdering();
    
            log.info("Bigquery get event ordering");
            Ordering events_ordering = Event_ordering.getEventsOrdering();
    
            Event_schema es = new Event_schema();
            es.setEventSchema(events_ordering);
    
            // Convert lines of text into individual words.
            PCollection<TableRow> table_row = lines.apply(ParDo.of(new ParseJsonData_bigquery()));
    
            log.info("Bigquery workflow rows prepared");
    
            return table_row;
        }
    }
    
    /** A SimpleFunction that converts a Word and Count into a printable string. */
    @SuppressWarnings("serial")
    public static class CombineEventStrings extends SimpleFunction<KV<String, Iterable<String>>, KV<String, String>> {
    
        @Override
        public KV<String, String> apply(KV<String, Iterable<String>> input) {
    
            String combined_event = "";
    
            for (String combined_str : input.getValue()) {
                combined_event += combined_str + "\n";
            }
    
            Logger log = LoggerFactory.getLogger(StarterPipeline.class);
            log.info("combined_event = " + combined_event);
    
            KV<String, String> return_kv = KV.of(input.getKey(), combined_event);
    
            return return_kv;
        }
    }
    
    @SuppressWarnings("serial")
    public static void main(String[] args) throws SocketTimeoutException {
    
        Logger log = LoggerFactory.getLogger(StarterPipeline.class);
    
        log.info("Events pipeline job started");
    
        StarterPipelineOption options = PipelineOptionsFactory.fromArgs(args).withValidation()
        .as(StarterPipelineOption.class);
    
        Pipeline p = Pipeline.create(options);
    
        log.info("Pipeline created");
    
        log.info("Pipeline Started");
    
        PCollection<String> datastream = p.apply("Read Events From Pubsub",
            PubsubIO.readStrings().fromSubscription(Constants.pubsub_event_pipeline_url));
    
        // PCollection<String> windowed_items =
        // datastream.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
    
        // PCollection<String> windowed_items = datastream.apply(
        // Window.<String>into(SlidingWindows.of(Duration.standardMinutes(1)).every(Duration.standardSeconds(10))));
    
        PCollection<String> windowed_items = datastream.apply(Window.<String>into(new GlobalWindows())
            .triggering(Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(300))))
            .withAllowedLateness(Duration.standardDays(10)).discardingFiredPanes());
    
        // Write to storage
        windowed_items.apply("Read and make pipe separated event string", new ReadEventJson_storage())
        .apply("Combine events by keys", GroupByKey.<String, String>create())
        .apply("Combine events strings by event name", MapElements.via(new CombineEventStrings()))
        .apply("Manually write events to GCS", ParDo.of(new Write_to_GCS()));
    
        // Write into Big Query
        windowed_items.apply("Read and make event table row", new ReadEventJson_bigquery())
    
        .apply("Write_events_to_BQ",
            BigQueryIO.writeTableRows().to(new DynamicDestinations<TableRow, String>() {
                public String getDestination(ValueInSingleWindow<TableRow> element) {
                    String destination = EventSchemaBuilder
                    .fetch_destination_based_on_event(element.getValue().get("event").toString());
                    return destination;
                }
    
                @Override
                public TableDestination getTable(String table) {
                    String destination = EventSchemaBuilder.fetch_table_name_based_on_event(table);
                    return new TableDestination(destination, destination);
                }
    
                @Override
                public TableSchema getSchema(String table) {
                    TableSchema table_schema = EventSchemaBuilder.fetch_table_schema_based_on_event(table);
                    return table_schema;
                }
            }).withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
            );
    
        p.run().waitUntilFinish();
    
        log.info("Events Pipeline Job Stopped");
    
    }
    
    }
    

Изображение: Прогресс потока данных 1 | Прогресс потока данных 2 | Описание задания потока данных

1 Ответ

0 голосов
/ 12 июня 2018

Проверьте это сообщение:

  • https://medium.com/teads-engineering/give-meaning-to-100-billion-analytics-events-a-day-d6ba09aa8f44

  • Они обрабатывают 100 миллиардов событий в день с потоком данных.

  • Вместо потоковой передачи они выбрали партию.Обратите внимание, что они выбрали сложный способ пакетирования, в настоящее время Dataflow имеет более простой и быстрый путь.
  • Их описанная задержка «колеблется между 3 мин (минимальная продолжительность фазы Write BQ) и 30 мин».
  • Эта задержка могла бы быть намного короче, если бы они перешли к новому «простому» пакету Dataflow в режим BigQuery.

(коннектор заслуживает более глубокой оценки, но пока что проверьте этот слайдout https://twitter.com/felipehoffa/status/1000024539944902656)

...