以下の通りで、ポイントは
- urllib.request による HTTP GET
- 「/usr/sbin/sendmail -t」によるmail送信
- subprocess による標準入力渡し
- python3 の標準moduleで実現OK
かと思います。
#!/usr/bin/python3 # -*- coding: utf-8 -*- import json import os import ssl ssl._create_default_https_context = ssl._create_unverified_context import sys import time, datetime import urllib.request import subprocess from subprocess import PIPE ret_urls = [ "https://ないしょ1.jp", "https://ないしょ2.jp" ] http_timeout = 10 #sec retry_sleep = 10 #sec retry_max = 5 #times chk_history_file \ = os.path.dirname(os.path.abspath(__file__)) + "/chk_alive_urls.history" mail_cond = { "cmd" :"/usr/sbin/sendmail -t", "from":"ないしょ@ないしょ.com", # 宛先複数は「,」で接続 "to" :"ないしょ@ないしょ.com", "subject": "!!!! server has DOWN URL !!!!" } def main(): # 過去のcheck結果をload chk_history = load_chk_history() # 実際にaccessすることで、check for req_url in ret_urls: if not req_url in chk_history: chk_history[req_url] = {'last_check': '', 'repeat_ok' : 0, 'repeat_ng' : 0 } i = 0 while i < retry_max: i += 1 time.sleep( retry_sleep ) datetime_str = \ datetime.datetime.fromtimestamp(time.time()).strftime( '%Y-%m-%d %H:%M' ) res = http_get(req_url) if res: break chk_history[req_url]['last_check'] = datetime_str if res: chk_history[req_url]['repeat_ok'] += 1 chk_history[req_url]['repeat_ng'] = 0 else: chk_history[req_url]['repeat_ok'] = 0 chk_history[req_url]['repeat_ng'] += 1 # check結果を保存 save_chk_history(chk_history) # NGなurlが1つでもあれば、通知メール通知 for req_url in chk_history: if chk_history[req_url]['repeat_ng']: return notify_chk_result(chk_history) def notify_chk_result(chk_history): msg_head_body = "" msg_head_body += "From: " +mail_cond["from"]+"\n" msg_head_body += "To: " +mail_cond["to"]+"\n" msg_head_body += "Subject: "+mail_cond["subject"]+"\n" msg_head_body += "\n" msg_head_body += "PLEASE CHECK BELOW REPEAT_NG !!\n" msg_head_body += json.dumps(chk_history, indent=2) proc = subprocess.run(mail_cond["cmd"], shell=True, input=msg_head_body.encode(), #★要byte列化 stdout=PIPE, stderr=PIPE) def save_chk_history(chk_history): with open(chk_history_file, mode='w') as file: json.dump(chk_history, file, indent=2) def load_chk_history(): if not os.path.exists(chk_history_file): return {} fh = open(chk_history_file,'r') fh_content = fh.read() if len(fh_content) == 0: return {} return json.loads(fh_content) def http_get(req_url): req = urllib.request.Request(req_url) try: res = urllib.request.urlopen(req, data=None, timeout=http_timeout) except urllib.error.HTTPError as err: print("WARN",err.code, req_url,file=sys.stderr) return None except urllib.error.URLError as err: print("ERROR",err.reason, req_url,file=sys.stderr) return None # print("DONE HTTP",res.getcode(), res.geturl() ) # print(res.read() ) # res.close() return res if __name__ == '__main__': main()