Python运维脚本实战

2015/04/21 Python

Python运维脚本实战

启动一个Python下载服务器

Python内置了一个下载服务器就能够显著提升效率了。例如,你的同事要让你传的文件位于某一个目录下,那么,你可以进入这个目录,然后执行下面的命令启动一个下载服务器:

python -m http.server

执行上面的命令就会在当前目录下启动一个文件下载服务器,默认打开8000端口。完成以后,只需要将IP和端口告诉同事,让同事自己去操作即可,非常方便高效。

上面使用的Python语句,从工作原理来说,仅仅是启动了一个Python内置的Web服务器。如果当前目录下存在一个名为index.html的文件,则默认显示该文件的内容。如果当前目录下不存在这样一个文件,则默认显示当前目录下的文件列表,也就是大家看到的下载服务器。

字符串转换为JSON

在工作过程中,我们的系统会调用底层服务的API。底层服务的API一般都是以JSON的格式返回,为了便于问题追踪,我们会将API返回的JSON转换为字符串记录到日志文件中。当需要分析问题时,就需要将日志文件中的JSON字符串拿出来进行分析。这个时候,需要将一个JSON字符串转换为JSON对象,以提高日志的可读性。

除了打开浏览器,使用在线JSON格式化工具以外,我们也可以使用命令行终端的Python解释器来解析JSON串,如下所示:

$ echo '{"job": "developer", "name": "lmx", "sex": "male"}' | python -m json.tool
{
    "job": "developer",
    "name": "lmx",
    "sex": "male"
}

检查第三方库是否正确安装

安装完Python的第三方库以后,如何确认这个库已经正确安装了呢?答案很简单,只需要尝试进行import导入即可。如果导入没有任何错误,则认为安装成功;如果导入失败,则认为安装失败。

