加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
dnscf.py 3.21 KB
一键复制 编辑 原始数据 按行查看 历史
伶枫 提交于 2024-01-20 00:18 . Update dnscf.py
import requests
import traceback
import time
import os
import json
# API 密钥
CF_API_TOKEN = os.environ["CF_API_TOKEN"]
CF_ZONE_ID = os.environ["CF_ZONE_ID"]
CF_DNS_NAME = os.environ["CF_DNS_NAME"]
# pushplus_token
PUSHPLUS_TOKEN = os.environ["PUSHPLUS_TOKEN"]
headers = {
'Authorization': f'Bearer {CF_API_TOKEN}',
'Content-Type': 'application/json'
}
def get_cf_speed_test_ip(timeout=10, max_retries=5):
for attempt in range(max_retries):
try:
# 发送 GET 请求,设置超时
response = requests.get('https://ip.164746.xyz/ipTop.html', timeout=timeout)
# 检查响应状态码
if response.status_code == 200:
return response.text
except Exception as e:
traceback.print_exc()
print(f"get_cf_speed_test_ip Request failed (attempt {attempt + 1}/{max_retries}): {e}")
# 如果所有尝试都失败,返回 None 或者抛出异常,根据需要进行处理
return None
# 获取 DNS 记录
def get_dns_records(name):
def_info = []
url = f'https://api.cloudflare.com/client/v4/zones/{CF_ZONE_ID}/dns_records'
response = requests.get(url, headers=headers)
if response.status_code == 200:
records = response.json()['result']
for record in records:
if record['name'] == name:
def_info.append(record['id'])
return def_info
else:
print('Error fetching DNS records:', response.text)
# 更新 DNS 记录
def update_dns_record(record_id, name, cf_ip):
url = f'https://api.cloudflare.com/client/v4/zones/{CF_ZONE_ID}/dns_records/{record_id}'
data = {
'type': 'A',
'name': name,
'content': cf_ip
}
response = requests.put(url, headers=headers, json=data)
if response.status_code == 200:
print(f"cf_dns_change success: ---- Time: " + str(
time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) + " ---- ip:" + str(cf_ip))
return "ip:" + str(cf_ip) + "解析" + str(name) + "成功"
else:
traceback.print_exc()
print(f"cf_dns_change ERROR: ---- Time: " + str(
time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) + " ---- MESSAGE: " + str(e))
return "ip:" + str(cf_ip) + "解析" + str(name) + "失败"
# 消息推送
def push_plus(content):
url = 'http://www.pushplus.plus/send'
data = {
"token": PUSHPLUS_TOKEN,
"title": "IP优选DNSCF推送",
"content": content,
"template": "markdown",
"channel": "wechat"
}
body = json.dumps(data).encode(encoding='utf-8')
headers = {'Content-Type': 'application/json'}
requests.post(url, data=body, headers=headers)
# 主函数
def main():
# 获取最新优选IP
ip_addresses_str = get_cf_speed_test_ip()
ip_addresses = ip_addresses_str.split(',')
dns_records = get_dns_records(CF_DNS_NAME)
push_plus_content = []
# 遍历 IP 地址列表
for index, ip_address in enumerate(ip_addresses):
# 执行 DNS 变更
dns = update_dns_record(dns_records[index], CF_DNS_NAME, ip_address)
push_plus_content.append(dns)
push_plus('\n'.join(push_plus_content))
if __name__ == '__main__':
main()
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化