Как правило, мы получаем около 3 миллионов записей в день. Какая конфигурация сервера будет наилучшей для такого потока данных?
package purplle.datapipeline;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.net.SocketTimeoutException;
import java.time.LocalDateTime;
import java.time.ZoneId;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Duration;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import purplle.datapipeline.buisness.EventSchemaBuilder;
import purplle.datapipeline.buisness.Ordering;
import purplle.datapipeline.common.Constants;
import purplle.datapipeline.helpers.Event_ordering;
import purplle.datapipeline.helpers.Event_schema;
import purplle.datapipeline.helpers.JSON_helper;
public class StarterPipeline {
/**
* Pipeline options for the events pipeline: the standard Beam {@link PipelineOptions}
* plus one option naming the input source.
*/
public interface StarterPipelineOption extends PipelineOptions {
/**
* Returns the location the pipeline should read its input from.
*
* <p>The option is not strictly required: when it is not supplied on the command line
* it defaults to {@code Constants.pubsub_event_pipeline_url}.
*
* <p>NOTE(review): despite the "file" wording in the option name and description, the
* default looks like a Pub/Sub subscription path rather than a file path -- confirm,
* and consider renaming the option.
*/
@Description("Path of the file to read from")
@Default.String(Constants.pubsub_event_pipeline_url)
String getInputFile();
/** Sets the input location; see {@link #getInputFile()}. */
void setInputFile(String value);
}
@SuppressWarnings("serial")
static class ParseJsonData_storage extends DoFn<String, KV<String, String>> {
    /** Shared logger; static so it is resolved once per JVM instead of once per element. */
    private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

    /**
     * Turns one raw JSON event into a key/value pair for the storage branch.
     *
     * <p>The element is emitted only when all of the following hold; otherwise an error is
     * logged (with the raw element) and the element is dropped:
     * <ol>
     * <li>it is non-empty and {@code JSON_helper.isJSONValid} accepts it;</li>
     * <li>the parsed object is non-empty and has an {@code "event"} key;</li>
     * <li>the object returned by {@code JSON_helper.flatJsonConvertKeyToLower} is non-empty
     * and still has an {@code "event"} key;</li>
     * <li>{@code Event_ordering.order_event_columns(..., "storage")} returns a pair whose
     * key is not empty.</li>
     * </ol>
     *
     * @param c Beam process context supplying the input element and receiving the output
     * @throws JSONException if constructing the {@link JSONObject} fails
     */
    @ProcessElement
    public void processElement(ProcessContext c) throws JSONException {
        String raw = c.element();

        if (raw.isEmpty() || !JSON_helper.isJSONValid(raw)) {
            LOG.error("Storage empty element = {}", raw);
            return;
        }

        JSONObject event_obj = new JSONObject(raw);
        if (event_obj.length() == 0 || !event_obj.has("event")) {
            LOG.error("Storage object error = {}", raw);
            return;
        }

        JSONObject flattened = JSON_helper.flatJsonConvertKeyToLower(event_obj);
        if (flattened.length() == 0 || !flattened.has("event")) {
            LOG.error("Storage object error = {}", raw);
            return;
        }

        // Reorder the JSON object's columns and build the string that gets stored.
        KV<String, String> event_kv_pair = Event_ordering.order_event_columns(flattened, "storage");
        if (event_kv_pair.getKey().isEmpty()) {
            LOG.error("Storage string empty = {}", raw);
            return;
        }

        c.output(event_kv_pair);
    }
}
@SuppressWarnings("serial")
static class ParseJsonData_bigquery extends DoFn<String, TableRow> {
    /** Shared logger; static so it is resolved once per JVM instead of once per element. */
    private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

    /**
     * Turns one raw JSON event into a BigQuery {@link TableRow}.
     *
     * <p>Every incoming element is logged at INFO. The row is emitted only when all of the
     * following hold; otherwise an error is logged (with the raw element) and the element is
     * dropped:
     * <ol>
     * <li>it is non-empty and {@code JSON_helper.isJSONValid} accepts it;</li>
     * <li>the parsed object is non-empty and has an {@code "event"} key;</li>
     * <li>the object returned by {@code JSON_helper.flatJsonConvertKeyToLower} is non-empty
     * and still has an {@code "event"} key;</li>
     * <li>{@code EventSchemaBuilder.get_event_row(..., "bigquery")} returns a non-empty row.</li>
     * </ol>
     *
     * @param c Beam process context supplying the input element and receiving the output
     * @throws JSONException if constructing the {@link JSONObject} fails
     */
    @ProcessElement
    public void processElement(ProcessContext c) throws JSONException {
        String raw = c.element();
        LOG.info("Event json = {}", raw);

        if (raw.isEmpty() || !JSON_helper.isJSONValid(raw)) {
            LOG.error("Bigquery event item error = {}", raw);
            return;
        }

        JSONObject event_obj = new JSONObject(raw);
        if (event_obj.length() == 0 || !event_obj.has("event")) {
            LOG.error("Bigquery event item object error = {}", raw);
            return;
        }

        JSONObject flattened = JSON_helper.flatJsonConvertKeyToLower(event_obj);
        if (flattened.length() == 0 || !flattened.has("event")) {
            LOG.error("Bigquery set event ordering object error = {}", raw);
            return;
        }

        TableRow event_row = EventSchemaBuilder.get_event_row(flattened, "bigquery");
        if (event_row.isEmpty()) {
            LOG.error("Bigquery set event ordering schema error = {}", raw);
            return;
        }

        c.output(event_row);
    }
}
@SuppressWarnings("serial")
static class Write_to_GCS extends DoFn<KV<String, String>, TextIO.Write> {
    /** Shared logger; static so it is resolved once per JVM instead of once per element. */
    private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

