WTForms и Flask - проверить размер файла в другом поле - PullRequest
0 голосов
/ 27 марта 2020

У меня есть такая структура в моем приложении Flask:

class UploadForm(FlaskForm):
        username = StringField('Username',
                       validators=[DataRequired(),
                                   Length(min=2, max=20)])
        email = StringField('Email address',
                    validators=[DataRequired(),
                                Email()])
        domain_name = StringField('Project name',
                          validators=[DataRequired()])
        dir_level = IntegerField(
            'Folder level', validators=[DataRequired()])
        file = FileField('Upload textfile', validators=[
                         FileRequired(), FileAllowed(['txt'])])
        scan_now = BooleanField('Scan website now')
        submit = SubmitField('Submit')

Поле scan_now представляет проект в другую, более быструю очередь и используется для приоритетных отчетов ,

Учитывая, что scan_now дороже в вычислительном отношении, я хотел бы ограничить его использование только для файлов .txt, которые имеют менее 500 строк, и вызвать ошибку, если пользователь установит флажок, но имеет загружено более 500 строк.

Как я могу это сделать?

Спасибо!

РЕДАКТИРОВАТЬ:

Поскольку вы запросили файл просмотров моего менеджера, за ним находится лог c:

@manager.route("/manage_url", methods=["GET", "POST"])
@auth.login_required
def manage_url():
    form = UploadForm()
    config = []
    if request.method == "POST" and form.validate_on_submit():
        date = datetime.today()
        date = date.strftime('%d/%m/%Y')

        file = request.files['file']
        filename = file.filename
        email = request.form['email']
        username = request.form['username']
        livello_dir = request.form['livello_dir']
        domain_name = request.form['domain_name']
        try:
            scan_now = request.form['scan_now']
        except:
            scan_now = False
        file.save(os.path.join(uploads_dir, secure_filename(filename)))
        # crea file config da leggere
        config_entry = {
            filename:
            {
                "date": date,
                "email": email,
                "user": username,
                "domain_name": domain_name.replace(" ", "_"),
                "livello_dir": livello_dir
            }
        }
        config.append(config_entry)
        if not os.path.exists('./config.json'):
            with open(os.path.join(ROOT_DIR, 'config.json'), 'w', encoding='utf-8') as f:
                json.dump(config, f, ensure_ascii=False, indent=4)
        else:
            with open(os.path.join(ROOT_DIR, 'config.json'), 'r') as f:
                config = json.load(f)
            config.append(config_entry)
            with open(os.path.join(ROOT_DIR, 'config.json'), 'w', encoding='utf-8') as f:
                json.dump(config, f, ensure_ascii=False, indent=4)

        if scan_now:
            def on_the_fly_scan():
                executor = Executor()
                executor.start_process(file=filename)
            thread = Thread(target=on_the_fly_scan)
            thread.daemon = True
            thread.start()
        return redirect(url_for('manager.manage_url'))

    try:
        files = os.listdir(os.path.join(app.instance_path, 'uploads'))
        paths = [os.path.join(app.instance_path, 'uploads') +
                 "/" + file for file in files]
    except:
        files = None
        paths = None

    domini = []
    contact = ""
    operator_name = ""
    livello_dir = ""

    for file in paths:
        with open(file, 'r', encoding='unicode_escape') as csv_file:
            csv_reader = csv.reader(csv_file, delimiter='\t')
            rows = list(csv_reader)

            dominio_rilevato = tldextract.extract(rows[1][0]).registered_domain
            domini.append(dominio_rilevato)

    with open(os.path.join(ROOT_DIR, 'config.json'), 'r') as f:
        data = json.load(f)
    content_package = list(zip(domini, files))

    return render_template("manage_url.html",
                           content_package=content_package,
                           data=data,
                           config_entries=list(list(d.keys()) for d in data),
                           form=form,
                           contact=contact,
                           operator_name=operator_name,
                           livello_dir=livello_dir)

1 Ответ

1 голос
/ 05 апреля 2020

Вы можете проверить размер файла вручную перед сохранением файла. добавьте эти строки перед file.save(os.path.join(uploads_dir, secure_filename(filename))):

if scan_now:
    if count_rows(filename) > 500:
        raise Exception('too many rows')
        # or whatever you need to do

А вот код count_rows():

def count_rows(filename):
    with open(filename, 'r') as f:
        return sum(1 for row in f)
        # if file is in csv format return sum() - 1
        # because first line is not a record
        return sum(1 for row in f) - 1

...