Я пытался запустить конвейер apache-beam на облачной платформе Google, используя Cloud Dataflow. Однако кажется, что он не проходит эту строку кода:
p | 'GetFile' >> beam.io.ReadFromText(input_filename)
Он возвращает это предупреждение и остается застрявшим:
ПРЕДУПРЕЖДЕНИЕ: root: Повторить с экспоненциальным откатом: ожиданиев течение 5,14973849643 секунд перед повторной попыткой существует, потому что мы перехватили исключение: SSLHandshakeError: [SSL: CERTIFICATE_VERIFY_FAILED] проверка сертификата не удалась (_ssl.c: 661)
Вот мой код:
import apache_beam as beam
PROJECT='xxxx'
BUCKET='xxxx'
class Split(beam.DoFn):
def process(self, element):
IATA,AIRPORT,CITY,STATE,COUNTRY,LATITUDE,LONGITUDE= element.split(",")
return [{
'IATA': IATA,
'AIRPORT': AIRPORT,
'CITY': CITY
}]
def run():
argv = [
'--project={0}'.format(PROJECT),
'--job_name=examplejob2',
'--save_main_session',
'--staging_location=gs://{0}/staging/'.format(BUCKET),
'--temp_location=gs://{0}/staging/'.format(BUCKET),
'--runner=DataflowRunner'
]
p = beam.Pipeline(argv=argv)
input_filename = 'gs://{0}/airports.csv'.format(BUCKET)
output_filename = 'gs://{0}/output.txt'.format(BUCKET)
# find all lines that contain the searchTerm
(p
|'GetFile' >> beam.io.textio.ReadFromText(input_filename)
|'Split' >> beam.ParDo(Split())
|'Write' >> beam.io.WriteToText(output_filename)
)
p.run()
if __name__ == '__main__':
run()
Может кто-нибудь помочь решить эту проблему?