Source code for django_cas_ng.views

"""CAS login/logout replacement views"""


from datetime import timedelta
from importlib import import_module
from typing import Any
from urllib import parse as urllib_parse

from django.conf import settings
from django.contrib import messages
from django.contrib.auth import authenticate, login as auth_login, logout as auth_logout
from django.core.exceptions import PermissionDenied
from django.http import (
    HttpRequest,
    HttpResponse,
    HttpResponseBadRequest,
    HttpResponseRedirect,
)
from django.utils import timezone
from django.utils.decorators import method_decorator
from django.utils.translation import gettext_lazy as _
from django.views import View
from django.views.decorators.csrf import csrf_exempt

from .models import SESSION_KEY_MAXLENGTH, ProxyGrantingTicket, SessionTicket
from .signals import cas_user_logout
from .utils import (
    RedirectException,
    get_cas_client,
    get_protocol,
    get_redirect_url,
    get_service_url,
    get_user_from_session,
)

# Session store class of the backend named by settings.SESSION_ENGINE.
# clean_sessions() instantiates it with a stored session key.
SessionStore = import_module(settings.SESSION_ENGINE).SessionStore

# Names exported by `from django_cas_ng.views import *`.
__all__ = ['LoginView', 'LogoutView', 'CallbackView']


def clean_next_page(request, next_page):
    """
    Validate a requested post-login/logout redirect target.

    :param request: current request; used to build the absolute site root URL
    :param next_page: requested redirect target, may be empty or ``None``
    :return: ``next_page`` unchanged
    :raises RedirectException: when ``settings.CAS_CHECK_NEXT`` is truthy and
        ``next_page`` is not local to this site
    """
    # Empty targets are passed through untouched; only real values are checked.
    if next_page and settings.CAS_CHECK_NEXT:
        site_root = request.build_absolute_uri('/')
        if not is_local_url(site_root, next_page):
            raise RedirectException("Non-local url is forbidden to be redirected to.")
    return next_page


def is_local_url(host_url: str, url: str) -> bool:
    """
    Tell whether ``url`` points inside the site identified by ``host_url``.

    The value of ``url`` typically comes from an untrusted ``next`` parameter,
    so anything a browser could resolve to a different host is rejected.

    :param host_url: is an absolute host url, say https://site.com/
    :param url: is any url
    :return: Is :url: local to :host_url:?
    """
    # Browsers treat "\" like "/" in http(s) URLs ("/\evil.com" becomes
    # "//evil.com"), so validate the form the browser will actually follow.
    url = url.strip().replace('\\', '/')

    # urlparse() sees "///evil.com" as a bare path, but browsers collapse the
    # extra slashes and treat "evil.com" as the host.
    if url.startswith('///'):
        return False

    try:
        parsed_url = urllib_parse.urlparse(url)
    except ValueError:
        # Malformed URL (e.g. unbalanced IPv6 brackets): never local.
        return False

    if not parsed_url.netloc:
        # A plain relative/absolute path is local. A scheme without a host
        # (e.g. "http:///evil.com", "javascript:...") is not a local path.
        return not parsed_url.scheme

    parsed_host = urllib_parse.urlparse(host_url)
    if parsed_url.netloc != parsed_host.netloc:
        return False
    # A scheme-relative URL ("//site.com/x") inherits the host scheme.
    if parsed_url.scheme and parsed_url.scheme != parsed_host.scheme:
        return False

    # Compare paths with a trailing slash so "/app2" is not taken as being
    # under "/app".
    url_path = parsed_url.path if parsed_url.path.endswith('/') else parsed_url.path + '/'
    host_path = parsed_host.path if parsed_host.path.endswith('/') else parsed_host.path + '/'
    return url_path.startswith(host_path)


