加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
download_malwares.py 4.94 KB
一键复制 编辑 原始数据 按行查看 历史
EITarget 提交于 2022-03-14 09:09 . New master 0314 (#8)
import os
import requests
import lxml.etree as le
import zipfile
def file_transfer_handler(default_dir, target_dir):
"""
移动文件夹 file_transfer_handler('../SaveFile', '../targetFile')
:param default_dir: 默认存储路径
:param target_dir: 目标路径
:return:
"""
try:
import shutil
if not os.path.exists(target_dir):
os.makedirs(target_dir)
shutil.move(default_dir, target_dir)
print('移动完成: ', target_dir)
return True
except Exception as e:
print(e)
return False
class DownloadMalware(object):
def __init__(self, params='hourly'):
self.abuse_info = {
'daily': {'url': 'https://datalake.abuse.ch/malware-bazaar/daily/', 'save_dir': 'output_daily'},
'hourly': {'url': 'https://datalake.abuse.ch/malware-bazaar/hourly/', 'save_dir': 'output'}}
self.base_url = self.abuse_info[params].get('url')
self.cur_py_path, _ = os.path.split(os.path.abspath(__file__))
self.save_dir = os.path.join(self.cur_py_path, self.abuse_info[params].get('save_dir'))
self.download_list = []
self.thunder_download()
print(self.base_url, '\n', self.save_dir)
# self.idm_download()
def get_malware_bazaar(self):
response = requests.get(self.base_url)
if response.status_code == 200:
res = response.content.decode('utf-8', 'ignore').replace('\n', '')
content_xs = le.HTML(res)
href_xs = content_xs.xpath('//td/a/@href')
lists = [href_xs[href] for href in range(1, len(href_xs))]
return lists
return False
def download_prepend(self):
if not os.path.exists(self.save_dir):
os.makedirs(self.save_dir)
@staticmethod
def exist2remove(path):
if os.path.exists(path):
os.remove(path)
def thunder_download(self):
# os.system(r'"D:\Program Files (x86)\Thunder Network\Thunder\Program\ThunderStart.exe" {url}'.format(url=url))
# AddTask("下载地址", "另存为文件名", "保存目录", "任务注释", "引用地址", "开始模式", "只从原始地址下载", "从原始地址下载线程数")
self.download_prepend()
try:
from win32com.client import Dispatch
thunder = Dispatch('ThunderAgent.Agent64.1')
bazaar_list = self.get_malware_bazaar()
if bazaar_list:
for name in bazaar_list:
file_path = os.path.join(self.save_dir, name)
if zipfile.is_zipfile(file_path):
continue
self.exist2remove(file_path)
self.download_list.append(name)
thunder.AddTask(self.base_url + name, name, self.save_dir)
thunder.CommitTasks()
print(f'下载长度{len(self.download_list)}\n')
except Exception as e:
print(e)
def idm_download(self):
""" /d URL - 下载文件。
IDMan.exe /d "http://www.xxxx Name.zip"
/s - 开始任务调度里的队列
/p 本地路径 - 定义要保存的文件放在哪个本地路径
/f 本地local文件名 - 定义要保存的文件到本地的文件名
/q - IDM 将在成功下载之后退出。这个参数只为第一个副本工作
/h - IDM 将在成功下载之后挂起您的连接
/n - 当不要 IDM 询问任何问题时启用安静模式
/a - 添加一个指定的文件 用 /d 到下载队列,但是不要开始下载
"""
self.download_prepend()
try:
idm = r'D:\工具\IDM\IDMan.exe'
bazaar_list = self.get_malware_bazaar()
if bazaar_list:
from subprocess import call
for name in bazaar_list:
file_path = os.path.join(self.save_dir, name)
if zipfile.is_zipfile(file_path):
continue
self.exist2remove(file_path)
self.download_list.append(name)
call([idm, '/d', self.base_url + name, '/p', self.save_dir, '/f', name, '/n', '/a'])
call([idm, '/s'])
print(f'下载长度{len(self.download_list)}\n')
except Exception as e:
print(e)
def select_date(filelist, first, last):
"""
# start = input('start_time')
# end = input('end_time')
# num = select_date(file_list, start, end)
# print(file_list[num[0]:num[1]])
"""
# 文件列表中选择指定范围文件
first = first + '.zip'
last = last + '.zip'
first_i = 0
last_i = 0
for index, file in enumerate(filelist):
if first == file:
first_i = index
if last == file:
last_i = index
return first_i, last_i
if __name__ == '__main__':
types = {'h': 'hourly', 'd': 'daily'}
DownloadMalware(types.get('h'))
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化