autodesk fusion用 mcpサーバー には、2025年6月時点で GitHub - ArchimedesCrypto/fusion360-mcp-server がありますが、利用できるコマンド(tool)が、 四角形の描画や押出し等、10コのみでしたので、pythonで自作してみました。
目次
全体構成と、ポイント
claude desktop + claude_desktop_config.json
┃
(mcp)
┃
mcp_fusion_api.py
┃
(http)
┃
┌ api_server.py───────────────┐
│ ┃ │
│fastapi │
│ ┣━(job queue実行の場合 )━┓ │
│(即時実行の場合) sqlite │
└─╂─────────────╂──────┘
api直接呼出し custom event+main thread経由
┣━━━━━━━━━━━━━┛
fusion python api addin
┃
fusion本体
claude desktop からの http request送信
claude desktop の mcp は、httpでの外部アクセスに対応していないようです。 そこで、claude desktop の mcp では、mcp_fusion_api.py を呼び出し、 この mcp_fusion_api.py から httpxを用い、fusion 側へ http request送信しています。
fusion 側での http request受信
一方の fusion側は 上記のhttp requestを受信する為、 fusion内のpython に pip install fastapi uvicorn し、 fastapiから fusionのpython apiを実行しています。
尚、fusion内 python への pip installは、以前の autodesk fusion for win への外部moduleの pip install - end0tknr's kipple - web写経開発 のentryに記載している通りです。
job queue での fusionのpython api実行
今回、fusionのpython apiを用い、外部fileをインポートする際等、 fusion自体が固まることがありました。
どうやら、fusion側のスレッド管理が原因のようでしたので、 一部の機能は、adsk.core.CustomEventHandler を用い、job queue 実行しています。
尚、jobの管理には、sqliteを使用しています。
mcpから利用可能な機能
四角形の描画や押出し、コピー、反転、スクリーンショットのapiを mcpから利用可能にしています。詳細は後述のsrcをご覧ください。
尚、利用可能な機能は今後も追加予定で、動作テストはスモークテスト程度しか おこなっていません。
参考url
https://help.autodesk.com/view/fusion360/ENU/?guid=GUID-7B5A90C8-E94C-48DA-B16B-430729B734DC
自作した各script
c:/Users/end0t/AppData/Roaming/Claude/claude_desktop_config.json
{ "mcpServers": { "fusion": { "command": "C:\\Users\\end0t\\miniconda3\\python.exe", "args": [ "c:\\Users\\end0t\\dev\\FUSION\\mcp_fusion_api.py" ] }, "playwright": { "command": "npx", "args": [ "@playwright/mcp@latest" ] }, "filesystem": { "command": "npx", "args": [ "-y", "@modelcontextprotocol/server-filesystem", "C:/Users/end0t/dev/FUSION", "C:/inetpub/wwwroot/fusion" ] } } }
c:/Users/end0t/dev/FUSION/mcp_fusion_api.py
from mcp.server.fastmcp import FastMCP import httpx import inspect, traceback import logging import sys import typing remote_host = "http://localhost:5000" # c:/Users/end0t/AppData/Roaming/Claude/logs 以下にログ出力されます logger = logging.getLogger(__name__) handler = logging.StreamHandler(sys.stderr) formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(logging.DEBUG) mcp = FastMCP("FusionApi", debug=True, log_level="DEBUG") def main(): mcp.run(transport="stdio") @mcp.tool() def active_occur(): """ activeなoccurrenceの名称を返す。 引数: なし 返り値: occurrence名称 """ with httpx.Client() as client: data = {} method = inspect.currentframe().f_code.co_name res_json = client.post(remote_host+"/"+method, json=data) return res_json.json() @mcp.tool() def active_compo(): """ activeなcomponentの名称を返す。 引数: なし 返り値: component名称 """ with httpx.Client() as client: data = {} method = inspect.currentframe().f_code.co_name res_json = client.post(remote_host+"/"+method, json=data) return res_json.json() @mcp.tool() def add_occur(): """ 新規 occurrence兼component 追加後、そのoccurrenceをactiveにする。 引数: なし 返り値: occurrence名称 """ with httpx.Client() as client: data = {} method = inspect.currentframe().f_code.co_name res_json = client.post(remote_host+"/"+method, json=data) return res_json.json() @mcp.tool() def save_doc_as(file_name:str,folder_path:typing.List[str]): """ 現在のdocumentに名前をつけて保存。 引数: file_name: document名称 folder_path: 保存するdocumentのpathを構成する各階層folder名の配列. []を渡す場合、root folderに保存 Returns: result:処理結果(ok/ng)、name:保存したdocument名称 """ with httpx.Client() as client: data = {"file_name":file_name, "folder_path":folder_path} method = inspect.currentframe().f_code.co_name res_json = client.post(remote_host+"/"+method, json=data) return res_json.json() @mcp.tool() def save_doc(): """ 現在のdocumentを上書き保存。 引数: なし Returns: result: 処理結果(ok/ng)、name: 保存したdocument名称 """ with httpx.Client() as client: data = {} method = inspect.currentframe().f_code.co_name res_json = client.post(remote_host+"/"+method, json=data) return res_json.json() @mcp.tool() def import_doc(file_path:typing.List[str]): """ fusion cloudにある documentを 現在の componentへimport。 引数: file_path: fusion cloudにある documentのpath Returns: importしたdocumentのcomponent名称 """ with httpx.Client() as client: data = {"file_path":file_path} method = inspect.currentframe().f_code.co_name res_json = client.post(remote_host+"/"+method, json=data) @mcp.tool() def add_sketch( sketch_dir=[0,0,1], body_name=None ): """ activeなcomponetへ 新規sketch 追加 引数: sketch_dir: 面の向き。未指定の場合、[0,0,1]。 [0,0,1]:xy水平面、[0,1,0]:xz垂直面、[1,0,0]:yz垂直面 body_name: 指定した場合、そのbodyの面にsketchを追加する。 返り値: sketch名称 """ with httpx.Client() as client: data = {"sketch_dir":sketch_dir} method = inspect.currentframe().f_code.co_name res_json = client.post(remote_host+"/"+method, json=data) return res_json.json() @mcp.tool() def add_hollow_rect( sketch_name:str, size=[100,100], t:int=5, r:int=0, rect_type=None, position=[0,0]): """ sketch面へ 中空四角形を追加 引数: size: 外周サイズ. [幅mm, 高さmm] t: 外周四角と内周四角の距離 mm r: フィレットの半径 mm sketch_name: 描くsketchの名称 rect_type: "center_point"とした場合、四角の中心を基準に描画 position: 描画の基準点 返り値: 追加した中空四角形のtoken """ with httpx.Client() as client: data = {"size":size, "t":t, "r":r, "sketch_name":sketch_name, "rect_type":rect_type, "position":position} method = inspect.currentframe().f_code.co_name res_json = client.post(remote_host+"/"+method, json=data) return res_json.json() @mcp.tool() def add_hollow_circle( sketch_name:str, r:int, t:int, position=[0,0] ): """ sketch面へ 中空の円を追加 引数: r: 外周円の半径 mm t: 外周と内周の距離 mm sketch_name: 描くsketchの名称 position: 描画の基準点 返り値: 追加した円のtoken """ with httpx.Client() as client: data = {"r":r, "t":t, "sketch_name":sketch_name, "position":position} method = inspect.currentframe().f_code.co_name res_json = client.post(remote_host+"/"+method, json=data) return res_json.json() @mcp.tool() def add_rect( sketch_name:str, size=[100,100], rect_type=None, position=[0,0]): """ sketch面へ 四角形を追加 引数: size: 外周サイズ. [幅mm, 高さmm] sketch_name: 描くsketchの名称 rect_type: "center_point"とした場合、四角の中心を基準に描画 position: 描画の基準点 返り値: 追加した四角形のtoken """ with httpx.Client() as client: data = {"size":size, "sketch_name":sketch_name, "rect_type":rect_type, "position":position } method = inspect.currentframe().f_code.co_name res_json = client.post(remote_host+"/"+method, json=data) return res_json.json() @mcp.tool() def add_circle( r:int, sketch_name:str, position=[0,0] ): """ sketch面へ 円を追加 引数: r: 半径 mm sketch_name: 描くsketchの名称 position: 描画の基準点 返り値: 追加した円のtoken """ with httpx.Client() as client: data = {"r":r, "sketch_name":sketch_name, "position":position} method = inspect.currentframe().f_code.co_name res_json = client.post(remote_host+"/"+method, json=data) return res_json.json() @mcp.tool() def add_polyline( sketch_name:str, points:typing.List ): """ sketch面へ ポリラインを追加 引数: points: ポリラインを構成する点の各座標(x mm, y mm) sketch_name: sketch名称 返り値: 追加した円のtoken """ with httpx.Client() as client: data = {"points":points, "sketch_name":sketch_name} method = inspect.currentframe().f_code.co_name res_json = client.post(remote_host+"/"+method, json=data) return res_json.json() @mcp.tool() def extrude( profile_token:str , d:int ): """ sketch面に描いたprofileを押し出し、body化する 引数: profile_token: profileのtoken d: 押し出し高さ mm 返り値: 押し出しにより立体化したbodyの名称 """ with httpx.Client() as client: data = {"profile_token":profile_token, "d":d } method = inspect.currentframe().f_code.co_name res_json = client.post(remote_host+"/"+method, json=data) return res_json.json() @mcp.tool() def add_dist_dim( profile_token:str, dir:str="h" ): """ sketch面に描いたprofileへ距離寸法線を追加。 引数: profile_token: profileのtoken dir: 寸法線向き。h:水平寸法、v:垂直寸法。 返り値: 作成された寸法線の token """ with httpx.Client() as client: data = {"profile_token":profile_token, "dir":dir } method = inspect.currentframe().f_code.co_name res_json = client.post(remote_host+"/"+method, json=data) return res_json.json() @mcp.tool() def add_pipe(r:int, d:int, t:int=5, sketch_dir=[0,0,1], body_name=None ): """ パイプの追加 引数: r: パイプの外側半径 mm d: パイプの長さ mm t: パイプの肉厚 mm sketch_dir: 面の向き。未指定の場合、[0,0,1]。 [0,0,1]:xy水平面、[0,1,0]:xz垂直面、[1,0,0]:yz垂直面 返り値: body名称 """ with httpx.Client() as client: data = {"r":r, "d":d, "t":t, "sketch_dir":sketch_dir} method = inspect.currentframe().f_code.co_name res_json = client.post(remote_host+"/"+method, json=data) return res_json.json() @mcp.tool() def add_box(size=[100,100,100], sketch_dir=[0,0,1] ): """ 直方体の追加。 引数: size: サイズ. [幅mm, 高さmm, 直方体の高さ mm]で構成されるList sketch_dir: 面の向き。未指定の場合、[0,0,1]。 [0,0,1]:xy水平面、[0,1,0]:xz垂直面、[1,0,0]:yz垂直面 返り値: body名称と、bodyのextrude元となったsketch profile のtoken """ with httpx.Client() as client: data = {"size":size, "sketch_dir":sketch_dir} method = inspect.currentframe().f_code.co_name res_json = client.post(remote_host+"/"+method, json=data) return res_json.json() @mcp.tool() def add_board(size=[100,100,100], sketch_dir=[0,0,1] ): """ 板の追加。 引数: size: サイズ. [幅mm, 高さmm, 厚さ mm] sketch_dir: 面の向き。未指定の場合、[0,0,1]。 [0,0,1]:xy水平面、[0,1,0]:xz垂直面、[1,0,0]:yz垂直面 返り値: body名称と、bodyのextrude元となったsketch profile のtoken """ return add_box(size, sketch_dir ) @mcp.tool() def add_rect_pipe(size=[100,100,100], t:int=5, r:int=0, sketch_dir=[0,0,1] ): """ 四角パイプの追加 引数: size: サイズ. [断面幅mm, 断面高さmm, 高さ mm] t: パイプの肉厚 mm r: 外側断面の角の丸め mm sketch_dir: 面の向き。未指定の場合、[0,0,1]。 [0,0,1]:xy水平面、[0,1,0]:xz垂直面、[1,0,0]:yz垂直面 返り値: body名称 """ with httpx.Client() as client: data = {"size":size, "t":t, "r":r, "sketch_dir":sketch_dir} method = inspect.currentframe().f_code.co_name res_json = client.post(remote_host+"/"+method, json=data) return res_json.json() @mcp.tool() def add_circular_board(size=[100,100], sketch_dir=[0,0,1] ): """ 円形の板の追加 引数: size: サイズ. [半径mm,厚さmm] sketch_dir: 面の向き。未指定の場合、[0,0,1]。 [0,0,1]:xy水平面、[0,1,0]:xz垂直面、[1,0,0]:yz垂直面 返り値: body名称 """ return add_cylinder(size, sketch_dir ) @mcp.tool() def add_cylinder(size=[100,100], sketch_dir=[0,0,1] ): """ 円柱の追加 引数: size: サイズ. [半径mm,長さmm] sketch_dir: 面の向き。未指定の場合、[0,0,1]。 [0,0,1]:xy水平面、[0,1,0]:xz垂直面、[1,0,0]:yz垂直面 返り値: body名称 """ with httpx.Client() as client: data = {"size":size, "sketch_dir":sketch_dir} method = inspect.currentframe().f_code.co_name res_json = client.post(remote_host+"/"+method, json=data) return res_json.json() @mcp.tool() def body_bounding_box( name:str ): """ bodyのbounding box座標を取得。 Args: name: body名称 Returns: min: 最小座標 (x,y,z) max: 最大座標 (x,y,z) """ with httpx.Client() as client: data = {"name":name } method = inspect.currentframe().f_code.co_name res_json = client.post(remote_host+"/"+method, json=data) @mcp.tool() def move_body(name:str, dist=(0,0,0)): """ bodyを移動する Args: name: 移動対象のbody名称 dist: x,y,z方向の各移動量 mm Returns: 移動対象のbody名称 """ with httpx.Client() as client: data = {"name":name, "dist":dist} method = inspect.currentframe().f_code.co_name res_json = client.post(remote_host+"/"+method, json=data) return res_json.json() @mcp.tool() def rotate_body(name:str, degree=(0,0,0), center=(0,0,0) ): """ bodyを回転する。 Args: name: 回転対象のbody名称 degree: x,y,z軸方向それぞれの回転角度。 center: 回転中心座標 Returns: 回転対象のbody名称 """ with httpx.Client() as client: data = {"name":name, "degree":degree, "centor":centor} method = inspect.currentframe().f_code.co_name res_json = client.post(remote_host+"/"+method, json=data) return res_json.json() @mcp.tool() def del_body(name:str) -> bool: """ 立体を削除する。 Args: name: 削除対象のbody名称 Returns: True: 成功 False: 失敗 """ with httpx.Client() as client: data = {"name":name} method = inspect.currentframe().f_code.co_name res_json = client.post(remote_host+"/"+method, json=data) res = res_json.json() return res_json.json() @mcp.tool() def add_const_point( point=(0,0,0) ): """ construction pointを追加。 Args: point: construction pointを追加する座標。 Returns: 追加した construction point の名称 """ with httpx.Client() as client: data = {"name":name, "dist":dist} method = inspect.currentframe().f_code.co_name res_json = client.post(remote_host+"/"+method, json=data) return res_json.json() @mcp.tool() def subtract_body(taraget_name:str,tool_name:str) -> str: """ 立体から、立体を差し引く。 Args: target_name: 差し引かれる立体の名前 tool_name: 差し引く立体の名前 Returns: 差し引かれ残った立体の名前 """ with httpx.Client() as client: data = {"target_name":taraget_name, "tool_name":tool_name} method = inspect.currentframe().f_code.co_name res_json = client.post(remote_host+"/"+method, json=data) res = res_json.json() return res_json.json() @mcp.tool() def screenshot() -> str: """ スクリーンショットを取得。 Args: なし Returns: スクリーンショットを保存した画像ファイルのurl """ with httpx.Client() as client: data = {} method = inspect.currentframe().f_code.co_name res_json = client.post(remote_host+"/"+method, json=data) res = res_json.json() return res_json.json() @mcp.tool() def undo() ->bool: """ 直前の操作の取り消し。 Args: なし Returns: True: 成功、False: 失敗 """ with httpx.Client() as client: data = {} method = inspect.currentframe().f_code.co_name res_json = client.post(remote_host+"/"+method, json=data) res = res_json.json() return res_json.json() if __name__ == "__main__": main()
c:/Users/end0t/AppData/Roaming/Autodesk/Autodesk Fusion 360/API/AddIns/McpServer/McpServer.py
from . import api_server def run(context): api_server.start_servers() def stop(context): api_server.stop_servers()
c:/Users/end0t/AppData/Roaming/Autodesk/Autodesk Fusion 360/API/AddIns/McpServer/api_server.py
from datetime import datetime, timedelta import adsk.core, adsk.fusion, adsk.cam import asyncio import fastapi, uvicorn import inspect, traceback import json import math import pydantic import random import re import sqlite3 import threading import time import typing import httpx import logging.config import os, sys app_base_dir = os.path.dirname(os.path.abspath(__file__)) app_data_dir = "c:/Users/end0t/dev/FUSION" # screenshotされた画像をllm+playwrightに読ませる為 app_img_dir = ["c:/inetpub/wwwroot/fusion","http://localhost/fusion"] conf = { "log": { "version": 1, "disable_existing_loggers": False, "root": { "level": "INFO", "handlers": ["logFileHandler"] }, "handlers": {"logFileHandler": { "class": "logging.FileHandler", "level": "DEBUG", "formatter": "logFileFormatter", "filename": app_data_dir+"/"+os.path.basename(__file__)+".log", "mode": "a", "encoding": "utf-8" } }, "formatters": { "logFileFormatter": { "format": "%(asctime)s\t%(levelname)s\t%(filename)s"+ "\tL%(lineno)d\t%(funcName)s\t%(message)s", "datefmt": '%Y/%m/%d %H:%M:%S' } } }, "fastapi": {"host":"127.0.0.1","port":5000,"log_level":"info"}, "sqlite" : app_data_dir +'/fusion.sqlite', "default_pj" : "Admin Project" } logging.config.dictConfig( conf["log"] ) logger = logging.getLogger() # userからのrequest受付用. fusionのsub threadとして動作 fapi_server = None fapi_thread = None # app.documents.item(?).activate()等のuiに影響する操作?は # sub threadでは処理できない為、custom event で main threadへ委譲 # https://help.autodesk.com/view/fusion360/ENU/?guid=GUID-85edd118-c2a4-11e6-b401-3417ebc87622 thread_handlers = {} thread_stop_flag = None custom_event_ids = ['job_queue'] fusion_app = adsk.core.Application.get() def start_servers(): try: api_server = MyApiServer() api_server.start_fastapi() job_q = JobQueue() job_q.init_db() api_server.start_thread_handler() logger.info("DONE") except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def stop_servers(): try: api_server = MyApiServer() api_server.stop_fastapi() api_server.stop_thread_handler() logger.info("DONE") except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) class MyApiServer(): def __init__(self): pass def start_fastapi(self): try: my_fast_api = FastApiRouter() fapi = fastapi.FastAPI() fapi.include_router( my_fast_api.router ) global fapi_server, fapi_thread tmp_conf = {"app":fapi} | conf["fastapi"] fapi_server = uvicorn.Server( config=uvicorn.Config(**tmp_conf) ) def run(): asyncio.run( fapi_server.serve() ) fapi_thread = threading.Thread(target=run, daemon=True) fapi_thread.start() except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def stop_fastapi(self): try: if fapi_server and fapi_server.started: fapi_server.should_exit = True fapi_thread.join( timeout=5 ) except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def start_thread_handler(self): try: global thread_handlers, thread_stop_flag for event_id in custom_event_ids: custom_event = fusion_app.registerCustomEvent(event_id) handler = MyEventHandler(event_id) custom_event.add(handler) thread_handlers[event_id] = {'handler': handler, 'event' : custom_event } thread_stop_flag = threading.Event() myThread = MyThread(thread_stop_flag) myThread.start() except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def stop_thread_handler(self): try: for event_id, data in thread_handlers.items(): data['event'].remove(data['handler']) fusion_app.unregisterCustomEvent(event_id) if thread_stop_flag: thread_stop_flag.set() except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) class MyApi(): def __init__(self): pass def active_occur(self): """ root component が activeの場合、Noneを返す """ try: design = adsk.fusion.Design.cast(fusion_app.activeProduct) active_occur = design.activeOccurrence return active_occur except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def activate_occur(self, name): """ occurrence / component を active化 """ try: occur = self.find_occur_by_name( name ) return occur except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def active_compo(self): occur = self.active_occur() if occur != None: return occur.component design = adsk.fusion.Design.cast(fusion_app.activeProduct) return design.rootComponent def add_occur(self): try: parent_compo = self.active_compo() new_occur = parent_compo.occurrences.addNewComponent( adsk.core.Matrix3D.create() ) new_occur.isGroundToParent = False new_occur.activate() new_compo = new_occur.component # 配下のcomponentのnameを occurrence も共有する為 # occurrence自身には name を setできない new_compo.name = str( time.time() ) return new_occur except Exception as e: logger.error("%s %s" %(e,traceback.fourmat_exc())) def save_doc_as(self, file_name, folder_path=[]): try: folder = self.get_folder_by_path(folder_path) result = fusion_app.activeDocument.saveAs(file_name, folder, description="", tag="") if result == True: return fusion_app.activeDocument except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def save_doc(self): try: result = fusion_app.activeDocument.save( description="" ) if result == True: return fusion_app.activeDocument except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def get_plane(self, plane_dir=[0,0,1], body_name=None ): try: if body_name != None: body = self.find_body_by_name( body_name ) for i in range( body.faces.count ): face = body.faces.item(i) # 中心点における法線ベクトル算出 evaluator = face.evaluator center_point = face.pointOnFace success, uvs = evaluator.getParameterAtPoint(center_point) success, normal = evaluator.getNormalAtParameter(uvs) if [normal.x, normal.y, normal.z] == plane_dir: return face return compo = self.active_compo() if plane_dir==[0,0,1]: return compo.xYConstructionPlane if plane_dir==[0,1,0]: return compo.xZConstructionPlane if plane_dir==[1,0,0]: return compo.yZConstructionPlane except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def add_sketch(self, sketch_dir=[0,0,1], body_name=None ): try: compo = self.active_compo() plane = self.get_plane(sketch_dir, body_name ) sketch = compo.sketches.add( plane ) sketch.name = str( time.time() ) return sketch except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def get_sketch(self, sketch_name): try: compo = self.active_compo() for sketch in compo.sketches: if sketch.name != sketch_name: continue return sketch except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def add_rect(self, size, sketch_name, rect_type=None, position=[0,0]): try: sketch = self.get_sketch(sketch_name) lines = sketch.sketchCurves.sketchLines line = None if rect_type=="center_point": line = lines.addCenterPointRectangle( adsk.core.Point3D.create( *position,0 ), adsk.core.Point3D.create( position[0]+size[0]/2, # 幅 position[1]+size[1]/2, # 高さ 0) ) else: line = lines.addTwoPointRectangle( adsk.core.Point3D.create(*position,0), adsk.core.Point3D.create(position[0]+size[0], # 幅 position[1]+size[1], # 高さ 0) ) profile = sketch.profiles.item(0) return profile except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def add_circle(self, r, sketch_name, position=[0,0]): try: sketch = self.get_sketch(sketch_name) circles = sketch.sketchCurves.sketchCircles circle = circles.addByCenterRadius( adsk.core.Point3D.create( *position,0 ), r ) profile = sketch.profiles.item(0) return profile except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def add_pipe(self, r, d, t, sketch_dir=[0,0,1], body_name=None): try: sketch = self.add_sketch(sketch_dir,body_name) # 中空のprofileですので、profile.profileLoops.count==2です profile = self.add_hollow_circle(r, t, sketch.name) logger.info( profile.entityToken ) compo = self.active_compo() extrudes = compo.features.extrudeFeatures ext_input = extrudes.createInput( profile, adsk.fusion.FeatureOperations.NewBodyFeatureOperation ) distance = adsk.core.ValueInput.createByReal(d) ext_input.setDistanceExtent(False, distance) ext = extrudes.add(ext_input) body = ext.bodies.item(0) body.name = str( time.time() ) return body, profile except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def extrude(self, profile_token, d): try: entity = fusion_app.activeProduct.findEntityByToken( profile_token ) profile= adsk.fusion.Profile.cast( entity[0] ) compo = self.active_compo() extrudes = compo.features.extrudeFeatures ext_input = extrudes.createInput( profile, adsk.fusion.FeatureOperations.NewBodyFeatureOperation ) distance = adsk.core.ValueInput.createByReal(d) ext_input.setDistanceExtent(False, distance) ext = extrudes.add(ext_input) body = ext.bodies.item(0) body.name = str( time.time() ) return body except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def add_dist_dim(self, profile_token, dir="h"): try: entity = fusion_app.activeProduct.findEntityByToken(profile_token ) profile= adsk.fusion.Profile.cast( entity[0] ) min_co = [None,None,0] max_co = [None,None,0] for loop in profile.profileLoops: for curve in loop.profileCurves: for point in [curve.geometry.startPoint, curve.geometry.endPoint ]: if min_co[0] == None or point.x < min_co[0]: min_co[0] = point.x if max_co[0] == None or max_co[0] < point.x: max_co[0] = point.x if min_co[1] == None or point.y < min_co[1]: min_co[1] = point.y if max_co[1] == None or max_co[1] < point.y: max_co[1] = point.y dim_orient = None text_offset= 100 text_point = None if dir=="v": # 垂直方向の寸法線 dim_orient = \ adsk.fusion.DimensionOrientations.VerticalDimensionOrientation text_point = adsk.core.Point3D.create( max_co[0] +text_offset, (min_co[1]+max_co[1]) /2, 0 ) else: # 水平方向の寸法線 dim_orient = \ adsk.fusion.DimensionOrientations.HorizontalDimensionOrientation text_point = adsk.core.Point3D.create( (min_co[0]+max_co[0]) /2, min_co[1] -text_offset, 0 ) sketch = profile.parentSketch sketch.areDimensionsShown = True sketch.isVisible = True points = sketch.sketchPoints p1 = points.add(adsk.core.Point3D.create(*min_co) ) p2 = points.add(adsk.core.Point3D.create(*max_co) ) dim = sketch.sketchDimensions.addDistanceDimension( p1, p2, dim_orient, text_point ) #dim.isVisible = True return dim except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def add_box(self, size, sketch_dir=[0,0,1], body_name=None, rect_type=None, position=[0,0] ): try: sketch = self.add_sketch(sketch_dir, body_name) profile = self.add_rect(size[:2], sketch.name,rect_type,position) body = self.extrude(profile.entityToken, size[2] ) return body, profile except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def add_cylinder(self, size, # [半径,押出し長さ] sketch_dir=[0,0,1], body_name=None, position=[0,0] ): try: sketch = self.add_sketch(sketch_dir, body_name) profile = self.add_circle(size[0], sketch.name, position) body = self.extrude(profile.entityToken, size[1]) return body, profile except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def add_rect_pipe(self, size, # [幅,高さ,押出し長さ], t, r, # 厚さ、アール sketch_dir=[0,0,1], body_name=None): try: sketch = self.add_sketch(sketch_dir, body_name) # 中空のprofileですので、profile.profileLoops.count==2です profile = self.add_hollow_rect(size, t, r, sketch.name) body = self.extrude(profile.entityToken, d) return body, profile except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def add_hollow_rect(self, size, t, r, sketch_name, rect_type=None, position=[0,0]): """ 中空の四角 """ try: sketch = self.get_sketch(sketch_name) lines = sketch.sketchCurves.sketchLines line = None if rect_type=="center_point": line = lines.addCenterPointRectangle( adsk.core.Point3D.create( *position,0 ), adsk.core.Point3D.create( position[0]+size[0]/2, position[1]+size[1]/2, 0)) else: line = lines.addTwoPointRectangle( adsk.core.Point3D.create(*position,0), adsk.core.Point3D.create(position[0]+size[0], position[1]+size[1], 0) ) if r> 0: # 面取り self.fillet_rect_lines(sketch, line, r) # 中空化 self.offset_rect_lines(sketch, line, size, t) profile = sketch.profiles.item(0) return profile except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def add_hollow_circle(self, r, t, sketch_name, position=[0,0]): """ 中空の円 """ try: sketch = self.get_sketch(sketch_name) circles = sketch.sketchCurves.sketchCircles circle = circles.addByCenterRadius( adsk.core.Point3D.create(*position,0), r) if t> 0: # 中空化 self.offset_circle_line(sketch, circle, t) profile = sketch.profiles.item(0) return profile except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def offset_circle_line(self, sketch, circle_line, t): try: curves = sketch.findConnectedCurves( circle_line ) dir_point = adsk.core.Point3D.create(0, 0, 0) sketch.offset(curves, dir_point, t ) except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def offset_rect_lines(self, sketch, rect_lines, size, t): try: curves = sketch.findConnectedCurves( rect_lines[0] ) dir_point = adsk.core.Point3D.create(size[0]/2.0, size[1]/2.0, 0) sketch.offset(curves, dir_point, t ) except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def fillet_rect_lines(self, sketch, rect_lines, r): try: arcs = sketch.sketchCurves.sketchArcs arcs.addFillet( rect_lines[0],rect_lines[0].endSketchPoint.geometry, rect_lines[1],rect_lines[1].startSketchPoint.geometry, r) arcs.addFillet( rect_lines[1],rect_lines[1].endSketchPoint.geometry, rect_lines[2],rect_lines[2].startSketchPoint.geometry, r) arcs.addFillet( rect_lines[2],rect_lines[2].endSketchPoint.geometry, rect_lines[3],rect_lines[3].startSketchPoint.geometry, r) arcs.addFillet( rect_lines[3],rect_lines[3].endSketchPoint.geometry, rect_lines[0],rect_lines[0].startSketchPoint.geometry, r) except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def add_polyline(self, points, sketch_name ): try: sketch = self.get_sketch(sketch_name) lines = sketch.sketchCurves.sketchLines point3ds = [ adsk.core.Point3D.create(x, y, 0) for (x, y) in points ] pre_line = None tmp_lines = [] for i, point in enumerate(points): if i == 0: start_co = adsk.core.Point3D.create(points[i][0], points[i][1], points[i][2]) end_co = adsk.core.Point3D.create(points[i+1][0], points[i+1][1], points[i+1][2]) pre_line = lines.addByTwoPoints(start_co,end_co) elif i+1 == len(points): start_co = pre_line.endSketchPoint end_co = tmp_lines[0].startSketchPoint lines.addByTwoPoints(start_co,end_co) else: start_co = pre_line.endSketchPoint end_co = adsk.core.Point3D.create(points[i+1][0], points[i+1][1], points[i+1][2]) pre_line = lines.addByTwoPoints(start_co,end_co) profile = sketch.profiles.item(0) return profile except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def add_const_point(self, point=(0,0,0) ): """ construction point """ try: compo = self.active_compo() const_points = compo.constructionPoints const_input = const_points.createInput() const_input.setByPoint( adsk.core.Point3D.create(*point) ) const_point = const_points.add( const_input ) const_point.name = str( time.time() ) return const_point except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def rotate_compo(self, degree=(0,0,0), centor=(0,0,0)): try: occur = self.active_occur() rotation = adsk.core.Matrix3D.create() if degree[0] != 0: # x軸まわりの回転 rotation.setToRotation(math.radians( degree[0] ), adsk.core.Vector3D.create(1, 0, 0), adsk.core.Point3D.create(*centor)) new_trans = occur.transform.copy() new_trans.transformBy(rotation) occur.transform = new_trans if degree[1] != 0: # y軸まわりの回転 rotation.setToRotation(math.radians( degree[1] ), adsk.core.Vector3D.create(0, 1, 0), adsk.core.Point3D.create(*centor)) new_trans = occur.transform.copy() new_trans.transformBy(rotation) occur.transform = new_trans if degree[2] != 0: # z軸まわりの回転 rotation.setToRotation(math.radians( degree[2] ), adsk.core.Vector3D.create(0, 0, 1), adsk.core.Point3D.create(*centor)) new_trans = occur.transform.copy() new_trans.transformBy(rotation) occur.transform = new_trans return occur except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def find_body_by_name(self, name, compo_name=None): try: compo = self.active_compo() if compo_name!= None: design = adsk.fusion.Design.cast(fusion_app.activeProduct) for tmp_compo in design.allComponents: if tmp_compo.name == compo_name: compo = tmp_compo break for body in compo.bRepBodies: if body.name == name: return body except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def set_joint(self, compo_1_name,point_1_name, compo_2_name,point_2_name): try: design = adsk.fusion.Design.cast(fusion_app.activeProduct) active_compo = self.active_compo() def get_compo(compo_name): occs = active_compo.occurrences for i in range(occs.count): occ = occs.item(i) comp = occ.component if comp.name == compo_name: return comp def get_const_point(compo,point_name): points = compo.constructionPoints for i in range( points.ocount ): if points.item(i).name == point_name: return points.item(i) compo_1 = get_compo(compo_1_name) compo_2 = get_compo(compo_2_name) point_1 = get_const_point(compo_1, point_1_name) point_2 = get_const_point(compo_2, point_2_name) geo_1 = adsk.fusion.JointGeometry.createByPoint( point_1 ) geo_2 = adsk.fusion.JointGeometry.createByPoint( point_2 ) joints = active_compo.joints joint_input = joints.createInput(geo_1, geo_2) joint_input.setAsRigidJointMotion() joint = joints.add(joint_input) return active_compo except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def subtract_body(self, target_name, tool_name): try: target_body = self.find_body_by_name( target_name ) tool_body = self.find_body_by_name( tool_name ) compo = self.active_compo() feats = compo.features.combineFeatures collect = adsk.core.ObjectCollection.create() collect.add(tool_body) input = feats.createInput(target_body, collect) input.operation = \ adsk.fusion.FeatureOperations.CutFeatureOperation input.isKeepToolBodies = False # tool_body を削除 feats.add( input ) return target_body except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def body_bounding_box(self, name ): try: body = self.find_body_by_name( name ) b_box = body.boundingBox ret_data = {"min":[round( b_box.minPoint.x, 5), round( b_box.minPoint.y, 5 ), round( b_box.minPoint.z, 5 )], "max":[round( b_box.maxPoint.x, 5 ), round( b_box.maxPoint.y, 5 ), round( b_box.maxPoint.z, 5 ) ] } return ret_data except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def mirror_occur(self, axis=[0,1,0] ): """ activeなoccurrenceをミラー """ try: occur = self.active_occur() compo = occur.component bbox = occur.boundingBox center = adsk.core.Point3D.create( (bbox.minPoint.x + bbox.maxPoint.x) / 2, (bbox.minPoint.y + bbox.maxPoint.y) / 2, (bbox.minPoint.z + bbox.maxPoint.z) / 2) base_plane = None if axis == [1, 0, 0]: base_plane = compo.xYConstructionPlane elif axis == [0, 1, 0]: base_plane = compo.xZConstructionPlane elif axis == [0, 0, 1]: base_plane = compo.yZConstructionPlane planes = compo.constructionPlanes axis_vec = adsk.core.Vector3D.create(*axis) axis_vec.normalize() origin_point = base_plane.geometry.origin offset_vector = adsk.core.Vector3D.create(center.x - origin_point.x, center.y - origin_point.y, center.z - origin_point.z) offset = offset_vector.dotProduct(axis_vec) offset_val = adsk.core.ValueInput.createByReal(offset) offset_input = planes.createInput() offset_input.setByOffset(base_plane, offset_val) plane = planes.add( offset_input ) org_bodies = adsk.core.ObjectCollection.create() org_body_names = set() for body in compo.bRepBodies: org_body_names.add( body.name ) org_bodies.add( body ) # ミラー処理 mirror_feats = compo.features.mirrorFeatures mirror_input = mirror_feats.createInput(org_bodies, plane) mirror_feat = mirror_feats.add(mirror_input) # 元のbodyを削除します。尚、reversed()がないと元のbodyが削除されません for body in reversed(compo.bRepBodies): if not body.name in org_body_names: copied_body = self.copy_body( body.name ) body.deleteMe() plane.deleteMe() return occur except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def mirror_body(self, name, axis=[0,1,0] ): try: body = self.find_body_by_name(name) compo = self.active_compo() min_pt = body.boundingBox.minPoint max_pt = body.boundingBox.maxPoint center_x = (min_pt.x + max_pt.x) / 2 center_y = (min_pt.y + max_pt.y) / 2 center_z = (min_pt.z + max_pt.z) / 2 center_point = adsk.core.Point3D.create(center_x, center_y, center_z) origin_plane = None if axis == [1, 0, 0]: origin_plane = compo.xYConstructionPlane elif axis == [0, 1, 0]: origin_plane = compo.xZConstructionPlane elif axis == [0, 0, 1]: origin_plane = compo.yZConstructionPlane vector = adsk.core.Vector3D.create(*axis) vector.normalize() origin_point = origin_plane.geometry.origin origin_to_center = adsk.core.Vector3D.create( center_x - origin_point.x, center_y - origin_point.y, center_z - origin_point.z ) offset = origin_to_center.dotProduct(vector) # オフセット平面を作成 planes = compo.constructionPlanes offset_input = planes.createInput() offset_value = adsk.core.ValueInput.createByReal(offset) offset_input.setByOffset(origin_plane, offset_value) plane = planes.add(offset_input) # Mirror 処理 collect = adsk.core.ObjectCollection.create() collect.add(body) mirror_input = compo.features.mirrorFeatures.createInput(collect, plane) mirror_feat = compo.features.mirrorFeatures.add(mirror_input) #time.sleep(1) # mirror_feats.add()直後のname変更が反映しない為 tmp_mirrored = mirror_feat.bodies.item(0) tmp_mirrored.name = str( time.time() ) # 元のbodyを削除する為、copy mirrored = self.copy_body(tmp_mirrored.name) tmp_mirrored.deleteMe() plane.deleteMe() body.deleteMe() return mirrored except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def is_intersect_body(self, name_1, name_2): try: body_1 = self.find_body_by_name(name_1) body_2 = self.find_body_by_name(name_2) compo = self.active_compo() input = compo.features.combineFeatures.createInput(body_1, body_2) input.operation = adsk.fusion.FeatureOperations.IntersectFeatureOperation input.isKeepToolBodies = True input.isNewComponent = False result = compo.features.combineFeatures.add( input ) intersected_body = None if result.bodies.count > 0: result.deleteMe() return True return False except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def is_intersect_compo(self, name_1, name_2): try: parent_compo = self.active_compo() compos_1 = self.find_compo_by_compo_name( name_1 ) compos_2 = self.find_compo_by_compo_name( name_2 ) bodies_1 = compos_1[0].bRepBodies bodies_2 = compos_2[0].bRepBodies feats = parent_compo.features.combineFeatures for body_1 in bodies_1: for body_2 in bodies_2: input = feats.createInput(body_1, body_2) input.operation = \ adsk.fusion.FeatureOperations.IntersectFeatureOperation input.isKeepToolBodies = True input.isNewComponent = False result = feats.add( input ) if result.bodies.count > 0: # 干渉あり result.deleteMe() return True return False except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def move_body(self,name, dist): try: body = self.find_body_by_name(name) compo = self.active_compo() vector = adsk.core.Vector3D.create( *dist ) transform = adsk.core.Matrix3D.create() transform.translation = vector moveFeats = compo.features.moveFeatures collection = adsk.core.ObjectCollection.create() collection.add(body) moveInput = moveFeats.createInput(collection, transform) moveFeats.add(moveInput) return body except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def rotate_body(self, name, degree=[0,0,0], centor=[0,0,0]): try: transform = adsk.core.Matrix3D.create() center_3d = adsk.core.Point3D.create( *centor ) if degree[0] != 0: rot_x = adsk.core.Matrix3D.create() rot_x.setToRotation( math.radians(degree[0] ), adsk.core.Vector3D.create(1, 0, 0), center_3d ) transform.transformBy( rot_x ) if degree[1] != 0: rot_y = adsk.core.Matrix3D.create() rot_y.setToRotation( math.radians(degree[1] ), adsk.core.Vector3D.create(0, 1, 0), center_3d ) transform.transformBy(rot_y) if degree[2] != 0: rot_z = adsk.core.Matrix3D.create() rot_z.setToRotation( math.radians(degree[2]), adsk.core.Vector3D.create(0, 0, 1), center_3d ) transform.transformBy( rot_z ) compo = self.active_compo() body = self.find_body_by_name(name) collection = adsk.core.ObjectCollection.create() collection.add(body) move_feats = compo.features.moveFeatures move_input = move_feats.createInput( collection, transform ) move_feats.add( move_input ) return body except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def del_body(self, name ): try: body = self.find_body_by_name( name ) body.deleteMe() return body except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def copy_body(self, body_name ): try: body = self.find_body_by_name(body_name) compo = self.active_compo() copied = compo.features.copyPasteBodies.add( body ) # 以下はcopy直後にnameを変更しても反映されない為 time.sleep(1) last_body = compo.bRepBodies.item(compo.bRepBodies.count - 1) last_body.name = str( time.time() ) time.sleep(1) return last_body except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def get_project(self, pj_name=conf["default_pj"]): for tmp_pj in fusion_app.data.dataProjects: if tmp_pj.name == pj_name: return tmp_pj def import_doc(self, file_path): target_pj = self.get_project( conf["default_pj"] ) for tmp_pj in fusion_app.data.dataProjects: if tmp_pj.name == pj_name: target_pj = tmp_pj break if target_pj == None: logger.warning( "not found project %s" % (pj_name) ) return target_file = self.get_file_by_path(file_path,target_pj.rootFolder) if target_file == None: logger.warning( "not found file %s" % ("/".join(file_path)) ) return to_doc = fusion_app.activeDocument des: adsk.fusion.Design = \ to_doc.products.itemByProductType('DesignProductType') to_compo = des.activeComponent new_occ = to_compo.occurrences.addByInsert( target_file, adsk.core.Matrix3D.create(), False) new_compo = new_occ.component new_compo.name = str( time.time() ) return new_compo def get_folder_by_path(self, path, target_folder=None): if target_folder == None: target_pj = self.get_project( conf["default_pj"] ) if len(path) ==0: return target_pj.rootFolder target_folder = target_pj.rootFolder folder_name = path.pop(0) for folder in target_folder.dataFolders: if folder.name == folder_name: if len(path) > 0: return self.get_file_by_path(file_path, folder) return folder def get_file_by_path(self, file_path, target_folder): if len(file_path) > 1: folder_name = file_path.pop(0) for folder in target_folder.dataFolders: if folder.name == folder_name: return self.get_file_by_path(file_path, folder) logger.warning( "fail find() %s" % (" > ".join(file_path)) ) return None if len(file_path) == 1: file_name = file_path.pop(0) files = target_folder.dataFiles for i in range(0, files.count): if str(files.item(i).name) == file_name: return files.item(i) logger.warning( "fail find() %s" % (" > ".join(file_path)) ) return None def find_occur_by_name(self, name, base_compo=None): if base_compo==None: design = adsk.fusion.Design.cast(fusion_app.activeProduct) base_compo = design.rootComponent occurs = base_compo.occurrences for i in range( occurs.count ): occur = occurs.item(i) if occur.component.name == target_name: return occur for i in range( occurs.count ): occur = occurs.item(i) found_occur = self.find_occur_by_name(name, occur.component) if found_occur != None: return found_occur def get_occurrence_by_compo_path(self, compo_path, base_compo=None): target_name = compo_path.pop(0) if base_compo==None: design = adsk.fusion.Design.cast(fusion_app.activeProduct) base_compo = design.rootComponent occs = base_compo.occurrences for i in range(occs.count): occ = occs.item(i) comp = occ.component if comp.name != target_name: continue if len(compo_path) > 0 : return self.get_compo_by_path(compo_path, comp) return occ def find_compo_by_compo_name(self, target_name ): design = adsk.fusion.Design.cast(fusion_app.activeProduct) ret_datas = [] for comp in design.allComponents: if comp.name == target_name: ret_datas.append(comp) return ret_datas def get_compo_by_path(self, compo_path, target_compo=None): target_name = compo_path.pop(0) if target_compo==None: design = adsk.fusion.Design.cast(fusion_app.activeProduct) target_compo = design.rootComponent occs = target_compo.occurrences for i in range(occs.count): occ = occs.item(i) comp = occ.component if comp.name != target_name: continue if len(compo_path) > 0 : return self.get_compo_by_path(compo_path, comp) return comp def undo(self): try: ui = fusion_app.userInterface ui.commandDefinitions.itemById('UndoCommand').execute() return True except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) return False def screenshot(self,angle:str="isometric"): try: app = adsk.core.Application.get() ui = app.userInterface viewport = app.activeViewport camera = viewport.camera camera.isFitView = True camera.target = adsk.core.Point3D.create(0, 0, 0) org_style = fusion_app.activeViewport.visualStyle fusion_app.activeViewport.visualStyle = 5 # wire frame # isometric ( default ) camera.eye = adsk.core.Point3D.create(100, -100, 100) camera.upVector = adsk.core.Vector3D.create(0, 0, 1) if angle=="top": camera.eye = adsk.core.Point3D.create(0, 0, 100) camera.upVector = adsk.core.Vector3D.create(0, 1, 0) elif angle=="bottom": camera.eye = adsk.core.Point3D.create(0, 0, -100) camera.upVector = adsk.core.Vector3D.create(0, 1, 0) elif angle=="back": camera.eye = adsk.core.Point3D.create(0, 100, 0) camera.upVector = adsk.core.Vector3D.create(0, 0, 1) elif angle=="right": camera.eye = adsk.core.Point3D.create(100, 0, 0) camera.upVector = adsk.core.Vector3D.create(0, 0, 1) elif angle=="left": camera.eye = adsk.core.Point3D.create(-100, 0, 0) camera.upVector = adsk.core.Vector3D.create(0, 0, 1) elif angle=="isometric_reverse": camera.eye = adsk.core.Point3D.create(100, 100, -100) camera.upVector = adsk.core.Vector3D.create(0, 0, 1) viewport.camera = camera file_name = str( time.time() ) +".png" img_path = app_img_dir[0] + "/"+ file_name img_url = app_img_dir[1] + "/"+ file_name viewport.saveAsImageFile(img_path,800,600) fusion_app.activeViewport.visualStyle = org_style return img_url except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) class FastApiRouter(): def __init__(self): rtr = fastapi.APIRouter() for name, member in \ inspect.getmembers(self,predicate=inspect.ismethod): if name[0] == "_": # 「_」で始まるmethodは対象外 continue rtr.add_api_route("/"+name, member, methods=["POST"]) #rtr.add_api_route("/hello", self.hello, methods=["POST"]) self.router = rtr async def hello(self): try: return {"result":"ok" ,"message":"Hello FUSON 360 PYTHON API !"} except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) async def copy_body(self, req:fastapi.Request): try: method_name = inspect.currentframe().f_code.co_name return await self._handle_req( req, method_name, {"name":"name"} ) except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) async def add_sketch(self, req: fastapi.Request): try: method_name = inspect.currentframe().f_code.co_name return await self._handle_req( req, method_name, {"name":"name"} ) except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) async def add_const_point(self, req: fastapi.Request): try: method_name = inspect.currentframe().f_code.co_name return await self._handle_req( req, method_name, {"name":"name"} ) except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) async def extrude(self, req: fastapi.Request): try: method_name = inspect.currentframe().f_code.co_name return await self._handle_req( req, method_name, {"name":"name"} ) except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) async def add_rect_pipe(self, req: fastapi.Request): try: method_name = inspect.currentframe().f_code.co_name return await self._handle_req( req, method_name, {"name":"name"} ) except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) async def add_pipe(self, req: fastapi.Request): try: method_name = inspect.currentframe().f_code.co_name return await self._handle_req( req, method_name, {"name":"name"} ) except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) async def rotate_compo(self, req:fastapi.Request): try: method_name = inspect.currentframe().f_code.co_name return await self._handle_req( req, method_name, {"name":"name"} ) except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) async def set_joint(self, req:fastapi.Request): try: method_name = inspect.currentframe().f_code.co_name return await self._handle_req( req, method_name, {"name":"name"} ) except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) async def mirror_occur(self, req:fastapi.Request): try: method_name = inspect.currentframe().f_code.co_name req_body = await req.json() return JobQueue().do_job(method_name, req_body) #return await self._handle_req( req, method_name, {"name":"name"} ) except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) async def mirror_body(self, req:fastapi.Request): try: method_name = inspect.currentframe().f_code.co_name return await self._handle_req( req, method_name, {"name":"name"} ) except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) async def move_body(self, req:fastapi.Request): try: method_name = inspect.currentframe().f_code.co_name return await self._handle_req( req, method_name, {"name":"name"} ) except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) async def rotate_body(self, req:fastapi.Request): try: method_name = inspect.currentframe().f_code.co_name return await self._handle_req( req, method_name, {"name":"name"} ) except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) async def del_body(self, req:fastapi.Request): try: method_name = inspect.currentframe().f_code.co_name return await self._handle_req( req, method_name, {"name":"name"} ) except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) async def subtract_body(self, req:fastapi.Request): try: method_name = inspect.currentframe().f_code.co_name return await self._handle_req( req, method_name, {"name":"name"} ) except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) async def _handle_req(self, req, method_name, ret_attrs): try: req_body = await req.json() api_method = getattr(MyApi(), method_name) tmp_ret = api_method(**req_body) ret_datas = {"result": "ok" } for attr_key, attr_disp in ret_attrs.items(): ret_datas[attr_disp] = getattr(tmp_ret, attr_key) return ret_datas except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) return {"result": "ng", "message": str(e)} async def add_cylinder(self, req: fastapi.Request): try: method_name = inspect.currentframe().f_code.co_name return await self._handle_add_body_req(req, method_name) except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) async def add_box(self, req: fastapi.Request): try: method_name = inspect.currentframe().f_code.co_name return await self._handle_add_body_req(req, method_name) except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) async def _handle_add_body_req(self, req, method_name): try: req_body = await req.json() api_method = getattr(MyApi(), method_name) body, profile = api_method(**req_body) return {"result": "ok", "body_name" : body.name, "profile_token": profile.entityToken } except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) return {"result": "ng", "message": str(e)} async def screenshot(self,req:fastapi.Request): try: req_body = await req.json() img_url = MyApi().screenshot(**req_body) return {"result":"ok","url":img_url } except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) return {"result": "ng", "message": str(e)} async def body_bounding_box(self,req:fastapi.Request): try: req_body = await req.json() ret_data = MyApi().body_bounding_box(**req_body) return {"result":"ok", **ret_data} except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) return {"result": "ng", "message": str(e)} async def add_rect(self, req: fastapi.Request): try: method_name = inspect.currentframe().f_code.co_name return await self._handle_profile_req(req, method_name) except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) async def add_circle(self, req: fastapi.Request): try: method_name = inspect.currentframe().f_code.co_name return await self._handle_profile_req(req, method_name) except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) async def add_hollow_rect(self, req: fastapi.Request): try: method_name = inspect.currentframe().f_code.co_name return await self._handle_profile_req(req, method_name) except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) async def add_hollow_circle(self, req: fastapi.Request): try: method_name = inspect.currentframe().f_code.co_name return await self._handle_profile_req(req, method_name) except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) async def add_polyline(self, req:fastapi.Request): try: method_name = inspect.currentframe().f_code.co_name return await self._handle_profile_req(req, method_name) except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) async def add_dist_dim(self, req: fastapi.Request): try: method_name = inspect.currentframe().f_code.co_name return await self._handle_profile_req(req, method_name) except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) async def _handle_profile_req(self,req:fastapi.Request,method:str): try: req_body = await req.json() api_method = getattr(MyApi(), method) profile = api_method(**req_body) return {"result":"ok", "token":profile.entityToken } except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) return {"result": "ng", "message": str(e)} async def active_occur(self): """ root componentが activeの場合、NGを返す """ try: occur = MyApi().active_occur() return {"result":"ok", "name":occur.component.name } except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) return {"result":"ng","message":str(e)} async def add_occur(self): """ activeな occurrence/component以下に追加し、active化します """ try: method_name = inspect.currentframe().f_code.co_name return JobQueue().do_job(method_name, {} ) except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) async def import_doc(self, req:fastapi.Request): try: method_name = inspect.currentframe().f_code.co_name req_body = await req.json() return JobQueue().do_job(method_name, req_body) except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) async def save_doc_as(self, req:fastapi.Request): try: method_name = inspect.currentframe().f_code.co_name req_body = await req.json() return JobQueue().do_job(method_name, req_body) except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) async def save_doc(self, req:fastapi.Request): try: method_name = inspect.currentframe().f_code.co_name req_body = await req.json() return JobQueue().do_job(method_name, req_body) except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) async def undo(self): try: result = MyApi().undo() if result == True: return {"result":"ok"} except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) return {"result":"ng"} class MyEventHandler(adsk.core.CustomEventHandler): """" 共通event handlerのbase class """ def __init__(self, event_id): super().__init__() self.event_id = event_id def notify(self, args): try: ui = fusion_app.userInterface # SelectCommand とはuserからのコマンド受付待ち状態らしく # 何らかの処理が実行中の場合、SelectCommand に戻す if ui.activeCommand != 'SelectCommand': ui.commandDefinitions.itemById('SelectCommand').execute() if self.event_id != 'job_queue': logger.error("unknown event_id %s" %(self.event_id) ) return my_api = MyApi() job_q = JobQueue() job_info = json.loads( args.additionalInfo ) if job_info["name"] == "open_doc": job_q.set_job_doing( job_info["id"] ) result = my_api.open_doc(job_info["input"]["pj"], job_info["input"]["path"] ) if result == None: return job_q.set_job_done(job_info["id"], {"result":"ng"}) return job_q.set_job_done(job_info["id"], {"result":"ok"}) if job_info["name"] == "add_occur": job_q.set_job_doing( job_info["id"] ) new_occur = my_api.add_occur() if new_occur == None: return job_q.set_job_done(job_info["id"], {"result":"ng"}) # occurrence名の最後に version?が付加される為 occur_name = re.sub('\:\d+$', '', new_occur.name) return job_q.set_job_done(job_info["id"], {"result":"ok","name":occur_name}) if job_info["name"] == "mirror_occur": job_q.set_job_doing( job_info["id"] ) new_occur = my_api.mirror_occur(**job_info["input"] ) if new_occur == None: return job_q.set_job_done(job_info["id"], {"result":"ng"}) # occurrence名の最後に version?が付加される為 occur_name = re.sub('\:\d+$', '', new_occur.name) return job_q.set_job_done(job_info["id"], {"result":"ok","name":occur_name}) if job_info["name"] == "import_doc": job_q.set_job_doing( job_info["id"] ) new_compo = my_api.import_doc(job_info["input"]["path"] ) if new_compo == None: return job_q.set_job_done(job_info["id"], {"result":"ng"}) return job_q.set_job_done(job_info["id"], {"result":"ok","name":new_compo.name}) if job_info["name"] == "save_doc_as": job_q.set_job_doing( job_info["id"] ) doc = my_api.save_doc_as( **job_info["input"] ) if doc == None: return job_q.set_job_done(job_info["id"], {"result":"ng"}) return job_q.set_job_done(job_info["id"], {"result":"ok","name":doc.name}) if job_info["name"] == "save_doc": job_q.set_job_doing( job_info["id"] ) doc = my_api.save_doc( **job_info["input"] ) if doc == None: return job_q.set_job_done(job_info["id"], {"result":"ng"}) return job_q.set_job_done(job_info["id"], {"result":"ok","name":doc.name}) except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) adsk.autoTerminate(False) class MyThread(threading.Thread): def __init__(self, event): threading.Thread.__init__(self) self.stopped = event def run(self): while not self.stopped.wait(1): # 停止flagがない場合、?秒毎に実行 try: new_job = JobQueue().get_new_job() if new_job == None: continue input_json = json.dumps({"id" :new_job["id"], "name" :new_job["name"], "input":new_job["input"] }) fusion_app.fireCustomEvent("job_queue",input_json ) except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) class JobQueue(): def __init__(self): self.db_con = self.connect_db() def connect_db(self): try: # 「isolation_level=None」 means AUTOCOMMIT. db_con = sqlite3.connect(conf["sqlite"], isolation_level=None) db_con.row_factory = sqlite3.Row return db_con except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def do_job(self,method_name, req_body): try: # job queue へ登録 job_id = self.set_new_job(method_name,req_body) # job queue経由で結果確認 for i in range(0,20): time.sleep(1) job_info = self.get_job_info(job_id) if job_info!=None and job_info["status"]=="done": return json.loads(job_info["output"]) except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) return {"result":"ng"} def init_db(self): try: db_cur = self.db_con.cursor() drop_sql = "DROP TABLE IF EXISTS job_queue" creae_sql = """\ CREATE TABLE job_queue ( id TEXT PRIMARY KEY, name TEXT, input TEXT, output TEXT, status TEXT, update_time TEXT ) """ db_cur.execute( drop_sql ) db_cur.execute( creae_sql ) return True except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def del_old_jobs(self): try: db_cur = self.db_con.cursor() sql = """\ DELETE FROM job_queue WHERE status='done' AND update_time < ? """ update_time = datetime.now() - timedelta(hours=1) vals = ( update_time.strftime('%Y/%m/%d %H:%M') ) db_cur.execute( sql, vals ) return True except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def set_new_job(self,name,args): try: sql = """\ INSERT INTO job_queue (id, name, input, update_time) VALUES (?,?,?,?) """ vals = (str( time.time() ), name, json.dumps(args), datetime.now().strftime('%Y/%m/%d %H:%M') ) db_cur = self.db_con.cursor() db_cur.execute( sql, vals ) return vals[0] # ID except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def get_new_job(self): try: sql = """ SELECT * FROM job_queue WHERE status IS NULL ORDER BY update_time LIMIT 1 """ db_cur = self.db_con.cursor() db_cur.execute( sql ) ret_data = db_cur.fetchone() if ret_data == None: return None ret_data = dict(ret_data) ret_data["input"] = json.loads(ret_data["input"]) return ret_data except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def get_job_info(self,id): try: sql = """ SELECT * FROM job_queue WHERE id=? """ vals = ( id,) db_cur = self.db_con.cursor() db_cur.execute( sql, vals ) ret_data = db_cur.fetchone() if ret_data == None: return None ret_data = dict(ret_data) ret_data["input"] = json.loads(ret_data["input"]) return ret_data except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def set_job_doing(self,id): try: sql = """ UPDATE job_queue SET status=?, update_time=? WHERE id=? """ vals = ("doing", datetime.now().strftime('%Y/%m/%d %H:%M'), id ) db_cur = self.db_con.cursor() db_cur.execute( sql, vals ) return True except Exception as e: logger.error("%s %s" %(e,traceback.format_exc())) def set_job_done(self,id,output): try: sql = """ UPDATE job_queue SET status=?, output=?, update_time=? WHERE id=? """ vals = ("done", json.dumps(output), datetime.now().strftime('%Y/%m/%d %H:%M'), id ) db_cur = self.db_con.cursor() db_cur.execute( sql, vals ) return True except Exception as e: logger.error("%s %s" %(e,traceback.format_exc()))