1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
import paramiko
from typing import Dict
def ssh_command(hostname: str, port: str, username: str, password: str, command: str, timeout: int = 90,
                get_pty: bool = False) -> Dict:
    """
    Connect to a remote server over SSH and execute a shell command.

    Any exception raised while connecting or executing is caught and reported
    through the returned dict instead of being propagated.

    :param hostname: IP address of the remote server
    :param port: SSH port of the remote server
    :param username: remote user name
    :param password: password of the remote user
    :param command: command to execute
    :param timeout: timeout passed to ``exec_command`` for the command's channel
    :param get_pty: whether to request a tty, i.e. a pseudo-terminal
    :return: dict with the keys
        ``output``    - list of stdout lines, or ``0`` on failure;
        ``IP``        - the ``hostname`` argument;
        ``msg``       - ``1`` on success, ``0`` on failure;
        ``error``     - list of stderr lines, or ``[str(exception)]`` on failure;
        ``exit_code`` - exit status of the remote command, or ``-1`` on failure.
    """
    ssh = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy accepts unknown host keys without verification;
    # confirm this is acceptable for the environments this runs against.
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(hostname=hostname, port=port, username=username, password=password, timeout=8)
        _stdin, stdout, stderr = ssh.exec_command(command=command, timeout=timeout, get_pty=get_pty)
        # Drain the streams BEFORE asking for the exit status: recv_exit_status()
        # can block indefinitely when the remote side has produced more output
        # than the channel window holds and nobody is reading it.
        output = stdout.readlines()
        error = stderr.readlines()
        exit_code = stdout.channel.recv_exit_status()
        ssh_result = {
            'output': output,
            'IP': hostname,
            'msg': 1,
            'error': error,
            'exit_code': exit_code,
        }
    except Exception as e:
        # Deliberate catch-all: callers receive a failure dict rather than an exception.
        ssh_result = {
            'output': 0,
            'IP': hostname,
            'msg': 0,
            'error': [str(e)],
            'exit_code': -1,
        }
    finally:
        # Single cleanup point for both the success and the failure path.
        ssh.close()
    return ssh_result
|