#!/usr/bin/env python3
# nonstdlib deps: apprise, cryptography, dnspython, requests
"""Periodically check web and mail services and report status changes.

Usage: pass the check interval in seconds as the first command-line argument.
MATRIX_USER, MATRIX_PASSWORD and MATRIX_ROOM must be set in the environment.
A notification is sent only when the list of detected errors differs from the
previous cycle's list (including the very first cycle).
"""

import imaplib
import os
import smtplib
import sys
import time
from datetime import UTC, datetime
from enum import Enum

import apprise
import dns.resolver
import requests
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization

MATRIX_USER = os.environ['MATRIX_USER']
MATRIX_PASSWORD = os.environ['MATRIX_PASSWORD']
MATRIX_ROOM = os.environ['MATRIX_ROOM']
CHECK_INTERVAL_SEC = int(sys.argv[1])

MONITORED_URLS = [
    'https://git.matous.dev',
    'https://nextcloud.matous.dev',
    'https://mta-sts.matous.dev/.well-known/mta-sts.txt',
]
MONITORED_MAIL = ['mx1.matous.dev']

HTTPS_PORT = 443
SMTP_PORT = 25
SMTPS_PORT = 465

# Upper bound for every blocking network operation, so that one unresponsive
# server cannot stall the whole monitoring loop.
NETWORK_TIMEOUT_SEC = 10


class SmtpStatus(Enum):
    """SMTP reply codes this monitor compares against."""

    OK = 250


# Exact reply expected from an SMTP NOOP. smtplib returns the reply code as a
# plain int, and a plain Enum member never compares equal to its value, so the
# comparison has to be made against `.value`.
SMTP_NOOP_OK = (SmtpStatus.OK.value, b'2.0.0 I have successfully done nothing')


def check_tlsa(url: str, port: int, cert: x509.Certificate) -> dict[str, bytes]:
    """Fetch TLSA data for a service and hash the certificate's public key.

    Args:
        url: Host name of the service (used to build `_<port>._tcp.<url>`).
        port: TCP port of the service.
        cert: Certificate presented by the service.

    Returns:
        A dict with two entries:
        'local' - SHA-256 digest of the certificate's DER-encoded
                  SubjectPublicKeyInfo;
        'dns'   - certificate association data of the TLSA record equal to
                  that digest, or of the first returned record if none match.

    Raises:
        Whatever `dns.resolver.resolve` raises when the lookup fails.
    """
    pubkey_bytes = cert.public_key().public_bytes(
        serialization.Encoding.DER,
        serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    digest = hashes.Hash(hashes.SHA256())
    digest.update(pubkey_bytes)
    keyhash = digest.finalize()

    answers = dns.resolver.resolve(f'_{port}._tcp.{url}', 'TLSA')
    published = [rdata.cert for rdata in answers]
    # Several TLSA records may be published at once (e.g. during a key
    # rollover) and DNS does not guarantee their order, so any matching
    # record is accepted rather than only the first one.
    # NOTE(review): records are compared as SHA-256 SPKI digests regardless of
    # their selector / matching-type fields - confirm that is intended.
    matched = next((data for data in published if data == keyhash), published[0])
    return {'local': keyhash, 'dns': matched}


def web_check() -> list[str]:
    """Check that every monitored website answers a HEAD request.

    Returns:
        One message per URL that failed or returned a non-OK status.
    """
    errors = []
    for url in MONITORED_URLS:
        # Any failure (DNS, TLS, timeout, ...) is reported, never raised.
        try:
            res = requests.head(url, timeout=NETWORK_TIMEOUT_SEC)
            if not res.ok:
                errors.append(f'{url} HEAD returned {res.status_code}: {res.reason}')
        except Exception as e:
            errors.append(f'{url} check failed: {e}')
    return errors


def mail_check() -> list[str]:
    """Check SMTPS, SMTP+STARTTLS, IMAPS and the SMTP TLSA record of each host.

    Returns:
        One message per failed check; empty when everything passed.
    """
    errors = []
    for url in MONITORED_MAIL:
        # check that SMTP(S) is alive
        try:
            with smtplib.SMTP_SSL(url, port=SMTPS_PORT, timeout=NETWORK_TIMEOUT_SEC) as smtp:
                res = smtp.noop()
                if res != SMTP_NOOP_OK:
                    errors.append(f'{url}:{SMTPS_PORT} check returned {res}')
        except Exception as e:
            errors.append(f'{url} SMTPS check failed: {e}')

        try:
            with smtplib.SMTP(url, port=SMTP_PORT, timeout=NETWORK_TIMEOUT_SEC) as smtp:
                smtp.starttls()
                res = smtp.noop()
                if res != SMTP_NOOP_OK:
                    errors.append(f'{url}:{SMTP_PORT} check returned {res}')
        except Exception as e:
            errors.append(f'{url}:{SMTP_PORT} SMTP check failed: {e}')

        # check that IMAP is alive
        try:
            with imaplib.IMAP4_SSL(url, timeout=NETWORK_TIMEOUT_SEC) as imap:
                res = imap.noop()
                if res != ('OK', [b'NOOP completed']):
                    errors.append(f'{url} IMAP noop returned {res}')
        except Exception as e:
            errors.append(f'{url} IMAP check failed: {e}')

        # check that SMTP TLSA records are valid
        try:
            with smtplib.SMTP(url, port=SMTP_PORT, timeout=NETWORK_TIMEOUT_SEC) as smtp:
                smtp.starttls()
                dercert = smtp.sock.getpeercert(binary_form=True)
            cert = x509.load_der_x509_certificate(dercert)

            # Use the timezone-aware expiry both for the comparison and for
            # the message; the naive `not_valid_after` is deprecated.
            expiry = cert.not_valid_after_utc
            if expiry < datetime.now(UTC):
                errors.append(f'{url} certificate expired {expiry}')

            tlsa_hash = check_tlsa(url, SMTP_PORT, cert)
            if tlsa_hash['local'] != tlsa_hash['dns']:
                errors.append(
                    f'{url}:{SMTP_PORT} TLSA record {tlsa_hash["dns"]!s} != {tlsa_hash["local"]!s}'
                )
        except Exception as e:
            errors.append(f'{url}:{SMTP_PORT} TLSA check failed: {e}')

    return errors


def report_results(errors: list[str]) -> None:
    """Print the current status and send it through Apprise.

    Args:
        errors: Messages collected by the checks; an empty list is reported
            as 'All systems nominal'.
    """
    body = '\n\n'.join(errors) if errors else 'All systems nominal'
    print(body)

    apobj = apprise.Apprise()
    with apprise.LogCapture(level=apprise.logging.INFO) as output:
        apobj.add(f'matrixs://{MATRIX_USER}@nitro.chat/{MATRIX_ROOM}?pass={MATRIX_PASSWORD}')
        apobj.add('glib://')
        apobj.notify(title='Server status', body=body)
        # Echo what Apprise logged while sending, to make delivery problems
        # visible in the script's own output.
        print(output.getvalue())


def main() -> None:
    """Run all checks forever, reporting whenever the error list changes."""
    prev_errors: list[str] | None = None
    print('Monitoring...')
    while True:
        errors = web_check() + mail_check()
        # prev_errors starts as None, so the first cycle always reports.
        if errors != prev_errors:
            report_results(errors)
            prev_errors = errors
        sys.stdout.flush()
        time.sleep(CHECK_INTERVAL_SEC)


if __name__ == '__main__':
    main()