end0tknr's kipple - web写経開発

太宰府天満宮の狛犬って、妙にカワイイ

pythonによる工程計画の自動作成(AI?) - その2

pythonによる工程計画の自動作成 - end0tknr's kipple - web写経開発

上記 entry の続きです。

先程までは、プロジェクト自体?の休日を考慮し、 各タスクの着手日、完工日を算出していましたが、 更に、作業者のアサイン可否を考慮してみた。

平行する複数プロジェクトの工程計画など、 実務に使用するには、機能強化すべき点はありますが、今回はここまで。

$ python3 build_project.py
2022-12-26 00:00:00 2023-04-05 00:00:00
1       内装    None    2022-12-26      2023-02-02
1.1     内装    8       2022-12-26      2023-01-10
1.2     外構    0       2023-01-11      2023-01-11
1.3     電気    5       2023-01-11      2023-01-19
1.4     外構    1       2023-01-20      2023-01-20
1.5     外構    8       2023-01-23      2023-02-02
2       外構    None    2023-02-03      2023-03-03
2.1     内装    6       2023-02-03      2023-02-10
2.2     電気    3       2023-02-13      2023-02-15
2.3     電気    0       2023-02-13      2023-02-13
2.4     外構    5       2023-02-13      2023-02-17
2.5     外構    8       2023-02-17      2023-03-03
3       電気    None    2023-03-06      2023-04-05
3.1     内装    9       2023-03-06      2023-03-16
3.2     電気    9       2023-03-17      2023-03-30
3.3     内装    None    2023-03-20      2023-04-05
3.3.1   外構    6       2023-03-20      2023-03-28
3.3.2   外構    5       2023-03-29      2023-04-05
3.3.3   外構    3       2023-03-29      2023-03-31

外構 2023-01-30 ['1.5', '-', '-']
外構 2023-02-21 ['2.4', '-']
<略>
内装 2022-12-26 []
内装 2022-12-27 ['1.1', '-']
<略>
電気 2023-01-12 ['1.2', '-', '-']
電気 2023-03-29 ['3.3.3']
<略>

↓こう書くと、↑こう表示されます

build_project.py

# -*- coding: utf-8 -*-
import datetime
import os
import sys
sys.path.append( os.path.join(os.path.dirname(__file__), './lib') )
from project import Project
from team    import Team

def main():
    proj = Project()

    teams = {}
    man_days = proj.man_days_group_by_type()
    for type, man_days in man_days.items():
        proj.teams[type] = Team(type)
    
    start_date,goal_date = proj.calc_project_period("2022-12-25")
    print( start_date,goal_date )
    for task_id, task in proj.tasks.items():

        disp_str = "%s\t%s\t%s\t%s\t%s" % \
            (task.id,
             task.type,
             task.man_days,
             task.start_date.strftime('%Y-%m-%d'),
             task.goal_date.strftime('%Y-%m-%d'))
        print( disp_str )

    for type, team in proj.teams.items():
        for date, assign in team.assigns.items():
            print(type, date.strftime('%Y-%m-%d'), assign )

if __name__ == '__main__':
    main()

lib/project.py

# -*- coding: utf-8 -*-
import datetime
import dateutil.parser
import uuid
import sys
from bizcalendar import Calendar
from task        import Task

#┌1 ────────┐┌2 ───────┐┌3 ─────────┐
#│1.1┳1.2┳1.4━1.5┝┥2.1┳2.2┳2.5 ┳┝┥3.1┳3.2 ━━━━┳ │
#│   ┗1.3┛        ││   ┣2.3┛    ┃││   ┗3.3.1┳3.3.2┫ │
#│                  ││   ┗2.4━━━┛││          ┗3.3.3┛ │
#└─────────┘└────────┘└──────────┘
default_tasks = [
    # id     pre task      parent
    ["1",    {},           ""    ],
    ["1.1",  {},           "1"   ],
    ["1.2",  {"1.1"},      "1"   ],
    ["1.3",  {"1.1"},      "1"   ],
    ["1.4",  {"1.2","1.3"},"1"   ],
    ["1.5",  {"1.4"},      "1"   ],
    ["2",    {"1"},        ""    ],
    ["2.1",  {},           "2"   ],
    ["2.2",  {"2.1"},      "2"   ],
    ["2.3",  {"2.1"},      "2"   ],
    ["2.4",  {"2.1"},      "2"   ],
    ["2.5",  {"2.2","2.3"},"2"   ],
    ["3",    {"2"},        ""    ],
    ["3.1",  {},           "3"   ],
    ["3.2",  {"3.1"},      "3"   ],
    ["3.3",  {"3.1"},      "3"   ],
    ["3.3.1",{},           "3.3" ],
    ["3.3.2",{"3.3.1"},    "3.3" ],
    ["3.3.3",{"3.3.1"},    "3.3" ],
]
max_man_days = 100       # 無限loopを避ける為

