scripts/availability-monitor.py
Martin Matous 5a4419bb4e
add scripts
Signed-off-by: Martin Matous <m@matous.dev>
2022-03-28 00:46:25 +02:00

143 lines
3.9 KiB
Python
Executable file

#!/usr/bin/env python
# nonstdlib requirements: dnspython
from cryptography import x509
from cryptography.hazmat.primitives import serialization, hashes
from typing import Dict, List
import dns.resolver
import imaplib
import os
import requests
import smtplib
import ssl
import subprocess
import sys
import time
MATRIX_ACCESS_TOKEN = os.environ['MATRIX_ACCESS_TOKEN']
MATRIX_NOTIFICATION_ROOM = os.environ['MATRIX_NOTIFICATION_ROOM']
CHECK_INTERVAL = int(sys.argv[1])
MONITORED_URLS = [
]
MONITORED_MAIL = [
]
HTTPS_PORT = 443
SMTP_PORT = 25
SMTPS_PORT = 465
def check_tlsa(url: str, port: int, dercert: bytes) -> Dict[str, bytes]:
cert = x509.load_der_x509_certificate(dercert)
pubkey = cert.public_key()
pubkey = pubkey.public_bytes(
serialization.Encoding.DER,
serialization.PublicFormat.SubjectPublicKeyInfo
)
digest = hashes.Hash(hashes.SHA256())
digest.update(pubkey)
keyhash = digest.finalize()
port = str(port)
tlsa = dns.resolver.resolve(f'_{port}._tcp.{url}', 'TLSA')
return {'cert': tlsa[0].cert, 'dns': keyhash}
# check that websites are alive
def web_check() -> List[str]:
errors = []
for url in MONITORED_URLS:
try:
res = requests.head(url)
if not res.ok:
errors.append(f'{url} HEAD returned {res.status_code}: {res.reason}')
except Exception as e:
errors.append(f'{url} check failed: {e}')
return errors
def mail_check() -> List[str]:
errors = []
for url in MONITORED_MAIL:
# check that SMTP(S) is alive
try:
with smtplib.SMTP_SSL(url, port=SMTPS_PORT) as smtp:
res = smtp.noop()
if res != (250, b'2.0.0 I have sucessfully done nothing'):
errors.append(f'{url}:{SMTPS_PORT} check returned {res}')
except Exception as e:
errors.append(f'{url} SMTPS check failed: {e}')
try:
with smtplib.SMTP(url, port=SMTP_PORT) as smtp:
smtp.starttls()
res = smtp.noop()
if res != (250, b'2.0.0 I have sucessfully done nothing'):
errors.append(f'{url}:{SMTP_PORT} check returned {res}')
except Exception as e:
errors.append(f'{url}:{SMTP_PORT} SMTP check failed: {e}')
# check that IMAP is alive
try:
with imaplib.IMAP4_SSL(url) as imap:
res = imap.noop()
if res != ('OK', [b'NOOP completed']):
errors.append(f'{url} IMAP noop returned {res}')
except Exception as e:
errors.append(f'{url} IMAP check failed: {e}')
# check that SMTP TLSA records are valid
try:
with smtplib.SMTP(url, port=SMTP_PORT) as smtp:
smtp.starttls()
dercert = smtp.sock.getpeercert(binary_form=True)
tlsa_hash = check_tlsa(url, SMTP_PORT, dercert)
if tlsa_hash['cert'] != tlsa_hash['dns']:
errors.append(
f'{url}:{SMTP_PORT} TLSA record \
{str(tlsa_hash["cert"])} != {str(tlsa_hash["dns"])}'
)
except Exception as e:
errors.append(f'{url}:{SMTP_PORT} TLSA check failed: {e}')
return errors
def report_results(errors: List[str]) -> None:
if not errors:
errors = 'All systems nominal'
print(errors)
txn_id_nonce = str(int(time.time()))
url = f'https://conduit.koesters.xyz/_matrix/client/r0/rooms/{MATRIX_NOTIFICATION_ROOM}/send/m.room.message/{txn_id_nonce}'
header_dict = {
'Accept': 'application/json',
'Authorization' : f'Bearer {MATRIX_ACCESS_TOKEN}',
'Content-Type': 'application/json'
}
body = f'"msgtype":"m.text", "body":"{errors}"'
body = '{' + body + '}'
try:
res = requests.put(url, data=body, headers=header_dict)
if res.status_code != 200:
print(res.json())
subprocess.run(['notify-send', f'Sending error report failed.\nPlease run {sys.argv[0]}\nError {res.json()}'])
except Exception as e:
print(e)
subprocess.run(['notify-send', f'Sending error report failed.\nPlease run {sys.argv[0]}\nError {e}'])
errors = []
prev_errors = []
print('Monitoring...')
while True:
errors += web_check()
errors += mail_check()
if errors != prev_errors:
report_results(errors)
prev_errors = errors.copy()
errors = []
sys.stdout.flush()
time.sleep(CHECK_INTERVAL)