導航:首頁 > 編程語言 > python壓測腳本

python壓測腳本

發布時間:2022-12-09 08:13:59

⑴ 如何使用python編寫測試腳本

1)doctest
使用doctest是一種類似於命令行嘗試的方式,用法很簡單,如下

復制代碼代碼如下:

def f(n):
"""
>>> f(1)
1
>>> f(2)
2
"""
print(n)

if __name__ == '__main__':
import doctest
doctest.testmod()

應該來說是足夠簡單了,另外還有一種方式doctest.testfile(filename),就是把命令行的方式放在文件里進行測試。

2)unittest
unittest歷史悠久,最早可以追溯到上世紀七八十年代了,C++,Java里也都有類似的實現,Python里的實現很簡單。
unittest在python里主要的實現方式是TestCase,TestSuite。用法還是例子起步。

復制代碼代碼如下:

from widget import Widget
import unittest
# 執行測試的類
class WidgetTestCase(unittest.TestCase):
def setUp(self):
self.widget = Widget()
def tearDown(self):
self.widget.dispose()
self.widget = None
def testSize(self):
self.assertEqual(self.widget.getSize(), (40, 40))
def testResize(self):
self.widget.resize(100, 100)
self.assertEqual(self.widget.getSize(), (100, 100))
# 測試
if __name__ == "__main__":
# 構造測試集
suite = unittest.TestSuite()
suite.addTest(WidgetTestCase("testSize"))
suite.addTest(WidgetTestCase("testResize"))

# 執行測試
runner = unittest.TextTestRunner()
runner.run(suite)

簡單的說,1>構造TestCase(測試用例),其中的setup和teardown負責預處理和善後工作。2>構造測試集,添加用例3>執行測試需要說明的是測試方法,在Python中有N多測試函數,主要的有:
TestCase.assert_(expr[, msg])
TestCase.failUnless(expr[, msg])
TestCase.assertTrue(expr[, msg])
TestCase.assertEqual(first, second[, msg])
TestCase.failUnlessEqual(first, second[, msg])
TestCase.assertNotEqual(first, second[, msg])
TestCase.failIfEqual(first, second[, msg])
TestCase.assertAlmostEqual(first, second[, places[, msg]])
TestCase.failUnlessAlmostEqual(first, second[, places[, msg]])
TestCase.assertNotAlmostEqual(first, second[, places[, msg]])
TestCase.failIfAlmostEqual(first, second[, places[, msg]])
TestCase.assertRaises(exception, callable, ...)
TestCase.failUnlessRaises(exception, callable, ...)
TestCase.failIf(expr[, msg])
TestCase.assertFalse(expr[, msg])
TestCase.fail([msg])

⑵ python 運維常用腳本

Python 批量遍歷目錄文件,並修改訪問時間

import os

path = "D:/UASM64/include/"
dirs = os.listdir(path)
temp=[];

for file in dirs:
temp.append(os.path.join(path, file))
for x in temp:
os.utime(x, (1577808000, 1577808000))
Python 實現的自動化伺服器管理

import sys
import os
import paramiko

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

def ssh_cmd(user,passwd,port,userfile,cmd):

def ssh_put(user,passwd,source,target):

while True:
try:
shell=str(input("[Shell] # "))
if (shell == ""):
continue
elif (shell == "exit"):
exit()
elif (shell == "put"):
ssh_put("root","123123","./a.py","/root/a.py")
elif (shell =="cron"):
temp=input("輸入一個計劃任務: ")
temp1="(crontab -l; echo "+ temp + ") |crontab"
ssh_cmd("root","123123","22","./user_ip.conf",temp1)
elif (shell == "uncron"):
temp=input("輸入要刪除的計劃任務: ")
temp1="crontab -l | grep -v " "+ temp + "|crontab"
ssh_cmd("root","123123","22","./user_ip.conf",temp1)
else:
ssh_cmd("lyshark","123123","22","./user_ip.conf",shell)

遍歷目錄和文件

import os

def list_all_files(rootdir):
import os
_files = []
list = os.listdir(rootdir) #列出文件夾下所有的目錄與文件
for i in range(0,len(list)):
path = os.path.join(rootdir,list[i])
if os.path.isdir(path):
_files.extend(list_all_files(path))
if os.path.isfile(path):
_files.append(path)
return _files

a=list_all_files("C:/Users/LyShark/Desktop/a")
print(a)
python檢測指定埠狀態

import socket

sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sk.settimeout(1)

for ip in range(0,254):
try:
sk.connect(("192.168.1."+str(ip),443))
print("192.168.1.%d server open "%ip)
except Exception:
print("192.168.1.%d server not open"%ip)

sk.close()

python實現批量執行CMD命令

import sys
import os
import paramiko

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

print("------------------------------> ")
print("使用說明,在當前目錄創建ip.txt寫入ip地址")
print("------------------------------> ")

user=input("輸入用戶名:")
passwd=input("輸入密碼:")
port=input("輸入埠:")
cmd=input("輸入執行的命令:")

file = open("./ip.txt", "r")
line = file.readlines()

for i in range(len(line)):
print("對IP: %s 執行"%line[i].strip(' '))

python3-實現釘釘報警

import requests
import sys
import json

dingding_url = ' https://oapi.dingtalk.com/robot/send?access_token='

data = {"msgtype": "markdown","markdown": {"title": "監控","text": "apche異常"}}

headers = {'Content-Type':'application/json;charset=UTF-8'}

send_data = json.mps(data).encode('utf-8')
requests.post(url=dingding_url,data=send_data,headers=headers)

import psutil
import requests
import time
import os
import json

monitor_name = set(['httpd','cobblerd']) # 用戶指定監控的服務進程名稱

