#!/usr/bin/env python
"""Poll web and mail hosts and report health changes to a Matrix room.

Usage: run with the polling interval in seconds as the first argument, and
with MATRIX_ACCESS_TOKEN and MATRIX_NOTIFICATION_ROOM set in the environment.
A report is sent only when the list of errors differs from the previous poll.
"""
# nonstdlib requirements: dnspython, cryptography, requests
import imaplib
import json
import os
import smtplib
import socket
import ssl
import subprocess
import sys
import time
from typing import Dict, List

import dns.resolver
import requests
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization

MATRIX_ACCESS_TOKEN = os.environ['MATRIX_ACCESS_TOKEN']
MATRIX_NOTIFICATION_ROOM = os.environ['MATRIX_NOTIFICATION_ROOM']
CHECK_INTERVAL = int(sys.argv[1])

MONITORED_URLS = [
]
MONITORED_MAIL = [
]

HTTPS_PORT = 443
SMTP_PORT = 25
SMTPS_PORT = 465

# Upper bound (seconds) for every network operation, so that one unresponsive
# host cannot stall the whole monitoring loop.
NETWORK_TIMEOUT = 30

# Replies that count as healthy. They are compared verbatim against what the
# server sends, so the spelling must stay exactly as is.
# NOTE(review): presumably these mirror the monitored server's exact NOOP
# replies -- confirm before "fixing" the typo.
SMTP_NOOP_OK = (250, b'2.0.0 I have sucessfully done nothing')
IMAP_NOOP_OK = ('OK', [b'NOOP completed'])


