Как рассчитать z балл и обновить входной файл для дальнейших входов в Apache Beam? - PullRequest
0 голосов
/ 20 декабря 2018

Я пытаюсь построить конвейер, используя Apache Beam, который принимает CSV-файл, содержащий 10 тыс. Заказов, содержащих размер заказа.Есть второй вход от PubSub, то есть «новый порядок».Мне нужно вычислить оценку Z для нового заказа, записать в большой запрос, а также обновить файл csv с помощью «нового заказа», и следующий живой заказ поступает в предыдущем (новом) порядке, включенном для вычисления счета Z.

вычислите среднее значение, стандартное отклонение строк 10k, а также получите «новый порядок» из pubsub.Рассчитайте оценку Z нового заказа, запишите оценку z в большой запрос.Дождитесь следующего заказа и вычислите среднее значение, снова «std dev», включая «предыдущий ввод» и оценку Z следующего заказа.Но я не могу рассчитать новое среднее значение, стандартное отклонение для предыдущего ввода.

from __future__ import absolute_import

import argparse
import logging

import numpy as np
    import apache_beam as beam
    from google.cloud import storage 
    from apache_beam.pvalue import AsList
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import SetupOptions
    from apache_beam.options.pipeline_options import StandardOptions

    spikey_schema = 'order_id:STRING,zscore:FLOAT'


    class CollectOrderSize(beam.DoFn):
        def process(self, element):
            order_id,order_size = element.split(",")
            print(order_size)
            order_size = float(order_size)
            return [order_size]

    class CollectOrderTuple(beam.DoFn):
        def process(self, element):
            order_id,order_size = element.split(",")
            order_size = float(order_size)
            order_id = str(order_id)
            return [(order_id,order_size)]


    class StandardDeviation(beam.CombineFn):
        def create_accumulator(self):
            return (0.0, 0.0, 0) # x, x^2, count

        def add_input(self, sum_count, input):
            (sum, sumsq, count) = sum_count
            print(sum)
            return sum + input, sumsq + input*input, count + 1

        def merge_accumulators(self, accumulators):
            sums, sumsqs, counts = zip(*accumulators)
            print(sums)
            return sum(sums), sum(sumsqs), sum(counts)

        def extract_output(self, sum_count):
            (sum, sumsq, count) = sum_count
            if count:
                mean = sum / count
                variance = (sumsq / count) - mean*mean
                stddev = np.sqrt(variance) if variance > 0 else 0
                print('stddev:',stddev)
                stats = {
                    'mean': mean,
                    'variance': variance,
                    'stddev': stddev,
                    'count': count
                }
                print(stats)
                return stats


    def calculate_zscore(cur_order,past_stats):

        past_stats = past_stats[0]
        order_id,cur_order_size = cur_order
        z_score = (cur_order_size - past_stats['mean'] ) / past_stats['stddev']
        print(z_score)
        print(order_id)
        return {
            'order_id': order_id,
            'zscore': z_score,
        }



    def update_csv(current_order):

        STORAGE_BUCKET = 'spikey-asia-storage'

        storage_client = storage.Client()
        bucket = storage_client.get_bucket(STORAGE_BUCKET)

        blob = bucket.blob('inputs/orders.csv')

        if blob.exists():
            blob.download_to_filename('/tmp/orders.csv')

        with open('/tmp/orders.csv','a+') as file:
            file.write(str(current_order+'\n'))
            file.close()

        blob.upload_from_filename('/tmp/orders.csv')
        print('file Uploaded!')




    def run(argv=None):
      parser = argparse.ArgumentParser()

      parser.add_argument(
          '--input_file',
          required=True,
          help=('Input file in the form of gcs bucket url'))

      parser.add_argument(
          '--input_topic',
          required=True,
          help=('Input PubSub topic of the form '))


      known_args, pipeline_args = parser.parse_known_args(argv)


      pipeline_options = PipelineOptions(pipeline_args)
      p = beam.Pipeline(options=pipeline_options)

      pipeline_options.view_as(SetupOptions).save_main_session = True

      pipeline_options.view_as(StandardOptions).streaming = True

      live_sales =( 
                   p |'Read From PubSub' >> beam.io.ReadFromPubSub(topic=known_args.input_topic)
                                                   .with_output_types(bytes)
                     | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
             )

      historical_sales = (
                p | beam.io.ReadFromText(known_args.input_file, skip_header_lines=1) 
            )


      combined_sales = (
                 (historical_sales,live_sales) | 'MergedPColl' >> beam.Flatten()

                )



      order_stats = (
                combined_sales | 'Split & Collect order_size' >> (beam.ParDo(CollectOrderSize()))
                          | 'Calcuate Mean & Standard Deviation' >> beam.CombineGlobally(StandardDeviation())
                )

      current_asp = (       
                    live_sales   
                                  |'Get Current Order size' >> (beam.ParDo(CollectOrderTuple())) 
                                  | 'Calcuate Z-score' >> beam.Map(calculate_zscore,AsList(order_stats))
                                  | 'Write to Bigquery' >> beam.io.WriteToBigQuery('spikey-gcp:spikey_orders.order_zscores', schema=spikey_schema)
                 )

      write_to_bucket = (
                        live_sales |'Update the file ' >> beam.Map(update_csv)  
                       )

      result = p.run()
      result.wait_until_finish()

    if __name__ == '__main__':
      logging.getLogger().setLevel(logging.INFO)
      run()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...