运维系列之FastAPI接口服务怎么用
小编给大家分享一下运维系列之FastAPI接口服务怎么用,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!
第一部分:FastAPI接口部分
from fastapi import FastAPI, Path
from com.fy.fastapi.monitor.shell.fabric.FabricLinux import FabricLinux
app = FastAPI()
#执行shell命令;
#调用示例:http://127.0.0.1:8000/items/3?q=%E6%B5%8B%E8%AF%95
@app.get("/cmd/{cmd}")
def runCommand(cmd, host, userName, password):
fabric = FabricLinux(host , userName , password)
result = fabric.runCommand(cmd)
fabric.closeClient()
return {"res": result }
#上传文件;
#调用示例:http://127.0.0.1:8000/items/3?q=%E6%B5%8B%E8%AF%95
@app.get("/upload/{srcFile}")
def upload(srcFile, targetDir, host, userName, password):
fabric = FabricLinux(host , userName , password)
result = fabric.upload(srcFile, targetDir)
fabric.closeClient()
return {"res": result }
#下载文件;
#调用示例:http://127.0.0.1:8000/items/3?q=%E6%B5%8B%E8%AF%95
@app.get("/download/{srcFile}")
def download(srcFile, targetDir, host, userName, password):
#fabricu.download("/home/Crawler/WeChatSouGouMain.tar.gz", "D:\\txt\\WeChatSouGouMain.tar.gz")
fabric = FabricLinux(host , userName , password)
result = fabric.download(srcFile, targetDir)
fabric.closeClient()
return {"res": result }
#删除文件
#调用示例:http://127.0.0.1:8000/items/3?q=%E6%B5%8B%E8%AF%95
@app.get("/delete/{targetPath}")
def delete(targetPath:"必须是全路径", host, userName, password):
fabric = FabricLinux(host , userName , password)
result = fabric.runCommand("rm -rf " + targetPath)
fabric.closeClient()
return {"res": result }
#解压.tar.gz文件
#调用示例:http://127.0.0.1:8000/items/3?q=%E6%B5%8B%E8%AF%95
@app.get("/delete/{targetPath}")
def decomTarGz(srcPath, tarPath, host, userName, password):
fabric = FabricLinux(host , userName , password)
result = fabric.decomTarGz(srcPath, tarPath)
fabric.closeClient()
return {"res": result }
#根据PID关闭相关的服务;
#调用示例:http://127.0.0.1:8000/items/3?q=%E6%B5%8B%E8%AF%95
@app.get("/kill/{pid}")
def stop(pid: int=Path(..., gt=0, le=32767), host, userName, password):
''' gt: 大于; le: 小于等于 '''
fabric = FabricLinux(host , userName , password)
result = fabric.runCommand("kill -9 " + pid)
fabric.closeClient()
return {"res": result }
#根据Python命令符,以及启动文件路径,启动相应的服务;
#调用示例:http://127.0.0.1:8000/start/python?startFile=%E6%B5%8B%E8%AF%95
@app.get("/start/{pycmd}")
def start(pycmd , startFile , host, userName, password):
fabric = FabricLinux(host , userName , password)
result = fabric.runCommand("nohup " + pycmd + " " + startFile + " &")
fabric.closeClient()
return {"res": result }
第二部分:fabric接口部分,主要用来执行命令行
from fabric import Connection
import traceback, os
class FabricLinux:
def __init__(self, host:"服务器IP", userName:"用户名", password:"密码"):
self.host = host
self.userName = userName
self.password = password
print(self.userName + "@" + self.host, {"password": self.password})
self.initClient()#初始化链接
#初始化ssh链接对象;
def initClient(self):
#如果服务器配置了SSH免密码登录,就不需要 connect_kwargs 来指定密码了。
self.con = Connection(self.userName + "@" + self.host, connect_kwargs={"password": self.password})
#关闭fabric操作对象;
def closeClient(self):
self.con.close()
#执行shell命令;
def runCommand(self, sshCommand:"Linux命令行语句"):
#top命令尚未测试通过;
#如果命令行中包含路径,最好使用绝对路径;
try:
#语法:run('远程命令')
result = self.con.run(sshCommand, hide=True)
if result.return_code == 0:# 返回码,0表示正确执行,1表示错误
return True, result.stdout
return result.failed, result.stdout
except:
exp = traceback.format_exc()
if "mkdir" in exp and 'File exists' in exp:
print("目录【", sshCommand, "】已存在")
else:
print(exp)
return False, exp
#在特定目录下,执行shell命令;
def runDir(self, sshCommand:"Linux命令行语句", dir):
if not os.path.isdir(dir):
return "目标路径错误,必须是目录"
with self.cd(dir):#with表示,with块的代码是在dir目录下执行;
return self.runCommand(sshCommand)
#解压.tar.gz文件;
def decomTarGz(self, srcPath, tarPath):
if os.path.isdir(srcPath):
return "带解压的路径,必须是文件类型"
if not os.path.isdir(tarPath):
return "目标路径错误,必须是目录"
return fabricu.runCommand("tar -zxvf " + srcPath + " -C " + tarPath)
#切换到某个目录下(上下文不连贯)
def cd(self, dir):
#语法:cd('远程目录')
self.con.cd(dir)
#上传本地文件到远程主机
def upload(self, src:"待上传的文件全路径。路径中最好不要有空格等", target:"保存到服务器上的目录"):
if not os.path.isdir(target):
return "待上传的文件全路径"
elif " " in src:
return "路径中不允许有空格、(、)等特殊字符"
#语法:put('本地文件', '远程目录')
else:return self.con.put(src, target)
#从远程主机下载文件到本地
def download(self , src:"远程文件全路径", target:"本地文件路径(必须包含文件名称的全路径)"):
#语法:get('远程文件', '本地文件路径')
#示例:fabricu.download("/home/Crawler/WeChatSouGouMain.tar.gz", "D:\\txt\\WeChatSouGouMain.tar.gz")
if not os.path.isdir(target):
return self.con.get(src, target)
else:return "目标路径错误,必须是包含文件名称的全路径"
看完了这篇文章,相信你对"运维系列之FastAPI接口服务怎么用"有了一定的了解,如果想了解更多相关知识,欢迎关注行业资讯频道,感谢各位的阅读!