end0tknr's kipple - web写経開発

太宰府天満宮の狛犬って、妙にカワイイ

nginx + lua-nginx-module + python でシングルサインオン(sso)認証を書いてみた

2021年版 mod_perl2を使ってシングルサインオン認証を書いてみた - end0tknr's kipple - web写経開発

上記entry (mod_perl2 版) を nginx + lua-nginx-module + python で書き直したものです。

詳細は以下のsrcの通りですが、nginx.conf 内の lua-nginx-module (access_by_lua) では cookie の ticket を backend へ問い合わせて許可/拒否を振り分けるだけにとどめ、ticket の発行・検証など殆どの処理は py_lua_auth.py で動作する backend サーバに任せています。

参考url

nginx.conf

worker_processes  1;

events {
    worker_connections  1024;
}

http {
    include       mime.types;
    default_type  application/octet-stream;

    sendfile        on;
    keepalive_timeout  65;

    server {
        listen       8080;
        server_name  localhost;

        # Public content: served without any authentication.
        location / {
            root   /home/end0tknr/html_pub;
            index  index.html index.htm;
        }

        # Protected content: allowed only when the backend accepts the
        # ticket carried in the "py_lua_auth" cookie.
        location /pri {
            alias /home/end0tknr/html_pri;
            index  index.html index.htm;
            access_by_lua_block {
                local ticket = ngx.var.cookie_py_lua_auth
                if ticket then
                    -- The cookie is client-controlled: escape it so that
                    -- "+", "&", "=" etc. reach the backend as one intact
                    -- "ticket" value instead of altering the query string.
                    local res = ngx.location.capture(
                        "/auth?ticket=" .. ngx.escape_uri(ticket))
                    if res.status == 200 and string.match(res.body, "OK") then
                        -- Ticket accepted: continue with normal processing.
                        ngx.exit(ngx.OK)
                    end
                end
                -- No cookie, or the backend rejected the ticket.
                ngx.exit(ngx.HTTP_FORBIDDEN)
            }
        }

        # Login / logout are handled entirely by the Python backend.
        location /login {
            proxy_pass   http://127.0.0.1:8181;
        }
        location /logout {
            proxy_pass   http://127.0.0.1:8181;
        }
        # Ticket verification endpoint; "internal" keeps it reachable only
        # through subrequests such as the ngx.location.capture above.
        location /auth {
            internal;
            proxy_pass   http://127.0.0.1:8181;
        }
    }
}

py_lua_auth.py

#!/usr/bin/python3
# -*- coding: utf-8 -*-

import base64
import base64
import hashlib
import os
import random
import string
import sys
import time
from email.utils        import formatdate
from http.cookies       import SimpleCookie
from http.server        import BaseHTTPRequestHandler, HTTPServer
from urllib.parse       import parse_qs, urlparse

# need for "pip3 install pycryptodome"  (pycrypto is old)
from Crypto             import Random
from Crypto.Cipher      import AES
from Crypto.Hash        import SHA256
from Crypto.PublicKey   import RSA
from Crypto.Signature   import pkcs1_15
from Crypto.Util        import Padding

# (host, port) the backend HTTP server listens on.
address = ("0.0.0.0", 8181) # accept connections from any IP
#address = ('localhost', 8181)

# Name of the cookie that carries the encrypted login ticket.
cookie_name = "py_lua_auth"
# Cookie lifetime in seconds; added to the current time for "expires".
cookie_age = 1800 # sec

# AES key: the MD5 hex digest of the passphrase (32 hex characters),
# encoded to a 32-byte key.
# NOTE(review): hard-coded secret, presumably a sample value -- replace it
# before any real use.
aes_key_str = "1234567890ABCDEF1234567890ABCDEF"
aes_key = ( hashlib.md5(aes_key_str.encode()).hexdigest() ).encode()

