Python运维脚本实战
启动一个Python下载服务器
Python内置了一个下载服务器就能够显著提升效率了。例如,你的同事要让你传的文件位于某一个目录下,那么,你可以进入这个目录,然后执行下面的命令启动一个下载服务器:
python -m http.server
执行上面的命令就会在当前目录下启动一个文件下载服务器,默认打开8000端口。完成以后,只需要将IP和端口告诉同事,让同事自己去操作即可,非常方便高效。
上面使用的Python语句,从工作原理来说,仅仅是启动了一个Python内置的Web服务器。如果当前目录下存在一个名为index.html的文件,则默认显示该文件的内容。如果当前目录下不存在这样一个文件,则默认显示当前目录下的文件列表,也就是大家看到的下载服务器。
字符串转换为JSON
在工作过程中,我们的系统会调用底层服务的API。底层服务的API一般都是以JSON的格式返回,为了便于问题追踪,我们会将API返回的JSON转换为字符串记录到日志文件中。当需要分析问题时,就需要将日志文件中的JSON字符串拿出来进行分析。这个时候,需要将一个JSON字符串转换为JSON对象,以提高日志的可读性。
除了打开浏览器,使用在线JSON格式化工具以外,我们也可以使用命令行终端的Python解释器来解析JSON串,如下所示:
$ echo '{"job": "developer", "name": "lmx", "sex": "male"}' | python -m json.tool
{
"job": "developer",
"name": "lmx",
"sex": "male"
}
检查第三方库是否正确安装
安装完Python的第三方库以后,如何确认这个库已经正确安装了呢?答案很简单,只需要尝试进行import导入即可。如果导入没有任何错误,则认为安装成功;如果导入失败,则认为安装失败。
$ python
Python 2.7.13 (default, Feb 10 2017, 20:22:22)
[GCC 4.7.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import paramiko
>>>
验证Python的第三方库是否安装成功,本身也是一件很简单的事情,但是,如果我们使用脚本对大批量的服务器进行自动部署,又应该如何验证第三方库安装成功了呢?肯定不能登录每一台服务器进行验证。这个时候,我们可以使用Python解释器的-c参数快速地执行import语句,如下所示:
python -c "import paramiko"
打造命令行工具
编写Linux下的命令行工具,很多时候都需要解析命令行的参数。
获取命令行参数
在Python中,sys库下有一个名为argv的列表,该列表保存了所有的命令行参数。argv列表中的第一个元素是命令行程序的名称,其余的命令行参数以字符串的形式保存在该列表中。 现在有一个名为test_argv.py的Python文件,该文件仅仅是导入sys库,然后使用print函数打印argv列表中的内容。
from __future__ import print_function
import sys
print(sys.argv)
可以看到,如果不传递任何参数给test_argv.py,则sys.argv有且仅有一个元素,即Python程序的名称。当我们传递了其他命令行参数时,所有参数都以字符串的形式保存到sys.argv中。例如,我们将命令行参数localhost和3306传递给test_argv.py文件后,sys.argv中就有三个元素,分别是Python程序的名称和我们传递的命令行参数。并且,我们传递的3306也以字符串的形式被保存下来。
$ python test_argv.py
['test_argv.py']
$ python test_argv.py localhost 3306
['test_argv.py', 'localhost', '3306']
sys.argv是一个保存命令行参数的普通列表。因为它是一个普通的列表,所以,我们可以直接修改sys.argv的内容。下面是一个修改sys.argv列表的应用场景:
from __future__ import print_function
import os
import sys
def main():
sys.argv.append("")
filename = sys.argv[1]
if not os.path.isfile(filename):
raise SystemExit(filename + ' does not exists')
elif not os.access(filename, os.R_OK):
raise SystemExit(filename + ' is not accessible')
else:
print(filename + ' is accessible')
if __name__ == '__main__':
main()
在这个例子中,我们从命令行参数获取文件的名称,然后判断文件是否存在。如果文件不存在,则提示用户该文件不存在;如果文件存在,则使用os.access函数判断我们是否具有对文件的读权限。在这个程序中,我们通过sys.argv[1]
获取文件的名称。但是,这里有种异常情况需要考虑,如果用户直接运行我们的程序,没有传递任何命令行参数,那么,访问sys.argv[1]将会出现索引越界的错误。为了避免这个错误,我们可以在访问sys.argv之前先向sys.argv中添加一个空的字符串。添加空字符串以后,无论用户是否提供命令行参数,访问sys.argv[1]都不会出错。如果用户传递了命令行参数,那么,通过sys.argv[1]访问,得到的是用户提供的命令行参数。
使用Python打造一个MySQL压测工具
我们的压测程序只需要很少的参数,相对于MySQL客户端连接MySQL数据库,仅仅多了thread_size和row_size这两个参数。前者表示启用多少个线程并发地向数据库插入数据,后者表示每一个线程插入多少条记录后结束。由于我们使用标准库的argparse模块来解析命令行参数,argparse模块能够帮我们自动生成帮助信息。因此,我们可以通过“–help”选项得到程序的帮助信息。
为了提高程序的易用性,我们的压测工具会自动在数据库中创建test_insert_data_db数据库,并在该数据库下自动创建test_insert_data_table表。然后启用多个线程并发地向表中插入记录。如下所示:
$ python insert_data.py --host=127.0.0.1 --user=laimingxing --password=laimingxing --port=3306 --thread_size=5 --row_size=5
drop database if exists test_insert_data_db
create database test_insert_data_db
use test_insert_data_db
create table test_insert_data_table (id int(10) NOT NULL AUTO_INCREMENT,
name varchar(255) NOT NULL, datetime double NOT NULL,
PRIMARY KEY ('id'))
了解了压测工具的使用方法和实现原理以后,再来看它的具体实现。如下所示:
#!/usr/bin/python2.7
from __future__ import print_function
import string
import argparse
import random
import threading
import time
from contextlib import contextmanager
import pymysql
DB_NAME = 'test_insert_data_db'
TABLE_NAME = 'test_insert_data_table'
CREATE_TABLE_STATEMENT = """create table {0} (id int(10) NOT NULL AUTO_INCREMENT,
name varchar(255) NOT NULL, datetime double NOT NULL,
PRIMARY KEY ('id'))""".format(TABLE_NAME)
def _argparse():
parser = argparse.ArgumentParser(description='benchmark tool for MySQL database')
parser.add_argument('--host', action='store', dest='host',
required=True, help='connect to host')
parser.add_argument('--user', action='store', dest='user',
required=True, help='user for login')
parser.add_argument('--password', action='store', dest='password',
required=True, help='password to use when connecting to server')
parser.add_argument('--port', action='store', dest='port', default=3306,
type=int, help='port number to use for connection or 3306 for default')
parser.add_argument('--thread_size', action='store', dest='thread_size',
default=5, type=int, help='how much connection for database
usage')
parser.add_argument('--row_size', action='store', dest='row_size',
default=5000, type=int, help='how much rows')
parser.add_argument('-v', '--version', action='version', version='%(prog)s 0.1')
return parser.parse_args()
@contextmanager
def get_conn(**kwargs):
conn = pymysql.connect(**kwargs)
try:
yield conn
finally:
conn.close()
def create_db_and_table(conn):
with conn as cur:
for sql in ["drop database if exists {0}".format(DB_NAME),
"create database {0}".format(DB_NAME),
"use {0}".format(DB_NAME),
CREATE_TABLE_STATEMENT]:
print(sql)
cur.execute(sql)
def random_string(length=10):
s = string.letters + string.digits
return "".join(random.sample(s, length))
def add_row(cursor):
SQL_FORMAT = "INSERT INTO '{0}'(name, datetime) values('{1}', {2})"
sql = SQL_FORMAT.format(TABLE_NAME, random_string(), time.time())
cursor.execute(sql)
def insert_data(conn_args, row_size):
with get_conn(**conn_args) as conn:
with conn as c:
c.execute('use {0}'.format(DB_NAME))
with conn as c:
for i in range(row_size):
add_row(c)
conn.commit()
def main():
parser = _argparse()
conn_args = dict(host=parser.host, user=parser.user,
password=parser.password, port=parser.port)
with get_conn(**conn_args) as conn:
create_db_and_table(conn)
threads = []
for i in range(parser.thread_size):
t = threading.Thread(target=insert_data, args=(conn_args, parser.row_size))
threads.append(t)
t.start()
for t in threads:
t.join()
if __name__ == '__main__':
main()
在这段程序中,我们首先导入了多个标准库,以及连接数据库的pymysql第三方库。随后,我们定义了三个全局变量,即测试的数据库名、表名和表结构。在main函数中,我们首先调用_argparse函数来解析命令行参数。_argparse的唯一作用就是使用标准库的argparse模块解析命令行参数并生成帮助信息。解析完命令行参数以后,我们就得到了建立数据库连接的参数。我们将建立数据库连接的参数传递给get_conn函数,get_conn函数使用pymysql模块的connect函数建立数据库连接。为了同时保证get_conn函数的易用性和程序结束以后能够及时关闭数据库连接,我们使用contextmanager装饰器编写了一个上下文管理器。有了上下文管理器以后,我们就可以使用with语句管理数据库连接。在main函数中,我们得到数据库连接以后,调用create_db_and_table函数创建相关的数据库和表结构。随后,我们根据用户输入的thread_size参数创建多个线程,每个线程都会去调用insert_data函数。在insert_data函数中,我们根据数据库连接参数创建数据库连接,并根据用户指定的row_size参数向数据库插入随机数据。