Python实用脚本

2015/04/23 Python

Python实用脚本

判断是否是一个目录

import os

dir = "/var/www/html/xxxx/"
if os.path.isdir(dir):
    print('%s is a dir' % dir)
else:
    print('%s is not a dir' % dir)

系统内存与磁盘检测

import psutil

def memissue():
    print('内存信息:')
    mem = psutil.virtual_memory()
    # 单位换算为MB
    memtotal = mem.total/1024/1024
    memused = mem.used/1024/1024
    membaifen = str(mem.used/mem.total*100) + '%'

    print('%.2fMB' % memused)
    print('%.2fMB' % memtotal)
    print(membaifen)

def cuplist():
    print('磁盘信息:')
    disk = psutil.disk_partitions()
    diskuse = psutil.disk_usage('/')
    #单位换算为GB
    diskused = diskuse.used / 1024 / 1024 / 1024
    disktotal = diskuse.total / 1024 / 1024 / 1024
    diskbaifen = diskused / disktotal * 100
    print('%.2fGB' % diskused)
    print('%.2fGB' % disktotal)
    print('%.2f' % diskbaifen)


memissue()
print('*******************')
cuplist()

统计nginx日志前十ip访问量并以柱状图显示

import matplotlib.pyplot as plt
#
nginx_file = 'nginx2018-12-18_07:45:26'

ip = {}
# 筛选nginx日志文件中的ip
with open(nginx_file) as f:
    for i in f.readlines():
        s = i.strip().split()[0]
        lengh = len(ip.keys())

        # 统计每个ip的访问量以字典存储
        if s in ip.keys():
            ip[s] = ip[s] + 1
        else:
            ip[s] = 1


#以ip出现的次数排序返回对象为list
ip = sorted(ip.items(), key=lambda e:e[1], reverse=True)

#取列表前十
newip = ip[0:10:1]
tu = dict(newip)

x = []
y = []
for k in tu:
    x.append(k)
    y.append(tu[k])
plt.title('ip access')
plt.xlabel('ip address')
plt.ylabel('PV')

#x轴项的翻转角度
plt.xticks(rotation=70)

#显示每个柱状图的值
for a,b in zip(x,y):
    plt.text(a, b, '%.0f' % b, ha='center', va= 'bottom',fontsize=7)


plt.bar(x,y)
plt.legend()
plt.show()

gitlab钩子脚本,实现简单自动化操作

from flask import Flask,request,render_template,make_response,Response
import json,os,re,requests
import subprocess

app = Flask(__name__)
null = ""
cmd = "/var/www/html/ladmin-devel/"
@app.route('/test',methods=['POST'])
def hello():
    json_dict = json.loads(request.data)

    name = json_dict['event_name']
    ref = json_dict['ref'][11:]
    project = json_dict['project']['name']

    if name == 'push' and ref == 'master':
        os.chdir(cmd)
        s = subprocess.getoutput('sudo -u nginx composer install')
        return Response(s)
    else:
        return Response('none')


if __name__ == '__main__':
    app.run(host='0.0.0.0',port=8080)

解析一组域名的ip地址

import dns.resolver
from collections import defaultdict
hosts = ['baidu.com','weibo.com']
s = defaultdict(list)
def query(hosts):
    for host in hosts:
        ip = dns.resolver.query(host,"A")
        for i in ip:
            s[host].append(i)

    return s

for i in query(hosts):

    print(i,s[i])

清除指定redis缓存

import redis


#选择连接的数据库
db = input('输入数据库:')
r = redis.Redis(host='127.0.0.1',port=6379,db=0)

#输入要匹配的键名
id = input('请输入要执匹配的字段:')
arg = '*' + id + '*'

n = r.keys(arg)
#查看匹配到键值
for i in n:
    print(i.decode('utf-8'))

#确定清除的键名
delid = input('输入要删除的键:')

print('清除缓存 %s 成功' % delid)

下载阿里云RDS二进制日志

'''
查询阿里云rds binlog日志
'''

import base64,urllib.request
import hashlib
import hmac
import uuid,time,json,wget



class RDS_BINLOG_RELATE(object):

    def __init__(self):
        #阿里云的id和key
        self.access_id = '**********************'
        self.access_key = '**********************'

    #通过id和key来进行签名
    def signed(self):
        timestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
        header = {
            'Action': 'DescribeBinlogFiles',
            'DBInstanceId': 'rm-sfasffsdaf',
            'StartTime': '2018-07-11T15:00:00Z',
            'EndTime': timestamp,
            'Format': 'JSON',
            'Version': '2014-08-15',
            'AccessKeyId': self.access_id,
            'SignatureVersion': '1.0',
            'SignatureMethod': 'HMAC-SHA1',
            'SignatureNonce': str(uuid.uuid1()),
            'TimeStamp': timestamp,

        }

        #对请求头进行排序
        sortedD = sorted(header.items(), key=lambda x: x[0])
        url = 'https://rds.aliyuncs.com'
        canstring = ''

        #将请求参数以#连接
        for k, v in sortedD:
            canstring += '&' + self.percentEncode(k) + '=' + self.percentEncode(v)

        #对请求连接进行阿里云要的编码规则进行编码
        stiingToSign = 'GET&%2F&' + self.percentEncode(canstring[1:])

        bs = self.access_key + '&'
        bs = bytes(bs, encoding='utf8')
        stiingToSign = bytes(stiingToSign, encoding='utf8')
        h = hmac.new(bs, stiingToSign, hashlib.sha1)
        stiingToSign = base64.b64encode(h.digest()).strip()

        #将签名加入到请求头
        header['Signature'] = stiingToSign

        #返回url
        url = url + "/?" + urllib.parse.urlencode(header)
        return url

    #按照规则替换
    def percentEncode(self,store):
        encodeStr = store
        res = urllib.request.quote(encodeStr)
        res = res.replace('+', '%20')
        res = res.replace('*', '%2A')
        res = res.replace('%7E', '~')
        return str(res)

    #筛选出链接下载二进制日志文件
    def getBinLog(self):
        binlog_url = self.signed()
        req = urllib.request.urlopen(binlog_url)
        req = req.read().decode('utf8')
        res = json.loads(req)

        for i in res['Items']['BinLogFile']:
            wget.download(i['DownloadLink'])




