end0tknr's kipple - web写経開発

太宰府天満宮の狛犬って、妙にカワイイ

LibreDWGのdwgreadコマンド + python で AutoCAD DWGファイルからのtext抽出 - windows版

https://end0tknr.hateblo.jp/entry/20230219/1676760625

同様の内容は以前、上記のentryに記載しましたが、今回はWindows版です。

libredwg for Windows は https://github.com/LibreDWG/libredwg/releases から exe をダウンロードできるので、インストールは簡単です。ただし Windows の場合、文字コードが cp932 になる為か、前回のスクリプトから少々修正しています。

#!/usr/bin/python
# -*- coding: utf-8 -*-

from datetime import datetime
import csv
import glob
import json
import logging.config
import os
import re
import subprocess
import sys
import zipfile

# Script-wide settings.
#   common : values used to build the scp command and to locate local files
#   serial : remote archive location and the dwgread executable path
#   log    : dictionary handed to logging.config.dictConfig() below
CONF = {
    "common" : {
        "scp_cmd" :"scp",
        "rsa_private_key" : "id_rsa.pri",
        "local_tmp_dir"   : "c:/tmp",
    },
    "serial" : {
        "backup_server" : "???.???.???.???",
        "remote_zumens_data" : "/tmp/zumens_*.zip",
        "dwg_read_cmd" : "c:/Users/end0t/local/libredwg-0.12-win64/dwgread.exe",
    },
    "log":{
        'version': 1,
        'loggers': {"mainLogger":
                    {'level':"INFO",'handlers':["mainHandler"]},
                    },
        'handlers': { "mainHandler": {
            'formatter': "mainFormatter",
            'class'    : 'logging.handlers.RotatingFileHandler',
            'filename' : 'extract_txt4bizsearch.log',
            'maxBytes' : 1024*1024, # 1MB
            'backupCount': 30       # rotation
        }},
        'formatters': {  "mainFormatter":{
            "format": '%(asctime)s\t%(levelname)s\t%(message)s',
            # %H (24-hour clock): the previous %I is the 12-hour clock and,
            # without %p, made a.m. and p.m. timestamps indistinguishable.
            "datefmt": '%Y/%m/%d %H:%M:%S'
        }},
    }
}

# Configure logging at import time; every function below logs through `logger`.
logging.config.dictConfig(CONF["log"])
logger = logging.getLogger('mainLogger')

def main():
    """Script entry point: run the Serial extraction and log start/end."""
    # Name of the currently running function, used only in the log lines.
    this_func = sys._getframe().f_code.co_name

    logger.info("START %s", this_func)
    Serial().extract_txts_from_org_datas()
    logger.info("GOAL %s", this_func)

