Периодическая партия, смешанная с потоковой передачей - проблема с игрушкой, иллюстрирующая мою проблему - PullRequest
0 голосов
/ 23 апреля 2019

Это пример придуманной игрушки, чтобы попытаться получить помощь по сложным частям моей проблемы. Допустим, у меня есть данные о продажах, которые я получаю из потока Кафки:

...
Period: 5, SalesPersonId: 78, Sale: TRUE, Timestamp: ...,  
Period: 5, SalesPersonId: 43, Sale: FALSE, Timestamp: ...,  
Period: 5, SalesPersonId: 33, Sale: TRUE, Timestamp: ...,  
...

Каждая строка представляет возможность продажи конкретного продавца (в течение определенного периода).

Вот как работают периоды: периоды длятся примерно 2-3 недели. Но периоды не под моим контролем; они назначаются уже по прибытии в поток. При переходе между периодами я все еще мог бы получать данные из последнего периода в течение дня или двух (например, место продажи в Японии может все еще находиться в старом периоде в течение некоторого времени). Кто-то в чате Beam предложил мне использовать окна сеансов для этого случая, если я просто включу период в свой ключ и увеличу промежуток до 2 дней. Похоже, это будет работать.

Мне понятно, как делать такие вещи, как: общее количество возможностей продаж за период, средний уровень продаж на одного продавца за период и т. Д. Например, вызовите PCollection, полученную в результате следующего запроса A :

SELECT
    period,
    salesPersonId,
    COUNT(*) as totalSalesOpportunities,
    COUNT(*) FILTER(WHERE sale) as totalSales,  
    ROUND(COUNT(*) FILTER (WHERE SALE)/COUNT(*),2) as salesRate  
FROM stream  
GROUP BY period, salesPersonId  

Мое требование более сложное, чем это. Допустим, у нашей компании есть гипотеза о том, что продавцы, у которых больше возможностей сбыта в течение периода, будут иметь более высокие темпы продаж. Возможно, общие возможности продаж являются показателем мотивации, или дополнительные возможности дают больше практики, пытаясь продать любой товар, который продается в этот период. Итак, компания хочет эту статистику:

Каков совокупный уровень продаж продавцов, которые находятся на 90-м или выше процентиле возможностей продаж в этот период (на данный момент)? 10-й или нижний процентиль? Т.е.

(TOTAL SALES MADE BY PEOPLE WITH 90%+ SALES OPPORTUNITIES)/(TOTAL SALES OPPORTUNITIES BY PEOPLE WITH 90%+ SALES OPPORTUNITIES)

Конечно, в начале периода 90-й процентиль мог иметь только 3 возможности. Но со временем распределение будет распространяться, и это может быть 40 возможностей. Это нормально, тогда, если эта статистика обновляется, скажем, ежечасно.

Насколько я могу сказать, мне нужно сделать следующее, назовите это B :

Rekey A, apply ApproximateQuantiles, feed it back to filter A, reaggregate A.

Но я не думаю, что это можно сделать постепенно. Так как же выразить «продолжайте делать А постепенно, но делайте В как пакетную операцию каждый час»?

Или есть лучший способ справиться с этой ситуацией с помощью Beam?

1 Ответ

0 голосов
/ 27 апреля 2019

Если я правильно понимаю ваш вопрос, вам нужно 2 типа агрегации для одних и тех же данных.

  1. Инкрементная агрегация для A
  2. Почасовая агрегация для B

Здесь следует отметить, что вы не можете сделать зависимость A от B и B зависимой от A, поскольку это создает цикл в вашем конвейерном графе.

Вы можете начать с ПК1, который содержит исходный поток ввода.

PC2: PC1 -> Do A '(что совпадает с A) -> Do B

PC3: PC1 -> Do A с PC2 в качестве бокового входа.

Вы можете узнать больше о боковом вводе здесь

...