Потоковая передача данных в таблицы Google BigQuery: SocketTimeoutException, 502 неверных шлюза, 500 внутренних предупреждений об ошибках сервера - PullRequest
0 голосов
/ 26 октября 2019

Мы используем API Camel BigQuery (версия 2.20) для потоковой передачи записей из очереди сообщений на сервере ActiveMQ (версия 5.14.3) в таблицу Google BigQuery.

Мывнедрили и развернули механизм потоковой передачи в виде определения XML-маршрута в Spring Framework , таким образом:

<?xml version="1.0" encoding="UTF-8"?>
<beans
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns="http://www.springframework.org/schema/beans"
    xmlns:beans="http://www.springframework.org/schema/beans"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans 
        ./spring-beans.xsd
        http://camel.apache.org/schema/spring
        ./camel-spring.xsd">

    <!--
    # ==========================================================================
    # ActiveMQ JMS Bean Definition
    # ==========================================================================
    -->
    <bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="nio://192.168.10.10:61616?jms.useAsyncSend=true" />
                <property name="userName"  value="MyAmqUserName" />
                <property name="password"  value="MyAmqPassword" />
            </bean>
        </property>
    </bean>

    <!--
    # ==========================================================================
    # GoogleBigQueryComponent
    # https://github.com/apache/camel/tree/master/components/camel-google-bigquery
    # ==========================================================================
    -->
    <bean id="gcp" class="org.apache.camel.component.google.bigquery.GoogleBigQueryComponent">
        <property name="connectionFactory">
            <bean class="org.apache.camel.component.google.bigquery.GoogleBigQueryConnectionFactory">
                <property name="credentialsFileLocation" value="MyDir/MyGcpKeyFile.json" />
            </bean>
        </property>
    </bean>

    <!--
    # ==========================================================================
    # Main Context Bean Definition
    # ==========================================================================
    -->
    <camelContext id="camelContext" xmlns="http://camel.apache.org/schema/spring" >

        <!--
        ========================================================================
        https://camel.apache.org/maven/current/camel-core/apidocs/org/apache/camel/processor/RedeliveryPolicy.html
        ========================================================================
        -->
        <onException useOriginalMessage="true">
            <exception>com.google.api.client.googleapis.json.GoogleJsonResponseException</exception>
            <exception>java.net.SocketTimeoutException</exception>
            <exception>java.net.ConnectException</exception>
            <redeliveryPolicy
                backOffMultiplier="2"
                logHandled="false"
                logRetryAttempted="true"
                maximumRedeliveries="10"
                maximumRedeliveryDelay="60000"
                redeliveryDelay="1000"
                retriesExhaustedLogLevel ="ERROR"
                retryAttemptedLogLevel="WARN"
                />
        </onException>

        <!--
        # ==================================================================
        # Message Route :
        # 1. consume messages from my AMQ queue
        # 2. write message to Google BigQuery table
        # see https://github.com/apache/camel/blob/master/components/camel-google-bigquery/src/main/docs/google-bigquery-component.adoc
        # ==================================================================
        -->
        <route>
            <from uri="jms:my.amq.queue.of.output.data.for.gcp?acknowledgementModeName=DUPS_OK_ACKNOWLEDGE&amp;concurrentConsumers=20" />
            <to uri="gcp:my_gcp_project:my_bq_data_set:my_bq_table" />
        </route>

    </camelContext>

</beans>

Вышеприведенное, похоже, работает, и мы, похоже, получаем высокоскоростные сообщения /записей (один маршрут обрабатывает более 12 000 сообщений в минуту), но наши журналы показывают большое количество предупреждений SocketTimeoutException , 502 Bad Gateway и 500 Внутренняя ошибка сервера предупреждений:

2019-10-21 15:33:13 | WARN  | DefaultErrorHandler | Failed delivery for (MessageId: XXX on ExchangeId: XXX). On delivery attempt: 0 caught: java.net.SocketTimeoutException: connect timed out

2019-10-24 12:46:53 | WARN  | DefaultErrorHandler | Failed delivery for (MessageId: XXX on ExchangeId: XXX). On delivery attempt: 0 caught: com.google.api.client.googleapis.json.GoogleJsonResponseException: 502 Bad Gateway

2019-10-25 12:33:33 | WARN  | DefaultErrorHandler | Failed delivery for (MessageId: XXX on ExchangeId: XXX). On delivery attempt: 0 caught: com.google.api.client.googleapis.json.GoogleJsonResponseException: 500 Internal Server Error

Вопросы

  1. Является ли мое использование объекта onException в целом / синтаксически правильным (за исключением штрафанастройки атрибутов redeliveryPolicy )? Или я что-то пропустил?

  2. Мое первое интересное предупреждающее сообщение гласит: " При попытке доставки: 0 перехвачено: java.net.SocketTimeoutException". Мой файл журнала не имеет "При попытке доставки: 1", При попытке доставки: 2 "и т. Д. Означает ли это, что последующие попытки доставки данного сообщения были успешными?

  3. Что касается попыток потоковой передачи данных в GCP, должен ли я рассматривать «SocketTimeoutException», «500 Internal Server Error» и «502 Bad Gateway» по-разному друг от друга, или используется один и тот же onException + redeliveryPolicy OK?

  4. Есть ли другие способы повысить производительность этого метода Camel / Google API для потоковой передачи данных в GCP? Может ли Camel / Google API поддерживать пакетную обработку сообщений, чтобы уменьшить количествоопераций вставки GCP? Я уже использую двойные потоки с дедупликацией (CamelGoogleBigQueryInsertId).

1 Ответ

0 голосов
/ 29 октября 2019

Отказ от ответственности: у меня нет опыта использования Camel BigQuery API. Мой ответ основан на наблюдении и понимании API BigQuery в целом.

Основываясь на наблюдении, что есть retriesExhaustedLogLevel ="ERROR", если нет журнала ERROR, это, вероятно, означает, что повторная попытка успешна. Повторная попытка по таймауту / 500/502 может быть такой же. По крайней мере, я не знаю, как с ними можно обращаться по-разному. Пакетная обработка определенно поможет, основываясь на публичной документации :

Максимальное количество строк на запрос: 10000 строк на запрос

Рекомендуется максимум 500 строк. Пакетная обработка может повысить производительность и пропускную способность до определенного уровня, но за счет задержки каждого запроса. Слишком мало строк на запрос и издержки каждого запроса могут сделать использование неэффективным. Слишком много строк на запрос, и пропускная способность может упасть.

Рекомендуется максимум 500 строк на запрос, но экспериментирование с репрезентативными данными (схема и размеры данных) поможет вам определить идеальный размер пакета.

...