❶ python怎么实现rabbitmq的confirm模式
一、通过Python模拟收发消息
1、在各个节点上安装租乱epel源
# yum install epel* -y11
2、安装python库
# yum --enablerepo=epel -y install python2-pika11
3、弊棚档在rabbitmq-server节点上
1)、创建用户
# rabbitmqctl add_user wuyeliang password 11
2)、创建虚拟主机
# rabbitmqctl add_vhost /my_vhost11
3)、赋予权限
# rabbitmqctl set_permissions -p /my_vhost wuyeliang ".*" ".*" ".*" 11
4、在rabbitmq节点上模拟发消息,和纳代码如下
# vi send_msg.py
#!/usr/bin/env python
import pika
credentials = pika.PlainCredentials('wuyeliang', 'password') #注意用户名及密码
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost',
5672,
'/my_vhost',
credentials))
channel = connection.channel()
channel.queue_declare(queue='Hello_World')
channel.basic_publish(exchange='',
routing_key='Hello_World',
body='Hello RabbitMQ World!')
print(" [x] Sent 'Hello_World'")
connection.close()
4、在client节点上模拟收消息,代码如下
# vi receive_msg.py
#!/usr/bin/env python
import signal
import pika
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_DFL)
credentials = pika.PlainCredentials('wuyeliang', 'password')
connection = pika.BlockingConnection(pika.ConnectionParameters(
'dlp.srv.world',
5672,
'/my_vhost',
credentials))
channel = connection.channel()
channel.queue_declare(queue='Hello_World')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(callback,
queue='Hello_World',
no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
❷ ubuntu|linux下 如何用python 模拟按键
ubuntu下,也就是linux下,通常会用kill -事件编号实现。 你查一下LINUX下的事件就明白了。
kill 进程号 实现上是发了一个信号给指定的进程。 在python里,也可以加载事件处理模块,处理来自其它程序发过来的信号, 当然你可以用KILL工具发信号过来。
ctrl+d也是一个信号,ctrl+c也是一个。具体信号编码我不记得了。不过以前我做多进程管理时就是使用这个方法。 好象信号还可以带参数过来。
你打开python的帮助。看看signal这个模块。我把它的例子拿过来。对你有用不
importsignal,os
defhandler(signum,frame):
print'Signalhandlercalledwithsignal',signum
raiseIOError("Couldn'topendevice!")
#Setthesignalhandleranda5-secondalarm
signal.signal(signal.SIGALRM,handler)
signal.alarm(5)
#Thisopen()mayhangindefinitely
fd=os.open('/dev/ttyS0',os.O_RDWR)
signal.alarm(0)#Disablethealarm
下面是我找到的一些资料。也许有用。
信号的概念
信号(signal)--进程之间通讯的方式,是一种软件中断。一个进程一旦接收到信号就会打断原来的程序执行流程来处理信号。
几个常用信号:
SIGINT终止进程中断进程(control+c)
SIGTERM终止进程软件终止信号
SIGKILL终止进程杀死进程
SIGALRM闹钟信号
进程结束信号SIGTERM和SIGKILL的区别
SIGTERM比较友好,进程能捕捉这个信号,根据您的需要来关闭程序。在关闭程序之前,您可以结束打开的记录文件和完成正在做的任务。在某些情况下,假如进程正在进行作业而且不能中断,那么进程可以忽略这个SIGTERM信号。
对于SIGKILL信号,进程是不能忽略的。这是一个“我不管您在做什么,立刻停止”的信号。假如您发送SIGKILL信号给进程,Linux就将进程停止在那里。
发送信号一般有两种原因:
1(被动式)内核检测到一个系统事件.例如子进程退出会像父进程发送SIGCHLD信号.键盘按下control+c会发送SIGINT信号
2(主动式)通过系统调用kill来向指定进程发送信号
linux操作系统提供的信号
[100003@oss235 myppt]$ kill -l
1) SIGHUP2) SIGINT3) SIGQUIT4) SIGILL
5) SIGTRAP6) SIGABRT7) SIGBUS8) SIGFPE
9) SIGKILL10) SIGUSR111) SIGSEGV12) SIGUSR2
13) SIGPIPE14) SIGALRM15) SIGTERM16) SIGSTKFLT
17) SIGCHLD18) SIGCONT19) SIGSTOP20) SIGTSTP
21) SIGTTIN22) SIGTTOU23) SIGURG24) SIGXCPU
25) SIGXFSZ26) SIGVTALRM27) SIGPROF28) SIGWINCH
29) SIGIO30) SIGPWR31) SIGSYS34) SIGRTMIN
35) SIGRTMIN+136) SIGRTMIN+237) SIGRTMIN+338) SIGRTMIN+4
39) SIGRTMIN+540) SIGRTMIN+641) SIGRTMIN+742) SIGRTMIN+8
43) SIGRTMIN+944) SIGRTMIN+10 45) SIGRTMIN+11 46) SIGRTMIN+12
47) SIGRTMIN+13 48) SIGRTMIN+14 49) SIGRTMIN+15 50) SIGRTMAX-14
51) SIGRTMAX-13 52) SIGRTMAX-12 53) SIGRTMAX-11 54) SIGRTMAX-10
55) SIGRTMAX-956) SIGRTMAX-857) SIGRTMAX-758) SIGRTMAX-6
59) SIGRTMAX-560) SIGRTMAX-461) SIGRTMAX-362) SIGRTMAX-2
63) SIGRTMAX-164) SIGRTMAX
Python提供的信号
Python 2.4.3 (#1, Jun 11 2009, 14:09:58)
[GCC 4.1.2 20080704 (Red Hat 4.1.2-44)] on linux2
Type "help", "right", "credits" or "license" for more information.
>>> import signal
>>> dir(signal)
['NSIG', 'SIGABRT', 'SIGALRM', 'SIGBUS', 'SIGCHLD', 'SIGCLD',
'SIGCONT', 'SIGFPE', 'SIGHUP', 'SIGILL', 'SIGINT', 'SIGIO', 'SIGIOT',
'SIGKILL', 'SIGPIPE', 'SIGPOLL', 'SIGPROF', 'SIGPWR', 'SIGQUIT',
'SIGRTMAX', 'SIGRTMIN', 'SIGSEGV', 'SIGSTOP', 'SIGSYS', 'SIGTERM',
'SIGTRAP', 'SIGTSTP', 'SIGTTIN', 'SIGTTOU', 'SIGURG', 'SIGUSR1',
'SIGUSR2', 'SIGVTALRM', 'SIGWINCH', 'SIGXCPU', 'SIGXFSZ', 'SIG_DFL',
'SIG_IGN', '__doc__', '__name__', 'alarm', 'default_int_handler',
'getsignal', 'pause', 'signal']
操作系统规定了进程收到信号以后的默认行为
但是,我们可以通过绑定信号处理函数来修改进程收到信号以后的行为
有两个信号是不可更改的SIGTOP和SIGKILL
绑定信号处理函数
import os
import signal
from time import sleep
def onsignal_term(a,b):
print '收到SIGTERM信号'
#这里是绑定信号处理函数,将SIGTERM绑定在函数onsignal_term上面
signal.signal(signal.SIGTERM,onsignal_term)
def onsignal_usr1(a,b):
print '收到SIGUSR1信号'
#这里是绑定信号处理函数,将SIGUSR1绑定在函数onsignal_term上面
signal.signal(signal.SIGUSR1,onsignal_usr1)
while 1:
print '我的进程id是',os.getpid()
sleep(10)
运行该程序。然后通过另外一个进程来发送信号。
发送信号
发送信号的代码如下:
import os
import signal
#发送信号,16175是前面那个绑定信号处理函数的pid,需要自行修改
os.kill(16175,signal.SIGTERM)
#发送信号,16175是前面那个绑定信号处理函数的pid,需要自行修改
os.kill(16175,signal.SIGUSR1)
❸ python如何使用ctrl+c来退出程序
根据我处理这个问题的教训,python的多线程面对这个情况是非常郁闷的,所以我最后选择了用multiprocessing模块(多进程)替换了多线程。如果可以的话,我强烈建议你改用multiprocessing。
❹ 如何让python脚本在关闭的时候运行一段代码
如果是命令行的,一般是捕获ctrl-c事件吧。
import signal
import sys
def signal_handler(signal, frame):
print('You pressed Ctrl+C!')
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
print('Press Ctrl+C')
signal.pause()
如果是基于GUI框架开发的,都有事件触发的,你重载事件就好了。比如Qt就有closeEvent事件
❺ python 一个进程可以注册多个信号么
可以
信号的概念
信号(signal)-- 进程之间通讯的方式,是一种软件中断。一个进程一旦接收到信号就会打断原来的程序执行流程来处理信号。
几个常用信号:
SIGINT 终止进程 中断进程 (control+c)
SIGTERM 终止进程 软件终止信号
SIGKILL 终止进程 杀死进程
SIGALRM 闹钟信号
进程结束信号 SIGTERM和SIGKILL的区别
SIGTERM比较友好,进程能捕捉这个信号,根据您的需要来关闭程序。在关闭程序之前,您可以结束打开的记录文件和完成正在做的任务。在某些情况下,假如进程正在进行作业而且不能中断,那么进程可以忽略这个SIGTERM信号。
对于SIGKILL信号,进程是不能忽略的。这是一个 “我不管您在做什么,立刻停止”的信号。假如您发送SIGKILL信号给进程,Linux就将进程停止在那里。
发送信号一般有两种原因:
1(被动式) 内核检测到一个系统事件.例如子进程退出会像父进程发送SIGCHLD信号.键盘按下control+c会发送SIGINT信号
2(主动式) 通过系统调用kill来向指定进程发送信号
linux操作系统提供的信号
[100003@oss235 myppt]$ kill -l
1) SIGHUP 2) SIGINT 3) SIGQUIT 4) SIGILL
5) SIGTRAP 6) SIGABRT 7) SIGBUS 8) SIGFPE
9) SIGKILL 10) SIGUSR1 11) SIGSEGV 12) SIGUSR2
13) SIGPIPE 14) SIGALRM 15) SIGTERM 16) SIGSTKFLT
17) SIGCHLD 18) SIGCONT 19) SIGSTOP 20) SIGTSTP
21) SIGTTIN 22) SIGTTOU 23) SIGURG 24) SIGXCPU
25) SIGXFSZ 26) SIGVTALRM 27) SIGPROF 28) SIGWINCH
29) SIGIO 30) SIGPWR 31) SIGSYS 34) SIGRTMIN
35) SIGRTMIN+1 36) SIGRTMIN+2 37) SIGRTMIN+3 38) SIGRTMIN+4
39) SIGRTMIN+5 40) SIGRTMIN+6 41) SIGRTMIN+7 42) SIGRTMIN+8
43) SIGRTMIN+9 44) SIGRTMIN+10 45) SIGRTMIN+11 46) SIGRTMIN+12
47) SIGRTMIN+13 48) SIGRTMIN+14 49) SIGRTMIN+15 50) SIGRTMAX-14
51) SIGRTMAX-13 52) SIGRTMAX-12 53) SIGRTMAX-11 54) SIGRTMAX-10
55) SIGRTMAX-9 56) SIGRTMAX-8 57) SIGRTMAX-7 58) SIGRTMAX-6
59) SIGRTMAX-5 60) SIGRTMAX-4 61) SIGRTMAX-3 62) SIGRTMAX-2
63) SIGRTMAX-1 64) SIGRTMAX
Python提供的信号
Python 2.4.3 (#1, Jun 11 2009, 14:09:58)
[GCC 4.1.2 20080704 (Red Hat 4.1.2-44)] on linux2
Type "help", "right", "credits" or "license" for more information.
>>> import signal
>>> dir(signal)
['NSIG', 'SIGABRT', 'SIGALRM', 'SIGBUS', 'SIGCHLD', 'SIGCLD', 'SIGCONT', 'SIGFPE', 'SIGHUP', 'SIGILL', 'SIGINT', 'SIGIO', 'SIGIOT', 'SIGKILL', 'SIGPIPE', 'SIGPOLL', 'SIGPROF', 'SIGPWR', 'SIGQUIT', 'SIGRTMAX', 'SIGRTMIN', 'SIGSEGV', 'SIGSTOP', 'SIGSYS', 'SIGTERM', 'SIGTRAP', 'SIGTSTP', 'SIGTTIN', 'SIGTTOU', 'SIGURG', 'SIGUSR1', 'SIGUSR2', 'SIGVTALRM', 'SIGWINCH', 'SIGXCPU', 'SIGXFSZ', 'SIG_DFL', 'SIG_IGN', '__doc__', '__name__', 'alarm', 'default_int_handler', 'getsignal', 'pause', 'signal']
操作系统规定了进程收到信号以后的默认行为
但是,我们可以通过绑定信号处理函数来修改进程收到信号以后的行为
有两个信号是不可更改的SIGTOP和SIGKILL
绑定信号处理函数
import os
import signal
from time import sleep
def onsignal_term(a,b):
print '收到SIGTERM信号'
#这里是绑定信号处理函数,将SIGTERM绑定在函数onsignal_term上面
signal.signal(signal.SIGTERM,onsignal_term)
def onsignal_usr1(a,b):
print '收到SIGUSR1信号'
#这里是绑定信号处理函数,将SIGUSR1绑定在函数onsignal_term上面
signal.signal(signal.SIGUSR1,onsignal_usr1)
while 1:
print '我的进程id是',os.getpid()
sleep(10)
运行该程序。然后通过另外一个进程来发送信号。
发送信号
发送信号的代码如下:
import os
import signal
#发送信号,16175是前面那个绑定信号处理函数的pid,需要自行修改
os.kill(16175,signal.SIGTERM)
#发送信号,16175是前面那个绑定信号处理函数的pid,需要自行修改
os.kill(16175,signal.SIGUSR1)
SIGCHLD信号
然后显示一个子进程结束后自动向父进程发送SIGCHLD信号的例子。
'''''''
子进程结束会向父进程发送SIGCHLD信号
'''
import os
import signal
from time import sleep
def onsigchld(a,b):
print '收到子进程结束信号'
signal.signal(signal.SIGCHLD,onsigchld)
pid = os.fork()
if pid == 0:
print '我是子进程,pid是',os.getpid()
sleep(2)
else:
print '我是父进程,pid是',os.getpid()
os.wait() #等待子进程结束
使用信号需要特别注意的地方:
如果一个进程收到一个SIGUSR1信号,然后执行信号绑定函数,第二个SIGUSR2信号又来了,第一个信号没有被处理完毕的话,第二个信号就会丢弃。
所以,尽量不要在多线程中使用信号。
这个不妥,测试没发现有信号丢失
例子演示:
接收信号的程序,你会发现如果有另外一端使用多线程向这个进程发送信号,会遗漏一些信号。
import os
import signal
from time import sleep
import Queue
QCOUNT = Queue.Queue() #初始化队列
def onsigchld(a,b):
'''''''收到信号后向队列中插入一个数字1'''
print '收到SIGUSR1信号'
sleep(2)
QCOUNT.put(1) #向队列中写入
def exithanddle(s,e):
raise SystemExit('收到终止命令,退出程序')
signal.signal(signal.SIGUSR1,onsigchld) #绑定信号处理函数
signal.signal(signal.SIGINT,exithanddle) #当按下Ctrl + C 终止进程
while 1:
print '我的pid是',os.getpid()
print '现在队列中元素的个数是',QCOUNT.qsize()
sleep(2)
多线程发信号端的程序:
'''''''
使用多线程向另外一个进程发送信号
'''
import threading
import os
import signal
def sensr1():
print '发送信号'
#这里的进程id需要写前一个程序实际运行的pid
os.kill(17788, signal.SIGUSR1)
WORKER = []
#开启6个线程
for i in range(1, 7):
threadinstance = threading.Thread(target = sensr1)
WORKER.append(threadinstance)
for i in WORKER:
i.start()
for i in WORKER:
i.join()
print '主线程完成'
内容补充:
Alarms 是一个特殊信号类型,它可以让程序要求系统经过一段时间对自己发送通知。os 标准模块中指出,它可用于避免无限制阻塞 I/O 操作或其它系统调用。
像下面例子,原本程序睡眠 10 后才打印出print 'After :', time.ctime(),但是由于 signal.alarm(2),所以 2 秒后就执行了打印。
import signal
import time
def receive_alarm(signum, stack):
print 'Alarm :', time.ctime()
# Call receive_alarm in 2 seconds
signal.signal(signal.SIGALRM, receive_alarm)
signal.alarm(2)
print 'Before:', time.ctime()
time.sleep(10)
print 'After :', time.ctime()
注意Signal只有主线程才能接收信号,像下面例子,print 'Done waiting' 语句打印不出来,如果不调用 signal.alarm(2) ,程序将永远阻塞
import signal
import threading
import os
import time
def signal_handler(num, stack):
print 'Received signal %d in %s' % \
(num, threading.currentThread().name)
signal.signal(signal.SIGUSR1, signal_handler)
def wait_for_signal():
print 'Waiting for signal in', threading.currentThread().name
signal.pause()
print 'Done waiting'
# Start a thread that will not receive the signal
receiver = threading.Thread(target=wait_for_signal, name='receiver')
receiver.start()
time.sleep(0.1)
def send_signal():
print 'Sending signal in', threading.currentThread().name
os.kill(os.getpid(), signal.SIGUSR1)
sender = threading.Thread(target=send_signal, name='sender')
sender.start()
sender.join()
# Wait for the thread to see the signal (not going to happen!)
print 'Waiting for', receiver.name
signal.alarm(2)
receiver.join()
还有一点需要注意的是,虽然 alarms 类信号可以在任何线程中调用,但是只能在主线程中接收,像下面例子即使子线程 use_alarm 中调用 signal.alarm(1) ,但是不起作用 :
import signal
import time
import threading
def signal_handler(num, stack):
print time.ctime(), 'Alarm in', threading.currentThread().name
signal.signal(signal.SIGALRM, signal_handler)
def use_alarm():
t_name = threading.currentThread().name
print time.ctime(), 'Setting alarm in', t_name
signal.alarm(1)
print time.ctime(), 'Sleeping in', t_name
time.sleep(3)
print time.ctime(), 'Done with sleep in', t_name
# Start a thread that will not receive the signal
alarm_thread = threading.Thread(target=use_alarm,
name='alarm_thread')
alarm_thread.start()
time.sleep(0.1)
# Wait for the thread to see the signal (not going to happen!)
print time.ctime(), 'Waiting for', alarm_thread.name
alarm_thread.join()
print time.ctime(), 'Exiting normally'
❻ 有没有python的串口库
串口模块的波特率比较特别,找了几个串口工具都不支持。。。所以,干脆用python自己来写了,其实已经好奇好久了,别人的工具各种不顺手。
需要pyserial的支持,兼容各种平台,不需要新编译二进制文件。
先贴一个定时发送的代码:
6. exception:serial.SerialException
另一个完整收发的例子,单片机数据以TLV(Type,Length,Value)格式发上来
#!/usr/bin/env python
# it's a program of luo, [email protected]
import serialimport arrayimport osimport signalfrom time import sleep
flag_stop = Falsedef onsignal_int(a,b): print "sigint!"
global flag_stop
flag_stop = True
signal.signal(signal.SIGINT, onsignal_int)
signal.signal(signal.SIGTERM, onsignal_int)
ser = serial.Serial('/dev/ttyUSB0', 9600, timeout = 0.001)print "serial.isOpen() =",ser.isOpen()
cmd_send = ""cmd_send = cmd_send.decode("hex")
stop = "7b04047d0d0a"stop = stop.decode("hex")
cmd_back = ""cmd_length = 0x00cmd_count = 0x00s = ser.write(cmd_send)while True:
sleep(0.1)
if flag_stop: # read data until Ctrl+c
ser.write(stop) # send cmd stop before exit
print "reset cmd has been sent!"
sleep(0.05) break
text = ser.read(1) # read one, with timout
if text: # check if not timeout
n = ser.inWaiting() # look if there is more to read
if n:
text = text + ser.read(n) #get it
cmd_back = cmd_back + text
text = ""
if len(cmd_back) < 2: # go back if no enough data recvd
continue
if cmd_length == 0x00: # new loop
cmd_length = ord(cmd_back[1]) # Type(1 byte),Length of Value(1 byte),Value
print "new cmd length,",cmd_length
if (cmd_length + 0x02) > len(cmd_back): # do nothing until all bytes is recvd
continue
# so far, we have got a full cmd
hex_list = [hex(ord(i)) for i in cmd_back] # more readable than data.encode("hex")
print "In buffer:",hex_list
cmd_back = cmd_back[cmd_length+2:] # remove this cmd(TLV) from buffer
cmd_length = 0
cmd_count += 1
print "==> %d cmds recvd."%(cmd_count) print "-------------"
ser.close()
——————
❼ python 怎么让程序接受ctrl + c终止信号
花了一天时间用python为服务写了个压力测试。很简单,多线程向服务器发请求。但写完之后发现如果中途想停下来,按Ctrl+C达不到效果,自然想到要用信号处理函数捕捉信号,使线程都停下来,问题解决的方法请往下看:
复制代码代码如下:
#!/bin/env python
# -*- coding: utf-8 -*-
#filename: peartest.py
import threading, signal
is_exit = False
def doStress(i, cc):
global is_exit
idx = i
while not is_exit:
if (idx < 10000000):
print "thread[%d]: idx=%d"%(i, idx)
idx = idx + cc
else:
break
print "thread[%d] complete."%i
def handler(signum, frame):
global is_exit
is_exit = True
print "receive a signal %d, is_exit = %d"%(signum, is_exit)
if __name__ == "__main__":
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
cc = 5
for i in range(cc):
t = threading.Thread(target=doStress, args=(i,cc))
t.start()
上面是一个模拟程序,并不真正向服务发送请求,而代之以在一千万以内,每个线程每隔并发数个(cc个)打印一个整数。很明显,当所有线程都完成自己的任务后,进程会正常退出。但如果我们中途想退出(试想一个压力测试程序,在中途已经发现了问题,需要停止测试),该肿么办?你当然可以用ps查找到进程号,然后kill -9杀掉,但这样太繁琐了,捕捉Ctrl+C是最自然的想法。上面示例程序中已经捕捉了这个信号,并修改全局变量is_exit,线程中会检测这个变量,及时退出。
但事实上这个程序并不work,当你按下Ctrl+C时,程序照常运行,并无任何响应。网上搜了一些资料,明白是python的子线程如果不是daemon的话,主线程是不能响应任何中断的。但设为daemon后主线程会随之退出,接着整个进程很快就退出了,所以还需要在主线程中检测各个子线程的状态,直到所有子线程退出后自己才退出,因此上例29行之后的代码可以修改为:
复制代码代码如下:
threads=[]
for i in range(cc):
t = threading.Thread(target=doStress, args=(i, cc))
t.setDaemon(True)
threads.append(t)
t.start()
for i in range(cc):
threads[i].join()
重新试一下,问题依然没有解决,进程还是没有响应Ctrl+C,这是因为join()函数同样会waiting在一个锁上,使主线程无法捕获信号。因此继续修改,调用线程的isAlive()函数判断线程是否完成:
复制代码代码如下:
while 1:
alive = False
for i in range(cc):
alive = alive or threads[i].isAlive()
if not alive:
break
这样修改后,程序完全按照预想运行了:可以顺利的打印每个线程应该打印的所有数字,也可以中途用Ctrl+C终结整个进程。完整的代码如下:
复制代码代码如下:
#!/bin/env python
# -*- coding: utf-8 -*-
#filename: peartest.py
import threading, signal
is_exit = False
def doStress(i, cc):
global is_exit
idx = i
while not is_exit:
if (idx < 10000000):
print "thread[%d]: idx=%d"%(i, idx)
idx = idx + cc
else:
break
if is_exit:
print "receive a signal to exit, thread[%d] stop."%i
else:
print "thread[%d] complete."%i
def handler(signum, frame):
global is_exit
is_exit = True
print "receive a signal %d, is_exit = %d"%(signum, is_exit)
if __name__ == "__main__":
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
cc = 5
threads = []
for i in range(cc):
t = threading.Thread(target=doStress, args=(i,cc))
t.setDaemon(True)
threads.append(t)
t.start()
while 1:
alive = False
for i in range(cc):
alive = alive or threads[i].isAlive()
if not alive:
break
其实,如果用python写一个服务,也需要这样,因为负责服务的那个线程是永远在那里接收请求的,不会退出,而如果你想用Ctrl+C杀死整个服务,跟上面的压力测试程序是一个道理。总结一下,python多线程中要响应Ctrl+C的信号以杀死整个进程,需要:
1.把所有子线程设为Daemon;
2.使用isAlive()函数判断所有子线程是否完成,而不是在主线程中用join()函数等待完成;
3.写一个响应Ctrl+C信号的函数,修改全局变量,使得各子线程能够检测到,并正常退出。
❽ python 运维常用脚本
Python 批量遍历目录文件,并修改访问时间
import os
path = "D:/UASM64/include/"
dirs = os.listdir(path)
temp=[];
for file in dirs:
temp.append(os.path.join(path, file))
for x in temp:
os.utime(x, (1577808000, 1577808000))
Python 实现的自动化服务器管理
import sys
import os
import paramiko
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
def ssh_cmd(user,passwd,port,userfile,cmd):
def ssh_put(user,passwd,source,target):
while True:
try:
shell=str(input("[Shell] # "))
if (shell == ""):
continue
elif (shell == "exit"):
exit()
elif (shell == "put"):
ssh_put("root","123123","./a.py","/root/a.py")
elif (shell =="cron"):
temp=input("输入一个计划任务: ")
temp1="(crontab -l; echo "+ temp + ") |crontab"
ssh_cmd("root","123123","22","./user_ip.conf",temp1)
elif (shell == "uncron"):
temp=input("输入要删除的计划任务: ")
temp1="crontab -l | grep -v " "+ temp + "|crontab"
ssh_cmd("root","123123","22","./user_ip.conf",temp1)
else:
ssh_cmd("lyshark","123123","22","./user_ip.conf",shell)
遍历目录和文件
import os
def list_all_files(rootdir):
import os
_files = []
list = os.listdir(rootdir) #列出文件夹下所有的目录与文件
for i in range(0,len(list)):
path = os.path.join(rootdir,list[i])
if os.path.isdir(path):
_files.extend(list_all_files(path))
if os.path.isfile(path):
_files.append(path)
return _files
a=list_all_files("C:/Users/LyShark/Desktop/a")
print(a)
python检测指定端口状态
import socket
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sk.settimeout(1)
for ip in range(0,254):
try:
sk.connect(("192.168.1."+str(ip),443))
print("192.168.1.%d server open
"%ip)
except Exception:
print("192.168.1.%d server not open"%ip)
sk.close()
python实现批量执行CMD命令
import sys
import os
import paramiko
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
print("------------------------------>
")
print("使用说明,在当前目录创建ip.txt写入ip地址")
print("------------------------------>
")
user=input("输入用户名:")
passwd=input("输入密码:")
port=input("输入端口:")
cmd=input("输入执行的命令:")
file = open("./ip.txt", "r")
line = file.readlines()
for i in range(len(line)):
print("对IP: %s 执行"%line[i].strip('
'))
python3-实现钉钉报警
import requests
import sys
import json
dingding_url = ' https://oapi.dingtalk.com/robot/send?access_token='
data = {"msgtype": "markdown","markdown": {"title": "监控","text": "apche异常"}}
headers = {'Content-Type':'application/json;charset=UTF-8'}
send_data = json.mps(data).encode('utf-8')
requests.post(url=dingding_url,data=send_data,headers=headers)
import psutil
import requests
import time
import os
import json
monitor_name = set(['httpd','cobblerd']) # 用户指定监控的服务进程名称
proc_dict = {}
proc_name = set() # 系统检测的进程名称
monitor_map = {
'httpd': 'systemctl restart httpd',
'cobblerd': 'systemctl restart cobblerd' # 系统在进程down掉后,自动重启
}
dingding_url = ' https://oapi.dingtalk.com/robot/send?access_token='
while True:
for proc in psutil.process_iter(attrs=['pid','name']):
proc_dict[proc.info['pid']] = proc.info['name']
proc_name.add(proc.info['name'])
判断指定端口是否开放
import socket
port_number = [135,443,80]
for index in port_number:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex((飗.0.0.1', index))
if result == 0:
print("Port %d is open" % index)
else:
print("Port %d is not open" % index)
sock.close()
判断指定端口并且实现钉钉轮询报警
import requests
import sys
import json
import socket
import time
def dingding(title,text):
dingding_url = ' https://oapi.dingtalk.com/robot/send?access_token='
data = {"msgtype": "markdown","markdown": {"title": title,"text": text}}
headers = {'Content-Type':'application/json;charset=UTF-8'}
send_data = json.mps(data).encode('utf-8')
requests.post(url=dingding_url,data=send_data,headers=headers)
def net_scan():
port_number = [80,135,443]
for index in port_number:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex((飗.0.0.1', index))
if result == 0:
print("Port %d is open" % index)
else:
return index
sock.close()
while True:
dingding("Warning",net_scan())
time.sleep(60)
python-实现SSH批量CMD执行命令
import sys
import os
import paramiko
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
def ssh_cmd(user,passwd,port,userfile,cmd):
file = open(userfile, "r")
line = file.readlines()
for i in range(len(line)):
print("对IP: %s 执行"%line[i].strip('
'))
ssh.connect(hostname=line[i].strip('
'),port=port,username=user,password=passwd)
cmd=cmd
stdin, stdout, stderr = ssh.exec_command(cmd)
result = stdout.read()
ssh_cmd("lyshark","123","22","./ip.txt","free -h |grep 'Mem:' |awk '{print $3}'")
用python写一个列举当前目录以及所有子目录下的文件,并打印出绝对路径
import sys
import os
for root,dirs,files in os.walk("C://"):
for name in files:
print(os.path.join(root,name))
os.walk()
按照这样的日期格式(xxxx-xx-xx)每日生成一个文件,例如今天生成的文件为2013-09-23.log, 并且把磁盘的使用情况写到到这个文件中。
import os
import sys
import time
new_time = time.strftime("%Y-%m-%d")
disk_status = os.popen("df -h").readlines()
str1 = ''.join(disk_status)
f = open(new_time+'.log','w')
f.write("%s"%str1)
f.flush()
f.close()
统计出每个IP的访问量有多少?(从日志文件中查找)
import sys
list = []
f = open("/var/log/httpd/access_log","r")
str1 = f.readlines()
f.close()
for i in str1:
ip=i.split()[0]
list.append(ip)
list_num=set(list)
for j in list_num:
num=list.count(j)
print("%s -----> %s" %(num,j))
写个程序,接受用户输入数字,并进行校验,非数字给出错误提示,然后重新等待用户输入。
import tab
import sys
while True:
try:
num=int(input("输入数字:").strip())
for x in range(2,num+1):
for y in range(2,x):
if x % y == 0:
break
else:
print(x)
except ValueError:
print("您输入的不是数字")
except KeyboardInterrupt:
sys.exit("
")
ps 可以查看进程的内存占用大小,写一个脚本计算一下所有进程所占用内存大小的和。
import sys
import os
list=[]
sum=0
str1=os.popen("ps aux","r").readlines()
for i in str1:
str2=i.split()
new_rss=str2[5]
list.append(new_rss)
for i in list[1:-1]:
num=int(i)
sum=sum+num
print("%s ---> %s"%(list[0],sum))
关于Python 命令行参数argv
import sys
if len(sys.argv) < 2:
print ("没有输入任何参数")
sys.exit()
if sys.argv[1].startswith("-"):
option = sys.argv[1][1:]
利用random生成6位数字加字母随机验证码
import sys
import random
rand=[]
for x in range(6):
y=random.randrange(0,5)
if y == 2 or y == 4:
num=random.randrange(0,9)
rand.append(str(num))
else:
temp=random.randrange(65,91)
c=chr(temp)
rand.append(c)
result="".join(rand)
print(result)
自动化-使用pexpect非交互登陆系统
import pexpect
import sys
ssh = pexpect.spawn('ssh [email protected]')
fout = file('sshlog.txt', 'w')
ssh.logfile = fout
ssh.expect("[email protected]'s password:")
ssh.sendline("密码")
ssh.expect('#')
ssh.sendline('ls /home')
ssh.expect('#')
Python-取系统时间
import sys
import time
time_str = time.strftime("日期:%Y-%m-%d",time.localtime())
print(time_str)
time_str= time.strftime("时间:%H:%M",time.localtime())
print(time_str)
psutil-获取内存使用情况
import sys
import os
import psutil
memory_convent = 1024 * 1024
mem =psutil.virtual_memory()
print("内存容量为:"+str(mem.total/(memory_convent))+"MB
")
print("已使用内存:"+str(mem.used/(memory_convent))+"MB
")
print("可用内存:"+str(mem.total/(memory_convent)-mem.used/(1024*1024))+"MB
")
print("buffer容量:"+str(mem.buffers/( memory_convent ))+"MB
")
print("cache容量:"+str(mem.cached/(memory_convent))+"MB
")
Python-通过SNMP协议监控CPU
注意:被监控的机器上需要支持snmp协议 yum install -y net-snmp*
import os
def getAllitems(host, oid):
sn1 = os.popen('snmpwalk -v 2c -c public ' + host + ' ' + oid + '|grep Raw|grep Cpu|grep -v Kernel').read().split('
')[:-1]
return sn1
def getDate(host):
items = getAllitems(host, '.1.3.6.1.4.1.2021.11')
if name == ' main ':
Python-通过SNMP协议监控系统负载
注意:被监控的机器上需要支持snmp协议 yum install -y net-snmp*
import os
import sys
def getAllitems(host, oid):
sn1 = os.popen('snmpwalk -v 2c -c public ' + host + ' ' + oid).read().split('
')
return sn1
def getload(host,loid):
load_oids = Ƈ.3.6.1.4.1.2021.10.1.3.' + str(loid)
return getAllitems(host,load_oids)[0].split(':')[3]
if name == ' main ':
Python-通过SNMP协议监控内存
注意:被监控的机器上需要支持snmp协议 yum install -y net-snmp*
import os
def getAllitems(host, oid):
def getSwapTotal(host):
def getSwapUsed(host):
def getMemTotal(host):
def getMemUsed(host):
if name == ' main ':
Python-通过SNMP协议监控磁盘
注意:被监控的机器上需要支持snmp协议 yum install -y net-snmp*
import re
import os
def getAllitems(host,oid):
def getDate(source,newitem):
def getRealDate(item1,item2,listname):
def caculateDiskUsedRate(host):
if name == ' main ':
Python-通过SNMP协议监控网卡流量
注意:被监控的机器上需要支持snmp协议 yum install -y net-snmp*
import re
import os
def getAllitems(host,oid):
sn1 = os.popen('snmpwalk -v 2c -c public ' + host + ' ' + oid).read().split('
')[:-1]
return sn1
def getDevices(host):
device_mib = getAllitems(host,'RFC1213-MIB::ifDescr')
device_list = []
def getDate(host,oid):
date_mib = getAllitems(host,oid)[1:]
date = []
if name == ' main ':
Python-实现多级菜单
import os
import sys
ps="[None]->"
ip=["192.168.1.1","192.168.1.2","192.168.1.3"]
flage=1
while True:
ps="[None]->"
temp=input(ps)
if (temp=="test"):
print("test page !!!!")
elif(temp=="user"):
while (flage == 1):
ps="[User]->"
temp1=input(ps)
if(temp1 =="exit"):
flage=0
break
elif(temp1=="show"):
for i in range(len(ip)):
print(i)
Python实现一个没用的东西
import sys
ps="[root@localhost]# "
ip=["192.168.1.1","192.168.1.2","192.168.1.3"]
while True:
temp=input(ps)
temp1=temp.split()
检查各个进程读写的磁盘IO
import sys
import os
import time
import signal
import re
class DiskIO:
def init (self, pname=None, pid=None, reads=0, writes=0):
self.pname = pname
self.pid = pid
self.reads = 0
self.writes = 0
def main():
argc = len(sys.argv)
if argc != 1:
print ("usage: please run this script like [./lyshark.py]")
sys.exit(0)
if os.getuid() != 0:
print ("Error: This script must be run as root")
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
os.system('echo 1 > /proc/sys/vm/block_mp')
print ("TASK PID READ WRITE")
while True:
os.system('dmesg -c > /tmp/diskio.log')
l = []
f = open('/tmp/diskio.log', 'r')
line = f.readline()
while line:
m = re.match(
'^(S+)(d+)(d+): (READ|WRITE) block (d+) on (S+)', line)
if m != None:
if not l:
l.append(DiskIO(m.group(1), m.group(2)))
line = f.readline()
continue
found = False
for item in l:
if item.pid == m.group(2):
found = True
if m.group(3) == "READ":
item.reads = item.reads + 1
elif m.group(3) == "WRITE":
item.writes = item.writes + 1
if not found:
l.append(DiskIO(m.group(1), m.group(2)))
line = f.readline()
time.sleep(1)
for item in l:
print ("%-10s %10s %10d %10d" %
(item.pname, item.pid, item.reads, item.writes))
def signal_handler(signal, frame):
os.system('echo 0 > /proc/sys/vm/block_mp')
sys.exit(0)
if name ==" main ":
main()
利用Pexpect实现自动非交互登陆linux
import pexpect
import sys
ssh = pexpect.spawn('ssh [email protected]')
fout = file('sshlog.log', 'w')
ssh.logfile = fout
ssh.expect("[email protected]'s password:")
ssh.sendline("密码")
ssh.expect('#')
ssh.sendline('ls /home')
ssh.expect('#')
利用psutil模块获取系统的各种统计信息
import sys
import psutil
import time
import os
time_str = time.strftime( "%Y-%m-%d", time.localtime( ) )
file_name = "./" + time_str + ".log"
if os.path.exists ( file_name ) == False :
os.mknod( file_name )
handle = open ( file_name , "w" )
else :
handle = open ( file_name , "a" )
if len( sys.argv ) == 1 :
print_type = 1
else :
print_type = 2
def isset ( list_arr , name ) :
if name in list_arr :
return True
else :
return False
print_str = "";
if ( print_type == 1 ) or isset( sys.argv,"mem" ) :
memory_convent = 1024 * 1024
mem = psutil.virtual_memory()
print_str += " 内存状态如下:
"
print_str = print_str + " 系统的内存容量为: "+str( mem.total/( memory_convent ) ) + " MB
"
print_str = print_str + " 系统的内存以使用容量为: "+str( mem.used/( memory_convent ) ) + " MB
"
print_str = print_str + " 系统可用的内存容量为: "+str( mem.total/( memory_convent ) - mem.used/( 1024*1024 )) + "MB
"
print_str = print_str + " 内存的buffer容量为: "+str( mem.buffers/( memory_convent ) ) + " MB
"
print_str = print_str + " 内存的cache容量为:" +str( mem.cached/( memory_convent ) ) + " MB
"
if ( print_type == 1 ) or isset( sys.argv,"cpu" ) :
print_str += " CPU状态如下:
"
cpu_status = psutil.cpu_times()
print_str = print_str + " user = " + str( cpu_status.user ) + "
"
print_str = print_str + " nice = " + str( cpu_status.nice ) + "
"
print_str = print_str + " system = " + str( cpu_status.system ) + "
"
print_str = print_str + " idle = " + str ( cpu_status.idle ) + "
"
print_str = print_str + " iowait = " + str ( cpu_status.iowait ) + "
"
print_str = print_str + " irq = " + str( cpu_status.irq ) + "
"
print_str = print_str + " softirq = " + str ( cpu_status.softirq ) + "
"
print_str = print_str + " steal = " + str ( cpu_status.steal ) + "
"
print_str = print_str + " guest = " + str ( cpu_status.guest ) + "
"
if ( print_type == 1 ) or isset ( sys.argv,"disk" ) :
print_str += " 硬盘信息如下:
"
disk_status = psutil.disk_partitions()
for item in disk_status :
print_str = print_str + " "+ str( item ) + "
"
if ( print_type == 1 ) or isset ( sys.argv,"user" ) :
print_str += " 登录用户信息如下:
"
user_status = psutil.users()
for item in user_status :
print_str = print_str + " "+ str( item ) + "
"
print_str += "---------------------------------------------------------------
"
print ( print_str )
handle.write( print_str )
handle.close()
import psutil
mem = psutil.virtual_memory()
print mem.total,mem.used,mem
print psutil.swap_memory() # 输出获取SWAP分区信息
cpu = psutil.cpu_stats()
printcpu.interrupts,cpu.ctx_switches
psutil.cpu_times(percpu=True) # 输出每个核心的详细CPU信息
psutil.cpu_times().user # 获取CPU的单项数据 [用户态CPU的数据]
psutil.cpu_count() # 获取CPU逻辑核心数,默认logical=True
psutil.cpu_count(logical=False) # 获取CPU物理核心数
psutil.disk_partitions() # 列出全部的分区信息
psutil.disk_usage('/') # 显示出指定的挂载点情况【字节为单位】
psutil.disk_io_counters() # 磁盘总的IO个数
psutil.disk_io_counters(perdisk=True) # 获取单个分区IO个数
psutil.net_io_counter() 获取网络总的IO,默认参数pernic=False
psutil.net_io_counter(pernic=Ture)获取网络各个网卡的IO
psutil.pids() # 列出所有进程的pid号
p = psutil.Process(2047)
p.name() 列出进程名称
p.exe() 列出进程bin路径
p.cwd() 列出进程工作目录的绝对路径
p.status()进程当前状态[sleep等状态]
p.create_time() 进程创建的时间 [时间戳格式]
p.uids()
p.gids()
p.cputimes() 【进程的CPU时间,包括用户态、内核态】
p.cpu_affinity() # 显示CPU亲缘关系
p.memory_percent() 进程内存利用率
p.meminfo() 进程的RSS、VMS信息
p.io_counters() 进程IO信息,包括读写IO数及字节数
p.connections() 返回打开进程socket的nametples列表
p.num_threads() 进程打开的线程数
import psutil
from subprocess import PIPE
p =psutil.Popen(["/usr/bin/python" ,"-c","print 'helloworld'"],stdout=PIPE)
p.name()
p.username()
p.communicate()
p.cpu_times()
psutil.users() # 显示当前登录的用户,和Linux的who命令差不多
psutil.boot_time() 结果是个UNIX时间戳,下面我们来转换它为标准时间格式,如下:
datetime.datetime.fromtimestamp(psutil.boot_time()) # 得出的结果不是str格式,继续进行转换 datetime.datetime.fromtimestamp(psutil.boot_time()).strftime('%Y-%m-%d%H:%M:%S')
Python生成一个随机密码
import random, string
def GenPassword(length):
if name == ' main ':
print (GenPassword(6))
❾ 我执行一段python脚本报错了,怎么解决
这个要看具体的错误谈孙,一般来说python脚悄雹本错误,如果是命令行的,一般是捕启侍帆获ctrl-c事件吧。
import signal
import sys
def signal_handler(signal, frame):
print('You pressed Ctrl+C!')
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
print('Press Ctrl+C')
signal.pause()
❿ 如何在Python人为强制退出循环前,先触发一个方法再退出
Python中实现当人为介入时停止循环的一个例子,当程序中出现 KeyboardInterrupt 异常(例如通过键盘的ctrl-C)时,程序会首先执行 stop_program() 方法再退出循环:
def stop_program():
# 这里可以添加程序停止后需要执行的操作
print("程序已停神颂止")
exit()
while True:
try:
# 这里写需要循环执行的程序代码
print("正在执行程序")
except KeyboardInterrupt:
stop_program()
在上述代码中,我们首先定义了一个 stop_program() 方法,用于在程拦瞎嫌序停止前执行一些必要的操作。简手在主程序的循环中,我们使用 try-except 语句来捕获 KeyboardInterrupt 异常,如果用户通过键盘输入中断信号(例如ctrl-C),则会触发这个异常。在异常处理块中,我们先执行 stop_program() 方法,然后退出循环,程序就会停止。
你可以将需要循环执行的代码替换为实际需要执行的代码,然后运行这个程序。当你想要停止程序时,可以通过键盘输入ctrl-C或直接关闭程序窗口来触发停止程序的操作。程序会先执行 stop_program() 方法再停止。当然,你也可以根据实际需求来修改 stop_program() 方法中的代码,例如关闭文件、保存数据等。