modernize porkbun-ddns
Modernizes the Python DDNS client: * Use argparse for CLI argument handling (better help text, validation and error messages) * Accept config file from stdin * Use modern Python naming conventions * Use ipaddress for validation * Default value for the base url of the API * Closes #2 * Closes #3
This commit is contained in:
parent
7e263cd083
commit
99afcf45bf
135
porkbun-ddns.py
135
porkbun-ddns.py
|
@ -1,53 +1,94 @@
|
|||
import json
|
||||
#!/usr/bin/env python
|
||||
"""Porkbun Dynamic DNS client, Python Edition.
|
||||
|
||||
Examples:
|
||||
python porkbun-ddns.py /path/to/config.json example.com
|
||||
python porkbun-ddns.py /path/to/config.json example.com www
|
||||
python porkbun-ddns.py /path/to/config.json example.com '*'
|
||||
python porkbun-ddns.py /path/to/config.json example.com -i 10.0.0.1
|
||||
"""
|
||||
|
||||
import argparse, json, re, sys, ipaddress
|
||||
import requests
|
||||
import re
|
||||
import sys
|
||||
|
||||
def getRecords(domain): #grab all the records so we know which ones to delete to make room for our record. Also checks to make sure we've got the right domain
|
||||
allRecords=json.loads(requests.post(apiConfig["endpoint"] + '/dns/retrieve/' + domain, data = json.dumps(apiConfig)).text)
|
||||
if allRecords["status"]=="ERROR":
|
||||
print('Error getting domain. Check to make sure you specified the correct domain, and that API access has been switched on for this domain.');
|
||||
sys.exit();
|
||||
return(allRecords)
|
||||
|
||||
def getMyIP():
|
||||
ping = json.loads(requests.post(apiConfig["endpoint"] + '/ping/', data = json.dumps(apiConfig)).text)
|
||||
return(ping["yourIp"])
|
||||
|
||||
def deleteRecord():
|
||||
for i in getRecords(rootDomain)["records"]:
|
||||
if i["name"]==fqdn and (i["type"] == 'A' or i["type"] == 'ALIAS' or i["type"] == 'CNAME'):
|
||||
print("Deleting existing " + i["type"] + " Record")
|
||||
deleteRecord = json.loads(requests.post(apiConfig["endpoint"] + '/dns/delete/' + rootDomain + '/' + i["id"], data = json.dumps(apiConfig)).text)
|
||||
def err(msg, *args, **kwargs):
|
||||
msg = "Error: " + str(msg)
|
||||
sys.stderr.write(msg.format(*args, **kwargs))
|
||||
raise SystemExit(kwargs.get("code", 1))
|
||||
|
||||
def createRecord():
|
||||
createObj=apiConfig.copy()
|
||||
createObj.update({'name': subDomain, 'type': 'A', 'content': myIP, 'ttl': 300})
|
||||
endpoint = apiConfig["endpoint"] + '/dns/create/' + rootDomain
|
||||
print("Creating record: " + fqdn + " with answer of " + myIP)
|
||||
create = json.loads(requests.post(apiConfig["endpoint"] + '/dns/create/'+ rootDomain, data = json.dumps(createObj)).text)
|
||||
return(create)
|
||||
|
||||
if len(sys.argv)>2: #at least the config and root domain is specified
|
||||
apiConfig = json.load(open(sys.argv[1])) #load the config file into a variable
|
||||
rootDomain=sys.argv[2]
|
||||
|
||||
if len(sys.argv)>3 and sys.argv[3]!='-i': #check if a subdomain was specified as the third argument
|
||||
subDomain=sys.argv[3]
|
||||
fqdn=subDomain + "." + rootDomain
|
||||
else:
|
||||
subDomain=''
|
||||
fqdn=rootDomain
|
||||
def api(args, target, data=None):
|
||||
data = data or args.cfg
|
||||
return json.loads(
|
||||
requests.post(
|
||||
args.cfg["endpoint"] + target, data=json.dumps(data)
|
||||
).text
|
||||
)
|
||||
|
||||
if len(sys.argv)>4 and sys.argv[3]=='-i': #check if IP is manually specified. There's probably a more-elegant way to do this
|
||||
myIP=sys.argv[4]
|
||||
elif len(sys.argv)>5 and sys.argv[4]=='-i':
|
||||
myIP=sys.argv[5]
|
||||
else:
|
||||
myIP=getMyIP() #otherwise use the detected exterior IP address
|
||||
|
||||
deleteRecord()
|
||||
print(createRecord()["status"])
|
||||
|
||||
else:
|
||||
print("Porkbun Dynamic DNS client, Python Edition\n\nError: not enough arguments. Examples:\npython porkbun-ddns.py /path/to/config.json example.com\npython porkbun-ddns.py /path/to/config.json example.com www\npython porkbun-ddns.py /path/to/config.json example.com '*'\npython porkbun-ddns.py /path/to/config.json example.com -i 10.0.0.1\n")
|
||||
|
||||
def get_records(args):
|
||||
"""grab all records, then find the correct one to replace."""
|
||||
all_records = api(args, "/dns/retrieve/" + args.domain)
|
||||
if all_records["status"] == "ERROR":
|
||||
err(
|
||||
"Failed to get records. "
|
||||
"Make sure you specified the correct domain ({}), "
|
||||
"and that API access has been enabled for this domain.",
|
||||
args.domain,
|
||||
)
|
||||
return all_records
|
||||
|
||||
|
||||
def delete_record(args):
|
||||
for i in get_records(args)["records"]:
|
||||
if i["name"] == args.fqdn and i["type"] in ["A", "AAAA", "ALIAS", "CNAME"]:
|
||||
print("Deleting existing {}-Record: {}".format(i["type"], i))
|
||||
api(args, "/dns/delete/" + args.domain + "/" + i["id"])
|
||||
|
||||
|
||||
def create_record(args):
|
||||
obj = args.cfg.copy()
|
||||
type_ = "A" if args.public_ip.version == 4 else "AAAA"
|
||||
obj.update({"name": args.subdomain, "type": type_, "content": args.public_ip.exploded, "ttl": 300})
|
||||
print("Creating {}-Record for '{}' with answer of '{}'".format(type_, args.fqdn, args.public_ip))
|
||||
return api(args, "/dns/create/" + args.domain, obj)
|
||||
|
||||
|
||||
def main(args):
|
||||
parser = argparse.ArgumentParser(
|
||||
description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
|
||||
)
|
||||
parser.add_argument("config", nargs=1, help="path to config file")
|
||||
parser.add_argument("domain", nargs=1, help="domain to be updated")
|
||||
parser.add_argument("subdomain", nargs="?", default="", help="optional subdomain")
|
||||
parser.add_argument(
|
||||
"-i",
|
||||
"--public-ip",
|
||||
help="skip auto-detection and use this IP for entry",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
args.domain, args.config = args.domain[0], args.config[0]
|
||||
args.fqdn = "{}.{}".format(args.subdomain, args.domain).strip(".")
|
||||
|
||||
try:
|
||||
with sys.stdin if args.config == "-" else open(args.config) as file_:
|
||||
args.cfg = json.load(file_)
|
||||
except Exception as e:
|
||||
err(e)
|
||||
required = ["secretapikey", "apikey"]
|
||||
if any(x not in args.cfg for x in required) or not isinstance(args.cfg, dict):
|
||||
err("all of the following are required in '{}': {}", args.config, required)
|
||||
args.cfg.setdefault("endpoint", "https://porkbun.com/api/json/v3/")
|
||||
|
||||
if not args.public_ip:
|
||||
args.public_ip = api(args, "/ping/")["yourIp"]
|
||||
args.public_ip = ipaddress.ip_address(args.public_ip)
|
||||
|
||||
delete_record(args)
|
||||
print(create_record(args)["status"])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main(sys.argv[1:])
|
||||
|
|
Reference in a new issue