❶ python怎麼實現rabbitmq的confirm模式
一、通過Python模擬收發消息
1、在各個節點上安裝租亂epel源
# yum install epel* -y11
2、安裝python庫
# yum --enablerepo=epel -y install python2-pika11
3、弊棚檔在rabbitmq-server節點上
1)、創建用戶
# rabbitmqctl add_user wuyeliang password 11
2)、創建虛擬主機
# rabbitmqctl add_vhost /my_vhost11
3)、賦予許可權
# rabbitmqctl set_permissions -p /my_vhost wuyeliang ".*" ".*" ".*" 11
4、在rabbitmq節點上模擬發消息,和納代碼如下
# vi send_msg.py
#!/usr/bin/env python
import pika
credentials = pika.PlainCredentials('wuyeliang', 'password') #注意用戶名及密碼
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost',
5672,
'/my_vhost',
credentials))
channel = connection.channel()
channel.queue_declare(queue='Hello_World')
channel.basic_publish(exchange='',
routing_key='Hello_World',
body='Hello RabbitMQ World!')
print(" [x] Sent 'Hello_World'")
connection.close()
4、在client節點上模擬收消息,代碼如下
# vi receive_msg.py
#!/usr/bin/env python
import signal
import pika
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_DFL)
credentials = pika.PlainCredentials('wuyeliang', 'password')
connection = pika.BlockingConnection(pika.ConnectionParameters(
'dlp.srv.world',
5672,
'/my_vhost',
credentials))
channel = connection.channel()
channel.queue_declare(queue='Hello_World')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(callback,
queue='Hello_World',
no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
❷ ubuntu|linux下 如何用python 模擬按鍵
ubuntu下,也就是linux下,通常會用kill -事件編號實現。 你查一下LINUX下的事件就明白了。
kill 進程號 實現上是發了一個信號給指定的進程。 在python里,也可以載入事件處理模塊,處理來自其它程序發過來的信號, 當然你可以用KILL工具發信號過來。
ctrl+d也是一個信號,ctrl+c也是一個。具體信號編碼我不記得了。不過以前我做多進程管理時就是使用這個方法。 好象信號還可以帶參數過來。
你打開python的幫助。看看signal這個模塊。我把它的例子拿過來。對你有用不
importsignal,os
defhandler(signum,frame):
print'Signalhandlercalledwithsignal',signum
raiseIOError("Couldn'topendevice!")
#Setthesignalhandleranda5-secondalarm
signal.signal(signal.SIGALRM,handler)
signal.alarm(5)
#Thisopen()mayhangindefinitely
fd=os.open('/dev/ttyS0',os.O_RDWR)
signal.alarm(0)#Disablethealarm
下面是我找到的一些資料。也許有用。
信號的概念
信號(signal)--進程之間通訊的方式,是一種軟體中斷。一個進程一旦接收到信號就會打斷原來的程序執行流程來處理信號。
幾個常用信號:
SIGINT終止進程中斷進程(control+c)
SIGTERM終止進程軟體終止信號
SIGKILL終止進程殺死進程
SIGALRM鬧鍾信號
進程結束信號SIGTERM和SIGKILL的區別
SIGTERM比較友好,進程能捕捉這個信號,根據您的需要來關閉程序。在關閉程序之前,您可以結束打開的記錄文件和完成正在做的任務。在某些情況下,假如進程正在進行作業而且不能中斷,那麼進程可以忽略這個SIGTERM信號。
對於SIGKILL信號,進程是不能忽略的。這是一個「我不管您在做什麼,立刻停止」的信號。假如您發送SIGKILL信號給進程,Linux就將進程停止在那裡。
發送信號一般有兩種原因:
1(被動式)內核檢測到一個系統事件.例如子進程退出會像父進程發送SIGCHLD信號.鍵盤按下control+c會發送SIGINT信號
2(主動式)通過系統調用kill來向指定進程發送信號
linux操作系統提供的信號
[100003@oss235 myppt]$ kill -l
1) SIGHUP2) SIGINT3) SIGQUIT4) SIGILL
5) SIGTRAP6) SIGABRT7) SIGBUS8) SIGFPE
9) SIGKILL10) SIGUSR111) SIGSEGV12) SIGUSR2
13) SIGPIPE14) SIGALRM15) SIGTERM16) SIGSTKFLT
17) SIGCHLD18) SIGCONT19) SIGSTOP20) SIGTSTP
21) SIGTTIN22) SIGTTOU23) SIGURG24) SIGXCPU
25) SIGXFSZ26) SIGVTALRM27) SIGPROF28) SIGWINCH
29) SIGIO30) SIGPWR31) SIGSYS34) SIGRTMIN
35) SIGRTMIN+136) SIGRTMIN+237) SIGRTMIN+338) SIGRTMIN+4
39) SIGRTMIN+540) SIGRTMIN+641) SIGRTMIN+742) SIGRTMIN+8
43) SIGRTMIN+944) SIGRTMIN+10 45) SIGRTMIN+11 46) SIGRTMIN+12
47) SIGRTMIN+13 48) SIGRTMIN+14 49) SIGRTMIN+15 50) SIGRTMAX-14
51) SIGRTMAX-13 52) SIGRTMAX-12 53) SIGRTMAX-11 54) SIGRTMAX-10
55) SIGRTMAX-956) SIGRTMAX-857) SIGRTMAX-758) SIGRTMAX-6
59) SIGRTMAX-560) SIGRTMAX-461) SIGRTMAX-362) SIGRTMAX-2
63) SIGRTMAX-164) SIGRTMAX
Python提供的信號
Python 2.4.3 (#1, Jun 11 2009, 14:09:58)
[GCC 4.1.2 20080704 (Red Hat 4.1.2-44)] on linux2
Type "help", "right", "credits" or "license" for more information.
>>> import signal
>>> dir(signal)
['NSIG', 'SIGABRT', 'SIGALRM', 'SIGBUS', 'SIGCHLD', 'SIGCLD',
'SIGCONT', 'SIGFPE', 'SIGHUP', 'SIGILL', 'SIGINT', 'SIGIO', 'SIGIOT',
'SIGKILL', 'SIGPIPE', 'SIGPOLL', 'SIGPROF', 'SIGPWR', 'SIGQUIT',
'SIGRTMAX', 'SIGRTMIN', 'SIGSEGV', 'SIGSTOP', 'SIGSYS', 'SIGTERM',
'SIGTRAP', 'SIGTSTP', 'SIGTTIN', 'SIGTTOU', 'SIGURG', 'SIGUSR1',
'SIGUSR2', 'SIGVTALRM', 'SIGWINCH', 'SIGXCPU', 'SIGXFSZ', 'SIG_DFL',
'SIG_IGN', '__doc__', '__name__', 'alarm', 'default_int_handler',
'getsignal', 'pause', 'signal']
操作系統規定了進程收到信號以後的默認行為
但是,我們可以通過綁定信號處理函數來修改進程收到信號以後的行為
有兩個信號是不可更改的SIGTOP和SIGKILL
綁定信號處理函數
import os
import signal
from time import sleep
def onsignal_term(a,b):
print '收到SIGTERM信號'
#這里是綁定信號處理函數,將SIGTERM綁定在函數onsignal_term上面
signal.signal(signal.SIGTERM,onsignal_term)
def onsignal_usr1(a,b):
print '收到SIGUSR1信號'
#這里是綁定信號處理函數,將SIGUSR1綁定在函數onsignal_term上面
signal.signal(signal.SIGUSR1,onsignal_usr1)
while 1:
print '我的進程id是',os.getpid()
sleep(10)
運行該程序。然後通過另外一個進程來發送信號。
發送信號
發送信號的代碼如下:
import os
import signal
#發送信號,16175是前面那個綁定信號處理函數的pid,需要自行修改
os.kill(16175,signal.SIGTERM)
#發送信號,16175是前面那個綁定信號處理函數的pid,需要自行修改
os.kill(16175,signal.SIGUSR1)
❸ python如何使用ctrl+c來退出程序
根據我處理這個問題的教訓,python的多線程面對這個情況是非常郁悶的,所以我最後選擇了用multiprocessing模塊(多進程)替換了多線程。如果可以的話,我強烈建議你改用multiprocessing。
❹ 如何讓python腳本在關閉的時候運行一段代碼
如果是命令行的,一般是捕獲ctrl-c事件吧。
import signal
import sys
def signal_handler(signal, frame):
print('You pressed Ctrl+C!')
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
print('Press Ctrl+C')
signal.pause()
如果是基於GUI框架開發的,都有事件觸發的,你重載事件就好了。比如Qt就有closeEvent事件
❺ python 一個進程可以注冊多個信號么
可以
信號的概念
信號(signal)-- 進程之間通訊的方式,是一種軟體中斷。一個進程一旦接收到信號就會打斷原來的程序執行流程來處理信號。
幾個常用信號:
SIGINT 終止進程 中斷進程 (control+c)
SIGTERM 終止進程 軟體終止信號
SIGKILL 終止進程 殺死進程
SIGALRM 鬧鍾信號
進程結束信號 SIGTERM和SIGKILL的區別
SIGTERM比較友好,進程能捕捉這個信號,根據您的需要來關閉程序。在關閉程序之前,您可以結束打開的記錄文件和完成正在做的任務。在某些情況下,假如進程正在進行作業而且不能中斷,那麼進程可以忽略這個SIGTERM信號。
對於SIGKILL信號,進程是不能忽略的。這是一個 「我不管您在做什麼,立刻停止」的信號。假如您發送SIGKILL信號給進程,Linux就將進程停止在那裡。
發送信號一般有兩種原因:
1(被動式) 內核檢測到一個系統事件.例如子進程退出會像父進程發送SIGCHLD信號.鍵盤按下control+c會發送SIGINT信號
2(主動式) 通過系統調用kill來向指定進程發送信號
linux操作系統提供的信號
[100003@oss235 myppt]$ kill -l
1) SIGHUP 2) SIGINT 3) SIGQUIT 4) SIGILL
5) SIGTRAP 6) SIGABRT 7) SIGBUS 8) SIGFPE
9) SIGKILL 10) SIGUSR1 11) SIGSEGV 12) SIGUSR2
13) SIGPIPE 14) SIGALRM 15) SIGTERM 16) SIGSTKFLT
17) SIGCHLD 18) SIGCONT 19) SIGSTOP 20) SIGTSTP
21) SIGTTIN 22) SIGTTOU 23) SIGURG 24) SIGXCPU
25) SIGXFSZ 26) SIGVTALRM 27) SIGPROF 28) SIGWINCH
29) SIGIO 30) SIGPWR 31) SIGSYS 34) SIGRTMIN
35) SIGRTMIN+1 36) SIGRTMIN+2 37) SIGRTMIN+3 38) SIGRTMIN+4
39) SIGRTMIN+5 40) SIGRTMIN+6 41) SIGRTMIN+7 42) SIGRTMIN+8
43) SIGRTMIN+9 44) SIGRTMIN+10 45) SIGRTMIN+11 46) SIGRTMIN+12
47) SIGRTMIN+13 48) SIGRTMIN+14 49) SIGRTMIN+15 50) SIGRTMAX-14
51) SIGRTMAX-13 52) SIGRTMAX-12 53) SIGRTMAX-11 54) SIGRTMAX-10
55) SIGRTMAX-9 56) SIGRTMAX-8 57) SIGRTMAX-7 58) SIGRTMAX-6
59) SIGRTMAX-5 60) SIGRTMAX-4 61) SIGRTMAX-3 62) SIGRTMAX-2
63) SIGRTMAX-1 64) SIGRTMAX
Python提供的信號
Python 2.4.3 (#1, Jun 11 2009, 14:09:58)
[GCC 4.1.2 20080704 (Red Hat 4.1.2-44)] on linux2
Type "help", "right", "credits" or "license" for more information.
>>> import signal
>>> dir(signal)
['NSIG', 'SIGABRT', 'SIGALRM', 'SIGBUS', 'SIGCHLD', 'SIGCLD', 'SIGCONT', 'SIGFPE', 'SIGHUP', 'SIGILL', 'SIGINT', 'SIGIO', 'SIGIOT', 'SIGKILL', 'SIGPIPE', 'SIGPOLL', 'SIGPROF', 'SIGPWR', 'SIGQUIT', 'SIGRTMAX', 'SIGRTMIN', 'SIGSEGV', 'SIGSTOP', 'SIGSYS', 'SIGTERM', 'SIGTRAP', 'SIGTSTP', 'SIGTTIN', 'SIGTTOU', 'SIGURG', 'SIGUSR1', 'SIGUSR2', 'SIGVTALRM', 'SIGWINCH', 'SIGXCPU', 'SIGXFSZ', 'SIG_DFL', 'SIG_IGN', '__doc__', '__name__', 'alarm', 'default_int_handler', 'getsignal', 'pause', 'signal']
操作系統規定了進程收到信號以後的默認行為
但是,我們可以通過綁定信號處理函數來修改進程收到信號以後的行為
有兩個信號是不可更改的SIGTOP和SIGKILL
綁定信號處理函數
import os
import signal
from time import sleep
def onsignal_term(a,b):
print '收到SIGTERM信號'
#這里是綁定信號處理函數,將SIGTERM綁定在函數onsignal_term上面
signal.signal(signal.SIGTERM,onsignal_term)
def onsignal_usr1(a,b):
print '收到SIGUSR1信號'
#這里是綁定信號處理函數,將SIGUSR1綁定在函數onsignal_term上面
signal.signal(signal.SIGUSR1,onsignal_usr1)
while 1:
print '我的進程id是',os.getpid()
sleep(10)
運行該程序。然後通過另外一個進程來發送信號。
發送信號
發送信號的代碼如下:
import os
import signal
#發送信號,16175是前面那個綁定信號處理函數的pid,需要自行修改
os.kill(16175,signal.SIGTERM)
#發送信號,16175是前面那個綁定信號處理函數的pid,需要自行修改
os.kill(16175,signal.SIGUSR1)
SIGCHLD信號
然後顯示一個子進程結束後自動向父進程發送SIGCHLD信號的例子。
'''''''
子進程結束會向父進程發送SIGCHLD信號
'''
import os
import signal
from time import sleep
def onsigchld(a,b):
print '收到子進程結束信號'
signal.signal(signal.SIGCHLD,onsigchld)
pid = os.fork()
if pid == 0:
print '我是子進程,pid是',os.getpid()
sleep(2)
else:
print '我是父進程,pid是',os.getpid()
os.wait() #等待子進程結束
使用信號需要特別注意的地方:
如果一個進程收到一個SIGUSR1信號,然後執行信號綁定函數,第二個SIGUSR2信號又來了,第一個信號沒有被處理完畢的話,第二個信號就會丟棄。
所以,盡量不要在多線程中使用信號。
這個不妥,測試沒發現有信號丟失
例子演示:
接收信號的程序,你會發現如果有另外一端使用多線程向這個進程發送信號,會遺漏一些信號。
import os
import signal
from time import sleep
import Queue
QCOUNT = Queue.Queue() #初始化隊列
def onsigchld(a,b):
'''''''收到信號後向隊列中插入一個數字1'''
print '收到SIGUSR1信號'
sleep(2)
QCOUNT.put(1) #向隊列中寫入
def exithanddle(s,e):
raise SystemExit('收到終止命令,退出程序')
signal.signal(signal.SIGUSR1,onsigchld) #綁定信號處理函數
signal.signal(signal.SIGINT,exithanddle) #當按下Ctrl + C 終止進程
while 1:
print '我的pid是',os.getpid()
print '現在隊列中元素的個數是',QCOUNT.qsize()
sleep(2)
多線程發信號端的程序:
'''''''
使用多線程向另外一個進程發送信號
'''
import threading
import os
import signal
def sensr1():
print '發送信號'
#這里的進程id需要寫前一個程序實際運行的pid
os.kill(17788, signal.SIGUSR1)
WORKER = []
#開啟6個線程
for i in range(1, 7):
threadinstance = threading.Thread(target = sensr1)
WORKER.append(threadinstance)
for i in WORKER:
i.start()
for i in WORKER:
i.join()
print '主線程完成'
內容補充:
Alarms 是一個特殊信號類型,它可以讓程序要求系統經過一段時間對自己發送通知。os 標准模塊中指出,它可用於避免無限制阻塞 I/O 操作或其它系統調用。
像下面例子,原本程序睡眠 10 後才列印出print 'After :', time.ctime(),但是由於 signal.alarm(2),所以 2 秒後就執行了列印。
import signal
import time
def receive_alarm(signum, stack):
print 'Alarm :', time.ctime()
# Call receive_alarm in 2 seconds
signal.signal(signal.SIGALRM, receive_alarm)
signal.alarm(2)
print 'Before:', time.ctime()
time.sleep(10)
print 'After :', time.ctime()
注意Signal只有主線程才能接收信號,像下面例子,print 'Done waiting' 語句列印不出來,如果不調用 signal.alarm(2) ,程序將永遠阻塞
import signal
import threading
import os
import time
def signal_handler(num, stack):
print 'Received signal %d in %s' % \
(num, threading.currentThread().name)
signal.signal(signal.SIGUSR1, signal_handler)
def wait_for_signal():
print 'Waiting for signal in', threading.currentThread().name
signal.pause()
print 'Done waiting'
# Start a thread that will not receive the signal
receiver = threading.Thread(target=wait_for_signal, name='receiver')
receiver.start()
time.sleep(0.1)
def send_signal():
print 'Sending signal in', threading.currentThread().name
os.kill(os.getpid(), signal.SIGUSR1)
sender = threading.Thread(target=send_signal, name='sender')
sender.start()
sender.join()
# Wait for the thread to see the signal (not going to happen!)
print 'Waiting for', receiver.name
signal.alarm(2)
receiver.join()
還有一點需要注意的是,雖然 alarms 類信號可以在任何線程中調用,但是只能在主線程中接收,像下面例子即使子線程 use_alarm 中調用 signal.alarm(1) ,但是不起作用 :
import signal
import time
import threading
def signal_handler(num, stack):
print time.ctime(), 'Alarm in', threading.currentThread().name
signal.signal(signal.SIGALRM, signal_handler)
def use_alarm():
t_name = threading.currentThread().name
print time.ctime(), 'Setting alarm in', t_name
signal.alarm(1)
print time.ctime(), 'Sleeping in', t_name
time.sleep(3)
print time.ctime(), 'Done with sleep in', t_name
# Start a thread that will not receive the signal
alarm_thread = threading.Thread(target=use_alarm,
name='alarm_thread')
alarm_thread.start()
time.sleep(0.1)
# Wait for the thread to see the signal (not going to happen!)
print time.ctime(), 'Waiting for', alarm_thread.name
alarm_thread.join()
print time.ctime(), 'Exiting normally'
❻ 有沒有python的串口庫
串口模塊的波特率比較特別,找了幾個串口工具都不支持。。。所以,乾脆用python自己來寫了,其實已經好奇好久了,別人的工具各種不順手。
需要pyserial的支持,兼容各種平台,不需要新編譯二進制文件。
先貼一個定時發送的代碼:
6. exception:serial.SerialException
另一個完整收發的例子,單片機數據以TLV(Type,Length,Value)格式發上來
#!/usr/bin/env python
# it's a program of luo, [email protected]
import serialimport arrayimport osimport signalfrom time import sleep
flag_stop = Falsedef onsignal_int(a,b): print "sigint!"
global flag_stop
flag_stop = True
signal.signal(signal.SIGINT, onsignal_int)
signal.signal(signal.SIGTERM, onsignal_int)
ser = serial.Serial('/dev/ttyUSB0', 9600, timeout = 0.001)print "serial.isOpen() =",ser.isOpen()
cmd_send = ""cmd_send = cmd_send.decode("hex")
stop = "7b04047d0d0a"stop = stop.decode("hex")
cmd_back = ""cmd_length = 0x00cmd_count = 0x00s = ser.write(cmd_send)while True:
sleep(0.1)
if flag_stop: # read data until Ctrl+c
ser.write(stop) # send cmd stop before exit
print "reset cmd has been sent!"
sleep(0.05) break
text = ser.read(1) # read one, with timout
if text: # check if not timeout
n = ser.inWaiting() # look if there is more to read
if n:
text = text + ser.read(n) #get it
cmd_back = cmd_back + text
text = ""
if len(cmd_back) < 2: # go back if no enough data recvd
continue
if cmd_length == 0x00: # new loop
cmd_length = ord(cmd_back[1]) # Type(1 byte),Length of Value(1 byte),Value
print "new cmd length,",cmd_length
if (cmd_length + 0x02) > len(cmd_back): # do nothing until all bytes is recvd
continue
# so far, we have got a full cmd
hex_list = [hex(ord(i)) for i in cmd_back] # more readable than data.encode("hex")
print "In buffer:",hex_list
cmd_back = cmd_back[cmd_length+2:] # remove this cmd(TLV) from buffer
cmd_length = 0
cmd_count += 1
print "==> %d cmds recvd."%(cmd_count) print "-------------"
ser.close()
——————
❼ python 怎麼讓程序接受ctrl + c終止信號
花了一天時間用python為服務寫了個壓力測試。很簡單,多線程向伺服器發請求。但寫完之後發現如果中途想停下來,按Ctrl+C達不到效果,自然想到要用信號處理函數捕捉信號,使線程都停下來,問題解決的方法請往下看:
復制代碼代碼如下:
#!/bin/env python
# -*- coding: utf-8 -*-
#filename: peartest.py
import threading, signal
is_exit = False
def doStress(i, cc):
global is_exit
idx = i
while not is_exit:
if (idx < 10000000):
print "thread[%d]: idx=%d"%(i, idx)
idx = idx + cc
else:
break
print "thread[%d] complete."%i
def handler(signum, frame):
global is_exit
is_exit = True
print "receive a signal %d, is_exit = %d"%(signum, is_exit)
if __name__ == "__main__":
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
cc = 5
for i in range(cc):
t = threading.Thread(target=doStress, args=(i,cc))
t.start()
上面是一個模擬程序,並不真正向服務發送請求,而代之以在一千萬以內,每個線程每隔並發數個(cc個)列印一個整數。很明顯,當所有線程都完成自己的任務後,進程會正常退出。但如果我們中途想退出(試想一個壓力測試程序,在中途已經發現了問題,需要停止測試),該腫么辦?你當然可以用ps查找到進程號,然後kill -9殺掉,但這樣太繁瑣了,捕捉Ctrl+C是最自然的想法。上面示常式序中已經捕捉了這個信號,並修改全局變數is_exit,線程中會檢測這個變數,及時退出。
但事實上這個程序並不work,當你按下Ctrl+C時,程序照常運行,並無任何響應。網上搜了一些資料,明白是python的子線程如果不是daemon的話,主線程是不能響應任何中斷的。但設為daemon後主線程會隨之退出,接著整個進程很快就退出了,所以還需要在主線程中檢測各個子線程的狀態,直到所有子線程退出後自己才退出,因此上例29行之後的代碼可以修改為:
復制代碼代碼如下:
threads=[]
for i in range(cc):
t = threading.Thread(target=doStress, args=(i, cc))
t.setDaemon(True)
threads.append(t)
t.start()
for i in range(cc):
threads[i].join()
重新試一下,問題依然沒有解決,進程還是沒有響應Ctrl+C,這是因為join()函數同樣會waiting在一個鎖上,使主線程無法捕獲信號。因此繼續修改,調用線程的isAlive()函數判斷線程是否完成:
復制代碼代碼如下:
while 1:
alive = False
for i in range(cc):
alive = alive or threads[i].isAlive()
if not alive:
break
這樣修改後,程序完全按照預想運行了:可以順利的列印每個線程應該列印的所有數字,也可以中途用Ctrl+C終結整個進程。完整的代碼如下:
復制代碼代碼如下:
#!/bin/env python
# -*- coding: utf-8 -*-
#filename: peartest.py
import threading, signal
is_exit = False
def doStress(i, cc):
global is_exit
idx = i
while not is_exit:
if (idx < 10000000):
print "thread[%d]: idx=%d"%(i, idx)
idx = idx + cc
else:
break
if is_exit:
print "receive a signal to exit, thread[%d] stop."%i
else:
print "thread[%d] complete."%i
def handler(signum, frame):
global is_exit
is_exit = True
print "receive a signal %d, is_exit = %d"%(signum, is_exit)
if __name__ == "__main__":
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
cc = 5
threads = []
for i in range(cc):
t = threading.Thread(target=doStress, args=(i,cc))
t.setDaemon(True)
threads.append(t)
t.start()
while 1:
alive = False
for i in range(cc):
alive = alive or threads[i].isAlive()
if not alive:
break
其實,如果用python寫一個服務,也需要這樣,因為負責服務的那個線程是永遠在那裡接收請求的,不會退出,而如果你想用Ctrl+C殺死整個服務,跟上面的壓力測試程序是一個道理。總結一下,python多線程中要響應Ctrl+C的信號以殺死整個進程,需要:
1.把所有子線程設為Daemon;
2.使用isAlive()函數判斷所有子線程是否完成,而不是在主線程中用join()函數等待完成;
3.寫一個響應Ctrl+C信號的函數,修改全局變數,使得各子線程能夠檢測到,並正常退出。
❽ python 運維常用腳本
Python 批量遍歷目錄文件,並修改訪問時間
import os
path = "D:/UASM64/include/"
dirs = os.listdir(path)
temp=[];
for file in dirs:
temp.append(os.path.join(path, file))
for x in temp:
os.utime(x, (1577808000, 1577808000))
Python 實現的自動化伺服器管理
import sys
import os
import paramiko
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
def ssh_cmd(user,passwd,port,userfile,cmd):
def ssh_put(user,passwd,source,target):
while True:
try:
shell=str(input("[Shell] # "))
if (shell == ""):
continue
elif (shell == "exit"):
exit()
elif (shell == "put"):
ssh_put("root","123123","./a.py","/root/a.py")
elif (shell =="cron"):
temp=input("輸入一個計劃任務: ")
temp1="(crontab -l; echo "+ temp + ") |crontab"
ssh_cmd("root","123123","22","./user_ip.conf",temp1)
elif (shell == "uncron"):
temp=input("輸入要刪除的計劃任務: ")
temp1="crontab -l | grep -v " "+ temp + "|crontab"
ssh_cmd("root","123123","22","./user_ip.conf",temp1)
else:
ssh_cmd("lyshark","123123","22","./user_ip.conf",shell)
遍歷目錄和文件
import os
def list_all_files(rootdir):
import os
_files = []
list = os.listdir(rootdir) #列出文件夾下所有的目錄與文件
for i in range(0,len(list)):
path = os.path.join(rootdir,list[i])
if os.path.isdir(path):
_files.extend(list_all_files(path))
if os.path.isfile(path):
_files.append(path)
return _files
a=list_all_files("C:/Users/LyShark/Desktop/a")
print(a)
python檢測指定埠狀態
import socket
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sk.settimeout(1)
for ip in range(0,254):
try:
sk.connect(("192.168.1."+str(ip),443))
print("192.168.1.%d server open
"%ip)
except Exception:
print("192.168.1.%d server not open"%ip)
sk.close()
python實現批量執行CMD命令
import sys
import os
import paramiko
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
print("------------------------------>
")
print("使用說明,在當前目錄創建ip.txt寫入ip地址")
print("------------------------------>
")
user=input("輸入用戶名:")
passwd=input("輸入密碼:")
port=input("輸入埠:")
cmd=input("輸入執行的命令:")
file = open("./ip.txt", "r")
line = file.readlines()
for i in range(len(line)):
print("對IP: %s 執行"%line[i].strip('
'))
python3-實現釘釘報警
import requests
import sys
import json
dingding_url = ' https://oapi.dingtalk.com/robot/send?access_token='
data = {"msgtype": "markdown","markdown": {"title": "監控","text": "apche異常"}}
headers = {'Content-Type':'application/json;charset=UTF-8'}
send_data = json.mps(data).encode('utf-8')
requests.post(url=dingding_url,data=send_data,headers=headers)
import psutil
import requests
import time
import os
import json
monitor_name = set(['httpd','cobblerd']) # 用戶指定監控的服務進程名稱
proc_dict = {}
proc_name = set() # 系統檢測的進程名稱
monitor_map = {
'httpd': 'systemctl restart httpd',
'cobblerd': 'systemctl restart cobblerd' # 系統在進程down掉後,自動重啟
}
dingding_url = ' https://oapi.dingtalk.com/robot/send?access_token='
while True:
for proc in psutil.process_iter(attrs=['pid','name']):
proc_dict[proc.info['pid']] = proc.info['name']
proc_name.add(proc.info['name'])
判斷指定埠是否開放
import socket
port_number = [135,443,80]
for index in port_number:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex((飗.0.0.1', index))
if result == 0:
print("Port %d is open" % index)
else:
print("Port %d is not open" % index)
sock.close()
判斷指定埠並且實現釘釘輪詢報警
import requests
import sys
import json
import socket
import time
def dingding(title,text):
dingding_url = ' https://oapi.dingtalk.com/robot/send?access_token='
data = {"msgtype": "markdown","markdown": {"title": title,"text": text}}
headers = {'Content-Type':'application/json;charset=UTF-8'}
send_data = json.mps(data).encode('utf-8')
requests.post(url=dingding_url,data=send_data,headers=headers)
def net_scan():
port_number = [80,135,443]
for index in port_number:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex((飗.0.0.1', index))
if result == 0:
print("Port %d is open" % index)
else:
return index
sock.close()
while True:
dingding("Warning",net_scan())
time.sleep(60)
python-實現SSH批量CMD執行命令
import sys
import os
import paramiko
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
def ssh_cmd(user,passwd,port,userfile,cmd):
file = open(userfile, "r")
line = file.readlines()
for i in range(len(line)):
print("對IP: %s 執行"%line[i].strip('
'))
ssh.connect(hostname=line[i].strip('
'),port=port,username=user,password=passwd)
cmd=cmd
stdin, stdout, stderr = ssh.exec_command(cmd)
result = stdout.read()
ssh_cmd("lyshark","123","22","./ip.txt","free -h |grep 'Mem:' |awk '{print $3}'")
用python寫一個列舉當前目錄以及所有子目錄下的文件,並列印出絕對路徑
import sys
import os
for root,dirs,files in os.walk("C://"):
for name in files:
print(os.path.join(root,name))
os.walk()
按照這樣的日期格式(xxxx-xx-xx)每日生成一個文件,例如今天生成的文件為2013-09-23.log, 並且把磁碟的使用情況寫到到這個文件中。
import os
import sys
import time
new_time = time.strftime("%Y-%m-%d")
disk_status = os.popen("df -h").readlines()
str1 = ''.join(disk_status)
f = open(new_time+'.log','w')
f.write("%s"%str1)
f.flush()
f.close()
統計出每個IP的訪問量有多少?(從日誌文件中查找)
import sys
list = []
f = open("/var/log/httpd/access_log","r")
str1 = f.readlines()
f.close()
for i in str1:
ip=i.split()[0]
list.append(ip)
list_num=set(list)
for j in list_num:
num=list.count(j)
print("%s -----> %s" %(num,j))
寫個程序,接受用戶輸入數字,並進行校驗,非數字給出錯誤提示,然後重新等待用戶輸入。
import tab
import sys
while True:
try:
num=int(input("輸入數字:").strip())
for x in range(2,num+1):
for y in range(2,x):
if x % y == 0:
break
else:
print(x)
except ValueError:
print("您輸入的不是數字")
except KeyboardInterrupt:
sys.exit("
")
ps 可以查看進程的內存佔用大小,寫一個腳本計算一下所有進程所佔用內存大小的和。
import sys
import os
list=[]
sum=0
str1=os.popen("ps aux","r").readlines()
for i in str1:
str2=i.split()
new_rss=str2[5]
list.append(new_rss)
for i in list[1:-1]:
num=int(i)
sum=sum+num
print("%s ---> %s"%(list[0],sum))
關於Python 命令行參數argv
import sys
if len(sys.argv) < 2:
print ("沒有輸入任何參數")
sys.exit()
if sys.argv[1].startswith("-"):
option = sys.argv[1][1:]
利用random生成6位數字加字母隨機驗證碼
import sys
import random
rand=[]
for x in range(6):
y=random.randrange(0,5)
if y == 2 or y == 4:
num=random.randrange(0,9)
rand.append(str(num))
else:
temp=random.randrange(65,91)
c=chr(temp)
rand.append(c)
result="".join(rand)
print(result)
自動化-使用pexpect非交互登陸系統
import pexpect
import sys
ssh = pexpect.spawn('ssh [email protected]')
fout = file('sshlog.txt', 'w')
ssh.logfile = fout
ssh.expect("[email protected]'s password:")
ssh.sendline("密碼")
ssh.expect('#')
ssh.sendline('ls /home')
ssh.expect('#')
Python-取系統時間
import sys
import time
time_str = time.strftime("日期:%Y-%m-%d",time.localtime())
print(time_str)
time_str= time.strftime("時間:%H:%M",time.localtime())
print(time_str)
psutil-獲取內存使用情況
import sys
import os
import psutil
memory_convent = 1024 * 1024
mem =psutil.virtual_memory()
print("內存容量為:"+str(mem.total/(memory_convent))+"MB
")
print("已使用內存:"+str(mem.used/(memory_convent))+"MB
")
print("可用內存:"+str(mem.total/(memory_convent)-mem.used/(1024*1024))+"MB
")
print("buffer容量:"+str(mem.buffers/( memory_convent ))+"MB
")
print("cache容量:"+str(mem.cached/(memory_convent))+"MB
")
Python-通過SNMP協議監控CPU
注意:被監控的機器上需要支持snmp協議 yum install -y net-snmp*
import os
def getAllitems(host, oid):
sn1 = os.popen('snmpwalk -v 2c -c public ' + host + ' ' + oid + '|grep Raw|grep Cpu|grep -v Kernel').read().split('
')[:-1]
return sn1
def getDate(host):
items = getAllitems(host, '.1.3.6.1.4.1.2021.11')
if name == ' main ':
Python-通過SNMP協議監控系統負載
注意:被監控的機器上需要支持snmp協議 yum install -y net-snmp*
import os
import sys
def getAllitems(host, oid):
sn1 = os.popen('snmpwalk -v 2c -c public ' + host + ' ' + oid).read().split('
')
return sn1
def getload(host,loid):
load_oids = Ƈ.3.6.1.4.1.2021.10.1.3.' + str(loid)
return getAllitems(host,load_oids)[0].split(':')[3]
if name == ' main ':
Python-通過SNMP協議監控內存
注意:被監控的機器上需要支持snmp協議 yum install -y net-snmp*
import os
def getAllitems(host, oid):
def getSwapTotal(host):
def getSwapUsed(host):
def getMemTotal(host):
def getMemUsed(host):
if name == ' main ':
Python-通過SNMP協議監控磁碟
注意:被監控的機器上需要支持snmp協議 yum install -y net-snmp*
import re
import os
def getAllitems(host,oid):
def getDate(source,newitem):
def getRealDate(item1,item2,listname):
def caculateDiskUsedRate(host):
if name == ' main ':
Python-通過SNMP協議監控網卡流量
注意:被監控的機器上需要支持snmp協議 yum install -y net-snmp*
import re
import os
def getAllitems(host,oid):
sn1 = os.popen('snmpwalk -v 2c -c public ' + host + ' ' + oid).read().split('
')[:-1]
return sn1
def getDevices(host):
device_mib = getAllitems(host,'RFC1213-MIB::ifDescr')
device_list = []
def getDate(host,oid):
date_mib = getAllitems(host,oid)[1:]
date = []
if name == ' main ':
Python-實現多級菜單
import os
import sys
ps="[None]->"
ip=["192.168.1.1","192.168.1.2","192.168.1.3"]
flage=1
while True:
ps="[None]->"
temp=input(ps)
if (temp=="test"):
print("test page !!!!")
elif(temp=="user"):
while (flage == 1):
ps="[User]->"
temp1=input(ps)
if(temp1 =="exit"):
flage=0
break
elif(temp1=="show"):
for i in range(len(ip)):
print(i)
Python實現一個沒用的東西
import sys
ps="[root@localhost]# "
ip=["192.168.1.1","192.168.1.2","192.168.1.3"]
while True:
temp=input(ps)
temp1=temp.split()
檢查各個進程讀寫的磁碟IO
import sys
import os
import time
import signal
import re
class DiskIO:
def init (self, pname=None, pid=None, reads=0, writes=0):
self.pname = pname
self.pid = pid
self.reads = 0
self.writes = 0
def main():
argc = len(sys.argv)
if argc != 1:
print ("usage: please run this script like [./lyshark.py]")
sys.exit(0)
if os.getuid() != 0:
print ("Error: This script must be run as root")
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
os.system('echo 1 > /proc/sys/vm/block_mp')
print ("TASK PID READ WRITE")
while True:
os.system('dmesg -c > /tmp/diskio.log')
l = []
f = open('/tmp/diskio.log', 'r')
line = f.readline()
while line:
m = re.match(
'^(S+)(d+)(d+): (READ|WRITE) block (d+) on (S+)', line)
if m != None:
if not l:
l.append(DiskIO(m.group(1), m.group(2)))
line = f.readline()
continue
found = False
for item in l:
if item.pid == m.group(2):
found = True
if m.group(3) == "READ":
item.reads = item.reads + 1
elif m.group(3) == "WRITE":
item.writes = item.writes + 1
if not found:
l.append(DiskIO(m.group(1), m.group(2)))
line = f.readline()
time.sleep(1)
for item in l:
print ("%-10s %10s %10d %10d" %
(item.pname, item.pid, item.reads, item.writes))
def signal_handler(signal, frame):
os.system('echo 0 > /proc/sys/vm/block_mp')
sys.exit(0)
if name ==" main ":
main()
利用Pexpect實現自動非交互登陸linux
import pexpect
import sys
ssh = pexpect.spawn('ssh [email protected]')
fout = file('sshlog.log', 'w')
ssh.logfile = fout
ssh.expect("[email protected]'s password:")
ssh.sendline("密碼")
ssh.expect('#')
ssh.sendline('ls /home')
ssh.expect('#')
利用psutil模塊獲取系統的各種統計信息
import sys
import psutil
import time
import os
time_str = time.strftime( "%Y-%m-%d", time.localtime( ) )
file_name = "./" + time_str + ".log"
if os.path.exists ( file_name ) == False :
os.mknod( file_name )
handle = open ( file_name , "w" )
else :
handle = open ( file_name , "a" )
if len( sys.argv ) == 1 :
print_type = 1
else :
print_type = 2
def isset ( list_arr , name ) :
if name in list_arr :
return True
else :
return False
print_str = "";
if ( print_type == 1 ) or isset( sys.argv,"mem" ) :
memory_convent = 1024 * 1024
mem = psutil.virtual_memory()
print_str += " 內存狀態如下:
"
print_str = print_str + " 系統的內存容量為: "+str( mem.total/( memory_convent ) ) + " MB
"
print_str = print_str + " 系統的內存以使用容量為: "+str( mem.used/( memory_convent ) ) + " MB
"
print_str = print_str + " 系統可用的內存容量為: "+str( mem.total/( memory_convent ) - mem.used/( 1024*1024 )) + "MB
"
print_str = print_str + " 內存的buffer容量為: "+str( mem.buffers/( memory_convent ) ) + " MB
"
print_str = print_str + " 內存的cache容量為:" +str( mem.cached/( memory_convent ) ) + " MB
"
if ( print_type == 1 ) or isset( sys.argv,"cpu" ) :
print_str += " CPU狀態如下:
"
cpu_status = psutil.cpu_times()
print_str = print_str + " user = " + str( cpu_status.user ) + "
"
print_str = print_str + " nice = " + str( cpu_status.nice ) + "
"
print_str = print_str + " system = " + str( cpu_status.system ) + "
"
print_str = print_str + " idle = " + str ( cpu_status.idle ) + "
"
print_str = print_str + " iowait = " + str ( cpu_status.iowait ) + "
"
print_str = print_str + " irq = " + str( cpu_status.irq ) + "
"
print_str = print_str + " softirq = " + str ( cpu_status.softirq ) + "
"
print_str = print_str + " steal = " + str ( cpu_status.steal ) + "
"
print_str = print_str + " guest = " + str ( cpu_status.guest ) + "
"
if ( print_type == 1 ) or isset ( sys.argv,"disk" ) :
print_str += " 硬碟信息如下:
"
disk_status = psutil.disk_partitions()
for item in disk_status :
print_str = print_str + " "+ str( item ) + "
"
if ( print_type == 1 ) or isset ( sys.argv,"user" ) :
print_str += " 登錄用戶信息如下:
"
user_status = psutil.users()
for item in user_status :
print_str = print_str + " "+ str( item ) + "
"
print_str += "---------------------------------------------------------------
"
print ( print_str )
handle.write( print_str )
handle.close()
import psutil
mem = psutil.virtual_memory()
print mem.total,mem.used,mem
print psutil.swap_memory() # 輸出獲取SWAP分區信息
cpu = psutil.cpu_stats()
printcpu.interrupts,cpu.ctx_switches
psutil.cpu_times(percpu=True) # 輸出每個核心的詳細CPU信息
psutil.cpu_times().user # 獲取CPU的單項數據 [用戶態CPU的數據]
psutil.cpu_count() # 獲取CPU邏輯核心數,默認logical=True
psutil.cpu_count(logical=False) # 獲取CPU物理核心數
psutil.disk_partitions() # 列出全部的分區信息
psutil.disk_usage('/') # 顯示出指定的掛載點情況【位元組為單位】
psutil.disk_io_counters() # 磁碟總的IO個數
psutil.disk_io_counters(perdisk=True) # 獲取單個分區IO個數
psutil.net_io_counter() 獲取網路總的IO,默認參數pernic=False
psutil.net_io_counter(pernic=Ture)獲取網路各個網卡的IO
psutil.pids() # 列出所有進程的pid號
p = psutil.Process(2047)
p.name() 列出進程名稱
p.exe() 列出進程bin路徑
p.cwd() 列出進程工作目錄的絕對路徑
p.status()進程當前狀態[sleep等狀態]
p.create_time() 進程創建的時間 [時間戳格式]
p.uids()
p.gids()
p.cputimes() 【進程的CPU時間,包括用戶態、內核態】
p.cpu_affinity() # 顯示CPU親緣關系
p.memory_percent() 進程內存利用率
p.meminfo() 進程的RSS、VMS信息
p.io_counters() 進程IO信息,包括讀寫IO數及位元組數
p.connections() 返回打開進程socket的nametples列表
p.num_threads() 進程打開的線程數
import psutil
from subprocess import PIPE
p =psutil.Popen(["/usr/bin/python" ,"-c","print 'helloworld'"],stdout=PIPE)
p.name()
p.username()
p.communicate()
p.cpu_times()
psutil.users() # 顯示當前登錄的用戶,和Linux的who命令差不多
psutil.boot_time() 結果是個UNIX時間戳,下面我們來轉換它為標准時間格式,如下:
datetime.datetime.fromtimestamp(psutil.boot_time()) # 得出的結果不是str格式,繼續進行轉換 datetime.datetime.fromtimestamp(psutil.boot_time()).strftime('%Y-%m-%d%H:%M:%S')
Python生成一個隨機密碼
import random, string
def GenPassword(length):
if name == ' main ':
print (GenPassword(6))
❾ 我執行一段python腳本報錯了,怎麼解決
這個要看具體的錯誤談孫,一般來說python腳悄雹本錯誤,如果是命令行的,一般是捕啟侍帆獲ctrl-c事件吧。
import signal
import sys
def signal_handler(signal, frame):
print('You pressed Ctrl+C!')
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
print('Press Ctrl+C')
signal.pause()
❿ 如何在Python人為強制退出循環前,先觸發一個方法再退出
Python中實現當人為介入時停止循環的一個例子,當程序中出現 KeyboardInterrupt 異常(例如通過鍵盤的ctrl-C)時,程序會首先執行 stop_program() 方法再退出循環:
def stop_program():
# 這里可以添加程序停止後需要執行的操作
print("程序已停神頌止")
exit()
while True:
try:
# 這里寫需要循環執行的程序代碼
print("正在執行程序")
except KeyboardInterrupt:
stop_program()
在上述代碼中,我們首先定義了一個 stop_program() 方法,用於在程攔瞎嫌序停止前執行一些必要的操作。簡手在主程序的循環中,我們使用 try-except 語句來捕獲 KeyboardInterrupt 異常,如果用戶通過鍵盤輸入中斷信號(例如ctrl-C),則會觸發這個異常。在異常處理塊中,我們先執行 stop_program() 方法,然後退出循環,程序就會停止。
你可以將需要循環執行的代碼替換為實際需要執行的代碼,然後運行這個程序。當你想要停止程序時,可以通過鍵盤輸入ctrl-C或直接關閉程序窗口來觸發停止程序的操作。程序會先執行 stop_program() 方法再停止。當然,你也可以根據實際需求來修改 stop_program() 方法中的代碼,例如關閉文件、保存數據等。