Это пример придуманной игрушки, чтобы попытаться получить помощь по сложным частям моей проблемы. Допустим, у меня есть данные о продажах, которые я получаю из потока Кафки:
...
Period: 5, SalesPersonId: 78, Sale: TRUE, Timestamp: ...,
Period: 5, SalesPersonId: 43, Sale: FALSE, Timestamp: ...,
Period: 5, SalesPersonId: 33, Sale: TRUE, Timestamp: ...,
...
Каждая строка представляет возможность продажи конкретного продавца (в течение определенного периода).
Вот как работают периоды: периоды длятся примерно 2-3 недели. Но периоды не под моим контролем; они назначаются уже по прибытии в поток. При переходе между периодами я все еще мог бы получать данные из последнего периода в течение дня или двух (например, место продажи в Японии может все еще находиться в старом периоде в течение некоторого времени). Кто-то в чате Beam предложил мне использовать окна сеансов для этого случая, если я просто включу период в свой ключ и увеличу промежуток до 2 дней. Похоже, это будет работать.
Мне понятно, как делать такие вещи, как: общее количество возможностей продаж за период, средний уровень продаж на одного продавца за период и т. Д. Например, вызовите PCollection, полученную в результате следующего запроса A :
SELECT
period,
salesPersonId,
COUNT(*) as totalSalesOpportunities,
COUNT(*) FILTER(WHERE sale) as totalSales,
ROUND(COUNT(*) FILTER (WHERE SALE)/COUNT(*),2) as salesRate
FROM stream
GROUP BY period, salesPersonId
Мое требование более сложное, чем это. Допустим, у нашей компании есть гипотеза о том, что продавцы, у которых больше возможностей сбыта в течение периода, будут иметь более высокие темпы продаж. Возможно, общие возможности продаж являются показателем мотивации, или дополнительные возможности дают больше практики, пытаясь продать любой товар, который продается в этот период. Итак, компания хочет эту статистику:
Каков совокупный уровень продаж продавцов, которые находятся на 90-м или выше процентиле возможностей продаж в этот период (на данный момент)? 10-й или нижний процентиль? Т.е.
(TOTAL SALES MADE BY PEOPLE WITH 90%+ SALES OPPORTUNITIES)/(TOTAL SALES OPPORTUNITIES BY PEOPLE WITH 90%+ SALES OPPORTUNITIES)
Конечно, в начале периода 90-й процентиль мог иметь только 3 возможности. Но со временем распределение будет распространяться, и это может быть 40 возможностей. Это нормально, тогда, если эта статистика обновляется, скажем, ежечасно.
Насколько я могу сказать, мне нужно сделать следующее, назовите это B :
Rekey A, apply ApproximateQuantiles, feed it back to filter A, reaggregate A.
Но я не думаю, что это можно сделать постепенно. Так как же выразить «продолжайте делать А постепенно, но делайте В как пакетную операцию каждый час»?
Или есть лучший способ справиться с этой ситуацией с помощью Beam?