Прежде чем перейти к решению, отмечу, что pycurl не следует запускать в главном потоке, поскольку его вызов блокирующий: выполняйте его в отдельном потоке и передавайте информацию в главный поток, чтобы её можно было отобразить.
Суть в том, что при расчете процента используется следующая формула:
progress = 100 * (bytes_downloaded + size_of_resume_file) / (total_bytes + size_of_resume_file)
С учетом вышеизложенного решение имеет вид:
import os
from functools import partial

import certifi
import pycurl
from PyQt5 import QtCore, QtWidgets
class Downloader(QtCore.QObject):
    """Base interface for download workers.

    Subclasses override :meth:`download` and report their state through
    the signals declared here.
    """

    # Emitted when a download begins.
    started = QtCore.pyqtSignal()
    # Emitted when a download completes without error.
    finished = QtCore.pyqtSignal()
    # Overall progress as an integer percentage.
    progressChanged = QtCore.pyqtSignal(int)
    # Error code and human-readable message.
    error = QtCore.pyqtSignal(int, str)
    # Byte counters: (bytes received so far, total bytes expected).
    bytesChanged = QtCore.pyqtSignal(int, int)

    @QtCore.pyqtSlot(str, str)
    def download(self, url, save_location):
        """Fetch *url* into *save_location*; the base version does nothing."""
class PycURLDownloader(Downloader):
    """Downloader implemented with pycurl that resumes partial files.

    If ``save_location`` already exists, the transfer is resumed from the
    current file size and the reported percentage accounts for the bytes
    that were already on disk:

        progress = 100 * (downloaded + resume_size) / (total + resume_size)
    """

    def __init__(self, parent=None):
        super().__init__(parent)
        # Size of the pre-existing partial file for the current transfer.
        self._resume_size = 0
        # Set to 1 by stop(); polled by the progress callback.
        self._flag_stop = 0

    def download(self, url, save_location):
        """Download *url* to *save_location*, resuming if the file exists.

        Emits ``started`` first, then either ``finished`` on success or
        ``error(code, message)`` on a pycurl or file-system failure.
        This call blocks until the transfer ends.
        """
        self._flag_stop = 0
        try:
            # An existing file means "resume": append after its last byte.
            self._resume_size = os.path.getsize(save_location)
            mode = "ab"
        except OSError:
            self._resume_size = 0
            mode = "wb"
        self.started.emit()

        # A fresh handle per transfer: a closed handle cannot be reused,
        # so sharing one across calls would break every download after
        # the first.
        curl = pycurl.Curl()
        try:
            with open(save_location, mode) as file_id:
                curl.setopt(pycurl.CAINFO, certifi.where())
                curl.setopt(pycurl.URL, url)
                # Throttle the transfer (libcurl unit: bytes per second).
                curl.setopt(pycurl.MAX_RECV_SPEED_LARGE, 1024)
                if mode == "ab":
                    curl.setopt(pycurl.RESUME_FROM, self._resume_size)
                curl.setopt(pycurl.WRITEDATA, file_id)
                # Progress reporting is off by default in libcurl.
                curl.setopt(pycurl.NOPROGRESS, 0)
                curl.setopt(pycurl.PROGRESSFUNCTION, self._progress_callaback)
                curl.perform()
        except pycurl.error as e:
            self.error.emit(*e.args)
        except OSError as e:
            # e.g. the target directory does not exist or is not writable.
            self.error.emit(e.errno or 0, str(e))
        else:
            self.finished.emit()
        finally:
            curl.close()

    @QtCore.pyqtSlot()
    def stop(self):
        """Ask the running transfer to abort at the next progress callback.

        NOTE(review): download() blocks its thread, so a queued slot
        invocation would presumably only be delivered after the transfer
        ends; call this method directly for a prompt effect.
        """
        self._flag_stop = 1

    def _progress_callaback(self, total, existing, totalfrac, fracmb):
        """pycurl progress hook.

        pycurl calls this with (download total, downloaded so far,
        upload total, uploaded so far); the last two are unused here.
        Returns 1 to abort the transfer, 0 to continue.
        """
        # pycurl reports floats; the Qt signals are declared with int.
        total = int(total)
        existing = int(existing)
        frac = 0
        if existing > 0 and total > 0:
            frac = int(
                100 * (existing + self._resume_size) / (total + self._resume_size)
            )
        self.bytesChanged.emit(existing, total)
        self.progressChanged.emit(frac)
        if (
            self._flag_stop
            or QtCore.QThread.currentThread().isInterruptionRequested()
        ):
            return 1
        return 0
