Я хочу загрузить медицинский тест в S3, проанализировать его с помощью AWS Textract, извлечь таблицы и отправить на AWS Comprehend Medical. По какой-то причине для запуска требуется около 6-8 секунд.
Вот что я сделал до сих пор и буду благодарен за ваш совет, или, возможно, есть репо с рабочим решением.
import json
import boto3
import sys
def get_rows_columns_map(table_result, blocks_map):
rows = {}
for relationship in table_result['Relationships']:
if relationship['Type'] == 'CHILD':
for child_id in relationship['Ids']:
cell = blocks_map[child_id]
if cell['BlockType'] == 'CELL':
row_index = cell['RowIndex']
col_index = cell['ColumnIndex']
if row_index not in rows:
# create new row
rows[row_index] = {}
# get the text value
rows[row_index][col_index] = get_text(cell, blocks_map)
return rows
def get_text(result, blocks_map):
text = ''
if 'Relationships' in result:
for relationship in result['Relationships']:
if relationship['Type'] == 'CHILD':
for child_id in relationship['Ids']:
word = blocks_map[child_id]
if word['BlockType'] == 'WORD':
text += word['Text'] + ' '
if word['BlockType'] == 'SELECTION_ELEMENT':
if word['SelectionStatus'] =='SELECTED':
text += 'X '
return text
def get_table_csv_results(file_name):
with open(file_name, 'rb') as file:
img_test = file.read()
bytes_test = bytearray(img_test)
#print('Image loaded', file_name)
# process using image bytes
# get the results
client = boto3.client('textract')
response = client.analyze_document(Document={'Bytes': bytes_test}, FeatureTypes=['TABLES'])
# Get the text blocks
blocks=response['Blocks']
#pprint(blocks)
blocks_map = {}
table_blocks = []
for block in blocks:
blocks_map[block['Id']] = block
if block['BlockType'] == "TABLE":
table_blocks.append(block)
if len(table_blocks) <= 0:
return "<b> NO Table FOUND </b>"
csv = ''
for index, table in enumerate(table_blocks):
csv += generate_table_csv(table, blocks_map, index +1)
csv += '\n\n'
return csv
def generate_table_csv(table_result, blocks_map, table_index):
rows = get_rows_columns_map(table_result, blocks_map)
table_id = 'Table_' + str(table_index)
# get cells.
csv = 'Table: {0}\n\n'.format(table_id)
for row_index, cols in rows.items():
for col_index, text in cols.items():
csv += '{}'.format(text) + ","
csv += '\n'
csv += '\n\n\n'
return csv
def extract_entities(text):
client = boto3.client(service_name='comprehendmedical')
result = client.detect_entities_v2(Text=text)
return result['Entities']
def main(file_name):
import time
start_time = time.time()
table_csv = get_table_csv_results(file_name)
#print("Entities:")
entities = extract_entities(table_csv)
print("--- %s seconds ---" % (time.time() - start_time))
#output_file = 'output.csv'
# replace content
#with open(output_file, "wt") as fout:
# fout.write(table_csv)
# show the results
#print('CSV OUTPUT FILE: ', output_file)
if __name__ == "__main__":
file_name = sys.argv[1]
main(file_name)
Пример изображения: