end0tknr's kipple - web写経開発

太宰府天満宮の狛犬って、妙にカワイイ

MobileNet , tf2onnx for python 等による類似画像検索 (改)

前回のentryでは、224×224サイズの画像ファイルしか扱えませんでしたので、大きなサイズの画像は 224×224サイズにタイル分割した上で、特徴量を算出するようにしました。

( 前回のentryにあった calc_feature(onnx_session, img_path) を 改良しています )

2. 各画像ファイルの特徴量算出

import glob
import os
import numpy as np
import onnxruntime
import PIL.Image

# Working directories, resolved against the current working directory:
# source images, one feature file per image, and the merged feature batches.
img_base_dir = os.path.abspath("./png")
feature_base_dir = os.path.abspath("./feature")
merged_feature_dir = os.path.abspath("./merged_feature")

# Number of per-image features collected into one merged batch file.
merge_limit = 200

# ONNX export of the MobileNet feature-vector model (224x224 input, per its
# file name).
onnx_model_path = (
    "./mobilenet-v3-tensorflow2-large-100-224-feature-vector-v1.onnx"
)

# The CPU provider finishes quickly enough for this job. To run on a GPU use
# ['CUDAExecutionProvider', 'CPUExecutionProvider'] instead.
provider = ["CPUExecutionProvider"]

def main():
    """Compute a feature vector per PNG, then merge them into batch files.

    Step 1 walks ``img_base_dir`` recursively, runs ``calc_feature`` on every
    ``*.png`` and stores the result below ``feature_base_dir`` at the same
    relative location. ``np.save`` appends ``.npy``, so ``a/b.png`` becomes
    ``a/b.png.npy``.

    Step 2 gathers all ``*.npy`` files below ``feature_base_dir`` into batches
    of at most ``merge_limit`` entries. Each batch is written to
    ``merged_feature_dir`` as ``features_NNN.npy`` together with a parallel
    ``img_paths_NNN.npy`` holding the matching image paths.
    """
    # Load the ONNX model.
    onnx_session = onnxruntime.InferenceSession(onnx_model_path,
                                                providers=provider)

    # Step 1: compute and store the feature vector of every image.
    # Sorting makes the processing order independent of the file system.
    img_pattern = img_base_dir + "/**/*.png"
    for img_path in sorted(glob.glob(img_pattern, recursive=True)):
        # Only the leading base directory must be swapped, hence count=1.
        feature_path = img_path.replace(img_base_dir, feature_base_dir, 1)
        # np.save does not create missing directories.
        os.makedirs(os.path.dirname(feature_path), exist_ok=True)
        np.save(feature_path, calc_feature(onnx_session, img_path))

    # Step 2: merge the per-image feature files into batches.
    os.makedirs(merged_feature_dir, exist_ok=True)
    features = []
    img_paths = []
    batch_no = 0
    feature_pattern = feature_base_dir + "/**/*.npy"
    for feature_path in sorted(glob.glob(feature_pattern, recursive=True)):
        features.append(np.load(feature_path))
        img_path = feature_path.replace(feature_base_dir, img_base_dir, 1)
        # Drop the ".npy" that np.save appended to recover the image path.
        img_paths.append(os.path.splitext(img_path)[0])

        # Flush only after the current file has been added, so no file is
        # skipped when a batch fills up.
        if len(features) >= merge_limit:
            _save_merged_batch(batch_no, features, img_paths)
            features = []
            img_paths = []
            batch_no += 1

    # Write the final, partially filled batch as well.
    if features:
        _save_merged_batch(batch_no, features, img_paths)


def _save_merged_batch(batch_no, features, img_paths):
    """Write one batch of features and its image paths to merged_feature_dir."""
    features_path = os.path.join(merged_feature_dir,
                                 "features_{:03d}.npy".format(batch_no))
    np.save(features_path, features)
    img_paths_path = os.path.join(merged_feature_dir,
                                  "img_paths_{:03d}.npy".format(batch_no))
    np.save(img_paths_path, img_paths)
        
def calc_feature(onnx_session, img_path):
    """Return the concatenated tile features of one image.

    The image is cropped to a fixed region, resized to 672x448 and cut into a
    3x2 grid of 224x224 tiles. Every tile is run through ``onnx_session`` and
    the ``feature_vector`` outputs are concatenated column by column (upper
    tile first within each column).

    Args:
        onnx_session: Inference session that accepts an ``inputs`` tensor and
            provides a ``feature_vector`` output.
        img_path: Path of the image file to read.

    Returns:
        A flat list holding the feature values of all six tiles.
    """
    unit_size = 224          # edge length of one tile fed to the model
    tiles_x, tiles_y = 3, 2  # tile grid: columns, rows

    # The context manager releases the file handle; crop/resize produce
    # independent images that stay valid afterwards.
    with PIL.Image.open(img_path) as src:
        image = src.crop((40, 25, 950, 610))
        image = image.resize((unit_size * tiles_x, unit_size * tiles_y))

    # The channel expansion below expects a single-band image. Multi-band
    # input (e.g. RGB/RGBA) would otherwise yield a wrongly shaped tensor, so
    # reduce it to grayscale; single-band images are passed through unchanged.
    if len(image.getbands()) != 1:
        image = image.convert("L")

    feature = []
    for u_x in range(tiles_x):
        for u_y in range(tiles_y):
            left = unit_size * u_x
            top = unit_size * u_y
            tile = image.crop((left, top, left + unit_size, top + unit_size))

            # Divide by 255 (maps 8-bit pixel values into [0, 1]).
            pixels = np.array(tile, dtype=np.float32) / 255

            # Replicate the single channel three times to match the model
            # input, then add the leading batch axis for inference.
            pixels = np.stack([pixels] * 3, axis=-1)
            tile_feature = onnx_session.run(
                ["feature_vector"],
                {"inputs": np.expand_dims(pixels, 0)})[0][0]
            feature.extend(tile_feature)
    return feature

# Run the feature extraction and merge only when executed as a script.
if __name__ == "__main__":
    main()

3. 類似画像検索

#!/usr/bin/env python3

import os
import sys
import glob
import numpy as np
import onnxruntime
import PIL.Image

# Directory holding the merged feature batches, resolved against the current
# working directory.
merged_feature_dir = os.path.abspath("./merged_feature")

# Batch size setting; nothing in the visible part of this script reads it.
merge_limit = 200

# ONNX export of the MobileNet feature-vector model (224x224 input, per its
# file name).
onnx_model_path = (
    "./mobilenet-v3-tensorflow2-large-100-224-feature-vector-v1.onnx"
)

# The CPU provider finishes quickly enough for this job. To run on a GPU use
# ['CUDAExecutionProvider', 'CPUExecutionProvider'] instead.
provider = ["CPUExecutionProvider"]

def main():
    """Print the indexed images closest to a query image.

    The query image path is taken from the first command-line argument; when
    none is given, the built-in sample path is used. The query feature is
    compared with every merged feature by Euclidean distance, and up to 100
    image paths are printed with their distances, nearest first.
    """
    # Load the ONNX model.
    onnx_session = onnxruntime.InferenceSession(onnx_model_path,
                                                providers=provider)

    default_img_path = "./png/CVAD45-06-000.60.png"
    img_path = sys.argv[1] if len(sys.argv) > 1 else default_img_path
    query_feature = np.asarray(calc_feature(onnx_session, img_path))

    features = load_merged_features()
    img_paths = load_img_paths()

    # Without any indexed feature the subtraction below cannot broadcast.
    if len(features) == 0:
        print("no merged features found in", merged_feature_dir,
              file=sys.stderr)
        return

    # Broadcasting subtracts the query from every row, so no explicit
    # np.tile copy of the query is needed.
    distances = np.linalg.norm(query_feature - np.asarray(features), axis=1)

    # Output the search result, nearest first.
    find_limit = 100
    for idx in np.argsort(distances)[:find_limit]:
        print(img_paths[idx], distances[idx])

    
def load_merged_features():
    """Load every merged feature batch below ``merged_feature_dir``.

    Files matching ``features_*.npy`` are read in sorted path order. The
    order matters: the i-th returned feature must line up with the i-th
    entry returned by ``load_img_paths``, and unsorted glob results carry no
    ordering guarantee.

    Returns:
        A list with one feature array per indexed image; empty when no batch
        file exists.
    """
    pattern = merged_feature_dir + "/**/features_*.npy"
    ret_datas = []
    for feature_path in sorted(glob.glob(pattern, recursive=True)):
        # Iterating the loaded 2-D array yields one row per image.
        ret_datas.extend(np.load(feature_path))
    return ret_datas

def load_img_paths():
    """Load the image paths of every merged batch below ``merged_feature_dir``.

    Files matching ``img_paths_*.npy`` are read in sorted path order. The
    order matters: the i-th returned path must line up with the i-th entry
    returned by ``load_merged_features``, and unsorted glob results carry no
    ordering guarantee.

    Returns:
        A list with one image path per indexed image; empty when no batch
        file exists.
    """
    pattern = merged_feature_dir + "/**/img_paths_*.npy"
    ret_datas = []
    for imgs_list in sorted(glob.glob(pattern, recursive=True)):
        ret_datas.extend(np.load(imgs_list))
    return ret_datas
    
def calc_feature(onnx_session, img_path):
    """Return the concatenated tile features of one image.

    The image is cropped to a fixed region, resized to 672x448 and cut into a
    3x2 grid of 224x224 tiles. Every tile is run through ``onnx_session`` and
    the ``feature_vector`` outputs are concatenated column by column (upper
    tile first within each column).

    Args:
        onnx_session: Inference session that accepts an ``inputs`` tensor and
            provides a ``feature_vector`` output.
        img_path: Path of the image file to read.

    Returns:
        A flat list holding the feature values of all six tiles.
    """
    unit_size = 224          # edge length of one tile fed to the model
    tiles_x, tiles_y = 3, 2  # tile grid: columns, rows

    # The context manager releases the file handle; crop/resize produce
    # independent images that stay valid afterwards.
    with PIL.Image.open(img_path) as src:
        image = src.crop((40, 25, 950, 610))
        image = image.resize((unit_size * tiles_x, unit_size * tiles_y))

    # The channel expansion below expects a single-band image. Multi-band
    # input (e.g. RGB/RGBA) would otherwise yield a wrongly shaped tensor, so
    # reduce it to grayscale; single-band images are passed through unchanged.
    if len(image.getbands()) != 1:
        image = image.convert("L")

    feature = []
    for u_x in range(tiles_x):
        for u_y in range(tiles_y):
            left = unit_size * u_x
            top = unit_size * u_y
            tile = image.crop((left, top, left + unit_size, top + unit_size))

            # Divide by 255 (maps 8-bit pixel values into [0, 1]).
            pixels = np.array(tile, dtype=np.float32) / 255

            # Replicate the single channel three times to match the model
            # input, then add the leading batch axis for inference.
            pixels = np.stack([pixels] * 3, axis=-1)
            tile_feature = onnx_session.run(
                ["feature_vector"],
                {"inputs": np.expand_dims(pixels, 0)})[0][0]
            feature.extend(tile_feature)
    return feature

# Run the similarity search only when executed as a script.
if __name__ == "__main__":
    main()