添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

3.可以传入自己配置的交换机地址

二、总体思路整理

2.1 主要思路

1.解析待登录的交换机配置文件,生成初始状态字典

2.批量测试端口可用性,更新状态字典可用性字段,并写入日志文件

3.使用SSH或者Telnet登录交换机,获取配置(尝试登录【登录协议判断、尝试账号密码】、执行命令),更新状态字典

4.根据执行结果与状态字典,写入日志

2.2 解析配置文件

2.2.1引入模块

import os    #获取路径
import sys  #获取打包后exe文件所在文件夹路径
import re   #正则匹配连接后回显内容,用于判断输入用户名还是密码
import shutil  #下载待处理的交换机列表文件
import socket  #判断端口是否开放
import time
from datetime import date
import argparse    #获取并解析cmd窗口传入的参数       
import logging   #打印出netmiko框架运行日志
import threading  # 多线程
import telnetlib  #telnet连接交换机
import paramiko  #ssh连接交换机paramiko的再次封装
from netmiko import ConnectHandler  #netmiko 通过ssh连接交换机,是paramiko的再次封装

2.2.2 全局参数

lock = threading.Lock()  #线程锁
# 获取当前路径父路径(所在文件夹)
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
#获取打包后exe文件路径(所在文件夹)
BASE_EXE_DIR = os.path.dirname(sys.executable)

2.2.3 读取cmd运行传参并解析

def parseArgs():
    # 创建一个ArgumentParser对象
    parser = argparse.ArgumentParser(description='这是一个命令行运行autoBackupSwitch程序的说明')
    # 添加一个位置参数
    parser.add_argument('--ips', '-i',type=str, default=os.path.join(BASE_DIR,'网络设备ip端口3.txt'),help='输入本地配置好网络设备ip端口文件')
    # 添加一个可选参数
    parser.add_argument('--download', '-d',action='store_true', help='下载默认的交换机列表文件模板')
    # 解析命令行输入
    #args = parser.parse_args(['-i','aaaa','-t','bbbbb'])
    args = parser.parse_args()
    if args.download:
        shutil.copy(os.path.join(BASE_DIR,'网络设备ip端口3.txt'), BASE_EXE_DIR)
    return args
parseArgs()

2.2.4 读取交换机地址端口文件

def parse_port_file( file_path=parseArgs().ips):
    print("---开始解析端口文件---")
    # 获取当日日期
    today = date.today().strftime('%Y%m%d')
    print(today)
    # 数字格式[{'ip':ip,"协议":协议,"端口号":端口号,"位置":位置},....]
    ip_port = []
    if os.path.exists(file_path):
        print(f"文件 {file_path} 存在")
        with open(file_path, 'r', encoding='utf-8') as file:
            lines = file.readlines()[1:]
            for line in lines:
                line = line.strip()
                ip=line.split('\t')[0]
                protocol=line.split('\t')[3]
                port=line.split('\t')[2]
                address=line.split('\t')[1]
                isable=False
                password_authentication_status=False
                local_username=''
                local_password=''
                ip_port.append({'ip':ip,'protocol':protocol,'port':port,'address':address,'isable':isable,'password_authentication_status':password_authentication_status,'local_password':local_password,"local_username":local_username})
    else:
        print(f"文件 {file_path} 不存在")
        # 创建一个名为example.txt的文件
    print(ip_port)
    return ip_port

2.3批量测试端口可用性

2.3.1 单个端口测试

def check_port(ip, port,ip_info):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(5)
    try:
        result = sock.connect_ex((ip, port))
        if result == 0:
            ip_info['isable']=True   #网络正常,更新  交换机状态字典的端口状态字段
            return True
        else:
            print(f'The port {port}  on {ip} is closed.')
            ip_info['isable'] = False   #网络不通,更新  交换机状态字典的端口状态字段
            return False
    except Exception as e:
        print(f"错误: {str(e)}")
        ip_info['isable'] = False    #出错,更新交换机状态字典的端口状态字段
        return False
    finally:
        sock.close()