class Project():
    def __init__(self):
        self.id           = uuid.uuid1()
        self.calendar     = Calendar()
        self.start_date   = None
        self.goal_date    = None
        self.tasks        = {}
        self.teams        = {}
        self.max_man_days = max_man_days

        for task_info in default_tasks:
            task = Task( task_info[0],task_info[1],task_info[2] )
            self.tasks[task.id] = task

        # 先行&後続、親&子 の関係を双方向list化
        for task_id, task in self.tasks.items():
            # 後続taskとして登録
            for pre_id in task.pre_ids:
                self.tasks[pre_id].next_ids.add( task_id )

            # root taskに親taskはありません
            if not task.parent_id:
                continue
            self.tasks[task.parent_id].child_ids.add( task_id )
            
        # 子taskがある場合、工数は子taskに依存
        for task_id, task in self.tasks.items():
            if len( task.child_ids ):
                task.man_days = None

    def calc_project_period(self,start_date_str):
        self.start_date = dateutil.parser.parse( start_date_str )
        root_task_ids = self.find_root_task_ids()
        start_date,goal_date = self.calc_tasks_period( root_task_ids )
        return start_date,goal_date

    def calc_tasks_period(self,org_task_ids):
        func_name = sys._getframe().f_code.co_name
        
        ret_start_date = None
        ret_goal_date  = None
        task_ids = self.find_first_task_ids(org_task_ids)
        
        while len(task_ids):
            next_ids = set()
            
            for task_id in task_ids:
                task = self.tasks[task_id]
                start_goal_date = self.calc_task_period(task)
                task.start_date = start_goal_date[0]
                task.goal_date  = start_goal_date[1]

                if not ret_start_date or task.start_date < ret_start_date:
                    ret_start_date = task.start_date
                if not ret_goal_date or ret_goal_date < task.goal_date:
                    ret_goal_date = task.goal_date
                    
                next_ids.update( task.next_ids )
            task_ids = self.select_next_task(next_ids)
        return ret_start_date, ret_goal_date

    def select_next_task(self,next_ids):
        task_ids = set()
        for next_id in next_ids:
            next_task = self.tasks[next_id]
            goal_dates = []
            for pre_id in next_task.pre_ids:
                pre_task = self.tasks[pre_id]
                
                if self.tasks[pre_id].goal_date:
                    goal_dates.append(pre_task.goal_date)

            if len(goal_dates) == len(next_task.pre_ids):
                task_ids.add(next_id)

            next_task.start_date = max(goal_dates) + datetime.timedelta(days=1)
        return task_ids
                
    def get_default_start_date(self, task):
        func_name = sys._getframe().f_code.co_name
        
        goal_dates = []
        # 先行taskがある場合
        for pre_id in task.pre_ids:
            pre_task = self.tasks[pre_id]
            
            if not pre_task.goal_date:
                return None
            goal_dates.append( pre_task.goal_date )
            
        if len( goal_dates ):
            start_date = max(goal_dates) + datetime.timedelta(days=1)
            return start_date

        # 親taskがある場合
        if task.parent_id:
            parent_task = self.tasks[task.parent_id]
            return self.get_default_start_date( parent_task )

        # root taskを参照
        return self.start_date
            
    def calc_task_period(self, task):
        func_name = sys._getframe().f_code.co_name
        
        default_start_date = self.get_default_start_date( task )

        # 子taskがある場合
        if len(task.child_ids):
            return self.calc_tasks_period(task.child_ids)
        
        start_date = self.calc_task_start_date(task, default_start_date)
        if not start_date:
            return [None,None]

        team = self.teams[ task.type ]

        assign_dates = team.assign_dates(self, task, start_date)
        return assign_dates
        # goal_date = self.calc_task_goal_date(task, start_date)
        # return [start_date,goal_date]
        
    def is_biz_day(self, date):
        return self.calendar.is_biz_day( date )
    
    def calc_task_start_date(self, task, start_date):
        ret_date = start_date
        while (ret_date - start_date).days < self.max_man_days:
            if self.is_biz_day( ret_date ):
                return ret_date
            
            ret_date = ret_date + datetime.timedelta(days=1)
        return None

    # def calc_task_goal_date(self, task, start_date):
    #     business_days = 0
    #     ret_date = start_date
    #     while (ret_date - start_date).days < self.max_man_days:
    #         if self.is_biz_day( ret_date ):
    #             business_days += 1
    #         if task.man_days <= business_days:
    #             return ret_date
    #         ret_date = ret_date + datetime.timedelta(days=1)
    #     return None
    
    def find_first_task_ids(self, task_ids):
        ret_ids = set()
        for task_id in task_ids:
            
            task = self.tasks[task_id]
            can_add = True
            
            for pre_id in task.pre_ids:
                pre_task = self.tasks[pre_id]
                
                if pre_task.parent_id == task.parent_id:
                    can_add = False
                    break
            if can_add == True:
                ret_ids.add(task_id)
        return ret_ids
                
    def find_root_task_ids(self):
        root_task_ids = []
        for id, task in self.tasks.items():
            if not task.parent_id and len(task.pre_ids) == 0:
                root_task_ids.append( id )
        return root_task_ids

    def man_days_group_by_type(self):
        ret_datas = {}
        for i, task in self.tasks.items():
            if not task.type in ret_datas.keys():
                ret_datas[task.type] = 0

            if task.man_days:
                ret_datas[task.type] += task.man_days
        return ret_datas

