Отображение потокового DataFrame в Jupyter из подписки Redis - PullRequest
0 голосов
/ 29 октября 2018

У меня есть прайс-канал Redis 'price-updates' в redis, для которого издатель устанавливает обновления для цены акций. Я хочу отобразить потоковую сетку, которая продолжает добавлять обновления цен по мере их появления в конце сетки.

Пока что я создал нерабочую версию того, что я хочу сделать.

from streamz import Stream
from streamz.dataframe import DataFrame

source = Stream()
data = []

def handler(message):
    json_data = json.loads(message['data'])
    df = pd.DataFrame.from_dict([json_data]).set_index('sym')

source.map(handler).sink(data.append)
sdf = DataFrame(source)

## Run this in a different thread 
p.subscribe('price-updates')
while True:
    message = p.get_message()
    if message:
        source.emit(message)
        time.sleep(0.001)
## end of thread block


#displayStreamingDataGrid(sdf)

Буду признателен, если кто-то с большим опытом работы с sdf поможет мне в этом.

Ответы [ 2 ]

0 голосов
/ 27 ноября 2018

Вы можете использовать https://github.com/AaronWatters/jp_proxy_widget для создания HTML таблица, которую вы можете обновить на месте без видимой очистки таблицы между обновлениями.

Я поместил пример тетради здесь: https://github.com/AaronWatters/jp_doodle/blob/master/notebooks/misc/In%20place%20html%20table%20update%20demo.ipynb

Хитрость в том, чтобы создать виджет, который отображает таблицу и прикрепляет операция обновления, которая изменяет таблицу:

# Create a proxy widget with a table update method
import jp_proxy_widget

def updateable_table(headers, rows):
    w = jp_proxy_widget.JSProxyWidget()
    w.js_init("""
    # injected javascript for the widget:

    element.update_table = function(headers, rows) {
        element.empty();
        var table = $("<table border style='text-align:center'/>");
        table.appendTo(element);
        var header_row = $("<tr/>");
        for (var i=0; i<headers.length; i++) {
            $("<th style='text-align:center'>" + headers[i] + "</th>")
            .width(50)
            .appendTo(header_row);
        }
        header_row.appendTo(table);
        for (var j=0; j<rows.length; j++) {
            var table_row = $("<tr/>").appendTo(table);
            var data_row = rows[j];
            for (var i=0; i<data_row.length; i++) {
                $("<td>" + data_row[i] + "</td>").appendTo(table_row);
            }
        }
    }

    element.update_table(headers, rows);
    """, headers=headers, rows=rows)
    return w

# show the widget
w = updateable_table(headers, rows)
w

Код для обновления виджета

# Update the widget 20 times
import time
count = -20
for i in range(21):
    time.sleep(1)
    rows = [rows[-1]] + rows[:-1]  # rotate the rows
    rows[0][0] = count  # change the upper left entry.
    count += 1
    w.element.update_table(headers, rows)

обновляет таблицу на месте без видимого стирания. Пример Блокнот, связанный выше, также показывает, как сделать то же самое, используя pandas dataframe.

0 голосов
/ 31 октября 2018

Я смог сделать это без потоков. Однако я не получаю потоковую сетку, которая обновляется на месте, а скорее отображаю и очищаю вывод, что очень раздражает.

Вот виджет вывода в одной ячейке юпитера

import ipywidgets as iw
from IPython.display import display 

o = iw.Output()
def output_to_widget(df, output_widget): 
    output_widget.clear_output()
    with output_widget: 
        display(df)
o

Вот код для подписки на redis и обработки сообщения

import redis, json, time

r = redis.StrictRedis(host = HOST, password = PASS, port = PORT, db = DB)
p = r.pubsub(ignore_subscribe_messages=True)
p.subscribe('QUOTES')

mdf = pd.DataFrame()
while True:
    message = p.get_message()
    if message:
        json_msg = json.loads(message['data'])
        df = pd.DataFrame([json_msg]).set_index('sym')
        mdf = mdf.append(df)
        output_to_widget(mdf, o)
    time.sleep(0.001)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...