143 lines
3.9 KiB
Python
Executable file
143 lines
3.9 KiB
Python
Executable file
#!/usr/bin/env python
|
|
|
|
# nonstdlib requirements: dnspython
|
|
|
|
from cryptography import x509
|
|
from cryptography.hazmat.primitives import serialization, hashes
|
|
from typing import Dict, List
|
|
|
|
import dns.resolver
|
|
import imaplib
|
|
import os
|
|
import requests
|
|
import smtplib
|
|
import ssl
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
|
|
MATRIX_ACCESS_TOKEN = os.environ['MATRIX_ACCESS_TOKEN']
|
|
MATRIX_NOTIFICATION_ROOM = os.environ['MATRIX_NOTIFICATION_ROOM']
|
|
CHECK_INTERVAL = int(sys.argv[1])
|
|
|
|
MONITORED_URLS = [
|
|
|
|
]
|
|
|
|
MONITORED_MAIL = [
|
|
|
|
]
|
|
|
|
HTTPS_PORT = 443
|
|
SMTP_PORT = 25
|
|
SMTPS_PORT = 465
|
|
|
|
def check_tlsa(url: str, port: int, dercert: bytes) -> Dict[str, bytes]:
|
|
cert = x509.load_der_x509_certificate(dercert)
|
|
pubkey = cert.public_key()
|
|
pubkey = pubkey.public_bytes(
|
|
serialization.Encoding.DER,
|
|
serialization.PublicFormat.SubjectPublicKeyInfo
|
|
)
|
|
digest = hashes.Hash(hashes.SHA256())
|
|
digest.update(pubkey)
|
|
keyhash = digest.finalize()
|
|
|
|
port = str(port)
|
|
tlsa = dns.resolver.resolve(f'_{port}._tcp.{url}', 'TLSA')
|
|
|
|
return {'cert': tlsa[0].cert, 'dns': keyhash}
|
|
|
|
# check that websites are alive
|
|
def web_check() -> List[str]:
|
|
errors = []
|
|
for url in MONITORED_URLS:
|
|
try:
|
|
res = requests.head(url)
|
|
if not res.ok:
|
|
errors.append(f'{url} HEAD returned {res.status_code}: {res.reason}')
|
|
except Exception as e:
|
|
errors.append(f'{url} check failed: {e}')
|
|
return errors
|
|
|
|
def mail_check() -> List[str]:
|
|
errors = []
|
|
for url in MONITORED_MAIL:
|
|
# check that SMTP(S) is alive
|
|
try:
|
|
with smtplib.SMTP_SSL(url, port=SMTPS_PORT) as smtp:
|
|
res = smtp.noop()
|
|
if res != (250, b'2.0.0 I have sucessfully done nothing'):
|
|
errors.append(f'{url}:{SMTPS_PORT} check returned {res}')
|
|
except Exception as e:
|
|
errors.append(f'{url} SMTPS check failed: {e}')
|
|
try:
|
|
with smtplib.SMTP(url, port=SMTP_PORT) as smtp:
|
|
smtp.starttls()
|
|
res = smtp.noop()
|
|
if res != (250, b'2.0.0 I have sucessfully done nothing'):
|
|
errors.append(f'{url}:{SMTP_PORT} check returned {res}')
|
|
except Exception as e:
|
|
errors.append(f'{url}:{SMTP_PORT} SMTP check failed: {e}')
|
|
|
|
# check that IMAP is alive
|
|
try:
|
|
with imaplib.IMAP4_SSL(url) as imap:
|
|
res = imap.noop()
|
|
if res != ('OK', [b'NOOP completed']):
|
|
errors.append(f'{url} IMAP noop returned {res}')
|
|
except Exception as e:
|
|
errors.append(f'{url} IMAP check failed: {e}')
|
|
|
|
# check that SMTP TLSA records are valid
|
|
try:
|
|
with smtplib.SMTP(url, port=SMTP_PORT) as smtp:
|
|
smtp.starttls()
|
|
dercert = smtp.sock.getpeercert(binary_form=True)
|
|
tlsa_hash = check_tlsa(url, SMTP_PORT, dercert)
|
|
if tlsa_hash['cert'] != tlsa_hash['dns']:
|
|
errors.append(
|
|
f'{url}:{SMTP_PORT} TLSA record \
|
|
{str(tlsa_hash["cert"])} != {str(tlsa_hash["dns"])}'
|
|
)
|
|
except Exception as e:
|
|
errors.append(f'{url}:{SMTP_PORT} TLSA check failed: {e}')
|
|
|
|
return errors
|
|
|
|
def report_results(errors: List[str]) -> None:
|
|
if not errors:
|
|
errors = 'All systems nominal'
|
|
print(errors)
|
|
|
|
txn_id_nonce = str(int(time.time()))
|
|
url = f'https://conduit.koesters.xyz/_matrix/client/r0/rooms/{MATRIX_NOTIFICATION_ROOM}/send/m.room.message/{txn_id_nonce}'
|
|
header_dict = {
|
|
'Accept': 'application/json',
|
|
'Authorization' : f'Bearer {MATRIX_ACCESS_TOKEN}',
|
|
'Content-Type': 'application/json'
|
|
}
|
|
body = f'"msgtype":"m.text", "body":"{errors}"'
|
|
body = '{' + body + '}'
|
|
try:
|
|
res = requests.put(url, data=body, headers=header_dict)
|
|
if res.status_code != 200:
|
|
print(res.json())
|
|
subprocess.run(['notify-send', f'Sending error report failed.\nPlease run {sys.argv[0]}\nError {res.json()}'])
|
|
except Exception as e:
|
|
print(e)
|
|
subprocess.run(['notify-send', f'Sending error report failed.\nPlease run {sys.argv[0]}\nError {e}'])
|
|
|
|
errors = []
|
|
prev_errors = []
|
|
print('Monitoring...')
|
|
while True:
|
|
errors += web_check()
|
|
errors += mail_check()
|
|
if errors != prev_errors:
|
|
report_results(errors)
|
|
prev_errors = errors.copy()
|
|
errors = []
|
|
sys.stdout.flush()
|
|
time.sleep(CHECK_INTERVAL)
|
|
|