У меня есть очередь SQS, подключенная к SNS topi c, который получает сообщения от записной книжки, выполняющей задание cron в Databricks. My Da sh -Плотно внешний интерфейс захватывает сообщение по одному из этой очереди SQS, отображает данные (то есть количество положительных, отрицательных и нейтральных твитов), а затем удаляет это конкретное сообщение c.
Я столкнулся с проблемой, заключающейся в том, что всякий раз, когда страница обновляется, активируется функция, которая обрабатывает извлечение сообщения, удаление, а затем отображение указанных данных. Итак, если при обновлении страницы в очереди нет сообщения, круговая диаграмма не может быть отображена, и вместо этого я получаю пустой график x и y.
Я хочу, чтобы функция круговой диаграммы проверяла SQS очередь. Если есть новое сообщение, возьмите его, обновите sh график новыми данными и удалите сообщение из очереди. Если инициировано обновление страницы sh и в очереди нет сообщения, то следует ссылаться на данные из предыдущего сообщения, пока новое сообщение не будет в очереди SQS.
Это может быть достигнуто с помощью модуль копирования?
Это код, который обрабатывает круговую диаграмму:
@app.callback(Output('pie', 'figure'),
[Input('pie-update', 'interval')])
def update_pie(_):
try:
sqs = boto3.client('sqs', aws_access_key_id=accesskeyid, aws_secret_access_key=secretaccesskey,
region_name='us-east-1')
resp = sqs.receive_message(
QueueUrl=queue_url,
AttributeNames=['All'],
MaxNumberOfMessages=1
)
try:
messages = resp['Messages']
except KeyError:
print("No messages in the queue!")
messages = []
message = messages[0]
string_body_dictionary = message['Body']
body_dictionary = json.loads(string_body_dictionary)
string_message_dictionary = body_dictionary.get('Message')
sentiment_dictionary = json.loads(string_message_dictionary)
positive = sentiment_dictionary.get('Positive')
negative = sentiment_dictionary.get('Negative')
neutral = sentiment_dictionary.get('Neutral')
entries = [
{'Id': msg['MessageId'], 'ReceiptHandle': msg['ReceiptHandle']}
for msg in resp['Messages']
]
resp = sqs.delete_message_batch(
QueueUrl=queue_url, Entries=entries
)
if len(resp['Successful']) != len(entries):
raise RuntimeError(
f"Failed to delete messages: entries={entries!r} resp={resp!r}"
)
values = [positive, negative, neutral]
labels = ['Positive', 'Negative', 'Mixed']
trace = go.Pie(labels=labels, values=values, title="Distribution of Twitter Sentiement",
hoverinfo='label+percent', textinfo='value',
textfont=dict(size=20, color=app_colors['text']),
marker=dict(
line=dict(color=app_colors['background'], width=2)))
return {'data': [trace], 'layout': go.Layout(title="Distribution of Twitter Sentiement",
colorway=["#5E0DAC", '#FF4F00', '#375CB1', '#FF7400', '#FFF400',
'#FF0056'],
template='plotly_dark',
paper_bgcolor='rgba(0, 0, 0, 0)',
plot_bgcolor='rgba(0, 0, 0, 0)',
margin={'b': 15},
hovermode='x',
autosize=True)}
except Exception as e:
with open('errors.txt', 'a') as f:
f.write(str(e))
f.write('\n')