s = RDS_BINLOG_RELATE()
s.getBinLog()

遍历目录和文件


import os
 
 
def list_all_files(rootdir):
    import os
    _files = []
    list = os.listdir(rootdir) #列出文件夹下所有的目录与文件
    for i in range(0,len(list)):
           path = os.path.join(rootdir,list[i])
           if os.path.isdir(path):
              _files.extend(list_all_files(path))
           if os.path.isfile(path):
              _files.append(path)
    return _files
 
a=list_all_files("D:\python")

print(a)

python检测指定端口状态


import socket
 
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sk.settimeout(1)
 
for ip in range(0,254):
    try:
        sk.connect(("192.168.1."+str(ip),443))
        print("192.168.1.%d server open \n"%ip)
    except Exception:
        print("192.168.1.%d server not open"%ip)
 
sk.close()

实现钉钉报警

import requests
import sys
import json
 
dingding_url = 'https://oapi.dingtalk.com/robot/send?access_token=6d11af3252812ea50410c2ccb861814a6ed11b2306606934a5d4ca9f2ec8c09'
 
data = {"msgtype": "markdown","markdown": {"title": "监控","text": "apche异常"}}
 
headers = {'Content-Type':'application/json;charset=UTF-8'}
 
send_data = json.dumps(data).encode('utf-8')
requests.post(url=dingding_url,data=send_data,headers=headers)

每日生成一个文件并且把磁盘的使用情况写到到这个文件中。

import os
import sys
import time
 
new_time = time.strftime("%Y-%m-%d")
disk_status = os.popen("df -h").readlines()
 
str1 = ''.join(disk_status)
f = open(new_time+'.log','w')
f.write("%s"%str1)
 
f.flush()
f.close()

利用psutil模块获取系统的各种统计信息


import sys
import psutil
import time
import os 
 
#获取当前时间
time_str =  time.strftime( "%Y-%m-%d", time.localtime( ) )
file_name = "./" + time_str + ".log"
 
if os.path.exists ( file_name ) == False :
   os.mknod( file_name )
   handle = open ( file_name , "w" )
else :
   handle = open ( file_name , "a" )
 
 
#获取命令行参数的个数
if len( sys.argv ) == 1 :
   print_type = 1
else :
   print_type = 2
 
def isset ( list_arr , name ) :
    if name in list_arr :
       return True
    else :
       return False
 
print_str = "";
 
#获取系统内存使用情况
if ( print_type == 1 ) or isset( sys.argv,"mem" )  :
 memory_convent = 1024 * 1024
 mem = psutil.virtual_memory()
 print_str +=  " 内存状态如下:\n" 
 print_str = print_str + "   系统的内存容量为: "+str( mem.total/( memory_convent ) ) + " MB\n" 
 print_str = print_str + "   系统的内存以使用容量为: "+str( mem.used/( memory_convent ) ) + " MB\n" 
 print_str = print_str + "   系统可用的内存容量为: "+str( mem.total/( memory_convent ) - mem.used/( 1024*1024 )) + "MB\n"
 print_str = print_str + "   内存的buffer容量为: "+str( mem.buffers/( memory_convent ) ) + " MB\n" 
 print_str = print_str + "   内存的cache容量为:" +str( mem.cached/( memory_convent ) ) + " MB\n"
 
 
#获取cpu的相关信息
if ( print_type == 1 ) or isset( sys.argv,"cpu" ) :
 print_str += " CPU状态如下:\n"
 cpu_status = psutil.cpu_times()
 print_str = print_str + "   user = " + str( cpu_status.user ) + "\n" 
 print_str = print_str + "   nice = " + str( cpu_status.nice ) + "\n"
 print_str = print_str + "   system = " + str( cpu_status.system ) + "\n"
 print_str = print_str + "   idle = " + str ( cpu_status.idle ) + "\n"
 print_str = print_str + "   iowait = " + str ( cpu_status.iowait ) + "\n"
 print_str = print_str + "   irq = " + str( cpu_status.irq ) + "\n"
 print_str = print_str + "   softirq = " + str ( cpu_status.softirq ) + "\n" 
 print_str = print_str + "   steal = " + str ( cpu_status.steal ) + "\n"
 print_str = print_str + "   guest = " + str ( cpu_status.guest ) + "\n"
 
 
#查看硬盘基本信息
if ( print_type == 1 ) or isset ( sys.argv,"disk" ) :
 print_str +=  " 硬盘信息如下:\n" 
 disk_status = psutil.disk_partitions()
 for item in disk_status :
     print_str = print_str + "   "+ str( item ) + "\n"
 
#查看当前登录的用户信息
if ( print_type == 1 ) or isset ( sys.argv,"user" ) :
 print_str +=  " 登录用户信息如下:\n " 
 user_status = psutil.users()
 for item in  user_status :
     print_str = print_str + "   "+ str( item ) + "\n"
 
print_str += "---------------------------------------------------------------\n"
print ( print_str )
handle.write( print_str )
handle.close()

Search

    微信好友

    博士的沙漏

    Table of Contents