# RSA private key (PEM) used to sign tickets.
# NOTE(review): a private key embedded in source is presumably for
# demonstration only -- load it from protected storage in real use.
private_pem = """
-----BEGIN RSA PRIVATE KEY-----
MIICWwIBAAKBgQC+dlvKyOzo2XFoRD9u2CKIV/vXrDP2NbBoTp9fr9kNCkakWnZs
x19nXXl11F+p1Uc9ayO2O+ylTsJSiSoo3OGM+mK2X905lRXiX1ZlY8OgCDpBR0qf
PNFGKzd9RisssUIyCoPogF1+lR68Ghr1odLr6X2ISxdFm13POz0SSAO5GQIBAwKB
gH75kocwnfCQ9kWC1PSQFwWP/TpyzU7OdZrfFOp1O14G2cLm+Z3aP5o+UPk4P8aO
L35HbSQn8xjfLDcGHBs967IpzIiPMGdBlgZAlPaXvx2QU+GYHRS0Uta/31q2+veC
fO3ftZMVplXWdw12mNzbF1N1EEvEFYGNex2iEKmE6KKrAkEA8zPoWgQqOwt0G3oP
2rqbkS7OMrQOnhiHjTF1ekn7m/1qEjkyEi+Sc6rzxIonBOD/bHN3QE5dSyPSo9eZ
k6CngQJBAMh8AS8QdHepZOL846VqWB6NoLBq8ZA8fH42/7lovdGJYyi6Wc20a1/B
NCOrg1RO701xn2SopfwBUCRMem0GHZkCQQCiIprmrXF8sk1nprU8fGe2HzQhzV8U
EFpeIPj8MVJn/ka20MwMH7b3x00tsW9Ylf+dok+AND4yF+HCj7u3wG+rAkEAhagA
ygr4T8ZDQf3tGPGQFF5rIEdLtX2oVCSqe5spNluXcHw73nhHlSt4F8es4t9KM6EU
7cXD/VY1bYhRngQTuwJACwzEez5Hi0jDK5NY+czEyVlc08rMhkvloyU4vpHFmjiK
gxqVsgoWZYPTsDlXtRbiEIkjXr2fFsO7hHi/kNMZfw==
-----END RSA PRIVATE KEY-----
"""

# RSA public key (PEM) used to verify ticket signatures.
public_pem = """-----BEGIN PUBLIC KEY-----
MIGdMA0GCSqGSIb3DQEBAQUAA4GLADCBhwKBgQC+dlvKyOzo2XFoRD9u2CKIV/vX
rDP2NbBoTp9fr9kNCkakWnZsx19nXXl11F+p1Uc9ayO2O+ylTsJSiSoo3OGM+mK2
X905lRXiX1ZlY8OgCDpBR0qfPNFGKzd9RisssUIyCoPogF1+lR68Ghr1odLr6X2I
SxdFm13POz0SSAO5GQIBAw==
-----END PUBLIC KEY-----"""
# strip() drops the newlines surrounding the PEM text before it is parsed.
private_key = RSA.importKey( private_pem.strip().encode() )
public_key  = RSA.importKey( public_pem.strip().encode()  )

