Как можно более эффективно вставить данные в Hbase из Kafka, используя Spark Streaming? - PullRequest
0 голосов
/ 25 марта 2019

Я пытаюсь создать Kafka Data Ingestion в HBase через PySpark. В данный момент я пытаюсь вставить данные из Kafka в Hbase, используя HappyBase, но это очень медленно. Я думаю, что массовая загрузка, также использующая HappyBase, не улучшит производительность значительно. Ниже приведен текущий исходный код. Мне нужно добиться максимально возможной производительности. Есть ли у вас какие-либо идеи? Может быть saveAsNewAPIHadoopDataset или другое решение?

def SaveToHBase(rdd):
    print("=====Pull from Stream=====")
    if not rdd.isEmpty():

        print(len(rdd.collect()))
        print(datetime.now())
        for line in rdd.collect():
            ctable.put((line.log_id), { \
            b'log:content': (line.log)})


kds = KafkaUtils.createDirectStream(ssc, topic, k_params, fromOffsets=None)

parsed = kds.filter(lambda x: x != None and len(x) > 0 )
parsed = parsed.map(lambda x: x[1])
parsed = parsed.map(lambda rec: rec.split(","))
parsed = parsed.filter(lambda x: x != None and len(x) == 2 )
parsed = parsed.map(lambda data:Row(log_id=getValue(str,data[0]), \
        log=getValue(str,data[1])))

parsed.foreachRDD(SaveToHBase)

1 Ответ

0 голосов
/ 26 марта 2019

Обычно хорошим инструментом для передачи данных между Kafka и внешними источниками и целями является Kafka Connect.

Kafka Connect является частью Apache Kafka и обеспечивает масштабируемую потоковую интеграцию, для реализации которой требуется только файл конфигурации.Есть много готовых разъемов, и вы можете написать свой собственный, если хотите.Вы можете запустить Kafka Connect на одном компьютере или кластеризовать для обеспечения устойчивости и пропускной способности.Он работает отдельно от ваших Kafka брокеров.

Если вы хотите обработать данные до посадки их на цель, вы должны сделать это с помощью технологии потоковой обработки (Spark Streaming, Kafka Streams, KSQL и т. Д.) И записать результаты в тему Kafka.Эта тема Kafka затем служит источником для Kafka Connect для записи в целевое хранилище данных (в вашем случае, HBase).

Вы можете найти разъем Kafka Connect для HBase здесь: https://www.confluent.io/connector/kafka-connect-hbase-sink/

...