
Opening xls Excel files with xlrd for python3


xlrd.open_workbook( $file_path ) - opening a local file

This is the usual way to use xlrd.open_workbook(~). In download_excel() below, for example, the xls Excel file is first saved under a tempfile.TemporaryDirectory(), and the saved file is then opened with xlrd.open_workbook(~).
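Before the full collector script, here is a minimal sketch of that pattern on its own. The URL and file name are placeholders, and note that xlrd 2.x can only read the old .xls format, not .xlsx:

#!python
# -*- coding: utf-8 -*-
# minimal sketch: save an xls into a temporary directory, then open it with xlrd
import os
import tempfile
import urllib.request
import xlrd

xls_url = "http://example.com/sample.xls"   # placeholder URL

with tempfile.TemporaryDirectory() as tmp_dir:
    tmp_xls_path = os.path.join(tmp_dir, "sample.xls")

    # save the downloaded bytes once, because open_workbook() here takes a file path
    with open(tmp_xls_path, mode="wb") as fh:
        fh.write(urllib.request.urlopen(xls_url).read())

    wbook = xlrd.open_workbook(tmp_xls_path)
    for sheetname in wbook.sheet_names():
        print(sheetname, wbook.sheet_by_name(sheetname).nrows)

The full script, which also scrapes e-stat for the download URLs and bulk-inserts the parsed rows into PostgreSQL, is below.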

#!python
# -*- coding: utf-8 -*-

# reference URLs:
# https://gigazine.net/news/20151201-household-income-map/
# https://gunmagisgeek.com/datavis/mimanCity/
# http://www.e-stat.go.jp/SG1/estat/NewList.do?tid=000001063455
# https://github.com/shimizu/H25_yearly_income

from bs4 import BeautifulSoup
from psycopg2  import extras # for bulk insert

import appbase
import copy
import json
import xlrd # for xls
import os
import re
import tempfile
import urllib.request
from service.city       import CityService

master_xls  = "b013.xls"
target_host = "http://www.e-stat.go.jp"
target_tbl_name = "".join([
    "家計を主に支える者の年齢(6区分)・従業上の地位(8区分)・",
    "世帯の年間収入階級(5区分),",
    "現住居以外の土地の所有状況(4区分)別普通世帯数―市区町村"])
target_tbl_url = re.compile("^/stat-search/file-download")
bulk_insert_size = 20
logger = None

class EstatJutakuTochiService(appbase.AppBase):

    def __init__(self):
        global logger
        logger = self.get_logger()

    def save_tbl_rows(self, rows):
        logger.info("start")
        logger.info(rows[0])
        
        row_groups = self.__divide_rows(rows, bulk_insert_size)
        sql = """
INSERT INTO estat_jutakutochi (city,setai,setai_nushi_age,setai_year_income)
VALUES %s
 ON CONFLICT DO NOTHING
"""
        with self.db_connect() as db_conn:
            with self.db_cursor(db_conn) as db_cur:

                for row_group in row_groups:
                    try:
                        # bulk insert
                        extras.execute_values(db_cur,sql,row_group)
                    except Exception as e:
                        logger.error(e)
                        logger.error(sql)
                        logger.error(row_group)
                        return False
                    
            db_conn.commit()
        return True
    

    def __divide_rows(self, org_rows, chunk_size):
        i = 0
        chunk = []
        ret_rows = []
        for org_row in org_rows:
            chunk.append( ( org_row['city'],
                            org_row['setai'],
                            json.dumps(org_row['setai_nushi_age'],
                                       ensure_ascii=False),
                            json.dumps(org_row['setai_year_income'],
                                       ensure_ascii=False) ) )
            
            if len(chunk) >= chunk_size:
                ret_rows.append(chunk)
                chunk = []
            i += 1

        if len(chunk) > 0:
            ret_rows.append(chunk)

        return ret_rows

    
    def download_excel(self, download_url):
        logger.info(download_url)
        
        ret_data = []
    
        with tempfile.TemporaryDirectory() as tmp_dir:
            tmp_xls_path = os.path.join(tmp_dir, master_xls)
            try:
                data = urllib.request.urlopen(download_url).read()
                with open(tmp_xls_path, mode="wb") as fh:
                    fh.write(data)
                
                wbook = xlrd.open_workbook(tmp_xls_path)
                for sheetname in wbook.sheet_names():
                    wsheet = wbook.sheet_by_name(sheetname)
                    
                    logger.info("start %s %d rows" % (sheetname, wsheet.nrows) )

                    tmp_ret_data = self.__load_wsheet( wsheet )
                    ret_data.extend( tmp_ret_data )
                
            except Exception as e:
                logger.error("fail %s", download_url)
                logger.error(e)
                return []
            
        return ret_data
        
    def __load_wsheet( self, wsheet ):
        
        ret_data = []
        city_template = {"city":"",
                         "setai":0,
                         "setai_nushi_age"  :{}, #json
                         "setai_year_income":{}} #json
        now_city = None
        now_data = None
        row_no = 21   # data rows start at row index 21 in this sheet layout
        
        while row_no < wsheet.nrows :
            tmp_atri_key = wsheet.cell_value(row_no,7)
            if not tmp_atri_key :
                row_no += 1
                continue

            # strip half-width and full-width spaces
            tmp_atri_key = \
                tmp_atri_key.translate(str.maketrans({' ': '', '\u3000': ''}))

            city_def = self.__is_city_caption(tmp_atri_key)
            if city_def:
                if now_city:
                    ret_data.append(now_city)
                now_city = copy.deepcopy( city_template )
                now_city["city"] = city_def["city"]
                row_no += 1
                continue

            if not now_city:
                row_no += 1
                continue

            if tmp_atri_key == "普通世帯総数":   # total number of ordinary households
                now_city["setai"] = wsheet.cell_value(row_no,10)
                row_no += 1
                continue
            if tmp_atri_key == "(その1.家計を主に支える者の年齢)":   # part 1: age of main earner
                now_data = "setai_nushi_age"
                row_no += 1
                continue
            if tmp_atri_key == "(その2.従業上の地位)":   # part 2: employment status (not collected)
                now_data = None
                row_no += 1
                continue
            if tmp_atri_key == "(その3.世帯の年間収入階級)":   # part 3: yearly household income class
                now_data = "setai_year_income"
                row_no += 1
                continue
            if not now_data:
                row_no += 1
                continue

            now_city[now_data][tmp_atri_key] = wsheet.cell_value(row_no,10)
            row_no += 1
        return ret_data
            
    def __is_city_caption(self, tmp_atry_key):
        re_compile = re.compile(r"(\d+)([^\d]+)")   # <city code><city name>
        re_result = re_compile.search(tmp_atry_key)
        if not re_result:
            return None

        city_code = re_result.group(1)
        city_tmp  = re_result.group(2)

        city_service = CityService()
        return city_service.find_def_by_code_city( city_code, city_tmp )

    def __find_pref_links(self, hrefs):
        ret_urls = []
        
        re_compile = re.compile(r"([^\d]+[都道府県]).+2\d\d件")
        for href in hrefs:
            href_text = href.text.replace('\n', '').strip()

            re_result = re_compile.search(href_text)
            if not re_result:
                continue
            
            ret_urls.append( [re_result.group(1),
                              target_host+ href.attrs['href'] ])
        return ret_urls
            
        
    def __find_download(self, pref_req_url ):

        html_content = urllib.request.urlopen(pref_req_url).read()
        soup = BeautifulSoup(html_content, 'html.parser')

        tmp_css_selector = ".stat-dataset_list-item"
        articles = []
        try:
            articles = soup.select(tmp_css_selector)
        except Exception as e:
            logger.error(e)
            logger.error(pref_req_url)
            return ""

        for article in articles:
            download_url = self.__find_download_sub( article )
            if download_url:
                return download_url

        return ""
            
    def __find_download_sub(self, elm_article ):
        elm_lis = []
        tmp_css_selector = ".stat-dataset_list-detail-item"
        try:
            elm_lis = elm_article.select(tmp_css_selector)
        except Exception as e:
            logger.error(e)
            return ""
        
        if len(elm_lis)==0:
            return ""
        
        tmp_text = elm_lis[0].text.replace('\n', '').strip()
        re_compile = re.compile("111")  # table number (表番号)
        if not re_compile.search(tmp_text):
            return ""
                
        tmp_css_selector = ".stat-link_text"
        try:
            elm_lis = elm_article.select(tmp_css_selector)
        except Exception as e:
            logger.error(e)
            return ""

        tmp_text = elm_lis[0].text.replace('\n', '').strip()
            
        if tmp_text != target_tbl_name:
            return ""

        hrefs = []
        try:
            hrefs = elm_article.select("a")
        except Exception as e:
            logger.error(e)
            return ""

        re_compile = re.compile(target_tbl_url)
        
        for href in hrefs:
            download_url = href.attrs['href']

            if re_compile.search(download_url):
                return target_host + download_url

        return ""

            
    def find_download_urls(self):
        logger.info("start")
        
        req_url = target_host+ '/SG1/estat/NewList.do?tid=000001063455'
        html_content = urllib.request.urlopen(req_url).read()
        soup = BeautifulSoup(html_content, 'html.parser')
        
        tmp_css_selector = ".stat-matter3 a"
        try:
            hrefs = soup.select(tmp_css_selector)
        except Exception as e:
            logger.error(e)
            logger.error(req_url)
            return []

        pref_urls = self.__find_pref_links(hrefs)

        ret_urls = []
        for pref_url in pref_urls:
            logger.info("start %s %s" % (pref_url[0], pref_url[1]))
            ret_url = self.__find_download(pref_url[1])
            logger.info("done %s %s" % (pref_url[0], ret_url))

            ret_urls.append( ret_url )
            
        return ret_urls
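For reference, a driver for the service above might look like the following. It is only a hypothetical usage sketch: the actual entry point is not shown in this post, and appbase, the logger setup and the estat_jutakutochi table are project-specific.

#!python
# -*- coding: utf-8 -*-
# hypothetical driver for EstatJutakuTochiService (not part of the original script)
from service.estat_jutakutochi import EstatJutakuTochiService

def main():
    service = EstatJutakuTochiService()

    # one download url per prefecture page on e-stat
    for download_url in service.find_download_urls():
        if not download_url:
            continue
        rows = service.download_excel(download_url)
        if rows:
            service.save_tbl_rows(rows)

if __name__ == '__main__':
    main()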

xlrd.open_workbook( file_contents=~ ) - opening a downloaded Excel file directly

On the other hand, xlrd.open_workbook( file_contents=~ ) can read the downloaded file directly from memory, or read a workbook from standard input, without going through a temporary file.
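A minimal sketch of both cases, again with a placeholder URL:

#!python
# -*- coding: utf-8 -*-
# minimal sketch: open an xls workbook from in-memory bytes with file_contents=
import sys
import urllib.request
import xlrd

xls_url = "http://example.com/sample.xls"   # placeholder URL

# case 1: bytes downloaded with urllib, no temporary file needed
content = urllib.request.urlopen(xls_url).read()
wbook = xlrd.open_workbook(file_contents=content)

# case 2: bytes piped in on standard input, e.g.  cat sample.xls | python this_script.py
# wbook = xlrd.open_workbook(file_contents=sys.stdin.buffer.read())

for sheetname in wbook.sheet_names():
    print(sheetname, wbook.sheet_by_name(sheetname).nrows)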

#!python
# -*- coding: utf-8 -*-

#refer to
# https://www.e-stat.go.jp/stat-search/files
#   ?layout=datalist&toukei=00200522&tstat=000001127155&tclass1=000001133386
# 市町村-1
#   居住世帯の有無(8区分)別住宅数及び住宅以外で人が
#   居住する建物数―全国,都道府県,市区町村

from bs4 import BeautifulSoup
from psycopg2  import extras # for bulk insert
from service.city import CityService

import xlrd # for xls
import os
import re
import service.estat_jutakutochi
import tempfile
import urllib.parse
import urllib.request

download_url = \
    "https://www.e-stat.go.jp/stat-search/file-download?statInfId=000031866048&fileKind=0"
logger = None

class EstatJutakuTochiD001Service(
        service.estat_jutakutochi.EstatJutakuTochiService):

    def __init__(self):
        global logger
        logger = self.get_logger()
        
    def calc_download_filename(self,headers, download_url):

        filename = None
        for header in headers:
            if header[0] == "Content-Disposition":
                filename = self.calc_download_filename_sub(header[1])
                break

        if filename:
            return filename
        
        filename = os.path.basename(download_url)
        if filename:
            return filename
        
        return None
                
    def calc_download_filename_sub(self,header_vals_str):
        header_vals = re.compile(r"\s*;\s*").split(header_vals_str)

        # rfc 6266
        # https://developer.mozilla.org/ja/docs/Web/HTTP/Headers/Content-Disposition
        re_compile = re.compile(r"^filename\*\s*=([^']+)'[^']*'(.+)")   # filename*=charset'lang'value
        for header_val in header_vals:
            re_result = re_compile.search( header_val )
            if not re_result:
                continue

            filename_enc = re_result.group(1)
            filename     = re_result.group(2)
            # filename* values are percent-encoded in the declared charset (RFC 5987)
            filename = urllib.parse.unquote(filename, encoding=filename_enc)
            return filename
        
        re_compile = re.compile(r"^filename\s*=[\"']?(.+?)[\"']?$")
        for header_val in header_vals:
            re_result = re_compile.search( header_val )
            if not re_result:
                continue

            filename     = re_result.group(1)
            return filename
        
        return None
            
        
    def download_file(self, download_url):
        logger.info(download_url)
        try:
            res = urllib.request.urlopen(download_url)
        except Exception as e:
            logger.error(download_url)
            logger.error(e)
            return None

        content = res.read()
        filename = self.calc_download_filename(res.getheaders(),
                                               download_url )
        return {"filename":filename, "content":content}

        
    def download_src_data(self):
        logger.info(download_url)
        downloaded = self.download_file( download_url )
        if not downloaded:
            return []

        ret_data = []
        wbook = xlrd.open_workbook( file_contents=downloaded["content"] )
        
        for sheetname in wbook.sheet_names():
            wsheet = wbook.sheet_by_name(sheetname)
            logger.info("start %s %d rows" % (sheetname, wsheet.nrows) )

            tmp_ret_data = self.load_wsheet( wsheet )
            ret_data.extend( tmp_ret_data )

        return ret_data
                                   
    def load_wsheet( self, wsheet ):
        
        city_service = CityService()
        ret_data = []
        row_no = 17   # data rows start at row index 17 in this sheet layout
        
        while row_no < wsheet.nrows :
            city_code = wsheet.cell_value(row_no,7)
            city_name = wsheet.cell_value(row_no,8).strip()
            # print(city_code, city, total)

            city_def = city_service.find_def_by_code_city(city_code,
                                                          city_name)
            if not city_def:
                row_no += 1
                continue
            new_info = {
                "pref"         : city_def["pref"],
                "city"         : city_def["city"],
                "house"        : wsheet.cell_value(row_no,10), # number of dwellings (住宅数)
                "lived_house"  : wsheet.cell_value(row_no,11), # with resident households (居住世帯あり)
                "nolived_house": wsheet.cell_value(row_no,14), # without resident households (居住世帯なし)
            }

            ret_data.append(new_info)
            
            row_no += 1
        return ret_data


    def save_tbl_rows(self, rows):
        logger.info("start")
        logger.info(rows[0])

        bulk_insert_size = self.get_conf()["common"]["bulk_insert_size"]
        atri_keys = ["pref","city","house","lived_house","nolived_house"]
        row_groups = self.divide_rows(rows, bulk_insert_size, atri_keys )
        
        sql = """
INSERT INTO estat_jutakutochi_d001 (%s) VALUES %s
"""
        sql = sql % (",".join(atri_keys), "%s")
        
        with self.db_connect() as db_conn:
            with self.db_cursor(db_conn) as db_cur:

                for row_group in row_groups:
                    try:
                        # bulk insert
                        extras.execute_values(db_cur,sql,row_group)
                    except Exception as e:
                        logger.error(e)
                        logger.error(sql)
                        logger.error(row_group)
                        return False
                    
            db_conn.commit()
        return True

    def divide_rows(self, org_rows, chunk_size, atri_keys):
        i = 0
        chunk = []
        ret_rows = []
        for org_row in org_rows:
            new_tuple = ()
            for atri_key in atri_keys:
                new_tuple += (org_row[atri_key],)
            chunk.append( new_tuple )
            
            if len(chunk) >= chunk_size:
                ret_rows.append(chunk)
                chunk = []
            i += 1

        if len(chunk) > 0:
            ret_rows.append(chunk)

        return ret_rows
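As with the first class, a driver for this service might look like the following. The module path service.estat_jutakutochi_d001 is my assumption; only the class itself appears in this post.

#!python
# -*- coding: utf-8 -*-
# hypothetical driver for EstatJutakuTochiD001Service (not part of the original script)
from service.estat_jutakutochi_d001 import EstatJutakuTochiD001Service

def main():
    service = EstatJutakuTochiD001Service()
    rows = service.download_src_data()   # download the xls and parse every sheet
    if rows:
        service.save_tbl_rows(rows)      # bulk insert into estat_jutakutochi_d001

if __name__ == '__main__':
    main()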