class LoginView(View):
    """CAS replacement for the login view."""

    @method_decorator(csrf_exempt)
    def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
        """Dispatch with CSRF checks disabled for this view."""
        return super().dispatch(request, *args, **kwargs)

    def successful_login(self, request: HttpRequest, next_page: str) -> HttpResponse:
        """
        Hook invoked once the user is known to be logged in.

        Subclasses may override it to run extra post-authentication steps
        (for instance, setting a cookie that carries a token).

        :param request: current request
        :param next_page: URL the user should be sent to
        :return: redirect response to ``next_page``
        """
        return HttpResponseRedirect(next_page)

    def post(self, request: HttpRequest) -> HttpResponse:
        """
        Handle a POST: process a ``logoutRequest`` or forward to CAS login.

        :param request: current request
        :return: redirect to the next page after a logout request was
            processed, otherwise redirect to the CAS login URL
        """
        requested_next = request.POST.get('next', settings.CAS_REDIRECT_URL)
        next_page = clean_next_page(request, requested_next)
        service_url = get_service_url(request, next_page)
        client = get_cas_client(service_url=service_url, request=request)

        if not request.POST.get('logoutRequest'):
            return HttpResponseRedirect(client.get_login_url())

        clean_sessions(client, request)
        return HttpResponseRedirect(next_page)

    def get(self, request: HttpRequest) -> HttpResponse:
        """
        Forwards to CAS login URL or verifies CAS ticket

        :param request: current request
        :return: redirect to the CAS login URL, or the result of
            :meth:`successful_login`
        :raises PermissionDenied: when ticket validation fails and neither
            ``settings.CAS_RETRY_LOGIN`` nor the ``required`` query parameter
            asks for a retry
        """
        next_url = getattr(
            settings,
            "CAS_LOGIN_NEXT_PAGE",
            request.GET.get("next"),
        )
        next_page = clean_next_page(request, next_url)
        required = request.GET.get('required', False)

        service_url = get_service_url(request, next_page)
        client = get_cas_client(service_url=service_url, request=request)

        # Fall back to the target remembered in the session, then to the
        # configured default redirect.
        if not next_page and settings.CAS_STORE_NEXT and 'CASNEXT' in request.session:
            next_page = request.session['CASNEXT']
            request.session['CASNEXT'] = None
        if not next_page:
            next_page = get_redirect_url(request)

        # Already logged in: nothing to validate.
        if request.user.is_authenticated:
            if settings.CAS_LOGGED_MSG is not None:
                messages.success(
                    request,
                    settings.CAS_LOGGED_MSG % request.user.get_username(),
                )
            return self.successful_login(request=request, next_page=next_page)

        # No ticket yet: send the user to the CAS server.
        ticket = request.GET.get('ticket')
        if not ticket:
            if settings.CAS_STORE_NEXT:
                request.session['CASNEXT'] = next_page
            return HttpResponseRedirect(client.get_login_url())

        user = authenticate(ticket=ticket, service=service_url, request=request)
        pgtiou = request.session.get("pgtiou")

        # Ticket validation failed: retry or refuse.
        if user is None:
            if settings.CAS_RETRY_LOGIN or required:
                return HttpResponseRedirect(client.get_login_url())
            raise PermissionDenied(_('Login failed.'))

        auth_login(request, user)
        # from https://code.djangoproject.com/ticket/19147
        if not request.session.session_key:
            request.session.save()

        # Truncate session key to a max of its value length.
        # When using the signed_cookies session backend, the
        # session key can potentially be longer than this.
        session_key = request.session.session_key[:SESSION_KEY_MAXLENGTH]
        if not request.session.exists(session_key):
            request.session.create()

        # Record which CAS ticket belongs to this session.
        try:
            session_ticket = SessionTicket.objects.get(session_key=session_key)
        except SessionTicket.DoesNotExist:
            SessionTicket.objects.create(
                session_key=session_key,
                ticket=ticket,
            )
        else:
            session_ticket.ticket = ticket
            session_ticket.save()

        if pgtiou and settings.CAS_PROXY_CALLBACK:
            # Delete old PGT
            ProxyGrantingTicket.objects.filter(
                user=user,
                session_key=session_key,
            ).delete()
            # Set new PGT ticket
            try:
                granting_ticket = ProxyGrantingTicket.objects.get(pgtiou=pgtiou)
            except ProxyGrantingTicket.DoesNotExist:
                pass
            else:
                granting_ticket.user = user
                granting_ticket.session_key = session_key
                granting_ticket.save()

        if settings.CAS_LOGIN_MSG is not None:
            messages.success(request, settings.CAS_LOGIN_MSG % user.get_username())
        return self.successful_login(request=request, next_page=next_page)