class Serial():
    """Extract text from drawing files listed in a zumens_*.tsv file.

    For every row of the list file whose "cad_or_image" column is "A", the
    referenced file is converted to JSON with the dwgread command, the text
    values are collected and an HTML page is rendered from them.
    """

    # A backslash followed by one non-letter character. dwgread emits such
    # sequences (e.g. a backslash in front of a kanji), which are not valid
    # JSON escapes; both characters are dropped.
    _RE_BAD_ESCAPE = re.compile(r"(\\)[^a-z]", re.IGNORECASE)

    # Bare inf/nan tokens are not valid JSON; quote them so json.loads works.
    _JSON_TOKEN_FIXES = (
        ('": inf', '": "inf"'),
        ('": nan', '": "nan"'),
        ('": [ nan, nan, nan ]', '": [ "nan", "nan", "nan" ]'),
        ('": [ nan, nan, 0.0 ]', '": [ "nan", "nan", 0.0 ]'),
        ('": [ nan, nan ]', '": [ "nan", "nan" ]'),
    )

    def __init__(self):
        pass

    def extract_txts_from_org_datas(self):
        """Process every zumens_*.zip found in the local temporary directory."""
        self.__scp_get_org_data()

        org_data_pattern = "%s/zumens_*.zip" % CONF["common"]["local_tmp_dir"]
        for org_zumen_data in glob.glob(org_data_pattern):
            self.__extract_txts_from_org_data(org_zumen_data)

    def __extract_txts_from_org_data(self,org_zumen_data):
        """Extract text for every drawing named in the zumens list file.

        Args:
            org_zumen_data: path of the zip archive. Currently unused because
                the unpacking step below is disabled.
        """
        # NOTE: unpacking is disabled; the files are expected to be present
        # under local_tmp_dir already.
        # with zipfile.ZipFile(org_zumen_data,"r") as zipf:
        #     zipf.extractall(CONF["common"]["local_tmp_dir"])

        local_tmp_dir = CONF["common"]["local_tmp_dir"]
        list_pattern = local_tmp_dir + "/tmp/zumens_*.tsv"
        zumens_list_paths = glob.glob(list_pattern)
        if not zumens_list_paths:
            # Previously an IndexError; report and skip instead.
            print("WARN not found "+list_pattern, file=sys.stderr)
            return

        zumens_info = self.___load_zumens_list(zumens_list_paths[0])
        for zumen_info in zumens_info:
            org_zumen_paths = glob.glob(local_tmp_dir + zumen_info["cad_path"])
            if len(org_zumen_paths) != 1:
                print("WARN not found "+zumen_info["cad_path"], file=sys.stderr)
                continue

            org_zumen_path = org_zumen_paths[0]

            if zumen_info["cad_or_image"] == "A":
                txts = self.__extract_from_dwg(org_zumen_path,zumen_info)
                self.__make_html_for_dwg(org_zumen_path, zumen_info ,txts)
            # elif zumen_info["cad_or_image"] == "C":
            #     self.__extract_from_me10(org_zumen_path)
            # elif zumen_info["cad_or_image"] in ("i","I"):
            #     self.__extract_from_tiff(org_zumen_path)
            else:
                tmp_msg = "ERROR bad cad_or_image %s %s" % \
                    (zumen_info["cad_or_image"],zumen_info["cad_path"] )
                print(tmp_msg,file=sys.stderr)
                continue
            print( org_zumen_path )

    def __make_html_for_dwg(self,org_zumen_path, zumen_info ,txts):
        """Render the HTML page for one drawing and return it.

        Args:
            org_zumen_path: path of the drawing file (used for its size).
            zumen_info: row of the list file; reads "plan_num",
                "addition_num", "revision", "create_day" and "name_kanji".
            txts: list of text strings extracted from the drawing.

        Returns:
            The filled-in serial_html template as a string. (The page used to
            be built and then discarded; returning it is harmless to callers
            that ignore the result.)

        Raises:
            ValueError: if "revision" is not an integer or "create_day" is
                not in YYYYMMDD form.
        """
        # Function-scope import keeps this block self-contained.
        from html import escape

        plan_num = zumen_info["plan_num"].strip()
        plan_num_full = "%s-%s-%03d" % (plan_num,
                                        zumen_info["addition_num"],
                                        int(zumen_info["revision"].strip()) )
        create_date = datetime.strptime(zumen_info["create_day"],"%Y%m%d")
        create_date_str = create_date.strftime("%Y/%m/%d")

        # Values come from the list file / the drawing itself, so they are
        # escaped before being placed into markup.
        return serial_html.format(
            name_kanji   =escape(zumen_info["name_kanji"]),
            plan_num     =escape(plan_num),
            plan_num_full=escape(plan_num_full),
            create_date  =create_date_str,
            size         =self.__calc_file_size(org_zumen_path),
            txts         =escape(" ".join(txts)),
        )

    def __calc_file_size(self,org_zumen_path):
        """Return the file size as a short string such as "  2KB".

        The size is divided by 1024 until it drops below 1024; an empty
        string is returned if it is still too large after the PB unit.
        """
        size = os.path.getsize(org_zumen_path)

        for unit in ['B','KB','MB','GB','TB','PB']:
            if abs(size) < 1024.0:
                return "%3.0f%s" % (size, unit)
            size /= 1024.0
        return ""

    def __extract_from_dwg(self,dwg_file_path,zumen_info):
        """Collect the non-empty text strings of a DWG file.

        Args:
            dwg_file_path: path of the DWG file.
            zumen_info: row of the list file (not used here).

        Returns:
            List of stripped strings taken from the "text_value" and "text"
            keys of each entry under "OBJECTS"; empty if the file could not
            be read.
        """
        dwg_objs = self.__read_dwg_file(dwg_file_path)
        if not dwg_objs:
            return []

        txts = []
        # A missing "OBJECTS" key is treated as "no text" rather than an error.
        for obj in dwg_objs.get("OBJECTS") or []:
            for text_key in ("text_value","text"):
                if text_key not in obj:
                    continue

                raw_txt = obj[text_key]
                if not isinstance(raw_txt, str):
                    continue

                dwg_txt = raw_txt.strip()
                if text_key == "text":
                    # Values under "text" may be wrapped in braces.
                    dwg_txt = dwg_txt.lstrip("{").rstrip("}")

                if dwg_txt:
                    txts.append(dwg_txt)
        return txts

    def __recover_json_str(self,stdout_line):
        """Repair one line of dwgread JSON output so json.loads accepts it.

        libredwg's JSON string escaping is imperfect, hence this clean-up.
        """
        # Must run before the regex below: that regex would otherwise eat the
        # first two characters of the backslash-backslash-P sequence and
        # leave a stray "P" in the text.
        stdout_line = stdout_line.replace("\\\\P"," ")

        # Drop odd escapes (a backslash followed by a non-letter), possibly a
        # side effect of shift_jis text.
        stdout_line = self._RE_BAD_ESCAPE.sub("", stdout_line)

        for old, new in self._JSON_TOKEN_FIXES:
            stdout_line = stdout_line.replace(old, new)
        return stdout_line

    def __read_dwg_file(self,dwg_file_path):
        """Run dwgread on a DWG file and return the parsed JSON.

        Returns:
            The object produced by json.loads, or None when the command
            produced no output or the output could not be parsed.
        """
        # The path is double-quoted so that a path containing spaces is still
        # passed to dwgread as one argument by the shell.
        cmd = " ".join([CONF["serial"]["dwg_read_cmd"],
                        "--format JSON",
                        '"%s"' % dwg_file_path ])

        (stdout,stderr,return_code) = exec_subprocess(cmd)
        if not stdout:
            return None

        # NOTE(review): bytes that are not valid utf8 are silently dropped.
        stdout_lines = stdout.decode("utf8","ignore").split("\n")
        new_stdout_lines = [
            self.__recover_json_str(line.strip())
            for line in stdout_lines
            if line.strip()
        ]

        try:
            return json.loads("\n".join(new_stdout_lines), strict=False)
        except (ValueError, RecursionError) as e:
            # json.JSONDecodeError is a subclass of ValueError.
            print("ERROR",e,dwg_file_path,file=sys.stderr)
        return None

    def __extract_from_me10(self,org_zumen_path):
        """Placeholder; not implemented."""
        pass

    def __extract_from_tiff(self,org_zumen_path):
        """Placeholder; not implemented."""
        pass

    def ___load_zumens_list(self, zumens_list_path):
        """Read a tab-separated utf-8 file and return its rows as dicts."""
        with open(zumens_list_path, encoding='utf-8', newline='') as f:
            return list(csv.DictReader(f, delimiter='\t'))

    def __scp_get_org_data(self):
        """Build the scp command for fetching the archives and print it.

        The command is only printed; its execution is commented out.
        """
        cmd = "%s -p -i %s %s:%s %s" % \
            ( CONF["common"]["scp_cmd"],
              CONF["common"]["rsa_private_key"],
              CONF["serial"]["backup_server"],
              CONF["serial"]["remote_zumens_data"],
              CONF["common"]["local_tmp_dir"] )
        # exec_subprocess(cmd)
        print(cmd)
    

