sysalert/sysalert/util.py
2024-07-13 20:32:11 +02:00

93 lines
2.7 KiB
Python

import configparser
import datetime
import importlib
import os
import sqlite3
import sys
import systemd.journal
import sysalert.db
# Path of the INI file that cli() loads with configparser.
config_file = '/etc/sysalert.ini'

# Path of the database file that cli() hands to sysalert.db.init().
db_file = '/var/lib/sysalert/sysalert.db'
def _test_env():
    """Return True when every MONITOR_* environment variable is present.

    Their presence indicates that the utility was started as a handler for
    a unit's OnSuccess= or OnError= setting.  Only the existence of the
    variables is checked, not their values.
    """
    required = (
        'MONITOR_SERVICE_RESULT',
        'MONITOR_EXIT_CODE',
        'MONITOR_EXIT_STATUS',
        'MONITOR_INVOCATION_ID',
        'MONITOR_UNIT',
    )
    missing = [name for name in required if name not in os.environ]
    return not missing
def register_exit(config, db):
    """Record the exit of a monitored service and alert when one is due.

    The service name is read from ``sys.argv[1]`` and the outcome from the
    ``MONITOR_SERVICE_RESULT`` environment variable.  Settings are taken
    from the config section named after the service, or from the default
    section when no such section exists.  The ``alert_method`` setting names
    both the module that is imported to send alerts and the config section
    whose items are passed to that module.

    On success the success is registered and ``alert.success()`` is called
    when ``recovery_alert`` is a true boolean, or when it is ``if-alerted``
    and an alert was sent for at least one of the recorded failures.

    On failure the failure is registered and ``alert.failure()`` is called,
    unless fewer than ``max_failures`` failures were recorded before this
    one, or the most recent alert is younger than ``resend_alert_time``
    seconds (default 0).

    Args:
        config: a loaded ``configparser.ConfigParser``.
        db: database handle accepted by the ``sysalert.db`` functions.

    Returns:
        Always 0.

    Raises:
        IndexError: no service name was given on the command line.
        KeyError: ``MONITOR_SERVICE_RESULT`` is not set.
        configparser.Error: a required option or the alert section is
            missing.
        ValueError: ``max_failures`` or ``resend_alert_time`` is not an
            integer.
        ImportError: the alert module cannot be imported.
    """
    service_name = sys.argv[1]
    if service_name in config.sections():
        section_name = service_name
    else:
        section_name = config.default_section
    alert_method = config.get(section_name, 'alert_method')
    alert_config = dict(config.items(alert_method))
    alert = importlib.import_module(alert_method)

    if os.environ['MONITOR_SERVICE_RESULT'] == 'success':
        # exit with success status
        # Fetch the failure history before registering the success so the
        # recovery alert can report it (presumably register_success resets
        # the history -- confirm in sysalert.db).
        failures = sysalert.db.get_failures(service_name, db)
        sysalert.db.register_success(service_name, db)
        if _recovery_alert_wanted(config, section_name, failures):
            alert.success(service_name, failures, alert_config)
        return 0

    # exit with failed status
    failures = sysalert.db.get_failures(service_name, db)
    below_threshold = (
        len(failures) < config.getint(section_name, 'max_failures'))
    if below_threshold or _alert_sent_recently(config, section_name, failures):
        # Record the failure without an alert method: no alert is sent.
        sysalert.db.register_failure(service_name, None, db)
        return 0

    # Register first, then refresh the failure list, so that this failure
    # is included in the alert call.
    sysalert.db.register_failure(service_name, alert_method, db)
    failures = sysalert.db.get_failures(service_name, db)
    alert.failure(service_name, failures, alert_config)
    return 0


def _recovery_alert_wanted(config, section_name, failures):
    """Decide whether a recovery alert must be sent for a successful exit."""
    try:
        return config.getboolean(section_name, 'recovery_alert')
    except ValueError:
        # Not a boolean; the only other recognised value is 'if-alerted'.
        pass
    if config.get(section_name, 'recovery_alert') != 'if-alerted':
        return False
    # Failures registered without an alert method did not trigger an alert,
    # so they must not trigger a recovery alert either.
    return any(f['alert_method'] for f in failures)


def _alert_sent_recently(config, section_name, failures):
    """Tell whether the latest alert is younger than resend_alert_time."""
    window = datetime.timedelta(
        seconds=config.getint(section_name, 'resend_alert_time', fallback=0))
    # Walk the history backwards to find the last failure that was alerted.
    last_alert = next(
        (f['timestamp'] for f in reversed(failures) if f['alert_method']),
        None)
    if last_alert is None:
        return False
    # NOTE(review): assumes f['timestamp'] is a naive datetime comparable
    # with datetime.now() -- confirm against sysalert.db.
    return last_alert + window > datetime.datetime.now()
def cli():
    """Command-line entry point.

    Loads the configuration from ``config_file``, opens the database at
    ``db_file`` and, when the MONITOR_* environment shows that systemd
    started the process, records the service exit via ``register_exit()``.

    The database handle is closed on every path, including when
    ``register_exit()`` raises or when the process was not started by
    systemd.

    Returns:
        The return value of ``register_exit()`` when invoked by systemd,
        otherwise None.
    """
    config = configparser.ConfigParser()
    # ConfigParser.read() silently skips a missing file; missing settings
    # then surface as configparser errors in register_exit().
    config.read(config_file)
    db = sysalert.db.init(db_file)
    try:
        if _test_env():
            # invoked by systemd
            return register_exit(config, db)
        return None
    finally:
        # Release the handle even if registering the exit failed.
        sysalert.db.close(db)
if __name__ == '__main__':
    # Use cli()'s return value as the process exit status.
    exit_status = cli()
    sys.exit(exit_status)