⑴ 一篇文章带你深度解析python线程和进程
使用Python中的线程模块,能够同时运行程序的不同部分,并简化设计。如果你已经入门Python,并且想用线程来提升程序运行速度的话,希望这篇教程会对你有所帮助。
线程与进程
什么是进程
进程是系统进行资源分配和调度的一个独立单位 进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调度的一个独立单位。每个进程都有自己的独立内存空间,不同进程通过进程间通信来通信。由于进程比较重量,占据独立的内存,所以上下文进程间的切换开销(栈、寄存器、虚拟内存、文件句柄等)比较大,但相对比较稳定安全。
什么是线程
CPU调度和分派的基本单位 线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位.线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源。线程间通信主要通过共享内存,上下文切换很快,资源开销较少,但相比进程不够稳定容易丢失数据。
进程与线程的关系图
线程与进程的区别:
进程
现实生活中,有很多的场景中的事情是同时进行的,比如开车的时候 手和脚共同来驾驶 汽车 ,比如唱歌跳舞也是同时进行的,再比如边吃饭边打电话;试想如果我们吃饭的时候有一个领导来电,我们肯定是立刻就接听了。但是如果你吃完饭再接听或者回电话,很可能会被开除。
注意:
多任务的概念
什么叫 多任务 呢?简单地说,就是操作系统可以同时运行多个任务。打个比方,你一边在用浏览器上网,一边在听MP3,一边在用Word赶作业,这就是多任务,至少同时有3个任务正在运行。还有很多任务悄悄地在后台同时运行着,只是桌面上没有显示而已。
现在,多核CPU已经非常普及了,但是,即使过去的单核CPU,也可以执行多任务。由于CPU执行代码都是顺序执行的,那么,单核CPU是怎么执行多任务的呢?
答案就是操作系统轮流让各个任务交替执行,任务1执行0.01秒,切换到任务2,任务2执行0.01秒,再切换到任务3,执行0.01秒,这样反复执行下去。表面上看,每个任务都是交替执行的,但是,由于CPU的执行速度实在是太快了,我们感觉就像所有任务都在同时执行一样。
真正的并行执行多任务只能在多核CPU上实现,但是,由于任务数量远远多于CPU的核心数量,所以,操作系统也会自动把很多任务轮流调度到每个核心上执行。 其实就是CPU执行速度太快啦!以至于我们感受不到在轮流调度。
并行与并发
并行(Parallelism)
并行:指两个或两个以上事件(或线程)在同一时刻发生,是真正意义上的不同事件或线程在同一时刻,在不同CPU资源呢上(多核),同时执行。
特点
并发(Concurrency)
指一个物理CPU(也可以多个物理CPU) 在若干道程序(或线程)之间多路复用,并发性是对有限物理资源强制行使多用户共享以提高效率。
特点
multiprocess.Process模块
process模块是一个创建进程的模块,借助这个模块,就可以完成进程的创建。
语法:Process([group [, target [, name [, args [, kwargs]]]]])
由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)。
注意:1. 必须使用关键字方式来指定参数;2. args指定的为传给target函数的位置参数,是一个元祖形式,必须有逗号。
参数介绍:
group:参数未使用,默认值为None。
target:表示调用对象,即子进程要执行的任务。
args:表示调用的位置参数元祖。
kwargs:表示调用对象的字典。如kwargs = {'name':Jack, 'age':18}。
name:子进程名称。
代码:
除了上面这些开启进程的方法之外,还有一种以继承Process的方式开启进程的方式:
通过上面的研究,我们千方百计实现了程序的异步,让多个任务可以同时在几个进程中并发处理,他们之间的运行没有顺序,一旦开启也不受我们控制。尽管并发编程让我们能更加充分的利用IO资源,但是也给我们带来了新的问题。
当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题,我们可以考虑加锁,我们以模拟抢票为例,来看看数据安全的重要性。
加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改。加锁牺牲了速度,但是却保证了数据的安全。
因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。
mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。队列和管道都是将数据存放于内存中 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来, 我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性( 后续扩展该内容 )。
线程
Python的threading模块
Python 供了几个用于多线程编程的模块,包括 thread, threading 和 Queue 等。thread 和 threading 模块允许程序员创建和管理线程。thread 模块 供了基本的线程和锁的支持,而 threading 供了更高级别,功能更强的线程管理的功能。Queue 模块允许用户创建一个可以用于多个线程之间 共享数据的队列数据结构。
python创建和执行线程
创建线程代码
1. 创建方法一:
2. 创建方法二:
进程和线程都是实现多任务的一种方式,例如:在同一台计算机上能同时运行多个QQ(进程),一个QQ可以打开多个聊天窗口(线程)。资源共享:进程不能共享资源,而线程共享所在进程的地址空间和其他资源,同时,线程有自己的栈和栈指针。所以在一个进程内的所有线程共享全局变量,但多线程对全局变量的更改会导致变量值得混乱。
代码演示:
得到的结果是:
首先需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行(其中的JPython就没有GIL)。
那么CPython实现中的GIL又是什么呢?GIL全称Global Interpreter Lock为了避免误导,我们还是来看一下官方给出的解释:
主要意思为:
因此,解释器实际上被一个全局解释器锁保护着,它确保任何时候都只有一个Python线程执行。在多线程环境中,Python 虚拟机按以下方式执行:
由于GIL的存在,Python的多线程不能称之为严格的多线程。因为 多线程下每个线程在执行的过程中都需要先获取GIL,保证同一时刻只有一个线程在运行。
由于GIL的存在,即使是多线程,事实上同一时刻只能保证一个线程在运行, 既然这样多线程的运行效率不就和单线程一样了吗,那为什么还要使用多线程呢?
由于以前的电脑基本都是单核CPU,多线程和单线程几乎看不出差别,可是由于计算机的迅速发展,现在的电脑几乎都是多核CPU了,最少也是两个核心数的,这时差别就出来了:通过之前的案例我们已经知道,即使在多核CPU中,多线程同一时刻也只有一个线程在运行,这样不仅不能利用多核CPU的优势,反而由于每个线程在多个CPU上是交替执行的,导致在不同CPU上切换时造成资源的浪费,反而会更慢。即原因是一个进程只存在一把gil锁,当在执行多个线程时,内部会争抢gil锁,这会造成当某一个线程没有抢到锁的时候会让cpu等待,进而不能合理利用多核cpu资源。
但是在使用多线程抓取网页内容时,遇到IO阻塞时,正在执行的线程会暂时释放GIL锁,这时其它线程会利用这个空隙时间,执行自己的代码,因此多线程抓取比单线程抓取性能要好,所以我们还是要使用多线程的。
GIL对多线程Python程序的影响
程序的性能受到计算密集型(CPU)的程序限制和I/O密集型的程序限制影响,那什么是计算密集型和I/O密集型程序呢?
计算密集型:要进行大量的数值计算,例如进行上亿的数字计算、计算圆周率、对视频进行高清解码等等。这种计算密集型任务虽然也可以用多任务完成,但是花费的主要时间在任务切换的时间,此时CPU执行任务的效率比较低。
IO密集型:涉及到网络请求(time.sleep())、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。
当然为了避免GIL对我们程序产生影响,我们也可以使用,线程锁。
Lock&RLock
常用的资源共享锁机制:有Lock、RLock、Semphore、Condition等,简单给大家分享下Lock和RLock。
Lock
特点就是执行速度慢,但是保证了数据的安全性
RLock
使用锁代码操作不当就会产生死锁的情况。
什么是死锁
死锁:当线程A持有独占锁a,并尝试去获取独占锁b的同时,线程B持有独占锁b,并尝试获取独占锁a的情况下,就会发生AB两个线程由于互相持有对方需要的锁,而发生的阻塞现象,我们称为死锁。即死锁是指多个进程因竞争资源而造成的一种僵局,若无外力作用,这些进程都将无法向前推进。
所以,在系统设计、进程调度等方面注意如何不让这四个必要条件成立,如何确定资源的合理分配算法,避免进程永久占据系统资源。
死锁代码
python线程间通信
如果各个线程之间各干各的,确实不需要通信,这样的代码也十分的简单。但这一般是不可能的,至少线程要和主线程进行通信,不然计算结果等内容无法取回。而实际情况中要复杂的多,多个线程间需要交换数据,才能得到正确的执行结果。
python中Queue是消息队列,提供线程间通信机制,python3中重名为为queue,queue模块块下提供了几个阻塞队列,这些队列主要用于实现线程通信。
在 queue 模块下主要提供了三个类,分别代表三种队列,它们的主要区别就在于进队列、出队列的不同。
简单代码演示
此时代码会阻塞,因为queue中内容已满,此时可以在第四个queue.put('苹果')后面添加timeout,则成为 queue.put('苹果',timeout=1)如果等待1秒钟仍然是满的就会抛出异常,可以捕获异常。
同理如果队列是空的,无法获取到内容默认也会阻塞,如果不阻塞可以使用queue.get_nowait()。
在掌握了 Queue 阻塞队列的特性之后,在下面程序中就可以利用 Queue 来实现线程通信了。
下面演示一个生产者和一个消费者,当然都可以多个
使用queue模块,可在线程间进行通信,并保证了线程安全。
协程
协程,又称微线程,纤程。英文名Coroutine。
协程是python个中另外一种实现多任务的方式,只不过比线程更小占用更小执行单元(理解为需要的资源)。为啥说它是一个执行单元,因为它自带CPU上下文。这样只要在合适的时机, 我们可以把一个协程 切换到另一个协程。只要这个过程中保存或恢复 CPU上下文那么程序还是可以运行的。
通俗的理解:在一个线程中的某个函数,可以在任何地方保存当前函数的一些临时变量等信息,然后切换到另外一个函数中执行,注意不是通过调用函数的方式做到的,并且切换的次数以及什么时候再切换到原来的函数都由开发者自己确定。
在实现多任务时,线程切换从系统层面远不止保存和恢复 CPU上下文这么简单。操作系统为了程序运行的高效性每个线程都有自己缓存Cache等等数据,操作系统还会帮你做这些数据的恢复操作。所以线程的切换非常耗性能。但是协程的切换只是单纯的操作CPU的上下文,所以一秒钟切换个上百万次系统都抗的住。
greenlet与gevent
为了更好使用协程来完成多任务,除了使用原生的yield完成模拟协程的工作,其实python还有的greenlet模块和gevent模块,使实现协程变的更加简单高效。
greenlet虽说实现了协程,但需要我们手工切换,太麻烦了,gevent是比greenlet更强大的并且能够自动切换任务的模块。
其原理是当一个greenlet遇到IO(指的是input output 输入输出,比如网络、文件操作等)操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。
模拟耗时操作:
如果有耗时操作也可以换成,gevent中自己实现的模块,这时候就需要打补丁了。
使用协程完成一个简单的二手房信息的爬虫代码吧!
以下文章来源于Python专栏 ,作者宋宋
文章链接:https://mp.weixin.qq.com/s/2r3_ipU3HjdA5VnqSHjUnQ
⑵ 主存空间的分配和回收,
#include "iostream.h"
#include "iomanip.h"
#define nofreearea 2
#define noadequacyarea 3
#define allocated 4
#define noprocess 2
#define nosuchprocess 3
#define reclaimed 4
typedef struct TUN
{
int address;
int size;
char name;
struct TUN *next;
} usedarea , *usedtable;
typedef struct TFN
{
int address;
int size;
struct TFN *next;
} freearea, *freetable;
usedtable usedTable = NULL;
freetable freeTable = NULL;
int alloc( char processname , int processsize )
{
if( freeTable == NULL )
return 1;
freetable p = freeTable;
freetable q = p;
while( p != NULL && p->size < processsize )
{
q = p;
p = p->next;
}
if( p == NULL )
return 3;
usedtable x = new usedarea;
x->address = p->address;
x->size = processsize;
x->name = processname;
x->next = NULL;
if( p->size > processsize )
{
p->size -= processsize;
p->address += processsize;
}
else
{
if( p == freeTable )
freeTable = NULL;
else
q->next = p->next;
delete p;
}
usedtable r = usedTable;
usedtable t = r;
while( r != NULL && r->address < x->address )
{
t = r;
r = r->next;
}
if( usedTable == NULL )
usedTable = x;
else
{
x->next = r;
t->next = x;
}
return 4;
}
int Reclaim( char processname )
{
if( usedTable == NULL )
return 1;
usedtable p = usedTable;
usedtable q = p;
while( p != NULL && p->name != processname )
{
q = p;
p = p->next;
}
if( p == NULL )
return 3;
freetable r = freeTable;
freetable t = r;
freetable x;
while( r != NULL && r->address < p->address )
{
t = r;
r = r->next;
}
x = new freearea;
x->address = p->address;
x->size = p->size;
x->next = NULL;
if( r == freeTable )
{
x->next = r;
freeTable = x;
t = freeTable;
}
else
{
x->next = r;
t->next = x;
}
while( t->next != NULL && t->address + t->size == t->next->address )
{
t->size += t->next->size;
r = t->next;
t->next = t->next->next;
delete r;
}
if( p == usedTable )
{
usedTable = usedTable->next;
}
else
q->next = p->next;
delete p;
return 4;
}
int Init()
{
freeTable = new freearea;
freeTable->address = 0;
freeTable->size = 128;
freeTable->next = NULL;
return 1;
}
void processrequest()
{
char processname;
int processsize;
cout<<"...................."<<endl;
cout<<"作业名: ";
cin >> processname;
cout<<"作业长度: ";
cin >> processsize;
if(processsize<=128)
{int i;
if( alloc( processname , processsize) == 4 )
{
i=i+processsize;
if(i>128)
{cout<<"该作业超出空间"<<endl;
}
if(i<=128)
cout<<"该作业已成功获得所需空间"<<endl;
i=i+processsize;
cout<<"........................................"<<endl;
}
else
cout<<"该作业超出空间,没有获得所需空间"<<endl;
cout<<"........................................"<<endl;
return;
}
if(processsize>128)
{cout<<"该作业超出空间"<<endl;
cout<<"........................................"<<endl;
}
}
void processreclaim()
{
int processname;
cout<<"...................."<<endl;
cout<<"作业名: ";
cin >>processname;
int result = Reclaim( processname );
if( result == 4 )
cout<<"该作业已成功回收"<<endl;
else if( result == 2 || result == 1 )
cout<<"系统没有作业或该作业不存在"<<endl;
cout<<"...................."<<endl;
}
void freeTablePrint()
{
cout<<endl<<endl<<endl<<"***********************************"<<endl;
cout<<setw(10)<<"address"<<setw(10)<<"length"<<setw(10)<<"state"<<endl<<endl;
freetable p = freeTable;
usedtable q = usedTable;
int x , y;
while( p || q )
{
if( p )
x = p->address;
else
x = 0x7fffffff;
if( q )
y = q->address;
else
y = 0x7fffffff;
if( x < y )
{
cout<<setw(10)<<p->address<<setw(10)<<p->size<<setw(10)<<"空闲"<<endl;
p = p->next;
}
if( x > y )
{
cout<<setw(10)<<q->address<<setw(10)<<q->size<<setw(10)<<"已分配"<<setw(10)<<"ID="<<q->name<<endl;
q = q->next;
}
}
cout<<endl<<endl<<endl<<"************************************"<<endl<<endl<<endl;
}
void main()
{
Init();
int choose;
bool exitFlag = false;
while( !exitFlag )
{
cout<<"************************0 - 退出 ************************"<<endl;
cout<<"************************1 - 分配主存 ************************"<<endl;
cout<<"************************2 - 回收主存 ************************"<<endl;
cout<<"************************3 - 显示主存 ************************"<<endl<<endl<<endl;
cout<<"************************选择所要执行的操作:";
cin>>choose;
switch( choose )
{
case 0:
exitFlag = true;
break;
case 1:
processrequest();
break;
case 2:
processreclaim();
break;
case 3:
freeTablePrint();
break;
}
}
}
⑶ hadoop的maprece常见算法案例有几种
基本MapRece模式
计数与求和
问题陈述:
有许多文档,每个文档都有一些字段组成。需要计算出每个字段在所有文档中的出现次数或者这些字段的其他什么统计值。例如,给定一个log文件,其中的每条记录都包含一个响应时间,需要计算出平均响应时间。
解决方案:
让我们先从简单的例子入手。在下面的代码片段里,Mapper每遇到指定词就把频次记1,Recer一个个遍历这些词的集合然后把他们的频次加和。
1 class Mapper
2 method Map(docid id, doc d)
3 for all term t in doc d do
4 Emit(term t, count 1)
5
6 class Recer
7 method Rece(term t, counts [c1, c2,...])
8 sum = 0
9 for all count c in [c1, c2,...] do
10 sum = sum + c
11 Emit(term t, count sum)
这种方法的缺点显而易见,Mapper提交了太多无意义的计数。它完全可以通过先对每个文档中的词进行计数从而减少传递给Recer的数据量:
1 class Mapper
2 method Map(docid id, doc d)
3 H = new AssociativeArray
4 for all term t in doc d do
5 H{t} = H{t} + 1
6 for all term t in H do
7 Emit(term t, count H{t})
如果要累计计数的的不只是单个文档中的内容,还包括了一个Mapper节点处理的所有文档,那就要用到Combiner了:
1 class Mapper
2 method Map(docid id, doc d)
3 for all term t in doc d do
4 Emit(term t, count 1)
5
6 class Combiner
7 method Combine(term t, [c1, c2,...])
8 sum = 0
9 for all count c in [c1, c2,...] do
10 sum = sum + c
11 Emit(term t, count sum)
12
13 class Recer
14 method Rece(term t, counts [c1, c2,...])
15 sum = 0
16 for all count c in [c1, c2,...] do
17 sum = sum + c
18 Emit(term t, count sum)
应用:Log 分析, 数据查询
整理归类
问题陈述:
有一系列条目,每个条目都有几个属性,要把具有同一属性值的条目都保存在一个文件里,或者把条目按照属性值分组。 最典型的应用是倒排索引。
解决方案:
解决方案很简单。 在 Mapper 中以每个条目的所需属性值作为 key,其本身作为值传递给 Recer。 Recer 取得按照属性值分组的条目,然后可以处理或者保存。如果是在构建倒排索引,那么 每个条目相当于一个词而属性值就是词所在的文档ID。
应用:倒排索引, ETL
过滤 (文本查找),解析和校验
问题陈述:
假设有很多条记录,需要从其中找出满足某个条件的所有记录,或者将每条记录传换成另外一种形式(转换操作相对于各条记录独立,即对一条记录的操作与其他记录无关)。像文本解析、特定值抽取、格式转换等都属于后一种用例。
解决方案:
非常简单,在Mapper 里逐条进行操作,输出需要的值或转换后的形式。
应用:日志分析,数据查询,ETL,数据校验
分布式任务执行
问题陈述:
大型计算可以分解为多个部分分别进行然后合并各个计算的结果以获得最终结果。
解决方案: 将数据切分成多份作为每个 Mapper 的输入,每个Mapper处理一份数据,执行同样的运算,产生结果,Recer把多个Mapper的结果组合成一个。
案例研究: 数字通信系统模拟
像 WiMAX 这样的数字通信模拟软件通过系统模型来传输大量的随机数据,然后计算传输中的错误几率。 每个 Mapper 处理样本 1/N 的数据,计算出这部分数据的错误率,然后在 Recer 里计算平均错误率。
应用:工程模拟,数字分析,性能测试
排序
问题陈述:
有许多条记录,需要按照某种规则将所有记录排序或是按照顺序来处理记录。
解决方案: 简单排序很好办 – Mappers 将待排序的属性值为键,整条记录为值输出。 不过实际应用中的排序要更加巧妙一点, 这就是它之所以被称为MapRece 核心的原因(“核心”是说排序?因为证明Hadoop计算能力的实验是大数据排序?还是说Hadoop的处理过程中对key排序的环节?)。在实践中,常用组合键来实现二次排序和分组。
MapRece 最初只能够对键排序, 但是也有技术利用可以利用Hadoop 的特性来实现按值排序。想了解的话可以看这篇博客。
按照BigTable的概念,使用 MapRece来对最初数据而非中间数据排序,也即保持数据的有序状态更有好处,必须注意这一点。换句话说,在数据插入时排序一次要比在每次查询数据的时候排序更高效。
应用:ETL,数据分析
非基本 MapRece 模式
迭代消息传递 (图处理)
问题陈述:
假设一个实体网络,实体之间存在着关系。 需要按照与它比邻的其他实体的属性计算出一个状态。这个状态可以表现为它和其它节点之间的距离, 存在特定属性的邻接点的迹象, 邻域密度特征等等。
解决方案:
网络存储为系列节点的结合,每个节点包含有其所有邻接点ID的列表。按照这个概念,MapRece 迭代进行,每次迭代中每个节点都发消息给它的邻接点。邻接点根据接收到的信息更新自己的状态。当满足了某些条件的时候迭代停止,如达到了最大迭代次数(网络半径)或两次连续的迭代几乎没有状态改变。从技术上来看,Mapper 以每个邻接点的ID为键发出信息,所有的信息都会按照接受节点分组,recer 就能够重算各节点的状态然后更新那些状态改变了的节点。下面展示了这个算法:
1 class Mapper
2 method Map(id n, object N)
3 Emit(id n, object N)
4 for all id m in N.OutgoingRelations do
5 Emit(id m, message getMessage(N))
6
7 class Recer
8 method Rece(id m, [s1, s2,...])
9 M = null
10 messages = []
11 for all s in [s1, s2,...] do
12 if IsObject(s) then
13 M = s
14 else // s is a message
15 messages.add(s)
16 M.State = calculateState(messages)
17 Emit(id m, item M)
一个节点的状态可以迅速的沿着网络传全网,那些被感染了的节点又去感染它们的邻居,整个过程就像下面的图示一样:
案例研究: 沿分类树的有效性传递
问题陈述:
这个问题来自于真实的电子商务应用。将各种货物分类,这些类别可以组成一个树形结构,比较大的分类(像男人、女人、儿童)可以再分出小分类(像男裤或女装),直到不能再分为止(像男式蓝色牛仔裤)。这些不能再分的基层类别可以是有效(这个类别包含有货品)或者已无效的(没有属于这个分类的货品)。如果一个分类至少含有一个有效的子分类那么认为这个分类也是有效的。我们需要在已知一些基层分类有效的情况下找出分类树上所有有效的分类。
解决方案:
这个问题可以用上一节提到的框架来解决。我们咋下面定义了名为 getMessage和 calculateState 的方法:
1 class N
2 State in {True = 2, False = 1, null = 0},
3 initialized 1 or 2 for end-of-line categories, 0 otherwise
4 method getMessage(object N)
5 return N.State
6 method calculateState(state s, data [d1, d2,...])
7 return max( [d1, d2,...] )
案例研究:广度优先搜索
问题陈述:需要计算出一个图结构中某一个节点到其它所有节点的距离。
解决方案: Source源节点给所有邻接点发出值为0的信号,邻接点把收到的信号再转发给自己的邻接点,每转发一次就对信号值加1:
1 class N
2 State is distance,
3 initialized 0 for source node, INFINITY for all other nodes
4 method getMessage(N)
5 return N.State + 1
6 method calculateState(state s, data [d1, d2,...])
7 min( [d1, d2,...] )
案例研究:网页排名和 Mapper 端数据聚合
这个算法由Google提出,使用权威的PageRank算法,通过连接到一个网页的其他网页来计算网页的相关性。真实算法是相当复杂的,但是核心思想是权重可以传播,也即通过一个节点的各联接节点的权重的均值来计算节点自身的权重。
1 class N
2 State is PageRank
3 method getMessage(object N)
4 return N.State / N.OutgoingRelations.size()
5 method calculateState(state s, data [d1, d2,...])
6 return ( sum([d1, d2,...]) )
要指出的是上面用一个数值来作为评分实际上是一种简化,在实际情况下,我们需要在Mapper端来进行聚合计算得出这个值。下面的代码片段展示了这个改变后的逻辑 (针对于 PageRank 算法):
1 class Mapper
2 method Initialize
3 H = new AssociativeArray
4 method Map(id n, object N)
5 p = N.PageRank / N.OutgoingRelations.size()
6 Emit(id n, object N)
7 for all id m in N.OutgoingRelations do
8 H{m} = H{m} + p
9 method Close
10 for all id n in H do
11 Emit(id n, value H{n})
12
13 class Recer
14 method Rece(id m, [s1, s2,...])
15 M = null
16 p = 0
17 for all s in [s1, s2,...] do
18 if IsObject(s) then
19 M = s
20 else
21 p = p + s
22 M.PageRank = p
23 Emit(id m, item M)
应用:图分析,网页索引
值去重 (对唯一项计数)
问题陈述: 记录包含值域F和值域 G,要分别统计相同G值的记录中不同的F值的数目 (相当于按照 G分组).
这个问题可以推而广之应用于分面搜索(某些电子商务网站称之为Narrow Search)
Record 1: F=1, G={a, b}
Record 2: F=2, G={a, d, e}
Record 3: F=1, G={b}
Record 4: F=3, G={a, b}
Result:
a -> 3 // F=1, F=2, F=3
b -> 2 // F=1, F=3
d -> 1 // F=2
e -> 1 // F=2
解决方案 I:
第一种方法是分两个阶段来解决这个问题。第一阶段在Mapper中使用F和G组成一个复合值对,然后在Recer中输出每个值对,目的是为了保证F值的唯一性。在第二阶段,再将值对按照G值来分组计算每组中的条目数。
第一阶段:
1 class Mapper
2 method Map(null, record [value f, categories [g1, g2,...]])
3 for all category g in [g1, g2,...]
4 Emit(record [g, f], count 1)
5
6 class Recer
7 method Rece(record [g, f], counts [n1, n2, ...])
8 Emit(record [g, f], null )
第二阶段:
1 class Mapper
2 method Map(record [f, g], null)
3 Emit(value g, count 1)
4
5 class Recer
6 method Rece(value g, counts [n1, n2,...])
7 Emit(value g, sum( [n1, n2,...] ) )
解决方案 II:
第二种方法只需要一次MapRece 即可实现,但扩展性不强。算法很简单-Mapper 输出值和分类,在Recer里为每个值对应的分类去重然后给每个所属的分类计数加1,最后再在Recer结束后将所有计数加和。这种方法适用于只有有限个分类,而且拥有相同F值的记录不是很多的情况。例如网络日志处理和用户分类,用户的总数很多,但是每个用户的事件是有限的,以此分类得到的类别也是有限的。值得一提的是在这种模式下可以在数据传输到Recer之前使用Combiner来去除分类的重复值。
1 class Mapper
2 method Map(null, record [value f, categories [g1, g2,...] )
3 for all category g in [g1, g2,...]
4 Emit(value f, category g)
5
6 class Recer
7 method Initialize
8 H = new AssociativeArray : category -> count
9 method Rece(value f, categories [g1, g2,...])
10 [g1', g2',..] = ExcludeDuplicates( [g1, g2,..] )
11 for all category g in [g1', g2',...]
12 H{g} = H{g} + 1
13 method Close
14 for all category g in H do
15 Emit(category g, count H{g})
应用:日志分析,用户计数
互相关
问题陈述:有多个各由若干项构成的组,计算项两两共同出现于一个组中的次数。假如项数是N,那么应该计算N*N。
这种情况常见于文本分析(条目是单词而元组是句子),市场分析(购买了此物的客户还可能购买什么)。如果N*N小到可以容纳于一台机器的内存,实现起来就比较简单了。
配对法
第一种方法是在Mapper中给所有条目配对,然后在Recer中将同一条目对的计数加和。但这种做法也有缺点:
使用 combiners 带来的的好处有限,因为很可能所有项对都是唯一的
不能有效利用内存
1 class Mapper
2 method Map(null, items [i1, i2,...] )
3 for all item i in [i1, i2,...]
4 for all item j in [i1, i2,...]
5 Emit(pair [i j], count 1)
6
7 class Recer
8 method Rece(pair [i j], counts [c1, c2,...])
9 s = sum([c1, c2,...])
10 Emit(pair[i j], count s)
Stripes Approach(条方法?不知道这个名字怎么理解)
第二种方法是将数据按照pair中的第一项来分组,并维护一个关联数组,数组中存储的是所有关联项的计数。The second approach is to group data by the first item in pair and maintain an associative array (“stripe”) where counters for all adjacent items are accumulated. Recer receives all stripes for leading item i, merges them, and emits the same result as in the Pairs approach.
中间结果的键数量相对较少,因此减少了排序消耗。
可以有效利用 combiners。
可在内存中执行,不过如果没有正确执行的话也会带来问题。
实现起来比较复杂。
一般来说, “stripes” 比 “pairs” 更快
1 class Mapper
2 method Map(null, items [i1, i2,...] )
3 for all item i in [i1, i2,...]
4 H = new AssociativeArray : item -> counter
5 for all item j in [i1, i2,...]
6 H{j} = H{j} + 1
7 Emit(item i, stripe H)
8
9 class Recer
10 method Rece(item i, stripes [H1, H2,...])
11 H = new AssociativeArray : item -> counter
12 H = merge-sum( [H1, H2,...] )
13 for all item j in H.keys()
14 Emit(pair [i j], H{j})
应用:文本分析,市场分析
参考资料:Lin J. Dyer C. Hirst G. Data Intensive Processing MapRece
用MapRece 表达关系模式
在这部分我们会讨论一下怎么使用MapRece来进行主要的关系操作。
筛选(Selection)
1 class Mapper
2 method Map(rowkey key, tuple t)
3 if t satisfies the predicate
4 Emit(tuple t, null)
投影(Projection)
投影只比筛选稍微复杂一点,在这种情况下我们可以用Recer来消除可能的重复值。
1 class Mapper
2 method Map(rowkey key, tuple t)
3 tuple g = project(t) // extract required fields to tuple g
4 Emit(tuple g, null)
5
6 class Recer
⑷ 如何实现hadoop 的安全机制
为了增强Hadoop的安全机制, 从2009年起, Apache专门抽出一个团队,为Hadoop增加安全认证和授权机制,至今为止,已经可用。
Apache Hadoop 1.0.0版本和Cloudera CDH3之后的版本添加了安全机制,如果你将Hadoop升级到这两个版本,可能会导致Hadoop的一些应用不可用。
Hadoop提供了两种安全机制:Simple和Kerberos。Simple机制(默认情况,Hadoop采用该机制)采用了SAAS协议。 也就是说,用户提交作业时,你说你是XXX(在JobConf的user.name中说明),则在JobTracker端要进行核实,包括两部分核实,一是你到底是不是这个人,即通过检查执行当前代码的人与user.name中的用户是否一致;然后检查ACL(Access Control List)配置文件(由管理员配置),看你是否有提交作业的权限。一旦你通过验证,会获取HDFS或者maprece授予的delegation token(访问不同模块由不同的delegation token),之后的任何操作,比如访问文件,均要检查该token是否存在,且使用者跟之前注册使用该token的人是否一致。
⑸ 求用c++程序设计的实验:模拟分页式存储管理中硬件的地址转换和用先进先出调度算法(FIFO)处理缺页中断。
#include<iostream.h>
#include<stdlib.h>
#include<iomanip.h>
#include"windows.h"
#include"os.h"
#define n 64//实验中假定主存的长度
#define m 4//实验中假定每个作业分得主存块块数
int p[m];//定义页
struct
{
short int lnumber;//页号
short int flag;//表示该页是否在主存,“1”表示在主存,“0”表示不在主存
short int pnumber;//该页所在主存块的块号
short int write;//该页是否被修改过,“1”表示修改过,“0”表示没有修改过
short int dnumber;//该页存放在磁盘上的位置,即磁盘块号
short int times;//被访问的次数,用于LRU算法
}page[n];//定义页表
//各个函数的实现如下:
computer::computer()
{
int i;
for(i=0;i<n;i++)
{
page[i].lnumber = i;
page[i].flag = 0;
page[i].pnumber = 10000;//用10000表示为空
page[i].write = 0;
page[i].dnumber = i;
page[i].times = 0;
}//初始化页表
for(i=0;i<m;i++)
{
page[i].pnumber = i;
}
for(i=0;i<m;i++)
{
p[i] = i;
page[i].flag = 1;
}//初始化页
}
void computer::showpagelist()
{
int i;
cout<<"页号"<<"\t"<<"是否在主存中"<<"\t"<<"块 号"<<"\t"<<"是否被修改过"<<"\t"<<"磁盘块号"<<"\t"<<"访问次数"<<endl;
for(i=0;i<n;i++)
{
cout<<page[i].lnumber<<"\t"<<page[i].flag<<" "<<page[i].pnumber<<"\t"<<page[i].write<<" "<<page[i].dnumber<<" \t"<<page[i].times<<endl;
}
}
void computer::showpage()
{
int i;
for(i=0;i<m;i++)
{
cout<<"\t"<<p[i];
}
cout<<endl;
}
void computer::transformation()
{
unsigned logicAddress,logicNumber,innerAddress,physicsAddress,physicsNumber;
int i,head=0,fail = 0;
int method,temppage=0;
short int times = 10000;
cout<<"请输入一个逻辑地址(四位十六进制数):";
cin>>hex>>logicAddress;//读入逻辑地址
logicNumber = logicAddress >> 10;//得到页号
cout<<"页号为:"<<logicNumber<<endl;
innerAddress = logicAddress & 0x03ff;//得到页内地址
cout<<"页内地址为:"<<innerAddress<<endl;
for(i=0;i<n;i++)
{
if(logicNumber==(unsigned)page[i].lnumber)
{
if(page[i].flag == 1)
{
cout<<"请求的页面在主存中!"<<endl;
page[i].times++;
physicsNumber = page[i].pnumber;//由页号得到块号
cout<<"请求的主存块号为:"<<physicsNumber<<endl;
physicsAddress = physicsNumber << 10 |innerAddress;//得到物理地址
cout<<"请求的物理地址为:"<<physicsAddress<<endl;//输出物理地址
break;
}
else
{
cout<<"请求的页面不在主存中! 将进行缺页中断处理!"<<endl<<"请选择算法!"<<endl;
cout<<"1.先进先出"<<endl<<"2.最近最少用"<<endl<<"请选择置换算法:";
cin>>method;
if(method == 1) //采用先进先出算法
{
cout<<"采用先进先出算法!"<<endl;
fail = p[head];
cout<<"第"<<fail<<"页将被替换!"<<endl;
p[head] = logicNumber;
head = (head+1) % m;
if(page[fail].write == 1)
cout<<"第"<<fail<<"页曾被修改过!"<<endl;
page[fail].flag = 0;
page[logicNumber].flag = 1;
page[logicNumber].write = 0;
page[logicNumber].pnumber = page[fail].pnumber;
page[fail].pnumber = 10000;
page[logicNumber].times++;
break;
}
else if(method == 2) //采用最近最少用算法
{
cout<<"采用最近最少用算法!"<<endl;
for(i=0;i<n;i++)
{
if(page[i].flag == 1)
{
if(page[i].times<times)
{
times = page[i].times;
temppage = page[i].lnumber;
}
}
}
cout<<"第"<<temppage<<"页将被替换!"<<endl;
for(i=0;i<m;i++)
{
if(p[i] == temppage)
{
p[i] = logicNumber;
}
}
if(page[temppage].write == 1)
cout<<"第"<<temppage<<"页曾被修改过!"<<endl;
page[temppage].flag = 0;
page[logicNumber].flag = 1;
page[logicNumber].write = 0;
page[logicNumber].pnumber = page[temppage].pnumber;
page[temppage].pnumber = 10000;
page[logicNumber].times++;
break;
}
else
{ cout<<"你输入有误,即将退出!";
exit(1);
}
}
}
}
}
void main()
{
char c,d;
computer os;
cout<<"页表正在初始化中...,3秒钟后为你显示页和页表!"<<endl;
Sleep(3000);
os.showpage();
os.showpagelist();
T:
os.transformation();
cout<<"是否显示页和页表?(Y/N)";
cin>>c;
switch(c)
{
case 'y':
os.showpage();
os.showpagelist();
case 'n':
cout<<"是否继续进行请求分页?(Y/N)";
cin>>d;
if (d=='Y'||d=='y')
goto T;
else if (d=='N'||d=='n')
exit(1);
else
cout<<"输入错误!"<<endl;
default:cout<<"输入错误!"<<endl;
}
}
⑹ 虚拟机是什么,有什么好处
虚拟机(Virtual Machine)指通过软件模拟的具有完整硬件系统功能的、运行在一个完全隔离环境中的完整计算机系统。
虚拟系统通过生成现有操作系统的全新虚拟镜像,它具有真实windows系统完全一样的功能,进入虚拟系统后,所有操作都是在这个全新的独立的虚拟系统里面进行,可以独立安装运行软件,保存数据,拥有自己的独立桌面,不会对真正的系统产生任何影响 ,而且具有能够在现有系统与虚拟镜像之间灵活切换的一类操作系统。
(6)磁盘调度算法的模拟扩展阅读:
虚拟机技术最早由 IBM 于上世纪六七十年代提出,被定义为硬件设备的软件模拟实现,通常的使用模式是分时共享昂贵的大型机。 虚拟机监视器(Virtual Machine Monitor,VMM)是虚拟机技术的核心,它是一层位于操作系统和计算机硬件之间的代码,用来将硬件平台分割成多个虚拟机。
VMM 运行在特权模式,主要作用是隔离并且管理上层运行的多个虚拟机,仲裁它们对底层硬件的访问,并为每个客户操作系统虚拟一套独立于实际硬件的虚拟硬件环境(包括处理器,内存,I/O 设备)。VMM 采用某种调度算法在各个虚拟机之间共享 CPU,如采用时间片轮转调度算法