Я работаю над приложением Django, которое использует планшетный сканер. Чтобы оставаться на связи со сканером, используется RPy C. (Подключение к сканеру занимает много времени, поэтому я хочу повторно использовать подключение несколько раз.)
Подключение к сканеру и поддержание работоспособности службы работает нормально. Я могу позвонить в сканер, и он немедленно реагирует. Но я не могу сохранить файл изображения PIL, см. Ошибку ниже. Без RPy C удалось сохранить файл изображения PIL.
Traceback (most recent call last):
File "/usr/lib/python3.6/code.py", line 91, in runcode
exec(code, self.locals)
File "<console>", line 1, in <module>
File "/home/user/Projects/django-project/project/appname/models.py", line 55, in scan_preview
image_file.save(file_path)
File "/home/user/Envs/project/lib/python3.6/site-packages/rpyc/core/netref.py", line 166, in __getattr__
return syncreq(self, consts.HANDLE_GETATTR, name)
File "/home/user/Envs/project/lib/python3.6/site-packages/rpyc/core/netref.py", line 76, in syncreq
return conn.sync_request(handler, proxy, *args)
File "/home/user/Envs/project/lib/python3.6/site-packages/rpyc/core/protocol.py", line 469, in sync_request
return self.async_request(handler, *args, timeout=timeout).value
File "/home/user/Envs/project/lib/python3.6/site-packages/rpyc/core/async_.py", line 102, in value
raise self._obj
AttributeError: cannot access 'save'
Похоже, RPy C пытается сохранить изображение, я не понимаю, почему. Закрытие соединения перед сохранением изображения приводит к ошибке о закрытии потока.
Я хочу, чтобы RPy C прекратил беспокоить меня после возвращения файла изображения PIL. I хочу, чтобы он возвращал изображение PIL без потока мамбо-джамбо. (Я не знаю правильных терминов, новых для RPy C.)
scanner_service.py
import rpyc
import sane
class Scanner(object):
"""
The Scanner class is used to interact with flatbed scanners.
"""
def __init__(self):
sane.init()
self.device_name = None
self.error_message = None
self.device = self.get_device()
def get_device(self):
"""
Return the first detected scanner and set the name.
@return: sane.SaneDev
"""
devices = sane.get_devices()
print('Available devices:', devices)
# Empty list means no scanner is connect or the connected scanner is
# already being used
if not devices:
self.error_message = 'Scanner disconnect or already being used.'
return None
# open the first scanner you see
try:
device = sane.open(devices[0][0])
except Exception as e:
self.error_message = e
print(e)
return None
brand = devices[0][1]
model = devices[0][2]
self.device_name = "{brand}_{model}".format(
brand=brand,
model=model
)
print("Device name:", self.device_name)
# set to color scanning mode, this is not always the default mode
device.mode = 'color'
return device
def print_options(self):
"""Print the device's options. Useful for development."""
for opt in self.device.get_options():
print(opt, '\n')
def scan_image(self, dpi):
"""
Scan an image.
@param dpi: integer
@return: PIL image
"""
self.device.resolution = dpi
self.device.start()
image = self.device.snap()
return image
def scan_roi(
self,
top_left_x, top_left_y,
bottom_right_x, bottom_right_y,
dpi=2400
):
"""
Scan a Region of Interest.
The ROI is selected by giving two x/y coordinates, one for the top left
and one for the bottom right. This creates a square that the scanner
will scan.
To create a ROI measure the amount of millimeters starting at the top
left corner of the document.
http://www.sane-project.org/html/doc014.html
@param top_left_x: integer
@param top_left_y: integer
@param bottom_right_x: integer
@param bottom_right_y: integer
@param dpi: integer
@return: PIL image
"""
self.device.resolution = dpi
# top left x/y
self.device.tl_x = top_left_x
self.device.tl_y = top_left_y
# bottom right x/y
self.device.br_x = bottom_right_x
self.device.br_y = bottom_right_y
self.device.start()
image = self.device.snap()
return image
class ScannerService(rpyc.Service):
"""
Run the scanner as a service.
Multiple clients can connect to the Scanner Service to use the flatbed
scanner without having to wait for sane to setup a connection.
"""
def __init__(self):
self.scanner = Scanner()
def on_connect(self, conn):
pass
def on_disconnect(self, conn):
pass
def exposed_get_device_name(self):
"""
Return the name of the scanner
@return: string
"""
return self.scanner.device_name
def exposed_scan_image(self, dpi):
"""
Scan an image.
@param dpi: integer
@return: PIL image
"""
image = self.scanner.scan_image(dpi)
return image
def exposed_scan_roi(
self,
top_left_x, top_left_y,
bottom_right_x, bottom_right_y,
dpi=2400
):
"""
Scan a Region of Interest.
@param top_left_x: integer
@param top_left_y: integer
@param bottom_right_x: integer
@param bottom_right_y: integer
@param dpi: integer
@return: PIL image
"""
image = self.scanner.scan_roi(
top_left_x, top_left_y,
bottom_right_x, bottom_right_y,
dpi
)
return image
if __name__ == "__main__":
from rpyc.utils.server import ThreadedServer
t = ThreadedServer(ScannerService(), port=18861)
t.start()
utils.py
class ScannerServiceConnection(object):
"""Connect to and interact with the scanner service."""
def __init__(self):
self.connection = self.get_connection()
@staticmethod
def get_connection():
"""
Connect to the scanner service
@return: ScannerService object
"""
connection = rpyc.connect("localhost", 18861)
return connection
def get_device_name(self):
"""
Return the name of the scanner
@return: string
"""
return self.connection.root.exposed_get_device_name()
def scan_image(self, dpi):
"""
Scan an image.
@param dpi: integer
@return: PIL image
"""
image = self.connection.exposed_scan_image(dpi)
return image
def scan_roi(
self,
top_left_x, top_left_y,
bottom_right_x, bottom_right_y,
dpi=2400
):
"""
Scan a Region of Interest.
@param top_left_x: integer
@param top_left_y: integer
@param bottom_right_x: integer
@param bottom_right_y: integer
@param dpi: integer
@return: PIL image
"""
image = self.connection.root.exposed_scan_roi(
top_left_x, top_left_y,
bottom_right_x, bottom_right_y,
dpi
)
return image