class PyLuaAuth():
    """Issue and validate signed, encrypted login tickets.

    A ticket is the text ``uid:remote_ip:unix_time``, a tab, and the
    base64-encoded RSA (PKCS#1 v1.5 over SHA-256) signature of that text;
    the whole string is AES-CBC encrypted with a random IV and returned as
    base64 of ``iv + ciphertext``.

    Uses the module-level ``aes_key``, ``private_key`` and ``public_key``.
    """

    def login(self, uid, passwd, remote_ip):
        """Return an encrypted ticket for *uid*, or ``None`` if rejected."""
        uid = self.check_passwd(uid, passwd)
        if not uid:
            # None tells the caller that the login was rejected.
            return None

        return self.encode_ticket(uid, remote_ip)

    # Encryption is the focus of this sample, so every password is accepted.
    def check_passwd(self, uid, passwd):
        """Return *uid* when the credentials are accepted (currently always)."""
        return uid

    def encode_ticket(self, uid, remote_ip):
        """Build, sign and encrypt a ticket for *uid* at *remote_ip*."""
        data = ":".join([uid, remote_ip, str(int(time.time()))])

        h1 = SHA256.new(data.encode())
        signature = pkcs1_15.new(private_key).sign(h1)  # sign the payload
        signature = base64.b64encode(signature).decode()

        data_sign = "\t".join([data, signature])
        return self.encrypt_cbc(data_sign)

    def decode_ticket(self, ticket):
        """Decrypt *ticket* and verify its signature.

        Returns the payload split on ``":"`` (``[uid, remote_ip, time]`` for
        tickets made by ``encode_ticket``), or ``[]`` when the ticket is
        malformed or the signature does not verify.

        NOTE: the IP and timestamp fields are returned but not validated here.
        """
        try:
            # Bad base64, wrong length/padding, non-UTF-8 plaintext and a
            # payload without exactly one tab all surface as ValueError.
            data_sign_str = self.decrypt_cbc(ticket)
            (data, signature) = data_sign_str.split("\t")
            signature = base64.b64decode(signature.encode())
        except ValueError:
            return []

        h2 = SHA256.new(data.encode())
        try:
            pkcs1_15.new(public_key).verify(h2, signature)
        except ValueError:
            # verify() raises ValueError when the signature is not authentic.
            return []

        return data.split(":")

    def encrypt_cbc(self, plain_text):
        """Return base64 of ``iv + AES-CBC(pkcs7-padded plain_text)``."""
        iv = Random.get_random_bytes(AES.block_size)
        cipher = AES.new(aes_key, AES.MODE_CBC, iv)
        data = Padding.pad(plain_text.encode('utf-8'),
                           AES.block_size,
                           'pkcs7')
        return base64.b64encode(iv + cipher.encrypt(data)).decode()

    def decrypt_cbc(self, encrypted_text):
        """Reverse ``encrypt_cbc``; raises ``ValueError`` on malformed input."""
        encrypted_text = base64.b64decode(encrypted_text)
        # The first block is the IV that encrypt_cbc prepended.
        iv = encrypted_text[:AES.block_size]
        cipher = AES.new(aes_key, AES.MODE_CBC, iv)
        data = Padding.unpad(cipher.decrypt(encrypted_text[AES.block_size:]),
                             AES.block_size,
                             'pkcs7')
        return data.decode()

