scripts/availability-monitor.py
2025-03-18 19:50:58 +01:00

137 lines
3.8 KiB
Python
Executable file

#!/usr/bin/env python3
# nonstdlib deps: apprise, cryptography, dnspython, requests
import imaplib
import os
import smtplib
import sys
import time
from datetime import UTC, datetime
from enum import Enum
import apprise
import dns.resolver
import requests
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
# Matrix account, password and room used for notifications, read from the
# environment; a missing variable raises KeyError at startup.
MATRIX_USER, MATRIX_PASSWORD, MATRIX_ROOM = (
    os.environ[name] for name in ('MATRIX_USER', 'MATRIX_PASSWORD', 'MATRIX_ROOM')
)

# Seconds to sleep between check rounds, taken from the first CLI argument.
CHECK_INTERVAL_SEC = int(sys.argv[1])

# URLs probed with an HTTP HEAD request by web_check().
MONITORED_URLS = [
    'https://git.matous.dev',
    'https://nextcloud.matous.dev',
    'https://mta-sts.matous.dev/.well-known/mta-sts.txt',
]

# Mail hosts probed by mail_check().
MONITORED_MAIL = ['mx1.matous.dev']

# TCP ports of the monitored services.
HTTPS_PORT, SMTP_PORT, SMTPS_PORT = 443, 25, 465
class SmtpStatus(int, Enum):
    """SMTP reply codes the monitor compares server responses against.

    The ``int`` mixin makes members compare equal to the plain integer reply
    codes returned by ``smtplib`` (a pure ``Enum`` member never equals an
    ``int``, so a comparison such as ``(250, msg) == (SmtpStatus.OK, msg)``
    would always be false).
    """

    OK = 250
