Вставьте ответ netcat JSON в приемник Hive с помощью Flume - PullRequest
0 голосов
/ 09 ноября 2019

У меня есть скрипт Python, который использует сокет для отправки данных через netcat

import socket
import time
import csv
import json
clientsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
clientsocket.connect(('localhost', 44444))

try:
    with open('csv1.csv') as csv_file:
        csv_reader = csv.reader(csv_file, delimiter=',')
        line_count = 0
        for row in csv_reader:

            if line_count == 0:
                line_count += 1
            else:
                if len(row)==14:
                    j ={"id": row[0], "lon":row[1],"lat":row[2]}

                    line_count += 1
                    clientsocket.sendall(str(j))
                    data = clientsocket.recv(16)
                    time.sleep(250.0 / 1000.0)
            if line_count==15:
                break


finally:
    clientsocket.close()

результат будет выглядеть так: "{'lat': 1, 'lon': 1, 'id': 1} "

Теперь я пытаюсь передать этот вывод в Hive с помощью агента Flume: мой агент выглядит так:

agent.sources = s1
agent.sinks = k1
agent.channels = c1

agent.sources.s1.type = netcat
agent.sources.s1.bind = 0.0.0.0
agent.sources.s1.port = 44444

agent.sinks.k1.type = hive
agent.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
agent.sinks.k1.hive.database = default
agent.sinks.k1.hive.table = t

agent.sinks.k1.batchSize = 20
agent.sinks.k1.heartBeatInterval = 0
agent.sinks.k1.serializer = JSON

agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100

agent.sources.s1.channels = c1
agent.sinks.k1.channel = c1

моя таблица улья имеет следующую конфигурацию:

create table if not exists t (

    plat string,
    lon string,
    id string

)
clustered by (id) into 5 buckets
stored as orc;

но, к сожалению, когда я запускаю агент, ничего не происходит, flume сохраняет то же самое и не показывает никаких ошибок! Я убедился, что мой скрипт на Python работает и работает нормально, но я не уверен, что здесь происходит.

Я использую дистрибутив Cloudera «docker image».

...