# refer to https://kazuhira-r.hatenablog.com/entry/2019/08/12/220406
class MyHTTPReqHandler(BaseHTTPRequestHandler):
    """Backend request handler serving ``/login``, ``/logout`` and ``/auth``.

    GET and POST are routed identically; POST form fields are merged into
    the parsed query. Any other path echoes the parsed request as text.

    Uses the module-level ``pyluaauth``, ``cookie_name`` and ``cookie_age``.
    """

    def do_GET(self):
        """Handle a GET request by routing on its path."""
        self._dispatch()

    def do_POST(self):
        """Handle a POST request with the same routing as GET."""
        self._dispatch()

    def _dispatch(self):
        """Parse the request and call the handler matching its path."""
        parsed_req = self.parse_req()
        path = parsed_req["path"]

        if path == "/login":
            self.login(parsed_req)
            return

        if path == "/logout":
            self.logout(parsed_req)
            return

        if path == "/auth":
            tickets = parsed_req["query"].get("ticket")
            if not tickets:
                # No ticket supplied: treat it as not authenticated.
                self._send_status(403)
                return
            self.auth(tickets[0])
            return

        self.send_response(200)
        self.send_header('Content-Type', 'text/plain; charset=utf-8')
        self.end_headers()

        self.dump_req(parsed_req)

    def _send_status(self, code):
        """Send a complete header-only response with status *code*."""
        self.send_response(code)
        # Without end_headers() the buffered status line is never written.
        self.end_headers()

    def _send_cookie(self, value, expires_epoch):
        """Send a 200 text/plain header block that sets the auth cookie."""
        cookies = SimpleCookie()
        cookies[cookie_name] = value
        cookies[cookie_name]['expires'] = \
            formatdate(timeval=expires_epoch, usegmt=True)
        # NOTE: enable when served over HTTPS.
        #cookies[cookie_name]['secure'] = True

        # send_header() adds the header name itself, so drop it here.
        cookies_str = cookies.output().replace('Set-Cookie: ', '', 1)

        self.send_response(200)
        self.send_header('Content-Type', 'text/plain; charset=utf-8')
        self.send_header('Set-Cookie', cookies_str)
        self.end_headers()

    def auth(self, ticket):
        """Reply ``200`` with body ``OK`` for a valid ticket, else ``403``."""
        # Drop surrounding double quotes (presumably added when the cookie
        # value was quoted) and restore "+" characters that query-string
        # decoding turned into spaces.
        ticket = ticket.strip('"').replace(' ', '+')

        result_decode = pyluaauth.decode_ticket(ticket)

        if len(result_decode) == 3 and result_decode[0]:
            self.send_response(200)
            self.end_headers()
            self.wfile.write("OK".encode())
            return

        self._send_status(403)

    def logout(self, parsed_req):
        """Expire the auth cookie by setting an empty value dated at epoch 0."""
        self._send_cookie("", 0)

    def login(self, parsed_req):
        """Issue a ticket for ``uid``/``passwd`` and set it as the cookie.

        Replies ``400`` when either parameter is missing and ``403`` when
        the login is rejected; on success the ticket is also the body.
        """
        query = parsed_req["query"]
        try:
            uid = query["uid"][0]
            passwd = query["passwd"][0]
        except KeyError:
            self._send_status(400)
            return

        # NOTE(review): when requests arrive through a reverse proxy this is
        # the proxy's address, not the browser's -- confirm intended.
        login_ticket = pyluaauth.login(uid, passwd, self.client_address[0])
        if not login_ticket:
            self._send_status(403)
            return

        self._send_cookie(login_ticket, time.time() + cookie_age)
        self.wfile.write(login_ticket.encode())

    def dump_req(self, parsed_req):
        """Write the parsed path and query to the response body."""
        self.wfile.write(
            "path = {}\n".format(parsed_req["path"]).encode()
        )
        self.wfile.write(
            "query = {}\n".format(parsed_req["query"]).encode()
        )

    def parse_req(self):
        """Return ``{"path", "query", "post_query"}`` for this request.

        ``query`` holds the URL query parameters updated with any
        form-encoded body parameters (``parse_qs`` format: name -> list of
        values). ``post_query`` is always an empty dict.
        """
        parsed_path = urlparse(self.path)

        content_length = 0
        if self.headers['content-length']:
            content_length = int(self.headers['content-length'])

        ret_parse = {
            "path": parsed_path.path,
            "query": parse_qs(parsed_path.query),
            "post_query": {},
        }

        # Body reading follows https://qiita.com/linxuesong/items/8ac98102c24b8f587a16
        # (the author marked https://ja.pymotw.com/2/BaseHTTPServer/ as "x").
        enc = sys.getfilesystemencoding()
        postdata_raw = self.rfile.read(content_length)
        postdata = parse_qs(postdata_raw.decode(enc))

        ret_parse["query"].update(postdata)

        return ret_parse
        

# Shared ticket helper referenced by the request handler's login/auth paths.
pyluaauth = PyLuaAuth()

# Bind to `address` and serve requests until the process is stopped; leaving
# the `with` block closes the server.
with HTTPServer(address, MyHTTPReqHandler) as server:
    server.serve_forever()