Я пытаюсь добавить oauth2 (сервер) в моем приложении фляги, и у меня есть некоторые проблемы с конечной точкой /oauth/token
с client_secret_post
.
Мое приложение отправляет в него следующую форму:
client_id=XXX
client_secret=YYY
grant_type=client_credentials
token_endpoint_auth_method=client_secret_post
redirect_uri=http://localhost:8081/oauth-callback
И я получаю в логах:
DEBUG:authlib.oauth2.rfc6749.authenticate_client:Authenticate None via "client_secret_basic" failed
127.0.0.1 - - [23/Jun/2019 18:05:26] "POST /oauth/token HTTP/1.0" 401 -
Кажется, что token_endpoint_auth_method
ничего не меняет и всегда возвращает {"error": "invalid_client"}
.
Я пытался добавить TOKEN_ENDPOINT_AUTH_METHODS = ['client_secret_post']
к своему class AuthorizationCodeGrant(grants.AuthorizationCodeGrant):
без эффектов (также ни один из регистраторов ничего не печатает).
Что я там пропустил?
Я реализовал некоторые вещи в моем приложении, более или менее как пример фляги oauth2, вот некоторые выдержки:
app.py:
from app_oauth import config_oauth
...
def create_app(...):
...
config_oauth(app)
...
app_oauth.py:
from authlib.flask.oauth2 import AuthorizationServer, ResourceProtector
from authlib.flask.oauth2.sqla import (
create_query_client_func,
create_save_token_func,
create_revocation_endpoint,
create_bearer_token_validator,
)
from authlib.oauth2.rfc6749 import grants
from werkzeug.security import gen_salt
from models import db, User
from models import OAuth2Client, OAuth2AuthorizationCode, OAuth2Token
from flask import current_app
class AuthorizationCodeGrant(grants.AuthorizationCodeGrant):
def create_authorization_code(self, client, user, request):
current_app.logger.debug("create auth code")
code = gen_salt(48)
item = OAuth2AuthorizationCode(
code=code,
client_id=client.client_id,
redirect_uri=request.redirect_uri,
scope=request.scope,
user_id=user.get_user_id(),
)
db.session.add(item)
db.session.commit()
return code
def parse_authorization_code(self, code, client):
current_app.logger.debug("parse auth code")
item = OAuth2AuthorizationCode.query.filter_by(
code=code, client_id=client.client_id).first()
if item and not item.is_expired():
return item
def delete_authorization_code(self, authorization_code):
current_app.logger.debug("delete auth code")
db.session.delete(authorization_code)
db.session.commit()
def authenticate_user(self, authorization_code):
current_app.logger.debug("auth user")
return User.query.get(authorization_code.user_id)
class PasswordGrant(grants.ResourceOwnerPasswordCredentialsGrant):
def authenticate_user(self, username, password):
current_app.logger.debug("password grant auth user")
user = User.query.filter_by(name=username).first()
if user.check_password(password):
return user
class RefreshTokenGrant(grants.RefreshTokenGrant):
def authenticate_refresh_token(self, refresh_token):
current_app.logger.debug("refresh token grant")
token = OAuth2Token.query.filter_by(refresh_token=refresh_token).first()
if token and not token.revoked and not token.is_refresh_token_expired():
return token
def authenticate_user(self, credential):
current_app.logger.debug("auth user grant user")
return User.query.get(credential.user_id)
query_client = create_query_client_func(db.session, OAuth2Client)
save_token = create_save_token_func(db.session, OAuth2Token)
authorization = AuthorizationServer(
query_client=query_client,
save_token=save_token,
)
require_oauth = ResourceProtector()
def config_oauth(app):
authorization.init_app(app)
# support all grants
authorization.register_grant(grants.ImplicitGrant)
authorization.register_grant(grants.ClientCredentialsGrant)
authorization.register_grant(AuthorizationCodeGrant)
authorization.register_grant(PasswordGrant)
authorization.register_grant(RefreshTokenGrant)
# support revocation
revocation_cls = create_revocation_endpoint(db.session, OAuth2Token)
authorization.register_endpoint(revocation_cls)
# protect resource
bearer_cls = create_bearer_token_validator(db.session, OAuth2Token)
require_oauth.register_token_validator(bearer_cls())
и мой план:
from app_oauth import authorization
...
@bp_api_v1_auth.route("/oauth/token", methods=["POST"])
def oauth_token():
return authorization.create_token_response()
Редактировать: после копания похоже, что он обрабатывается ClientCredentialsGrant, который по умолчанию выполняет только client_secret_basic
, затем я добавил:
class ClientCredentialsGrant(grants.ClientCredentialsGrant):
TOKEN_ENDPOINT_AUTH_METHODS = [
'client_secret_basic', 'client_secret_post'
]
...
authorization.register_grant(ClientCredentialsGrant)
Который теперь проверяет, но отвечает {"error": "unauthorized_client"}