Как получить входные данные из pandas.dataFrame в Apache Beam Pipeline - PullRequest
0 голосов
/ 03 апреля 2019

Я пытаюсь получить входные данные от pandas dataframe к конвейеру лучей apache и записать его в GCS.Без использования потока данных / Apache, я могу записывать данные в GCS.Но теперь поток данных в картинке.

def database_to_gcs(self, type='full'):
    if type == 'full':
        with open(self.tablemetadata, 'r') as fr:
            next(fr)
            self.clear_directory()
            argv = [
                '--project={0}'.format(self.project_name),
                '--job_name=One',
                '--save_main_session',
                '--staging_location=gs://{0}/staging/'.format(self.bucket_name),
                '--temp_location=gs://{0}/staging/'.format(self.bucket_name),
                '--runner=DataflowRunner'
            ]
            p = beam.Pipeline(argv=sys.argv)
            for line in fr:
                table_name, primary_key = line.split(',')
                self.cur.execute("SELECT * FROM " + table_name)
                df = pd.DataFrame(list(self.cur))
                dictionary = df.to_dict('split')
                print(dictionary)
                input_dataframe = df
                output_path = 'gs://{0}/output/{1}/{2}/{3}'.format(self.bucket_name,
                                                                   table_name,
                                                                   str(datetime.now().date()),
                                                                   str(datetime.now()) + "_" + table_name + '.csv')
                (p
                  | 'ReadDataframe' >> beam.io.ReadFromText(input_dataframe)
                  | 'WriteToFile' >> beam.io.Write(output_path)
                  )
                p.run()

1 Ответ

0 голосов
/ 04 апреля 2019

Beam обеспечивает преобразование ParDo , где вы можете написать произвольный код Python, который работает с элементами ввода.Поэтому, возможно, стоит написать DoFn , который берет строки текста, прочитанные из входного файла, и создает кадры данных.Вы можете обработать эти кадры данных в том же ParDo или передать их во вторичный ParDo, где вы выполняете обработку.Я не думаю, что в настоящее время в Beam есть какие-либо служебные преобразования для обработки кадров данных pandas, хотя это обсуждалось несколько раз.

...