У меня есть скрипт Python, который использует сокет для отправки данных через netcat
import socket
import time
import csv
import json
clientsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
clientsocket.connect(('localhost', 44444))
try:
with open('csv1.csv') as csv_file:
csv_reader = csv.reader(csv_file, delimiter=',')
line_count = 0
for row in csv_reader:
if line_count == 0:
line_count += 1
else:
if len(row)==14:
j ={"id": row[0], "lon":row[1],"lat":row[2]}
line_count += 1
clientsocket.sendall(str(j))
data = clientsocket.recv(16)
time.sleep(250.0 / 1000.0)
if line_count==15:
break
finally:
clientsocket.close()
результат будет выглядеть так: "{'lat': 1, 'lon': 1, 'id': 1} "
Теперь я пытаюсь передать этот вывод в Hive с помощью агента Flume: мой агент выглядит так:
agent.sources = s1
agent.sinks = k1
agent.channels = c1
agent.sources.s1.type = netcat
agent.sources.s1.bind = 0.0.0.0
agent.sources.s1.port = 44444
agent.sinks.k1.type = hive
agent.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
agent.sinks.k1.hive.database = default
agent.sinks.k1.hive.table = t
agent.sinks.k1.batchSize = 20
agent.sinks.k1.heartBeatInterval = 0
agent.sinks.k1.serializer = JSON
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100
agent.sources.s1.channels = c1
agent.sinks.k1.channel = c1
моя таблица улья имеет следующую конфигурацию:
create table if not exists t (
plat string,
lon string,
id string
)
clustered by (id) into 5 buckets
stored as orc;
но, к сожалению, когда я запускаю агент, ничего не происходит, flume сохраняет то же самое и не показывает никаких ошибок! Я убедился, что мой скрипт на Python работает и работает нормально, но я не уверен, что здесь происходит.
Я использую дистрибутив Cloudera «docker image».