end0tknr's kipple - web写経開発

太宰府天満宮の狛犬って、妙にカワイイ

sqlachemy + psycopg2 +fastapi for python で postgres への sql実行

先日のentryの続きとして、更に fastapi を経由し、 postgres への 生sqlを実行します

import os
import sys
sys.path.append( os.path.dirname(__file__)+"/../lib" )

from fastapi.middleware.cors import CORSMiddleware
import fastapi
import json
import sqlalchemy
import sqlalchemy.orm

conf_src = os.path.join(os.path.dirname(__file__),
                        '../../resources/app_py_conf.json')
conf = json.load( open(conf_src) )

# https://docs.sqlalchemy.org/en/20/core/engines.html#sqlalchemy.create_engine
db_url_tmpl = "postgresql+psycopg2://{user}:{pass}@{host}:{port}/{db}"
engine = sqlalchemy.create_engine( db_url_tmpl.format(**conf["db"] ) )

session_local = sqlalchemy.orm.sessionmaker(bind=engine,
                                            autocommit=False,
                                            autoflush =False )
def get_db():
    db = session_local() # 実際のdbへの接続は、このタイミング
    try:
        yield db
    finally:
        # context manager(例:with)により、index()終了時に closeが呼ばれます
        db.close()

app = fastapi.FastAPI()

# FastAPIは defaultでは localhostからのrequestのみ許可する為
# CORSMiddlewareにより * からのrequestを許可化
# origins = ["http://localhost",
#            "http://localhost:8080" ]
app.add_middleware(CORSMiddleware,
                   allow_origins=["*"],
                   #allow_origins=origins,
                   allow_credentials=True,
                   allow_methods=["*"],
                   allow_headers=["*"] )

@app.get("/")
def index(db: sqlalchemy.orm.Session = fastapi.Depends(get_db) ):

    sql = sqlalchemy.text("SELECT * FROM city WHERE code = :code")
    sql_vals = {"code":"11002"}
    results = db.execute( sql.bindparams( **sql_vals ) ).mappings()
    for result in results:
        print(result)
        return result
    return None