tf.data + generator + keras => repeat () не работает, почему? - PullRequest
0 голосов
/ 14 февраля 2020

Когда я тренирую свой набор данных (tf.data.dataset.from_generator(x), с x итератор, созданный с помощью

df.rdd.flatMap(func).toLocalIterator() 

, где к моему набору данных применяется fun c. Функция repeat () не работает над набором данных Я использую tenorflow версии 1.14.0. Ошибка:

В вашем итераторе набора данных закончились данные; прерывается обучение. Убедитесь, что ваш итератор может генерировать как минимум validation_steps * epochs пакетов (в этом case, 3 пакета). Возможно, вам понадобится использовать функцию repeat () при построении набора данных.

Какие возможные обходные пути, чтобы заставить работать функцию повторения? Я могу дать более подробную информацию, но я думаю, происходит то, что сама функция не работает в моих условиях.

РЕДАКТИРОВАТЬ :

в моем сценарии 'pipe.py' у меня есть:

import numpy as np
from src.spark_utils.spark_session.sparkSession import arranca_spark
import tensorflow as tf

def f_input(
        row: np.array,
        future_steps=10,
        input_steps=10,
        slice=1,
        start=0,
        end=0
):
    input_array = np.array(
        [
            np.array(row['customer_id_list']),
            np.array(row['year_list']),
            np.array(row['month_list']),
            np.array(row['day_list']),
            np.array(row['importe_list']),
            np.array(row['cif_id_list']),
            np.array(row['cat_id_list'])
        ]
    )
    _, n = input_array.shape
    if start < 0:
        for i in range(n - input_steps + 1 - future_steps + start,
                       n - input_steps + 1 - future_steps - end):
            input_indices = range(i * slice, i * slice + input_steps, 1)
            input = input_array[:, input_indices]
            yield input[0], input[1], input[2], input[3], input[4], input[5], input[6]
    else:
        for i in range(start, n - input_steps + 1 - future_steps - end):
            input_indices = range(i * slice, i * slice + input_steps, 1)
            input = input_array[:, input_indices]
            yield input[0], input[1], input[2], input[3], input[4], input[5], input[6]




class Input():
    def __init__(self, data, input_steps, future_steps, start, end):
        self.data =data
        self.input_steps =input_steps
        self.future_steps =future_steps
        self.start =start
        self.end =end
        self.slice=1

    def __call__(self, *args, **kwargs):
        func =lambda x: f_input(x)
        result = self.data.rdd.flatMap(func)
        return result.toLocalIterator()

def f_output(
        row: np.array,
        future_steps=10,
        input_steps=10,
        slice=1,
        start=0,
        end=0
):
    output_array = np.array(
        [
            np.array(row['year_list']),
            np.array(row['month_list']),
            np.array(row['day_list']),
            np.array(row['cif_id_list'])
        ]
    )
    _, n = output_array.shape
    if start < 0:
        for i in range(n - input_steps + 1 - future_steps + start,
                       n - input_steps + 1 - future_steps - end):
            output_indices = range(i * slice + future_steps, i * slice + input_steps + future_steps, 1)
            output = np.reshape(output_array[:, output_indices], (4, 10, 1))
            yield output[0], output[1], output[2], output[3]
    else:
        for i in range(start, n - input_steps + 1 - future_steps - end):
            output_indices = range(i * slice + future_steps, i * slice + input_steps + future_steps, 1)
            output = np.reshape(output_array[:, output_indices], (4, 10, 1))
            yield output[0], output[1], output[2], output[3]

class Output():
    def __init__(self, data, input_steps, future_steps, start, end):
        self.data = data
        self.input_steps = input_steps
        self.future_steps = future_steps
        self.start = start
        self.end = end

    def __call__(self, *args, **kwargs):
        func = lambda x: f_output(x)
        result = self.data.rdd.flatMap(func)
        return result.toLocalIterator()

class Pipeline(object):
    def __init__(self, df, batch_size, input_steps,future_steps):
        self.df =df
        self.batch_size=batch_size
        self.input_steps=input_steps
        self.future_steps=future_steps

    @property
    def train_dataset(self):
        features= tf.data.Dataset.from_generator(
            generator=lambda: Input(data=self.df, input_steps=self.input_steps, future_steps=self.future_steps, start=0, end=self.input_steps+1)(),
            output_types=(tf.float32, tf.float32, tf.int32, tf.int32, tf.float32, tf.int32, tf.int32),
            output_shapes=(
                tf.TensorShape((self.input_steps,)),
                tf.TensorShape((self.input_steps,)),
                tf.TensorShape((self.input_steps,)),
                tf.TensorShape((self.input_steps,)),
                tf.TensorShape((self.input_steps,)),
                tf.TensorShape((self.input_steps,)),
                tf.TensorShape((self.input_steps,))
            )
        )
        labels = tf.data.Dataset.from_generator(
            generator=lambda: Output(data=self.df, input_steps=self.input_steps, future_steps=self.future_steps,
                                         start=0, end=self.input_steps+1)(),
            output_types=(tf.float32, tf.int32, tf.int32, tf.int32),
            output_shapes=(
                tf.TensorShape((self.input_steps, 1)),
                tf.TensorShape((self.input_steps, 1)),
                tf.TensorShape((self.input_steps, 1)),
                tf.TensorShape((self.input_steps, 1))
            )
        )
        return tf.data.Dataset.zip((features, labels)).repeat().batch(self.batch_size)

    @property
    def test_dataset(self):
        features= tf.data.Dataset.from_generator(
            generator=lambda: Input(data=self.df, input_steps=self.input_steps, future_steps=self.future_steps, start=-1, end=0)(),
            output_types=(tf.float32, tf.float32, tf.int32, tf.int32, tf.float32, tf.int32, tf.int32),
            output_shapes=(
                tf.TensorShape((self.input_steps,)),
                tf.TensorShape((self.input_steps,)),
                tf.TensorShape((self.input_steps,)),
                tf.TensorShape((self.input_steps,)),
                tf.TensorShape((self.input_steps,)),
                tf.TensorShape((self.input_steps,)),
                tf.TensorShape((self.input_steps,))
            )
        )
        labels = tf.data.Dataset.from_generator(
            generator=lambda: Output(data=self.df, input_steps=self.input_steps, future_steps=self.future_steps,
                                         start=-1, end=0)(),
            output_types=(tf.float32, tf.int32, tf.int32, tf.int32),
            output_shapes=(
                tf.TensorShape((self.input_steps, 1)),
                tf.TensorShape((self.input_steps, 1)),
                tf.TensorShape((self.input_steps, 1)),
                tf.TensorShape((self.input_steps, 1))
            )
        )
        return tf.data.Dataset.zip((features, labels)).repeat().batch(self.batch_size)

Из другого файла я вызываю эти классы, записывая:

from pyspark.sql import Row
names=("customer_id_list", "year_list", "month_list", "day_list", "importe_list", 'cif_id_list', 'cat_id_list')
customer=[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
year=[0.5, 0.5, 0.5, 0.5, 0.5,0.5, 0.5, 0.5, 0.5, 0.5,0.5 ,0.5, 0.5, 0.5, 0.5, 0.5, 0.5,0.5, 0.5, 0.5, 0.5, 0.5,0.5 ,0.5 ]
month=[10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]
day=[10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]
importe=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
cif_id=[1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3]
cat_id=[1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4, 1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4]

row_dict={'customer_id_list': customer, 'year_list':year, 'month_list':month, 'day_list':day, 'importe_list':importe, 'cif_id_list':cif_id, 'cat_id_list':cat_id}
df = spark.createDataFrame([Row(**row_dict)])
input=Input(df,input_steps=10, future_steps=10, start=0, end=0)
print(list(input()))

И затем я получаю ошибку:

Исключение в потоке "serve toLocalIterator" org. apache .spark.SparkException: работа прервана из-за s Ошибка tage: Задача 0 на этапе 0.0 провалилась 4 раза, последний раз ...

Есть идеи, почему это может быть?

1 Ответ

0 голосов
/ 15 февраля 2020

Убедитесь, что функцию генератора можно вызывать несколько раз, или используйте .cache(), чтобы генератор запускался только один раз.

import tensorflow as tf

# Created only once outside of gen()
it = iter([1, 2, 3])

def gen():
  for element in it:
    yield element

dataset = tf.data.Dataset.from_generator(gen, tf.int32)
print(list(dataset.repeat(2).as_numpy_iterator()))  # Prints just [1, 2, 3]

Если вы переместите создание итератора внутри функции генератора , repeat будет работать так, как вы ожидаете:

import tensorflow as tf

def gen():
  # Create iterator every time gen() is called
  it = iter([1, 2, 3])
  for element in it:
    yield element

dataset = tf.data.Dataset.from_generator(gen, tf.int32)
print(list(dataset.repeat(2).as_numpy_iterator()))  # Prints [1, 2, 3, 1, 2, 3]

Другой вариант - использовать кеш, чтобы gen вызывался только один раз:

import tensorflow as tf

# Created only once outside of gen()
it = iter([1, 2, 3])

def gen():
  for element in it:
    yield element

dataset = tf.data.Dataset.from_generator(gen, tf.int32)
# Add a `.cache()` here
print(list(dataset.cache().repeat(2).as_numpy_iterator()))  # Prints [1, 2, 3, 1, 2, 3]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...