1. php+mysql开发的网站 如何使用hadoop+hbase+hive,能代替mysql么
hive和hbase是两码事儿吧,区别在于前者基于HQL,后者是NoSQL。hive将HQL(类SQL)翻译成maprece任务,在hadoop的HDFS上运行。脚本语言(php、python等)可以通过thrift调用hive接口
2. mongodb 请问php中的这句mysql语法,在mongodb中如何写。
查询:
MySQL:
SELECT * FROM user
Mongo:
db.user.find()
MySQL:
SELECT * FROM user WHERE name = 'starlee'
Mongo:
db.user.find({‘name' : 'starlee'})
插入:
MySQL:
INSERT INOT user (`name`, `age`) values ('starlee',25)
Mongo:
db.user.insert({‘name' : 'starlee', ‘age' : 25})
如果你想在MySQL里添加一个字段,你必须:
ALTER TABLE user….
但在MongoDB里你只需要:
db.user.insert({‘name' : 'starlee', ‘age' : 25, ‘email' : '[email protected]'})
删除:
MySQL:
DELETE * FROM user
Mongo:
db.user.remove({})
MySQL:
DELETE FROM user WHERE age < 30
Mongo:
db.user.remove({‘age' : {$lt : 30}})
$gt : > ; $gte : >= ; $lt : < ; $lte : <= ; $ne : !=
更新:
MySQL:
UPDATE user SET `age` = 36 WHERE `name` = 'starlee'
Mongo:
db.user.update({‘name' : 'starlee'}, {$set : {‘age' : 36}})
MySQL:
UPDATE user SET `age` = `age` + 3 WHERE `name` = 'starlee'
Mongo:
db.user.update({‘name' : 'starlee'}, {$inc : {‘age' : 3}})
MySQL:
SELECT COUNT(*) FROM user WHERE `name` = 'starlee'
Mongo:
db.user.find({‘name' : 'starlee'}).count()
MySQL:
SELECT * FROM user limit 10,20
Mongo:
db.user.find().skip(10).limit(20)
MySQL:
SELECT * FROM user WHERE `age` IN (25, 35,45)
Mongo:
db.user.find({‘age' : {$in : [25, 35, 45]}})
MySQL:
SELECT * FROM user ORDER BY age DESC
Mongo:
db.user.find().sort({‘age' : -1})
MySQL:
SELECT DISTINCT(name) FROM user WHERE age > 20
Mongo:
db.user.distinct(‘name', {‘age': {$lt : 20}})
MySQL:
SELECT name, sum(marks) FROM user GROUP BY name
Mongo:
db.user.group({
key : {‘name' : true},
cond: {‘name' : ‘foo'},
rece: function(obj,prev) { prev.msum += obj.marks; },
initial: {msum : 0}
});
MySQL:
SELECT name FROM user WHERE age < 20
Mongo:
db.user.find(‘this.age < 20′, {name : 1})
发现很多人在搜MongoDB循环插入数据,下面把MongoDB循环插入数据的方法添加在下面:
for(var i=0;i<100;i++)db.test.insert({uid:i,uname:'nosqlfan'+i});
上面一次性插入一百条数据,大概结构如下:
{ “_id” : ObjectId(“4c876e519e86023a30dde6b8″), “uid” : 55, “uname” : “nosqlfan55″ }
{ “_id” : ObjectId(“4c876e519e86023a30dde6b9″), “uid” : 56, “uname” : “nosqlfan56″ }
{ “_id” : ObjectId(“4c876e519e86023a30dde6ba”), “uid” : 57, “uname” : “nosqlfan57″ }
{ “_id” : ObjectId(“4c876e519e86023a30dde6bb”), “uid” : 58, “uname” : “nosqlfan58″ }
{ “_id” : ObjectId(“4c876e519e86023a30dde6bc”), “uid” : 59, “uname” : “nosqlfan59″ }
{ “_id” : ObjectId(“4c876e519e86023a30dde6bd”), “uid” : 60, “uname” : “nosqlfan60″ }
简易对照表
SQL Statement Mongo Query Language Statement
CREATE TABLE USERS (a Number, b Number) implicit; can be done explicitly
INSERT INTO USERS VALUES(1,1) db.users.insert({a:1,b:1})
SELECT a,b FROM users db.users.find({}, {a:1,b:1})
SELECT * FROM users db.users.find()
SELECT * FROM users WHERE age=33 db.users.find({age:33})
SELECT a,b FROM users WHERE age=33 db.users.find({age:33}, {a:1,b:1})
SELECT * FROM users WHERE age=33 ORDER BY name db.users.find({age:33}).sort({name:1})
SELECT * FROM users WHERE age>33 db.users.find({'age':{$gt:33}})})
SELECT * FROM users WHERE age<33 db.users.find({'age':{$lt:33}})})
SELECT * FROM users WHERE name LIKE "%Joe%" db.users.find({name:/Joe/})
SELECT * FROM users WHERE name LIKE "Joe%" db.users.find({name:/^Joe/})
SELECT * FROM users WHERE age>33 AND age<=40 db.users.find({'age':{$gt:33,$lte:40}})})
SELECT * FROM users ORDER BY name DESC db.users.find().sort({name:-1})
CREATE INDEX myindexname ON users(name) db.users.ensureIndex({name:1})
CREATE INDEX myindexname ON users(name,ts DESC) db.users.ensureIndex({name:1,ts:-1})
SELECT * FROM users WHERE a=1 and b='q' db.users.find({a:1,b:'q'})
SELECT * FROM users LIMIT 10 SKIP 20 db.users.find().limit(10).skip(20)
SELECT * FROM users WHERE a=1 or b=2 db.users.find( { $or : [ { a : 1 } , { b : 2 } ] } )
SELECT * FROM users LIMIT 1 db.users.findOne()
EXPLAIN SELECT * FROM users WHERE z=3 db.users.find({z:3}).explain()
SELECT DISTINCT last_name FROM users db.users.distinct('last_name')
SELECT COUNT(*y) FROM users db.users.count()
SELECT COUNT(*y) FROM users where AGE > 30 db.users.find({age: {'$gt': 30}}).count()
SELECT COUNT(AGE) from users db.users.find({age: {'$exists': true}}).count()
UPDATE users SET a=1 WHERE b='q' db.users.update({b:'q'}, {$set:{a:1}}, false, true)
UPDATE users SET a=a+2 WHERE b='q' db.users.update({b:'q'}, {$inc:{a:2}}, false, true)
DELETE FROM users WHERE z="abc" db.users.remove({z:'abc'});
###################################################
一、操作符
操作符相信大家肯定都知道了,就是等于、大于、小于、不等于、大于等于、小于等于,但是在mongodb里不能直接使用这些操作符。在mongodb里的操作符是这样表示的:
(1) $gt > (大于)
(2) $lt< (小于)
(3) $gte>= (大于等于)
(4) $lt<= (小于等于)
(5) $ne!= (不等于)
(6) $inin (包含)
(7) $ninnot in (不包含)
(8) $existsexist (字段是否存在)
(9) $inc对一个数字字段field增加value
(10) $set就是相当于sql的set field = value
(11) $unset就是删除字段
(12) $push把value追加到field里面去,field一定要是数组类型才行,如果field不存在,会新增一个数组类型加进去
(13) $pushAll同$push,只是一次可以追加多个值到一个数组字段内
(14) $addToSet增加一个值到数组内,而且只有当这个值不在数组内才增加。
(15) $pop删除最后一个值:{ $pop : { field : 1 } }删除第一个值:{ $pop : { field : -1 } }注意,只能删除一个值,也就是说只能用1或-1,而不能用2或-2来删除两条。mongodb 1.1及以后的版本才可以用
(16) $pull从数组field内删除一个等于value值
(17) $pullAll同$pull,可以一次删除数组内的多个值
(18) $ 操作符是他自己的意思,代表按条件找出的数组里面某项他自己。这个比较坳口,就不说了。
二、CURD 增、改、读、删
增加
复制代码代码如下:
db.collection->insert({'name' => 'caleng', 'email' => 'admin#admin.com'});
是不是灰常简单呀,对就是这么简单,它没有字段的限制,你可以随意起名,并插入数据
复制代码代码如下:
db.collection.update( { "count" : { $gt : 1 } } , { $set : { "test2" : "OK"} } ); 只更新了第一条大于1记录
db.collection.update( { "count" : { $gt : 3 } } , { $set : { "test2" : "OK"} },false,true ); 大于3的记录 全更新了
db.collection.update( { "count" : { $gt : 4 } } , { $set : { "test5" : "OK"} },true,false ); 大于4的记录 只加进去了第一条
db.collection.update( { "count" : { $gt : 5 } } , { $set : { "test5" : "OK"} },true,true ); 大于5的记录 全加进去
查询
复制代码代码如下:
db.collection.find(array('name' => 'ling'), array('email'=>'[email protected]'))
db.collection.findOne(array('name' => 'ling'), array('email''[email protected]'))
大家可以看到查询我用了两种不同的写法,这是为什么,其实这跟做菜是一样的,放不同的调料,炒出的菜是不同的味道。下面给大家说一下,这两种调料的不同作用。
findOne()只返回一个文档对象,find()返回一个集合列表。
也就是说比如,我们只想查某一条特定数据的详细信息的话,我们就可以用findOne();
如果想查询某一组信息,比如说一个新闻列表的时候,我们就可以作用find();
那么我想大家这时一定会想到我想对这一个列表排序呢,no problem mongodb会为您全心全意服务
复制代码代码如下:
db.collection.find().sort({age:1}); //按照age正序排列
db.collection.find().sort({age:-1}); //按照age倒序排列
db.collection.count(); //得到数据总数
db.collection.limit(1); //取数据的开始位置
db.collection.skip(10); //取数据的结束位置
//这样我们就实现了一个取10条数据,并排序的操作。
删除
删除有两个操作 remove()和drop()
复制代码代码如下:
db.collection.remove({"name",'jerry'}) //删除特定数据
db.collection.drop() //删除集合内的所有数据
distinct操作
复制代码代码如下:
db.user.distinct('name', {'age': {$lt : 20}})
2. 熟悉MongoDB的数据操作语句,类sql
数据库操作语法
mongo --path
db.AddUser(username,password) 添加用户
db.auth(usrename,password) 设置数据库连接验证
db.cloneDataBase(fromhost) 从目标服务器克隆一个数据库
db.commandHelp(name) returns the help for the command
db.Database(fromdb,todb,fromhost) 复制数据库fromdb---源数据库名称,todb---目标数据库名称,fromhost---源数据库服务器地址
db.createCollection(name,{size:3333,capped:333,max:88888}) 创建一个数据集,相当于一个表
db.currentOp() 取消当前库的当前操作
db.dropDataBase() 删除当前数据库
db.eval(func,args) run code server-side
db.getCollection(cname) 取得一个数据集合,同用法:db['cname'] or db.cname
db.getCollenctionNames() 取得所有数据集合的名称列表
db.getLastError() 返回最后一个错误的提示消息
db.getLastErrorObj() 返回最后一个错误的对象
db.getMongo() 取得当前服务器的连接对象get the server connection object
db.getMondo().setSlaveOk() allow this connection to read from then nonmaster membr of a replica pair
db.getName() 返回当操作数据库的名称
db.getPrevError() 返回上一个错误对象
db.getProfilingLevel() ?什么等级
db.getReplicationInfo() ?什么信息
db.getSisterDB(name) get the db at the same server as this onew
db.killOp() 停止(杀死)在当前库的当前操作
db.printCollectionStats() 返回当前库的数据集状态
db.printReplicationInfo()
db.printSlaveReplicationInfo()
db.printShardingStatus() 返回当前数据库是否为共享数据库
db.removeUser(username) 删除用户
db.repairDatabase() 修复当前数据库
db.resetError()
db.runCommand(cmdObj) run a database command. if cmdObj is a string, turns it into {cmdObj:1}
db.setProfilingLevel(level) 0=off,1=slow,2=all
db.shutdownServer() 关闭当前服务程序
db.version() 返回当前程序的版本信息
数据集(表)操作语法
db.linlin.find({id:10}) 返回linlin数据集ID=10的数据集
db.linlin.find({id:10}).count() 返回linlin数据集ID=10的数据总数
db.linlin.find({id:10}).limit(2) 返回linlin数据集ID=10的数据集从第二条开始的数据集
db.linlin.find({id:10}).skip(8) 返回linlin数据集ID=10的数据集从0到第八条的数据集
db.linlin.find({id:10}).limit(2).skip(8) 返回linlin数据集ID=1=的数据集从第二条到第八条的数据
db.linlin.find({id:10}).sort() 返回linlin数据集ID=10的排序数据集
db.linlin.findOne([query]) 返回符合条件的一条数据
db.linlin.getDB() 返回此数据集所属的数据库名称
db.linlin.getIndexes() 返回些数据集的索引信息
db.linlin.group({key:...,initial:...,rece:...[,cond:...]})
db.linlin.mapRece(mayFunction,receFunction,<optional params>)
db.linlin.remove(query) 在数据集中删除一条数据
db.linlin.renameCollection(newName) 重命名些数据集名称
db.linlin.save(obj) 往数据集中插入一条数据
db.linlin.stats() 返回此数据集的状态
db.linlin.storageSize() 返回此数据集的存储大小
db.linlin.totalIndexSize() 返回此数据集的索引文件大小
db.linlin.totalSize() 返回些数据集的总大小
db.linlin.update(query,object[,upsert_bool]) 在此数据集中更新一条数据
db.linlin.validate() 验证此数据集
db.linlin.getShardVersion() 返回数据集共享版本号
3. hadoop maprece 分桶
老大之前在网络,由于shell 和awk 写的溜,所以他总是推荐 使用shell 和awk 来跑 hadoop streaming 【hs】,hs还真是一个好东西,不需要编译,想怎么执行就怎么整,还不需要IDE,只要你你记住主要的执行内容就完全没有问题,hs也支持 python ,这样一来 hadoop 可以让所有人都可以使用了。
可以参考这个文章
https://www.programcreek.com/java-api-examples/index.php?api=org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
https://github.com/roanjain/hadoop-partitioner/blob/master/src/com/mapred/partitioner/PartitionerDemo.java
老大讲在网络 执行hs 时经常分桶,用来控制 map 和 Rece 任务的个数,确实还是有帮助的,可以控制 桶的个数,也可以控制资源的使用情况,比如 cpu 和内存
一篇干货很多的文章可以参考 http://www.cnblogs.com/van19/p/5756448.html
Hadoop用于对key的排序和分桶的设置选项比较多和复杂,目前在公司内主要以KeyFieldBasePartitioner和KeyFieldBaseComparator被hadoop用户广泛使用。
基本概念:
Partition:分桶过程,用户输出的key经过partition分发到不同的rece里,因而partitioner就是分桶器,一般用平台默认的hash分桶也可以自己指定。
Key:是需要排序的字段,相同分桶&&相同key的行排序到一起。
下面以一个简单的文本作为例子,通过搭配不同的参数跑出真实作业的结果来演示这些参数的使用方法。
假设map的输出是这样以点号分隔的若干行:
d.1.5.23
e.9.4.5
e.5.9.22
e.5.1.45
e.5.1.23
a.7.2.6
f.8.3.3
我们知道,在streaming模式默认hadoop会把map输出的一行中遇到的第一个设定的字段分隔符前面的部分作为key,后面的作为 value,如果输出的一行中没有指定的字段分隔符,则整行作为key,value被设置为空字符串。 那么对于上面的输出,如果想用map输出的前2个字段作为key,后面字段作为value,并且不使用hadoop默认的“\t”字段分隔符,而是根据该 文本特点使用“.”来分割,需要如何设置呢
bin/hadoop streaming -input /tmp/comp-test.txt -output /tmp/xx -mapper cat -recer cat
-jobconf stream.num.map.output.key.fields=2
-jobconf stream.map.output.field.separator=.
-jobconf mapred.rece.tasks=5
结果:
e.9 4.5
f.8 3.3
——————
d.1 5.23
e.5 1.23
e.5 1.45
e.5 9.22
——————
a.7 2.6
总结:
从结果可以看出,在rece的输出中,前两列和后两列用“\t”分隔,证明map输出时确实把用“.”分隔的前两列作为key,后面的作为 value。并且前两列相同的“e.5”开头的三行被分到了同一个rece中,证明确实以前两列作为key整体做的partition。
stream.num.map.output.key.fields 设置map输出的前几个字段作为key
stream.map.output.field.separator 设置map输出的字段分隔符
KeyFieldBasePartitioner的用法
如果想要灵活设置key中用于partition的字段,而不是把整个key都用来做partition。就需要使用hadoop中的org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner了。
下面只用第一列作partition,但依然使用前两列作为key。
bin/hadoop streaming -input /tmp/comp-test.txt -output /tmp/xx -mapper cat -recer cat
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
-jobconf stream.num.map.output.key.fields=2
-jobconf stream.map.output.field.separator=.
-jobconf map.output.key.field.separator=.
-jobconf num.key.fields.for.partition=1
-jobconf mapred.rece.tasks=5
结果:
d.1 5.23
——————
e.5 1.23
e.5 1.45
e.5 9.22
e.9 4.5
——————
a.7 2.6
f.8 3.3
总结:
从结果可以看出,这次“e”开头的行都被分到了一个桶内,证明做partition是以第一列为准的,而key依然是前两列。并且在同一个 partition内,先按照第一列排序,第一列相同的,按照第二列排序。这里要注意的是使用 map.output.key.field.separator来指定key内字段的分隔符,这个参数是KeyFieldBasePartitioner 和KeyFieldBaseComparator所特有的。
map.output.key.field.separator 设置key内的字段分隔符
num.key.fields.for.partition 设置key内前几个字段用来做partition
事实上KeyFieldBasePartitioner还有一个高级参数 mapred.text.key.partitioner.options,这个参数可以认为是 num.key.fields.for.partition的升级版,它可以指定不仅限于key中的前几个字段用做partition,而是可以单独指定 key中某个字段或者某几个字段一起做partition。
比如上面的需求用mapred.text.key.partitioner.options表示为
mapred.text.key.partitioner.options=-k1,1
注意mapred.text.key.partitioner.options和num.key.fields.for.partition不需要一起使用,一起使用则以num.key.fields.for.partition为准。
这里再举一个例子,使用mapred.text.key.partitioner.options
bin/hadoop streaming -input /tmp/comp-test.txt -output /tmp/xx -mapper cat -recer cat
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
-jobconf stream.num.map.output.key.fields=3
-jobconf stream.map.output.field.separator=.
-jobconf map.output.key.field.separator=.
-jobconf mapred.text.key.partitioner.options=-k2,3
-jobconf mapred.rece.tasks=5
结果:
e.9.4 5
——————
a.7.2 6
e.5.9 22
——————
d.1.5 23
e.5.1 23
e.5.1 45
f.8.3 3
可见,这次是以前3列作为key的,而partition则以key中的第2-3列,因此以“e”开头的行被拆散了,但第二三列相同的“5,1”被 分到一个桶内。在同一个桶内,依然是从key的第一列开始排序的,注意,KeyFieldBasePartitioner只影响分桶并不影响排序。
mapred.text.key.partitioner.options 设置key内某个字段或者某个字段范围用做partition
KeyFieldBaseComparator的用法
首先简单解释一下hadoop框架中key的comparator,对于hadoop所识别的所有java的key类型(在框架看来key的类型只 能是java的),很多类型都自定义了基于字节的比较器,比如Text,IntWritable等等,如果不特别指定比较器而使用这些类型默认的,则会将 key作为一个整体的字节数组来进行比较。而KeyFieldBaseComparator则相当于是一个可以灵活设置比较位置的高级比较器,但是它并没 有自己独有的比较逻辑,而是使用默认Text的基于字典序或者通过-n来基于数字比较。
之前的例子使用KeyFieldBasePartitioner自定义了使用key中的部分字段做partition,现在我们通过org.apache.hadoop.mapred.lib.KeyFieldBasedComparator来自定义使用key中的部分字段做比较。
这次把前四列都作为key,前两列做partition,排序依据优先依据第三列正序(文本序),第四列逆序(数字序)的组合排序。
bin/hadoop streaming -input /tmpcomp-test.txt -output /tmp/xx -mapper cat -recer cat
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
-jobconf mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator
-jobconf stream.num.map.output.key.fields=4
-jobconf stream.map.output.field.separator=.
-jobconf map.output.key.field.separator=.
-jobconf mapred.text.key.partitioner.options=-k1,2
-jobconf mapred.text.key.comparator.options="-k3,3 -k4nr"
-jobconf mapred.rece.tasks=5
结果:
e.5.1.45
e.5.1.23
d.1.5.23
e.5.9.22
——————
a.7.2.6
——————
f.8.3.3
e.9.4.5
总结:
从结果可以看出,符合预期的按照先第三列文本正序,然后第四列基于数字逆序的排序。
另外注意,如果这种写法
mapred.text.key.comparator.options=”-k2″
则会从第二列开始,用字典序一直比较到key的最后一个字节。所以对于希望准确排序字段的需求,还是使用“k2,2”这种确定首尾范围的形式更好。另外如果给定的key中某些行需要排序的列数不够时,会比较到最后一列,缺列的行默认缺少的那一列排序值最小。
mapred.text.key.comparator.options 设置key中需要比较的字段或字节范围
4. 如何在Hadoop上编写MapRece程序
用户配置并将一个Hadoop作业提到Hadoop框架中,Hadoop框架会把这个作业分解成一系列map tasks 和rece tasks。Hadoop框架负责task分发和执行,结果收集和作业进度监控。
在编写MapRece程序时,用户分别通过InputFormat和OutputFormat指定输入和输出格式,并定义Mapper和Recer指定map阶段和rece阶段的要做的工作。在Mapper或者Recer中,用户只需指定一对key/value的处理逻辑,Hadoop框架会自动顺序迭代解析所有key/value,并将每对key/value交给Mapper或者Recer处理。表面上看来,Hadoop限定数据格式必须为key/value形式,过于简单,很难解决复杂问题,实际上,可以通过组合的方法使key或者value(比如在key或者value中保存多个字段,每个字段用分隔符分开,或者value是个序列化后的对象,在Mapper中使用时,将其反序列化等)保存多重信息,以解决输入格式较复杂的应用。
2.2 用户的工作
用户编写MapRece需要实现的类或者方法有:
(1) InputFormat接口
用户需要实现该接口以指定输入文件的内容格式。该接口有两个方法
public interface InputFormat<K, V> {
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
RecordReader<K, V> getRecordReader(InputSplit split,
JobConf job,
Reporter reporter) throws IOException;
}
其中getSplits函数将所有输入数据分成numSplits个split,每个split交给一个map task处理。getRecordReader函数提供一个用户解析split的迭代器对象,它将split中的每个record解析成key/value对。
Hadoop本身提供了一些InputFormat:
(2)Mapper接口
用户需继承Mapper接口实现自己的Mapper,Mapper中必须实现的函数是
void map(K1 key,
V1 value,
OutputCollector<K2,V2> output,
Reporter reporter
) throws IOException
其中,<K1 V1>是通过Inputformat中的RecordReader对象解析处理 的,OutputCollector获取map()的输出结果,Reporter保存了当前task处理进度。
Hadoop本身提供了一些Mapper供用户使用:
(3)Partitioner接口
用户需继承该接口实现自己的Partitioner以指定map task产生的key/value对交给哪个rece task处理,好的Partitioner能让每个rece task处理的数据相近,从而达到负载均衡。Partitioner中需实现的函数是
getPartition( K2 key, V2 value, int numPartitions)
该函数返回<K2 V2>对应的rece task ID。
用户如果不提供Partitioner,Hadoop会使用默认的(实际上是个hash函数)。
(4)Combiner
Combiner使得map task与rece task之间的数据传输量大大减小,可明显提高性能。大多数情况下,Combiner与Recer相同。
(5)Recer接口
用户需继承Recer接口实现自己的Recer,Recer中必须实现的函数是
void rece(K2 key,
Iterator<V2> values,
OutputCollector<K3,V3> output,
Reporter reporter
) throws IOException
Hadoop本身提供了一些Recer供用户使用:
(6)OutputFormat
用户通过OutputFormat指定输出文件的内容格式,不过它没有split。每个rece task将其数据写入自己的文件,文件名为part-nnnnn,其中nnnnn为rece task的ID。
Hadoop本身提供了几个OutputFormat:
3. 分布式缓存
Haoop中自带了一个分布式缓存,即DistributedCache对象,方便map task之间或者rece task之间共享一些信息,比如某些实际应用中,所有map task要读取同一个配置文件或者字典,则可将该配置文件或者字典放到分布式缓存中。
4. 多语言编写MapRece作业
Hadoop采用java编写,因而Hadoop天生支持java语言编写作业,但在实际应用中,有时候,因要用到非java的第三方库或者其他原因,要采用C/C++或者其他语言编写MapRece作业,这时候可能要用到Hadoop提供的一些工具。
如果你要用C/C++编写MpaRece作业,可使用的工具有Hadoop Streaming或者Hadoop Pipes。
如果你要用Python编写MapRece作业,可以使用Hadoop Streaming或者Pydoop。
如果你要使用其他语言,如shell,php,ruby等,可使用Hadoop Streaming。
关于Hadoop Streaming编程,可参见我的这篇博文:《Hadoop Streaming编程》(http://dongxicheng.org/maprece/hadoop-streaming-programming/ )
关于Pydoop编程,可参见其官方网站:http://sourceforge.net/projects/pydoop/
关于Hadoop pipes编程,可参见《Hadoop Tutorial 2.2 — Running C++ Programs on Hadoop》。
5. 编程方式比较
(1)java。 Hadoop支持的最好最全面的语言,而且提供了很多工具方便程序员开发。
(2)Hadoop Streaming。 它最大的优点是支持多种语言,但效率较低,rece task需等到map 阶段完成后才能启动;它不支持用户自定义InputFormat,如果用户想指定输入文件格式,可使用java语言编写或者在命令行中指定分隔符;它采用标准输入输出让C/C++与java通信,因而只支持text数据格式。
(3)Hadoop Pipes。 专门为C/C++语言设计,由于其采用了socket方式让C/C++与java通信,因而其效率较低(其优势在于,但作业需要大量,速度很快)。它支持用户(用C/C++)编写RecordReader。
(4)Pydoop。它是专门方便python程序员编写MapRece作业设计的,其底层使用了Hadoop Streaming接口和libhdfs库。
6. 总结
Hadoop使得分布式程序的编写变得异常简单,很多情况下,用户只需写map()和rece()两个函数即可(InputFormat,Outputformat可用系统缺省的)。正是由于Hadoop编程的简单性,越来越多的公司或者研究单位开始使用Hadoop。
7. 注意事项
(1) Hadoop默认的InputFormat是TextInputFormat,它将文件中的每一行作为value,该行的偏移量为key。
(2)如果你的输入是文本文件,且每一行包括key,value,则可使用Hadoop中自带的KeyValueTextInputFormat,它默认的每行是一个key/value对,且key与value的分割如为TAB(’\t‘),如果想修改key/value之间的分隔符,如将分割符改为“,”,可使用conf.set(“key.value.separator.in.input.line”,”,”);或者-D key.value.separator.in.input.line=,。
8. 参考资料
(1) 书籍 Jason Venner 《Pro Hadoop》
(2) 书籍 Chuck Lam 《Hadoop in Action》
(3) 书籍 Tom White 《Hadoop The Definitive Guide》
(4) Hadoop分布式缓存例子:书籍《Hadoop The Definitive Guide》 第八章 最后一节“Distributed Cache”
(5) Hadoop Pipes例子:源代码中$HAOOOP_HOME/src/examples/pipes路径下。
(6) Hadoop Pipes资料:
http://developer.yahoo.com/hadoop/tutorial/mole4.html#pipes
http://wiki.apache.org/hadoop/C%2B%2BWordCount
5. 如何在Hadoop中使用Streaming编写MapRece
Michael G. Noll在他的Blog中提到如何在Hadoop中用Python编写MapRece程序,韩国的gogamza在其Bolg中也提到如何用C编写MapRece程序(我稍微修改了一下原程序,因为他的Map对单词切分使用tab键)。我合并他们两人的文章,也让国内的Hadoop用户能够使用别的语言来编写MapRece程序。
首先您得配好您的Hadoop集群,这方面的介绍网上比较多,这儿给个链接(Hadoop学习笔记二 安装部署)。Hadoop Streaming帮 助我们用非Java的编程语言使用MapRece,Streaming用STDIN (标准输入)和STDOUT (标准输出)来和我们编写的Map和Rece进行数据的交换数据。任何能够使用STDIN和STDOUT都可以用来编写MapRece程序,比如 我们用Python的sys.stdin和sys.stdout,或者是C中的stdin和stdout。
我们还是使用Hadoop的例子WordCount来做示范如何编写MapRece,在WordCount的例子中我们要解决计算在一批文档中每一个单词的出现频率。首先我们在Map程序中会接受到这批文档每一行的数据,然后我们编写的Map程序把这一行按空格切开成一个数组。并对这个数组遍历按" 1"用标准的输出输出来,代表这个单词出现了一次。在Rece中我们来统计单词的出现频率。
Python Code
Map: mapper.py
#!/usr/bin/env python
import sys
# maps words to their counts
word2count = {}
# input comes from STDIN (standard input)
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# split the line into words while removing any empty strings
words = filter(lambda word: word, line.split())
# increase counters
for word in words:
# write the results to STDOUT (standard output);
# what we output here will be the input for the
# Rece step, i.e. the input for recer.py
#
# tab-delimited; the trivial word count is 1
print '%s\t%s' % (word, 1)
Rece: recer.py
#!/usr/bin/env python
from operator import itemgetter
import sys
# maps words to their counts
word2count = {}
# input comes from STDIN
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# parse the input we got from mapper.py
word, count = line.split()
# convert count (currently a string) to int
try:
count = int(count)
word2count[word] = word2count.get(word, 0) + count
except ValueError:
# count was not a number, so silently
# ignore/discard this line
pass
# sort the words lexigraphically;
#
# this step is NOT required, we just do it so that our
# final output will look more like the official Hadoop
# word count examples
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))
# write the results to STDOUT (standard output)
for word, count in sorted_word2count:
print '%s\t%s'% (word, count)
C Code
Map: Mapper.c
#include
#include
#include
#include
#define BUF_SIZE 2048
#define DELIM "\n"
int main(int argc, char *argv[]){
char buffer[BUF_SIZE];
while(fgets(buffer, BUF_SIZE - 1, stdin)){
int len = strlen(buffer);
if(buffer[len-1] == '\n')
buffer[len-1] = 0;
char *querys = index(buffer, ' ');
char *query = NULL;
if(querys == NULL) continue;
querys += 1; /* not to include '\t' */
query = strtok(buffer, " ");
while(query){
printf("%s\t1\n", query);
query = strtok(NULL, " ");
}
}
return 0;
}
h>h>h>h>
Rece: Recer.c
#include
#include
#include
#include
#define BUFFER_SIZE 1024
#define DELIM "\t"
int main(int argc, char *argv[]){
char strLastKey[BUFFER_SIZE];
char strLine[BUFFER_SIZE];
int count = 0;
*strLastKey = '\0';
*strLine = '\0';
while( fgets(strLine, BUFFER_SIZE - 1, stdin) ){
char *strCurrKey = NULL;
char *strCurrNum = NULL;
strCurrKey = strtok(strLine, DELIM);
strCurrNum = strtok(NULL, DELIM); /* necessary to check error but.... */
if( strLastKey[0] == '\0'){
strcpy(strLastKey, strCurrKey);
}
if(strcmp(strCurrKey, strLastKey)){
printf("%s\t%d\n", strLastKey, count);
count = atoi(strCurrNum);
}else{
count += atoi(strCurrNum);
}
strcpy(strLastKey, strCurrKey);
}
printf("%s\t%d\n", strLastKey, count); /* flush the count */
return 0;
}
h>h>h>h>
首先我们调试一下源码:
chmod +x mapper.py
chmod +x recer.py
echo "foo foo quux labs foo bar quux" | ./mapper.py | ./recer.py
bar 1
foo 3
labs 1
quux 2
g++ Mapper.c -o Mapper
g++ Recer.c -o Recer
chmod +x Mapper
chmod +x Recer
echo "foo foo quux labs foo bar quux" | ./Mapper | ./Recer
bar 1
foo 2
labs 1
quux 1
foo 1
quux 1
你可能看到C的输出和Python的不一样,因为Python是把他放在词典里了.我们在Hadoop时,会对这进行排序,然后相同的单词会连续在标准输出中输出.
在Hadoop中运行程序
首先我们要下载我们的测试文档wget http://www.gutenberg.org/dirs/etext04/7ldvc10.txt.我们把文档存放在/tmp/doc这个目录下.拷贝测试文档到HDFS中.
bin/hadoop dfs -FromLocal /tmp/doc doc
运行 bin/hadoop dfs -ls doc 看看拷贝是否成功.接下来我们运行我们的MapRece的Job.
bin/hadoop jar /home/hadoop/contrib/hadoop-0.15.1-streaming.jar -mapper /home/hadoop/Mapper\
-recer /home/hadoop/Recer -input doc/* -output c-output -jobconf mapred.rece.tasks=1
additionalConfSpec_:null
null=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming
packageJobJar: [] [/home/msh/hadoop-0.15.1/contrib/hadoop-0.15.1-streaming.jar] /tmp/streamjob60816.jar tmpDir=null
08/03/04 19:03:13 INFO mapred.FileInputFormat: Total input paths to process : 1
08/03/04 19:03:13 INFO streaming.StreamJob: getLocalDirs(): [/home/msh/data/filesystem/mapred/local]
08/03/04 19:03:13 INFO streaming.StreamJob: Running job: job_200803031752_0003
08/03/04 19:03:13 INFO streaming.StreamJob: To kill this job, run:
08/03/04 19:03:13 INFO streaming.StreamJob: /home/msh/hadoop/bin/../bin/hadoop job -Dmapred.job.tracker=192.168.2.92:9001 -kill job_200803031752_0003
08/03/04 19:03:13 INFO streaming.StreamJob: Tracking URL: http://hadoop-master:50030/jobdetails.jsp?jobid=job_200803031752_0003
08/03/04 19:03:14 INFO streaming.StreamJob: map 0% rece 0%
08/03/04 19:03:15 INFO streaming.StreamJob: map 33% rece 0%
08/03/04 19:03:16 INFO streaming.StreamJob: map 100% rece 0%
08/03/04 19:03:19 INFO streaming.StreamJob: map 100% rece 100%
08/03/04 19:03:19 INFO streaming.StreamJob: Job complete: job_200803031752_0003
08/03/04 19:03:19 INFO streaming.StreamJob: Output: c-output
bin/hadoop jar /home/hadoop/contrib/hadoop-0.15.1-streaming.jar -mapper /home/hadoop/mapper.py\
-recer /home/hadoop/recer.py -input doc/* -output python-output -jobconf mapred.rece.tasks=1
additionalConfSpec_:null
null=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming
packageJobJar: [] [/home/hadoop/hadoop-0.15.1/contrib/hadoop-0.15.1-streaming.jar] /tmp/streamjob26099.jar tmpDir=null
08/03/04 19:05:40 INFO mapred.FileInputFormat: Total input paths to process : 1
08/03/04 19:05:41 INFO streaming.StreamJob: getLocalDirs(): [/home/msh/data/filesystem/mapred/local]
08/03/04 19:05:41 INFO streaming.StreamJob: Running job: job_200803031752_0004
08/03/04 19:05:41 INFO streaming.StreamJob: To kill this job, run:
08/03/04 19:05:41 INFO streaming.StreamJob: /home/msh/hadoop/bin/../bin/hadoop job -Dmapred.job.tracker=192.168.2.92:9001 -kill job_200803031752_0004
08/03/04 19:05:41 INFO streaming.StreamJob: Tracking URL: http://hadoop-master:50030/jobdetails.jsp?jobid=job_200803031752_0004
08/03/04 19:05:42 INFO streaming.StreamJob: map 0% rece 0%
08/03/04 19:05:48 INFO streaming.StreamJob: map 33% rece 0%
08/03/04 19:05:49 INFO streaming.StreamJob: map 100% rece 0%
08/03/04 19:05:52 INFO streaming.StreamJob: map 100% rece 100%
08/03/04 19:05:52 INFO streaming.StreamJob: Job complete: job_200803031752_0004
08/03/04 19:05:52 INFO streaming.StreamJob: Output: python-output
当Job提交后我们还能够在web的界面http://localhost:50030/看到每一个工作的运行情况。
当Job工作完成后我们能够在c-output和python-output看到一些文件
bin/hadoop dfs -ls c-output
输入下面的命令我们能够看到我们运行完MapRece的结果
bin/hadoop dfs -cat c-output/part-00000
用Hadoop Streaming运行MapRece会比较用Java的代码要慢,因为有两方面的原因:
使用 Java API >> C Streaming >> Perl Streaming 这样的一个流程运行会阻塞IO.
不像Java在运行Map后输出结果有一定数量的结果集就启动Rece的程序,用Streaming要等到所有的Map都运行完毕后才启动Rece
如果用Python编写MapRece的话,另一个可选的是使用Jython来转编译Pyhton为Java的原生码.另外对于C程序员更好的选择是使用Hadoop新的C++ MapRece API Pipes来编写.不管怎样,毕竟Hadoop提供了一种不使用Java来进行分布式运算的方法.
下面是从http://www.lunchpauze.com/2007/10/writing-hadoop-maprece-program-in-php.html页面中摘下的用php编写的MapRece程序,供php程序员参考:
Map: mapper.php
#!/usr/bin/php
$word2count = array();
// input comes from STDIN (standard input)
while (($line = fgets(STDIN)) !== false) {
// remove leading and trailing whitespace and lowercase
$line = strtolower(trim($line));
// split the line into words while removing any empty string
$words = preg_split('/\W/', $line, 0, PREG_SPLIT_NO_EMPTY);
// increase counters
foreach ($words as $word) {
$word2count[$word] += 1;
}
}
// write the results to STDOUT (standard output)
// what we output here will be the input for the
// Rece step, i.e. the input for recer.py
foreach ($word2count as $word => $count) {
// tab-delimited
echo $word, chr(9), $count, PHP_EOL;
}
?>
Rece: mapper.php
#!/usr/bin/php
$word2count = array();
// input comes from STDIN
while (($line = fgets(STDIN)) !== false) {
// remove leading and trailing whitespace
$line = trim($line);
// parse the input we got from mapper.php
list($word, $count) = explode(chr(9), $line);
// convert count (currently a string) to int
$count = intval($count);
// sum counts
if ($count > 0) $word2count[$word] += $count;
}
// sort the words lexigraphically
//
// this set is NOT required, we just do it so that our
// final output will look more like the official Hadoop
// word count examples
ksort($word2count);
// write the results to STDOUT (standard output)
foreach ($word2count as $word => $count) {
echo $word, chr(9), $count, PHP_EOL;
}
?>
作者:马士华 发表于:2008-03-05
6. PHP的算法可以实现大数据分析吗
1.Bloom filter
适用范围:可以用来实现数据字典,进行数据的判重,或者集合求交集
基本原理及要点:
对于原理来说很简单,位数组+k个独立hash函数。将hash函数对应的值的位数组置1,查找时如果发现所有hash函数对应位都是1说明存在,很明显这个过程并不保证查找的结果是100%正确的。同时也不支持删除一个已经插入的关键字,因为该关键字对应的位会牵动到其他的关键字。所以一个简单的改进就是 counting Bloom filter,用一个counter数组代替位数组,就可以支持删除了。
还有一个比较重要的问题,如何根据输入元素个数n,确定位数组m的大小及hash函数个数。当hash函数个数k=(ln2)*(m/n)时错误率最小。在错误率不大于E的情况下,m至少要等于n*lg(1/E)才能表示任意n个元素的集合。但m还应该更大些,因为还要保证bit数组里至少一半为 0,则m 应该>=nlg(1/E)*lge 大概就是nlg(1/E)1.44倍(lg表示以2为底的对数)。
举个例子我们假设错误率为0.01,则此时m应大概是n的13倍。这样k大概是8个。
注意这里m与n的单位不同,m是bit为单位,而n则是以元素个数为单位(准确的说是不同元素的个数)。通常单个元素的长度都是有很多bit的。所以使用bloom filter内存上通常都是节省的。
扩展:
Bloom filter将集合中的元素映射到位数组中,用k(k为哈希函数个数)个映射位是否全1表示元素在不在这个集合中。Counting bloom filter(CBF)将位数组中的每一位扩展为一个counter,从而支持了元素的删除操作。Spectral Bloom Filter(SBF)将其与集合元素的出现次数关联。SBF采用counter中的最小值来近似表示元素的出现频率。
问题实例:给你A,B两个文件,各存放50亿条URL,每条URL占用64字节,内存限制是4G,让你找出A,B文件共同的URL。如果是三个乃至n个文件呢?
根据这个问题我们来计算下内存的占用,4G=2^32大概是40亿*8大概是340亿,n=50亿,如果按出错率0.01算需要的大概是650亿个 bit。现在可用的是340亿,相差并不多,这样可能会使出错率上升些。另外如果这些urlip是一一对应的,就可以转换成ip,则大大简单了。
2.Hashing
适用范围:快速查找,删除的基本数据结构,通常需要总数据量可以放入内存
基本原理及要点:
hash函数选择,针对字符串,整数,排列,具体相应的hash方法。
碰撞处理,一种是open hashing,也称为拉链法;另一种就是closed hashing,也称开地址法,opened addressing。 (http://www.my400800.cn)
扩展:
d-left hashing中的d是多个的意思,我们先简化这个问题,看一看2-left hashing。2-left hashing指的是将一个哈希表分成长度相等的两半,分别叫做T1和T2,给T1和T2分别配备一个哈希函数,h1和h2。在存储一个新的key时,同时用两个哈希函数进行计算,得出两个地址h1[key]和h2[key]。这时需要检查T1中的h1[key]位置和T2中的h2[key]位置,哪一个位置已经存储的(有碰撞的)key比较多,然后将新key存储在负载少的位置。如果两边一样多,比如两个位置都为空或者都存储了一个key,就把新key 存储在左边的T1子表中,2-left也由此而来。在查找一个key时,必须进行两次hash,同时查找两个位置。
问题实例:
1).海量日志数据,提取出某日访问网络次数最多的那个IP。
IP的数目还是有限的,最多2^32个,所以可以考虑使用hash将ip直接存入内存,然后进行统计。
3.bit-map
适用范围:可进行数据的快速查找,判重,删除,一般来说数据范围是int的10倍以下
基本原理及要点:使用bit数组来表示某些元素是否存在,比如8位电话号码
扩展:bloom filter可以看做是对bit-map的扩展
问题实例:
1)已知某个文件内包含一些电话号码,每个号码为8位数字,统计不同号码的个数。
8位最多99 999 999,大概需要99m个bit,大概10几m字节的内存即可。
2)2.5亿个整数中找出不重复的整数的个数,内存空间不足以容纳这2.5亿个整数。
将bit-map扩展一下,用2bit表示一个数即可,0表示未出现,1表示出现一次,2表示出现2次及以上。或者我们不用2bit来进行表示,我们用两个bit-map即可模拟实现这个2bit-map。
4.堆
适用范围:海量数据前n大,并且n比较小,堆可以放入内存
基本原理及要点:最大堆求前n小,最小堆求前n大。方法,比如求前n小,我们比较当前元素与最大堆里的最大元素,如果它小于最大元素,则应该替换那个最大元素。这样最后得到的n个元素就是最小的n个。适合大数据量,求前n小,n的大小比较小的情况,这样可以扫描一遍即可得到所有的前n元素,效率很高。
扩展:双堆,一个最大堆与一个最小堆结合,可以用来维护中位数。
问题实例:
1)100w个数中找最大的前100个数。
用一个100个元素大小的最小堆即可。
5.双层桶划分 ----其实本质上就是【分而治之】的思想,重在“分”的技巧上!
适用范围:第k大,中位数,不重复或重复的数字
基本原理及要点:因为元素范围很大,不能利用直接寻址表,所以通过多次划分,逐步确定范围,然后最后在一个可以接受的范围内进行。可以通过多次缩小,双层只是一个例子。
扩展:
问题实例:
1).2.5亿个整数中找出不重复的整数的个数,内存空间不足以容纳这2.5亿个整数。
有点像鸽巢原理,整数个数为2^32,也就是,我们可以将这2^32个数,划分为2^8个区域(比如用单个文件代表一个区域),然后将数据分离到不同的区域,然后不同的区域在利用bitmap就可以直接解决了。也就是说只要有足够的磁盘空间,就可以很方便的解决。
2).5亿个int找它们的中位数。
这个例子比上面那个更明显。首先我们将int划分为2^16个区域,然后读取数据统计落到各个区域里的数的个数,之后我们根据统计结果就可以判断中位数落到那个区域,同时知道这个区域中的第几大数刚好是中位数。然后第二次扫描我们只统计落在这个区域中的那些数就可以了。
实际上,如果不是int是int64,我们可以经过3次这样的划分即可降低到可以接受的程度。即可以先将int64分成2^24个区域,然后确定区域的第几大数,在将该区域分成2^20个子区域,然后确定是子区域的第几大数,然后子区域里的数的个数只有2^20,就可以直接利用direct addr table进行统计了。
6.数据库索引
适用范围:大数据量的增删改查
基本原理及要点:利用数据的设计实现方法,对海量数据的增删改查进行处理。
扩展:
问题实例:
7.倒排索引(Inverted index)
适用范围:搜索引擎,关键字查询
基本原理及要点:为何叫倒排索引?一种索引方法,被用来存储在全文搜索下某个单词在一个文档或者一组文档中的存储位置的映射。
以英文为例,下面是要被索引的文本:
T0 = "it is what it is"
T1 = "what is it"
T2 = "it is a banana"
我们就能得到下面的反向文件索引:
"a": {2}
"banana": {2}
"is": {0, 1, 2}
"it": {0, 1, 2}
"what": {0, 1}
检索的条件"what", "is" 和 "it" 将对应集合的交集。
正向索引开发出来用来存储每个文档的单词的列表。正向索引的查询往往满足每个文档有序频繁的全文查询和每个单词在校验文档中的验证这样的查询。在正向索引中,文档占据了中心的位置,每个文档指向了一个它所包含的索引项的序列。也就是说文档指向了它包含的那些单词,而反向索引则是单词指向了包含它的文档,很容易看到这个反向的关系。
扩展:
问题实例:文档检索系统,查询那些文件包含了某单词,比如常见的学术论文的关键字搜索。
8.外排序
适用范围:大数据的排序,去重
基本原理及要点:外排序的归并方法,置换选择 败者树原理,最优归并树
扩展:
问题实例:
1).有一个1G大小的一个文件,里面每一行是一个词,词的大小不超过16个字节,内存限制大小是1M。返回频数最高的100个词。
这个数据具有很明显的特点,词的大小为16个字节,但是内存只有1m做hash有些不够,所以可以用来排序。内存可以当输入缓冲区使用。
9.trie树
适用范围:数据量大,重复多,但是数据种类小可以放入内存
基本原理及要点:实现方式,节点孩子的表示方式
扩展:压缩实现。
问题实例:
1).有10个文件,每个文件1G, 每个文件的每一行都存放的是用户的query,每个文件的query都可能重复。要你按照query的频度排序 。
2).1000万字符串,其中有些是相同的(重复),需要把重复的全部去掉,保留没有重复的字符串。请问怎么设计和实现?
3).寻找热门查询:查询串的重复度比较高,虽然总数是1千万,但如果除去重复后,不超过3百万个,每个不超过255字节。
10.分布式处理 maprece
适用范围:数据量大,但是数据种类小可以放入内存
基本原理及要点:将数据交给不同的机器去处理,数据划分,结果归约。
扩展:
问题实例:
1).The canonical example application of MapRece is a process to count the appearances of
each different word in a set of documents:
void map(String name, String document):
// name: document name
// document: document contents
for each word w in document:
EmitIntermediate(w, 1);
void rece(String word, Iterator partialCounts):
// key: a word
// values: a list of aggregated partial counts
int result = 0;
for each v in partialCounts:
result += ParseInt(v);
Emit(result);
Here, each document is split in words, and each word is counted initially with a "1" value by
the Map function, using the word as the result key. The framework puts together all the pairs
with the same key and feeds them to the same call to Rece, thus this function just needs to
sum all of its input values to find the total appearances of that word.
2).海量数据分布在100台电脑中,想个办法高效统计出这批数据的TOP10。
3).一共有N个机器,每个机器上有N个数。每个机器最多存O(N)个数并对它们操作。如何找到N^2个数的中数(median)?
经典问题分析
上千万or亿数据(有重复),统计其中出现次数最多的前N个数据,分两种情况:可一次读入内存,不可一次读入。
可用思路:trie树+堆,数据库索引,划分子集分别统计,hash,分布式计算,近似统计,外排序
所谓的是否能一次读入内存,实际上应该指去除重复后的数据量。如果去重后数据可以放入内存,我们可以为数据建立字典,比如通过 map,hashmap,trie,然后直接进行统计即可。当然在更新每条数据的出现次数的时候,我们可以利用一个堆来维护出现次数最多的前N个数据,当然这样导致维护次数增加,不如完全统计后在求前N大效率高。
如果数据无法放入内存。一方面我们可以考虑上面的字典方法能否被改进以适应这种情形,可以做的改变就是将字典存放到硬盘上,而不是内存,这可以参考数据库的存储方法。
当然还有更好的方法,就是可以采用分布式计算,基本上就是map-rece过程,首先可以根据数据值或者把数据hash(md5)后的值,将数据按照范围划分到不同的机子,最好可以让数据划分后可以一次读入内存,这样不同的机子负责处理各种的数值范围,实际上就是map。得到结果后,各个机子只需拿出各自的出现次数最多的前N个数据,然后汇总,选出所有的数据中出现次数最多的前N个数据,这实际上就是rece过程。
实际上可能想直接将数据均分到不同的机子上进行处理,这样是无法得到正确的解的。因为一个数据可能被均分到不同的机子上,而另一个则可能完全聚集到一个机子上,同时还可能存在具有相同数目的数据。比如我们要找出现次数最多的前100个,我们将1000万的数据分布到10台机器上,找到每台出现次数最多的前 100个,归并之后这样不能保证找到真正的第100个,因为比如出现次数最多的第100个可能有1万个,但是它被分到了10台机子,这样在每台上只有1千个,假设这些机子排名在1000个之前的那些都是单独分布在一台机子上的,比如有1001个,这样本来具有1万个的这个就会被淘汰,即使我们让每台机子选出出现次数最多的1000个再归并,仍然会出错,因为可能存在大量个数为1001个的发生聚集。因此不能将数据随便均分到不同机子上,而是要根据hash 后的值将它们映射到不同的机子上处理,让不同的机器处理一个数值范围。
而外排序的方法会消耗大量的IO,效率不会很高。而上面的分布式方法,也可以用于单机版本,也就是将总的数据根据值的范围,划分成多个不同的子文件,然后逐个处理。处理完毕之后再对这些单词的及其出现频率进行一个归并。实际上就可以利用一个外排序的归并过程。
另外还可以考虑近似计算,也就是我们可以通过结合自然语言属性,只将那些真正实际中出现最多的那些词作为一个字典,使得这个规模可以放入内存。
7. 中软卓越php大数据课程学哪些知识,谁说一下。
首先是向了解PHP呢还是大数据呢?不过想都拿下的话,不是一朝一夕的,步子迈得太大,不好,技在于精而后于多。
先说PHP:最基本其实就是web前端基础;第二的话包括PHP语言基础、Ajax、数据库强化、运行环境及配置、面向对象OOAD&UML等;第三就是主流框架的掌握了,Smarty、PDO等
再说大数据:
基础阶段:Linux、Docker、KVM、MySQL基础、Oracle基础、MongoDB、redis。
hadoop maprece hdfs yarn:hadoop:Hadoop 概念、版本、历史,HDFS工作原理,YARN介绍及组件介绍。
大数据存储阶段:hbase、hive、sqoop。
大数据架构设计阶段:Flume分布式、Zookeeper、Kafka。
大数据实时计算阶段:Mahout、Spark、storm。
大数据数据采集阶段:Python、Scala。
大数据商业实战阶段:实操企业大数据处理业务场景,分析需求、解决方案实施,综合技术实战应用。
8. 用Java写MapRece,用python和R,哪种更适合从事数据行业,做数据...
必然python啊,不过R也很好。python更加灵活,但是R是这一方面的功能一点不弱。但是我感觉很多算法拿python实现会更容易,而且python更好学,语法更简洁。具体看个人。