Когда я тренирую свой набор данных (tf.data.dataset.from_generator(x)
, с x итератор, созданный с помощью
df.rdd.flatMap(func).toLocalIterator()
, где к моему набору данных применяется fun c. Функция repeat () не работает над набором данных Я использую tenorflow версии 1.14.0. Ошибка:
В вашем итераторе набора данных закончились данные; прерывается обучение. Убедитесь, что ваш итератор может генерировать как минимум validation_steps * epochs
пакетов (в этом case, 3 пакета). Возможно, вам понадобится использовать функцию repeat () при построении набора данных.
Какие возможные обходные пути, чтобы заставить работать функцию повторения? Я могу дать более подробную информацию, но я думаю, происходит то, что сама функция не работает в моих условиях.
РЕДАКТИРОВАТЬ :
в моем сценарии 'pipe.py' у меня есть:
import numpy as np
from src.spark_utils.spark_session.sparkSession import arranca_spark
import tensorflow as tf
def f_input(
row: np.array,
future_steps=10,
input_steps=10,
slice=1,
start=0,
end=0
):
input_array = np.array(
[
np.array(row['customer_id_list']),
np.array(row['year_list']),
np.array(row['month_list']),
np.array(row['day_list']),
np.array(row['importe_list']),
np.array(row['cif_id_list']),
np.array(row['cat_id_list'])
]
)
_, n = input_array.shape
if start < 0:
for i in range(n - input_steps + 1 - future_steps + start,
n - input_steps + 1 - future_steps - end):
input_indices = range(i * slice, i * slice + input_steps, 1)
input = input_array[:, input_indices]
yield input[0], input[1], input[2], input[3], input[4], input[5], input[6]
else:
for i in range(start, n - input_steps + 1 - future_steps - end):
input_indices = range(i * slice, i * slice + input_steps, 1)
input = input_array[:, input_indices]
yield input[0], input[1], input[2], input[3], input[4], input[5], input[6]
class Input():
def __init__(self, data, input_steps, future_steps, start, end):
self.data =data
self.input_steps =input_steps
self.future_steps =future_steps
self.start =start
self.end =end
self.slice=1
def __call__(self, *args, **kwargs):
func =lambda x: f_input(x)
result = self.data.rdd.flatMap(func)
return result.toLocalIterator()
def f_output(
row: np.array,
future_steps=10,
input_steps=10,
slice=1,
start=0,
end=0
):
output_array = np.array(
[
np.array(row['year_list']),
np.array(row['month_list']),
np.array(row['day_list']),
np.array(row['cif_id_list'])
]
)
_, n = output_array.shape
if start < 0:
for i in range(n - input_steps + 1 - future_steps + start,
n - input_steps + 1 - future_steps - end):
output_indices = range(i * slice + future_steps, i * slice + input_steps + future_steps, 1)
output = np.reshape(output_array[:, output_indices], (4, 10, 1))
yield output[0], output[1], output[2], output[3]
else:
for i in range(start, n - input_steps + 1 - future_steps - end):
output_indices = range(i * slice + future_steps, i * slice + input_steps + future_steps, 1)
output = np.reshape(output_array[:, output_indices], (4, 10, 1))
yield output[0], output[1], output[2], output[3]
class Output():
def __init__(self, data, input_steps, future_steps, start, end):
self.data = data
self.input_steps = input_steps
self.future_steps = future_steps
self.start = start
self.end = end
def __call__(self, *args, **kwargs):
func = lambda x: f_output(x)
result = self.data.rdd.flatMap(func)
return result.toLocalIterator()
class Pipeline(object):
def __init__(self, df, batch_size, input_steps,future_steps):
self.df =df
self.batch_size=batch_size
self.input_steps=input_steps
self.future_steps=future_steps
@property
def train_dataset(self):
features= tf.data.Dataset.from_generator(
generator=lambda: Input(data=self.df, input_steps=self.input_steps, future_steps=self.future_steps, start=0, end=self.input_steps+1)(),
output_types=(tf.float32, tf.float32, tf.int32, tf.int32, tf.float32, tf.int32, tf.int32),
output_shapes=(
tf.TensorShape((self.input_steps,)),
tf.TensorShape((self.input_steps,)),
tf.TensorShape((self.input_steps,)),
tf.TensorShape((self.input_steps,)),
tf.TensorShape((self.input_steps,)),
tf.TensorShape((self.input_steps,)),
tf.TensorShape((self.input_steps,))
)
)
labels = tf.data.Dataset.from_generator(
generator=lambda: Output(data=self.df, input_steps=self.input_steps, future_steps=self.future_steps,
start=0, end=self.input_steps+1)(),
output_types=(tf.float32, tf.int32, tf.int32, tf.int32),
output_shapes=(
tf.TensorShape((self.input_steps, 1)),
tf.TensorShape((self.input_steps, 1)),
tf.TensorShape((self.input_steps, 1)),
tf.TensorShape((self.input_steps, 1))
)
)
return tf.data.Dataset.zip((features, labels)).repeat().batch(self.batch_size)
@property
def test_dataset(self):
features= tf.data.Dataset.from_generator(
generator=lambda: Input(data=self.df, input_steps=self.input_steps, future_steps=self.future_steps, start=-1, end=0)(),
output_types=(tf.float32, tf.float32, tf.int32, tf.int32, tf.float32, tf.int32, tf.int32),
output_shapes=(
tf.TensorShape((self.input_steps,)),
tf.TensorShape((self.input_steps,)),
tf.TensorShape((self.input_steps,)),
tf.TensorShape((self.input_steps,)),
tf.TensorShape((self.input_steps,)),
tf.TensorShape((self.input_steps,)),
tf.TensorShape((self.input_steps,))
)
)
labels = tf.data.Dataset.from_generator(
generator=lambda: Output(data=self.df, input_steps=self.input_steps, future_steps=self.future_steps,
start=-1, end=0)(),
output_types=(tf.float32, tf.int32, tf.int32, tf.int32),
output_shapes=(
tf.TensorShape((self.input_steps, 1)),
tf.TensorShape((self.input_steps, 1)),
tf.TensorShape((self.input_steps, 1)),
tf.TensorShape((self.input_steps, 1))
)
)
return tf.data.Dataset.zip((features, labels)).repeat().batch(self.batch_size)
Из другого файла я вызываю эти классы, записывая:
from pyspark.sql import Row
names=("customer_id_list", "year_list", "month_list", "day_list", "importe_list", 'cif_id_list', 'cat_id_list')
customer=[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
year=[0.5, 0.5, 0.5, 0.5, 0.5,0.5, 0.5, 0.5, 0.5, 0.5,0.5 ,0.5, 0.5, 0.5, 0.5, 0.5, 0.5,0.5, 0.5, 0.5, 0.5, 0.5,0.5 ,0.5 ]
month=[10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]
day=[10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]
importe=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
cif_id=[1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3]
cat_id=[1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4, 1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4]
row_dict={'customer_id_list': customer, 'year_list':year, 'month_list':month, 'day_list':day, 'importe_list':importe, 'cif_id_list':cif_id, 'cat_id_list':cat_id}
df = spark.createDataFrame([Row(**row_dict)])
input=Input(df,input_steps=10, future_steps=10, start=0, end=0)
print(list(input()))
И затем я получаю ошибку:
Исключение в потоке "serve toLocalIterator" org. apache .spark.SparkException: работа прервана из-за s Ошибка tage: Задача 0 на этапе 0.0 провалилась 4 раза, последний раз ...
Есть идеи, почему это может быть?