    /** Time zone whose wall-clock time is used to build the object path. */
    private static final ZoneId PATH_ZONE = ZoneId.of("Asia/Kolkata");

    /**
     * Cloud Storage client. Transient because the DoFn itself is serialized; it is
     * (re)created in {@link #setup()} instead of once per element.
     */
    private transient Storage storage;

    /** Creates the Cloud Storage client once per DoFn instance. */
    @Setup
    public void setup() {
        storage = StorageOptions.getDefaultInstance().getService();
    }

    /**
     * Uploads the value of the incoming pair as a {@code text/plain} object to the bucket
     * named by {@code Constants.gcp_events_bucket_name}. Nothing is emitted downstream.
     *
     * <p>NOTE(review): the object name depends only on the event name and the current time
     * down to the second, so two writes for the same event name within one second target
     * the same object name.
     *
     * @param c Beam process context; key = event name, value = text to upload
     * @throws JSONException declared for signature compatibility
     */
    @ProcessElement
    public void processElement(ProcessContext c) throws JSONException {
        String event_name = c.element().getKey();
        String event_string = c.element().getValue();

        String storage_file_path = buildObjectPath(event_name, LocalDateTime.now(PATH_ZONE));
        LOG.info("Writing file to location = {}", storage_file_path);

        BlobId blobId = BlobId.of(Constants.gcp_events_bucket_name, storage_file_path);
        BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
        storage.create(blobInfo, event_string.getBytes(UTF_8));
    }

    /**
     * Builds {@code <event>/<y>/<M>/<d>/<H>/<event>-<y>-<M>-<d>-<H>-<m>-<s>.txt};
     * the numeric fields are not zero-padded.
     */
    private static String buildObjectPath(String eventName, LocalDateTime now) {
        int year = now.getYear();
        int month = now.getMonthValue();
        int day = now.getDayOfMonth();
        int hour = now.getHour();
        String directory = eventName + "/" + year + "/" + month + "/" + day + "/" + hour + "/";
        String fileName = eventName + "-" + year + "-" + month + "-" + day + "-" + hour + "-"
                + now.getMinute() + "-" + now.getSecond() + ".txt";
        return directory + fileName;
    }
}
@SuppressWarnings("serial")
public static class ReadEventJson_storage extends PTransform<PCollection<String>, PCollection<KV<String, String>>> {
    /**
     * Builds the storage branch: calls {@code Event_ordering.setEventsOrdering()} and then
     * runs every input line through {@link ParseJsonData_storage}.
     *
     * <p>NOTE(review): this method runs while the pipeline graph is being built, so the
     * {@code setEventsOrdering()} call happens at that point -- confirm the ordering is also
     * available where the parsing DoFn executes.
     *
     * @param lines raw JSON event strings
     * @return the key/value pairs emitted by {@link ParseJsonData_storage}
     */
    @Override
    public PCollection<KV<String, String>> expand(PCollection<String> lines) {
        LoggerFactory.getLogger(StarterPipeline.class).info("Storage workflow started");
        // Return value is intentionally ignored; only the call itself matters here.
        Event_ordering.setEventsOrdering();
        return lines.apply(ParDo.of(new ParseJsonData_storage()));
    }
}
@SuppressWarnings("serial")
public static class ReadEventJson_bigquery extends PTransform<PCollection<String>, PCollection<TableRow>> {
    /**
     * Builds the BigQuery branch: sets and fetches the event ordering, hands it to a new
     * {@code Event_schema} via {@code setEventSchema}, and then runs every input line
     * through {@link ParseJsonData_bigquery}.
     *
     * <p>NOTE(review): this method runs while the pipeline graph is being built, so the
     * ordering/schema calls happen at that point -- confirm the schema is also available
     * where the parsing DoFn executes.
     *
     * @param lines raw JSON event strings
     * @return the table rows emitted by {@link ParseJsonData_bigquery}
     */
    @Override
    public PCollection<TableRow> expand(PCollection<String> lines) {
        final Logger logger = LoggerFactory.getLogger(StarterPipeline.class);
        logger.info("Bigquery workflow started");

        // Return value is intentionally ignored; only the call itself matters here.
        Event_ordering.setEventsOrdering();

        logger.info("Bigquery get event ordering");
        Ordering ordering = Event_ordering.getEventsOrdering();
        Event_schema schema = new Event_schema();
        schema.setEventSchema(ordering);

        PCollection<TableRow> rows = lines.apply(ParDo.of(new ParseJsonData_bigquery()));
        logger.info("Bigquery workflow rows prepared");
        return rows;
    }
}
/**
 * A SimpleFunction that joins all strings grouped under one key into a single string,
 * with a newline appended after each one.
 */
@SuppressWarnings("serial")
public static class CombineEventStrings extends SimpleFunction<KV<String, Iterable<String>>, KV<String, String>> {
    /** Shared logger; static so it is resolved once per JVM instead of once per call. */
    private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