def check_tlsa(url: str, port: int, dercert: bytes) -> Dict[str, bytes]:
    """Compare a server certificate's public key with the published TLSA data.

    Args:
        url: Host name whose ``_<port>._tcp.<url>`` TLSA records are queried.
        port: TCP port used in the TLSA owner name.
        dercert: The server certificate in DER form.

    Returns:
        A dict with two entries (the key names are historical and slightly
        counter-intuitive):
        ``'cert'``: certificate association data taken from DNS. If any
        published record equals the computed hash, that record is returned;
        otherwise the first record is returned.
        ``'dns'``: SHA-256 digest of the certificate's DER-encoded
        SubjectPublicKeyInfo.

    Raises:
        Whatever certificate parsing or the DNS lookup raises; callers are
        expected to catch it.
    """
    cert = x509.load_der_x509_certificate(dercert)
    spki = cert.public_key().public_bytes(
        serialization.Encoding.DER,
        serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    digest = hashes.Hash(hashes.SHA256())
    digest.update(spki)
    keyhash = digest.finalize()

    records = dns.resolver.resolve(f'_{port}._tcp.{url}', 'TLSA')
    # Several TLSA records may be published at once (e.g. during a key
    # rollover), so a match on any of them is good enough.
    # NOTE(review): the hash above only matches records that use the SPKI
    # selector with SHA-256 matching -- confirm that is what is published.
    published = next(
        (record.cert for record in records if record.cert == keyhash),
        records[0].cert,
    )
    return {'cert': published, 'dns': keyhash}


# check that websites are alive
def web_check() -> List[str]:
    """Send a HEAD request to every monitored URL.

    Returns:
        One message per URL whose response is not ``ok`` or whose request
        raised; an empty list when everything answered fine.
    """
    errors = []
    for url in MONITORED_URLS:
        try:
            # requests never times out unless told to.
            res = requests.head(url, timeout=NETWORK_TIMEOUT)
            if not res.ok:
                errors.append(f'{url} HEAD returned {res.status_code}: {res.reason}')
        except Exception as e:
            errors.append(f'{url} check failed: {e}')
    return errors


def mail_check() -> List[str]:
    """Probe SMTPS, SMTP+STARTTLS, IMAPS and the SMTP TLSA record of each host.

    Every probe is independent: a failure in one is recorded and the
    remaining probes for that host still run.

    Returns:
        One message per failed probe; an empty list when all probes passed.
    """
    errors = []
    for url in MONITORED_MAIL:
        # check that SMTP(S) is alive
        try:
            with smtplib.SMTP_SSL(url, port=SMTPS_PORT, timeout=NETWORK_TIMEOUT) as smtp:
                res = smtp.noop()
                if res != SMTP_NOOP_OK:
                    errors.append(f'{url}:{SMTPS_PORT} check returned {res}')
        except Exception as e:
            errors.append(f'{url} SMTPS check failed: {e}')

        try:
            with smtplib.SMTP(url, port=SMTP_PORT, timeout=NETWORK_TIMEOUT) as smtp:
                smtp.starttls()
                res = smtp.noop()
                if res != SMTP_NOOP_OK:
                    errors.append(f'{url}:{SMTP_PORT} check returned {res}')
        except Exception as e:
            errors.append(f'{url}:{SMTP_PORT} SMTP check failed: {e}')

        # check that IMAP is alive
        # (no explicit timeout here: the connection is bounded by the
        # process-wide socket default that main() installs)
        try:
            with imaplib.IMAP4_SSL(url) as imap:
                res = imap.noop()
                if res != IMAP_NOOP_OK:
                    errors.append(f'{url} IMAP noop returned {res}')
        except Exception as e:
            errors.append(f'{url} IMAP check failed: {e}')

        # check that SMTP TLSA records are valid
        try:
            with smtplib.SMTP(url, port=SMTP_PORT, timeout=NETWORK_TIMEOUT) as smtp:
                smtp.starttls()
                # After STARTTLS the underlying socket is a TLS socket, so it
                # can hand out the peer certificate in DER form.
                dercert = smtp.sock.getpeercert(binary_form=True)
                tlsa_hash = check_tlsa(url, SMTP_PORT, dercert)
                if tlsa_hash['cert'] != tlsa_hash['dns']:
                    published = tlsa_hash['cert'].hex()
                    computed = tlsa_hash['dns'].hex()
                    errors.append(
                        f'{url}:{SMTP_PORT} TLSA record {published} != {computed}'
                    )
        except Exception as e:
            errors.append(f'{url}:{SMTP_PORT} TLSA check failed: {e}')
    return errors


def _notify_desktop(detail: object) -> None:
    """Show a desktop notification saying that the Matrix report failed."""
    message = f'Sending error report failed.\nPlease run {sys.argv[0]}\nError {detail}'
    try:
        subprocess.run(['notify-send', message])
    except OSError as e:
        # A missing notify-send binary must not take the monitor down.
        print(f'notify-send failed: {e}')


def report_results(errors: List[str]) -> None:
    """Print the current state and post it to the Matrix notification room.

    Args:
        errors: Messages collected by the checks; an empty list is reported
            as 'All systems nominal'.

    If the message cannot be delivered, the failure is printed and a desktop
    notification is attempted instead; nothing is raised.
    """
    message = str(errors) if errors else 'All systems nominal'
    print(message)

    txn_id_nonce = str(int(time.time()))
    url = f'https://conduit.koesters.xyz/_matrix/client/r0/rooms/{MATRIX_NOTIFICATION_ROOM}/send/m.room.message/{txn_id_nonce}'
    header_dict = {
        'Accept': 'application/json',
        'Authorization': f'Bearer {MATRIX_ACCESS_TOKEN}',
        'Content-Type': 'application/json',
    }
    # Serialise with json.dumps: error texts routinely contain quotes,
    # backslashes and newlines, which would break a hand-assembled body.
    body = json.dumps({'msgtype': 'm.text', 'body': message})

    try:
        res = requests.put(url, data=body, headers=header_dict, timeout=NETWORK_TIMEOUT)
        if res.status_code != 200:
            try:
                detail = res.json()
            except ValueError:
                # The error response was not JSON; show the raw text instead.
                detail = res.text
            print(detail)
            _notify_desktop(detail)
    except Exception as e:
        print(e)
        _notify_desktop(e)


def main() -> None:
    """Run all checks forever, reporting whenever the error list changes."""
    # Bounds sockets created without an explicit timeout (notably imaplib's).
    socket.setdefaulttimeout(NETWORK_TIMEOUT)

    prev_errors: List[str] = []
    print('Monitoring...')
    while True:
        errors = web_check() + mail_check()
        if errors != prev_errors:
            report_results(errors)
        prev_errors = errors
        sys.stdout.flush()
        time.sleep(CHECK_INTERVAL)


if __name__ == '__main__':
    main()