# cf. https://qiita.com/fetaro/items/a3b3bd4ea197b600ac45
def exec_subprocess(cmd:str, raise_error=True):
    """Run *cmd* through the shell and capture its output.

    Args:
        cmd: command line passed to the shell as a single string.
        raise_error: when true and the command exits with a non-zero code,
            stderr is printed and (None, None, None) is returned. Despite
            the name, no exception is raised.

    Returns:
        Tuple (stdout, stderr, returncode) with stdout/stderr as bytes, or
        (None, None, None) in the failure case described above.
    """
    finished = subprocess.run( cmd,
                               shell=True,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE )
    failed = finished.returncode != 0
    if failed and raise_error:
        print("ERROR",finished.stderr,file=sys.stderr)
        return (None,None,None)

    return finished.stdout, finished.stderr, finished.returncode

# HTML page template filled in with str.format(). Placeholders: name_kanji,
# plan_num, plan_num_full, create_date, size, txts. The doubled braces in the
# <style> section are literal braces escaped for str.format().
serial_html = """
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>{name_kanji} {plan_num_full}</title>
    <style>
      .ocr_txts {{color:#888; font-size:small;}}
      h2  {{font-size: x-large;}}   div {{padding : 0 10px;}}
    </style>
</head>
<body>
   <h1>{name_kanji}</h1>
   <h2>図面ファイル情報</h2>
   <div>
     図番-付番-版数:{plan_num_full}、 update:{create_date} 、 size:{size}
   </div>
   <div>
     <a href="http://hogehoge.sexy.co.jp/xcan.xcanplan?type=viewzumen&CODE={plan_num}">
       XCAN</a>
     <a href="">図面ダウンロード</a>
   </div>
   <h2>図面ファイルからの抽出文字</h2>
   <div class="ocr_txts">{txts}</div>
</body>
</html>
"""

# Run the extraction only when this file is executed as a script.
if __name__ == '__main__':
    main()