class Widget(QtWidgets.QWidget):
    """Small download window: URL, target file, progress bar and byte log.

    The actual transfer runs in a ``PycURLDownloader`` living in a worker
    ``QThread``; this widget only reacts to its signals.
    """

    def __init__(self, parent=None):
        super().__init__(parent)
        # Set in closeEvent so that the abort error caused by closing the
        # window does not pop up a message box.
        self._closing = False

        self.url_lineedit = QtWidgets.QLineEdit()
        self.save_location_lineedit = QtWidgets.QLineEdit()
        browse_button = QtWidgets.QPushButton(self.tr("Browse"))
        self.download_progressbar = QtWidgets.QProgressBar(minimum=0, maximum=100)
        self.download_log_browser = QtWidgets.QTextBrowser()
        self.start_download_button = QtWidgets.QPushButton(self.tr("Start Download"))

        # Line edit + browse button share one form row.
        widget = QtWidgets.QWidget()
        widget.setContentsMargins(0, 0, 0, 0)
        hlay = QtWidgets.QHBoxLayout(widget)
        hlay.addWidget(self.save_location_lineedit)
        hlay.addWidget(browse_button)
        hlay.setContentsMargins(0, 0, 0, 0)

        flay = QtWidgets.QFormLayout()
        flay.addRow("URL", self.url_lineedit)
        flay.addRow("Save as", widget)
        flay.addRow("", self.download_progressbar)
        flay.addRow("", QtWidgets.QLabel(self.tr("Packets output in Bytes")))
        flay.addRow("", self.download_log_browser)

        # Center the start button horizontally.
        hlay2 = QtWidgets.QHBoxLayout()
        hlay2.addStretch()
        hlay2.addWidget(self.start_download_button)
        hlay2.addStretch()

        vlay = QtWidgets.QVBoxLayout(self)
        vlay.addLayout(flay)
        vlay.addLayout(hlay2)

        self.start_download_button.clicked.connect(self.start_download)
        browse_button.clicked.connect(self.select_save_location)

        # Worker thread hosting the downloader object.
        self._thread = QtCore.QThread(self)
        self._thread.start()
        self._downloader = PycURLDownloader()
        self._downloader.moveToThread(self._thread)
        self._downloader.progressChanged.connect(self.download_progressbar.setValue)
        self._downloader.bytesChanged.connect(self.on_bytesChanged)
        self._downloader.started.connect(self.on_started)
        self._downloader.finished.connect(self.on_finished)
        # Without this connection failures are silent and the start
        # button would stay disabled forever.
        self._downloader.error.connect(self.on_error)

        self.url_lineedit.setText("http://techslides.com/demos/sample-videos/small.mp4")

    @QtCore.pyqtSlot()
    def start_download(self):
        """Validate the inputs and schedule the download on the worker."""
        url = self.url_lineedit.text()
        save_location = self.save_location_lineedit.text()
        if not url:
            QtWidgets.QMessageBox.information(self, "Error", "Please put the links")
            return
        if not save_location:
            QtWidgets.QMessageBox.information(self, "Error", "Please put the location")
            return
        # Hand the call over via a zero-delay timer instead of invoking
        # the blocking download() directly from this slot.
        wrapper = partial(self._downloader.download, url, save_location)
        QtCore.QTimer.singleShot(0, wrapper)

    @QtCore.pyqtSlot()
    def select_save_location(self):
        """Let the user pick the target file and show it in the line edit."""
        filename, _ = QtWidgets.QFileDialog.getSaveFileName(self, "Select")
        if filename:
            self.save_location_lineedit.setText(filename)

    @QtCore.pyqtSlot(int, str)
    def on_error(self, t, msg):
        """Re-enable the start button and report the failure *msg*."""
        self.start_download_button.setEnabled(True)
        if not self._closing:
            QtWidgets.QMessageBox.information(self, "Error", msg)

    @QtCore.pyqtSlot(int, int)
    def on_bytesChanged(self, existing, total):
        """Append a "downloaded/total percent" line to the log."""
        # total is 0 until the size is known; avoid dividing by zero.
        percent = 100 * existing / total if total > 0 else 0
        self.download_log_browser.append(
            "Downloaded %d/%d %d%%" % (existing, total, percent)
        )

    @QtCore.pyqtSlot()
    def on_started(self):
        """Block further starts while a transfer is running."""
        self.start_download_button.setEnabled(False)

    @QtCore.pyqtSlot()
    def on_finished(self):
        """Allow a new download once the current one has completed."""
        self.start_download_button.setEnabled(True)

    def closeEvent(self, event):
        """Abort any running transfer and shut the worker thread down."""
        self._closing = True
        # The progress callback polls the interruption flag and aborts.
        self._thread.requestInterruption()
        self._thread.quit()
        self._thread.wait()
        super().closeEvent(event)
if __name__ == "__main__":
    # Script entry point: build the application, show the download window
    # and hand control to the Qt event loop.
    import sys
    # Imported at module level on purpose: Widget.start_download looks
    # `partial` up as a module global.
    from functools import partial

    application = QtWidgets.QApplication(sys.argv)
    window = Widget()
    window.resize(640, 480)
    window.show()
    sys.exit(application.exec_())