added loopia method
This commit is contained in:
109
files/acme-auth-loopia.py
Executable file
109
files/acme-auth-loopia.py
Executable file
@ -0,0 +1,109 @@
|
||||
#!/usr/bin/env python3.7
|
||||
|
||||
import argparse
|
||||
import configparser
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import xmlrpc.client
|
||||
import http.client
|
||||
|
||||
class ProxiedTransport(xmlrpc.client.Transport):
|
||||
|
||||
def set_proxy(self, host, port=None, headers=None):
|
||||
self.proxy = host, port
|
||||
self.proxy_headers = headers
|
||||
|
||||
def make_connection(self, host):
|
||||
connection = http.client.HTTPSConnection(*self.proxy)
|
||||
connection.set_tunnel(host, headers=self.proxy_headers)
|
||||
self._connection = host, connection
|
||||
return connection
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
if all([x in os.environ for x in ['CERTBOT_DOMAIN', 'CERTBOT_VALIDATION']]):
|
||||
domain = os.environ['CERTBOT_DOMAIN']
|
||||
subdomain = '_acme-challenge'
|
||||
token = os.environ['CERTBOT_VALIDATION']
|
||||
waittime = 600
|
||||
else:
|
||||
parser = argparse.ArgumentParser(description='Update acme-record for subdomain')
|
||||
parser.add_argument('--domain', '-d', nargs=1, required=True, help='domain to update')
|
||||
parser.add_argument('--token', '-t', nargs=1, required=True, help='token to set as txt record')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
domain = args.domain[0]
|
||||
subdomain = '_acme-challenge'
|
||||
token = args.token[0]
|
||||
waittime = 0
|
||||
|
||||
new_record = {
|
||||
'type': 'TXT',
|
||||
'ttl': '300',
|
||||
'rdata': token,
|
||||
'record_id': 0,
|
||||
'priority': 0
|
||||
}
|
||||
|
||||
config = configparser.ConfigParser()
|
||||
config.read('/usr/local/etc/loopiaapi.ini')
|
||||
url = config.get('default', 'url')
|
||||
user = config.get('default', 'username')
|
||||
pwd = config.get('default', 'password')
|
||||
|
||||
proxy = os.environ.get('http_proxy')
|
||||
if not proxy:
|
||||
proxy = os.environ.get('HTTP_PROXY')
|
||||
|
||||
if proxy:
|
||||
transport = ProxiedTransport()
|
||||
proto, host, port = proxy.split(':')
|
||||
transport.set_proxy(host.strip('/'), int(port))
|
||||
client = xmlrpc.client.ServerProxy(uri = url, encoding='utf-8', transport=transport)
|
||||
else:
|
||||
client = xmlrpc.client.ServerProxy(uri = url, encoding='utf-8')
|
||||
|
||||
|
||||
while domain:
|
||||
res = client.getSubdomains(user, pwd, domain)
|
||||
if 'UNKNOWN_ERROR' not in res:
|
||||
break
|
||||
subdomain, domain = domain.split('.', maxsplit=1)
|
||||
subdomain = '_acme-challenge.{}'.format(subdomain)
|
||||
|
||||
if 'UNKNOWN_ERROR' in res:
|
||||
print("Failed to find domain in loopiadns")
|
||||
return 1
|
||||
|
||||
if subdomain not in res:
|
||||
res = client.addSubdomain(user, pwd, domain, subdomain)
|
||||
if res != 'OK':
|
||||
print('Adding subdomain failed with status: {}'.format(res))
|
||||
return 1
|
||||
res = client.getZoneRecords(user, pwd, domain, subdomain)
|
||||
|
||||
for rec in res:
|
||||
if rec['type'] == 'TXT':
|
||||
if rec['rdata'] == token:
|
||||
return 0
|
||||
new_record['record_id'] = rec['record_id']
|
||||
break
|
||||
if new_record['record_id']:
|
||||
res = client.updateZoneRecord(user, pwd, domain, subdomain, new_record)
|
||||
else:
|
||||
res = client.addZoneRecord(user, pwd, domain, subdomain, new_record)
|
||||
if res != 'OK':
|
||||
print('Setting zone record failed with status: {}'.format(res))
|
||||
return 1
|
||||
|
||||
time.sleep(waittime)
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.exit(main())
|
93
files/acme-cleanup-loopia.py
Executable file
93
files/acme-cleanup-loopia.py
Executable file
@ -0,0 +1,93 @@
|
||||
#!/usr/bin/env python3.7
|
||||
|
||||
import argparse
|
||||
import configparser
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import xmlrpc.client
|
||||
import http.client
|
||||
|
||||
class ProxiedTransport(xmlrpc.client.Transport):
|
||||
|
||||
def set_proxy(self, host, port=None, headers=None):
|
||||
self.proxy = host, port
|
||||
self.proxy_headers = headers
|
||||
|
||||
def make_connection(self, host):
|
||||
connection = http.client.HTTPSConnection(*self.proxy)
|
||||
connection.set_tunnel(host, headers=self.proxy_headers)
|
||||
self._connection = host, connection
|
||||
return connection
|
||||
|
||||
def main():
|
||||
|
||||
if all([x in os.environ for x in ['CERTBOT_DOMAIN', 'CERTBOT_VALIDATION']]):
|
||||
domain = os.environ['CERTBOT_DOMAIN']
|
||||
subdomain = '_acme-challenge'
|
||||
token = os.environ['CERTBOT_VALIDATION']
|
||||
else:
|
||||
parser = argparse.ArgumentParser(description='Update acme-record for subdomain')
|
||||
parser.add_argument('--domain', '-d', nargs=1, required=True, help='domain to update')
|
||||
parser.add_argument('--token', '-t', nargs=1, required=True, help='token to set as txt record')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
domain = args.domain[0]
|
||||
subdomain = '_acme-challenge'
|
||||
token = args.token[0]
|
||||
|
||||
|
||||
config = configparser.ConfigParser()
|
||||
config.read('/usr/local/etc/loopiaapi.ini')
|
||||
url = config.get('default', 'url')
|
||||
user = config.get('default', 'username')
|
||||
pwd = config.get('default', 'password')
|
||||
|
||||
proxy = os.environ.get('http_proxy')
|
||||
if not proxy:
|
||||
proxy = os.environ.get('HTTP_PROXY')
|
||||
|
||||
if proxy:
|
||||
transport = ProxiedTransport()
|
||||
proto, host, port = proxy.split(':')
|
||||
transport.set_proxy(host.strip('/'), int(port))
|
||||
client = xmlrpc.client.ServerProxy(uri = url, encoding='utf-8', transport=transport)
|
||||
else:
|
||||
client = xmlrpc.client.ServerProxy(uri = url, encoding='utf-8')
|
||||
|
||||
while domain:
|
||||
res = client.getSubdomains(user, pwd, domain)
|
||||
if 'UNKNOWN_ERROR' not in res:
|
||||
break
|
||||
subdomain, domain = domain.split('.', maxsplit=1)
|
||||
subdomain = '_acme-challenge.{}'.format(subdomain)
|
||||
|
||||
if 'UNKNOWN_ERROR' in res:
|
||||
print("Failed to find domain in loopiadns")
|
||||
return 1
|
||||
|
||||
if subdomain not in res:
|
||||
return 0
|
||||
|
||||
res = client.getZoneRecords(user, pwd, domain, subdomain)
|
||||
|
||||
ret = 0
|
||||
for rec in res:
|
||||
if rec['type'] == 'TXT':
|
||||
if rec['rdata'] == token:
|
||||
res = client.removeZoneRecord(user, pwd, domain, subdomain, rec['record_id'])
|
||||
if res != 'OK':
|
||||
print('Failed to clean up record, loopia response: {}'.format(res))
|
||||
ret = 1
|
||||
res = client.removeSubdomain(user, pwd, domain, subdomain)
|
||||
if res != 'OK':
|
||||
print('Failed to clean up subdomain, loopia response: {}'.format(res))
|
||||
ret = 1
|
||||
return ret
|
||||
return ret
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.exit(main())
|
Reference in New Issue
Block a user