百度域名解析API+python实现百度云DDNS功能绑定动态公网ip

背景:前段时间,朋友为了使用NAS服务,开通了电信的动态公网IP,也就是那种公网IP地址会定期变化,重启光猫也会导致IP变化,朋友的水星低端路由器支持花生壳DDNS,去花生壳官网申请服务后能够正常使用,但是隔一两天就会出现路由器花生壳服务掉线,重新连接不上的奇怪问题,久久不能解决,然后他找到了我,本着助人为乐的精神,我给他找到了一个替代方案。

简述:百度注册的域名可以提供域名解析API服务,但是需要工单申请API权限,申请后可以通过 python实现域名解析的动态更新,适合动态公网用户。

前期准备:

1.你需要拥有一个百度云注册的域名,并且已经备案(政策规定,国内未备案域名不能提供域名解析服务)。

2.去百度云个人中心>安全认证界面,创建Access Key,记录下这个Access Key和Secret Key。

3.去工单>域名服务>申请开通域名API>创建工单,贴上Access Key,申请开通域名解析API。

4.去域名管理界面设置一个域名解析A记录,具体设置教程自行百度。

一些说明:

百度云提供了域名解析API相关的帮助文档(域名服务-百度智能云),根据文档,其最重要的部分是生成认证Authorization字符串,具体步骤可以参考其网址(https://cloud.baidu.com/doc/Reference/s/Njwvz1wot),这里就不做具体阐述。其网站提供了一个例子,签名算法没有问题,但是使用的是python requests模块,经过实践并不能成功,原因在于requests会在请求数据header自动添加一些无关的头,但是这些头服务器会纳入鉴权计算,导致服务器鉴权Authorization字符串算出来和本地不一致。

代码部分:

import hmac import time import requests import urllib.parse import socket import json import os class DDNS: def __init__(self,url,AK,SK): self.url=url self.AK=AK self.SK=SK def getTime(self):#获取网络时间戳,用于鉴权 url="http://api.m.taobao.com/rest/api3.do?api=mtop.common.getTimestamp" res=requests.get(url,timeout=5).text res=json.loads(res)["data"]["t"] return int(res)/1000 def getIP(self):#获取本地公网IP url="http://pv.sohu.com/cityjson?ie=utf-8" res=requests.get(url,timeout=5).text res=res.split("=")[1].split(";")[0] #转换javascript为json格式 res=json.loads(res) res=res["cip"] return res def enc(self,key,message): h=hmac.new(key,message,digestmod="SHA256") return h.hexdigest() def post(self,URI,_data): #填写参数 #获取UTC时间 utc = time.strftime('%Y-%m-%dT%H:%M:%SZ',time.gmtime(self.getTime())) #POST字典1 data=json.dumps(_data) #创建请求header字典 _headers={ "host":"bcd.baidubce.com", "x-bce-date":utc, "Content-Type":"application/json;charset=utf-8", "Content-Length":str(len(data)) } #格式化字典为http头标准格式 headers="" for a,b in _headers.items(): headers+=a+":"+b+"\r\n" #生成CanonicalHeaders method = "POST" CanonicalQueryString="" CanonicalURI = urllib.parse.quote(URI) result = [] for key,value in _headers.items(): tempStr = str(urllib.parse.quote(key.lower(),safe="")) + ":" + str(urllib.parse.quote(value,safe="")) result.append(tempStr) result.sort() CanonicalHeaders = "\n".join(result) #拼接得到CanonicalRequest CanonicalRequest = method + "\n" + CanonicalURI + "\n" + CanonicalQueryString +"\n" + CanonicalHeaders #计算signingKey signingKey=self.enc(self.SK.encode(),b"bce-auth-v1/%b/%b/1800"%(self.AK.encode(),utc.encode())) #计算signature signature=self.enc(signingKey.encode(),CanonicalRequest.encode()) #生成认证Authorization Authorization="bce-auth-v1/%s/%s/1800/content-length;content-type;host;x-bce-date/%s"%(self.AK,utc,signature) #最后生成完整的http请求 http="POST %s HTTP/1.1\r\n%sAuthorization:%s\r\n\r\n"%(URI,headers,Authorization)+data #建立socket链接,发送http数据 s=socket.socket() s.connect(("bcd.baidubce.com",80)) s.send(http.encode()) res=b"" while True: _res=s.recv(10240) res+=_res if b"chunked" not in res: #如果不是chunked,一次性读完退出 break if _res[-5:]==b"0\r\n\r\n": #chunked标志,连续读取到标识符退出 break s.close() return res def getID(self): data={ 'domain':self.url, 'pageNo':1, 'pageSize':100 } url="/v1/domain/resolve/list" res=self.post(url,data) res=res.decode().split("\r\n{")[1].split("}\r\n")[0] res="{"+res+"}" res=json.loads(res)["result"] recordId={} for i in res: recordId[i["domain"]]=(i["recordId"],i["rdata"]) return recordId def SET(self,domain,ip): recordId=self.getID() self.urlIP=recordId[domain][1] if ip!=recordId[domain][1]: url="/v1/domain/resolve/edit" data={ "domain" : domain, "rdType" : "A", "rdata" : ip, "ttl" : 60, "zoneName" : self.url, "recordId" : recordId[domain][0] } res=self.post(url,data) #print(res) if __name__ == "__main__": #参数设置 url="" #填写你的域名,如http://www.baidu.com,仅填写baidu.com AK="" #Access Key 百度云控制台申请 SK="" #Secret Key 百度云控制台申请 sync=60 #检测间隔时间,建议60秒 #循环监控 ddns=DDNS(url,AK,SK) first=True jsq=sync while True: os.system("title DDNS服务已启动【%s秒后更新】"%jsq) if jsq==0 or first==True: try: ddns.SET("@",ddns.getIP()) os.system("cls") now=time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(ddns.getTime())) print("---------百度域名DDNS信息---------\n\n域名:%s\n-----\n域名解析IP:%s\n本地公网IP:%s\n更新时间:%s"%(url,ddns.urlIP,ddns.getIP(),now)) except: pass jsq=sync first=False time.sleep(1) jsq-=1
import hmac import time import requests import urllib.parse import socket import json import os class DDNS: def __init__(self,url,AK,SK): self.url=url self.AK=AK self.SK=SK def getTime(self):#获取网络时间戳,用于鉴权 url="http://api.m.taobao.com/rest/api3.do?api=mtop.common.getTimestamp" res=requests.get(url,timeout=5).text res=json.loads(res)["data"]["t"] return int(res)/1000 def getIP(self):#获取本地公网IP url="http://pv.sohu.com/cityjson?ie=utf-8" res=requests.get(url,timeout=5).text res=res.split("=")[1].split(";")[0] #转换javascript为json格式 res=json.loads(res) res=res["cip"] return res def enc(self,key,message): h=hmac.new(key,message,digestmod="SHA256") return h.hexdigest() def post(self,URI,_data): #填写参数 #获取UTC时间 utc = time.strftime('%Y-%m-%dT%H:%M:%SZ',time.gmtime(self.getTime())) #POST字典1 data=json.dumps(_data) #创建请求header字典 _headers={ "host":"bcd.baidubce.com", "x-bce-date":utc, "Content-Type":"application/json;charset=utf-8", "Content-Length":str(len(data)) } #格式化字典为http头标准格式 headers="" for a,b in _headers.items(): headers+=a+":"+b+"\r\n" #生成CanonicalHeaders method = "POST" CanonicalQueryString="" CanonicalURI = urllib.parse.quote(URI) result = [] for key,value in _headers.items(): tempStr = str(urllib.parse.quote(key.lower(),safe="")) + ":" + str(urllib.parse.quote(value,safe="")) result.append(tempStr) result.sort() CanonicalHeaders = "\n".join(result) #拼接得到CanonicalRequest CanonicalRequest = method + "\n" + CanonicalURI + "\n" + CanonicalQueryString +"\n" + CanonicalHeaders #计算signingKey signingKey=self.enc(self.SK.encode(),b"bce-auth-v1/%b/%b/1800"%(self.AK.encode(),utc.encode())) #计算signature signature=self.enc(signingKey.encode(),CanonicalRequest.encode()) #生成认证Authorization Authorization="bce-auth-v1/%s/%s/1800/content-length;content-type;host;x-bce-date/%s"%(self.AK,utc,signature) #最后生成完整的http请求 http="POST %s HTTP/1.1\r\n%sAuthorization:%s\r\n\r\n"%(URI,headers,Authorization)+data #建立socket链接,发送http数据 s=socket.socket() s.connect(("bcd.baidubce.com",80)) s.send(http.encode()) res=b"" while True: _res=s.recv(10240) res+=_res if b"chunked" not in res: #如果不是chunked,一次性读完退出 break if _res[-5:]==b"0\r\n\r\n": #chunked标志,连续读取到标识符退出 break s.close() return res def getID(self): data={ 'domain':self.url, 'pageNo':1, 'pageSize':100 } url="/v1/domain/resolve/list" res=self.post(url,data) res=res.decode().split("\r\n{")[1].split("}\r\n")[0] res="{"+res+"}" res=json.loads(res)["result"] recordId={} for i in res: recordId[i["domain"]]=(i["recordId"],i["rdata"]) return recordId def SET(self,domain,ip): recordId=self.getID() self.urlIP=recordId[domain][1] if ip!=recordId[domain][1]: url="/v1/domain/resolve/edit" data={ "domain" : domain, "rdType" : "A", "rdata" : ip, "ttl" : 60, "zoneName" : self.url, "recordId" : recordId[domain][0] } res=self.post(url,data) #print(res) if __name__ == "__main__": #参数设置 url="" #填写你的域名,如http://www.baidu.com,仅填写baidu.com AK="" #Access Key 百度云控制台申请 SK="" #Secret Key 百度云控制台申请 sync=60 #检测间隔时间,建议60秒 #循环监控 ddns=DDNS(url,AK,SK) first=True jsq=sync while True: os.system("title DDNS服务已启动【%s秒后更新】"%jsq) if jsq==0 or first==True: try: ddns.SET("@",ddns.getIP()) os.system("cls") now=time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(ddns.getTime())) print("---------百度域名DDNS信息---------\n\n域名:%s\n-----\n域名解析IP:%s\n本地公网IP:%s\n更新时间:%s"%(url,ddns.urlIP,ddns.getIP(),now)) except: pass jsq=sync first=False time.sleep(1) jsq-=1 
import hmac import time import requests import urllib.parse import socket import json import os class DDNS: def __init__(self,url,AK,SK): self.url=url self.AK=AK self.SK=SK def getTime(self):#获取网络时间戳,用于鉴权 url="http://api.m.taobao.com/rest/api3.do?api=mtop.common.getTimestamp" res=requests.get(url,timeout=5).text res=json.loads(res)["data"]["t"] return int(res)/1000 def getIP(self):#获取本地公网IP url="http://pv.sohu.com/cityjson?ie=utf-8" res=requests.get(url,timeout=5).text res=res.split("=")[1].split(";")[0] #转换javascript为json格式 res=json.loads(res) res=res["cip"] return res def enc(self,key,message): h=hmac.new(key,message,digestmod="SHA256") return h.hexdigest() def post(self,URI,_data): #填写参数 #获取UTC时间 utc = time.strftime('%Y-%m-%dT%H:%M:%SZ',time.gmtime(self.getTime())) #POST字典1 data=json.dumps(_data) #创建请求header字典 _headers={ "host":"bcd.baidubce.com", "x-bce-date":utc, "Content-Type":"application/json;charset=utf-8", "Content-Length":str(len(data)) } #格式化字典为http头标准格式 headers="" for a,b in _headers.items(): headers+=a+":"+b+"\r\n" #生成CanonicalHeaders method = "POST" CanonicalQueryString="" CanonicalURI = urllib.parse.quote(URI) result = [] for key,value in _headers.items(): tempStr = str(urllib.parse.quote(key.lower(),safe="")) + ":" + str(urllib.parse.quote(value,safe="")) result.append(tempStr) result.sort() CanonicalHeaders = "\n".join(result) #拼接得到CanonicalRequest CanonicalRequest = method + "\n" + CanonicalURI + "\n" + CanonicalQueryString +"\n" + CanonicalHeaders #计算signingKey signingKey=self.enc(self.SK.encode(),b"bce-auth-v1/%b/%b/1800"%(self.AK.encode(),utc.encode())) #计算signature signature=self.enc(signingKey.encode(),CanonicalRequest.encode()) #生成认证Authorization Authorization="bce-auth-v1/%s/%s/1800/content-length;content-type;host;x-bce-date/%s"%(self.AK,utc,signature) #最后生成完整的http请求 http="POST %s HTTP/1.1\r\n%sAuthorization:%s\r\n\r\n"%(URI,headers,Authorization)+data #建立socket链接,发送http数据 s=socket.socket() s.connect(("bcd.baidubce.com",80)) s.send(http.encode()) res=b"" while True: _res=s.recv(10240) res+=_res if b"chunked" not in res: #如果不是chunked,一次性读完退出 break if _res[-5:]==b"0\r\n\r\n": #chunked标志,连续读取到标识符退出 break s.close() return res def getID(self): data={ 'domain':self.url, 'pageNo':1, 'pageSize':100 } url="/v1/domain/resolve/list" res=self.post(url,data) res=res.decode().split("\r\n{")[1].split("}\r\n")[0] res="{"+res+"}" res=json.loads(res)["result"] recordId={} for i in res: recordId[i["domain"]]=(i["recordId"],i["rdata"]) return recordId def SET(self,domain,ip): recordId=self.getID() self.urlIP=recordId[domain][1] if ip!=recordId[domain][1]: url="/v1/domain/resolve/edit" data={ "domain" : domain, "rdType" : "A", "rdata" : ip, "ttl" : 60, "zoneName" : self.url, "recordId" : recordId[domain][0] } res=self.post(url,data) #print(res) if __name__ == "__main__": #参数设置 url="" #填写你的域名,如http://www.baidu.com,仅填写baidu.com AK="" #Access Key 百度云控制台申请 SK="" #Secret Key 百度云控制台申请 sync=60 #检测间隔时间,建议60秒 #循环监控 ddns=DDNS(url,AK,SK) first=True jsq=sync while True: os.system("title DDNS服务已启动【%s秒后更新】"%jsq) if jsq==0 or first==True: try: ddns.SET("@",ddns.getIP()) os.system("cls") now=time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(ddns.getTime())) print("---------百度域名DDNS信息---------\n\n域名:%s\n-----\n域名解析IP:%s\n本地公网IP:%s\n更新时间:%s"%(url,ddns.urlIP,ddns.getIP(),now)) except: pass jsq=sync first=False time.sleep(1) jsq-=1

原文链接:https://blog.csdn.net/weixin_42169081/article/details/119299368?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522168466843816800211583225%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fblog.%2522%257D&request_id=168466843816800211583225&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~blog~first_rank_ecpm_v1~times_rank-10-119299368-null-null.blog_rank_default&utm_term=NAS%E3%80%81%E7%BE%A4%E6%99%96%E3%80%81%E9%98%BF%E9%87%8C%E4%BA%91%E3%80%81%E5%9F%9F%E5%90%8D%E8%A7%A3%E6%9E%90%E3%80%81%E5%86%85%E7%BD%91%E7%A9%BF%E9%80%8F%E3%80%81ipv6%E3%80%81ddns%E3%80%81%E8%BD%BB%E9%87%8F%E7%BA%A7%E4%BA%91%E6%9C%8D%E5%8A%A1%E5%99%A8%E3%80%81%E9%93%81%E5%A8%81%E9%A9%AC%E3%80%81%E5%A8%81%E8%81%94%E9%80%9A%E3%80%81DSM%E3%80%81DSM6.0%E3%80%81%E7%BE%A4%E6%99%96nas%E3%80%81%E4%BA%91%E6%9C%8D%E5%8A%A1%E5%99%A8%E3%80%81%E8%9C%97%E7%89%9B%E6%98%9F%E9%99%85%E3%80%81%E9%BB%91%E7%BE%A4%E6%99%96%E3%80%81docker%E3%80%81%E5%AE%B9%E5%99%A8%E9%95%9C%E5%83%8F%E3%80%81%E5%9F%9F%E5%90%8D%E6%B3%A8%E5%86%8C%E3%80%81%E5%AE%9D%E5%A1%94%E3%80%81%E5%8F%8D%E5%90%91%E4%BB%A3%E7%90%86%E3%80%81nginx%E3%80%81frp%E3%80%81%E5%8A%A8%E6%80%81%E5%9F%9F%E5%90%8D%E8%A7%A3%E6%9E%90

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享