以下の続き
Userinfoエンドポイント
Userinfoエンドポイントを実装する。
アクセストークンの永続化
Userinfoエンドポイントのリクエストにアクセストークンを使うので、まずはトークンエンドポイントで発行するアクセストークンをDBに永続化する。
Modelを定義する。
models.py
# ...省略 class AccessToken(models.Model): token = models.CharField(max_length=64, unique=True) user = models.ForeignKey(User, on_delete=models.CASCADE) expired_at = models.DateTimeField()
トークンエンドポイントでアクセストークンをDBに永続化する。
views.py
from sampleapp.models import ( AccessToken, # ...省略 ) # ...省略 class TokenView(View): def post(self, request): # ...省略 access_token = crypto.get_random_string(8) expires_in = 3600 AccessToken.objects.create( token=access_token, user=authorization_code.user, expired_at=datetime.now() + timedelta(seconds=expires_in), ) refresh_token = crypto.get_random_string(8) authorization_code.delete() return JsonResponse( { "access_token": access_token, "token_type": "Bearer", "expires_in": expires_in, "refresh_token": refresh_token, "id_token": id_token, } )
ディスカバリへの登録
Userinfoエンドポイントをディスカバリに登録する。
views.py
class DiscoveryView(View): def get(self, request): return JsonResponse( { # ...省略 "userinfo_endpoint": "http://localhost:8000/sample/userinfo/", } )
Userinfoエンドポイントの実装
Userinfoエンドポイントを実装していく。まずはViewの雛形を定義する。公式の仕様によるとUserinfoエンドポイントはGETでもPOSTでもいいらしいので、参照系ということでGETにしておく。
https://openid-foundation-japan.github.io/openid-connect-core-1_0.ja.html#UserInfoRequest
views.py
# ...省略 class UserInfoView(View): def get(self, request): return JsonResponse({})
urls.py
urlpatterns = [
# ...省略
path("userinfo/", views.UserInfoView.as_view(), name="userinfo"),
]
Viewの中を実装する。Userinfoエンドポイントへのリクエストは以下のような形式である。
GET /userinfo HTTP/1.1 Host: server.example.com Authorization: Bearer SlAV32hkKG
まずはAuthorizationヘッダーのバリデーションをした上でアクセストークンを取得する。
views.py
class UserInfoView(View): def get(self, request): raw_authorization = request.headers.get("Authorization") if raw_authorization is None: return HttpResponseBadRequest() authorization = raw_authorization.split(" ") if len(authorization) != 2 or authorization[0] != "Bearer": return HttpResponseBadRequest() access_token = authorization[1] return JsonResponse({})
DBからアクセストークンを取得する。期限のバリデーションも同時に行う。
class UserInfoView(View): def get(self, request): # ...省略 try: access_token_obj = AccessToken.objects.get( token=access_token, expired_at__gte=datetime.now() ) except AccessToken.DoesNotExist: return HttpResponseBadRequest() return JsonResponse({})
AccessTokenにUserが紐づいているので、subにUserのidを入れて返す。仕様でUserinfoエンドポイントのレスポンスにはsubを必ず含めることが決まっている。
UserInfo Response には, 必ず sub (subject) Claim を含むこと (MUST).
https://openid-foundation-japan.github.io/openid-connect-core-1_0.ja.html#UserInfoResponse
views.py
class UserInfoView(View): def get(self, request): # ...省略 return JsonResponse( { "sub": str(access_token_obj.user.id), } )
以下のようにcurlコマンドを実行すると、ユーザー情報を取得できた。
$ curl -X GET "http://localhost:8000/sample/userinfo/" -H "Authorization: Bearer EksRQHAD" {"sub": "1"} $
scopeによる制御の追加
本来のOIDCの仕様では、Userinfoエンドポイントで返すユーザー情報の種類は、認可リクエストでリクエストパラメータとして送ったscopeに従う。
今回はscopeにemailが存在する場合はユーザーのメールアドレスも返すように実装してみる。
まずは認可コードにscopeを紐付けてトークンエンドポイントまで引き回す。
models.py
# ...省略 class AuthorizationCodeScope(models.Model): code = models.ForeignKey(AuthorizationCode, on_delete=models.CASCADE) scope = models.CharField(max_length=32) # ...省略
views.py
from sampleapp.models import ( # ...省略 AuthorizationCodeScope, # ...省略 ) # ...省略 class ConsentView(LoginRequiredMixin, View): def post(self, request, consent_access_token): # ...省略 for scope in ConsentAccessTokenScope.objects.filter(token=token): AuthorizationCodeScope.objects.create( code=AuthorizationCode.objects.get(code=code), scope=scope.scope )
これでトークンエンドポイントからscopeを参照できるようになったので、次はアクセストークンにscopeを紐づけて永続化する。
models.py
# ...省略 class AccessTokenScope(models.Model): token = models.ForeignKey(AccessToken, on_delete=models.CASCADE) scope = models.CharField(max_length=32)
views.py
# ...省略 from sampleapp.models import ( AccessToken, AccessTokenScope, # ...省略 ) # ...省略 class TokenView(View): def post(self, request): # ...省略 access_token = crypto.get_random_string(8) expires_in = 3600 access_token_obj = AccessToken.objects.create( token=access_token, user=authorization_code.user, expired_at=datetime.now() + timedelta(seconds=expires_in), ) for scope in AuthorizationCodeScope.objects.filter(code=authorization_code): AccessTokenScope.objects.create(token=access_token_obj, scope=scope.scope) # ...省略
これでUserinfoエンドポイント上でアクセストークンからscopeを参照できるようになった。
UserInfoViewにscopeの制御を追加する。
openidは必須スコープなので存在しない場合は400を返す。emailが存在する場合はレスポンスにメールアドレスを追加する。
# ...省略 class UserInfoView(View): def get(self, request): # ...省略 try: access_token_obj = AccessToken.objects.get( token=access_token, expired_at__gte=datetime.now() ) except AccessToken.DoesNotExist: return HttpResponseBadRequest() scopes = [ scope.scope for scope in AccessTokenScope.objects.filter(token=access_token_obj) ] if "openid" not in scopes: return HttpResponseBadRequest() userinfo = { "sub": str(access_token_obj.user.id), } if "email" in scopes: userinfo["email"] = access_token_obj.user.email return JsonResponse(userinfo)
レスポンスにメールアドレスを追加できた。
$ curl -X GET "http://localhost:8000/sample/userinfo/" -H "Authorization: Bearer MNrRjLGV" {"sub": "1", "email": "admin@example.com"} $
いったんまとめ
Userinfoエンドポイントまで実装できたので、OIDCの最低限のフローに必要なものは全て実装できたはず。明日は別にやりたいことがあるので、IdP練習実装シリーズの年末年始休暇分はここまでとする。
次は少し間が開きそうだが、以下をやっていきたい。
本記事のコードのリポジトリは以下 github.com