需要在mysql数据库中查到相关文件的在服务器的路径,然后通过scp来下载相关文件,之前是手动操作,我现在要写成一个脚本
如何使用python连接mysql数据库如何使用python执行scp命令.使用 pymysql模块连接mysql获取路径使用 paramiko模块执行scp命令通过使用PyInstaller打包为一个exe,可以直接给运维人员使用
何谓喜欢一个人,遇上她之前不知情为何物,错过之后,便更不知了 ——烽火戏诸侯《雪中悍刀行》
pymysql是一个基于python的 MySQL 客户端库
import pymysql.cursors
# Connect to the database
connection=pymysql.connect(host='localhost',
user='user',
password='passwd',
database='db',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor)
with connection:
with connection.cursor() as cursor:
# Create a new record
sql="INSERT INTO `users` (`email`, `password`) VALUES (%s, %s)"
cursor.execute(sql, ('webmaster@python.org', 'very-secret'))
# connection is not autocommit by default. So you must commit to save
# your changes.
connection.commit()
with connection.cursor() as cursor:
# Read a single record
sql="SELECT `id`, `password` FROM `users` WHERE `email`=%s"
cursor.execute(sql, ('webmaster@python.org',))
result=cursor.fetchone()
print(result)
Paramiko是 SSH协议的纯 Python实现 ,提供客户端和服务器功能。它为高级 SSH 库Fabric提供了基础,可以运行远程 shell 命令或传输文件。官网:https://www.paramiko.org/,下面是一个简单的Demo
import paramiko
# Connect
client=paramiko.SSHClient()
client.connect(host, port, username)
# Obtain session
session=client.get_transport().open_session()
# Forward local agent
AgentRequestHandler(session)
# Commands executed after this point will see the forwarded agent on
# the remote end.
session.exec_command("git clone https://my.git.repository/")
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@File : apps.py
@Time : 2021/12/14 16:34:56
@Author : Li Ruilong
@Version : 1.0
@Contact : 1224965096@qq.com
@Desc : 一个从mysql数据库获取文件路径,通过scp远程下载文件到本地的脚本
pip install pymysql
pip install paramiko
'''
# here put the import lib
import pymysql
import os
import time
import paramiko
# mysql数据库相关配置
host='127.0.0.1'
port=3306
user='user'
password='***********'
db='dbname'
# ssh相关配置
ssh_hostname="127.0.0.1"
ssh_username="user"
ssh_password='***********'
def initDB():
'''连接数据库的操作
Args:
host(str)
port(int)
user(str)
password(str)
db(str)
Returns:
连接状态:1成功,0失败
'''
try:
global connection
connection=pymysql.connect(host=host, port=port, user=user, password=password,
db=db, charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor)
global cursor
cursor=connection.cursor()
print("数据库连接成功============================================", time.strftime(
"%Y-%m-%d %H:%M:%S", time.localtime()), "=========================", '
')
return 1
except:
print("数据库连接异常============================================", time.strftime(
"%Y-%m-%d %H:%M:%S", time.localtime()), "=========================", '
')
return 0
def scp(local_filename, remote_path):
'''创建`scp`连接,
Args:
local_filename(str): 本地要存放的文件位置
remote_path(int) 远程的文件位置
Returns:
void
'''
# 创建ssh访问
ssh=paramiko.SSHClient()
ssh.load_system_host_keys()
ssh.set_missing_host_key_policy(
paramiko.AutoAddPolicy()) # 允许连接不在know_hosts文件中的主机
ssh.connect(ssh_hostname, port=22, username=ssh_username,
password=ssh_password) # 远程访问的服务器信息
# 创建scp,下载文件
sftp=paramiko.SFTPClient.from_transport(ssh.get_transport())
sftp=ssh.open_sftp()
sftp.get(remote_path, local_filename)
def execute():
sql='''SELECT a.number,a.path,a.date FROM tablename a
WHERE (number LIKE "%{}" OR numbers LIKE "%{}" )
AND year(a.date)="{}"
AND month(a.date)="{}" '''
print("查询sql:",sql)
year=input("请输入年份:")
month=input("请输入月份:")
number=input("请输入电话号码:")
print("
")
sql=sql.format(number,
number,
year,
month)
print("数据查询中请稍等.....")
resout=cursor.execute(sql)
if(resout==0):
print("没有需要的数据!!!", '
')
time.sleep(5)
else:
date=cursor.fetchall()
for i in date:
pathName=i["path"]
print("获取到的二手文件位置:", pathName, '
') #/bakrecord/record/2020/05/25/800142/800142_202918189.mp3
# 获取文件名称
fileName=str(pathName).split("/")[7]
print("文件名称:", fileName, '
')
# 当前工作环境目录
currentPath=os.getcwd()
loadPathName=currentPath+"\"+fileName
try:
scp(loadPathName, pathName)
print("下载成功============================================", time.strftime(
"%Y-%m-%d %H:%M:%S", time.localtime()), "=========================", '
')
print("下载后的文件路径:", loadPathName, '
')
except:
print("下载异常!!!!", '
')
time.sleep(5)
if __name__=='__main__':
print('数据库连接','
')
if (initDB()==1):
while True:
boo=input("是否下载录音文件:?y/n
")
if boo=='y':
execute()
else:
break
else:
print("数据库链接异常")
可以通过命令行打包,也可以通过写一个打包文件的方式打包
from PyInstaller.__main__ import run
#### 打包文件直接执行
if __name__=='__main__':
opts=['apps.py', # 主程序文件
'-F', # 打包单文件
'--ico=favicon.ico', # 可执行程序图标
]
run(opts)
打包运行
联系客服