代码拉取完成,页面将自动刷新
同步操作将从 didiplus/script 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
'''
@文件 :network_contet.py
@说明 :
@时间 :2020/04/01 17:37:36
@作者 :didiplus
@版本 :1.0
'''
from telnetlib import Telnet
import time
from datetime import datetime
import paramiko
import re
import os
import threading
class MyThreading(threading.Thread):
def __init__(self, func, args=()):
super(MyThreading, self).__init__()
self.func = func
self.args = args
def run(self):
try:
self.result = self.func(*self.args)
except Exception as e:
print(e)
def get_result(self):
# threading.Thread.join(self)
try:
return self.result
except Exception as e:
print(e)
return None
class BaseClass(object):
def __init__(self, host, username: str, password: str, superpassword=None):
self.host = host
self.login_dict = {
'host': host,
'username': username,
'password': password,
'super': superpassword
}
def savefile(self):
str_path = datetime.now().strftime("%Y%m%d")
if os.path.exists(str_path):
pass
else:
os.makedirs(str_path)
file = open(str_path + '\\' + self.host + "_" +
datetime.now().strftime("%H%M%S") + ".txt", "w")
file.write(self.result_str)
file.close()
class SSHtool(BaseClass):
def ssh_on(self, commands: str):
self.client = paramiko.SSHClient() # 实例化一个SSH连接客户端
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.client.connect(self.login_dict['host'], 22, self.login_dict['username'], self.login_dict['password'],
timeout=6) # 客户端实例化一个连接
self.remote_session = self.client.invoke_shell() # 为客户端实例化一个输入输出shell
time.sleep(5)
self.buff = self.remote_session.recv(65535).decode()
if self.login_dict['super']:
self.remote_session.send('\b super \n')
self.buff = self.remote_session.recv(65535).decode()
self.remote_session.send(
self.login_dict['super'].encode('ascii') + b'\n')
self.result_list = []
for command in commands:
self.remote_session.send(command.encode('ascii') + b'\n')
time.sleep(4)
while True:
self.buff = self.remote_session.recv(
65535).decode() # 接受最大65535个字节的返回
self.result_list.append(self.buff)
if '---- More ----' in self.buff.strip():
self.remote_session.send(b' ')
time.sleep(1)
else:
break
self.data = [c.replace('---- More ----', '') for c in self.result_list]
self.data1 = [
f.replace(' ', '') for f in self.data]
self.result_str = '\n'.join(self.data1)
self.savefile()
return self.result_str
class Telnettool(BaseClass):
def telnet_on(self, commands: str):
tn = Telnet(self.login_dict['host'], port=23, timeout=10)
tn.read_until(b"Username:")
tn.write(self.login_dict['username'].encode('ascii') + b'\n')
tn.read_until(b"Password:")
tn.write(self.login_dict['password'].encode('ascii') + b'\n')
if self.login_dict['super']:
tn.write(b'super \n')
tn.read_until(b" Password:")
tn.write(self.login_dict['super'].encode('ascii') + b'\n')
tn.write(b'sys \n')
result_list = []
for command in commands:
tn.write(command.encode('ascii') + b'\n')
time.sleep(2)
while (True):
command_result = tn.read_very_eager().decode('ascii')
result_list.append(command_result)
if '---- More ----' in command_result.strip():
tn.write(b' ')
time.sleep(0.5)
else:
break
data = [c.replace('---- More ----', '') for c in result_list]
data1 = [
f.replace(' ', '') for f in data]
self.result_str = '\n'.join(data1)
self.savefile()
return self.result_str
if __name__ == '__main__':
commands = ['display ip interface brief', 'display cpu-usage']
ip_list = ['192.168.56.10', '192.168.56.20']
for ip in ip_list:
ssh_obj = SSHtool(host=ip, username='admin', password='admin')
t = threading.Thread(target=ssh_obj.ssh_on, args=(commands,))
t.start()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。