Я пытаюсь построить входной конвейер для активного обучающего приложения. Сценарий, который я написал, загружает данные, дополняет и затем кэширует их.
Я кэширую весь набор данных один раз, а затем разделяю его на немаркированный и маркированный набор данных, откуда я передаю выборки из одного набора данных. к другой в каждой итерации.
После нескольких итераций обучение становится чрезвычайно медленным. Время тренировки для каждого образца увеличивается примерно в четыре раза после примерно 20 итераций, что недопустимо для моей модели. По тензорной доске и ресурсам подержанного компьютера я вижу, что тренировка в основном простаивает, а графический процессор даже не используется, но использование процессора и Sysmem действительно велико.
Кто-нибудь сталкивался с такой же проблемой и может сказать мне как это исправить? Или какая-нибудь подробная информация о том, как работает кэширование тензорного потока?
Спасибо!
Редактировать: Я разделил набор данных с помощью .take () и .skip () и перенес сэмплы с помощью .concatenate ( )
Редактировать: эти три метода я использую, хотя они упрощены. indexlist - список образцов, которые должны быть перенесены из немеченого в помеченный набор данных
def input_pipeline()
dataset = tf.data.TFRecordDataset(filenames)
dataset = dataset.map(__convert_tf_records, num_parallel_calls=1)
dataset_bounding_boxes = dataset.map(get_lung_bb,num_parallel_calls=1)
if not is_training:
datasets['bounding_box'] = dataset_bounding_boxes
if params.repeat > 1 and is_training:
dataset = dataset.repeat(count=params.repeat)
dataset_bounding_boxes = dataset_bounding_boxes.repeat(count=params.repeat)
if is_training:
dataset = dataset.map(data_aug_rot, num_parallel_calls=1)
dataset = dataset.map(data_aug_zoom, num_parallel_calls=1)
dataset = dataset.map(data_aug_elastic_deform, num_parallel_calls=1)
dataset = tf.data.Dataset.zip((dataset, dataset_bounding_boxes))
dataset = dataset.map(get_random_patches_fn, num_parallel_calls=1)
dataset = dataset.unbatch()
if params.b_shuffle and is_training:
dataset = dataset.shuffle(buffer_size=params.batch_size*5)
dataset = dataset.batch(params.batch_size)
if params.b_cache and is_training:
cache_path = os.path.join(experiment_dir, "cache")
if not os.path.exists(cache_path):
os.makedirs(cache_path)
dataset = dataset.cache(cache_path + '/')
dataset = dataset.prefetch(buffer_size=params.batch_size)
datasets['patches'] = dataset
return datasets
def get_trainingdatasets():
initial_dataset = input_pipeline()
dataset_all=initial_dataset['patches']
dataset_all=dataset_all.unbatch()
for element in dataset_all:
dataset_size+=1
dataset_unlabeled=dataset_all.skip( int(dataset_size*args.ds_labeled_perc) ).batch(params.batch_size)
dataset_labeled=dataset_all.take( int(dataset_size* args.ds_labeled_perc) )
dataset_labeled= dataset_labeled.shuffle(buffer_size=500, reshuffle_each_iteration=True).batch(params.batch_size)
return dataset_labeled, dataset_unlabeled
def transfer_samples():
dataset_unlabeled = dataset_unlabeled.unbatch()
dataset_labeled = dataset_labeled.unbatch()
for idx in index_list:
element_add=dataset_unlabeled.skip(idx).take(1)
dataset_add_to_labeled = dataset_add_to_labeled.concatenate(element_add)
dataset_labeled = dataset_labeled.concatenate(dataset_add_to_labeled)
dataset_labeled= dataset_labeled.shuffle(buffer_size=params.batch_size*5)
dataset_labeled = dataset_labeled.batch(params.batch_size).prefetch(buffer_size=params.batch_size)
dataset_unlabeled = dataset_unlabeled.batch(params.batch_size).prefetch(buffer_size=params.batch_size)
return dataset_labeled, dataset_unlabeled