def check_tlsa(url: str, port: int, cert: x509.Certificate) -> dict[str, bytes]:
    """Fetch the TLSA data for a service and hash the certificate's public key.

    The SHA-256 digest of the certificate's DER-encoded SubjectPublicKeyInfo
    is computed and the TLSA records at ``_{port}._tcp.{url}`` are resolved.
    If any published record carries that digest it is the one returned;
    otherwise the first record is returned so the caller can report the
    mismatch. Checking every record avoids false alarms when several TLSA
    records are published (e.g. current and next key during a rollover).

    Args:
        url: Host name the TLSA records are published for (no scheme/path).
        port: TCP port of the service, used to build the TLSA owner name.
        cert: Certificate presented by the service.

    Returns:
        A dict with two keys, kept as-is for existing callers:
        ``'local'`` holds the certificate association data taken from DNS,
        ``'dns'`` holds the locally computed SHA-256 public-key digest.
        The two values are equal when the certificate matches a record.

    Raises:
        Whatever ``dns.resolver.resolve`` raises when the lookup fails.
    """
    spki = cert.public_key().public_bytes(
        serialization.Encoding.DER,
        serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    digest = hashes.Hash(hashes.SHA256())
    digest.update(spki)
    keyhash = digest.finalize()

    records = dns.resolver.resolve(f'_{port}._tcp.{url}', 'TLSA')
    # Prefer a record that matches the live key; fall back to the first one
    # so a genuine mismatch is still reported with concrete record data.
    chosen = next((rec for rec in records if rec.cert == keyhash), records[0])
    return {'local': chosen.cert, 'dns': keyhash}
# check that websites are alive
def web_check() -> list[str]:
    """Send a HEAD request to every monitored URL and collect failures.

    Returns:
        One message per URL that either answered with a non-OK status or
        raised while being requested; an empty list when all URLs are fine.
    """
    failures: list[str] = []
    for target in MONITORED_URLS:
        try:
            response = requests.head(target, timeout=10)
            if response.ok:
                continue
            failures.append(
                f'{target} HEAD returned {response.status_code}: {response.reason}'
            )
        except Exception as exc:
            # Any failure (DNS, TLS, timeout, ...) is reported, not raised.
            failures.append(f'{target} check failed: {exc}')
    return failures
def mail_check() -> list[str]:
    """Probe every host in MONITORED_MAIL and return failure descriptions.

    For each host four independent checks are run; a failure in one does not
    prevent the others:

    1. SMTPS (implicit TLS on SMTPS_PORT) answers NOOP as expected.
    2. SMTP on SMTP_PORT accepts STARTTLS and answers NOOP as expected.
    3. IMAP over TLS answers NOOP as expected.
    4. The certificate presented after STARTTLS on SMTP_PORT is not expired
       and its public-key hash agrees with the published TLSA data.

    Returns:
        One message per failed check; an empty list when everything passed.
    """
    # Bound every connection so an unresponsive server cannot stall the
    # monitoring loop indefinitely (web_check uses the same 10 s budget).
    timeout = 10
    # smtplib reports the reply code as a plain int, so compare against the
    # enum's integer value rather than the enum member itself.
    expected_noop = (SmtpStatus.OK.value, b'2.0.0 I have successfully done nothing')

    errors = []
    for url in MONITORED_MAIL:
        # check that SMTP(S) is alive
        try:
            with smtplib.SMTP_SSL(url, port=SMTPS_PORT, timeout=timeout) as smtp:
                res = smtp.noop()
                if res != expected_noop:
                    errors.append(f'{url}:{SMTPS_PORT} check returned {res}')
        except Exception as e:
            errors.append(f'{url} SMTPS check failed: {e}')

        try:
            with smtplib.SMTP(url, port=SMTP_PORT, timeout=timeout) as smtp:
                smtp.starttls()
                res = smtp.noop()
                if res != expected_noop:
                    errors.append(f'{url}:{SMTP_PORT} check returned {res}')
        except Exception as e:
            errors.append(f'{url}:{SMTP_PORT} SMTP check failed: {e}')

        # check that IMAP is alive
        try:
            with imaplib.IMAP4_SSL(url, timeout=timeout) as imap:
                res = imap.noop()
                if res != ('OK', [b'NOOP completed']):
                    errors.append(f'{url} IMAP noop returned {res}')
        except Exception as e:
            errors.append(f'{url} IMAP check failed: {e}')

        # check that SMTP TLSA records are valid
        try:
            with smtplib.SMTP(url, port=SMTP_PORT, timeout=timeout) as smtp:
                smtp.starttls()
                # After STARTTLS the socket is TLS-wrapped and exposes the
                # peer certificate in DER form.
                dercert = smtp.sock.getpeercert(binary_form=True)
            cert = x509.load_der_x509_certificate(dercert)
            if cert.not_valid_after_utc < datetime.now(UTC):
                # Report the same timezone-aware timestamp that was compared.
                errors.append(f'{url} certificate expired {cert.not_valid_after_utc}')
            tlsa_hash = check_tlsa(url, SMTP_PORT, cert)
            if tlsa_hash['local'] != tlsa_hash['dns']:
                errors.append(
                    f'{url}:{SMTP_PORT} TLSA record '
                    f'{tlsa_hash["local"]!s} != {tlsa_hash["dns"]!s}'
                )
        except Exception as e:
            errors.append(f'{url}:{SMTP_PORT} TLSA check failed: {e}')
    return errors
def report_results(errors: list[str]) -> None:
    """Print the current status and send it as a notification via Apprise.

    Args:
        errors: Failure messages from the checks. When empty, the text
            'All systems nominal' is reported instead.

    The messages are joined with blank lines, printed, and sent to the
    configured Matrix room and to the 'glib://' Apprise target. Apprise log
    output captured while sending is printed afterwards.
    """
    body = '\n\n'.join(errors or ['All systems nominal'])
    print(body)

    notifier = apprise.Apprise()
    with apprise.LogCapture(level=apprise.logging.INFO) as captured:
        notifier.add(f'matrixs://{MATRIX_USER}@nitro.chat/{MATRIX_ROOM}?pass={MATRIX_PASSWORD}')
        notifier.add('glib://')
        notifier.notify(title='Server status', body=body)
        print(captured.getvalue())
# Failures seen in the previous round; None guarantees that the first round
# is always reported.
prev_errors = None
print('Monitoring...')
while True:
    # Run the web checks first, then the mail checks, as one ordered list.
    errors = web_check() + mail_check()
    # Notify only when the list of failures differs from the last round.
    if errors != prev_errors:
        report_results(errors)
        prev_errors = list(errors)
    errors = []
    sys.stdout.flush()
    time.sleep(CHECK_INTERVAL_SEC)