lib/task.py

# -*- coding: utf-8 -*-
import random
import uuid

class Task():
    def __init__(self,id, pre_ids,parent_id):
        if id:
            self.id = id
        else:
            self.id = uuid.uuid1()
            
        self.pre_ids    = pre_ids    # 前工程
        self.next_ids   = set()      # 後工程
        self.parent_id  = parent_id  # 親工程(1コ)
        self.child_ids  = set()      # 子工程
        self.start_date = None
        self.goal_date  = None
        self.progress   = 0          # 進捗 %
        # 標準工数 人日
        self.man_days   = random.randint(0, 10)
        # 職種
        self.type       = ["電気","内装","外構"][random.randint(0, 2)]

lib/bizcalendar.py

# -*- coding: utf-8 -*-
import datetime
import jpholiday

class Calendar():
    def __init__(self):
        pass

    def is_biz_day(self,date):
        if date.weekday() >= 5 or jpholiday.is_holiday(date):
            return False
        return True

lib/team.py

# -*- coding: utf-8 -*-
import datetime
import random
import uuid
from bizcalendar import Calendar

workers_size = 3

# 職人を抱える工務店? と考えてください
class Team():
    
    def __init__(self, type):
        self.id       = uuid.uuid1()
        self.calendar = Calendar()
        self.type     = type # 電気, 内装, 外構
        self.assigns  = {}   # ココへ、日別x職人xtaskを登録

    def assign_dates(self, proj, task, start_date):
        assign_dates = {} # アサインできる日付群
        tmp_date = start_date

        while (tmp_date - start_date).days < proj.max_man_days:
            # プロジェクト自体の定休日
            if not proj.is_biz_day( tmp_date ):
                tmp_date = tmp_date + datetime.timedelta(days=1)
                continue

            woker_no = self.chk_assign_date(tmp_date)
            # アサインできない場合
            if woker_no == None:
                tmp_date = tmp_date + datetime.timedelta(days=1)
                continue
            assign_dates[ tmp_date ] = woker_no

            if len( assign_dates ) >= task.man_days:
                break
            
            tmp_date = tmp_date + datetime.timedelta(days=1)

        if len( assign_dates ) < task.man_days:
            return []

        self.commit_assign_dates(task, assign_dates )
        
        ret_start_date = min(assign_dates)
        ret_goal_date  = max(assign_dates)
        return [ret_start_date,ret_goal_date]

    def commit_assign_dates(self, task, assign_dates ):
        for date, woker_no in assign_dates.items():
            self.assigns[date][woker_no] = task.id
        
    def chk_assign_date(self, date):
        # 予定が未定の場合、予定を初期化
        if not date in self.assigns:
            self.assigns[date] = self.init_assigns( date )
            
        for i, assign in enumerate( self.assigns[date] ):
            if assign == "-":
                return i
        return None
    
    def init_assigns(self,date):
        if not self.is_biz_day(date):
            return [] # 定休日
        
        # "-"を空きのある作業者と考えます
        workers = []
        for i in range( random.randint(0,workers_size) ):
            workers.append("-")
        return workers
    
    def is_biz_day(self, date):
        return self.calendar.is_biz_day( date )