2021年版 mod_perl2を使ってシングルサインオン認証を書いてみた - end0tknr's kipple - web写経開発
上記entry の nginx + lua-nginx-module + python です。
詳細は、以下のsrcの通りですが、 lua-nginx-module in nginx.conf で振り分けのみ?を行い、 殆どの処理を py_lua_auth.py で動作するbackendサーバに任せています。
参考url
- https://blog.1q77.com/2013/10/nginx-mod_lua/
- https://kazuhira-r.hatenablog.com/entry/2019/08/12/220406
- https://noknow.info/it/python/implemented_encryption_decryption_library
- https://ja.pymotw.com/2/BaseHTTPServer/
- https://qiita.com/linxuesong/items/8ac98102c24b8f587a16
nginx.conf
worker_processes 1; events { worker_connections 1024; } http { include mime.types; default_type application/octet-stream; sendfile on; keepalive_timeout 65; server { listen 8080; server_name localhost; location / { root /home/end0tknr/html_pub; index index.html index.htm; } location /pri { alias /home/end0tknr/html_pri; index index.html index.htm; access_by_lua ' local ticket = ngx.var.cookie_py_lua_auth if ticket then local res = ngx.location.capture("/auth?ticket=" .. ticket) if res.status == 200 and string.match(res.body, "OK") then ngx.exit(ngx.OK) end end ngx.exit(ngx.HTTP_FORBIDDEN) '; } location /login { proxy_pass http://127.0.0.1:8181; } location /logout { proxy_pass http://127.0.0.1:8181; } location /auth { internal; proxy_pass http://127.0.0.1:8181; } } }
py_lua_auth.py
#!/usr/bin/python3 # -*- coding: utf-8 -*- import base64 import base64 import hashlib import os import random import string import sys import time from email.utils import formatdate from http.cookies import SimpleCookie from http.server import BaseHTTPRequestHandler, HTTPServer from urllib.parse import parse_qs, urlparse # need for "pip3 install pycryptodome" (pycrypto is old) from Crypto import Random from Crypto.Cipher import AES from Crypto.Hash import SHA256 from Crypto.PublicKey import RSA from Crypto.Signature import pkcs1_15 from Crypto.Util import Padding address = ("0.0.0.0", 8181) #全ipから接続許可 #address = ('localhost', 8181) cookie_name = "py_lua_auth" cookie_age = 1800 # sec aes_key_str = "1234567890ABCDEF1234567890ABCDEF" aes_key = ( hashlib.md5(aes_key_str.encode()).hexdigest() ).encode() private_pem = """ -----BEGIN RSA PRIVATE KEY----- MIICWwIBAAKBgQC+dlvKyOzo2XFoRD9u2CKIV/vXrDP2NbBoTp9fr9kNCkakWnZs x19nXXl11F+p1Uc9ayO2O+ylTsJSiSoo3OGM+mK2X905lRXiX1ZlY8OgCDpBR0qf PNFGKzd9RisssUIyCoPogF1+lR68Ghr1odLr6X2ISxdFm13POz0SSAO5GQIBAwKB gH75kocwnfCQ9kWC1PSQFwWP/TpyzU7OdZrfFOp1O14G2cLm+Z3aP5o+UPk4P8aO L35HbSQn8xjfLDcGHBs967IpzIiPMGdBlgZAlPaXvx2QU+GYHRS0Uta/31q2+veC fO3ftZMVplXWdw12mNzbF1N1EEvEFYGNex2iEKmE6KKrAkEA8zPoWgQqOwt0G3oP 2rqbkS7OMrQOnhiHjTF1ekn7m/1qEjkyEi+Sc6rzxIonBOD/bHN3QE5dSyPSo9eZ k6CngQJBAMh8AS8QdHepZOL846VqWB6NoLBq8ZA8fH42/7lovdGJYyi6Wc20a1/B NCOrg1RO701xn2SopfwBUCRMem0GHZkCQQCiIprmrXF8sk1nprU8fGe2HzQhzV8U EFpeIPj8MVJn/ka20MwMH7b3x00tsW9Ylf+dok+AND4yF+HCj7u3wG+rAkEAhagA ygr4T8ZDQf3tGPGQFF5rIEdLtX2oVCSqe5spNluXcHw73nhHlSt4F8es4t9KM6EU 7cXD/VY1bYhRngQTuwJACwzEez5Hi0jDK5NY+czEyVlc08rMhkvloyU4vpHFmjiK gxqVsgoWZYPTsDlXtRbiEIkjXr2fFsO7hHi/kNMZfw== -----END RSA PRIVATE KEY----- """ public_pem = """-----BEGIN PUBLIC KEY----- MIGdMA0GCSqGSIb3DQEBAQUAA4GLADCBhwKBgQC+dlvKyOzo2XFoRD9u2CKIV/vX rDP2NbBoTp9fr9kNCkakWnZsx19nXXl11F+p1Uc9ayO2O+ylTsJSiSoo3OGM+mK2 X905lRXiX1ZlY8OgCDpBR0qfPNFGKzd9RisssUIyCoPogF1+lR68Ghr1odLr6X2I SxdFm13POz0SSAO5GQIBAw== -----END PUBLIC KEY-----""" private_key = RSA.importKey( private_pem.strip().encode() ) public_key = RSA.importKey( public_pem.strip().encode() ) class PyLuaAuth(): def login(self, uid, passwd, remote_ip): uid = self.check_passwd(uid, passwd) if not uid: return null enc_ticket = self.encode_ticket(uid, remote_ip) return enc_ticket # 今回は暗号化がメインですので、この部分は全OKにします def check_passwd(self, uid, passwd): return uid def encode_ticket(self, uid, remote_ip): data = ":".join([uid, remote_ip, str(int( time.time() )) ]); h1 = SHA256.new( data.encode() ) signature = pkcs1_15.new(private_key).sign(h1) #署名 signature = base64.b64encode(signature).decode() data_sign = "\t".join([data,signature]) return self.encrypt_cbc(data_sign) def decode_ticket( self, ticket ): data_sign_str = self.decrypt_cbc(ticket) (data,signature)= data_sign_str.split("\t") signature = base64.b64decode( signature.encode() ) h2 = SHA256.new( data.encode() ) try: pkcs1_15.new(public_key).verify(h2, signature) verified = True except ValueError: verified = False if not verified: return [] return data.split(":") def encrypt_cbc(self, plain_text): iv = Random.get_random_bytes(AES.block_size) cipher = AES.new(aes_key, AES.MODE_CBC, iv) data = Padding.pad( plain_text.encode('utf-8'), AES.block_size, 'pkcs7') return base64.b64encode(iv + cipher.encrypt(data)).decode() def decrypt_cbc(self, encrypted_text): encrypted_text = base64.b64decode(encrypted_text) iv = encrypted_text[:AES.block_size] cipher = AES.new(aes_key, AES.MODE_CBC, iv) data = Padding.unpad(cipher.decrypt(encrypted_text[AES.block_size:]), AES.block_size, 'pkcs7') return data.decode() # refer to https://kazuhira-r.hatenablog.com/entry/2019/08/12/220406 class MyHTTPReqHandler(BaseHTTPRequestHandler): def do_GET(self): parsed_req = self.parse_req() if parsed_req["path"] == "/login": return self.login(parsed_req) if parsed_req["path"] == "/logout": return self.logout(parsed_req) if parsed_req["path"] == "/auth": self.auth(parsed_req["query"]["ticket"][0] ) return self.send_response(200) self.send_header('Content-Type', 'text/plain; charset=utf-8') self.end_headers() self.dump_req(parsed_req) def do_POST(self): parsed_req = self.parse_req() if parsed_req["path"] == "/login": self.login(parsed_req) return if parsed_req["path"] == "/logout": return self.logout(parsed_req) if parsed_req["path"] == "/auth": self.auth(parsed_req["query"]["ticket"][0] ) return self.send_response(200) self.send_header('Content-Type', 'text/plain; charset=utf-8') self.end_headers() self.dump_req(parsed_req) def auth(self, ticket): ticket = ticket.strip('"').replace(' ','+') result_decode = pyluaauth.decode_ticket(ticket) if len(result_decode) == 3 and result_decode[0]: self.send_response(200) self.end_headers() self.wfile.write( "OK".encode() ) return self.send_response(403) def logout(self, parsed_req): cookies = SimpleCookie() cookies[cookie_name] = "" cookies[cookie_name]['expires'] = \ formatdate(timeval=0, usegmt=True) #cookies[cookie_name]['secure'] = True cookies_str = cookies.output().replace('Set-Cookie: ', '',1) self.send_response(200) self.send_header('Content-Type', 'text/plain; charset=utf-8') self.send_header('Set-Cookie', cookies_str ) self.end_headers() def login(self, parsed_req): login_ticket = \ pyluaauth.login(parsed_req["query"]["uid"][0], parsed_req["query"]["passwd"][0], self.client_address[0]) cookies = SimpleCookie() cookies[cookie_name] = login_ticket cookies[cookie_name]['expires'] = \ formatdate(timeval=(time.time() + cookie_age ), usegmt=True) #cookies[cookie_name]['secure'] = True cookies_str = cookies.output().replace('Set-Cookie: ', '',1) self.send_response(200) self.send_header('Content-Type', 'text/plain; charset=utf-8') self.send_header('Set-Cookie', cookies_str ) self.end_headers() self.wfile.write( login_ticket.encode() ) def dump_req(self, parsed_req): self.wfile.write( "path = {}\n".format( parsed_req["path"]).encode() ) self.wfile.write( "query = {}\n".format( parsed_req["query"]).encode() ) def parse_req(self): parsed_path = urlparse(self.path) content_length = 0 if self.headers['content-length']: content_length = int(self.headers['content-length']) ret_parse = {} ret_parse["path"] = parsed_path.path ret_parse["query"] = parse_qs(parsed_path.query) # ✕ refer to https://ja.pymotw.com/2/BaseHTTPServer/ # ○ https://qiita.com/linxuesong/items/8ac98102c24b8f587a16 ret_parse["post_query"] = {} enc = sys.getfilesystemencoding() postdata_raw = self.rfile.read( content_length ) postdata = parse_qs( postdata_raw.decode(enc) ) ret_parse["query"].update( postdata ) cookies = SimpleCookie(os.environ.get("HTTP_COOKIE","")) return ret_parse pyluaauth = PyLuaAuth() with HTTPServer(address, MyHTTPReqHandler) as server: server.serve_forever()