    /**
     * Concatenates the grouped values in iteration order.
     *
     * @param input a key together with all strings grouped under it
     * @return the same key paired with the values joined as {@code v1 + "\n" + v2 + "\n" ...};
     *         an empty string when there are no values
     */
    @Override
    public KV<String, String> apply(KV<String, Iterable<String>> input) {
        // StringBuilder avoids the quadratic copying of repeated String concatenation.
        StringBuilder combined = new StringBuilder();
        for (String event : input.getValue()) {
            combined.append(event).append("\n");
        }
        String combined_event = combined.toString();
        LOG.info("combined_event = {}", combined_event);
        return KV.of(input.getKey(), combined_event);
    }
}
/**
 * Builds and runs the events pipeline, blocking until it finishes.
 *
 * <p>Events are read as strings from the Pub/Sub subscription given by the
 * {@code inputFile} option (which defaults to {@code Constants.pubsub_event_pipeline_url}),
 * placed in the global window with a repeating processing-time trigger, and then fanned
 * out to two sinks:
 * <ul>
 * <li>storage: parse, group by key, join the grouped strings, upload to GCS;</li>
 * <li>BigQuery: parse into table rows and append them to a per-event table, which must
 * already exist ({@code CREATE_NEVER}).</li>
 * </ul>
 *
 * @param args command-line pipeline options
 * @throws SocketTimeoutException declared for signature compatibility
 */
@SuppressWarnings("serial")
public static void main(String[] args) throws SocketTimeoutException {
    Logger log = LoggerFactory.getLogger(StarterPipeline.class);
    log.info("Events pipeline job started");
    StarterPipelineOption options = PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(StarterPipelineOption.class);
    Pipeline p = Pipeline.create(options);
    log.info("Pipeline created");
    log.info("Pipeline Started");

    // Read the subscription from the declared option instead of hard-coding the constant;
    // the option's default is that same constant, so existing launches are unaffected.
    PCollection<String> datastream = p.apply("Read Events From Pubsub",
            PubsubIO.readStrings().fromSubscription(options.getInputFile()));

    // Global window; a pane fires 300 s (processing time) after the first element of the
    // pane arrives, repeatedly, and fired panes are discarded.
    PCollection<String> windowed_items = datastream.apply(Window.<String>into(new GlobalWindows())
            .triggering(Repeatedly.forever(
                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(300))))
            .withAllowedLateness(Duration.standardDays(10)).discardingFiredPanes());

    // Write to storage
    windowed_items.apply("Read and make pipe separated event string", new ReadEventJson_storage())
            .apply("Combine events by keys", GroupByKey.<String, String>create())
            .apply("Combine events strings by event name", MapElements.via(new CombineEventStrings()))
            .apply("Manually write events to GCS", ParDo.of(new Write_to_GCS()));

    // Write into Big Query
    windowed_items.apply("Read and make event table row", new ReadEventJson_bigquery())
            .apply("Write_events_to_BQ",
                    BigQueryIO.writeTableRows().to(new DynamicDestinations<TableRow, String>() {
                        /** Derives the destination key from the row's "event" field. */
                        @Override
                        public String getDestination(ValueInSingleWindow<TableRow> element) {
                            return EventSchemaBuilder
                                    .fetch_destination_based_on_event(element.getValue().get("event").toString());
                        }

                        /** Resolves the destination key to a table; the table name doubles as its description. */
                        @Override
                        public TableDestination getTable(String table) {
                            String destination = EventSchemaBuilder.fetch_table_name_based_on_event(table);
                            return new TableDestination(destination, destination);
                        }

                        /** Looks up the schema for the destination key. */
                        @Override
                        public TableSchema getSchema(String table) {
                            return EventSchemaBuilder.fetch_table_schema_based_on_event(table);
                        }
                    }).withCreateDisposition(CreateDisposition.CREATE_NEVER)
                            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                            .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));

    p.run().waitUntilFinish();
    log.info("Events Pipeline Job Stopped");
}
}