#!/usr/bin/env python3
"""启动 kr宝塔 sshd 并放行安全组,结果写文件。"""
import os, re, sys, base64, time
# Directory that contains this script.
_here = os.path.dirname(os.path.abspath(__file__))

# Project root: two directory levels above the script (labelled "卡若AI" in the
# original comment).
ROOT = os.path.dirname(os.path.dirname(_here))

# Workbench directory shared by the credential index and the result file.
_WORKBENCH = os.path.join(ROOT, "运营中枢", "工作台")

# Markdown index that main() parses for the Tencent Cloud SecretId/SecretKey.
# NOTE(review): this keeps long-lived cloud API keys in a plaintext file inside
# the project tree; anyone who can read the tree (or a copy of it) can use them.
INDEX = os.path.join(_WORKBENCH, "00_账号与API索引.md")

# Result file that log() appends to; main() recreates it on every run.
OUT = os.path.join(_WORKBENCH, "kr_ssh_start_result.txt")
def log(msg):
    """Append *msg* followed by a newline to the result file at OUT.

    The file is opened in append mode as UTF-8 on every call, so each message
    is written as soon as it is logged.
    """
    with open(OUT, mode="a", encoding="utf-8") as sink:
        line = msg + "\n"
        sink.write(line)
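def main():
    """Open SSH ingress on the instance's security groups and start sshd via TAT.

    Every step is reported through log() into the result file at OUT:

    0. Recreate OUT, then read the Tencent Cloud SecretId/SecretKey from the
       "### 腾讯云" section of the Markdown index at INDEX.
    1. Add TCP ACCEPT ingress rules for ports 22 and 22022 to every security
       group attached to the instance. The source CIDR defaults to
       "0.0.0.0/0" and can be narrowed with the KR_SSH_ALLOW_CIDR
       environment variable.
    2. Run a shell command through TAT that enables and starts sshd, then
       poll for its result (up to 8 polls, 4 seconds apart).
    3. Write a suggested SSH test command.

    Returns:
        1 when the credentials or the instance cannot be found, otherwise 0.
        API errors in steps 1 and 2 are logged rather than raised.
    """
    # Start every run from an empty result file; create its directory if needed.
    os.makedirs(os.path.dirname(OUT), exist_ok=True)
    if os.path.isfile(OUT):
        os.remove(OUT)
    log("=== kr宝塔 SSH 启动与连接检查 ===\n")

    # Read the credentials. The index contains Chinese text, so decode it
    # explicitly as UTF-8 instead of relying on the platform default encoding
    # (log() already writes UTF-8 explicitly).
    with open(INDEX, encoding="utf-8") as f:
        index_text = f.read()
    sid = skey = None
    in_section = False
    for line in index_text.splitlines():
        if "### 腾讯云" in line:
            in_section = True
            continue
        if not in_section:
            continue
        # The next "###" heading ends the Tencent Cloud section.
        if line.strip().startswith("###"):
            break
        m = re.search(r"SecretId[^|]*\|\s*`([^`]+)`", line, re.I)
        if m and "AKID" in m.group(1):
            sid = m.group(1).strip()
        m = re.search(r"SecretKey\s*\|\s*`([^`]+)`", line, re.I)
        if m:
            skey = m.group(1).strip()
    if not sid or not skey:
        log("ERR: 未找到腾讯云凭证")
        return 1

    # Imported here so a missing credential is reported before the SDK is needed.
    from tencentcloud.common import credential
    from tencentcloud.cvm.v20170312 import cvm_client, models as cvm_models
    from tencentcloud.vpc.v20170312 import vpc_client, models as vpc_models
    from tencentcloud.tat.v20201028 import tat_client, models as tat_models

    cred = credential.Credential(sid, skey)
    KR_ID = "ins-aw0tnqjo"
    REGION = "ap-guangzhou"

    # Source range for the SSH ingress rules. The default keeps the original
    # behaviour (open to the whole Internet); set KR_SSH_ALLOW_CIDR to an
    # operator or VPN range to limit who can reach sshd.
    allow_cidr = (os.environ.get("KR_SSH_ALLOW_CIDR") or "").strip() or "0.0.0.0/0"

    # 1. Allow ports 22 and 22022 in the instance's security groups.
    log("1. 安全组放行 22、22022")
    if allow_cidr == "0.0.0.0/0":
        log(" WARN: 来源为 0.0.0.0/0 (全网开放); 可用环境变量 KR_SSH_ALLOW_CIDR 限制来源网段")
    try:
        cvm = cvm_client.CvmClient(cred, REGION)
        # Build the request by attribute assignment, the same way every other
        # request in this function is built.
        describe_req = cvm_models.DescribeInstancesRequest()
        describe_req.InstanceIds = [KR_ID]
        r = cvm.DescribeInstances(describe_req)
        ins = (r.InstanceSet or [None])[0]
        if not ins:
            log(" ERR: 未找到实例")
            return 1
        sg_ids = list(getattr(ins, "SecurityGroupIds", None) or [])
        vpc = vpc_client.VpcClient(cred, REGION)
        for port, desc in [("22", "SSH"), ("22022", "SSH-宝塔")]:
            for sg_id in sg_ids:
                try:
                    req = vpc_models.CreateSecurityGroupPoliciesRequest()
                    req.SecurityGroupId = sg_id
                    ps = vpc_models.SecurityGroupPolicySet()
                    ing = vpc_models.SecurityGroupPolicy()
                    ing.Protocol, ing.Port, ing.CidrBlock = "TCP", port, allow_cidr
                    ing.Action, ing.PolicyDescription = "ACCEPT", desc
                    ps.Ingress = [ing]
                    req.SecurityGroupPolicySet = ps
                    vpc.CreateSecurityGroupPolicies(req)
                    log(" OK %s -> %s/TCP" % (sg_id, port))
                except Exception as e:
                    # An already-present rule is reported as such, not as an error.
                    if "RuleAlreadyExists" in str(e) or "已存在" in str(e):
                        log(" 已存在 %s" % port)
                    else:
                        log(" ERR %s: %s" % (port, e))
    except Exception as e:
        log(" 安全组 ERR: %s" % e)

    # 2. Start sshd on the instance through TAT (remote command execution).
    log("\n2. TAT 启动 sshd")
    CMD = """systemctl enable sshd; systemctl start sshd; sleep 1; systemctl is-active sshd; ss -tlnp | grep sshd"""
    try:
        tat = tat_client.TatClient(cred, REGION)
        req = tat_models.RunCommandRequest()
        # The command is sent base64-encoded; the task output is decoded below.
        req.Content = base64.b64encode(CMD.encode()).decode()
        req.InstanceIds = [KR_ID]
        req.CommandType = "SHELL"
        req.Timeout = 30
        req.CommandName = "kr_sshd_start"
        r = tat.RunCommand(req)
        inv_id = r.InvocationId
        log(" InvocationId: %s" % inv_id)
        for _ in range(8):
            time.sleep(4)
            req2 = tat_models.DescribeInvocationTasksRequest()
            req2.Filters = [{"Name": "invocation-id", "Values": [inv_id]}]
            r2 = tat.DescribeInvocationTasks(req2)
            tasks = r2.InvocationTaskSet or []
            if tasks and tasks[0].TaskStatus in ("SUCCESS", "FAILED", "TIMEOUT"):
                res = tasks[0].TaskResult
                log(" Status: %s" % tasks[0].TaskStatus)
                if res and res.Output:
                    log(" Output:\n" + base64.b64decode(res.Output).decode("utf-8", errors="replace"))
                break
        else:
            # No terminal status within 8 polls: say so instead of leaving the
            # result file silent about whether sshd was started.
            log(" 轮询超时: 任务仍未结束 (InvocationId: %s)" % inv_id)
    except Exception as e:
        log(" TAT ERR: %s" % e)

    log("\n3. 请在本机执行 SSH 测试:")
    # accept-new (OpenSSH 7.6+) records the host key on first connect and
    # refuses a changed key; StrictHostKeyChecking=no would accept any key and
    # so gives no protection against an interposed host.
    log(" ssh -p 22022 -o StrictHostKeyChecking=accept-new root@43.139.27.93")
    # The root password used to be written here verbatim. A secret must not be
    # kept in source or in a result file: treat the old value as exposed, rotate
    # it, and prefer key-based login for root.
    log(" 密码: 不写入结果文件, 请从密码管理器获取")
    log("\n完成。")
    return 0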
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())