class LogoutView(View):
    """CAS replacement for the logout view."""

    def get(self, request: HttpRequest) -> HttpResponse:
        """
        Redirects to CAS logout page

        Sends the ``cas_user_logout`` signal, removes the ticket records tied
        to the current session and logs the user out locally. When
        ``settings.CAS_LOGOUT_COMPLETELY`` is truthy the user is then sent to
        the CAS server's logout URL; otherwise straight to the next page.

        :param request: current request
        :return: redirect response
        """
        next_url = getattr(
            settings,
            "CAS_LOGOUT_NEXT_PAGE",
            request.GET.get("next"),
        )
        next_page = clean_next_page(request, next_url)

        # try to find the ticket matching current session for logout signal
        # Truncate session key to a max of its value length
        # When using the signed_cookies session backend, the
        # session key can potentially be longer than this.
        session_key = None
        if request.session and request.session.session_key:
            session_key = request.session.session_key[:SESSION_KEY_MAXLENGTH]

        try:
            st = SessionTicket.objects.get(session_key=session_key)
            ticket = st.ticket
        except SessionTicket.DoesNotExist:
            ticket = None

        # send logout signal
        cas_user_logout.send(
            sender="manual",
            user=request.user,
            session=request.session,
            ticket=ticket,
        )

        # clean current session ProxyGrantingTicket and SessionTicket
        # NOTE(review): with no session key these filters match
        # session_key=None rows -- confirm that is intended.
        ProxyGrantingTicket.objects.filter(session_key=session_key).delete()
        SessionTicket.objects.filter(session_key=session_key).delete()
        auth_logout(request)

        next_page = next_page or get_redirect_url(request)
        if settings.CAS_LOGOUT_COMPLETELY:
            if next_page.lower().startswith(("https://", "http://")):
                # Already absolute: hand it to CAS unchanged.
                redirect_url = next_page
            else:
                # Relative target: make it absolute using the proxied root
                # when configured, otherwise the current request's host.
                proxied_root = getattr(settings, 'CAS_ROOT_PROXIED_AS', None)
                if proxied_root:
                    parsed_root = urllib_parse.urlparse(proxied_root)
                    protocol = parsed_root.scheme
                    host = parsed_root.netloc
                else:
                    protocol = get_protocol(request)
                    host = request.get_host()
                redirect_url = urllib_parse.urlunparse(
                    (protocol, host, next_page, '', '', ''),
                )
            client = get_cas_client(request=request)
            return HttpResponseRedirect(client.get_logout_url(redirect_url))

        # This is in most cases pointless if not CAS_RENEW is set. The user will
        # simply be logged in again on next request requiring authorization.
        return HttpResponseRedirect(next_page)
class CallbackView(View):
    """
    Read PGT and PGTIOU sent by CAS
    """

    @method_decorator(csrf_exempt)
    def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
        """Dispatch with CSRF checks disabled for this view."""
        return super().dispatch(request, *args, **kwargs)

    def post(self, request: HttpRequest) -> HttpResponse:
        """
        Handle a logout request posted to the callback URL.

        :param request: current request
        :return: plain-text "ok" response when ``logoutRequest`` is present,
            otherwise a 400 response
        """
        if request.POST.get('logoutRequest'):
            clean_sessions(get_cas_client(request=request), request)
            return HttpResponse("{}\n".format(_('ok')), content_type="text/plain")
        return HttpResponseBadRequest('Missing logoutRequest')

    def get(self, request: HttpRequest) -> HttpResponse:
        """
        Store the ``pgtId`` / ``pgtIou`` pair received in the query string.

        :param request: current request
        :return: plain-text "ok" response
        """
        pgtid = request.GET.get('pgtId')
        pgtiou = request.GET.get('pgtIou')
        # create() already persists the row; no extra save() is needed.
        ProxyGrantingTicket.objects.create(pgtiou=pgtiou, pgt=pgtid)
        # Purge tickets older than 60 seconds that have no session key.
        ProxyGrantingTicket.objects.filter(
            session_key=None,
            date__lt=(timezone.now() - timedelta(seconds=60)),
        ).delete()
        return HttpResponse("{}\n".format(_('ok')), content_type="text/plain")
def clean_sessions(client, request):
    """
    Terminate the local sessions named in a posted ``logoutRequest``.

    Does nothing when ``client`` has no ``get_saml_slos`` method. Otherwise,
    for every entry it returns, the session recorded for that ticket is
    flushed after the ``cas_user_logout`` signal is sent, and the ticket
    records for that session are deleted. Tickets with no recorded session
    are skipped.

    :param client: CAS client object
    :param request: current request carrying ``logoutRequest`` in its POST data
    """
    if not hasattr(client, 'get_saml_slos'):
        return

    logout_request = request.POST.get('logoutRequest')
    for slo in client.get_saml_slos(logout_request):
        try:
            session_ticket = SessionTicket.objects.get(ticket=slo.text)
            stored_key = session_ticket.session_key
            session = SessionStore(session_key=stored_key)
            # send logout signal
            cas_user_logout.send(
                sender="slo",
                user=get_user_from_session(session),
                session=session,
                ticket=slo.text,
            )
            session.flush()
            # clean logout session ProxyGrantingTicket and SessionTicket
            ProxyGrantingTicket.objects.filter(session_key=stored_key).delete()
            SessionTicket.objects.filter(session_key=stored_key).delete()
        except SessionTicket.DoesNotExist:
            continue