Управление данными временных рядов в Python: суммирование рядов и агрегирование за период времени - PullRequest
1 голос
/ 17 мая 2019

Я пытаюсь понять, как визуализировать некоторые данные датчика.У меня есть данные, которые собираются каждые 5 минут для нескольких устройств и хранятся в структуре JSON, которая выглядит примерно так (обратите внимание, что я не могу контролировать структуру данных):

[
  {
    "group": { "id": "01234" },
    "measures": {
      "measures": {
        "...device 1 uuid...": {
          "metric.name.here": {
            "mean": [
              ["2019-04-17T14:30:00+00:00", 300, 1],
              ["2019-04-17T14:35:00+00:00", 300, 2],
              ...
            ]
          }
        },
        "...device 2 uuid...": {
          "metric.name.here": {
            "mean": [
              ["2019-04-17T14:30:00+00:00", 300, 0],
              ["2019-04-17T14:35:00+00:00", 300, 1],
              ...
            ]
          }
        }
      }
    }
  }
]

Каждый кортеж формы["2019-04-17T14:30:00+00:00", 300, 0] - это [timestamp, granularity, value].Устройства сгруппированы по идентификатору проекта.Внутри любой группы я хочу взять данные для нескольких устройств и суммировать их вместе.Например, для приведенных выше примеров данных я хочу, чтобы окончательная серия выглядела следующим образом:

["2019-04-17T14:30:00+00:00", 300, 1],
["2019-04-17T14:35:00+00:00", 300, 3],

Ряды не обязательно имеют одинаковую длину.

Наконец, я хочу объединить эти измерения вежечасные выборки.

Я могу получить отдельные серии следующим образом:

with open('data.json') as fd:
  data = pd.read_json(fd)

for i, group in enumerate(data.group):
    project = group['project_id']
    instances = data.measures[i]['measures']
    series_for_group = []
    for instance in instances.keys():
        measures = instances[instance][metric][aggregate]

        # build an index from the timestamps
        index = pd.DatetimeIndex(measure[0] for measure in measures)

        # extract values from the data and link it to the index
        series = pd.Series((measure[2] for measure in measures),
                           index=index)

        series_for_group.append(series)

В нижней части внешнего цикла for у меня есть массив объектов pandas.core.series.Series, представляющих различныенаборы измерений, связанных с текущей группой.Я надеялся, что смогу просто сложить их вместе, как в total = sum(series_for_group), но это приведет к неверным данным.

  1. Я даже правильно читаю в этих данных?Это первый раз, когда я работал с Пандами;Я не уверен, если (а) создать индекс, а затем (б) заполнить данные, правильная процедура здесь.

  2. Как бы я успешно суммировал эти серии вместе?

  3. Как мне пересчитать эти данные с интервалом в 1 час?Если посмотреть на этот вопрос , то это выглядит так, как будто методы .groupby и .agg представляют интерес, но из этого примера не ясно, как задать размер интервала.

Обновление 1

Может быть, я могу использовать concat и groupby?Например:

final = pd.concat(all_series).groupby(level=0).sum()

Ответы [ 3 ]

1 голос
/ 17 мая 2019

То, что я предложил в комментарии, это сделать что-то вроде этого:

result = pd.DataFrame({}, columns=['timestamp', 'granularity', 'value',
                               'project', 'uuid', 'metric', 'agg'])
for i, group in enumerate(data.group):
    project = group['id']
    instances = data.measures[i]['measures']

    series_for_group = []


    for device, measures in instances.items():
        for metric, aggs in measures.items():
            for agg, lst in aggs.items():
                sub_df = pd.DataFrame(lst, columns = ['timestamp', 'granularity', 'value'])
                sub_df['project'] = project
                sub_df['uuid'] = device
                sub_df['metric'] = metric
                sub_df['agg'] = agg

                result = pd.concat((result,sub_df), sort=True)

# parse date:
result['timestamp'] = pd.to_datetime(result['timestamp'])