2.3.2 多线程多个端口批量测试

def bat_test_port(ip_port):
    threads = []
    for ip_info in ip_port:
        ip=ip_info['ip']
        port=ip_info['port']
        thread = threading.Thread(target=check_port, args=(ip,int(port), ip_info))
        thread.start()
        threads.append(thread)
    for thread in threads:
        thread.join()
    return ip_port

2.4 使用SSH或者Telnet登录交换机

2.4.1 netmiko 使用SSH协议登录交换机

def try_ssh_connection_with_passwords(hostname, port, username, passwords, ip_info,timeout=60,compress=True):
    尝试使用密码列表连接到 SSH 服务器。
    :param hostname: SSH 服务器的主机名或IP地址
    :param port: SSH 服务器的端口号
    :param username: 登录用户名
    :param passwords: 包含密码的列表
    :param timeout: 连接超时时间(秒)
    :return: 成功连接后返回SSHClient对象,否则返回None
    # client = paramiko.SSHClient()
    # client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    passwords_num=len(passwords)
    for password in passwords:
        try:
            print(f"尝试使用密码连接(密码 {password})...")
            # 设备连接信息 开启fast_cli快速模式,全局global_delay_factor延迟因子调小(默认值为1)
            dev = {'device_type': 'terminal_server',
                   'host': hostname,
                   'username': username,
                   'password': password,
                   'port': port,
                   'timeout': 180,  
                   'conn_timeout':180,##必须长,否则没有连接到就直接失败
                   'auth_timeout':180,
                   'fast_cli': True,
                   'global_delay_factor': 0.01,
                   'session_log': 'session.log',
            client = ConnectHandler(**dev)
            print("连接成功!")
            ip_info['local_username'] = username
            ip_info['local_password'] = password
            ip_info['password_authentication_status'] = True
            return client
        except paramiko.ssh_exception.AuthenticationException:
                print("密码错误,尝试下一个密码...")
        except paramiko.ssh_exception.SSHException as e:
            print(f"SSH 错误: {e}")
            return None  # 也可以选择继续尝试其他密码,但这里我们假设 SSH 错误是致命的
        except Exception as e:
            print(f"发生其他错误: {e}")
            return None  # 同样,这里可以处理异常或继续尝试
    print("所有密码尝试均失败。")
    return None

2.4.2 使用SSH协议登录交换机进行查询配置

def connet_port(ip_info,command_lists):
    ip=ip_info['ip']
    print(ip)
    port=ip_info['port']
    print(port)
    passwords = ['xxxxx','*****']
    isable=ip_info['isable']
    password = '******'
    username = '*****'
    # username = 'root'
    logging.basicConfig(level=logging.DEBUG)
    if isable and port=='22':
        print('开始使用SSH协议连接服务器')
        try:
            with try_ssh_connection_with_passwords(ip,port,username,passwords,ip_info) as ssh:
                print(ssh)
                print('SSH协议连接服务器成功')
                time.sleep(40)  ##必须休眠,有些交换机登录之后,提醒更改密码,休眠40s跳过选择
                Q:如何交互,显示回显内容,来确定选择Y/N
                    command_lists= ['screen-length 0 temporary','screen-length disable' ,'terminal                 length 0','more off','dis cu', 'show running-config']
                print('去除分屏状态')
                for command in command_lists:
                    print('开始执行命令')
                    try:
                        output = ssh.send_command(command_string=command,cmd_verify=True,strip_command=True)
                        print(output)
                        bulid_result_file(ip_info, output)   ##将结果写入单独文件
                    except Exception as e:
                        print(f'出错,{command}执行结束,原因{e}')
                print('命令执行完毕')
        except Exception:
            print(f"{ip},连接失败")
            return 0
                # 关闭连接

2.4.3 写入结果文件

def bulid_result_file(ip,resp):
    print("---开始生成交换机执行命令结果存在---")
    result_name=ip['ip']+'_'+ip['address']+'_'+ip['protocol']+'_result.txt'
    with lock:
        with open(os.path.join(BASE_EXE_DIR, result_name), 'a+', encoding='utf-8') as file:
            file.write(resp + "\n")
            file.flush()  # 确保数据被立即写入磁盘

2.5 telnet 登录并执行命令

if isable and port=='23':
    with telnetlib.Telnet(ip, port) as tn:
        tn.set_debuglevel(3)
        login_putout = ''
        while 1:
            time.sleep(3)
            first_login=tn.read_very_eager().decode('ascii')
            login_putout+=first_login
            # time.sleep(1)
            if re.search(r'password:',login_putout.lower()):
                tn.write(password.encode() + b"\n")
                break
            if re.search(r'username:|login:',login_putout.lower()):
                tn.write( username.encode() + b"\n")
                time.sleep(1)
                continue
        index,obj,output=tn.expect([b'>',b'#'],timeout=20)
        if index==1:
            ip_info['local_username'] = username
            ip_info['local_password'] = password
            ip_info['password_authentication_status'] = True
            time.sleep(1)
            tn.write( b"\n") #单独回车,获取提示符例如
            prompt_pattern =tn.read_until(b'#', timeout=10).decode('ascii') #获取提示符
            tn.write(b'screen-length 0 temporary' + b"\n")
            tn.read_until(b'#', timeout=10)
            tn.write(b'screen-length disable' + b"\n")
            tn.read_until(b'#', timeout=10)
            tn.write(b'terminal length 0' + b"\n")
            tn.read_until(b'#', timeout=10)
            tn.write(b'more off' + b"\n")
            tn.read_until(b'#', timeout=10)
            for command in command_lists:
                tn.write(command.encode() + b"\n")
                output = tn.read_until(prompt_pattern.encode('ascii')).decode('ascii')
                print(output)
                bulid_result_file(ip_info, output)
        if index == 0:
            ip_info['local_username'] = username
            ip_info['local_password'] = password
            ip_info['password_authentication_status'] = True
            print('111111111111111')
            # tn.write(b'ls -l' + b"\n")
            # time.sleep(1)
            # output=tn.read_very_eager().decode()
            # # print('ooooooooooooooooooooooooooooooooo')
            # print(output)
            time.sleep(1)
            tn.write( b"\n")
            prompt_pattern =tn.read_until(b'>', timeout=10).decode('ascii')
            tn.write(b'screen-length 0 temporary' + b"\n")
            tn.read_until(b'>', timeout=10)
            tn.write(b'screen-length disable' + b"\n")
            tn.read_until(b'>', timeout=10)
            tn.write(b'terminal length 0' + b"\n")
            tn.read_until(b'>', timeout=10)
            tn.write(b'more off' + b"\n")
            tn.read_until(b'>', timeout=10)
            for command in command_lists:
                tn.write(command.encode() + b"\n")
                output = tn.read_until(prompt_pattern.encode('ascii')).decode('ascii')
                print(output)
                bulid_result_file(ip_info, output)
        if index == -1:
            return None

2.6多线程执行获取配置结果

def bat_check_port():
    ip_port=parse_port_file()
    bat_test_port(ip_port)
    print(ip_port)
    time.sleep(10) ####必须休眠,端口测试进程没有结束,直接连接会出意外结果
    # command_lists=['ls -a','show']
    command_lists= ['screen-length 0 temporary','screen-length disable' ,'terminal length 0','more off','dis cu', 'show running-config']
    threads = []
    for ip_info in ip_port:
        thread = threading.Thread(target=connet_port, args=(ip_info,command_lists))
        thread.start()
        threads.append(thread)
    for thread in threads:
        thread.join()
    build_log_file(ip_port)

2.7将结果写入文件

def build_log_file(ip_info):
    print("---开始检测总日志是否存在---")
    # 获取当日日期
    today = date.today().strftime('%Y%m%d')
    today_log_name = today + '_log.txt'
    success_name = today + '_connect_success.txt'
    fail_name = today + '_connect_fail.txt'
    for ip in ip_info:
        with lock:
            with open(os.path.join(BASE_EXE_DIR, today_log_name), 'a+', encoding='utf-8') as file:
                single_log = 'ip:' + ip['ip'] + "\t" + 'protocol:' + ip['protocol'] + "\t" + 'port:' + ip['port'] + "\t" + 'address:' + ip['address'] + "\t" + 'isable:' + str(ip['isable']) + "\t" + 'password_authentication_status:' + str(ip['password_authentication_status']) + "\t" + 'local_password:' + ip['local_password'] + "\t" + 'local_username:' + ip['local_username']
                file.write(single_log + "\n")
                file.flush()  # 确保数据被立即写入磁盘
        if ip['isable']:
            with lock:
                with open(os.path.join(BASE_EXE_DIR, success_name), 'a+', encoding='utf-8') as file:
                    single_log = 'ip:' + ip['ip'] + "\t" + 'protocol:' + ip['protocol'] + "\t" + 'port:' + ip['port'] +"\t" + 'address:' + ip['address'] + "\t" + 'isable:' + str(ip['isable']) + "\t" + 'password_authentication_status:' + str(ip['password_authentication_status']) + "\t" + 'local_password:' + ip['local_password'] + "\t" + 'local_username:' + ip['local_username']
                    file.write(single_log + "\n")
                    file.flush()  # 确保数据被立即写入磁盘
        if not ip['isable']:
            with lock:
                with open(os.path.join(BASE_EXE_DIR, fail_name), 'a+', encoding='utf-8') as file:
                    single_log = 'ip:' + ip['ip'] + "\t" + 'protocol:' + ip['protocol'] + "\t" + 'port:' + ip['port'] + "\t" + 'address:' + ip['address'] + "\t" + 'isable:' + str(ip['isable']) + "\t" + 'password_authentication_status:' + str(ip['password_authentication_status']) +"\t" + 'local_password:' + ip['local_password'] + "\t" + 'local_username:' + ip['local_username']
                    file.write(single_log + "\n")
                    file.flush()  # 确保数据被立即写入磁盘

三、 全部代码

import argparse
import logging
import os
import re
import shutil
import socket
import sys
import telnetlib
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import date, datetime, timedelta
import paramiko
from netmiko import ConnectHandler
from pip._internal.utils import datetime
lock = threading.Lock()
# 获取当前路径父路径(所在文件夹)
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
#获取打包后exe文件路径(所在文件夹)
BASE_EXE_DIR = os.path.dirname(sys.executable)
# BASE_EXE_DIR = os.path.dirname(os.path.abspath(__file__))
def parseArgs():
    # 创建一个ArgumentParser对象
    parser = argparse.ArgumentParser(description='这是一个命令行运行bat_check_port程序的说明')
    # 添加一个位置参数
    parser.add_argument('--ips', '-i',type=str, default=os.path.join(BASE_DIR,'网络设备ip端口3.txt'),help='输入本地配置好网络设备ip端口文件')
    # 添加一个可选参数
    parser.add_argument('--download', '-d',action='store_true', help='下载ip地址文件、发送手机号文件模板')
    # 解析命令行输入
    #args = parser.parse_args(['-i','aaaa','-t','bbbbb'])
    args = parser.parse_args()
    if args.download:
        shutil.copy(os.path.join(BASE_DIR,'网络设备ip端口3.txt'), BASE_EXE_DIR)
    return args
parseArgs()
def try_ssh_connection_with_passwords(hostname, port, username, passwords, ip_info,timeout=60,compress=True):
    尝试使用密码列表连接到 SSH 服务器。
    :param hostname: SSH 服务器的主机名或IP地址
    :param port: SSH 服务器的端口号
    :param username: 登录用户名
    :param passwords: 包含密码的列表
    :param timeout: 连接超时时间(秒)
    :return: 成功连接后返回SSHClient对象,否则返回None
    # client = paramiko.SSHClient()
    # client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    passwords_num=len(passwords)
    for password in passwords:
        try:
            print(f"尝试使用密码连接(密码 {password})...")
            # 设备连接信息 开启fast_cli快速模式,全局global_delay_factor延迟因子调小(默认值为1)
            dev = {'device_type': 'terminal_server',
                   'host': hostname,
                   'username': username,
                   'password': password,
                   'port': port,
                   'timeout': 180,
                   'conn_timeout':180,
                   'auth_timeout':180,
                   'fast_cli': True,
                   'global_delay_factor': 0.01,
                   'session_log': 'session.log',
            client = ConnectHandler(**dev)
            print("连接成功!")
            ip_info['local_username'] = username
            ip_info['local_password'] = password
            ip_info['password_authentication_status'] = True
            return client
        except paramiko.ssh_exception.AuthenticationException:
                print("密码错误,尝试下一个密码...")
        except paramiko.ssh_exception.SSHException as e:
            print(f"SSH 错误: {e}")
            return None  # 也可以选择继续尝试其他密码,但这里我们假设 SSH 错误是致命的
        except Exception as e:
            print(f"发生其他错误: {e}")
            return None  # 同样,这里可以处理异常或继续尝试
    print("所有密码尝试均失败。")
    return None
def parse_port_file( file_path=parseArgs().ips):
    print("---开始解析端口文件---")
    # 获取当日日期
    today = date.today().strftime('%Y%m%d')
    print(today)
    # 数字格式[{'ip':ip,"协议":协议,"端口号":端口号,"位置":位置},....]
    ip_port = []
    if os.path.exists(file_path):
        print(f"文件 {file_path} 存在")
        with open(file_path, 'r', encoding='utf-8') as file:
            lines = file.readlines()[1:]
            for line in lines:
                line = line.strip()
                ip=line.split('\t')[0]
                protocol=line.split('\t')[3]
                port=line.split('\t')[2]
                address=line.split('\t')[1]
                isable=False
                password_authentication_status=False
                local_username=''
                local_password=''
                ip_port.append({'ip':ip,'protocol':protocol,'port':port,'address':address,'isable':isable,'password_authentication_status':password_authentication_status,'local_password':local_password,"local_username":local_username})
    else:
        print(f"文件 {file_path} 不存在")
        # 创建一个名为example.txt的文件
    print(ip_port)
    return ip_port
def build_log_file(ip_info):
    print("---开始检测总日志是否存在---")
    # 获取当日日期
    today = date.today().strftime('%Y%m%d')
    today_log_name = today + '_log.txt'
    success_name = today + '_connect_success.txt'
    fail_name = today + '_connect_fail.txt'
    for ip in ip_info:
        with lock:
            with open(os.path.join(BASE_EXE_DIR, today_log_name), 'a+', encoding='utf-8') as file:
                single_log = 'ip:' + ip['ip'] + "\t" + 'protocol:' + ip['protocol'] + "\t" + 'port:' + ip['port'] + \
                             "\t" + 'address:' + ip['address'] + "\t" + 'isable:' + str(ip['isable']) + "\t" + \
                             'password_authentication_status:' + str(ip['password_authentication_status']) + \
                             "\t" + 'local_password:' + ip['local_password'] + "\t" + 'local_username:' + ip['local_username']
                file.write(single_log + "\n")
                file.flush()  # 确保数据被立即写入磁盘
        if ip['isable']:
            with lock:
                with open(os.path.join(BASE_EXE_DIR, success_name), 'a+', encoding='utf-8') as file:
                    single_log = 'ip:' + ip['ip'] + "\t" + 'protocol:' + ip['protocol'] + "\t" + 'port:' + ip['port'] + \
                                 "\t" + 'address:' + ip['address'] + "\t" + 'isable:' + str(ip['isable']) + "\t" + \
                                 'password_authentication_status:' + str(ip['password_authentication_status']) + \
                                 "\t" + 'local_password:' + ip['local_password'] + "\t" + 'local_username:' + ip['local_username']
                    file.write(single_log + "\n")
                    file.flush()  # 确保数据被立即写入磁盘
        if not ip['isable']:
            with lock:
                with open(os.path.join(BASE_EXE_DIR, fail_name), 'a+', encoding='utf-8') as file:
                    single_log = 'ip:' + ip['ip'] + "\t" + 'protocol:' + ip['protocol'] + "\t" + 'port:' + ip['port'] + \
                                 "\t" + 'address:' + ip['address'] + "\t" + 'isable:' + str(ip['isable']) + "\t" + \
                                 'password_authentication_status:' + str(ip['password_authentication_status']) + \
                                 "\t" + 'local_password:' + ip['local_password'] + "\t" + 'local_username:' + ip['local_username']
                    file.write(single_log + "\n")
                    file.flush()  # 确保数据被立即写入磁盘
def bulid_result_file(ip,resp):
    print("---开始生成交换机执行命令结果存在---")
    result_name=ip['ip']+'_'+ip['address']+'_'+ip['protocol']+'_result.txt'
    with lock:
        with open(os.path.join(BASE_EXE_DIR, result_name), 'a+', encoding='utf-8') as file:
            file.write(resp + "\n")
            file.flush()  # 确保数据被立即写入磁盘
def check_port(ip, port,ip_info):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(5)
    try:
        result = sock.connect_ex((ip, port))
        if result == 0:
            ip_info['isable']=True
            return True
        else:
            print(f'The port {port}  on {ip} is closed.')
            ip_info['isable'] = False
            return False
    except Exception as e:
        print(f"错误: {str(e)}")
        ip_info['isable'] = False
        return False
    finally:
        sock.close()
def connet_port(ip_info,command_lists):
    ip=ip_info['ip']
    print(ip)
    port=ip_info['port']
    print(port)
    passwords = ['*****','*****']
    isable=ip_info['isable']
    password = 'xxxx'
    username = '*****'
    # username = 'root'
    logging.basicConfig(level=logging.DEBUG)
    if isable and port=='22':
        print('开始使用SSH协议连接服务器')
        try:
            with try_ssh_connection_with_passwords(ip,port,username,passwords,ip_info) as ssh:
                print(ssh)
                print('SSH协议连接服务器成功')
                time.sleep(40)
                    # 执行命令
                # ssh.exec_command('screen-length 0 temporary',get_pty=True)
                # # 执行命令
                # ssh.exec_command('screen-length disable',get_pty=True)
                # # 执行命令
                # ssh.exec_command('terminal length 0',get_pty=True)
                # # 执行命令
                # ssh.exec_command('more off',get_pty=True)
                print('去除分屏状态')
                for command in command_lists:
                    print('开始执行命令')
                    try:
                        output = ssh.send_command(command_string=command,cmd_verify=True,strip_command=True)
                        print(output)
                        bulid_result_file(ip_info, output)
                    except Exception as e:
                        print(f'出错,{command}执行结束,原因{e}')
                print('命令执行完毕')
        except Exception:
            print(f"{ip},连接失败")
            return 0
                # 关闭连接
    if isable and port=='23':
        print('iiiiiiiiiiiiiiiiii')
        with telnetlib.Telnet(ip, port) as tn:
            print(tn)
            tn.set_debuglevel(0)
            # print(tn.read_all().decode())
            # tn.read_until(b'continue...\n')
            # tn.write( b"\n")
            login_putout = ''
            while 1:
                time.sleep(3)
                first_login=tn.read_very_eager().decode('ascii')
                login_putout+=first_login
                # time.sleep(1)
                if re.search(r'password:',login_putout.lower()):
                    tn.write(password.encode() + b"\n")
                    # tn.read_until(b"\n")
                    # tn.write(b"\n")
                    break
                if re.search(r'username:|login:',login_putout.lower()):
                    tn.write( username.encode() + b"\n")
                    time.sleep(1)
                    # print(tn.read_very_eager())
                    continue
            # # tn.read_until(b'login:',timeout=10)
            # tn.write(b'root'+ b"\n")
            # tn.read_until(b'Password:',timeout=10)
            # tn.write(b'123456' + b"\n")
            index,obj,output=tn.expect([b'>',b'#'],timeout=20)
            # print(f'1111111----{index}')
            # print(index)
            if index==1:
                ip_info['local_username'] = username
                ip_info['local_password'] = password
                ip_info['password_authentication_status'] = True
                print('111111111111111')
                # tn.write(b'ls -l' + b"\n")
                # time.sleep(1)
                # output=tn.read_very_eager().decode()
                # # print('ooooooooooooooooooooooooooooooooo')
                # print(output)
                time.sleep(1)
                tn.write( b"\n")
                prompt_pattern =tn.read_until(b'#', timeout=10).decode('ascii')
                tn.write(b'screen-length 0 temporary' + b"\n")
                tn.read_until(b'#', timeout=10)
                tn.write(b'screen-length disable' + b"\n")
                tn.read_until(b'#', timeout=10)
                tn.write(b'terminal length 0' + b"\n")
                tn.read_until(b'#', timeout=10)
                tn.write(b'more off' + b"\n")
                tn.read_until(b'#', timeout=10)
                for command in command_lists:
                    tn.write(command.encode() + b"\n")
                    output = tn.read_until(prompt_pattern.encode('ascii')).decode('ascii')
                    print(output)
                    bulid_result_file(ip_info, output)
            if index == 0:
                ip_info['local_username'] = username
                ip_info['local_password'] = password
                ip_info['password_authentication_status'] = True
                print('111111111111111')
                # tn.write(b'ls -l' + b"\n")
                # time.sleep(1)
                # output=tn.read_very_eager().decode()
                # # print('ooooooooooooooooooooooooooooooooo')
                # print(output)
                time.sleep(1)
                tn.write( b"\n")
                prompt_pattern =tn.read_until(b'>', timeout=10).decode('ascii')
                tn.write(b'screen-length 0 temporary' + b"\n")
                tn.read_until(b'>', timeout=10)
                tn.write(b'screen-length disable' + b"\n")
                tn.read_until(b'>', timeout=10)
                tn.write(b'terminal length 0' + b"\n")
                tn.read_until(b'>', timeout=10)
                tn.write(b'more off' + b"\n")
                tn.read_until(b'>', timeout=10)
                for command in command_lists:
                    tn.write(command.encode() + b"\n")
                    output = tn.read_until(prompt_pattern.encode('ascii')).decode('ascii')
                    print(output)
                    bulid_result_file(ip_info, output)
            if index == -1:
                return None
        # # 从这里开始,tn可以用来发送命令和接收响应
        # tn.write(b"pwd \n")
        # print('8888888888888')
        # print(tn.read_all().decode())
        # print('qqqqqqqqq')
        # tn.close()
def bat_test_port(ip_port):
    threads = []
    for ip_info in ip_port:
        ip=ip_info['ip']
        port=ip_info['port']
        thread = threading.Thread(target=check_port, args=(ip,int(port), ip_info))
        thread.start()
        threads.append(thread)
    for thread in threads:
        thread.join()
    return ip_port
def bat_check_port():
    ip_port=parse_port_file()
    bat_test_port(ip_port)
    print(ip_port)
    time.sleep(10) ####必须休眠,端口测试进程没有结束,直接连接会出意外结果
    # command_lists=['ls -a','show']
    command_lists= ['screen-length 0 temporary','screen-length disable' ,'terminal length 0','more off','dis cu', 'show running-config']
    threads = []
    for ip_info in ip_port:
        thread = threading.Thread(target=connet_port, args=(ip_info,command_lists))
        thread.start()
        threads.append(thread)
    for thread in threads:
        thread.join()
    build_log_file(ip_port)
if __name__ == '__main__':
    bat_check_port()

四、打包

4.1 前置说明

4.1.1文件列表