diff --git a/README.md b/README.md index 40227cc..c3147fc 100644 --- a/README.md +++ b/README.md @@ -8,9 +8,9 @@ Low-budget homemade handmade monitoring for homeserver. Status: active use -Dependencies: python, dnspython, Matrix account +Dependencies: python3, apprise, cryptography, dnspython, requests, Matrix account -Usage: Run as a daemon, `/usr/local/bin/availability-monitor.py ` +Usage: Use provided availability-monitor.service as user unit. ## dnf-search-install.py Wrapper, marks already installed packages for `dnf search`. diff --git a/availability-monitor.py b/availability-monitor.py index 5c257fb..6120c2d 100755 --- a/availability-monitor.py +++ b/availability-monitor.py @@ -1,73 +1,77 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 -# nonstdlib requirements: dnspython +# nonstdlib deps: apprise, cryptography, dnspython, requests -from cryptography import x509 -from cryptography.hazmat.primitives import serialization, hashes -from typing import Dict, List - -import dns.resolver import imaplib import os -import requests import smtplib -import ssl -import subprocess import sys import time +from datetime import UTC, datetime +from enum import Enum -MATRIX_ACCESS_TOKEN = os.environ['MATRIX_ACCESS_TOKEN'] -MATRIX_NOTIFICATION_ROOM = os.environ['MATRIX_NOTIFICATION_ROOM'] -CHECK_INTERVAL = int(sys.argv[1]) +import apprise +import dns.resolver +import requests +from cryptography import x509 +from cryptography.hazmat.primitives import hashes, serialization + +MATRIX_USER = os.environ['MATRIX_USER'] +MATRIX_PASSWORD = os.environ['MATRIX_PASSWORD'] +MATRIX_ROOM = os.environ['MATRIX_ROOM'] + +CHECK_INTERVAL_SEC = int(sys.argv[1]) MONITORED_URLS = [ - + 'https://git.matous.dev', + 'https://nextcloud.matous.dev', + 'https://mta-sts.matous.dev/.well-known/mta-sts.txt', ] -MONITORED_MAIL = [ - -] +MONITORED_MAIL = ['mx1.matous.dev'] HTTPS_PORT = 443 SMTP_PORT = 25 SMTPS_PORT = 465 -def check_tlsa(url: str, port: int, dercert: bytes) -> Dict[str, bytes]: - cert = x509.load_der_x509_certificate(dercert) + +class SmtpStatus(Enum): + OK = 250 + + +def check_tlsa(url: str, port: int, cert: x509.Certificate) -> dict[str, bytes]: pubkey = cert.public_key() - pubkey = pubkey.public_bytes( - serialization.Encoding.DER, - serialization.PublicFormat.SubjectPublicKeyInfo - ) + pubkey_bytes = pubkey.public_bytes(serialization.Encoding.DER, serialization.PublicFormat.SubjectPublicKeyInfo) digest = hashes.Hash(hashes.SHA256()) - digest.update(pubkey) + digest.update(pubkey_bytes) keyhash = digest.finalize() - port = str(port) tlsa = dns.resolver.resolve(f'_{port}._tcp.{url}', 'TLSA') - return {'cert': tlsa[0].cert, 'dns': keyhash} + return {'local': tlsa[0].cert, 'dns': keyhash} + # check that websites are alive -def web_check() -> List[str]: +def web_check() -> list[str]: errors = [] for url in MONITORED_URLS: try: - res = requests.head(url) + res = requests.head(url, timeout=10) if not res.ok: errors.append(f'{url} HEAD returned {res.status_code}: {res.reason}') except Exception as e: errors.append(f'{url} check failed: {e}') return errors -def mail_check() -> List[str]: + +def mail_check() -> list[str]: errors = [] for url in MONITORED_MAIL: # check that SMTP(S) is alive try: with smtplib.SMTP_SSL(url, port=SMTPS_PORT) as smtp: res = smtp.noop() - if res != (250, b'2.0.0 I have sucessfully done nothing'): + if res != (SmtpStatus.OK, b'2.0.0 I have successfully done nothing'): errors.append(f'{url}:{SMTPS_PORT} check returned {res}') except Exception as e: errors.append(f'{url} SMTPS check failed: {e}') @@ -75,7 +79,7 @@ def mail_check() -> List[str]: with smtplib.SMTP(url, port=SMTP_PORT) as smtp: smtp.starttls() res = smtp.noop() - if res != (250, b'2.0.0 I have sucessfully done nothing'): + if res != (SmtpStatus.OK, b'2.0.0 I have successfully done nothing'): errors.append(f'{url}:{SMTP_PORT} check returned {res}') except Exception as e: errors.append(f'{url}:{SMTP_PORT} SMTP check failed: {e}') @@ -94,42 +98,33 @@ def mail_check() -> List[str]: with smtplib.SMTP(url, port=SMTP_PORT) as smtp: smtp.starttls() dercert = smtp.sock.getpeercert(binary_form=True) - tlsa_hash = check_tlsa(url, SMTP_PORT, dercert) - if tlsa_hash['cert'] != tlsa_hash['dns']: - errors.append( - f'{url}:{SMTP_PORT} TLSA record \ - {str(tlsa_hash["cert"])} != {str(tlsa_hash["dns"])}' - ) + cert = x509.load_der_x509_certificate(dercert) + if cert.not_valid_after_utc < datetime.now(UTC): + errors.append(f'{url} certificate expired {cert.not_valid_after}') + tlsa_hash = check_tlsa(url, SMTP_PORT, cert) + if tlsa_hash['local'] != tlsa_hash['dns']: + errors.append(f'{url}:{SMTP_PORT} TLSA record {tlsa_hash["local"]!s} != {tlsa_hash["dns"]!s}') except Exception as e: errors.append(f'{url}:{SMTP_PORT} TLSA check failed: {e}') - return errors -def report_results(errors: List[str]) -> None: + +def report_results(errors: list[str]) -> None: if not errors: - errors = 'All systems nominal' + errors = ['All systems nominal'] + errors: str = '\n\n'.join(errors) print(errors) - txn_id_nonce = str(int(time.time())) - url = f'https://conduit.koesters.xyz/_matrix/client/r0/rooms/{MATRIX_NOTIFICATION_ROOM}/send/m.room.message/{txn_id_nonce}' - header_dict = { - 'Accept': 'application/json', - 'Authorization' : f'Bearer {MATRIX_ACCESS_TOKEN}', - 'Content-Type': 'application/json' - } - body = f'"msgtype":"m.text", "body":"{errors}"' - body = '{' + body + '}' - try: - res = requests.put(url, data=body, headers=header_dict) - if res.status_code != 200: - print(res.json()) - subprocess.run(['notify-send', f'Sending error report failed.\nPlease run {sys.argv[0]}\nError {res.json()}']) - except Exception as e: - print(e) - subprocess.run(['notify-send', f'Sending error report failed.\nPlease run {sys.argv[0]}\nError {e}']) + apobj = apprise.Apprise() + with apprise.LogCapture(level=apprise.logging.INFO) as output: + apobj.add(f'matrixs://{MATRIX_USER}@nitro.chat/{MATRIX_ROOM}?pass={MATRIX_PASSWORD}') + apobj.add('glib://') + apobj.notify(title='Server status', body=errors) + print(output.getvalue()) + errors = [] -prev_errors = [] +prev_errors = None print('Monitoring...') while True: errors += web_check() @@ -139,5 +134,4 @@ while True: prev_errors = errors.copy() errors = [] sys.stdout.flush() - time.sleep(CHECK_INTERVAL) - + time.sleep(CHECK_INTERVAL_SEC) diff --git a/availability-monitor.service b/availability-monitor.service new file mode 100644 index 0000000..ab33c3a --- /dev/null +++ b/availability-monitor.service @@ -0,0 +1,45 @@ +[Unit] +Description=Server services monitoring +After=network-online.target graphical-session.target + +[Service] +Type=simple +EnvironmentFile=%h/.config/private-env/availability-monitor.env +ExecStart=/usr/local/bin/availability-monitor.py 3600 + +AmbientCapabilities= +CapabilityBoundingSet= +InaccessiblePaths=/home /root +KeyringMode=private +LockPersonality=true +MemoryDenyWriteExecute=true +NoNewPrivileges=true +PrivateDevices=true +PrivateIPC=true +PrivateMounts=true +PrivateTmp=true +PrivateUsers=true +ProcSubset=pid +ProtectClock=true +ProtectControlGroups=true +# can't override rw for %t/bus if "true" (completely inaccessible /run) +# rw necessary for notif +ProtectHome=read-only +ProtectHostname=true +ProtectKernelLogs=true +ProtectKernelModules=true +ProtectKernelTunables=true +ProtectProc=noaccess +ProtectSystem=strict +ReadWritePaths=%t/bus +# AF_UNIX for dbus (notifications), net for checking (duh) +RestrictAddressFamilies=AF_UNIX AF_INET AF_INET6 +RestrictNamespaces=true +RestrictRealtime=true +RestrictSUIDSGID=true +SystemCallArchitectures=native +SystemCallFilter=@system-service +UMask=0277 + +[Install] +WantedBy=multi-user.target