#!/usr/bin/env python3.10
"""Set an ``_acme-challenge`` TXT record through an XML-RPC DNS API.

The domain and token come from the ``CERTBOT_DOMAIN`` / ``CERTBOT_VALIDATION``
environment variables when both are set, otherwise from the ``--domain`` and
``--token`` command-line options.  API url and credentials are read from the
``[default]`` section of ``/usr/local/etc/loopiaapi.ini``.
"""
import argparse
import base64
import configparser
import http.client
import os
import sys
import time
import urllib.parse
import xmlrpc.client

CONFIG_PATH = '/usr/local/etc/loopiaapi.ini'
CHALLENGE_LABEL = '_acme-challenge'


class ProxiedTransport(xmlrpc.client.Transport):
    """XML-RPC transport that tunnels its connection through a proxy."""

    def set_proxy(self, host, port=None, headers=None):
        """Remember the proxy address and optional headers for the tunnel request."""
        self.proxy = host, port
        self.proxy_headers = headers

    @classmethod
    def from_url(cls, proxy_url):
        """Build a transport from a proxy URL such as ``http://host:3128/``.

        Accepts values with or without a scheme, with a trailing slash, with
        ``user:password@`` credentials (sent as a Basic ``Proxy-Authorization``
        header on the tunnel request) and with bracketed IPv6 hosts.  The
        scheme itself is not inspected.

        Raises:
            ValueError: if the URL lacks a host or a port, or the port is not
                a valid number.
        """
        candidate = proxy_url.strip()
        if '://' not in candidate:
            # Without a scheme urlsplit() would read "host:3128" as scheme + path.
            candidate = '//' + candidate
        parts = urllib.parse.urlsplit(candidate)
        port = parts.port  # raises ValueError on a malformed port
        if not parts.hostname or port is None:
            # Deliberately do not echo the URL: it may contain a password.
            raise ValueError('proxy URL must contain both a host and a port')

        headers = None
        if parts.username is not None:
            user = urllib.parse.unquote(parts.username)
            password = urllib.parse.unquote(parts.password or '')
            credentials = base64.b64encode(f'{user}:{password}'.encode('utf-8'))
            headers = {'Proxy-Authorization': 'Basic ' + credentials.decode('ascii')}

        transport = cls()
        transport.set_proxy(parts.hostname, port, headers)
        return transport

    def make_connection(self, host):
        """Return a connection to the proxy with a tunnel set up towards *host*."""
        connection = http.client.HTTPSConnection(*self.proxy)
        connection.set_tunnel(host, headers=self.proxy_headers)
        self._connection = host, connection
        return connection


def zone_candidates(domain):
    """Yield ``(zone, labels)`` pairs, stripping one leading label per step.

    ``labels`` is the tuple of labels removed so far, in their original order.
    For ``'a.b.example.com'`` this yields ``('a.b.example.com', ())``,
    ``('b.example.com', ('a',))``, ``('example.com', ('a', 'b'))`` and
    ``('com', ('a', 'b', 'example'))``.  An empty *domain* yields nothing.
    """
    labels = []
    while domain:
        yield domain, tuple(labels)
        # partition() never raises, so the final label simply ends the loop.
        label, _, domain = domain.partition('.')
        labels.append(label)


def main():
    """Publish the challenge token as a TXT record.

    Returns:
        0 when the token is (or already was) published, 1 when the zone could
        not be found or the API reported a non-'OK' status.

    Raises:
        xmlrpc.client.Fault: for any API fault other than code 404 during the
            zone lookup.
        ValueError: if the proxy environment variable cannot be parsed.
    """
    if 'CERTBOT_DOMAIN' in os.environ and 'CERTBOT_VALIDATION' in os.environ:
        domain = os.environ['CERTBOT_DOMAIN']
        token = os.environ['CERTBOT_VALIDATION']
        # According to inside sources: "If it takes longer than 2 minutes,
        # something is probably wrong". Adding an extra minute to be sure.
        waittime = 180
    else:
        parser = argparse.ArgumentParser(description='Update acme-record for subdomain')
        parser.add_argument('--domain', '-d', nargs=1, required=True, help='domain to update')
        parser.add_argument('--token', '-t', nargs=1, required=True, help='token to set as txt record')
        args = parser.parse_args()
        domain = args.domain[0]
        token = args.token[0]
        waittime = 0

    config = configparser.ConfigParser()
    config.read(CONFIG_PATH)
    url = config.get('default', 'url')
    user = config.get('default', 'username')
    pwd = config.get('default', 'password')

    proxy = os.environ.get('http_proxy') or os.environ.get('HTTP_PROXY')
    if proxy:
        transport = ProxiedTransport.from_url(proxy)
        client = xmlrpc.client.ServerProxy(uri=url, encoding='utf-8', transport=transport)
    else:
        client = xmlrpc.client.ServerProxy(uri=url, encoding='utf-8')

    # Walk up the name until the API knows the zone; the labels stripped on
    # the way become part of the challenge subdomain.
    zone = ''
    subdomain = CHALLENGE_LABEL
    subdomains = None
    for zone, labels in zone_candidates(domain):
        try:
            subdomains = client.getSubdomains(user, pwd, zone)
        except xmlrpc.client.Fault as err:
            if err.faultCode != 404:
                raise
            continue  # 404: not a zone on this account, try the parent
        subdomain = '.'.join((CHALLENGE_LABEL, *labels))
        break

    if not subdomains:
        print("Failed to find domain in loopiadns")
        return 1

    if subdomain not in subdomains:
        status = client.addSubdomain(user, pwd, zone, subdomain)
        if status != 'OK':
            print('Adding subdomain failed with status: {}'.format(status))
            return 1

    records = client.getZoneRecords(user, pwd, zone, subdomain)
    txt_records = [rec for rec in records if rec.get('type') == 'TXT']
    # Nothing to do if any existing TXT record already carries the token.
    if any(rec.get('rdata') == token for rec in txt_records):
        return 0

    new_record = {
        'type': 'TXT',
        'ttl': '300',
        'rdata': token,
        # Reuse the first existing TXT record, if any; 0 means "create new".
        'record_id': txt_records[0]['record_id'] if txt_records else 0,
        'priority': 0,
    }
    if new_record['record_id']:
        status = client.updateZoneRecord(user, pwd, zone, subdomain, new_record)
    else:
        status = client.addZoneRecord(user, pwd, zone, subdomain, new_record)
    if status != 'OK':
        print('Setting zone record failed with status: {}'.format(status))
        return 1

    # Wait before returning (180 seconds under certbot, 0 from the CLI).
    time.sleep(waittime)
    return 0


if __name__ == '__main__':
    sys.exit(main())