Облачный поток данных - как Dataflow выполняет параллелизм? - PullRequest
0 голосов
/ 04 июля 2018

Мой вопрос, за кулисами, для поэлементного Beam DoFn (ParDo), как параллельная рабочая нагрузка Cloud Dataflow? Например, в моем ParDO я отправляю один http-запрос на внешний сервер для одного элемента. И я использую 30 рабочих, каждый имеет 4vCPU.

  1. Значит ли это, что на каждом работнике будет максимум 4 потока?
  2. Означает ли это, что для каждого работника необходимо только 4 http-соединения, или их можно установить, если я поддерживаю их, чтобы добиться максимальной производительности?
  3. Как я могу настроить уровень параллелизма, кроме использования большего количества ядер или большего количества рабочих?
  4. с моими текущими настройками (30 * 4vCPU рабочий), я могу установить около 120 http-соединений на http-сервере. Но и сервер, и работник имеют очень низкое потребление ресурсов. в основном я хочу заставить их работать намного больше, посылая больше запросов в секунду. Что мне делать ...

Фрагмент кода для иллюстрации моей работы:

public class NewCallServerDoFn extends DoFn<PreparedRequest,KV<PreparedRequest,String>> {


private static final Logger Logger = LoggerFactory.getLogger(ProcessReponseDoFn.class);

private static PoolingHttpClientConnectionManager _ConnManager = null;
private static CloseableHttpClient _HttpClient = null;
private static HttpRequestRetryHandler _RetryHandler = null;
private static  String[] _MapServers = MapServerBatchBeamApplication.CONFIG.getString("mapserver.client.config.server_host").split(",");

@Setup
public void setupHttpClient(){

    Logger.info("Setting up HttpClient");

   //Question: the value of maxConnection below is actually 10, but with 30 worker machines, I can only see 115 TCP connections established on the server side. So this setting doesn't really take effect as I expected.....

    int maxConnection = MapServerBatchBeamApplication.CONFIG.getInt("mapserver.client.config.max_connection");
    int timeout = MapServerBatchBeamApplication.CONFIG.getInt("mapserver.client.config.timeout");

    _ConnManager = new PoolingHttpClientConnectionManager();

    for (String mapServer : _MapServers) {
        HttpHost serverHost = new HttpHost(mapServer,80);
        _ConnManager.setMaxPerRoute(new HttpRoute(serverHost),maxConnection);
    }

    // config timeout
    RequestConfig requestConfig = RequestConfig.custom()
            .setConnectTimeout(timeout)
            .setConnectionRequestTimeout(timeout)
            .setSocketTimeout(timeout).build();

    // config retry
    _RetryHandler = new HttpRequestRetryHandler() {

        public boolean retryRequest(
                IOException exception,
                int executionCount,
                HttpContext context) {

            Logger.info(exception.toString());
            Logger.info("try request: " + executionCount);

            if (executionCount >= 5) {
                // Do not retry if over max retry count
                return false;
            }
            if (exception instanceof InterruptedIOException) {
                // Timeout
                return false;
            }
            if (exception instanceof UnknownHostException) {
                // Unknown host
                return false;
            }
            if (exception instanceof ConnectTimeoutException) {
                // Connection refused
                return false;
            }
            if (exception instanceof SSLException) {
                // SSL handshake exception
                return false;
            }
            return true;
        }

    };

    _HttpClient = HttpClients.custom()
                            .setConnectionManager(_ConnManager)
                            .setDefaultRequestConfig(requestConfig)
                            .setRetryHandler(_RetryHandler)
                            .build();

    Logger.info("Setting up HttpClient is done.");

}

@Teardown
public void tearDown(){
    Logger.info("Tearing down HttpClient and Connection Manager.");
    try {
        _HttpClient.close();
        _ConnManager.close();
    }catch (Exception e){
        Logger.warn(e.toString());
    }
    Logger.info("HttpClient and Connection Manager have been teared down.");
}




@ProcessElement
public void processElement(ProcessContext c) {

    PreparedRequest request = c.element();

    if(request == null)
        return;

    String response="{\"my_error\":\"failed to get response from map server with retries\"}";


    String chosenServer = _MapServers[request.getHardwareId() % _MapServers.length];

    String parameter;
    try {
        parameter = URLEncoder.encode(request.getRequest(),"UTF-8");
    } catch (UnsupportedEncodingException e) {
        Logger.error(e.toString());

        return;
    }

    StringBuilder sb = new StringBuilder().append(MapServerBatchBeamApplication.CONFIG.getString("mapserver.client.config.api_path"))
            .append("?coordinates=")
            .append(parameter);

    HttpGet getRequest = new HttpGet(sb.toString());
    HttpHost host = new HttpHost(chosenServer,80,"http");
    CloseableHttpResponse httpRes;

    try {
        httpRes = _HttpClient.execute(host,getRequest);
        HttpEntity entity = httpRes.getEntity();
        if(entity != null){
            try
            {
                response = EntityUtils.toString(entity);
            }finally{
                EntityUtils.consume(entity);
                httpRes.close();
            }
        }
    }catch(Exception e){
        Logger.warn("failed by get response from map server with retries for " + request.getRequest());
    }

    c.output(KV.of(request, response));

}
}

1 Ответ

0 голосов
/ 04 июля 2018
  1. Да, на основании этого ответа .
  2. Нет, вы можете установить больше соединений. Исходя из моего ответа , вы можете использовать асинхронный http-клиент для получения большего количества одновременных запросов. Поскольку этот ответ также описывает, вам нужно собрать результаты этих асинхронных вызовов и вывести их синхронно в любой @ProcessElement или @FinishBundle.
  3. См. 2.
  4. Поскольку использование вашего ресурса низкое, это указывает на то, что работник проводит большую часть своего времени в ожидании ответа. Я думаю, что с помощью описанного выше подхода вы сможете намного лучше использовать свои ресурсы и достичь той же производительности с гораздо меньшим количеством работников.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...