proc_dict = {}
proc_name = set() # 系統檢測的進程名稱
monitor_map = {
'httpd': 'systemctl restart httpd',
'cobblerd': 'systemctl restart cobblerd' # 系統在進程down掉後,自動重啟
}

dingding_url = ' https://oapi.dingtalk.com/robot/send?access_token='

while True:
for proc in psutil.process_iter(attrs=['pid','name']):
proc_dict[proc.info['pid']] = proc.info['name']
proc_name.add(proc.info['name'])

判斷指定埠是否開放

import socket

port_number = [135,443,80]

for index in port_number:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex((飗.0.0.1', index))
if result == 0:
print("Port %d is open" % index)
else:
print("Port %d is not open" % index)
sock.close()

判斷指定埠並且實現釘釘輪詢報警

import requests
import sys
import json
import socket
import time

def dingding(title,text):
dingding_url = ' https://oapi.dingtalk.com/robot/send?access_token='
data = {"msgtype": "markdown","markdown": {"title": title,"text": text}}
headers = {'Content-Type':'application/json;charset=UTF-8'}
send_data = json.mps(data).encode('utf-8')
requests.post(url=dingding_url,data=send_data,headers=headers)

def net_scan():
port_number = [80,135,443]
for index in port_number:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex((飗.0.0.1', index))
if result == 0:
print("Port %d is open" % index)
else:
return index
sock.close()

while True:
dingding("Warning",net_scan())
time.sleep(60)

python-實現SSH批量CMD執行命令

import sys
import os
import paramiko

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

def ssh_cmd(user,passwd,port,userfile,cmd):
file = open(userfile, "r")
line = file.readlines()
for i in range(len(line)):
print("對IP: %s 執行"%line[i].strip(' '))
ssh.connect(hostname=line[i].strip(' '),port=port,username=user,password=passwd)
cmd=cmd
stdin, stdout, stderr = ssh.exec_command(cmd)
result = stdout.read()

ssh_cmd("lyshark","123","22","./ip.txt","free -h |grep 'Mem:' |awk '{print $3}'")

用python寫一個列舉當前目錄以及所有子目錄下的文件,並列印出絕對路徑

import sys
import os

for root,dirs,files in os.walk("C://"):
for name in files:
print(os.path.join(root,name))
os.walk()

按照這樣的日期格式(xxxx-xx-xx)每日生成一個文件,例如今天生成的文件為2013-09-23.log, 並且把磁碟的使用情況寫到到這個文件中。

import os
import sys
import time

new_time = time.strftime("%Y-%m-%d")
disk_status = os.popen("df -h").readlines()

str1 = ''.join(disk_status)
f = open(new_time+'.log','w')
f.write("%s"%str1)

f.flush()
f.close()

統計出每個IP的訪問量有多少?(從日誌文件中查找)

import sys

list = []

f = open("/var/log/httpd/access_log","r")
str1 = f.readlines()
f.close()

for i in str1:
ip=i.split()[0]
list.append(ip)

list_num=set(list)

for j in list_num:
num=list.count(j)
print("%s -----> %s" %(num,j))

寫個程序,接受用戶輸入數字,並進行校驗,非數字給出錯誤提示,然後重新等待用戶輸入。

import tab
import sys

while True:
try:
num=int(input("輸入數字:").strip())
for x in range(2,num+1):
for y in range(2,x):
if x % y == 0:
break
else:
print(x)
except ValueError:
print("您輸入的不是數字")
except KeyboardInterrupt:
sys.exit(" ")

ps 可以查看進程的內存佔用大小,寫一個腳本計算一下所有進程所佔用內存大小的和。

import sys
import os

list=[]
sum=0

str1=os.popen("ps aux","r").readlines()

for i in str1:
str2=i.split()
new_rss=str2[5]
list.append(new_rss)
for i in list[1:-1]:
num=int(i)
sum=sum+num

print("%s ---> %s"%(list[0],sum))

關於Python 命令行參數argv

import sys

if len(sys.argv) < 2:
print ("沒有輸入任何參數")
sys.exit()

if sys.argv[1].startswith("-"):
option = sys.argv[1][1:]

利用random生成6位數字加字母隨機驗證碼

import sys
import random

rand=[]

for x in range(6):
y=random.randrange(0,5)
if y == 2 or y == 4:
num=random.randrange(0,9)
rand.append(str(num))
else:
temp=random.randrange(65,91)
c=chr(temp)
rand.append(c)
result="".join(rand)
print(result)

自動化-使用pexpect非交互登陸系統

import pexpect
import sys

ssh = pexpect.spawn('ssh [email protected]')
fout = file('sshlog.txt', 'w')
ssh.logfile = fout

ssh.expect("[email protected]'s password:")

ssh.sendline("密碼")
ssh.expect('#')

ssh.sendline('ls /home')
ssh.expect('#')

Python-取系統時間

import sys
import time

time_str = time.strftime("日期:%Y-%m-%d",time.localtime())
print(time_str)

time_str= time.strftime("時間:%H:%M",time.localtime())
print(time_str)

psutil-獲取內存使用情況

import sys
import os
import psutil

memory_convent = 1024 * 1024
mem =psutil.virtual_memory()

print("內存容量為:"+str(mem.total/(memory_convent))+"MB ")
print("已使用內存:"+str(mem.used/(memory_convent))+"MB ")
print("可用內存:"+str(mem.total/(memory_convent)-mem.used/(1024*1024))+"MB ")
print("buffer容量:"+str(mem.buffers/( memory_convent ))+"MB ")
print("cache容量:"+str(mem.cached/(memory_convent))+"MB ")

Python-通過SNMP協議監控CPU
注意:被監控的機器上需要支持snmp協議 yum install -y net-snmp*

import os

def getAllitems(host, oid):
sn1 = os.popen('snmpwalk -v 2c -c public ' + host + ' ' + oid + '|grep Raw|grep Cpu|grep -v Kernel').read().split(' ')[:-1]
return sn1

def getDate(host):
items = getAllitems(host, '.1.3.6.1.4.1.2021.11')

if name == ' main ':

Python-通過SNMP協議監控系統負載
注意:被監控的機器上需要支持snmp協議 yum install -y net-snmp*

import os
import sys

def getAllitems(host, oid):
sn1 = os.popen('snmpwalk -v 2c -c public ' + host + ' ' + oid).read().split(' ')
return sn1

def getload(host,loid):
load_oids = Ƈ.3.6.1.4.1.2021.10.1.3.' + str(loid)
return getAllitems(host,load_oids)[0].split(':')[3]

if name == ' main ':

Python-通過SNMP協議監控內存
注意:被監控的機器上需要支持snmp協議 yum install -y net-snmp*

import os

def getAllitems(host, oid):

def getSwapTotal(host):

def getSwapUsed(host):

def getMemTotal(host):

def getMemUsed(host):

if name == ' main ':

Python-通過SNMP協議監控磁碟
注意:被監控的機器上需要支持snmp協議 yum install -y net-snmp*

import re
import os

def getAllitems(host,oid):

def getDate(source,newitem):

def getRealDate(item1,item2,listname):

def caculateDiskUsedRate(host):

if name == ' main ':

Python-通過SNMP協議監控網卡流量
注意:被監控的機器上需要支持snmp協議 yum install -y net-snmp*

import re
import os

def getAllitems(host,oid):
sn1 = os.popen('snmpwalk -v 2c -c public ' + host + ' ' + oid).read().split(' ')[:-1]
return sn1

def getDevices(host):
device_mib = getAllitems(host,'RFC1213-MIB::ifDescr')
device_list = []

def getDate(host,oid):
date_mib = getAllitems(host,oid)[1:]
date = []

if name == ' main ':

Python-實現多級菜單

import os
import sys

ps="[None]->"
ip=["192.168.1.1","192.168.1.2","192.168.1.3"]
flage=1

while True:
ps="[None]->"
temp=input(ps)
if (temp=="test"):
print("test page !!!!")
elif(temp=="user"):
while (flage == 1):
ps="[User]->"
temp1=input(ps)
if(temp1 =="exit"):
flage=0
break
elif(temp1=="show"):
for i in range(len(ip)):
print(i)

Python實現一個沒用的東西

import sys

ps="[root@localhost]# "
ip=["192.168.1.1","192.168.1.2","192.168.1.3"]

while True:
temp=input(ps)
temp1=temp.split()

檢查各個進程讀寫的磁碟IO

import sys
import os
import time
import signal
import re

class DiskIO:
def init (self, pname=None, pid=None, reads=0, writes=0):
self.pname = pname
self.pid = pid
self.reads = 0
self.writes = 0

def main():
argc = len(sys.argv)
if argc != 1:
print ("usage: please run this script like [./lyshark.py]")
sys.exit(0)
if os.getuid() != 0:
print ("Error: This script must be run as root")
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
os.system('echo 1 > /proc/sys/vm/block_mp')
print ("TASK PID READ WRITE")
while True:
os.system('dmesg -c > /tmp/diskio.log')
l = []
f = open('/tmp/diskio.log', 'r')
line = f.readline()
while line:
m = re.match(
'^(S+)(d+)(d+): (READ|WRITE) block (d+) on (S+)', line)
if m != None:
if not l:
l.append(DiskIO(m.group(1), m.group(2)))
line = f.readline()
continue
found = False
for item in l:
if item.pid == m.group(2):
found = True
if m.group(3) == "READ":
item.reads = item.reads + 1
elif m.group(3) == "WRITE":
item.writes = item.writes + 1
if not found:
l.append(DiskIO(m.group(1), m.group(2)))
line = f.readline()
time.sleep(1)
for item in l:
print ("%-10s %10s %10d %10d" %
(item.pname, item.pid, item.reads, item.writes))
def signal_handler(signal, frame):
os.system('echo 0 > /proc/sys/vm/block_mp')
sys.exit(0)

if name ==" main ":
main()

利用Pexpect實現自動非交互登陸linux

import pexpect
import sys

ssh = pexpect.spawn('ssh [email protected]')
fout = file('sshlog.log', 'w')
ssh.logfile = fout

ssh.expect("[email protected]'s password:")

ssh.sendline("密碼")

ssh.expect('#')
ssh.sendline('ls /home')
ssh.expect('#')

利用psutil模塊獲取系統的各種統計信息

import sys
import psutil
import time
import os

time_str = time.strftime( "%Y-%m-%d", time.localtime( ) )
file_name = "./" + time_str + ".log"

if os.path.exists ( file_name ) == False :
os.mknod( file_name )
handle = open ( file_name , "w" )
else :
handle = open ( file_name , "a" )

if len( sys.argv ) == 1 :
print_type = 1
else :
print_type = 2

def isset ( list_arr , name ) :
if name in list_arr :
return True
else :
return False

print_str = "";

if ( print_type == 1 ) or isset( sys.argv,"mem" ) :
memory_convent = 1024 * 1024
mem = psutil.virtual_memory()
print_str += " 內存狀態如下: "
print_str = print_str + " 系統的內存容量為: "+str( mem.total/( memory_convent ) ) + " MB "
print_str = print_str + " 系統的內存以使用容量為: "+str( mem.used/( memory_convent ) ) + " MB "
print_str = print_str + " 系統可用的內存容量為: "+str( mem.total/( memory_convent ) - mem.used/( 1024*1024 )) + "MB "
print_str = print_str + " 內存的buffer容量為: "+str( mem.buffers/( memory_convent ) ) + " MB "
print_str = print_str + " 內存的cache容量為:" +str( mem.cached/( memory_convent ) ) + " MB "

if ( print_type == 1 ) or isset( sys.argv,"cpu" ) :
print_str += " CPU狀態如下: "
cpu_status = psutil.cpu_times()
print_str = print_str + " user = " + str( cpu_status.user ) + " "
print_str = print_str + " nice = " + str( cpu_status.nice ) + " "
print_str = print_str + " system = " + str( cpu_status.system ) + " "
print_str = print_str + " idle = " + str ( cpu_status.idle ) + " "
print_str = print_str + " iowait = " + str ( cpu_status.iowait ) + " "
print_str = print_str + " irq = " + str( cpu_status.irq ) + " "
print_str = print_str + " softirq = " + str ( cpu_status.softirq ) + " "
print_str = print_str + " steal = " + str ( cpu_status.steal ) + " "
print_str = print_str + " guest = " + str ( cpu_status.guest ) + " "

if ( print_type == 1 ) or isset ( sys.argv,"disk" ) :
print_str += " 硬碟信息如下: "
disk_status = psutil.disk_partitions()
for item in disk_status :
print_str = print_str + " "+ str( item ) + " "

if ( print_type == 1 ) or isset ( sys.argv,"user" ) :
print_str += " 登錄用戶信息如下: "
user_status = psutil.users()
for item in user_status :
print_str = print_str + " "+ str( item ) + " "

print_str += "--------------------------------------------------------------- "
print ( print_str )
handle.write( print_str )
handle.close()

import psutil

mem = psutil.virtual_memory()
print mem.total,mem.used,mem
print psutil.swap_memory() # 輸出獲取SWAP分區信息

cpu = psutil.cpu_stats()
printcpu.interrupts,cpu.ctx_switches

psutil.cpu_times(percpu=True) # 輸出每個核心的詳細CPU信息
psutil.cpu_times().user # 獲取CPU的單項數據 [用戶態CPU的數據]
psutil.cpu_count() # 獲取CPU邏輯核心數,默認logical=True
psutil.cpu_count(logical=False) # 獲取CPU物理核心數

psutil.disk_partitions() # 列出全部的分區信息
psutil.disk_usage('/') # 顯示出指定的掛載點情況【位元組為單位】
psutil.disk_io_counters() # 磁碟總的IO個數
psutil.disk_io_counters(perdisk=True) # 獲取單個分區IO個數

psutil.net_io_counter() 獲取網路總的IO,默認參數pernic=False
psutil.net_io_counter(pernic=Ture)獲取網路各個網卡的IO

psutil.pids() # 列出所有進程的pid號
p = psutil.Process(2047)
p.name() 列出進程名稱
p.exe() 列出進程bin路徑
p.cwd() 列出進程工作目錄的絕對路徑
p.status()進程當前狀態[sleep等狀態]
p.create_time() 進程創建的時間 [時間戳格式]
p.uids()
p.gids()
p.cputimes() 【進程的CPU時間,包括用戶態、內核態】
p.cpu_affinity() # 顯示CPU親緣關系
p.memory_percent() 進程內存利用率
p.meminfo() 進程的RSS、VMS信息
p.io_counters() 進程IO信息,包括讀寫IO數及位元組數
p.connections() 返回打開進程socket的nametples列表
p.num_threads() 進程打開的線程數

import psutil
from subprocess import PIPE
p =psutil.Popen(["/usr/bin/python" ,"-c","print 'helloworld'"],stdout=PIPE)
p.name()
p.username()
p.communicate()
p.cpu_times()

psutil.users() # 顯示當前登錄的用戶,和Linux的who命令差不多

psutil.boot_time() 結果是個UNIX時間戳,下面我們來轉換它為標准時間格式,如下:
datetime.datetime.fromtimestamp(psutil.boot_time()) # 得出的結果不是str格式,繼續進行轉換 datetime.datetime.fromtimestamp(psutil.boot_time()).strftime('%Y-%m-%d%H:%M:%S')

Python生成一個隨機密碼

import random, string
def GenPassword(length):

if name == ' main ':
print (GenPassword(6))

⑶ Python實現性能自動化測試竟然如此簡單

一、思考❓❔

1.什麼是性能自動化測試?

2.Python中的性能自動化測試庫?

locust庫

二、基礎操作

1.安裝locust

安裝成功之後,在cmd控制台將會新增一條命令,可輸入如下命令查看:

2.基本用法

三、綜合案例演練

1.編寫自動化測試腳本

2.使用命令行運行

3.打開web ui界面進行配置

設置並發用戶數為10,每5秒創建一個用戶

壓測過程截圖

美輪美奐的壓測報告

壓測失敗詳情

下載壓測統計數據

下載的壓測統計數據csv文件

六、總結

出處:https://www.cnblogs.com/keyou1/

⑷ python開發命令行腳本

工作中會經常需要寫一些命令行腳本,如果還是用if,else判斷用戶輸入實在是太醜陋了。這里介紹幾個python里的命令行腳本庫,可以幫助我們快速開發好用的命令行腳本。

https://docs.python.org/3/library/cmd.html

使用方式是繼承Cmd,實現自己的子類。

參數comletekey是自動補全操作,默認值是Tab, 如果不為None 且readline可用的話,命令會自動完成。
這里的readline指的是python實現的 GNU readline 介面(標准python庫里沒有,Windows系統不支持)。

參數stdin,stdout是輸入輸出流,默認是sys.stdin,sys.stout。

cmd提供了一個簡單的框架,但是功能比較簡單,python還有其他的很多第三方庫可以用來寫命令行程序。

https://www.cnblogs.com/xueweihan/p/12293402.html 這篇文章對比了各個庫的功能,貼在這里:

看起來fire是最簡單的,來試一下。

fire 則是用一種面向廣義對象的方式來玩轉命令行,這種對象可以是類、函數、字典、列表等,它更加靈活,也更加簡單。你都不需要定義參數類型,fire 會根據輸入和參數默認值來自動判斷,這無疑進一步簡化了實現過程。

以下示例為 fire 實現的 計算器程序:

從上述示例可以看出,fire 提供的方式無疑是最簡單、並且最 Pythonic 的了。我們只需關注業務邏輯,而命令行參數的定義則和函數參數的定義融為了一體。

不過,有利自然也有弊,比如 nums 並沒有說是什麼類型,也就意味著輸入字元串'abc'也是合法的,這就意味著一個嚴格的命令行程序必須在自己的業務邏輯中來對期望的類型進行約束。

⑸ Python - pytest

目錄

pytest是Python的單元測試框架,同自帶的unittest框架類似,但pytest框架使用起來更簡潔,效率更高。

pytest特點

安裝

測試

在測試之前要做的准備

我的演示腳本處於這樣一個的目錄中:

踩坑:你創建的pytest腳本名稱中不允許含有 . ,比如 1.簡單上手.py ,這樣會報錯。當然,可以這么寫 1-簡單上手.py

demo1.py :

上例中,當我們在執行(就像Python解釋器執行普通的Python腳本一樣)測試用例的時候, pytest.main(["-s", "demo1.py"]) 中的傳參需要是一個元組或者列表(我的pytest是5.2.2版本),之前的版本可能需要這么調用 pytest.main("-s demo1.py") ,傳的參數是str的形式,至於你使用哪種,取決於報不報錯:

遇到上述報錯,就是參數需要一個列表或者元組的形式,而我們使用的是str形式。

上述代碼正確的執行結果是這樣的:

大致的信息就是告訴我們:

pytest.main(["-s", "demo1.py"])參數說明

除了上述的函數這種寫法,也可以有用例類的寫法:

用法跟unittest差不多,類名要以 Test 開頭,並且其中的用例方法也要以 test 開頭,然後執行也一樣。

執行結果:

那麼,你這個時候可能會問,我記得unittest中有setup和teardown的方法,難道pytest中沒有嘛?你怎麼提都不提?穩住,答案是有的。

接下來,我們來研究一下pytest中的setup和teardown的用法。

我們知道,在unittest中,setup和teardown可以在每個用例前後執行,也可以在所有的用例集執行前後執行。那麼在pytest中,有以下幾種情況:

來一一看看各自的用法。

模塊級別setup_mole/teardown_mole

執行結果:

類級別的setup_class/teardown_class

執行結果:

類中方法級別的setup_method/teardown_method

執行結果:

函數級別的setup_function/teardown_function

執行結果:

小結

該腳本有多種運行方式,如果處於PyCharm環境,可以使用右鍵或者點擊運行按鈕運行,也就是在pytest中的主函數中運行:

也可以在命令行中運行:

這種方式,跟使用Python解釋器執行Python腳本沒有什麼兩樣。也可以如下面這么執行:

當然,還有一種是使用配置文件運行,來看看怎麼用。

在項目的根目錄下,我們可以建立一個 pytest.ini 文件,在這個文件中,我們可以實現相關的配置:

那這個配置文件中的各項都是什麼意思呢?

首先, pytest.ini 文件必須位於項目的根目錄,而且也必須叫做 pytest.ini 。

其他的參數:

OK,來個示例。

首先,(詳細目錄參考開頭的目錄結構)在 scripts/test_case_01.py 中:

在 scripts/test_case_dir1/test_case02.py 中:

那麼,在不同的目錄或者文件中,共有5個用例將被執行,而結果則是兩個失敗三個成功。來執行驗證一下,因為有了配置文件,我們在終端中(前提是在項目的根目錄),直接輸入 pytest 即可。

由執行結果可以發現, 2 failed, 3 passed ,跟我們的預期一致。

後續執行相關配置都來自配置文件,如果更改,會有相應說明,終端都是直接使用 pytest 執行。

我們知道在unittest中,跳過用例可以用 skip ,那麼這同樣是適用於pytest。

來看怎麼使用:

跳過用例,我們使用 @pytest.mark.skipif(condition, reason) :

然後將它裝飾在需要被跳過用例的的函數上面。

效果如下:

上例執行結果相對詳細,因為我們在配置文件中為 addopts 增加了 -v ,之前的示例結果中,沒有加!
另外,此時,在輸出的控制台中, 還無法列印出 reason 信息,如果需要列印,則可以在配置文件中的 addopts 參數的 -s 變為 -rs :

如果我們事先知道測試函數會執行失敗,但又不想直接跳過,而是希望顯示的提示。

Pytest 使用 pytest.mark.xfail 實現預見錯誤功能::

需要掌握的必傳參數的是:

那麼關於預期失敗的幾種情況需要了解一下:

結果如下:

pytest 使用 x 表示預見的失敗(XFAIL)。

如果預見的是失敗,但實際運行測試卻成功通過,pytest 使用 X 進行標記(XPASS)。

而在預期失敗的兩種情況中,我們不希望出現預期失敗,結果卻執行成功了的情況出現,因為跟我們想的不一樣嘛,我預期這條用例失敗,那這條用例就應該執行失敗才對,你雖然執行成功了,但跟我想的不一樣,你照樣是失敗的!

所以,我們需要將預期失敗,結果卻執行成功了的用例標記為執行失敗,可以在 pytest.ini 文件中,加入:

這樣就就把上述的情況標記為執行失敗了。

pytest身為強大的單元測試框架,那麼同樣支持DDT數據驅動測試的概念。也就是當對一個測試函數進行測試時,通常會給函數傳遞多組參數。比如測試賬號登陸,我們需要模擬各種千奇百怪的賬號密碼。

當然,我們可以把這些參數寫在測試函數內部進行遍歷。不過雖然參數眾多,但仍然是一個測試,當某組參數導致斷言失敗,測試也就終止了。

通過異常捕獲,我們可以保證程所有參數完整執行,但要分析測試結果就需要做不少額外的工作。

在 pytest 中,我們有更好的解決方法,就是參數化測試,即每組參數都獨立執行一次測試。使用的工具就是 pytest.mark.parametrize(argnames, argvalues) 。

使用就是以裝飾器的形式使用。

只有一個參數的測試用例

來看(重要部分)結果::

可以看到,列表內的每個手機號,都是一條測試用例。

多個參數的測試用例

(重要部分)結果:

可以看到,每一個手機號與每一個驗證碼都組合一起執行了,這樣就執行了4次。那麼如果有很多個組合的話,用例數將會更多。我們希望手機號與驗證碼一一對應組合,也就是只執行兩次,怎麼搞呢?

在多參數情況下,多個參數名是以 , 分割的字元串。參數值是列表嵌套的形式組成的。

固件(Fixture)是一些函數,pytest 會在執行測試函數之前(或之後)載入運行它們,也稱測試夾具。

我們可以利用固件做任何事情,其中最常見的可能就是資料庫的初始連接和最後關閉操作。

Pytest 使用 pytest.fixture() 定義固件,下面是最簡單的固件,訪問主頁前必須先登錄:

結果:

在之前的示例中,你可能會覺得,這跟之前的setup和teardown的功能也類似呀,但是,fixture相對於setup和teardown來說更靈活。pytest通過 scope 參數來控制固件的使用范圍,也就是作用域。

比如之前的login固件,可以指定它的作用域:

很多時候需要在測試前進行預處理(如新建資料庫連接),並在測試完成進行清理(關閉資料庫連接)。

當有大量重復的這類操作,最佳實踐是使用固件來自動化所有預處理和後處理。

Pytest 使用 yield 關鍵詞將固件分為兩部分, yield 之前的代碼屬於預處理,會在測試前執行; yield 之後的代碼屬於後處理,將在測試完成後執行。

以下測試模擬資料庫查詢,使用固件來模擬資料庫的連接關閉:

結果:

可以看到在兩個測試用例執行前後都有預處理和後處理。

pytest中還有非常多的插件供我們使用,我們來介紹幾個常用的。

先來看一個重要的,那就是生成測試用例報告。

想要生成測試報告,首先要有下載,才能使用。

下載

如果下載失敗,可以使用PyCharm下載,怎麼用PyCharm下載這里無需多言了吧。

使用

在配置文件中,添加參數:

效果很不錯吧!

沒完,看我大招

Allure框架是一個靈活的輕量級多語言測試報告工具,它不僅以web的方式展示了簡潔的測試結果,而且允許參與開發過程的每個人從日常執行的測試中最大限度的提取有用信息。
從開發人員(dev,developer)和質量保證人員(QA,Quality Assurance)的角度來看,Allure報告簡化了常見缺陷的統計:失敗的測試可以分為bug和被中斷的測試,還可以配置日誌、步驟、fixture、附件、計時、執行 歷史 以及與TMS和BUG管理系統集成,所以,通過以上配置,所有負責的開發人員和測試人員可以盡可能的掌握測試信息。
從管理者的角度來看,Allure提供了一個清晰的「大圖」,其中包括已覆蓋的特性、缺陷聚集的位置、執行時間軸的外觀以及許多其他方便的事情。allure的模塊化和可擴展性保證了我們總是能夠對某些東西進行微調。

少扯點,來看看怎麼使用。

Python的pytest中allure下載

但由於這個 allure-pytest 插件生成的測試報告不是 html 類型的,我們還需要使用allure工具再「加工」一下。所以說,我們還需要下載這個allure工具。

allure工具下載

在現在allure工具之前,它依賴Java環境,我們還需要先配置Java環境。

注意,如果你的電腦已經有了Java環境,就無需重新配置了。

配置完了Java環境,我們再來下載allure工具,我這里直接給出了網路雲盤鏈接,你也可以去其他鏈接中自行下載:

下載並解壓好了allure工具包之後,還需要將allure包內的 bin 目錄添加到系統的環境變數中。

完事後打開你的終端測試:

返回了版本號說明安裝成功。

使用

一般使用allure要經歷幾個步驟:

來看配置 pytest.ini :

就是 --alluredir ./report/result 參數。

在終端中輸入 pytest 正常執行測試用例即可:

執行完畢後,在項目的根目下,會自動生成一個 report 目錄,這個目錄下有:

接下來需要使用allure工具來生成HTML報告。

此時我們在終端(如果是windows平台,就是cmd),路徑是項目的根目錄,執行下面的命令。

PS:我在pycharm中的terminal輸入allure提示'allure' 不是內部或外部命令,也不是可運行的程序或批處理文件。但windows的終端沒有問題。

命令的意思是,根據 reportresult 目錄中的數據(這些數據是運行pytest後產生的)。在 report 目錄下新建一個 allure_html 目錄,而這個目錄內有 index.html 才是最終的allure版本的HTML報告;如果你是重復執行的話,使用 --clean 清除之前的報告。

結果很漂亮:

allure open
默認的,allure報告需要HTTP伺服器來打開,一般我們可以通過pycharm來完成,另外一種情況就是通過allure自帶的open命令來完成。

allure的其他用法
當然,故事還是沒有完!在使用allure生成報告的時候,在編寫用例階段,還可以有一些參數可以使用:

allure.title與allure.description

feature和story

由上圖可以看到,不同的用例被分為不同的功能中。

allure.severity

allure.severity 用來標識測試用例或者測試類的級別,分為blocker,critical,normal,minor,trivial5個級別。

severity的默認級別是normal,所以上面的用例5可以不添加裝飾器了。

allure.dynamic

在之前,用例的執行順序是從上到下依次執行:

正如上例的執行順序是 3 1 2 。

現在,來看看我們如何手動控制多個用例的執行順序,這里也依賴一個插件。

下載

使用

手動控制用例執行順序的方法是在給各用例添加一個裝飾器:

那麼, 現在的執行順序是 2 1 3 ,按照order指定的排序執行的。

如果有人較勁傳個0或者負數啥的,那麼它們的排序關系應該是這樣的:

失敗重試意思是指定某個用例執行失敗可以重新運行。

下載

使用

需要在 pytest.ini 文件中, 配置:

給 addopts 欄位新增(其他原有保持不變) --reruns=3 欄位,這樣如果有用例執行失敗,則再次執行,嘗試3次。

來看示例:

結果:

我們也可以從用例報告中看出重試的結果:

上面演示了用例失敗了,然後重新執行多少次都沒有成功,這是一種情況。

接下來,來看另一種情況,那就是用例執行失敗,重新執行次數內通過了,那麼剩餘的重新執行的次數將不再執行。

通過 random 模塊幫助我們演示出在某次執行中出現失敗的情況,而在重新執行的時候,會出現成功的情況,看結果:

可以看到,用例 02 重新執行了一次就成功了,剩餘的兩次執行就終止了。

一條一條用例的執行,肯定會很慢,來看如何並發的執行測試用例,當然這需要相應的插件。

下載

使用

在配置文件中添加:

就是這個 -n=auto :

並發的配置可以寫在配置文件中,然後其他正常的執行用例腳本即可。另外一種就是在終端中指定,先來看示例:

結果:

pytest-sugar 改變了 pytest 的默認外觀,添加了一個進度條,並立即顯示失敗的測試。它不需要配置,只需 下載插件即可,用 pytest 運行測試,來享受更漂亮、更有用的輸出。

下載

其他照舊執行用例即可。

pytest-cov 在 pytest 中增加了覆蓋率支持,來顯示哪些代碼行已經測試過,哪些還沒有。它還將包括項目的測試覆蓋率。

下載

使用

在配置文件中:

也就是配置 --cov=./scripts ,這樣,它就會統計所有 scripts 目錄下所有符合規則的腳本的測試覆蓋率。

執行的話,就照常執行就行。

結果:

更多插件參考:https://zhuanlan.hu.com/p/50317866

有的時候,在 pytest.ini 中配置了 pytest-html 和 allure 插件之後,執行後報錯:

出現了這個報錯,檢查你配置的解釋器中是否存在 pytest-html 和 allure-pytest 這兩個模塊。如果是使用的pycharm ide,那麼你除了檢查settings中的解釋器配置之外,還需要保證運行腳本的編輯器配置是否跟settings中配置一致。

⑹ 如何用python寫腳本

以Python2.7操作為例:
1、首先需要打開電腦桌面,按開始的快捷鍵,點擊Python2.7如圖所示的選項進入。
相關推薦:《Python入門教程》
2、打開之後,開始編輯腳本,腳本第一行一定要寫上 #!usr/bin/python表示該腳本文件是可執行python腳本,如果python目錄不在usr/bin目錄下,則替換成當前python執行程序的目錄。
3、腳本寫完之後,打開CMD命令行,開始調試、可以直接用editplus調試。
4、最後,CMD命令行中,輸入 「python」 + 「空格」,即 」python 「,然後敲回車運行即可,這樣就可以把編輯好的腳本運行了。

⑺ python寫的selenium測試腳本,run.py文件引測試腳本怎麼批量引入

獲取以test開頭,以.py結尾的測試用例create_suite

生產測試報告eport_design

#coding=utf-8

fromemail.headerimportHeader

fromemail.mime.textimportMIMEText

importsmtplib

importunittest

importtime

importsys

#reload(sys)

#sys.setdefaultencoding('utf-8')

defcreate_suite():

#1.獲取框架中腳本的位置

script_dir="..Script\add"

#2.獲取要運行的腳本--discover

discv=unittest.defaultTestLoader.discover(script_dir,pattern="test_add_*.py")

#3.講獲取的腳本加入到測試集合

#創建一個測試集合

suite=unittest.TestSuite()

#循環遍歷discv列表中腳本的名字,並加入到suite中

forcaseindiscv:

#printcase

suite.addTest(case)

#講測試集返回

returnsuite

defreport_design():

globalfilename,runner,file1

now=time.strftime("%Y-%m-%d%H-%M-%S")

filename=".\Ggpt\add\"+now+"result.html"

file1=open(filename,'wb+')#wb+二進制寫入方式

#stream報告文件title標題description

runner=HTMLTestRunner(stream=file1,title="selenium_test_report",description="用例執行情況")

⑻ 集中式日誌分析平台 - ELK Stack - Filebeat 壓測

任何一款採集 agent 進行公司內全面推廣前都需要進行性能測試以及資源限制功能測試,以保證:

對於 Filebeat 這款號稱 golang 編寫,性能強於 logstahs-forwarder 的採集 agent,我們也需要這樣進行嚴謹對待。

硬體選擇虛擬機,6cores + 16GB Mem + 175GB SSD + 1000Mbps 帶寬;

Filebeat 配置,輸出到 console:

Filebeat 配置,輸出到 Kafka:

我們開啟 Filebeat 的 6060 埠,並使用 python 腳本進行指標採集。
expvar_rates.py ,每秒統計出 Filebeat 指標,主要看:

Step1. 啟動 Filebeat (172.16.134.8)

Step2. 啟動統計腳本

Step3. 啟動 tsar

Step4. 寫入壓測數據(6個進程寫入,6千萬條日誌)

在 6 進程數據寫入日誌文件時,我們在開啟 python 統計腳本的窗口得到如下穩定的統計數據:

我們在 tsar 看到的統計數據為:

我們在 top 中可以看到 Filebeat 大致占據了 0.8 cores。
在 6 進程數據寫入日誌文件後,我們在開啟 python 統計腳本的窗口得到如下穩定的統計數據:

我們在 tsar 看到的統計數據為:

我們在 top 中可以看到 Filebeat 大致占據了 1.6 cores。
小結:

測試步驟和上述一致,區別在於配置文件需要輸出到 Kafka。
在 6 進程數據寫入日誌文件時,我們在開啟 python 統計腳本的窗口得到如下穩定的統計數據:

我們在 tsar 看到的統計數據為:

我們在 top 中可以看到 Filebeat 大致占據了 0.7~0.8 cores。
在 6 進程數據寫入日誌文件後,我們在開啟 python 統計腳本的窗口得到如下穩定的統計數據:

我們在 tsar 看到的統計數據為:

我們在 top 中可以看到 Filebeat 大致占據了 2.0 cores。
小結:

測試步驟和上述一致,區別在於配置文件需要輸出到 Kafka。
和上述步驟不同的是,啟動 Filebeat 時需要 systemd 限制 CPU、句柄數,根據之前的理論,句柄數限制在 100 已經非常夠用,CPU 限制在 1 core。
修改 /usr/lib/systemd/system/filebeat.service :

執行 reload:

對 CPU 進行限制:

確認是否限製成功:

有如下輸出表示OK:

在 6 進程數據寫入日誌文件時,我們在開啟 python 統計腳本的窗口得到如下穩定的統計數據:

我們在 tsar 看到的統計數據為:

我們在 top 中可以看到 Filebeat 大致占據了 0.7 ~ 0.8 cores。
在 6 進程數據寫入日誌文件後,我們在開啟 python 統計腳本的窗口得到如下穩定的統計數據:

我們在 tsar 看到的統計數據為:

我們在 top 中可以看到 Filebeat 大致占據了 1.0 cores,限制生效。
小結:

在 6 進程數據寫入日誌文件時,我們在開啟 python 統計腳本的窗口得到如下穩定的統計數據:

我們在 tsar 看到的統計數據為:

我們在 top 中可以看到 Filebeat 大致占據了 0.75 ~ 0.9 cores。
在 6 進程數據寫入日誌文件後,我們在開啟 python 統計腳本的窗口得到如下穩定的統計數據:

我們在 tsar 看到的統計數據為:

我們在 top 中可以看到 Filebeat 大致占據了 1.0 cores,限制生效。
小結:

在 6 進程數據寫入日誌文件時,我們在開啟 python 統計腳本的窗口得到如下穩定的統計數據:

我們在 tsar 看到的統計數據為:

我們在 top 中可以看到 Filebeat 大致占據了 0.7 ~ 0.75 cores。
在 6 進程數據寫入日誌文件後,我們在開啟 python 統計腳本的窗口得到如下穩定的統計數據:

我們在 tsar 看到的統計數據為:

我們在 top 中可以看到 Filebeat 大致占據了 0.9 cores,未達到限制。
小結:

A. FB 全力採集 247B 數據(真實環境類似日誌長度),速率為 ~ 40K/s,CPU 開銷為 2 cores;

B. FB 在 CPU 限制 1 cores 情況下,採集 247B 數據速率為 ~ 20K/s,可以認為單核採集速率為 ~ 20K/s/core;

C. 日誌單行數據越大,吞吐越小,5KB 每行已經非常誇張,即使如此,沒有壓縮的情況下帶寬消耗 35MBps,gzip 壓縮率一般為 0.3~0.4,佔用帶寬為 10.5~14MBps,對於千兆網卡來說壓力較小;

⑼ Python 如何寫腳本

以Python2.7操作為例:

1、首先需要打開電腦桌面,按開始的快捷鍵,點擊Python2.7如圖所示的選項進入。

⑽ 初學Python,想做手機自動化測試腳本,想了解幾個問題

1、手機自動化測試Python能獨立完成嗎?可以。
2、想要學的話,看哪本教程會好些?首先學習自動化測試,然後學習python,然後結合實例學習。可以參考http://wenku..com/view/fd8b690b581b6bd97f19ea61.html
3、主要要學習的模塊內容或者方向是哪些?
Python的世界有一個開源框架Splinter,可以非常棒的模擬瀏覽器的行為(從某種意義上也可以說是人的訪問點擊行為)。Splinter提供了豐富的API,可以獲取頁面的信息,以判斷當前的行為所產生的結果

4、還有懂這行補充給我的,我另加分。。多項
多學習測試的各方面知識,python只是工具。測試的理論知識很重要。

閱讀全文

與python壓測腳本相關的資料

熱點內容
腳本提取源碼器 瀏覽:928
smo源碼 瀏覽:875
為什麼要搭建單獨伺服器 瀏覽:478
編譯器有什麼控制 瀏覽:891
希爾伯特pdf 瀏覽:645
php數組全數字 瀏覽:645
解密塔羅牌小程序源碼 瀏覽:862
聚合跑分源碼 瀏覽:553
注冊dns伺服器寫什麼 瀏覽:879
linux安裝deb包 瀏覽:521
電腦盤文件夾如何平鋪 瀏覽:267
相機卡滿了沒文件夾 瀏覽:751
如何批量快速壓縮視頻 瀏覽:432
我的世界如何加入ice伺服器 瀏覽:873
兄弟cnc編程說明書 瀏覽:204
php閃電入門教程學習 瀏覽:152
金岳霖邏輯pdf 瀏覽:938
linuxtomcat線程 瀏覽:77
pboc長度加數據加密 瀏覽:188
英雄聯盟國際服手游怎麼下安卓 瀏覽:299