Установка kubectl на работников 'dataflowrunner' - PullRequest
0 голосов
/ 07 января 2019

Я хочу заполнить данные в моем кластере эластичного поиска, работающем на kubernetes. У меня есть данные на Bigquery, и я хочу использовать поток данных (Python) для загрузки данных. Похоже, версия Python Apache-Beam не имеет упругого поиска. Я написал свой собственный модуль записи по записям в потоке данных, но мне нужно перенести порт эластичного поиска из кластера kubernetes. Поэтому мне нужно установить google-cloud-sdk и kubectl, чтобы я мог перенаправить порт, записать свои данные и закрыть его после этого. Кажется, мой код работает нормально, когда я запускаю работу локально, но я не могу установить google-cloud-sdk и kubectl на рабочих.

Мой код, кажется, работает нормально, когда я запускаю работу локально, но я не могу установить google-cloud-sdk и kubectl на рабочих.

это команды, которые вызываются в setup.py в подпроцессе. Открыть

['export', 'CLOUD_SDK_REPO="cloud-sdk-$(lsb_release', '-c', '-s)"'],
['echo', '"deb', 'https://packages.cloud.google.com/apt', '$CLOUD_SDK_REPO', 'main"', '|', 'sudo', 'tee', '-a', '/etc/apt/sources.list.d/google-cloud-sdk.list'],
['sudo', 'rm', '/etc/apt/sources.list.d/partner.list'],
['sudo', 'apt-get', 'install', 'google-cloud-sdk', 'kubectl']

это мой метод для переадресации порта службы эластичного поиска в start_bundle

def _open_connection(self):
    tries = 0
    connected = False
    while tries <= 3 and not connected:
        tries += 1
        try:
            res = requests.get('http://{0}:{1}'.format(self.host, self.port))
            connected = (res.status_code == 200)
        except Exception as e:
            logging.warning(e)
            subprocess.check_call('gcloud container clusters get-credentials {0}'.format(ES_CLUSTER_NAME), shell=True)
            try:
                subprocess.check_call('kubectl version', shell=True)
            except exception as ee:
                logging.warning(ee)
                subprocess.check_call('gcloud components install kubectl', shell=True)
            subprocess.call('kubectl port-forward elasticsearch-0 {0}:{0} & disown'.format(self.port), shell=True)
            time.sleep(3)
    return connected

Я ожидаю, что эти команды (я пробовал варианты) установят необходимые пакеты на каждом работнике, но установка продолжает сбой.

1 Ответ

0 голосов
/ 13 февраля 2019

Я исправил эту проблему, пропустив переадресацию портов и вместо этого внедрив внутренний балансировщик нагрузки в моем порту упора поиска. Таким образом, мои работники потока данных могут напрямую подключаться к внутреннему IP-адресу для записи данных.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...