3.可以传入自己配置的交换机地址
1.解析待登录的交换机配置文件,生成初始状态字典
2.批量测试端口可用性,更新状态字典可用性字段,并写入日志文件
3.使用SSH或者Telnet登录交换机,获取配置(尝试登录【登录协议判断、尝试账号密码】、执行命令),更新状态字典
4.根据执行结果与状态字典,写入日志
import os #获取路径 import sys #获取打包后exe文件所在文件夹路径 import re #正则匹配连接后回显内容,用于判断输入用户名还是密码 import shutil #下载待处理的交换机列表文件 import socket #判断端口是否开放 import time from datetime import date import argparse #获取并解析cmd窗口传入的参数 import logging #打印出netmiko框架运行日志 import threading # 多线程 import telnetlib #telnet连接交换机 import paramiko #ssh连接交换机paramiko的再次封装 from netmiko import ConnectHandler #netmiko 通过ssh连接交换机,是paramiko的再次封装
lock = threading.Lock() #线程锁 # 获取当前路径父路径(所在文件夹) BASE_DIR = os.path.dirname(os.path.abspath(__file__)) #获取打包后exe文件路径(所在文件夹) BASE_EXE_DIR = os.path.dirname(sys.executable)
def parseArgs(): # 创建一个ArgumentParser对象 parser = argparse.ArgumentParser(description='这是一个命令行运行autoBackupSwitch程序的说明') # 添加一个位置参数 parser.add_argument('--ips', '-i',type=str, default=os.path.join(BASE_DIR,'网络设备ip端口3.txt'),help='输入本地配置好网络设备ip端口文件') # 添加一个可选参数 parser.add_argument('--download', '-d',action='store_true', help='下载默认的交换机列表文件模板') # 解析命令行输入 #args = parser.parse_args(['-i','aaaa','-t','bbbbb']) args = parser.parse_args() if args.download: shutil.copy(os.path.join(BASE_DIR,'网络设备ip端口3.txt'), BASE_EXE_DIR) return args parseArgs()
def parse_port_file( file_path=parseArgs().ips): print("---开始解析端口文件---") # 获取当日日期 today = date.today().strftime('%Y%m%d') print(today) # 数字格式[{'ip':ip,"协议":协议,"端口号":端口号,"位置":位置},....] ip_port = [] if os.path.exists(file_path): print(f"文件 {file_path} 存在") with open(file_path, 'r', encoding='utf-8') as file: lines = file.readlines()[1:] for line in lines: line = line.strip() ip=line.split('\t')[0] protocol=line.split('\t')[3] port=line.split('\t')[2] address=line.split('\t')[1] isable=False password_authentication_status=False local_username='' local_password='' ip_port.append({'ip':ip,'protocol':protocol,'port':port,'address':address,'isable':isable,'password_authentication_status':password_authentication_status,'local_password':local_password,"local_username":local_username}) else: print(f"文件 {file_path} 不存在") # 创建一个名为example.txt的文件 print(ip_port) return ip_port
def check_port(ip, port,ip_info): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(5) try: result = sock.connect_ex((ip, port)) if result == 0: ip_info['isable']=True #网络正常,更新 交换机状态字典的端口状态字段 return True else: print(f'The port {port} on {ip} is closed.') ip_info['isable'] = False #网络不通,更新 交换机状态字典的端口状态字段 return False except Exception as e: print(f"错误: {str(e)}") ip_info['isable'] = False #出错,更新交换机状态字典的端口状态字段 return False finally: sock.close()
def bat_test_port(ip_port): threads = [] for ip_info in ip_port: ip=ip_info['ip'] port=ip_info['port'] thread = threading.Thread(target=check_port, args=(ip,int(port), ip_info)) thread.start() threads.append(thread) for thread in threads: thread.join() return ip_port
def try_ssh_connection_with_passwords(hostname, port, username, passwords, ip_info,timeout=60,compress=True): 尝试使用密码列表连接到 SSH 服务器。 :param hostname: SSH 服务器的主机名或IP地址 :param port: SSH 服务器的端口号 :param username: 登录用户名 :param passwords: 包含密码的列表 :param timeout: 连接超时时间(秒) :return: 成功连接后返回SSHClient对象,否则返回None # client = paramiko.SSHClient() # client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) passwords_num=len(passwords) for password in passwords: try: print(f"尝试使用密码连接(密码 {password})...") # 设备连接信息 开启fast_cli快速模式,全局global_delay_factor延迟因子调小(默认值为1) dev = {'device_type': 'terminal_server', 'host': hostname, 'username': username, 'password': password, 'port': port, 'timeout': 180, 'conn_timeout':180,##必须长,否则没有连接到就直接失败 'auth_timeout':180, 'fast_cli': True, 'global_delay_factor': 0.01, 'session_log': 'session.log', client = ConnectHandler(**dev) print("连接成功!") ip_info['local_username'] = username ip_info['local_password'] = password ip_info['password_authentication_status'] = True return client except paramiko.ssh_exception.AuthenticationException: print("密码错误,尝试下一个密码...") except paramiko.ssh_exception.SSHException as e: print(f"SSH 错误: {e}") return None # 也可以选择继续尝试其他密码,但这里我们假设 SSH 错误是致命的 except Exception as e: print(f"发生其他错误: {e}") return None # 同样,这里可以处理异常或继续尝试 print("所有密码尝试均失败。") return None
def connet_port(ip_info,command_lists): ip=ip_info['ip'] print(ip) port=ip_info['port'] print(port) passwords = ['xxxxx','*****'] isable=ip_info['isable'] password = '******' username = '*****' # username = 'root' logging.basicConfig(level=logging.DEBUG) if isable and port=='22': print('开始使用SSH协议连接服务器') try: with try_ssh_connection_with_passwords(ip,port,username,passwords,ip_info) as ssh: print(ssh) print('SSH协议连接服务器成功') time.sleep(40) ##必须休眠,有些交换机登录之后,提醒更改密码,休眠40s跳过选择 Q:如何交互,显示回显内容,来确定选择Y/N command_lists= ['screen-length 0 temporary','screen-length disable' ,'terminal length 0','more off','dis cu', 'show running-config'] print('去除分屏状态') for command in command_lists: print('开始执行命令') try: output = ssh.send_command(command_string=command,cmd_verify=True,strip_command=True) print(output) bulid_result_file(ip_info, output) ##将结果写入单独文件 except Exception as e: print(f'出错,{command}执行结束,原因{e}') print('命令执行完毕') except Exception: print(f"{ip},连接失败") return 0 # 关闭连接
def bulid_result_file(ip,resp): print("---开始生成交换机执行命令结果存在---") result_name=ip['ip']+'_'+ip['address']+'_'+ip['protocol']+'_result.txt' with lock: with open(os.path.join(BASE_EXE_DIR, result_name), 'a+', encoding='utf-8') as file: file.write(resp + "\n") file.flush() # 确保数据被立即写入磁盘
if isable and port=='23': with telnetlib.Telnet(ip, port) as tn: tn.set_debuglevel(3) login_putout = '' while 1: time.sleep(3) first_login=tn.read_very_eager().decode('ascii') login_putout+=first_login # time.sleep(1) if re.search(r'password:',login_putout.lower()): tn.write(password.encode() + b"\n") break if re.search(r'username:|login:',login_putout.lower()): tn.write( username.encode() + b"\n") time.sleep(1) continue index,obj,output=tn.expect([b'>',b'#'],timeout=20) if index==1: ip_info['local_username'] = username ip_info['local_password'] = password ip_info['password_authentication_status'] = True time.sleep(1) tn.write( b"\n") #单独回车,获取提示符例如 prompt_pattern =tn.read_until(b'#', timeout=10).decode('ascii') #获取提示符 tn.write(b'screen-length 0 temporary' + b"\n") tn.read_until(b'#', timeout=10) tn.write(b'screen-length disable' + b"\n") tn.read_until(b'#', timeout=10) tn.write(b'terminal length 0' + b"\n") tn.read_until(b'#', timeout=10) tn.write(b'more off' + b"\n") tn.read_until(b'#', timeout=10) for command in command_lists: tn.write(command.encode() + b"\n") output = tn.read_until(prompt_pattern.encode('ascii')).decode('ascii') print(output) bulid_result_file(ip_info, output) if index == 0: ip_info['local_username'] = username ip_info['local_password'] = password ip_info['password_authentication_status'] = True print('111111111111111') # tn.write(b'ls -l' + b"\n") # time.sleep(1) # output=tn.read_very_eager().decode() # # print('ooooooooooooooooooooooooooooooooo') # print(output) time.sleep(1) tn.write( b"\n") prompt_pattern =tn.read_until(b'>', timeout=10).decode('ascii') tn.write(b'screen-length 0 temporary' + b"\n") tn.read_until(b'>', timeout=10) tn.write(b'screen-length disable' + b"\n") tn.read_until(b'>', timeout=10) tn.write(b'terminal length 0' + b"\n") tn.read_until(b'>', timeout=10) tn.write(b'more off' + b"\n") tn.read_until(b'>', timeout=10) for command in command_lists: tn.write(command.encode() + b"\n") output = tn.read_until(prompt_pattern.encode('ascii')).decode('ascii') print(output) bulid_result_file(ip_info, output) if index == -1: return None
def bat_check_port(): ip_port=parse_port_file() bat_test_port(ip_port) print(ip_port) time.sleep(10) ####必须休眠,端口测试进程没有结束,直接连接会出意外结果 # command_lists=['ls -a','show'] command_lists= ['screen-length 0 temporary','screen-length disable' ,'terminal length 0','more off','dis cu', 'show running-config'] threads = [] for ip_info in ip_port: thread = threading.Thread(target=connet_port, args=(ip_info,command_lists)) thread.start() threads.append(thread) for thread in threads: thread.join() build_log_file(ip_port) 2.7将结果写入文件def build_log_file(ip_info): print("---开始检测总日志是否存在---") # 获取当日日期 today = date.today().strftime('%Y%m%d') today_log_name = today + '_log.txt' success_name = today + '_connect_success.txt' fail_name = today + '_connect_fail.txt' for ip in ip_info: with lock: with open(os.path.join(BASE_EXE_DIR, today_log_name), 'a+', encoding='utf-8') as file: single_log = 'ip:' + ip['ip'] + "\t" + 'protocol:' + ip['protocol'] + "\t" + 'port:' + ip['port'] + "\t" + 'address:' + ip['address'] + "\t" + 'isable:' + str(ip['isable']) + "\t" + 'password_authentication_status:' + str(ip['password_authentication_status']) + "\t" + 'local_password:' + ip['local_password'] + "\t" + 'local_username:' + ip['local_username'] file.write(single_log + "\n") file.flush() # 确保数据被立即写入磁盘 if ip['isable']: with lock: with open(os.path.join(BASE_EXE_DIR, success_name), 'a+', encoding='utf-8') as file: single_log = 'ip:' + ip['ip'] + "\t" + 'protocol:' + ip['protocol'] + "\t" + 'port:' + ip['port'] +"\t" + 'address:' + ip['address'] + "\t" + 'isable:' + str(ip['isable']) + "\t" + 'password_authentication_status:' + str(ip['password_authentication_status']) + "\t" + 'local_password:' + ip['local_password'] + "\t" + 'local_username:' + ip['local_username'] file.write(single_log + "\n") file.flush() # 确保数据被立即写入磁盘 if not ip['isable']: with lock: with open(os.path.join(BASE_EXE_DIR, fail_name), 'a+', encoding='utf-8') as file: single_log = 'ip:' + ip['ip'] + "\t" + 'protocol:' + ip['protocol'] + "\t" + 'port:' + ip['port'] + "\t" + 'address:' + ip['address'] + "\t" + 'isable:' + str(ip['isable']) + "\t" + 'password_authentication_status:' + str(ip['password_authentication_status']) +"\t" + 'local_password:' + ip['local_password'] + "\t" + 'local_username:' + ip['local_username'] file.write(single_log + "\n") file.flush() # 确保数据被立即写入磁盘 三、 全部代码import argparse import logging import os import re import shutil import socket import sys import telnetlib import threading import time from concurrent.futures import ThreadPoolExecutor from datetime import date, datetime, timedelta import paramiko from netmiko import ConnectHandler from pip._internal.utils import datetime lock = threading.Lock() # 获取当前路径父路径(所在文件夹) BASE_DIR = os.path.dirname(os.path.abspath(__file__)) #获取打包后exe文件路径(所在文件夹) BASE_EXE_DIR = os.path.dirname(sys.executable) # BASE_EXE_DIR = os.path.dirname(os.path.abspath(__file__)) def parseArgs(): # 创建一个ArgumentParser对象 parser = argparse.ArgumentParser(description='这是一个命令行运行bat_check_port程序的说明') # 添加一个位置参数 parser.add_argument('--ips', '-i',type=str, default=os.path.join(BASE_DIR,'网络设备ip端口3.txt'),help='输入本地配置好网络设备ip端口文件') # 添加一个可选参数 parser.add_argument('--download', '-d',action='store_true', help='下载ip地址文件、发送手机号文件模板') # 解析命令行输入 #args = parser.parse_args(['-i','aaaa','-t','bbbbb']) args = parser.parse_args() if args.download: shutil.copy(os.path.join(BASE_DIR,'网络设备ip端口3.txt'), BASE_EXE_DIR) return args parseArgs() def try_ssh_connection_with_passwords(hostname, port, username, passwords, ip_info,timeout=60,compress=True): 尝试使用密码列表连接到 SSH 服务器。 :param hostname: SSH 服务器的主机名或IP地址 :param port: SSH 服务器的端口号 :param username: 登录用户名 :param passwords: 包含密码的列表 :param timeout: 连接超时时间(秒) :return: 成功连接后返回SSHClient对象,否则返回None # client = paramiko.SSHClient() # client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) passwords_num=len(passwords) for password in passwords: try: print(f"尝试使用密码连接(密码 {password})...") # 设备连接信息 开启fast_cli快速模式,全局global_delay_factor延迟因子调小(默认值为1) dev = {'device_type': 'terminal_server', 'host': hostname, 'username': username, 'password': password, 'port': port, 'timeout': 180, 'conn_timeout':180, 'auth_timeout':180, 'fast_cli': True, 'global_delay_factor': 0.01, 'session_log': 'session.log', client = ConnectHandler(**dev) print("连接成功!") ip_info['local_username'] = username ip_info['local_password'] = password ip_info['password_authentication_status'] = True return client except paramiko.ssh_exception.AuthenticationException: print("密码错误,尝试下一个密码...") except paramiko.ssh_exception.SSHException as e: print(f"SSH 错误: {e}") return None # 也可以选择继续尝试其他密码,但这里我们假设 SSH 错误是致命的 except Exception as e: print(f"发生其他错误: {e}") return None # 同样,这里可以处理异常或继续尝试 print("所有密码尝试均失败。") return None def parse_port_file( file_path=parseArgs().ips): print("---开始解析端口文件---") # 获取当日日期 today = date.today().strftime('%Y%m%d') print(today) # 数字格式[{'ip':ip,"协议":协议,"端口号":端口号,"位置":位置},....] ip_port = [] if os.path.exists(file_path): print(f"文件 {file_path} 存在") with open(file_path, 'r', encoding='utf-8') as file: lines = file.readlines()[1:] for line in lines: line = line.strip() ip=line.split('\t')[0] protocol=line.split('\t')[3] port=line.split('\t')[2] address=line.split('\t')[1] isable=False password_authentication_status=False local_username='' local_password='' ip_port.append({'ip':ip,'protocol':protocol,'port':port,'address':address,'isable':isable,'password_authentication_status':password_authentication_status,'local_password':local_password,"local_username":local_username}) else: print(f"文件 {file_path} 不存在") # 创建一个名为example.txt的文件 print(ip_port) return ip_port def build_log_file(ip_info): print("---开始检测总日志是否存在---") # 获取当日日期 today = date.today().strftime('%Y%m%d') today_log_name = today + '_log.txt' success_name = today + '_connect_success.txt' fail_name = today + '_connect_fail.txt' for ip in ip_info: with lock: with open(os.path.join(BASE_EXE_DIR, today_log_name), 'a+', encoding='utf-8') as file: single_log = 'ip:' + ip['ip'] + "\t" + 'protocol:' + ip['protocol'] + "\t" + 'port:' + ip['port'] + \ "\t" + 'address:' + ip['address'] + "\t" + 'isable:' + str(ip['isable']) + "\t" + \ 'password_authentication_status:' + str(ip['password_authentication_status']) + \ "\t" + 'local_password:' + ip['local_password'] + "\t" + 'local_username:' + ip['local_username'] file.write(single_log + "\n") file.flush() # 确保数据被立即写入磁盘 if ip['isable']: with lock: with open(os.path.join(BASE_EXE_DIR, success_name), 'a+', encoding='utf-8') as file: single_log = 'ip:' + ip['ip'] + "\t" + 'protocol:' + ip['protocol'] + "\t" + 'port:' + ip['port'] + \ "\t" + 'address:' + ip['address'] + "\t" + 'isable:' + str(ip['isable']) + "\t" + \ 'password_authentication_status:' + str(ip['password_authentication_status']) + \ "\t" + 'local_password:' + ip['local_password'] + "\t" + 'local_username:' + ip['local_username'] file.write(single_log + "\n") file.flush() # 确保数据被立即写入磁盘 if not ip['isable']: with lock: with open(os.path.join(BASE_EXE_DIR, fail_name), 'a+', encoding='utf-8') as file: single_log = 'ip:' + ip['ip'] + "\t" + 'protocol:' + ip['protocol'] + "\t" + 'port:' + ip['port'] + \ "\t" + 'address:' + ip['address'] + "\t" + 'isable:' + str(ip['isable']) + "\t" + \ 'password_authentication_status:' + str(ip['password_authentication_status']) + \ "\t" + 'local_password:' + ip['local_password'] + "\t" + 'local_username:' + ip['local_username'] file.write(single_log + "\n") file.flush() # 确保数据被立即写入磁盘 def bulid_result_file(ip,resp): print("---开始生成交换机执行命令结果存在---") result_name=ip['ip']+'_'+ip['address']+'_'+ip['protocol']+'_result.txt' with lock: with open(os.path.join(BASE_EXE_DIR, result_name), 'a+', encoding='utf-8') as file: file.write(resp + "\n") file.flush() # 确保数据被立即写入磁盘 def check_port(ip, port,ip_info): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(5) try: result = sock.connect_ex((ip, port)) if result == 0: ip_info['isable']=True return True else: print(f'The port {port} on {ip} is closed.') ip_info['isable'] = False return False except Exception as e: print(f"错误: {str(e)}") ip_info['isable'] = False return False finally: sock.close() def connet_port(ip_info,command_lists): ip=ip_info['ip'] print(ip) port=ip_info['port'] print(port) passwords = ['*****','*****'] isable=ip_info['isable'] password = 'xxxx' username = '*****' # username = 'root' logging.basicConfig(level=logging.DEBUG) if isable and port=='22': print('开始使用SSH协议连接服务器') try: with try_ssh_connection_with_passwords(ip,port,username,passwords,ip_info) as ssh: print(ssh) print('SSH协议连接服务器成功') time.sleep(40) # 执行命令 # ssh.exec_command('screen-length 0 temporary',get_pty=True) # # 执行命令 # ssh.exec_command('screen-length disable',get_pty=True) # # 执行命令 # ssh.exec_command('terminal length 0',get_pty=True) # # 执行命令 # ssh.exec_command('more off',get_pty=True) print('去除分屏状态') for command in command_lists: print('开始执行命令') try: output = ssh.send_command(command_string=command,cmd_verify=True,strip_command=True) print(output) bulid_result_file(ip_info, output) except Exception as e: print(f'出错,{command}执行结束,原因{e}') print('命令执行完毕') except Exception: print(f"{ip},连接失败") return 0 # 关闭连接 if isable and port=='23': print('iiiiiiiiiiiiiiiiii') with telnetlib.Telnet(ip, port) as tn: print(tn) tn.set_debuglevel(0) # print(tn.read_all().decode()) # tn.read_until(b'continue...\n') # tn.write( b"\n") login_putout = '' while 1: time.sleep(3) first_login=tn.read_very_eager().decode('ascii') login_putout+=first_login # time.sleep(1) if re.search(r'password:',login_putout.lower()): tn.write(password.encode() + b"\n") # tn.read_until(b"\n") # tn.write(b"\n") break if re.search(r'username:|login:',login_putout.lower()): tn.write( username.encode() + b"\n") time.sleep(1) # print(tn.read_very_eager()) continue # # tn.read_until(b'login:',timeout=10) # tn.write(b'root'+ b"\n") # tn.read_until(b'Password:',timeout=10) # tn.write(b'123456' + b"\n") index,obj,output=tn.expect([b'>',b'#'],timeout=20) # print(f'1111111----{index}') # print(index) if index==1: ip_info['local_username'] = username ip_info['local_password'] = password ip_info['password_authentication_status'] = True print('111111111111111') # tn.write(b'ls -l' + b"\n") # time.sleep(1) # output=tn.read_very_eager().decode() # # print('ooooooooooooooooooooooooooooooooo') # print(output) time.sleep(1) tn.write( b"\n") prompt_pattern =tn.read_until(b'#', timeout=10).decode('ascii') tn.write(b'screen-length 0 temporary' + b"\n") tn.read_until(b'#', timeout=10) tn.write(b'screen-length disable' + b"\n") tn.read_until(b'#', timeout=10) tn.write(b'terminal length 0' + b"\n") tn.read_until(b'#', timeout=10) tn.write(b'more off' + b"\n") tn.read_until(b'#', timeout=10) for command in command_lists: tn.write(command.encode() + b"\n") output = tn.read_until(prompt_pattern.encode('ascii')).decode('ascii') print(output) bulid_result_file(ip_info, output) if index == 0: ip_info['local_username'] = username ip_info['local_password'] = password ip_info['password_authentication_status'] = True print('111111111111111') # tn.write(b'ls -l' + b"\n") # time.sleep(1) # output=tn.read_very_eager().decode() # # print('ooooooooooooooooooooooooooooooooo') # print(output) time.sleep(1) tn.write( b"\n") prompt_pattern =tn.read_until(b'>', timeout=10).decode('ascii') tn.write(b'screen-length 0 temporary' + b"\n") tn.read_until(b'>', timeout=10) tn.write(b'screen-length disable' + b"\n") tn.read_until(b'>', timeout=10) tn.write(b'terminal length 0' + b"\n") tn.read_until(b'>', timeout=10) tn.write(b'more off' + b"\n") tn.read_until(b'>', timeout=10) for command in command_lists: tn.write(command.encode() + b"\n") output = tn.read_until(prompt_pattern.encode('ascii')).decode('ascii') print(output) bulid_result_file(ip_info, output) if index == -1: return None # # 从这里开始,tn可以用来发送命令和接收响应 # tn.write(b"pwd \n") # print('8888888888888') # print(tn.read_all().decode()) # print('qqqqqqqqq') # tn.close() def bat_test_port(ip_port): threads = [] for ip_info in ip_port: ip=ip_info['ip'] port=ip_info['port'] thread = threading.Thread(target=check_port, args=(ip,int(port), ip_info)) thread.start() threads.append(thread) for thread in threads: thread.join() return ip_port def bat_check_port(): ip_port=parse_port_file() bat_test_port(ip_port) print(ip_port) time.sleep(10) ####必须休眠,端口测试进程没有结束,直接连接会出意外结果 # command_lists=['ls -a','show'] command_lists= ['screen-length 0 temporary','screen-length disable' ,'terminal length 0','more off','dis cu', 'show running-config'] threads = [] for ip_info in ip_port: thread = threading.Thread(target=connet_port, args=(ip_info,command_lists)) thread.start() threads.append(thread) for thread in threads: thread.join() build_log_file(ip_port) if __name__ == '__main__': bat_check_port() 四、打包4.1 前置说明4.1.1文件列表 autoBackupSwicthesV2.py 网络设备ip端口3.txt 4.1.2 安装pyinstallerpip install pyinstaller 4.2 打包生成.spec文件pyinstaller -F autoBackupSwicthesV2.py 说明:1.生成autoBackupSwicthesV2.spec文件 4.3 使用.spec文件打包# -*- mode: python ; coding: utf-8 -*- a = Analysis( ['autoBackupSwicthesV2.py'], pathex=[], binaries=[], datas=[("网络设备ip端口3.txt",".")], hiddenimports=[], hookspath=[], hooksconfig={}, runtime_hooks=[], excludes=[], noarchive=False, optimize=0, pyz = PYZ(a.pure) exe = EXE( pyz, a.scripts, a.binaries, a.datas, [], name='autoBackupSwicthesV2.5', debug=False, bootloader_ignore_signals=False, strip=False, upx=True, upx_exclude=[], runtime_tmpdir=None, console=True, disable_windowed_traceback=False, argv_emulation=False, target_arch=None, codesign_identity=None, entitlements_file=None, 执行pyinstaller .\autoBackupSwitchesV2.spec 五、其他说明5.1 总结socket编程 5.2 易错点/踩坑点1.socket编程,一定要注意关闭连接; 本例中先测试端口联通性,再登录端口,有两次连接过程,如果没有断开,会报错,可以使用sleep休眠一段时间,再进行登录。 2.关闭本地防火墙,关闭本地防火墙,关闭本地防火墙。 3.多线程进行操作,会出各种问题,注意线程抢占资源问题,不可同时写入文件,会出现写入不全问题。 5.3 疑问 登录交换机之后出现 提醒更改密码,如何有效获取提示并输入命令?拓展,如何交互式进行读取回显与写入命令,根据回显内容决定下一步命令? 单行过长,会出现乱码,什么原因? 如何有效获取登录 输入命令前的提示符? 5.4 参考文献Netmiko最强攻略——两万字吐血整理,网工玩转自动化https://zhuanlan.zhihu.com/p/541592293
def bat_check_port(): ip_port=parse_port_file() bat_test_port(ip_port) print(ip_port) time.sleep(10) ####必须休眠,端口测试进程没有结束,直接连接会出意外结果 # command_lists=['ls -a','show'] command_lists= ['screen-length 0 temporary','screen-length disable' ,'terminal length 0','more off','dis cu', 'show running-config'] threads = [] for ip_info in ip_port: thread = threading.Thread(target=connet_port, args=(ip_info,command_lists)) thread.start() threads.append(thread) for thread in threads: thread.join() build_log_file(ip_port)
2.7将结果写入文件def build_log_file(ip_info): print("---开始检测总日志是否存在---") # 获取当日日期 today = date.today().strftime('%Y%m%d') today_log_name = today + '_log.txt' success_name = today + '_connect_success.txt' fail_name = today + '_connect_fail.txt' for ip in ip_info: with lock: with open(os.path.join(BASE_EXE_DIR, today_log_name), 'a+', encoding='utf-8') as file: single_log = 'ip:' + ip['ip'] + "\t" + 'protocol:' + ip['protocol'] + "\t" + 'port:' + ip['port'] + "\t" + 'address:' + ip['address'] + "\t" + 'isable:' + str(ip['isable']) + "\t" + 'password_authentication_status:' + str(ip['password_authentication_status']) + "\t" + 'local_password:' + ip['local_password'] + "\t" + 'local_username:' + ip['local_username'] file.write(single_log + "\n") file.flush() # 确保数据被立即写入磁盘 if ip['isable']: with lock: with open(os.path.join(BASE_EXE_DIR, success_name), 'a+', encoding='utf-8') as file: single_log = 'ip:' + ip['ip'] + "\t" + 'protocol:' + ip['protocol'] + "\t" + 'port:' + ip['port'] +"\t" + 'address:' + ip['address'] + "\t" + 'isable:' + str(ip['isable']) + "\t" + 'password_authentication_status:' + str(ip['password_authentication_status']) + "\t" + 'local_password:' + ip['local_password'] + "\t" + 'local_username:' + ip['local_username'] file.write(single_log + "\n") file.flush() # 确保数据被立即写入磁盘 if not ip['isable']: with lock: with open(os.path.join(BASE_EXE_DIR, fail_name), 'a+', encoding='utf-8') as file: single_log = 'ip:' + ip['ip'] + "\t" + 'protocol:' + ip['protocol'] + "\t" + 'port:' + ip['port'] + "\t" + 'address:' + ip['address'] + "\t" + 'isable:' + str(ip['isable']) + "\t" + 'password_authentication_status:' + str(ip['password_authentication_status']) +"\t" + 'local_password:' + ip['local_password'] + "\t" + 'local_username:' + ip['local_username'] file.write(single_log + "\n") file.flush() # 确保数据被立即写入磁盘 三、 全部代码import argparse import logging import os import re import shutil import socket import sys import telnetlib import threading import time from concurrent.futures import ThreadPoolExecutor from datetime import date, datetime, timedelta import paramiko from netmiko import ConnectHandler from pip._internal.utils import datetime lock = threading.Lock() # 获取当前路径父路径(所在文件夹) BASE_DIR = os.path.dirname(os.path.abspath(__file__)) #获取打包后exe文件路径(所在文件夹) BASE_EXE_DIR = os.path.dirname(sys.executable) # BASE_EXE_DIR = os.path.dirname(os.path.abspath(__file__)) def parseArgs(): # 创建一个ArgumentParser对象 parser = argparse.ArgumentParser(description='这是一个命令行运行bat_check_port程序的说明') # 添加一个位置参数 parser.add_argument('--ips', '-i',type=str, default=os.path.join(BASE_DIR,'网络设备ip端口3.txt'),help='输入本地配置好网络设备ip端口文件') # 添加一个可选参数 parser.add_argument('--download', '-d',action='store_true', help='下载ip地址文件、发送手机号文件模板') # 解析命令行输入 #args = parser.parse_args(['-i','aaaa','-t','bbbbb']) args = parser.parse_args() if args.download: shutil.copy(os.path.join(BASE_DIR,'网络设备ip端口3.txt'), BASE_EXE_DIR) return args parseArgs() def try_ssh_connection_with_passwords(hostname, port, username, passwords, ip_info,timeout=60,compress=True): 尝试使用密码列表连接到 SSH 服务器。 :param hostname: SSH 服务器的主机名或IP地址 :param port: SSH 服务器的端口号 :param username: 登录用户名 :param passwords: 包含密码的列表 :param timeout: 连接超时时间(秒) :return: 成功连接后返回SSHClient对象,否则返回None # client = paramiko.SSHClient() # client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) passwords_num=len(passwords) for password in passwords: try: print(f"尝试使用密码连接(密码 {password})...") # 设备连接信息 开启fast_cli快速模式,全局global_delay_factor延迟因子调小(默认值为1) dev = {'device_type': 'terminal_server', 'host': hostname, 'username': username, 'password': password, 'port': port, 'timeout': 180, 'conn_timeout':180, 'auth_timeout':180, 'fast_cli': True, 'global_delay_factor': 0.01, 'session_log': 'session.log', client = ConnectHandler(**dev) print("连接成功!") ip_info['local_username'] = username ip_info['local_password'] = password ip_info['password_authentication_status'] = True return client except paramiko.ssh_exception.AuthenticationException: print("密码错误,尝试下一个密码...") except paramiko.ssh_exception.SSHException as e: print(f"SSH 错误: {e}") return None # 也可以选择继续尝试其他密码,但这里我们假设 SSH 错误是致命的 except Exception as e: print(f"发生其他错误: {e}") return None # 同样,这里可以处理异常或继续尝试 print("所有密码尝试均失败。") return None def parse_port_file( file_path=parseArgs().ips): print("---开始解析端口文件---") # 获取当日日期 today = date.today().strftime('%Y%m%d') print(today) # 数字格式[{'ip':ip,"协议":协议,"端口号":端口号,"位置":位置},....] ip_port = [] if os.path.exists(file_path): print(f"文件 {file_path} 存在") with open(file_path, 'r', encoding='utf-8') as file: lines = file.readlines()[1:] for line in lines: line = line.strip() ip=line.split('\t')[0] protocol=line.split('\t')[3] port=line.split('\t')[2] address=line.split('\t')[1] isable=False password_authentication_status=False local_username='' local_password='' ip_port.append({'ip':ip,'protocol':protocol,'port':port,'address':address,'isable':isable,'password_authentication_status':password_authentication_status,'local_password':local_password,"local_username":local_username}) else: print(f"文件 {file_path} 不存在") # 创建一个名为example.txt的文件 print(ip_port) return ip_port def build_log_file(ip_info): print("---开始检测总日志是否存在---") # 获取当日日期 today = date.today().strftime('%Y%m%d') today_log_name = today + '_log.txt' success_name = today + '_connect_success.txt' fail_name = today + '_connect_fail.txt' for ip in ip_info: with lock: with open(os.path.join(BASE_EXE_DIR, today_log_name), 'a+', encoding='utf-8') as file: single_log = 'ip:' + ip['ip'] + "\t" + 'protocol:' + ip['protocol'] + "\t" + 'port:' + ip['port'] + \ "\t" + 'address:' + ip['address'] + "\t" + 'isable:' + str(ip['isable']) + "\t" + \ 'password_authentication_status:' + str(ip['password_authentication_status']) + \ "\t" + 'local_password:' + ip['local_password'] + "\t" + 'local_username:' + ip['local_username'] file.write(single_log + "\n") file.flush() # 确保数据被立即写入磁盘 if ip['isable']: with lock: with open(os.path.join(BASE_EXE_DIR, success_name), 'a+', encoding='utf-8') as file: single_log = 'ip:' + ip['ip'] + "\t" + 'protocol:' + ip['protocol'] + "\t" + 'port:' + ip['port'] + \ "\t" + 'address:' + ip['address'] + "\t" + 'isable:' + str(ip['isable']) + "\t" + \ 'password_authentication_status:' + str(ip['password_authentication_status']) + \ "\t" + 'local_password:' + ip['local_password'] + "\t" + 'local_username:' + ip['local_username'] file.write(single_log + "\n") file.flush() # 确保数据被立即写入磁盘 if not ip['isable']: with lock: with open(os.path.join(BASE_EXE_DIR, fail_name), 'a+', encoding='utf-8') as file: single_log = 'ip:' + ip['ip'] + "\t" + 'protocol:' + ip['protocol'] + "\t" + 'port:' + ip['port'] + \ "\t" + 'address:' + ip['address'] + "\t" + 'isable:' + str(ip['isable']) + "\t" + \ 'password_authentication_status:' + str(ip['password_authentication_status']) + \ "\t" + 'local_password:' + ip['local_password'] + "\t" + 'local_username:' + ip['local_username'] file.write(single_log + "\n") file.flush() # 确保数据被立即写入磁盘 def bulid_result_file(ip,resp): print("---开始生成交换机执行命令结果存在---") result_name=ip['ip']+'_'+ip['address']+'_'+ip['protocol']+'_result.txt' with lock: with open(os.path.join(BASE_EXE_DIR, result_name), 'a+', encoding='utf-8') as file: file.write(resp + "\n") file.flush() # 确保数据被立即写入磁盘 def check_port(ip, port,ip_info): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(5) try: result = sock.connect_ex((ip, port)) if result == 0: ip_info['isable']=True return True else: print(f'The port {port} on {ip} is closed.') ip_info['isable'] = False return False except Exception as e: print(f"错误: {str(e)}") ip_info['isable'] = False return False finally: sock.close() def connet_port(ip_info,command_lists): ip=ip_info['ip'] print(ip) port=ip_info['port'] print(port) passwords = ['*****','*****'] isable=ip_info['isable'] password = 'xxxx' username = '*****' # username = 'root' logging.basicConfig(level=logging.DEBUG) if isable and port=='22': print('开始使用SSH协议连接服务器') try: with try_ssh_connection_with_passwords(ip,port,username,passwords,ip_info) as ssh: print(ssh) print('SSH协议连接服务器成功') time.sleep(40) # 执行命令 # ssh.exec_command('screen-length 0 temporary',get_pty=True) # # 执行命令 # ssh.exec_command('screen-length disable',get_pty=True) # # 执行命令 # ssh.exec_command('terminal length 0',get_pty=True) # # 执行命令 # ssh.exec_command('more off',get_pty=True) print('去除分屏状态') for command in command_lists: print('开始执行命令') try: output = ssh.send_command(command_string=command,cmd_verify=True,strip_command=True) print(output) bulid_result_file(ip_info, output) except Exception as e: print(f'出错,{command}执行结束,原因{e}') print('命令执行完毕') except Exception: print(f"{ip},连接失败") return 0 # 关闭连接 if isable and port=='23': print('iiiiiiiiiiiiiiiiii') with telnetlib.Telnet(ip, port) as tn: print(tn) tn.set_debuglevel(0) # print(tn.read_all().decode()) # tn.read_until(b'continue...\n') # tn.write( b"\n") login_putout = '' while 1: time.sleep(3) first_login=tn.read_very_eager().decode('ascii') login_putout+=first_login # time.sleep(1) if re.search(r'password:',login_putout.lower()): tn.write(password.encode() + b"\n") # tn.read_until(b"\n") # tn.write(b"\n") break if re.search(r'username:|login:',login_putout.lower()): tn.write( username.encode() + b"\n") time.sleep(1) # print(tn.read_very_eager()) continue # # tn.read_until(b'login:',timeout=10) # tn.write(b'root'+ b"\n") # tn.read_until(b'Password:',timeout=10) # tn.write(b'123456' + b"\n") index,obj,output=tn.expect([b'>',b'#'],timeout=20) # print(f'1111111----{index}') # print(index) if index==1: ip_info['local_username'] = username ip_info['local_password'] = password ip_info['password_authentication_status'] = True print('111111111111111') # tn.write(b'ls -l' + b"\n") # time.sleep(1) # output=tn.read_very_eager().decode() # # print('ooooooooooooooooooooooooooooooooo') # print(output) time.sleep(1) tn.write( b"\n") prompt_pattern =tn.read_until(b'#', timeout=10).decode('ascii') tn.write(b'screen-length 0 temporary' + b"\n") tn.read_until(b'#', timeout=10) tn.write(b'screen-length disable' + b"\n") tn.read_until(b'#', timeout=10) tn.write(b'terminal length 0' + b"\n") tn.read_until(b'#', timeout=10) tn.write(b'more off' + b"\n") tn.read_until(b'#', timeout=10) for command in command_lists: tn.write(command.encode() + b"\n") output = tn.read_until(prompt_pattern.encode('ascii')).decode('ascii') print(output) bulid_result_file(ip_info, output) if index == 0: ip_info['local_username'] = username ip_info['local_password'] = password ip_info['password_authentication_status'] = True print('111111111111111') # tn.write(b'ls -l' + b"\n") # time.sleep(1) # output=tn.read_very_eager().decode() # # print('ooooooooooooooooooooooooooooooooo') # print(output) time.sleep(1) tn.write( b"\n") prompt_pattern =tn.read_until(b'>', timeout=10).decode('ascii') tn.write(b'screen-length 0 temporary' + b"\n") tn.read_until(b'>', timeout=10) tn.write(b'screen-length disable' + b"\n") tn.read_until(b'>', timeout=10) tn.write(b'terminal length 0' + b"\n") tn.read_until(b'>', timeout=10) tn.write(b'more off' + b"\n") tn.read_until(b'>', timeout=10) for command in command_lists: tn.write(command.encode() + b"\n") output = tn.read_until(prompt_pattern.encode('ascii')).decode('ascii') print(output) bulid_result_file(ip_info, output) if index == -1: return None # # 从这里开始,tn可以用来发送命令和接收响应 # tn.write(b"pwd \n") # print('8888888888888') # print(tn.read_all().decode()) # print('qqqqqqqqq') # tn.close() def bat_test_port(ip_port): threads = [] for ip_info in ip_port: ip=ip_info['ip'] port=ip_info['port'] thread = threading.Thread(target=check_port, args=(ip,int(port), ip_info)) thread.start() threads.append(thread) for thread in threads: thread.join() return ip_port def bat_check_port(): ip_port=parse_port_file() bat_test_port(ip_port) print(ip_port) time.sleep(10) ####必须休眠,端口测试进程没有结束,直接连接会出意外结果 # command_lists=['ls -a','show'] command_lists= ['screen-length 0 temporary','screen-length disable' ,'terminal length 0','more off','dis cu', 'show running-config'] threads = [] for ip_info in ip_port: thread = threading.Thread(target=connet_port, args=(ip_info,command_lists)) thread.start() threads.append(thread) for thread in threads: thread.join() build_log_file(ip_port) if __name__ == '__main__': bat_check_port() 四、打包4.1 前置说明4.1.1文件列表 autoBackupSwicthesV2.py 网络设备ip端口3.txt 4.1.2 安装pyinstallerpip install pyinstaller 4.2 打包生成.spec文件pyinstaller -F autoBackupSwicthesV2.py 说明:1.生成autoBackupSwicthesV2.spec文件 4.3 使用.spec文件打包# -*- mode: python ; coding: utf-8 -*- a = Analysis( ['autoBackupSwicthesV2.py'], pathex=[], binaries=[], datas=[("网络设备ip端口3.txt",".")], hiddenimports=[], hookspath=[], hooksconfig={}, runtime_hooks=[], excludes=[], noarchive=False, optimize=0, pyz = PYZ(a.pure) exe = EXE( pyz, a.scripts, a.binaries, a.datas, [], name='autoBackupSwicthesV2.5', debug=False, bootloader_ignore_signals=False, strip=False, upx=True, upx_exclude=[], runtime_tmpdir=None, console=True, disable_windowed_traceback=False, argv_emulation=False, target_arch=None, codesign_identity=None, entitlements_file=None, 执行pyinstaller .\autoBackupSwitchesV2.spec 五、其他说明5.1 总结socket编程 5.2 易错点/踩坑点1.socket编程,一定要注意关闭连接; 本例中先测试端口联通性,再登录端口,有两次连接过程,如果没有断开,会报错,可以使用sleep休眠一段时间,再进行登录。 2.关闭本地防火墙,关闭本地防火墙,关闭本地防火墙。 3.多线程进行操作,会出各种问题,注意线程抢占资源问题,不可同时写入文件,会出现写入不全问题。 5.3 疑问 登录交换机之后出现 提醒更改密码,如何有效获取提示并输入命令?拓展,如何交互式进行读取回显与写入命令,根据回显内容决定下一步命令? 单行过长,会出现乱码,什么原因? 如何有效获取登录 输入命令前的提示符? 5.4 参考文献Netmiko最强攻略——两万字吐血整理,网工玩转自动化https://zhuanlan.zhihu.com/p/541592293
def build_log_file(ip_info): print("---开始检测总日志是否存在---") # 获取当日日期 today = date.today().strftime('%Y%m%d') today_log_name = today + '_log.txt' success_name = today + '_connect_success.txt' fail_name = today + '_connect_fail.txt' for ip in ip_info: with lock: with open(os.path.join(BASE_EXE_DIR, today_log_name), 'a+', encoding='utf-8') as file: single_log = 'ip:' + ip['ip'] + "\t" + 'protocol:' + ip['protocol'] + "\t" + 'port:' + ip['port'] + "\t" + 'address:' + ip['address'] + "\t" + 'isable:' + str(ip['isable']) + "\t" + 'password_authentication_status:' + str(ip['password_authentication_status']) + "\t" + 'local_password:' + ip['local_password'] + "\t" + 'local_username:' + ip['local_username'] file.write(single_log + "\n") file.flush() # 确保数据被立即写入磁盘 if ip['isable']: with lock: with open(os.path.join(BASE_EXE_DIR, success_name), 'a+', encoding='utf-8') as file: single_log = 'ip:' + ip['ip'] + "\t" + 'protocol:' + ip['protocol'] + "\t" + 'port:' + ip['port'] +"\t" + 'address:' + ip['address'] + "\t" + 'isable:' + str(ip['isable']) + "\t" + 'password_authentication_status:' + str(ip['password_authentication_status']) + "\t" + 'local_password:' + ip['local_password'] + "\t" + 'local_username:' + ip['local_username'] file.write(single_log + "\n") file.flush() # 确保数据被立即写入磁盘 if not ip['isable']: with lock: with open(os.path.join(BASE_EXE_DIR, fail_name), 'a+', encoding='utf-8') as file: single_log = 'ip:' + ip['ip'] + "\t" + 'protocol:' + ip['protocol'] + "\t" + 'port:' + ip['port'] + "\t" + 'address:' + ip['address'] + "\t" + 'isable:' + str(ip['isable']) + "\t" + 'password_authentication_status:' + str(ip['password_authentication_status']) +"\t" + 'local_password:' + ip['local_password'] + "\t" + 'local_username:' + ip['local_username'] file.write(single_log + "\n") file.flush() # 确保数据被立即写入磁盘
import argparse import logging import os import re import shutil import socket import sys import telnetlib import threading import time from concurrent.futures import ThreadPoolExecutor from datetime import date, datetime, timedelta import paramiko from netmiko import ConnectHandler from pip._internal.utils import datetime lock = threading.Lock() # 获取当前路径父路径(所在文件夹) BASE_DIR = os.path.dirname(os.path.abspath(__file__)) #获取打包后exe文件路径(所在文件夹) BASE_EXE_DIR = os.path.dirname(sys.executable) # BASE_EXE_DIR = os.path.dirname(os.path.abspath(__file__)) def parseArgs(): # 创建一个ArgumentParser对象 parser = argparse.ArgumentParser(description='这是一个命令行运行bat_check_port程序的说明') # 添加一个位置参数 parser.add_argument('--ips', '-i',type=str, default=os.path.join(BASE_DIR,'网络设备ip端口3.txt'),help='输入本地配置好网络设备ip端口文件') # 添加一个可选参数 parser.add_argument('--download', '-d',action='store_true', help='下载ip地址文件、发送手机号文件模板') # 解析命令行输入 #args = parser.parse_args(['-i','aaaa','-t','bbbbb']) args = parser.parse_args() if args.download: shutil.copy(os.path.join(BASE_DIR,'网络设备ip端口3.txt'), BASE_EXE_DIR) return args parseArgs() def try_ssh_connection_with_passwords(hostname, port, username, passwords, ip_info,timeout=60,compress=True): 尝试使用密码列表连接到 SSH 服务器。 :param hostname: SSH 服务器的主机名或IP地址 :param port: SSH 服务器的端口号 :param username: 登录用户名 :param passwords: 包含密码的列表 :param timeout: 连接超时时间(秒) :return: 成功连接后返回SSHClient对象,否则返回None # client = paramiko.SSHClient() # client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) passwords_num=len(passwords) for password in passwords: try: print(f"尝试使用密码连接(密码 {password})...") # 设备连接信息 开启fast_cli快速模式,全局global_delay_factor延迟因子调小(默认值为1) dev = {'device_type': 'terminal_server', 'host': hostname, 'username': username, 'password': password, 'port': port, 'timeout': 180, 'conn_timeout':180, 'auth_timeout':180, 'fast_cli': True, 'global_delay_factor': 0.01, 'session_log': 'session.log', client = ConnectHandler(**dev) print("连接成功!") ip_info['local_username'] = username ip_info['local_password'] = password ip_info['password_authentication_status'] = True return client except paramiko.ssh_exception.AuthenticationException: print("密码错误,尝试下一个密码...") except paramiko.ssh_exception.SSHException as e: print(f"SSH 错误: {e}") return None # 也可以选择继续尝试其他密码,但这里我们假设 SSH 错误是致命的 except Exception as e: print(f"发生其他错误: {e}") return None # 同样,这里可以处理异常或继续尝试 print("所有密码尝试均失败。") return None def parse_port_file( file_path=parseArgs().ips): print("---开始解析端口文件---") # 获取当日日期 today = date.today().strftime('%Y%m%d') print(today) # 数字格式[{'ip':ip,"协议":协议,"端口号":端口号,"位置":位置},....] ip_port = [] if os.path.exists(file_path): print(f"文件 {file_path} 存在") with open(file_path, 'r', encoding='utf-8') as file: lines = file.readlines()[1:] for line in lines: line = line.strip() ip=line.split('\t')[0] protocol=line.split('\t')[3] port=line.split('\t')[2] address=line.split('\t')[1] isable=False password_authentication_status=False local_username='' local_password='' ip_port.append({'ip':ip,'protocol':protocol,'port':port,'address':address,'isable':isable,'password_authentication_status':password_authentication_status,'local_password':local_password,"local_username":local_username}) else: print(f"文件 {file_path} 不存在") # 创建一个名为example.txt的文件 print(ip_port) return ip_port def build_log_file(ip_info): print("---开始检测总日志是否存在---") # 获取当日日期 today = date.today().strftime('%Y%m%d') today_log_name = today + '_log.txt' success_name = today + '_connect_success.txt' fail_name = today + '_connect_fail.txt' for ip in ip_info: with lock: with open(os.path.join(BASE_EXE_DIR, today_log_name), 'a+', encoding='utf-8') as file: single_log = 'ip:' + ip['ip'] + "\t" + 'protocol:' + ip['protocol'] + "\t" + 'port:' + ip['port'] + \ "\t" + 'address:' + ip['address'] + "\t" + 'isable:' + str(ip['isable']) + "\t" + \ 'password_authentication_status:' + str(ip['password_authentication_status']) + \ "\t" + 'local_password:' + ip['local_password'] + "\t" + 'local_username:' + ip['local_username'] file.write(single_log + "\n") file.flush() # 确保数据被立即写入磁盘 if ip['isable']: with lock: with open(os.path.join(BASE_EXE_DIR, success_name), 'a+', encoding='utf-8') as file: single_log = 'ip:' + ip['ip'] + "\t" + 'protocol:' + ip['protocol'] + "\t" + 'port:' + ip['port'] + \ "\t" + 'address:' + ip['address'] + "\t" + 'isable:' + str(ip['isable']) + "\t" + \ 'password_authentication_status:' + str(ip['password_authentication_status']) + \ "\t" + 'local_password:' + ip['local_password'] + "\t" + 'local_username:' + ip['local_username'] file.write(single_log + "\n") file.flush() # 确保数据被立即写入磁盘 if not ip['isable']: with lock: with open(os.path.join(BASE_EXE_DIR, fail_name), 'a+', encoding='utf-8') as file: single_log = 'ip:' + ip['ip'] + "\t" + 'protocol:' + ip['protocol'] + "\t" + 'port:' + ip['port'] + \ "\t" + 'address:' + ip['address'] + "\t" + 'isable:' + str(ip['isable']) + "\t" + \ 'password_authentication_status:' + str(ip['password_authentication_status']) + \ "\t" + 'local_password:' + ip['local_password'] + "\t" + 'local_username:' + ip['local_username'] file.write(single_log + "\n") file.flush() # 确保数据被立即写入磁盘 def bulid_result_file(ip,resp): print("---开始生成交换机执行命令结果存在---") result_name=ip['ip']+'_'+ip['address']+'_'+ip['protocol']+'_result.txt' with lock: with open(os.path.join(BASE_EXE_DIR, result_name), 'a+', encoding='utf-8') as file: file.write(resp + "\n") file.flush() # 确保数据被立即写入磁盘 def check_port(ip, port,ip_info): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(5) try: result = sock.connect_ex((ip, port)) if result == 0: ip_info['isable']=True return True else: print(f'The port {port} on {ip} is closed.') ip_info['isable'] = False return False except Exception as e: print(f"错误: {str(e)}") ip_info['isable'] = False return False finally: sock.close() def connet_port(ip_info,command_lists): ip=ip_info['ip'] print(ip) port=ip_info['port'] print(port) passwords = ['*****','*****'] isable=ip_info['isable'] password = 'xxxx' username = '*****' # username = 'root' logging.basicConfig(level=logging.DEBUG) if isable and port=='22': print('开始使用SSH协议连接服务器') try: with try_ssh_connection_with_passwords(ip,port,username,passwords,ip_info) as ssh: print(ssh) print('SSH协议连接服务器成功') time.sleep(40) # 执行命令 # ssh.exec_command('screen-length 0 temporary',get_pty=True) # # 执行命令 # ssh.exec_command('screen-length disable',get_pty=True) # # 执行命令 # ssh.exec_command('terminal length 0',get_pty=True) # # 执行命令 # ssh.exec_command('more off',get_pty=True) print('去除分屏状态') for command in command_lists: print('开始执行命令') try: output = ssh.send_command(command_string=command,cmd_verify=True,strip_command=True) print(output) bulid_result_file(ip_info, output) except Exception as e: print(f'出错,{command}执行结束,原因{e}') print('命令执行完毕') except Exception: print(f"{ip},连接失败") return 0 # 关闭连接 if isable and port=='23': print('iiiiiiiiiiiiiiiiii') with telnetlib.Telnet(ip, port) as tn: print(tn) tn.set_debuglevel(0) # print(tn.read_all().decode()) # tn.read_until(b'continue...\n') # tn.write( b"\n") login_putout = '' while 1: time.sleep(3) first_login=tn.read_very_eager().decode('ascii') login_putout+=first_login # time.sleep(1) if re.search(r'password:',login_putout.lower()): tn.write(password.encode() + b"\n") # tn.read_until(b"\n") # tn.write(b"\n") break if re.search(r'username:|login:',login_putout.lower()): tn.write( username.encode() + b"\n") time.sleep(1) # print(tn.read_very_eager()) continue # # tn.read_until(b'login:',timeout=10) # tn.write(b'root'+ b"\n") # tn.read_until(b'Password:',timeout=10) # tn.write(b'123456' + b"\n") index,obj,output=tn.expect([b'>',b'#'],timeout=20) # print(f'1111111----{index}') # print(index) if index==1: ip_info['local_username'] = username ip_info['local_password'] = password ip_info['password_authentication_status'] = True print('111111111111111') # tn.write(b'ls -l' + b"\n") # time.sleep(1) # output=tn.read_very_eager().decode() # # print('ooooooooooooooooooooooooooooooooo') # print(output) time.sleep(1) tn.write( b"\n") prompt_pattern =tn.read_until(b'#', timeout=10).decode('ascii') tn.write(b'screen-length 0 temporary' + b"\n") tn.read_until(b'#', timeout=10) tn.write(b'screen-length disable' + b"\n") tn.read_until(b'#', timeout=10) tn.write(b'terminal length 0' + b"\n") tn.read_until(b'#', timeout=10) tn.write(b'more off' + b"\n") tn.read_until(b'#', timeout=10) for command in command_lists: tn.write(command.encode() + b"\n") output = tn.read_until(prompt_pattern.encode('ascii')).decode('ascii') print(output) bulid_result_file(ip_info, output) if index == 0: ip_info['local_username'] = username ip_info['local_password'] = password ip_info['password_authentication_status'] = True print('111111111111111') # tn.write(b'ls -l' + b"\n") # time.sleep(1) # output=tn.read_very_eager().decode() # # print('ooooooooooooooooooooooooooooooooo') # print(output) time.sleep(1) tn.write( b"\n") prompt_pattern =tn.read_until(b'>', timeout=10).decode('ascii') tn.write(b'screen-length 0 temporary' + b"\n") tn.read_until(b'>', timeout=10) tn.write(b'screen-length disable' + b"\n") tn.read_until(b'>', timeout=10) tn.write(b'terminal length 0' + b"\n") tn.read_until(b'>', timeout=10) tn.write(b'more off' + b"\n") tn.read_until(b'>', timeout=10) for command in command_lists: tn.write(command.encode() + b"\n") output = tn.read_until(prompt_pattern.encode('ascii')).decode('ascii') print(output) bulid_result_file(ip_info, output) if index == -1: return None # # 从这里开始,tn可以用来发送命令和接收响应 # tn.write(b"pwd \n") # print('8888888888888') # print(tn.read_all().decode()) # print('qqqqqqqqq') # tn.close() def bat_test_port(ip_port): threads = [] for ip_info in ip_port: ip=ip_info['ip'] port=ip_info['port'] thread = threading.Thread(target=check_port, args=(ip,int(port), ip_info)) thread.start() threads.append(thread) for thread in threads: thread.join() return ip_port def bat_check_port(): ip_port=parse_port_file() bat_test_port(ip_port) print(ip_port) time.sleep(10) ####必须休眠,端口测试进程没有结束,直接连接会出意外结果 # command_lists=['ls -a','show'] command_lists= ['screen-length 0 temporary','screen-length disable' ,'terminal length 0','more off','dis cu', 'show running-config'] threads = [] for ip_info in ip_port: thread = threading.Thread(target=connet_port, args=(ip_info,command_lists)) thread.start() threads.append(thread) for thread in threads: thread.join() build_log_file(ip_port) if __name__ == '__main__': bat_check_port()
pip install pyinstaller
pyinstaller -F autoBackupSwicthesV2.py
说明:1.生成autoBackupSwicthesV2.spec文件
# -*- mode: python ; coding: utf-8 -*- a = Analysis( ['autoBackupSwicthesV2.py'], pathex=[], binaries=[], datas=[("网络设备ip端口3.txt",".")], hiddenimports=[], hookspath=[], hooksconfig={}, runtime_hooks=[], excludes=[], noarchive=False, optimize=0, pyz = PYZ(a.pure) exe = EXE( pyz, a.scripts, a.binaries, a.datas, [], name='autoBackupSwicthesV2.5', debug=False, bootloader_ignore_signals=False, strip=False, upx=True, upx_exclude=[], runtime_tmpdir=None, console=True, disable_windowed_traceback=False, argv_emulation=False, target_arch=None, codesign_identity=None, entitlements_file=None, 执行pyinstaller .\autoBackupSwitchesV2.spec 五、其他说明5.1 总结socket编程 5.2 易错点/踩坑点1.socket编程,一定要注意关闭连接; 本例中先测试端口联通性,再登录端口,有两次连接过程,如果没有断开,会报错,可以使用sleep休眠一段时间,再进行登录。 2.关闭本地防火墙,关闭本地防火墙,关闭本地防火墙。 3.多线程进行操作,会出各种问题,注意线程抢占资源问题,不可同时写入文件,会出现写入不全问题。 5.3 疑问 登录交换机之后出现 提醒更改密码,如何有效获取提示并输入命令?拓展,如何交互式进行读取回显与写入命令,根据回显内容决定下一步命令? 单行过长,会出现乱码,什么原因? 如何有效获取登录 输入命令前的提示符? 5.4 参考文献Netmiko最强攻略——两万字吐血整理,网工玩转自动化https://zhuanlan.zhihu.com/p/541592293
# -*- mode: python ; coding: utf-8 -*- a = Analysis( ['autoBackupSwicthesV2.py'], pathex=[], binaries=[], datas=[("网络设备ip端口3.txt",".")], hiddenimports=[], hookspath=[], hooksconfig={}, runtime_hooks=[], excludes=[], noarchive=False, optimize=0, pyz = PYZ(a.pure) exe = EXE( pyz, a.scripts, a.binaries, a.datas, [], name='autoBackupSwicthesV2.5', debug=False, bootloader_ignore_signals=False, strip=False, upx=True, upx_exclude=[], runtime_tmpdir=None, console=True, disable_windowed_traceback=False, argv_emulation=False, target_arch=None, codesign_identity=None, entitlements_file=None, 执行pyinstaller .\autoBackupSwitchesV2.spec
执行pyinstaller .\autoBackupSwitchesV2.spec
pyinstaller .\autoBackupSwitchesV2.spec
五、其他说明5.1 总结socket编程 5.2 易错点/踩坑点1.socket编程,一定要注意关闭连接; 本例中先测试端口联通性,再登录端口,有两次连接过程,如果没有断开,会报错,可以使用sleep休眠一段时间,再进行登录。 2.关闭本地防火墙,关闭本地防火墙,关闭本地防火墙。 3.多线程进行操作,会出各种问题,注意线程抢占资源问题,不可同时写入文件,会出现写入不全问题。 5.3 疑问 登录交换机之后出现 提醒更改密码,如何有效获取提示并输入命令?拓展,如何交互式进行读取回显与写入命令,根据回显内容决定下一步命令? 单行过长,会出现乱码,什么原因? 如何有效获取登录 输入命令前的提示符? 5.4 参考文献Netmiko最强攻略——两万字吐血整理,网工玩转自动化https://zhuanlan.zhihu.com/p/541592293
socket编程
1.socket编程,一定要注意关闭连接; 本例中先测试端口联通性,再登录端口,有两次连接过程,如果没有断开,会报错,可以使用sleep休眠一段时间,再进行登录。
2.关闭本地防火墙,关闭本地防火墙,关闭本地防火墙。
3.多线程进行操作,会出各种问题,注意线程抢占资源问题,不可同时写入文件,会出现写入不全问题。