У меня есть параметризованный класс входных данных, который выводит загруженную модель на следующий этап конвейера:
class InputData(param.Parameterized):
# placeholders for the incoming parameters
load_sim_widget = param.ClassSelector(class_=LoadAdhSimulation)
att_widget = param.ClassSelector(class_=Attributes)
proj_widget = param.ClassSelector(class_=projections.Projection)
label = param.String(default='Basic Input', precedence=-1)
def __init__(self, **params):
super(InputData, self).__init__(**params)
self.dat_files = []
self.model = None
# output the adh_viz object
@param.output()
def adh_viz(self):
return self.load_data()
@param.depends('load_sim_widget.load_netcdf', 'load_sim_widget.adh_root_filename',
'load_sim_widget.adh_directory', watch=True)
def available_attributes(self):
att_list = []
# if netcdf is selected
if self.load_sim_widget.load_netcdf:
filename = os.path.join(self.load_sim_widget.adh_directory,
self.load_sim_widget.adh_root_filename + '.nc')
try:
# open the xarray dataset (does not load into memory)
ncdf = xr.open_dataset(filename)
except FileNotFoundError:
print('File Not Found: {}'.format(filename))
else:
with ncdf:
# enable and add all variables in the netcdf file
for var in ncdf.data_vars:
att_name = var.lower().replace(" ", "_")
# if the variable has results dimensions
if 'times' in ncdf[var].dims and 'nodes_ids' in ncdf[var].dims:
# add to the list
att_list.append(att_name)
# close the dataset
ncdf.close()
# otherwise read from available *.dat files
else:
# read in extension dicts & standardize extensions
ext_to_widget, widget_to_ext = attribute_dict()
# fix
ext_to_widget['error'] = ext_to_widget.pop('err_hyd')
# get the list of filenames # todo this isn't foolproof e.g. `SanDieg` finds files
glob_items = glob.glob(os.path.join(self.load_sim_widget.adh_directory,
self.load_sim_widget.adh_root_filename + '*.dat'))
# convert filenames list to attribute list
att_list = []
for filename in glob_items:
# suffix.append(get_variable_from_file_name(filename))
att_list.append(ext_to_widget[get_variable_from_file_name(filename)])
# dictionary for naming inconsistencies # todo add complexity for err_con in future
label_to_widget = {'error': 'error_hydro',
'depth-averaged_velocity': 'velocity'}
# loop over the inconsistencies
for key in label_to_widget.keys():
# if this is found in the list
if key in att_list:
# remove the key
att_list.remove(key)
# add back in the corrected value
att_list.append(label_to_widget[key])
# adjust available attribute widgets based on available data
for key, value in self.att_widget.params().items():
# if this attribute wasn't in the list
if key in att_list:
# ensure the widget is enabled
self.att_widget.params()[key].constant = False
# set the widget value
setattr(self.att_widget, key, True)
elif key != 'name':
# ensure the widget is enabled
self.att_widget.params()[key].constant = False
# uncheck the attribute
setattr(self.att_widget, key, False)
# disable attribute
self.att_widget.params()[key].constant = True
def load_data(self):
# if file is netcdf
if self.load_sim_widget.load_netcdf:
self.model, avail_attributes = load_model(self.load_sim_widget.adh_directory,
project_name=self.load_sim_widget.adh_root_filename,
netcdf=self.load_sim_widget.load_netcdf)
# request to load data from *dat files:
else:
# get a list of requested suffix strings
slist = self.att_widget.suffix_list(value=True)
# construct list of filenames
fnames = []
[fnames.append(os.path.join(self.load_sim_widget.adh_directory,
self.load_sim_widget.adh_root_filename + '_' + x + '.dat')) for x in slist]
# read the requested files
self.model, avail_attributes = load_model(self.load_sim_widget.adh_directory,
project_name=self.load_sim_widget.adh_root_filename,
netcdf=False, crs=self.proj_widget.get_crs(), filenames=fnames)
adh_viz = AdhViz()
adh_viz.set_model(self.model)
roams_model = None
return adh_viz
# visualize the page
def panel(self):
self.available_attributes() # for the initial load
return pn.Row(pn.Spacer(height=700), pn.Param(self.load_sim_widget, show_name=False),
pn.Param(self.att_widget, show_name=False), pn.Param(self.proj_widget, show_name=False),
name=self.label)
Тогда следующий этап конвейера должен повредить выходные данные первого и визуализировать их.
class ViewResults(param.Parameterized):
load_sim_widget = param.ClassSelector(class_=LoadAdhSimulation)
att_widget = param.ClassSelector(class_=Attributes)
proj_widget = param.ClassSelector(class_=projections.Projection)
display_range = param.ClassSelector(class_=display_opts.DisplayRangeOpts)
cmap_opts = param.ClassSelector(class_=display_opts.ColormapOpts)
adh_viz = param.ClassSelector(class_=AdhViz)
wmts_widget = param.ClassSelector(class_=display_opts.WMTS)
wireframe = param.ClassSelector(class_=display_opts.ShowElements)
def __init__(self, **params):
for k, p in self.params().items():
if k in params or k == 'name':
continue
params[k] = p.class_()
super(ViewResults, self).__init__(**params)
self.annotator = ESAnnotator(path_type=gv.Path,
crs=ccrs.GOOGLE_MERCATOR,
point_columns=['depth_elevation'],
poly_columns=['name']
)
self.map_pane_ = pn.Spacer(width=0)
self.analysis_pane_ = pn.Spacer(width=0)
self.tool_pane_ = pn.Spacer(width=0)
# self.adh_viz = adh_viz
@property
def tabs(self):
result = pn.panel(self.adh_viz, parameters=['result_label'], show_name=False)
disp_tab = pn.Column(self.wmts_widget,
pn.Pane(self.cmap_opts, show_name=False),
pn.Pane(self.display_range, show_name=False),
pn.Pane(self.wireframe),
result)
return [('Display', disp_tab)]
# what to pass out of this page (for pipeline)
@param.output()
def output(self):
pass
# how to build this page
def panel(self):
return pn.panel(self.run)
@param.depends('adh_viz.result_label', 'wireframe.mesh_elements') # never need watch=True
def run(self):
# create the meshes for the dynamic map
meshes = self.adh_viz.create_animation2()
if self.wireframe.mesh_elements is True:
edgepaths_overlay = self.adh_viz.view_elements() # transparent/ invisible overlay
else:
edgepaths_overlay = hv.Points(data=[]) # existing edgepaths overlay
# Define function which applies colormap and color_range
def apply_opts(obj, colormap, color_range):
return obj.options(cmap=colormap, height=600, width=600).redim.range(**{obj.vdims[0].name: color_range}).options(
clipping_colors={'NaN': 'transparent', 'min': 'transparent'})
if meshes.label == 'scalar':
# Apply the colormap and color range dynamically
dynamic = hv.util.Dynamic(rasterize(meshes), operation=apply_opts,
streams=[Params(self.cmap_opts), Params(self.display_range)]
) * \
self.wmts_widget.view() * \
self.annotator.polys * \
self.annotator.points * \
edgepaths_overlay
elif meshes.label == 'vector':
# Apply the colormap and color range dynamically
dynamic = hv.util.Dynamic(rasterize(vectorfield_to_paths(meshes, color='Magnitude', magnitude='Magnitude',
scale=0.05), aggregator='mean', cmap=process_cmap('viridis'), precompute=True),
operation=apply_opts, streams=[Params(self.cmap_opts),
Params(self.display_range)]) * \
self.wmts_widget.view() * \
self.annotator.polys * \
self.annotator.points * \
edgepaths_overlay
time = pn.panel(self.adh_viz, parameters=['time'], widgets={'time': pn.widgets.DiscretePlayer}, show_name=False)
# time = pn.panel(self.adh_viz, parameters=['time'], show_name=False)
hv_panel = pn.panel(dynamic)
map_pane = pn.Column(hv_panel[0], pn.Row(pn.Spacer(width=100), time))
tool_pane = pn.Tabs(*self.tabs)
return pn.Row(map_pane, pn.Spacer(width=100, height=900), pn.Column(tool_pane, pn.Pane(LOGO, width=300)))
Вот панель инструментов:
def results_dashboard(directory=os.path.join(ROOTDIR, 'data/SanDiego'), rootname='SanDiego'):
# generic single simulation options
load_sim_widget = LoadAdhSimulation(adh_root_filename=rootname, adh_directory=directory)
att_widget = Attributes()
# projection options
proj_widget = projections.Projection(name='')
# generic display options
wmts_widget = display_opts.WMTS(name='')
display_range = display_opts.DisplayRangeOpts()
cmap_opts = display_opts.ColormapOpts()
wireframe = display_opts.ShowElements(name='')
# create stages for pipeline
stages = [
('Input', InputData(load_sim_widget=load_sim_widget, att_widget=att_widget, proj_widget=proj_widget)),
('View', ViewResults(load_sim_widget=load_sim_widget, att_widget=att_widget, proj_widget=proj_widget,
display_range=display_range, cmap_opts=cmap_opts, wmts_widget=wmts_widget,
wireframe=wireframe))
]
# create the pipeline
pipeline = pn.pipeline.Pipeline(stages, debug=True)
# modify button width (not exposed)
pipeline.layout[0][1]._widget_box.width = 100
pipeline.layout[0][2]._widget_box.width = 100
# return a display of the pipline
return pipeline.layout
Однако, когда я запускаю эту панель, второй этап просто отображает пустой объект adh_viz.Я удостоверился, что данные загружены, но кажется, что они не проходят должным образом.
У меня этот код работал правильно несколько дней назад, но я сделал небольшое изменение для загрузки данных РАНЬШЕ в процессе (должно быть не связано с конвейером), и теперь он не работает.
Есть идеи, что мне здесь не хватает?
РЕДАКТИРОВАНИЕ ДОБАВИТЬ: Когда я извлекаю код results_dashboard
из класса и просто запускаю его в блокноте Jupyter.Я получаю сообщение об ошибке в консоли:
Error: Model 'ClearTool' does not exist. This could be due to a widget or a custom model not being registered before first usage.
НО, если я использую то же ядро (не перезагружаюсь), но перезапускаю импорт, мой конвейер будет успешно передавать данные из одногоэтап к следующему.
К сожалению, это не сработает, если я оставлю код results_dashboard
внутри класса.
ИЗМЕНЕНО ВНОВЬ: Он будет работать с классом results_dashboard
, если я добавлю panel, hv и hv.extension ('bokeh') к импортам в записной книжке и перезагрузу их.