画面 | 補足 |
---|---|
|
|
|
毎日16時に実行する場合 |
|
Lambda関数を指定。 |
画面 | 補足 |
---|---|
|
|
|
|
|
以下アクション権限が付いている ポリシーをアタッチ ①logs:DescribeLogStreams ②logs:GetLogEvents |
コード | 補足 |
---|---|
import boto3 import json import datetime import traceback import urllib.request from urllib import parse def lambda_handler(event, context): try: error_flg=False info='' info_detail='■ログストリーム詳細\n' #①CloudWatch logs:DescribeLogStreams #・特定ロググループの2番目に最新のログストリームの情報を取得、辞書型変数d1に格納 #・そのログストリームのイベント年月日が当日のものでなければエラーとする。 d1=boto3.client("logs").describe_log_streams( logGroupName='/aws/lambda/Lambda1', orderBy='LastEventTime', descending=True ) event_timestamp=d1["logStreams"][1]["lastEventTimestamp"] event_time=datetime.datetime.fromtimestamp(event_timestamp/1000)+datetime.timedelta(hours=9) today_time=datetime.datetime.now()+datetime.timedelta(hours=9) if event_time.date()!=today_time.date(): error_flg=True info_detail+='・ログストリームの最終イベント時刻\n' info_detail+=' '+str(event_time.replace(microsecond=0))+'\n\n' #②CloudWatch logs:GetLogEvents #・d1のログイベント情報を取得、辞書型変数d2に格納 d2=boto3.client("logs").get_log_events( logGroupName='/aws/lambda/Lambda1', logStreamName=d1["logStreams"][1]["logStreamName"], startFromHead=True ) #③d2をチェック #・メッセージの中の特定のワードから始まるメッセージが何個あるかチェック、エラー判定 #・※ここでは『「START」1個「END」1個』を満たさない場合、エラーとする。 str_log_events='' i=0 start_qty=0 end_qty=0 for event in d2["events"]: i+=1 msg=event["message"][:40] if '\n' in msg: msg=msg[:msg.find('\n')] event_time=datetime.datetime.fromtimestamp(event["timestamp"]/1000)+datetime.timedelta(hours=9) if i<=100: str_log_events+=(' '+str(event_time.replace(microsecond=0))+','+msg+'\n') if event["message"][:5]=='START': start_qty+=1 elif event["message"][:3]=='END': end_qty+=1 if not (start_qty==1 and end_qty==1): error_flg=True info_detail+='・ログイベント全件のメッセージ集計\n' info_detail+=' 「START」から始まるメッセージ数:'+str(start_qty)+'\n' info_detail+=' 「END」から始まるメッセージ数:'+str(end_qty)+'\n\n' info_detail+='・ログイベントの日時とメッセージ(省略版)(最大100件)\n' info_detail+=str_log_events except: #①②③でエラーが生じた場合の処理 error_flg=True info+='!!!関数実行エラー(ログチェック時)!!!\n' print('■エラー内容') print(traceback.format_exc()) finally: #④通知 #・HTTP POSTを特定のURLに実施 #・ここではBacklogの課題コメント欄へ通知 info+='■監視ログチェック結果\n' if error_flg==False: info+=' 異常なし'+'\n\n' else: info+=' 異常あり'+'\n\n' info+=info_detail apikey='WtvTMoN2o146fJlfxfABGZYwk8zKbpF54zgL2vJHJ4Np5ggjpalXHRygBhjXJLEq' url="https://arm12345.backlog.com/api/v2/issues/TEST1-1/comments?apiKey={}".format(apikey) headers = {'content-type':'application/x-www-form-urlencoded'} try: data={'content':info} req=urllib.request.Request(url,headers=headers,data=parse.urlencode(data).encode(),method='POST') res=urllib.request.urlopen(req) res.close() except: data={'content':'!!!関数実行エラー(通知時)!!!'} req=urllib.request.Request(url,headers=headers,data=parse.urlencode(data).encode(),method='POST') res=urllib.request.urlopen(req) res.close() print('■エラー内容') print(traceback.format_exc()) return |
・エラー(異常あり)条件 以下のいずれかに該当する場合、 「異常あり」「エラー」等を通知する。 1.ストリームの特定イベントの個数が 正常ではない ※本来監視時にチェックすべき事 2.チェックのために取得したストリームの日時が その日の年月日ではない 3.その他(関数実行エラー) ※この場合のエラー詳細は、この関数自体の CloudWatchログに出力される ・1つ1つのイベントの詳細を通知するが、 最大100件とした。 Teamsの場合、 1チャットあたりの容量制限が25KB なので100件程度であれば大丈夫。 ・Teams通知の場合のリクエストパラメータ json形式なので、 dataに以下情報をbytes型にし指定する {"title":~,"text":~} |
実行結果 通知画面 | 補足 |
---|---|
|
・左図は、Backlogの場合 ※Teamsでも恐らく実現可能 ・左図は、異常なしの場合 ※異常ありの場合、 メンション付きで通知 |