искра не забирает все содержимое индекса - PullRequest
0 голосов
/ 17 июня 2020

У меня есть индекс, который содержит 6130057 строк. Сначала я задал условие для изменения формата даты, а затем загрузил документы в датафрейм. Проблема в том, что и в documents, и в df оказывается только 10 строк, и я не знаю почему. Вот мой код:

from datetime import datetime as dt


# Epoch-millisecond fields rewritten by convert_ts, in processing order.
_TIMESTAMP_FIELDS = (
    'Refill_Bar_End_Date_and_Time',
    'Temporary_Service_Class_Expiry_Date',
    'Service_Fee_Expiry_Date',
    'Supervision_Period_Expiry_Date',
    'Last_Service_Fee_Deduction_Date',
    'Account_Disconnection_Date',
    'Credit_Clearance_Date',
    'Negative_Balance_Barring_Start_Date',
    'Activation_Date',
    'Date',
)

_DATE_FORMAT = '%Y-%m-%d %H:%M:%S'


def _millis_to_text(millis):
    """Format an epoch-milliseconds value as a 'YYYY-MM-DD HH:MM:SS' string."""
    # Incoming values are milliseconds; fromtimestamp() expects seconds.
    return dt.fromtimestamp(int(millis / 1000.0)).strftime(_DATE_FORMAT)


def convert_ts(hit):
    """Reformat the timestamp fields of one search hit.

    Takes the ``_source`` mapping out of ``hit`` and, for every name in
    ``_TIMESTAMP_FIELDS``, replaces an epoch-milliseconds value with a
    formatted date string (naive ``datetime.fromtimestamp``, i.e. local
    time). The ``_source`` mapping is modified in place.

    A field that is absent or falsy (``None``, ``0``, ``''``) is left
    untouched and a "`<field>` not found" message is printed. A value that
    cannot be converted is also left untouched and the error is printed;
    conversion is best-effort and never aborts the document.

    Args:
        hit: Mapping with a ``'_source'`` key holding the document fields.

    Returns:
        The ``hit['_source']`` mapping with converted fields.

    Raises:
        KeyError: If ``hit`` has no ``'_source'`` key.
    """
    hit = hit['_source']

    for field in _TIMESTAMP_FIELDS:
        raw_value = hit.get(field, None)

        if not raw_value:
            print('`{}` not found'.format(field))
            continue

        try:
            hit[field] = _millis_to_text(raw_value)
        except (TypeError, ValueError, OverflowError, OSError) as err:
            # Non-numeric or out-of-range value: report it and keep the
            # original value rather than dropping the whole document.
            print(err)

    return hit


# Client for the local Elasticsearch node.
es = Elasticsearch(['http://localhost:9200'], timeout=600)
# NOTE(review): the empty body carries no `size`, and the Search API returns
# only the first 10 hits by default -- consistent with the count of 10
# reported for this snippet. Reading the whole index needs scroll/scan.
documents = es.search(index='subscribers-20200101', body={})['hits']['hits']
# Unwrap each hit to its `_source` and reformat its timestamp fields.
documents = [convert_ts(doc) for doc in documents]

print(documents) 
import pandas as pd
# One DataFrame row per converted document.
df = pd.DataFrame(documents)
df.count()
the count shows 

Account_Activated_Flag                    10
Account_Balance_Units                     10
Account_Disconnection_Date                 9
Account_Group_Id                          10
Account_ID_Master_MSISDN                  10
Account_In_Euro_Flag                      10
Activation_Date                            9
Community_Id_1                            10
Community_Id_2                            10
Community_Id_3                            10
Converged_Flag                            10
Credit_Clearance_Date                      9
Date                                      10
First_Call_Done_Flag                      10
First_IVR_Call_Done_Flag                  10
GPRS_Block_Status                         10
Language                                  10
Life_Cycle_Notification_Report            10
Low_Level_Warning_Played_Flag             10
Negative_Balance_Barred_Flag              10
NOT_USED                                  10
Not_Used_2                                10
Original_Service_Class_ID                 10
Originating_SMS_Block_Status              10
Originating_Voice_Block_Status            10
Refill_Failed_Counter                     10
Service_Class_ID                          10
Service_Fee_Expiry_Date                    9
Service_Fee_Expiry_Flag                   10
Service_Fee_Expiry_Warning_Flag           10
Service_Fee_Period_Warning_Played_Flag    10
Service_Offerings                         10
Special_Announcement_Played_Flag          10
Subscriber_ID_MSISDN                      10
Supervision_Expiry_Flag                   10
Supervision_Expiry_Warning_Flag           10
Supervision_Period_Expiry_Date             9
Supervision_Period_Warning_Played_Flag    10
Temporary_Block_Flag                      10
Terminating_SMS_Block_Status              10
Terminating_Voice_Block_Status            10
dtype: int64

Ответы [ 2 ]

0 голосов
/ 18 июня 2020

Решено ..

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search
from datetime import datetime as dt
import pandas as pd

# Client created with default settings (no hosts given).
client = Elasticsearch()
# Search object bound to the "sub1" index.
s = Search(using=client, index="sub1")
# NOTE(review): open-ended slice starting at offset 1, so the first hit would
# be skipped in a paginated search -- confirm whether this is intended and
# whether scan() honours slicing at all.
s = s[1:]

def convert_ts(hit):
    """Rewrite ``Refill_Bar_End_Date_and_Time`` as a formatted date string.

    The field is read as epoch milliseconds and replaced in place with a
    'YYYY-MM-DD HH:MM:SS' string. If the field is missing or falsy, or the
    conversion fails, the error is printed and ``hit`` is left unchanged.

    Returns:
        The same ``hit`` object that was passed in.
    """
    field = 'Refill_Bar_End_Date_and_Time'
    try:
        millis = hit.get(field, None)
        if not millis:
            raise ValueError('`Refill_Bar_End_Date_and_Time` not found')

        # Value arrives in milliseconds; fromtimestamp() wants seconds.
        seconds = int(millis / 1000.0)
        moment = dt.fromtimestamp(seconds)
        hit[field] = moment.strftime('%Y-%m-%d %H:%M:%S')
    except Exception as err:
        print(err)

    return hit

# Collect every hit yielded by scan() as a plain dict, then convert the
# timestamp field of each document before building the DataFrame.
documents = [hit.to_dict() for hit in s.scan()]
documents = list(map(convert_ts, documents))
df = pd.DataFrame(documents)
0 голосов
/ 17 июня 2020

я пробовал это

client = Elasticsearch()
s = Search(using=client, index="sub1")

df = pd.DataFrame([hit.to_dict() for hit in s.scan()])

, но в этом варианте не учитывается блок try/except

...