Искристая структурированная потоковая передача Kafka Avro на Hbase - PullRequest
0 голосов
/ 26 июня 2018

Как описано в Spark Structured Streaming с интеграцией Hbase , мне интересно записывать данные в HBase в структуре структурированного потокового вещания. Я клонировал код SHC из github, расширил его с помощью поставщика синхронизации и пытался записывать записи в HBase. Buy, я получил ошибку: «Запросы с потоковыми источниками должны выполняться с помощью writeStream.start ()». Мой код Python ниже:

    spark = SparkSession \
        .builder \
        .appName("SparkConsumer") \
        .getOrCreate()

    print 'read Avro schema from file: {}...'.format(schema_name)
    schema = avro.schema.parse(open(schema_name, 'rb').read())
    reader = avro.io.DatumReader(schema)
    print 'the schema is read'

    rows = spark \
        .readStream \
        .format('kafka') \
        .option('kafka.bootstrap.servers', brokers) \
        .option('subscribe', topic) \
        .option('group.id', group_id) \
        .option('maxOffsetsPerTrigger', 1000) \
        .option("startingOffsets", "earliest") \
        .load()
    rows.printSchema()

    schema = StructType([ \
            StructField('consumer_id', StringType(), False), \
            StructField('audit_system_id', StringType(), False), \
            StructField('object_path', StringType(), True), \
            StructField('object_type', StringType(), False), \
            StructField('what_action', StringType(), False), \
            StructField('when', LongType(), False), \
            StructField('where', StringType(), False), \
            StructField('who', StringType(), True), \
            StructField('workstation', StringType(), True) \
        ])

    def decode_avro(msg):
        bytes_reader = io.BytesIO(bytes(msg))
        decoder = avro.io.BinaryDecoder(bytes_reader)
        data = reader.read(decoder)
        return (\
                data['consumer_id'],\
                data['audit_system_id'],\
                data['object_path'],\
                data['object_type'],\
                data['what_action'],\
                data['when'],\
                data['where'],\
                data['who'],\
                data['workstation']\
               )

    udf_decode_avro = udf(decode_avro, schema)

    values = rows.select('value')
    values.printSchema()

    changes = values.withColumn('change', udf_decode_avro(col('value'))).select('change.*')
    changes.printSchema()

    change_catalog = '''
    {
        "table":
        {
            "namespace": "uba_input",
            "name": "changes"
        },
        "rowkey": "consumer_id",
        "columns":
        {
            "consumer_id": {"cf": "rowkey", "col": "consumer_id", "type": "string"},
            "audit_system_id": {"cf": "data", "col": "audit_system_id", "type": "string"},
            "object_path": {"cf": "data", "col": "object_path", "type": "string"},
            "object_type": {"cf": "data", "col": "object_type", "type": "string"},
            "what_action": {"cf": "data", "col": "what_action", "type": "string"},
            "when": {"cf": "data", "col": "when", "type": "bigint"},
            "where": {"cf": "data", "col": "where", "type": "string"},
            "who": {"cf": "data", "col": "who", "type": "string"},
            "workstation": {"cf": "data", "col": "workstation", "type": "string"}
        }
    }'''

    query = changes \
        .writeStream \
        .outputMode("append") \
        .format('HBase.HBaseSinkProvider')\
        .option('hbasecat', change_catalog) \
        .option("checkpointLocation", '/tmp/checkpoint') \
        .start()

#        .format('org.apache.spark.sql.execution.datasources.hbase')\
#    query = changes \
#        .writeStream \
#        .format('console') \
#        .start()

    query.awaitTermination()
...