, что приводит к получению данных, которые выглядят следующим образом

    agg     granularity         metric  project     timestamp           uuid                value
0   mean    300     metric.name.here    01234   2019-04-17 14:30:00     ...device 1 uuid...     1
1   mean    300     metric.name.here    01234   2019-04-17 14:35:00     ...device 1 uuid...     2
0   mean    300     metric.name.here    01234   2019-04-17 14:30:00     ...device 2 uuid...     0
1   mean    300     metric.name.here    01234   2019-04-17 14:35:00     ...device 2 uuid...     1

, тогда вы можете выполнить общую агрегацию

result.resample('H', on='timestamp').sum()

, что дает:

timestamp
2019-04-17 14:00:00    4
Freq: H, Name: value, dtype: int64

или групповое агрегирование:

result.groupby('uuid').resample('H', on='timestamp').value.sum()

, что дает:

uuid                 timestamp          
...device 1 uuid...  2019-04-17 14:00:00    3
...device 2 uuid...  2019-04-17 14:00:00    1
Name: value, dtype: int64
0 голосов
/ 18 мая 2019

Я получил то, что кажется рабочим решением, основанным на коде в моем вопросе. В моей системе для обработки около 85 МБ входных данных требуется около 6 секунд. Для сравнения я отменил код Куанга через 5 минут.

Я не знаю, является ли это правильным способом обработки этих данных, но он дает, по-видимому, правильные результаты. Я заметил, что построение списка серий, как в этом решении, а затем выполнение одного вызова pd.concat более производительно, чем помещение pd.concat в цикл.

#!/usr/bin/python3

import click
import matplotlib.pyplot as plt
import pandas as pd


@click.command()
@click.option('-a', '--aggregate', default='mean')
@click.option('-p', '--projects')
@click.option('-r', '--resample')
@click.option('-o', '--output')
@click.argument('metric')
@click.argument('datafile', type=click.File(mode='rb'))
def plot_metric(aggregate, projects, output, resample, metric, datafile):

    # Read in a list of project id -> project name mappings, then
    # convert it to a dictionary.
    if projects:
        _projects = pd.read_json(projects)
        projects = {_projects.ID[n]: _projects.Name[n].lstrip('_')
                    for n in range(len(_projects))}
    else:
        projects = {}

    data = pd.read_json(datafile)
    df = pd.DataFrame()

    for i, group in enumerate(data.group):
        project = group['project_id']
        project = projects.get(project, project)

        devices = data.measures[i]['measures']
        all_series = []
        for device, measures in devices.items():
            samples = measures[metric][aggregate]
            index = pd.DatetimeIndex(sample[0] for sample in samples)
            series = pd.Series((sample[2] for sample in samples),
                               index=index)
            all_series.append(series)

        # concatenate all the measurements for this project, then
        # group them using the timestamp and sum the values.
        final = pd.concat(all_series).groupby(level=0).sum()

        # resample the data if requested
        if resample:
            final = final.resample(resample).sum()

        # add series to dataframe
        df[project] = final

    fig, ax = plt.subplots()
    df.plot(ax=ax, figsize=(11, 8.5))
    ax.legend(frameon=False, loc='upper right', ncol=3)

    if output:
        plt.savefig(output)
        plt.close()
    else:
        plt.show()


if __name__ == '__main__':
    plot_metric()
0 голосов
/ 17 мая 2019

Чтобы построить фрейм данных (df) из рядов различной длины (например, s1, s2, s3), вы можете попробовать:

df=pd.concat([s1,s2,s3], ignore_index=True, axis=1).fillna('')

Как только вы построите свой фрейм данных:

  1. Убедитесь, что все даты хранятся как объекты отметок времени:

    DF [ 'Date'] = pd.to_datetime (ДФ [ 'Дата'])

Затем добавьте еще один столбец, чтобы извлечь часы из столбца даты:

df['Hour']=df['Date'].dt.hour

А затем сгруппируйте по часам и суммируйте значения:

df.groupby('Hour').sum()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...