代码拉取完成,页面将自动刷新
import time
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import requests
import re
import os
import threading
from queue import Queue
import eventlet
eventlet.monkey_patch()
urls = [
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0iSGViZWki", # Hebei (河北)
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0iYmVpamluZyI%3D", # Beijing (北京)
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0iZ3Vhbmdkb25nIg%3D%3D", # Guangdong (广东)
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0ic2hhbmdoYWki", # Shanghai (上海)
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0idGlhbmppbiI%3D", # Tianjin (天津)
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0iY2hvbmdxaW5nIg%3D%3D", # Chongqing (重庆)
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0ic2hhbnhpIg%3D%3D", # Shanxi (山西)
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0iU2hhYW54aSI%3D", # Shaanxi (陕西)
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0ibGlhb25pbmci", # Liaoning (辽宁)
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0iamlhbmdzdSI%3D", # Jiangsu (江苏)
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0iemhlamlhbmci", # Zhejiang (浙江)
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0i5a6J5b69Ig%3D%3D", # Anhui (安徽)
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0iRnVqaWFuIg%3D%3D", # 福建
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0i5rGf6KW%2FIg%3D%3D", # 江西
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0i5bGx5LicIg%3D%3D", # 山东
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0i5rKz5Y2XIg%3D%3D", # 河南
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0i5rmW5YyXIg%3D%3D", # 湖北
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0i5rmW5Y2XIg%3D%3D" # 湖南
]
def modify_urls(url):
modified_urls = []
ip_start_index = url.find("//") + 2
ip_end_index = url.find(":", ip_start_index)
base_url = url[:ip_start_index] # http:// or https://
ip_address = url[ip_start_index:ip_end_index]
port = url[ip_end_index:]
ip_end = "/iptv/live/1000.json?key=txiptv"
for i in range(1, 256):
modified_ip = f"{ip_address[:-1]}{i}"
modified_url = f"{base_url}{modified_ip}{port}{ip_end}"
modified_urls.append(modified_url)
return modified_urls
def is_url_accessible(url):
try:
response = requests.get(url, timeout=0.5)
if response.status_code == 200:
return url
except requests.exceptions.RequestException:
pass
return None
results = []
for url in urls:
# 创建一个Chrome WebDriver实例
chrome_options = Options()
chrome_options.add_argument('--headless')
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
driver = webdriver.Chrome(options=chrome_options)
# 使用WebDriver访问网页
driver.get(url) # 将网址替换为你要访问的网页地址
time.sleep(10)
# 获取网页内容
page_content = driver.page_source
# 关闭WebDriver
driver.quit()
# 查找所有符合指定格式的网址
pattern = r"http://\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d+" # 设置匹配的格式,如http://8.8.8.8:8888
urls_all = re.findall(pattern, page_content)
# urls = list(set(urls_all)) # 去重得到唯一的URL列表
urls = set(urls_all) # 去重得到唯一的URL列表
x_urls = []
for url in urls: # 对urls进行处理,ip第四位修改为1,并去重
url = url.strip()
ip_start_index = url.find("//") + 2
ip_end_index = url.find(":", ip_start_index)
ip_dot_start = url.find(".") + 1
ip_dot_second = url.find(".", ip_dot_start) + 1
ip_dot_three = url.find(".", ip_dot_second) + 1
base_url = url[:ip_start_index] # http:// or https://
ip_address = url[ip_start_index:ip_dot_three]
port = url[ip_end_index:]
ip_end = "1"
modified_ip = f"{ip_address}{ip_end}"
x_url = f"{base_url}{modified_ip}{port}"
x_urls.append(x_url)
urls = set(x_urls) # 去重得到唯一的URL列表
valid_urls = []
# 多线程获取可用url
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
futures = []
for url in urls:
url = url.strip()
modified_urls = modify_urls(url)
for modified_url in modified_urls:
futures.append(executor.submit(is_url_accessible, modified_url))
for future in concurrent.futures.as_completed(futures):
result = future.result()
if result:
valid_urls.append(result)
for url in valid_urls:
print(url)
# 遍历网址列表,获取JSON文件并解析
for url in valid_urls:
try:
# 发送GET请求获取JSON文件,设置超时时间为0.5秒
ip_start_index = url.find("//") + 2
ip_dot_start = url.find(".") + 1
ip_index_second = url.find("/", ip_dot_start)
base_url = url[:ip_start_index] # http:// or https://
ip_address = url[ip_start_index:ip_index_second]
url_x = f"{base_url}{ip_address}"
json_url = f"{url}"
response = requests.get(json_url, timeout=0.5)
json_data = response.json()
try:
# 解析JSON文件,获取name和url字段
for item in json_data['data']:
if isinstance(item, dict):
name = item.get('name')
urlx = item.get('url')
if ',' in urlx:
urlx=f"aaaaaaaa"
#if 'http' in urlx or 'udp' in urlx or 'rtp' in urlx:
if 'http' in urlx:
urld = f"{urlx}"
else:
urld = f"{url_x}{urlx}"
if name and urlx:
# 删除特定文字
name = name.replace("cctv", "CCTV")
name = name.replace("中央", "CCTV")
name = name.replace("央视", "CCTV")
name = name.replace("高清", "")
name = name.replace("超高", "")
name = name.replace("HD", "")
name = name.replace("标清", "")
name = name.replace("频道", "")
name = name.replace("-", "")
name = name.replace(" ", "")
name = name.replace("PLUS", "+")
name = name.replace("+", "+")
name = name.replace("(", "")
name = name.replace(")", "")
name = re.sub(r"CCTV(\d+)台", r"CCTV\1", name)
name = name.replace("CCTV1综合", "CCTV1")
name = name.replace("CCTV2财经", "CCTV2")
name = name.replace("CCTV3综艺", "CCTV3")
name = name.replace("CCTV4国际", "CCTV4")
name = name.replace("CCTV4中文国际", "CCTV4")
name = name.replace("CCTV4欧洲", "CCTV4")
name = name.replace("CCTV5体育", "CCTV5")
name = name.replace("CCTV6电影", "CCTV6")
name = name.replace("CCTV7军事", "CCTV7")
name = name.replace("CCTV7军农", "CCTV7")
name = name.replace("CCTV7农业", "CCTV7")
name = name.replace("CCTV7国防军事", "CCTV7")
name = name.replace("CCTV8电视剧", "CCTV8")
name = name.replace("CCTV9记录", "CCTV9")
name = name.replace("CCTV9纪录", "CCTV9")
name = name.replace("CCTV10科教", "CCTV10")
name = name.replace("CCTV11戏曲", "CCTV11")
name = name.replace("CCTV12社会与法", "CCTV12")
name = name.replace("CCTV13新闻", "CCTV13")
name = name.replace("CCTV新闻", "CCTV13")
name = name.replace("CCTV14少儿", "CCTV14")
name = name.replace("CCTV15音乐", "CCTV15")
name = name.replace("CCTV16奥林匹克", "CCTV16")
name = name.replace("CCTV17农业农村", "CCTV17")
name = name.replace("CCTV17农业", "CCTV17")
name = name.replace("CCTV5+体育赛视", "CCTV5+")
name = name.replace("CCTV5+体育赛事", "CCTV5+")
name = name.replace("CCTV5+体育", "CCTV5+")
results.append(f"{name},{urld}")
except:
continue
except:
continue
channels = []
for result in results:
line = result.strip()
if result:
channel_name, channel_url = result.split(',')
channels.append((channel_name, channel_url))
# 线程安全的队列,用于存储下载任务
task_queue = Queue()
# 线程安全的列表,用于存储结果
results = []
error_channels = []
# 定义工作线程函数
def worker():
while True:
# 从队列中获取一个任务
channel_name, channel_url = task_queue.get()
try:
channel_url_t = channel_url.rstrip(channel_url.split('/')[-1]) # m3u8链接前缀
lines = requests.get(channel_url, timeout = 1).text.strip().split('\n') # 获取m3u8文件内容
ts_lists = [line.split('/')[-1] for line in lines if line.startswith('#') == False] # 获取m3u8文件下视频流后缀
ts_lists_0 = ts_lists[0].rstrip(ts_lists[0].split('.ts')[-1]) # m3u8链接前缀
ts_url = channel_url_t + ts_lists[0] # 拼接单个视频片段下载链接
# 多获取的视频数据进行5秒钟限制
with eventlet.Timeout(5, False):
start_time = time.time()
content = requests.get(ts_url, timeout = 1).content
end_time = time.time()
response_time = (end_time - start_time) * 1
if content:
with open(ts_lists_0, 'ab') as f:
f.write(content) # 写入文件
file_size = len(content)
# print(f"文件大小:{file_size} 字节")
download_speed = file_size / response_time / 1024
# print(f"下载速度:{download_speed:.3f} kB/s")
normalized_speed = min(max(download_speed / 1024, 0.001), 100) # 将速率从kB/s转换为MB/s并限制在1~100之间
#print(f"标准化后的速率:{normalized_speed:.3f} MB/s")
# 删除下载的文件
os.remove(ts_lists_0)
result = channel_name, channel_url, f"{normalized_speed:.3f} MB/s"
results.append(result)
numberx = (len(results) + len(error_channels)) / len(channels) * 100
print(f"可用频道:{len(results)} 个 , 不可用频道:{len(error_channels)} 个 , 总频道:{len(channels)} 个 ,总进度:{numberx:.2f} %。")
except:
error_channel = channel_name, channel_url
error_channels.append(error_channel)
numberx = (len(results) + len(error_channels)) / len(channels) * 100
print(f"可用频道:{len(results)} 个 , 不可用频道:{len(error_channels)} 个 , 总频道:{len(channels)} 个 ,总进度:{numberx:.2f} %。")
# 标记任务完成
task_queue.task_done()
# 创建多个工作线程
num_threads = 10
for _ in range(num_threads):
t = threading.Thread(target=worker, daemon=True) # 将工作线程设置为守护线程
t.start()
# 添加下载任务到队列
for channel in channels:
task_queue.put(channel)
# 等待所有任务完成
task_queue.join()
def channel_key(channel_name):
match = re.search(r'\d+', channel_name)
if match:
return int(match.group())
else:
return float('inf') # 返回一个无穷大的数字作为关键字
# 对频道进行排序
results.sort(key=lambda x: (x[0], -float(x[2].split()[0])))
results.sort(key=lambda x: channel_key(x[0]))
result_counter = 8 # 每个频道需要的个数
with open("itvlist.txt", 'w', encoding='utf-8') as file:
channel_counters = {}
file.write('央视频道,#genre#\n')
for result in results:
channel_name, channel_url, speed = result
if 'CCTV' in channel_name:
if channel_name in channel_counters:
if channel_counters[channel_name] >= result_counter:
continue
else:
file.write(f"{channel_name},{channel_url}\n")
channel_counters[channel_name] += 1
else:
file.write(f"{channel_name},{channel_url}\n")
channel_counters[channel_name] = 1
channel_counters = {}
file.write('卫视频道,#genre#\n')
for result in results:
channel_name, channel_url, speed = result
if '卫视' in channel_name:
if channel_name in channel_counters:
if channel_counters[channel_name] >= result_counter:
continue
else:
file.write(f"{channel_name},{channel_url}\n")
channel_counters[channel_name] += 1
else:
file.write(f"{channel_name},{channel_url}\n")
channel_counters[channel_name] = 1
channel_counters = {}
file.write('其他频道,#genre#\n')
for result in results:
channel_name, channel_url, speed = result
if 'CCTV' not in channel_name and '卫视' not in channel_name and '测试' not in channel_name:
if channel_name in channel_counters:
if channel_counters[channel_name] >= result_counter:
continue
else:
file.write(f"{channel_name},{channel_url}\n")
channel_counters[channel_name] += 1
else:
file.write(f"{channel_name},{channel_url}\n")
channel_counters[channel_name] = 1
with open("itvlist.m3u", 'w', encoding='utf-8') as file:
channel_counters = {}
file.write('#EXTM3U\n')
for result in results:
channel_name, channel_url, speed = result
if 'CCTV' in channel_name:
if channel_name in channel_counters:
if channel_counters[channel_name] >= result_counter:
continue
else:
file.write(f"#EXTINF:-1 group-title=\"央视频道\",{channel_name}\n")
file.write(f"{channel_url}\n")
channel_counters[channel_name] += 1
else:
file.write(f"#EXTINF:-1 group-title=\"央视频道\",{channel_name}\n")
file.write(f"{channel_url}\n")
channel_counters[channel_name] = 1
channel_counters = {}
#file.write('卫视频道,#genre#\n')
for result in results:
channel_name, channel_url, speed = result
if '卫视' in channel_name:
if channel_name in channel_counters:
if channel_counters[channel_name] >= result_counter:
continue
else:
file.write(f"#EXTINF:-1 group-title=\"卫视频道\",{channel_name}\n")
file.write(f"{channel_url}\n")
channel_counters[channel_name] += 1
else:
file.write(f"#EXTINF:-1 group-title=\"卫视频道\",{channel_name}\n")
file.write(f"{channel_url}\n")
channel_counters[channel_name] = 1
channel_counters = {}
#file.write('其他频道,#genre#\n')
for result in results:
channel_name, channel_url, speed = result
if 'CCTV' not in channel_name and '卫视' not in channel_name and '测试' not in channel_name:
if channel_name in channel_counters:
if channel_counters[channel_name] >= result_counter:
continue
else:
file.write(f"#EXTINF:-1 group-title=\"其他频道\",{channel_name}\n")
file.write(f"{channel_url}\n")
channel_counters[channel_name] += 1
else:
file.write(f"#EXTINF:-1 group-title=\"其他频道\",{channel_name}\n")
file.write(f"{channel_url}\n")
channel_counters[channel_name] = 1
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。