Как реализовать разделение тестов поездов без перекрытий в Apache Beam? - PullRequest
0 голосов
/ 13 июня 2019

Я бы хотел обучить тестовому разделению списка текстов со связанными сущностями, чтобы не было перекрывающихся разбиений сущностей.

Обеспечение отсутствия перекрытий является сложной задачей. В настоящее время я достигаю этого с помощью 2 groupby операций.Мне было интересно , как я могу устранить узкие места в памяти, которые эти groupby операции создают, или есть ли более чистый способ сделать весь процесс.

INPUT

ENTITIES TEXT
e1       TextA
e1, e2   TextB
e3       TextC

Я хотел бы иметь выходы:

РАЗДЕЛЕНИЕ ПОЕЗДА

ENTITIES TEXT
e1       TextA
e1, e2   TextB

РАЗДЕЛ ТЕСТА

ENTITIES TEXT
e3       TextC

МОЙ ПОДХОД

Начальные groupby сущности:

e1 [{"text":"TextA", "entities":["e1"]}, {"text":"TextB", "entities":["e1","e2"]}]
e2 [{"text":"TextB", "entities":["e1","e2"]}]
e3 [{"text":"TextC", "entities":["e3"]}]

Далее я создаю ключ сущности-создателя:

e1-e2 {"text":"TextA", "entities":["e1"]}
e1-e2 {"text":"TextB", "entities":["e1","e2"]}
e1-e2 {"text":"TextB", "entities":["e1","e2"]}
e3 {"text":"TextC", "entities":["e3"]}

I затем groupby на этом ключе взаимодействия:

e1-e2 [{"text":"TextA", "entities":["e1"]}, {"text":"TextB", "entities":["e1","e2"]}]
e3 [{"text":"TextC", "entities":["e3"]}]

Моя работа с большим набором данных с 7 миллионами записей не выполняется при операциях groubpy, см. Ошибки ниже.

Затем выполнитеразделить тест на поезд с partition и, наконец, применить distinct, чтобы удалить дубликаты.

ОШИБКИ

К сожалению, мой подход не работает здесь с:

  logger:  "root:shuffle.py:try_split"   
  message:  "Refusing to split <dataflow_worker.shuffle.GroupedShuffleRangeTracker object at 0x7fab8a9d2a58> at b'\x9f|\xe7c\x00\x01': proposed split position is out of range [b'\x95n*A\x00\x01', b'\x9f|\xe7c\x00\x01'). Position of last group processed was b'\x9f|\xe7b\x00\x01'."   
  logger:  "root:shuffle.py:request_dynamic_split"   
  message:  "Refusing to split GroupedShuffleReader <dataflow_worker.shuffle.GroupedShuffleReader object at 0x7fab8a9d2588> at n3znYwAB"   

Ответы [ 2 ]

1 голос
/ 14 июня 2019

Эти сообщения об ошибках касаются динамического перетаскивания Dataflow , не имеющего ничего общего с вашим определением разделения.Они не должны быть фатальными для вашей работы.(Они?)

При этом, я не думаю, что это возможно сделать с одной группой.Например, представьте, что у кого-то было

ENTITIES TEXT
e1       TextA
e1, e2   TextB
e2, e3   TextC
...
eN, eN+1 TextX

. Чтобы обнаружить связь между TextA и TextX, понадобилось бы O (N) группировок.(По сути, вы пытаетесь найти непересекающиеся соединенные компоненты.)

0 голосов
/ 18 июня 2019

Для решения этой проблемы без использования GroupBy:

def combine_entities(values):
  res = set()
  for value in values:
    res.add(value)
  return list(res)

def split_fn(example,train,test):
  """
  3 cases:
    example["entities"] only contains elements that are also in train --> label as train
    example["entities"] only contains elements that are also in test --> label as test
    example["entities"] contains both elements in train and test --> for this never to happen you need an extra constraint (as @robertwb mentioned) on your data.
  """
    return example, "train"

unique_entities = (p
                   | 'Extract' >> beam.Map(lambda x: x["entities"])
                   | 'CombineSet' >> beam.CombineGlobally(combine_entites))

ttrain,ttest = uniq | 'Split' >> beam.Partition(lambda x: hash(x) % 100 < 80, 2)
res  = (p
        | 'Split' >> beam.Map(split_fn,
                       train=beam.pvalue.AsList(ttrain),
                       test=beam.pvalue.AsList(ttest))

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...