При сканировании удаленной таблицы hbase с использованием Happybase происходит «Ошибка чтения 0 байтов в Tsocket» - PullRequest
0 голосов
/ 10 февраля 2019

Я пытаюсь сканировать удаленную таблицу HBASE, которая содержит более 1 000 000 000 строк.После сканирования, используя отсканированные строки, попробуйте сделать файл CSV, используя в hdfs.

Я пытался решить почти 3 недели, но я не могу.

Таким образом я сканирую данные и создаю CSV-файл

Сообщение об ошибке

источник / host /anaconda3 / lib / python3.6 / site-packages / thriftpy / transport / socket.py

источник /host/anaconda3/lib/python3.6/site-packages/thriftpy/transport / socket.py

==> Я пробовал протокол Compat, увеличил сетевой буфер памяти tcp, увеличил время ожидания конфигурации, установил размер пакета от 1 до 10000 в параметре сканирования и т. д.

Но работает хорошо почти 30 минут, но вдруг происходит ошибка.Почти в 1/50 раза он хорошо заканчивается (работает без ошибок) Пожалуйста, помогите мне.Я пытался найти причину ошибки.Но я не могу его получить.

Кто-нибудь знает, как это решить?

Это мой код

import sys
print ("--sys.version--")
print (sys.version)
from pyhive import hive
import csv
import os
import happybase
import time
import subprocess
import datetime
import chardet
import logging
logging.basicConfig(level=logging.DEBUG)


csv_list=[]

col=[]
def conn_base():
    print('conn_base starts')


    #SETTING CONNECTION AND CONFIGURATION
    conn=happybase.Connection('13.xxx.xxx.xxx',port=9090)
    table=conn.table(b'TEMP_TABLE')

    #ITERATE DATA AND MAKE CSV FILE PER 100,000 RECORD. AND TAKE A TIME TO SLEEP PER 500000
    tmp=[]
    print('LET\'S MAKE CSV FILE FROM HBASE')
    index=0
    st=0
    global csv_list
    for row_key, data in table.scan():
        try:
           if (st%1000000==0):
                time.sleep(30)
                print("COUNT: ",st)
            if (st%500000==0):

               print("CHANGE CSV _FILE")
                index+=1
                ta_na='TEMP_TABLE'+str(index)+'_version.csv'
                csv_list.append(ta_na)

            st+=1
            with open('/home/host01/csv_dir/TEMP_TABLE/'+csv_list[index-1] ,'a') as f:
                tmp=[]
                tmp.append(data[b'CF1:XXXXX'].decode())
                tmp.append(data[b'CF1:YYYYY'].decode())
                tmp.append(data[b'CF1:DDDDD'].decode())
                tmp.append(data[b'CF1:SSSSS'].decode())
                tmp.append(data[b'CF1:GGGGG'].decode())
                tmp.append(data[b'CF1:HHHHH'].decode())
                tmp.append(data[b'CF1:QQQQQ'].decode())
                tmp.append(data[b'CF1:WWWWWW'].decode())
                tmp.append(data[b'CF1:EEEEE'].decode())
                tmp.append(data[b'CF1:RRRRR'].decode())


                f.write(",".join(tmp)+'\n')
                tmp=[]

        except:
            pass


        #PUT CSV FILES TO HDFS.
        st=1
        for i in range(len(csv_list)):
            try:
                st+=1
                cmd="hdfs dfs -put /home/host01/csv_dir/TEMP_TABLE"+str(csv_list[i])+" /user/hive/warehouse/TEMP_TABLE/"
                subprocess.call(cmd,shell=True)
                if (st%50==0):
                    time.sleep(5)


            except:
                pass
        cmd="hdfs dfs -put /home/host01/csv_dir/TEMP_TABLE/*.csv  /user/hive/warehouse/TEMP_TABLE/"
        subprocess.call(cmd,shell=True)

        print("PUT ALL CSV FILES TO HDFS")
        conn.close()

1 Ответ

0 голосов
/ 14 февраля 2019

Сначала убедитесь, что сервер HBase Thrift запущен и работает.Вы можете запустить thrift сервер с помощью следующей команды:

hbase-daemon.sh start thrift [ -p 9090 ]

Если вы хотите указать номер порта, используйте -p.Порт по умолчанию 9090

...