$ python
Python 2.7.13 (default, Feb 10 2017, 20:22:22)
[GCC 4.7.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import paramiko
>>>

验证Python的第三方库是否安装成功,本身也是一件很简单的事情,但是,如果我们使用脚本对大批量的服务器进行自动部署,又应该如何验证第三方库安装成功了呢?肯定不能登录每一台服务器进行验证。这个时候,我们可以使用Python解释器的-c参数快速地执行import语句,如下所示:

python -c "import paramiko"

打造命令行工具

编写Linux下的命令行工具,很多时候都需要解析命令行的参数。

获取命令行参数

在Python中,sys库下有一个名为argv的列表,该列表保存了所有的命令行参数。argv列表中的第一个元素是命令行程序的名称,其余的命令行参数以字符串的形式保存在该列表中。 现在有一个名为test_argv.py的Python文件,该文件仅仅是导入sys库,然后使用print函数打印argv列表中的内容。

from __future__ import print_function
import sys

print(sys.argv)

可以看到,如果不传递任何参数给test_argv.py,则sys.argv有且仅有一个元素,即Python程序的名称。当我们传递了其他命令行参数时,所有参数都以字符串的形式保存到sys.argv中。例如,我们将命令行参数localhost和3306传递给test_argv.py文件后,sys.argv中就有三个元素,分别是Python程序的名称和我们传递的命令行参数。并且,我们传递的3306也以字符串的形式被保存下来。

$ python test_argv.py
['test_argv.py']

$ python test_argv.py localhost 3306
['test_argv.py', 'localhost', '3306']

sys.argv是一个保存命令行参数的普通列表。因为它是一个普通的列表,所以,我们可以直接修改sys.argv的内容。下面是一个修改sys.argv列表的应用场景:

from __future__ import print_function
import os
import sys

def main():
    sys.argv.append("")
    filename = sys.argv[1]
    if not os.path.isfile(filename):
        raise SystemExit(filename + ' does not exists')
    elif not os.access(filename, os.R_OK):
        raise SystemExit(filename + ' is not accessible')
    else:
        print(filename + ' is accessible')


if __name__ == '__main__':
    main()

在这个例子中,我们从命令行参数获取文件的名称,然后判断文件是否存在。如果文件不存在,则提示用户该文件不存在;如果文件存在,则使用os.access函数判断我们是否具有对文件的读权限。在这个程序中,我们通过sys.argv[1]获取文件的名称。但是,这里有种异常情况需要考虑,如果用户直接运行我们的程序,没有传递任何命令行参数,那么,访问sys.argv[1]将会出现索引越界的错误。为了避免这个错误,我们可以在访问sys.argv之前先向sys.argv中添加一个空的字符串。添加空字符串以后,无论用户是否提供命令行参数,访问sys.argv[1]都不会出错。如果用户传递了命令行参数,那么,通过sys.argv[1]访问,得到的是用户提供的命令行参数。

使用Python打造一个MySQL压测工具

我们的压测程序只需要很少的参数,相对于MySQL客户端连接MySQL数据库,仅仅多了thread_size和row_size这两个参数。前者表示启用多少个线程并发地向数据库插入数据,后者表示每一个线程插入多少条记录后结束。由于我们使用标准库的argparse模块来解析命令行参数,argparse模块能够帮我们自动生成帮助信息。因此,我们可以通过“–help”选项得到程序的帮助信息。

为了提高程序的易用性,我们的压测工具会自动在数据库中创建test_insert_data_db数据库,并在该数据库下自动创建test_insert_data_table表。然后启用多个线程并发地向表中插入记录。如下所示:

$ python insert_data.py --host=127.0.0.1 --user=laimingxing --password=laimingxing --port=3306 --thread_size=5 --row_size=5
drop database if exists test_insert_data_db
create database test_insert_data_db
use test_insert_data_db
create table test_insert_data_table (id int(10) NOT NULL AUTO_INCREMENT,
                            name varchar(255) NOT NULL, datetime double NOT NULL,
                            PRIMARY KEY ('id'))

了解了压测工具的使用方法和实现原理以后,再来看它的具体实现。如下所示:

#!/usr/bin/python2.7
from __future__ import print_function
import string
import argparse
import random
import threading
import time
from contextlib import contextmanager

import pymysql

DB_NAME = 'test_insert_data_db'
TABLE_NAME = 'test_insert_data_table'
CREATE_TABLE_STATEMENT = """create table {0} (id int(10) NOT NULL AUTO_INCREMENT,
                            name varchar(255) NOT NULL, datetime double NOT NULL,
                            PRIMARY KEY ('id'))""".format(TABLE_NAME)

def _argparse():
    parser = argparse.ArgumentParser(description='benchmark tool for MySQL database')
    parser.add_argument('--host', action='store', dest='host',
                        required=True, help='connect to host')
    parser.add_argument('--user', action='store', dest='user',
                        required=True, help='user for login')
    parser.add_argument('--password', action='store', dest='password',
                        required=True, help='password to use when connecting to server')
    parser.add_argument('--port', action='store', dest='port', default=3306,
                        type=int, help='port number to use for connection or 3306 for default')
    parser.add_argument('--thread_size', action='store', dest='thread_size',
                        default=5, type=int, help='how much connection for database
usage')
    parser.add_argument('--row_size', action='store', dest='row_size',
                        default=5000, type=int, help='how much rows')
    parser.add_argument('-v', '--version', action='version', version='%(prog)s 0.1')
    return parser.parse_args()

@contextmanager
def get_conn(**kwargs):
    conn = pymysql.connect(**kwargs)
    try:
        yield conn
    finally:
        conn.close()

def create_db_and_table(conn):
    with conn as cur:
        for sql in ["drop database if exists {0}".format(DB_NAME),
                    "create database {0}".format(DB_NAME),
                    "use {0}".format(DB_NAME),
                    CREATE_TABLE_STATEMENT]:
            print(sql)
            cur.execute(sql)

def random_string(length=10):
    s = string.letters + string.digits
    return "".join(random.sample(s, length))

def add_row(cursor):
    SQL_FORMAT = "INSERT INTO '{0}'(name, datetime) values('{1}', {2})"
    sql = SQL_FORMAT.format(TABLE_NAME, random_string(), time.time())
    cursor.execute(sql)

def insert_data(conn_args, row_size):
    with get_conn(**conn_args) as conn:
        with conn as c:
            c.execute('use {0}'.format(DB_NAME))
        with conn as c:
            for i in range(row_size):
                add_row(c)
                conn.commit()

def main():
    parser = _argparse()

    conn_args = dict(host=parser.host, user=parser.user,
                     password=parser.password, port=parser.port)
    with get_conn(**conn_args) as conn:
        create_db_and_table(conn)

    threads = []
    for i in range(parser.thread_size):
        t = threading.Thread(target=insert_data, args=(conn_args, parser.row_size))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

if __name__ == '__main__':
    main()

在这段程序中,我们首先导入了多个标准库,以及连接数据库的pymysql第三方库。随后,我们定义了三个全局变量,即测试的数据库名、表名和表结构。在main函数中,我们首先调用_argparse函数来解析命令行参数。_argparse的唯一作用就是使用标准库的argparse模块解析命令行参数并生成帮助信息。解析完命令行参数以后,我们就得到了建立数据库连接的参数。我们将建立数据库连接的参数传递给get_conn函数,get_conn函数使用pymysql模块的connect函数建立数据库连接。为了同时保证get_conn函数的易用性和程序结束以后能够及时关闭数据库连接,我们使用contextmanager装饰器编写了一个上下文管理器。有了上下文管理器以后,我们就可以使用with语句管理数据库连接。在main函数中,我们得到数据库连接以后,调用create_db_and_table函数创建相关的数据库和表结构。随后,我们根据用户输入的thread_size参数创建多个线程,每个线程都会去调用insert_data函数。在insert_data函数中,我们根据数据库连接参数创建数据库连接,并根据用户指定的row_size参数向数据库插入随机数据。

Search

    微信好友

    博士的沙漏

    Table of Contents