File

mod_rest/example/prosody_oauth.py @ 5390:f2363e6d9a64

mod_http_oauth2: Advertise the currently supported id_token signing algorithm. This field is REQUIRED. The algorithm RS256 MUST be included, but isn't because we don't implement it, as that would require implementing a pile of additional cryptography and JWT stuff. Instead the id_token is signed using the client secret, which allows verification by the client, since it's a shared secret per OpenID Connect Core 1.0 § 10.1 under Symmetric Signatures. OpenID Connect Discovery 1.0 has a lot of REQUIRED and MUST clauses that are not supported here, but that's okay because this is served from the RFC 8414 OAuth 2.0 Authorization Server Metadata .well-known endpoint!
author Kim Alvefur <zash@zash.se>
date Sun, 30 Apr 2023 16:13:40 +0200
parent 5269:0e5a37f55440
child 5488:9a4556a13cc7
line wrap: on
line source

from requests_oauthlib import OAuth2Session
import requests


class ProsodyRestSession(OAuth2Session):
    """OAuth 2 session that registers itself with a Prosody server.

    On construction the session downloads the authorization server metadata
    from ``<base_url>/.well-known/oauth-authorization-server``, registers a
    new client at the advertised ``registration_endpoint`` and keeps the
    returned credentials, so the remaining methods need no endpoint URLs.

    Attributes set by ``__init__``:
        base_url: The server base URL with trailing slashes removed.
        meta: The parsed metadata document.
        client_secret: The secret returned by the registration endpoint.
    """

    def __init__(
        self, base_url, client_name, client_uri, redirect_uri, *args, **kwargs
    ):
        """Discover the server's endpoints and register a new client.

        Args:
            base_url: Base URL of the server's HTTP interface.  Trailing
                slashes are ignored.
            client_name: Sent as ``client_name`` in the registration request.
            client_uri: Sent as ``client_uri`` in the registration request.
            redirect_uri: Sent as the single entry of ``redirect_uris``.
            *args: Extra positional arguments for ``OAuth2Session.__init__``,
                placed after the registered client id.
            **kwargs: Extra keyword arguments for ``OAuth2Session.__init__``.

        Raises:
            requests.HTTPError: If discovery or registration answers with an
                HTTP error status.
            KeyError: If a response lacks one of the fields read here.
        """
        # A trailing slash (easy to type at a prompt) would otherwise yield
        # "//" in the discovery and /rest URLs.
        base_url = base_url.rstrip("/")
        self.base_url = base_url
        discovery_url = base_url + "/.well-known/oauth-authorization-server"

        # Fail on HTTP errors here rather than with a confusing KeyError or
        # JSON decoding error further down.
        discovery = requests.get(discovery_url)
        discovery.raise_for_status()
        meta = discovery.json()

        registration = requests.post(
            meta["registration_endpoint"],
            json={
                "client_name": client_name,
                "client_uri": client_uri,
                "redirect_uris": [redirect_uri],
            },
        )
        registration.raise_for_status()
        reg = registration.json()

        # client_id goes first positionally so that forwarded positional
        # arguments land on the parameters after it instead of colliding
        # with a client_id keyword.
        super().__init__(reg["client_id"], *args, **kwargs)

        self.meta = meta
        self.client_secret = reg["client_secret"]
        # Assigned explicitly as well as via super().__init__ above;
        # presumably this matters when a caller supplies its own `client`
        # object -- confirm against requests_oauthlib before removing.
        self.client_id = reg["client_id"]

    def authorization_url(self, *args, **kwargs):
        """Build the authorization URL using the discovered endpoint.

        The ``authorization_endpoint`` from the server metadata is supplied
        as the URL; all other arguments go to
        ``OAuth2Session.authorization_url`` unchanged, and its result is
        returned as is.
        """
        return super().authorization_url(
            self.meta["authorization_endpoint"], *args, **kwargs
        )

    def fetch_token(self, *args, **kwargs):
        """Fetch a token from the discovered token endpoint.

        The ``token_endpoint`` from the server metadata is supplied as the
        token URL and the registered client secret is used unless the caller
        passes ``client_secret`` explicitly.  All other arguments go to
        ``OAuth2Session.fetch_token`` and its result is returned.
        """
        # setdefault keeps the registered secret as the default while still
        # letting callers override it without a duplicate-keyword error.
        kwargs.setdefault("client_secret", self.client_secret)
        # The token URL is passed positionally so that forwarded positional
        # arguments follow it instead of colliding with a token_url keyword.
        return super().fetch_token(
            self.meta["token_endpoint"], *args, **kwargs
        )

    def xmpp(self, json=None, *args, **kwargs):
        """POST ``json`` to ``<base_url>/rest`` and return the response.

        Additional arguments are forwarded to the session's ``post`` method.
        """
        return self.post(self.base_url + "/rest", *args, json=json, **kwargs)


if __name__ == "__main__":
    # Example usage: register a client, let the user authorize it in a
    # browser, then send one disco query through the REST endpoint.
    #
    # When using this file as a library, import the class instead:
    #     from prosody_oauth import ProsodyRestSession
    from getpass import getpass

    session = ProsodyRestSession(
        base_url=input("Base URL: "),
        client_name="Prosody mod_rest OAuth 2 example",
        client_uri="https://modules.prosody.im/mod_rest",
        redirect_uri="urn:ietf:wg:oauth:2.0:oob",
    )

    print("Open the following URL in a browser and login:")
    # Index 0 of the result is the URL the user has to visit.
    login_url = session.authorization_url()[0]
    print(login_url)

    # getpass() reads the pasted code without echoing it to the terminal.
    auth_code = getpass("Paste Authorization code: ")
    session.fetch_token(code=auth_code)

    disco_query = {"disco": True, "to": "jabber.org"}
    reply = session.xmpp(json=disco_query)
    print(reply.json())