1. php+mysql開發的網站 如何使用hadoop+hbase+hive,能代替mysql么
hive和hbase是兩碼事兒吧,區別在於前者基於HQL,後者是NoSQL。hive將HQL(類SQL)翻譯成maprece任務,在hadoop的HDFS上運行。腳本語言(php、python等)可以通過thrift調用hive介面
2. mongodb 請問php中的這句mysql語法,在mongodb中如何寫。
查詢:
MySQL:
SELECT * FROM user
Mongo:
db.user.find()
MySQL:
SELECT * FROM user WHERE name = 'starlee'
Mongo:
db.user.find({『name' : 'starlee'})
插入:
MySQL:
INSERT INOT user (`name`, `age`) values ('starlee',25)
Mongo:
db.user.insert({『name' : 'starlee', 『age' : 25})
如果你想在MySQL里添加一個欄位,你必須:
ALTER TABLE user….
但在MongoDB里你只需要:
db.user.insert({『name' : 'starlee', 『age' : 25, 『email' : '[email protected]'})
刪除:
MySQL:
DELETE * FROM user
Mongo:
db.user.remove({})
MySQL:
DELETE FROM user WHERE age < 30
Mongo:
db.user.remove({『age' : {$lt : 30}})
$gt : > ; $gte : >= ; $lt : < ; $lte : <= ; $ne : !=
更新:
MySQL:
UPDATE user SET `age` = 36 WHERE `name` = 'starlee'
Mongo:
db.user.update({『name' : 'starlee'}, {$set : {『age' : 36}})
MySQL:
UPDATE user SET `age` = `age` + 3 WHERE `name` = 'starlee'
Mongo:
db.user.update({『name' : 'starlee'}, {$inc : {『age' : 3}})
MySQL:
SELECT COUNT(*) FROM user WHERE `name` = 'starlee'
Mongo:
db.user.find({『name' : 'starlee'}).count()
MySQL:
SELECT * FROM user limit 10,20
Mongo:
db.user.find().skip(10).limit(20)
MySQL:
SELECT * FROM user WHERE `age` IN (25, 35,45)
Mongo:
db.user.find({『age' : {$in : [25, 35, 45]}})
MySQL:
SELECT * FROM user ORDER BY age DESC
Mongo:
db.user.find().sort({『age' : -1})
MySQL:
SELECT DISTINCT(name) FROM user WHERE age > 20
Mongo:
db.user.distinct(『name', {『age': {$lt : 20}})
MySQL:
SELECT name, sum(marks) FROM user GROUP BY name
Mongo:
db.user.group({
key : {『name' : true},
cond: {『name' : 『foo'},
rece: function(obj,prev) { prev.msum += obj.marks; },
initial: {msum : 0}
});
MySQL:
SELECT name FROM user WHERE age < 20
Mongo:
db.user.find(『this.age < 20′, {name : 1})
發現很多人在搜MongoDB循環插入數據,下面把MongoDB循環插入數據的方法添加在下面:
for(var i=0;i<100;i++)db.test.insert({uid:i,uname:'nosqlfan'+i});
上面一次性插入一百條數據,大概結構如下:
{ 「_id」 : ObjectId(「4c876e519e86023a30dde6b8″), 「uid」 : 55, 「uname」 : 「nosqlfan55″ }
{ 「_id」 : ObjectId(「4c876e519e86023a30dde6b9″), 「uid」 : 56, 「uname」 : 「nosqlfan56″ }
{ 「_id」 : ObjectId(「4c876e519e86023a30dde6ba」), 「uid」 : 57, 「uname」 : 「nosqlfan57″ }
{ 「_id」 : ObjectId(「4c876e519e86023a30dde6bb」), 「uid」 : 58, 「uname」 : 「nosqlfan58″ }
{ 「_id」 : ObjectId(「4c876e519e86023a30dde6bc」), 「uid」 : 59, 「uname」 : 「nosqlfan59″ }
{ 「_id」 : ObjectId(「4c876e519e86023a30dde6bd」), 「uid」 : 60, 「uname」 : 「nosqlfan60″ }
簡易對照表
SQL Statement Mongo Query Language Statement
CREATE TABLE USERS (a Number, b Number) implicit; can be done explicitly
INSERT INTO USERS VALUES(1,1) db.users.insert({a:1,b:1})
SELECT a,b FROM users db.users.find({}, {a:1,b:1})
SELECT * FROM users db.users.find()
SELECT * FROM users WHERE age=33 db.users.find({age:33})
SELECT a,b FROM users WHERE age=33 db.users.find({age:33}, {a:1,b:1})
SELECT * FROM users WHERE age=33 ORDER BY name db.users.find({age:33}).sort({name:1})
SELECT * FROM users WHERE age>33 db.users.find({'age':{$gt:33}})})
SELECT * FROM users WHERE age<33 db.users.find({'age':{$lt:33}})})
SELECT * FROM users WHERE name LIKE "%Joe%" db.users.find({name:/Joe/})
SELECT * FROM users WHERE name LIKE "Joe%" db.users.find({name:/^Joe/})
SELECT * FROM users WHERE age>33 AND age<=40 db.users.find({'age':{$gt:33,$lte:40}})})
SELECT * FROM users ORDER BY name DESC db.users.find().sort({name:-1})
CREATE INDEX myindexname ON users(name) db.users.ensureIndex({name:1})
CREATE INDEX myindexname ON users(name,ts DESC) db.users.ensureIndex({name:1,ts:-1})
SELECT * FROM users WHERE a=1 and b='q' db.users.find({a:1,b:'q'})
SELECT * FROM users LIMIT 10 SKIP 20 db.users.find().limit(10).skip(20)
SELECT * FROM users WHERE a=1 or b=2 db.users.find( { $or : [ { a : 1 } , { b : 2 } ] } )
SELECT * FROM users LIMIT 1 db.users.findOne()
EXPLAIN SELECT * FROM users WHERE z=3 db.users.find({z:3}).explain()
SELECT DISTINCT last_name FROM users db.users.distinct('last_name')
SELECT COUNT(*y) FROM users db.users.count()
SELECT COUNT(*y) FROM users where AGE > 30 db.users.find({age: {'$gt': 30}}).count()
SELECT COUNT(AGE) from users db.users.find({age: {'$exists': true}}).count()
UPDATE users SET a=1 WHERE b='q' db.users.update({b:'q'}, {$set:{a:1}}, false, true)
UPDATE users SET a=a+2 WHERE b='q' db.users.update({b:'q'}, {$inc:{a:2}}, false, true)
DELETE FROM users WHERE z="abc" db.users.remove({z:'abc'});
###################################################
一、操作符
操作符相信大家肯定都知道了,就是等於、大於、小於、不等於、大於等於、小於等於,但是在mongodb里不能直接使用這些操作符。在mongodb里的操作符是這樣表示的:
(1) $gt > (大於)
(2) $lt< (小於)
(3) $gte>= (大於等於)
(4) $lt<= (小於等於)
(5) $ne!= (不等於)
(6) $inin (包含)
(7) $ninnot in (不包含)
(8) $existsexist (欄位是否存在)
(9) $inc對一個數字欄位field增加value
(10) $set就是相當於sql的set field = value
(11) $unset就是刪除欄位
(12) $push把value追加到field裡面去,field一定要是數組類型才行,如果field不存在,會新增一個數組類型加進去
(13) $pushAll同$push,只是一次可以追加多個值到一個數組欄位內
(14) $addToSet增加一個值到數組內,而且只有當這個值不在數組內才增加。
(15) $pop刪除最後一個值:{ $pop : { field : 1 } }刪除第一個值:{ $pop : { field : -1 } }注意,只能刪除一個值,也就是說只能用1或-1,而不能用2或-2來刪除兩條。mongodb 1.1及以後的版本才可以用
(16) $pull從數組field內刪除一個等於value值
(17) $pullAll同$pull,可以一次刪除數組內的多個值
(18) $ 操作符是他自己的意思,代表按條件找出的數組裡面某項他自己。這個比較坳口,就不說了。
二、CURD 增、改、讀、刪
增加
復制代碼代碼如下:
db.collection->insert({'name' => 'caleng', 'email' => 'admin#admin.com'});
是不是灰常簡單呀,對就是這么簡單,它沒有欄位的限制,你可以隨意起名,並插入數據
復制代碼代碼如下:
db.collection.update( { "count" : { $gt : 1 } } , { $set : { "test2" : "OK"} } ); 只更新了第一條大於1記錄
db.collection.update( { "count" : { $gt : 3 } } , { $set : { "test2" : "OK"} },false,true ); 大於3的記錄 全更新了
db.collection.update( { "count" : { $gt : 4 } } , { $set : { "test5" : "OK"} },true,false ); 大於4的記錄 只加進去了第一條
db.collection.update( { "count" : { $gt : 5 } } , { $set : { "test5" : "OK"} },true,true ); 大於5的記錄 全加進去
查詢
復制代碼代碼如下:
db.collection.find(array('name' => 'ling'), array('email'=>'[email protected]'))
db.collection.findOne(array('name' => 'ling'), array('email''[email protected]'))
大家可以看到查詢我用了兩種不同的寫法,這是為什麼,其實這跟做菜是一樣的,放不同的調料,炒出的菜是不同的味道。下面給大家說一下,這兩種調料的不同作用。
findOne()只返回一個文檔對象,find()返回一個集合列表。
也就是說比如,我們只想查某一條特定數據的詳細信息的話,我們就可以用findOne();
如果想查詢某一組信息,比如說一個新聞列表的時候,我們就可以作用find();
那麼我想大家這時一定會想到我想對這一個列表排序呢,no problem mongodb會為您全心全意服務
復制代碼代碼如下:
db.collection.find().sort({age:1}); //按照age正序排列
db.collection.find().sort({age:-1}); //按照age倒序排列
db.collection.count(); //得到數據總數
db.collection.limit(1); //取數據的開始位置
db.collection.skip(10); //取數據的結束位置
//這樣我們就實現了一個取10條數據,並排序的操作。
刪除
刪除有兩個操作 remove()和drop()
復制代碼代碼如下:
db.collection.remove({"name",'jerry'}) //刪除特定數據
db.collection.drop() //刪除集合內的所有數據
distinct操作
復制代碼代碼如下:
db.user.distinct('name', {'age': {$lt : 20}})
2. 熟悉MongoDB的數據操作語句,類sql
資料庫操作語法
mongo --path
db.AddUser(username,password) 添加用戶
db.auth(usrename,password) 設置資料庫連接驗證
db.cloneDataBase(fromhost) 從目標伺服器克隆一個資料庫
db.commandHelp(name) returns the help for the command
db.Database(fromdb,todb,fromhost) 復制資料庫fromdb---源資料庫名稱,todb---目標資料庫名稱,fromhost---源資料庫伺服器地址
db.createCollection(name,{size:3333,capped:333,max:88888}) 創建一個數據集,相當於一個表
db.currentOp() 取消當前庫的當前操作
db.dropDataBase() 刪除當前資料庫
db.eval(func,args) run code server-side
db.getCollection(cname) 取得一個數據集合,同用法:db['cname'] or db.cname
db.getCollenctionNames() 取得所有數據集合的名稱列表
db.getLastError() 返回最後一個錯誤的提示消息
db.getLastErrorObj() 返回最後一個錯誤的對象
db.getMongo() 取得當前伺服器的連接對象get the server connection object
db.getMondo().setSlaveOk() allow this connection to read from then nonmaster membr of a replica pair
db.getName() 返回當操作資料庫的名稱
db.getPrevError() 返回上一個錯誤對象
db.getProfilingLevel() ?什麼等級
db.getReplicationInfo() ?什麼信息
db.getSisterDB(name) get the db at the same server as this onew
db.killOp() 停止(殺死)在當前庫的當前操作
db.printCollectionStats() 返回當前庫的數據集狀態
db.printReplicationInfo()
db.printSlaveReplicationInfo()
db.printShardingStatus() 返回當前資料庫是否為共享資料庫
db.removeUser(username) 刪除用戶
db.repairDatabase() 修復當前資料庫
db.resetError()
db.runCommand(cmdObj) run a database command. if cmdObj is a string, turns it into {cmdObj:1}
db.setProfilingLevel(level) 0=off,1=slow,2=all
db.shutdownServer() 關閉當前服務程序
db.version() 返回當前程序的版本信息
數據集(表)操作語法
db.linlin.find({id:10}) 返回linlin數據集ID=10的數據集
db.linlin.find({id:10}).count() 返回linlin數據集ID=10的數據總數
db.linlin.find({id:10}).limit(2) 返回linlin數據集ID=10的數據集從第二條開始的數據集
db.linlin.find({id:10}).skip(8) 返回linlin數據集ID=10的數據集從0到第八條的數據集
db.linlin.find({id:10}).limit(2).skip(8) 返回linlin數據集ID=1=的數據集從第二條到第八條的數據
db.linlin.find({id:10}).sort() 返回linlin數據集ID=10的排序數據集
db.linlin.findOne([query]) 返回符合條件的一條數據
db.linlin.getDB() 返回此數據集所屬的資料庫名稱
db.linlin.getIndexes() 返回些數據集的索引信息
db.linlin.group({key:...,initial:...,rece:...[,cond:...]})
db.linlin.mapRece(mayFunction,receFunction,<optional params>)
db.linlin.remove(query) 在數據集中刪除一條數據
db.linlin.renameCollection(newName) 重命名些數據集名稱
db.linlin.save(obj) 往數據集中插入一條數據
db.linlin.stats() 返回此數據集的狀態
db.linlin.storageSize() 返回此數據集的存儲大小
db.linlin.totalIndexSize() 返回此數據集的索引文件大小
db.linlin.totalSize() 返回些數據集的總大小
db.linlin.update(query,object[,upsert_bool]) 在此數據集中更新一條數據
db.linlin.validate() 驗證此數據集
db.linlin.getShardVersion() 返回數據集共享版本號
3. hadoop maprece 分桶
老大之前在網路,由於shell 和awk 寫的溜,所以他總是推薦 使用shell 和awk 來跑 hadoop streaming 【hs】,hs還真是一個好東西,不需要編譯,想怎麼執行就怎麼整,還不需要IDE,只要你你記住主要的執行內容就完全沒有問題,hs也支持 python ,這樣一來 hadoop 可以讓所有人都可以使用了。
可以參考這個文章
https://www.programcreek.com/java-api-examples/index.php?api=org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
https://github.com/roanjain/hadoop-partitioner/blob/master/src/com/mapred/partitioner/PartitionerDemo.java
老大講在網路 執行hs 時經常分桶,用來控制 map 和 Rece 任務的個數,確實還是有幫助的,可以控制 桶的個數,也可以控制資源的使用情況,比如 cpu 和內存
一篇干貨很多的文章可以參考 http://www.cnblogs.com/van19/p/5756448.html
Hadoop用於對key的排序和分桶的設置選項比較多和復雜,目前在公司內主要以KeyFieldBasePartitioner和KeyFieldBaseComparator被hadoop用戶廣泛使用。
基本概念:
Partition:分桶過程,用戶輸出的key經過partition分發到不同的rece里,因而partitioner就是分桶器,一般用平台默認的hash分桶也可以自己指定。
Key:是需要排序的欄位,相同分桶&&相同key的行排序到一起。
下面以一個簡單的文本作為例子,通過搭配不同的參數跑出真實作業的結果來演示這些參數的使用方法。
假設map的輸出是這樣以點號分隔的若干行:
d.1.5.23
e.9.4.5
e.5.9.22
e.5.1.45
e.5.1.23
a.7.2.6
f.8.3.3
我們知道,在streaming模式默認hadoop會把map輸出的一行中遇到的第一個設定的欄位分隔符前面的部分作為key,後面的作為 value,如果輸出的一行中沒有指定的欄位分隔符,則整行作為key,value被設置為空字元串。 那麼對於上面的輸出,如果想用map輸出的前2個欄位作為key,後面欄位作為value,並且不使用hadoop默認的「\t」欄位分隔符,而是根據該 文本特點使用「.」來分割,需要如何設置呢
bin/hadoop streaming -input /tmp/comp-test.txt -output /tmp/xx -mapper cat -recer cat
-jobconf stream.num.map.output.key.fields=2
-jobconf stream.map.output.field.separator=.
-jobconf mapred.rece.tasks=5
結果:
e.9 4.5
f.8 3.3
——————
d.1 5.23
e.5 1.23
e.5 1.45
e.5 9.22
——————
a.7 2.6
總結:
從結果可以看出,在rece的輸出中,前兩列和後兩列用「\t」分隔,證明map輸出時確實把用「.」分隔的前兩列作為key,後面的作為 value。並且前兩列相同的「e.5」開頭的三行被分到了同一個rece中,證明確實以前兩列作為key整體做的partition。
stream.num.map.output.key.fields 設置map輸出的前幾個欄位作為key
stream.map.output.field.separator 設置map輸出的欄位分隔符
KeyFieldBasePartitioner的用法
如果想要靈活設置key中用於partition的欄位,而不是把整個key都用來做partition。就需要使用hadoop中的org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner了。
下面只用第一列作partition,但依然使用前兩列作為key。
bin/hadoop streaming -input /tmp/comp-test.txt -output /tmp/xx -mapper cat -recer cat
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
-jobconf stream.num.map.output.key.fields=2
-jobconf stream.map.output.field.separator=.
-jobconf map.output.key.field.separator=.
-jobconf num.key.fields.for.partition=1
-jobconf mapred.rece.tasks=5
結果:
d.1 5.23
——————
e.5 1.23
e.5 1.45
e.5 9.22
e.9 4.5
——————
a.7 2.6
f.8 3.3
總結:
從結果可以看出,這次「e」開頭的行都被分到了一個桶內,證明做partition是以第一列為準的,而key依然是前兩列。並且在同一個 partition內,先按照第一列排序,第一列相同的,按照第二列排序。這里要注意的是使用 map.output.key.field.separator來指定key內欄位的分隔符,這個參數是KeyFieldBasePartitioner 和KeyFieldBaseComparator所特有的。
map.output.key.field.separator 設置key內的欄位分隔符
num.key.fields.for.partition 設置key內前幾個欄位用來做partition
事實上KeyFieldBasePartitioner還有一個高級參數 mapred.text.key.partitioner.options,這個參數可以認為是 num.key.fields.for.partition的升級版,它可以指定不僅限於key中的前幾個欄位用做partition,而是可以單獨指定 key中某個欄位或者某幾個欄位一起做partition。
比如上面的需求用mapred.text.key.partitioner.options表示為
mapred.text.key.partitioner.options=-k1,1
注意mapred.text.key.partitioner.options和num.key.fields.for.partition不需要一起使用,一起使用則以num.key.fields.for.partition為准。
這里再舉一個例子,使用mapred.text.key.partitioner.options
bin/hadoop streaming -input /tmp/comp-test.txt -output /tmp/xx -mapper cat -recer cat
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
-jobconf stream.num.map.output.key.fields=3
-jobconf stream.map.output.field.separator=.
-jobconf map.output.key.field.separator=.
-jobconf mapred.text.key.partitioner.options=-k2,3
-jobconf mapred.rece.tasks=5
結果:
e.9.4 5
——————
a.7.2 6
e.5.9 22
——————
d.1.5 23
e.5.1 23
e.5.1 45
f.8.3 3
可見,這次是以前3列作為key的,而partition則以key中的第2-3列,因此以「e」開頭的行被拆散了,但第二三列相同的「5,1」被 分到一個桶內。在同一個桶內,依然是從key的第一列開始排序的,注意,KeyFieldBasePartitioner隻影響分桶並不影響排序。
mapred.text.key.partitioner.options 設置key內某個欄位或者某個欄位范圍用做partition
KeyFieldBaseComparator的用法
首先簡單解釋一下hadoop框架中key的comparator,對於hadoop所識別的所有java的key類型(在框架看來key的類型只 能是java的),很多類型都自定義了基於位元組的比較器,比如Text,IntWritable等等,如果不特別指定比較器而使用這些類型默認的,則會將 key作為一個整體的位元組數組來進行比較。而KeyFieldBaseComparator則相當於是一個可以靈活設置比較位置的高級比較器,但是它並沒 有自己獨有的比較邏輯,而是使用默認Text的基於字典序或者通過-n來基於數字比較。
之前的例子使用KeyFieldBasePartitioner自定義了使用key中的部分欄位做partition,現在我們通過org.apache.hadoop.mapred.lib.KeyFieldBasedComparator來自定義使用key中的部分欄位做比較。
這次把前四列都作為key,前兩列做partition,排序依據優先依據第三列正序(文本序),第四列逆序(數字序)的組合排序。
bin/hadoop streaming -input /tmpcomp-test.txt -output /tmp/xx -mapper cat -recer cat
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
-jobconf mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator
-jobconf stream.num.map.output.key.fields=4
-jobconf stream.map.output.field.separator=.
-jobconf map.output.key.field.separator=.
-jobconf mapred.text.key.partitioner.options=-k1,2
-jobconf mapred.text.key.comparator.options="-k3,3 -k4nr"
-jobconf mapred.rece.tasks=5
結果:
e.5.1.45
e.5.1.23
d.1.5.23
e.5.9.22
——————
a.7.2.6
——————
f.8.3.3
e.9.4.5
總結:
從結果可以看出,符合預期的按照先第三列文本正序,然後第四列基於數字逆序的排序。
另外注意,如果這種寫法
mapred.text.key.comparator.options=」-k2″
則會從第二列開始,用字典序一直比較到key的最後一個位元組。所以對於希望准確排序欄位的需求,還是使用「k2,2」這種確定首尾范圍的形式更好。另外如果給定的key中某些行需要排序的列數不夠時,會比較到最後一列,缺列的行默認缺少的那一列排序值最小。
mapred.text.key.comparator.options 設置key中需要比較的欄位或位元組范圍
4. 如何在Hadoop上編寫MapRece程序
用戶配置並將一個Hadoop作業提到Hadoop框架中,Hadoop框架會把這個作業分解成一系列map tasks 和rece tasks。Hadoop框架負責task分發和執行,結果收集和作業進度監控。
在編寫MapRece程序時,用戶分別通過InputFormat和OutputFormat指定輸入和輸出格式,並定義Mapper和Recer指定map階段和rece階段的要做的工作。在Mapper或者Recer中,用戶只需指定一對key/value的處理邏輯,Hadoop框架會自動順序迭代解析所有key/value,並將每對key/value交給Mapper或者Recer處理。表面上看來,Hadoop限定數據格式必須為key/value形式,過於簡單,很難解決復雜問題,實際上,可以通過組合的方法使key或者value(比如在key或者value中保存多個欄位,每個欄位用分隔符分開,或者value是個序列化後的對象,在Mapper中使用時,將其反序列化等)保存多重信息,以解決輸入格式較復雜的應用。
2.2 用戶的工作
用戶編寫MapRece需要實現的類或者方法有:
(1) InputFormat介面
用戶需要實現該介面以指定輸入文件的內容格式。該介面有兩個方法
public interface InputFormat<K, V> {
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
RecordReader<K, V> getRecordReader(InputSplit split,
JobConf job,
Reporter reporter) throws IOException;
}
其中getSplits函數將所有輸入數據分成numSplits個split,每個split交給一個map task處理。getRecordReader函數提供一個用戶解析split的迭代器對象,它將split中的每個record解析成key/value對。
Hadoop本身提供了一些InputFormat:
(2)Mapper介面
用戶需繼承Mapper介面實現自己的Mapper,Mapper中必須實現的函數是
void map(K1 key,
V1 value,
OutputCollector<K2,V2> output,
Reporter reporter
) throws IOException
其中,<K1 V1>是通過Inputformat中的RecordReader對象解析處理 的,OutputCollector獲取map()的輸出結果,Reporter保存了當前task處理進度。
Hadoop本身提供了一些Mapper供用戶使用:
(3)Partitioner介面
用戶需繼承該介面實現自己的Partitioner以指定map task產生的key/value對交給哪個rece task處理,好的Partitioner能讓每個rece task處理的數據相近,從而達到負載均衡。Partitioner中需實現的函數是
getPartition( K2 key, V2 value, int numPartitions)
該函數返回<K2 V2>對應的rece task ID。
用戶如果不提供Partitioner,Hadoop會使用默認的(實際上是個hash函數)。
(4)Combiner
Combiner使得map task與rece task之間的數據傳輸量大大減小,可明顯提高性能。大多數情況下,Combiner與Recer相同。
(5)Recer介面
用戶需繼承Recer介面實現自己的Recer,Recer中必須實現的函數是
void rece(K2 key,
Iterator<V2> values,
OutputCollector<K3,V3> output,
Reporter reporter
) throws IOException
Hadoop本身提供了一些Recer供用戶使用:
(6)OutputFormat
用戶通過OutputFormat指定輸出文件的內容格式,不過它沒有split。每個rece task將其數據寫入自己的文件,文件名為part-nnnnn,其中nnnnn為rece task的ID。
Hadoop本身提供了幾個OutputFormat:
3. 分布式緩存
Haoop中自帶了一個分布式緩存,即DistributedCache對象,方便map task之間或者rece task之間共享一些信息,比如某些實際應用中,所有map task要讀取同一個配置文件或者字典,則可將該配置文件或者字典放到分布式緩存中。
4. 多語言編寫MapRece作業
Hadoop採用java編寫,因而Hadoop天生支持java語言編寫作業,但在實際應用中,有時候,因要用到非java的第三方庫或者其他原因,要採用C/C++或者其他語言編寫MapRece作業,這時候可能要用到Hadoop提供的一些工具。
如果你要用C/C++編寫MpaRece作業,可使用的工具有Hadoop Streaming或者Hadoop Pipes。
如果你要用Python編寫MapRece作業,可以使用Hadoop Streaming或者Pydoop。
如果你要使用其他語言,如shell,php,ruby等,可使用Hadoop Streaming。
關於Hadoop Streaming編程,可參見我的這篇博文:《Hadoop Streaming編程》(http://dongxicheng.org/maprece/hadoop-streaming-programming/ )
關於Pydoop編程,可參見其官方網站:http://sourceforge.net/projects/pydoop/
關於Hadoop pipes編程,可參見《Hadoop Tutorial 2.2 — Running C++ Programs on Hadoop》。
5. 編程方式比較
(1)java。 Hadoop支持的最好最全面的語言,而且提供了很多工具方便程序員開發。
(2)Hadoop Streaming。 它最大的優點是支持多種語言,但效率較低,rece task需等到map 階段完成後才能啟動;它不支持用戶自定義InputFormat,如果用戶想指定輸入文件格式,可使用java語言編寫或者在命令行中指定分隔符;它採用標准輸入輸出讓C/C++與java通信,因而只支持text數據格式。
(3)Hadoop Pipes。 專門為C/C++語言設計,由於其採用了socket方式讓C/C++與java通信,因而其效率較低(其優勢在於,但作業需要大量,速度很快)。它支持用戶(用C/C++)編寫RecordReader。
(4)Pydoop。它是專門方便python程序員編寫MapRece作業設計的,其底層使用了Hadoop Streaming介面和libhdfs庫。
6. 總結
Hadoop使得分布式程序的編寫變得異常簡單,很多情況下,用戶只需寫map()和rece()兩個函數即可(InputFormat,Outputformat可用系統預設的)。正是由於Hadoop編程的簡單性,越來越多的公司或者研究單位開始使用Hadoop。
7. 注意事項
(1) Hadoop默認的InputFormat是TextInputFormat,它將文件中的每一行作為value,該行的偏移量為key。
(2)如果你的輸入是文本文件,且每一行包括key,value,則可使用Hadoop中自帶的KeyValueTextInputFormat,它默認的每行是一個key/value對,且key與value的分割如為TAB(』\t『),如果想修改key/value之間的分隔符,如將分割符改為「,」,可使用conf.set(「key.value.separator.in.input.line」,」,」);或者-D key.value.separator.in.input.line=,。
8. 參考資料
(1) 書籍 Jason Venner 《Pro Hadoop》
(2) 書籍 Chuck Lam 《Hadoop in Action》
(3) 書籍 Tom White 《Hadoop The Definitive Guide》
(4) Hadoop分布式緩存例子:書籍《Hadoop The Definitive Guide》 第八章 最後一節「Distributed Cache」
(5) Hadoop Pipes例子:源代碼中$HAOOOP_HOME/src/examples/pipes路徑下。
(6) Hadoop Pipes資料:
http://developer.yahoo.com/hadoop/tutorial/mole4.html#pipes
http://wiki.apache.org/hadoop/C%2B%2BWordCount
5. 如何在Hadoop中使用Streaming編寫MapRece
Michael G. Noll在他的Blog中提到如何在Hadoop中用Python編寫MapRece程序,韓國的gogamza在其Bolg中也提到如何用C編寫MapRece程序(我稍微修改了一下原程序,因為他的Map對單詞切分使用tab鍵)。我合並他們兩人的文章,也讓國內的Hadoop用戶能夠使用別的語言來編寫MapRece程序。
首先您得配好您的Hadoop集群,這方面的介紹網上比較多,這兒給個鏈接(Hadoop學習筆記二 安裝部署)。Hadoop Streaming幫 助我們用非Java的編程語言使用MapRece,Streaming用STDIN (標准輸入)和STDOUT (標准輸出)來和我們編寫的Map和Rece進行數據的交換數據。任何能夠使用STDIN和STDOUT都可以用來編寫MapRece程序,比如 我們用Python的sys.stdin和sys.stdout,或者是C中的stdin和stdout。
我們還是使用Hadoop的例子WordCount來做示範如何編寫MapRece,在WordCount的例子中我們要解決計算在一批文檔中每一個單詞的出現頻率。首先我們在Map程序中會接受到這批文檔每一行的數據,然後我們編寫的Map程序把這一行按空格切開成一個數組。並對這個數組遍歷按" 1"用標準的輸出輸出來,代表這個單詞出現了一次。在Rece中我們來統計單詞的出現頻率。
Python Code
Map: mapper.py
#!/usr/bin/env python
import sys
# maps words to their counts
word2count = {}
# input comes from STDIN (standard input)
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# split the line into words while removing any empty strings
words = filter(lambda word: word, line.split())
# increase counters
for word in words:
# write the results to STDOUT (standard output);
# what we output here will be the input for the
# Rece step, i.e. the input for recer.py
#
# tab-delimited; the trivial word count is 1
print '%s\t%s' % (word, 1)
Rece: recer.py
#!/usr/bin/env python
from operator import itemgetter
import sys
# maps words to their counts
word2count = {}
# input comes from STDIN
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# parse the input we got from mapper.py
word, count = line.split()
# convert count (currently a string) to int
try:
count = int(count)
word2count[word] = word2count.get(word, 0) + count
except ValueError:
# count was not a number, so silently
# ignore/discard this line
pass
# sort the words lexigraphically;
#
# this step is NOT required, we just do it so that our
# final output will look more like the official Hadoop
# word count examples
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))
# write the results to STDOUT (standard output)
for word, count in sorted_word2count:
print '%s\t%s'% (word, count)
C Code
Map: Mapper.c
#include
#include
#include
#include
#define BUF_SIZE 2048
#define DELIM "\n"
int main(int argc, char *argv[]){
char buffer[BUF_SIZE];
while(fgets(buffer, BUF_SIZE - 1, stdin)){
int len = strlen(buffer);
if(buffer[len-1] == '\n')
buffer[len-1] = 0;
char *querys = index(buffer, ' ');
char *query = NULL;
if(querys == NULL) continue;
querys += 1; /* not to include '\t' */
query = strtok(buffer, " ");
while(query){
printf("%s\t1\n", query);
query = strtok(NULL, " ");
}
}
return 0;
}
h>h>h>h>
Rece: Recer.c
#include
#include
#include
#include
#define BUFFER_SIZE 1024
#define DELIM "\t"
int main(int argc, char *argv[]){
char strLastKey[BUFFER_SIZE];
char strLine[BUFFER_SIZE];
int count = 0;
*strLastKey = '\0';
*strLine = '\0';
while( fgets(strLine, BUFFER_SIZE - 1, stdin) ){
char *strCurrKey = NULL;
char *strCurrNum = NULL;
strCurrKey = strtok(strLine, DELIM);
strCurrNum = strtok(NULL, DELIM); /* necessary to check error but.... */
if( strLastKey[0] == '\0'){
strcpy(strLastKey, strCurrKey);
}
if(strcmp(strCurrKey, strLastKey)){
printf("%s\t%d\n", strLastKey, count);
count = atoi(strCurrNum);
}else{
count += atoi(strCurrNum);
}
strcpy(strLastKey, strCurrKey);
}
printf("%s\t%d\n", strLastKey, count); /* flush the count */
return 0;
}
h>h>h>h>
首先我們調試一下源碼:
chmod +x mapper.py
chmod +x recer.py
echo "foo foo quux labs foo bar quux" | ./mapper.py | ./recer.py
bar 1
foo 3
labs 1
quux 2
g++ Mapper.c -o Mapper
g++ Recer.c -o Recer
chmod +x Mapper
chmod +x Recer
echo "foo foo quux labs foo bar quux" | ./Mapper | ./Recer
bar 1
foo 2
labs 1
quux 1
foo 1
quux 1
你可能看到C的輸出和Python的不一樣,因為Python是把他放在詞典里了.我們在Hadoop時,會對這進行排序,然後相同的單詞會連續在標准輸出中輸出.
在Hadoop中運行程序
首先我們要下載我們的測試文檔wget http://www.gutenberg.org/dirs/etext04/7ldvc10.txt.我們把文檔存放在/tmp/doc這個目錄下.拷貝測試文檔到HDFS中.
bin/hadoop dfs -FromLocal /tmp/doc doc
運行 bin/hadoop dfs -ls doc 看看拷貝是否成功.接下來我們運行我們的MapRece的Job.
bin/hadoop jar /home/hadoop/contrib/hadoop-0.15.1-streaming.jar -mapper /home/hadoop/Mapper\
-recer /home/hadoop/Recer -input doc/* -output c-output -jobconf mapred.rece.tasks=1
additionalConfSpec_:null
null=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming
packageJobJar: [] [/home/msh/hadoop-0.15.1/contrib/hadoop-0.15.1-streaming.jar] /tmp/streamjob60816.jar tmpDir=null
08/03/04 19:03:13 INFO mapred.FileInputFormat: Total input paths to process : 1
08/03/04 19:03:13 INFO streaming.StreamJob: getLocalDirs(): [/home/msh/data/filesystem/mapred/local]
08/03/04 19:03:13 INFO streaming.StreamJob: Running job: job_200803031752_0003
08/03/04 19:03:13 INFO streaming.StreamJob: To kill this job, run:
08/03/04 19:03:13 INFO streaming.StreamJob: /home/msh/hadoop/bin/../bin/hadoop job -Dmapred.job.tracker=192.168.2.92:9001 -kill job_200803031752_0003
08/03/04 19:03:13 INFO streaming.StreamJob: Tracking URL: http://hadoop-master:50030/jobdetails.jsp?jobid=job_200803031752_0003
08/03/04 19:03:14 INFO streaming.StreamJob: map 0% rece 0%
08/03/04 19:03:15 INFO streaming.StreamJob: map 33% rece 0%
08/03/04 19:03:16 INFO streaming.StreamJob: map 100% rece 0%
08/03/04 19:03:19 INFO streaming.StreamJob: map 100% rece 100%
08/03/04 19:03:19 INFO streaming.StreamJob: Job complete: job_200803031752_0003
08/03/04 19:03:19 INFO streaming.StreamJob: Output: c-output
bin/hadoop jar /home/hadoop/contrib/hadoop-0.15.1-streaming.jar -mapper /home/hadoop/mapper.py\
-recer /home/hadoop/recer.py -input doc/* -output python-output -jobconf mapred.rece.tasks=1
additionalConfSpec_:null
null=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming
packageJobJar: [] [/home/hadoop/hadoop-0.15.1/contrib/hadoop-0.15.1-streaming.jar] /tmp/streamjob26099.jar tmpDir=null
08/03/04 19:05:40 INFO mapred.FileInputFormat: Total input paths to process : 1
08/03/04 19:05:41 INFO streaming.StreamJob: getLocalDirs(): [/home/msh/data/filesystem/mapred/local]
08/03/04 19:05:41 INFO streaming.StreamJob: Running job: job_200803031752_0004
08/03/04 19:05:41 INFO streaming.StreamJob: To kill this job, run:
08/03/04 19:05:41 INFO streaming.StreamJob: /home/msh/hadoop/bin/../bin/hadoop job -Dmapred.job.tracker=192.168.2.92:9001 -kill job_200803031752_0004
08/03/04 19:05:41 INFO streaming.StreamJob: Tracking URL: http://hadoop-master:50030/jobdetails.jsp?jobid=job_200803031752_0004
08/03/04 19:05:42 INFO streaming.StreamJob: map 0% rece 0%
08/03/04 19:05:48 INFO streaming.StreamJob: map 33% rece 0%
08/03/04 19:05:49 INFO streaming.StreamJob: map 100% rece 0%
08/03/04 19:05:52 INFO streaming.StreamJob: map 100% rece 100%
08/03/04 19:05:52 INFO streaming.StreamJob: Job complete: job_200803031752_0004
08/03/04 19:05:52 INFO streaming.StreamJob: Output: python-output
當Job提交後我們還能夠在web的界面http://localhost:50030/看到每一個工作的運行情況。
當Job工作完成後我們能夠在c-output和python-output看到一些文件
bin/hadoop dfs -ls c-output
輸入下面的命令我們能夠看到我們運行完MapRece的結果
bin/hadoop dfs -cat c-output/part-00000
用Hadoop Streaming運行MapRece會比較用Java的代碼要慢,因為有兩方面的原因:
使用 Java API >> C Streaming >> Perl Streaming 這樣的一個流程運行會阻塞IO.
不像Java在運行Map後輸出結果有一定數量的結果集就啟動Rece的程序,用Streaming要等到所有的Map都運行完畢後才啟動Rece
如果用Python編寫MapRece的話,另一個可選的是使用Jython來轉編譯Pyhton為Java的原生碼.另外對於C程序員更好的選擇是使用Hadoop新的C++ MapRece API Pipes來編寫.不管怎樣,畢竟Hadoop提供了一種不使用Java來進行分布式運算的方法.
下面是從http://www.lunchpauze.com/2007/10/writing-hadoop-maprece-program-in-php.html頁面中摘下的用php編寫的MapRece程序,供php程序員參考:
Map: mapper.php
#!/usr/bin/php
$word2count = array();
// input comes from STDIN (standard input)
while (($line = fgets(STDIN)) !== false) {
// remove leading and trailing whitespace and lowercase
$line = strtolower(trim($line));
// split the line into words while removing any empty string
$words = preg_split('/\W/', $line, 0, PREG_SPLIT_NO_EMPTY);
// increase counters
foreach ($words as $word) {
$word2count[$word] += 1;
}
}
// write the results to STDOUT (standard output)
// what we output here will be the input for the
// Rece step, i.e. the input for recer.py
foreach ($word2count as $word => $count) {
// tab-delimited
echo $word, chr(9), $count, PHP_EOL;
}
?>
Rece: mapper.php
#!/usr/bin/php
$word2count = array();
// input comes from STDIN
while (($line = fgets(STDIN)) !== false) {
// remove leading and trailing whitespace
$line = trim($line);
// parse the input we got from mapper.php
list($word, $count) = explode(chr(9), $line);
// convert count (currently a string) to int
$count = intval($count);
// sum counts
if ($count > 0) $word2count[$word] += $count;
}
// sort the words lexigraphically
//
// this set is NOT required, we just do it so that our
// final output will look more like the official Hadoop
// word count examples
ksort($word2count);
// write the results to STDOUT (standard output)
foreach ($word2count as $word => $count) {
echo $word, chr(9), $count, PHP_EOL;
}
?>
作者:馬士華 發表於:2008-03-05
6. PHP的演算法可以實現大數據分析嗎
1.Bloom filter
適用范圍:可以用來實現數據字典,進行數據的判重,或者集合求交集
基本原理及要點:
對於原理來說很簡單,位數組+k個獨立hash函數。將hash函數對應的值的位數組置1,查找時如果發現所有hash函數對應位都是1說明存在,很明顯這個過程並不保證查找的結果是100%正確的。同時也不支持刪除一個已經插入的關鍵字,因為該關鍵字對應的位會牽動到其他的關鍵字。所以一個簡單的改進就是 counting Bloom filter,用一個counter數組代替位數組,就可以支持刪除了。
還有一個比較重要的問題,如何根據輸入元素個數n,確定位數組m的大小及hash函數個數。當hash函數個數k=(ln2)*(m/n)時錯誤率最小。在錯誤率不大於E的情況下,m至少要等於n*lg(1/E)才能表示任意n個元素的集合。但m還應該更大些,因為還要保證bit數組里至少一半為 0,則m 應該>=nlg(1/E)*lge 大概就是nlg(1/E)1.44倍(lg表示以2為底的對數)。
舉個例子我們假設錯誤率為0.01,則此時m應大概是n的13倍。這樣k大概是8個。
注意這里m與n的單位不同,m是bit為單位,而n則是以元素個數為單位(准確的說是不同元素的個數)。通常單個元素的長度都是有很多bit的。所以使用bloom filter內存上通常都是節省的。
擴展:
Bloom filter將集合中的元素映射到位數組中,用k(k為哈希函數個數)個映射位是否全1表示元素在不在這個集合中。Counting bloom filter(CBF)將位數組中的每一位擴展為一個counter,從而支持了元素的刪除操作。Spectral Bloom Filter(SBF)將其與集合元素的出現次數關聯。SBF採用counter中的最小值來近似表示元素的出現頻率。
問題實例:給你A,B兩個文件,各存放50億條URL,每條URL佔用64位元組,內存限制是4G,讓你找出A,B文件共同的URL。如果是三個乃至n個文件呢?
根據這個問題我們來計算下內存的佔用,4G=2^32大概是40億*8大概是340億,n=50億,如果按出錯率0.01算需要的大概是650億個 bit。現在可用的是340億,相差並不多,這樣可能會使出錯率上升些。另外如果這些urlip是一一對應的,就可以轉換成ip,則大大簡單了。
2.Hashing
適用范圍:快速查找,刪除的基本數據結構,通常需要總數據量可以放入內存
基本原理及要點:
hash函數選擇,針對字元串,整數,排列,具體相應的hash方法。
碰撞處理,一種是open hashing,也稱為拉鏈法;另一種就是closed hashing,也稱開地址法,opened addressing。 (http://www.my400800.cn)
擴展:
d-left hashing中的d是多個的意思,我們先簡化這個問題,看一看2-left hashing。2-left hashing指的是將一個哈希表分成長度相等的兩半,分別叫做T1和T2,給T1和T2分別配備一個哈希函數,h1和h2。在存儲一個新的key時,同時用兩個哈希函數進行計算,得出兩個地址h1[key]和h2[key]。這時需要檢查T1中的h1[key]位置和T2中的h2[key]位置,哪一個位置已經存儲的(有碰撞的)key比較多,然後將新key存儲在負載少的位置。如果兩邊一樣多,比如兩個位置都為空或者都存儲了一個key,就把新key 存儲在左邊的T1子表中,2-left也由此而來。在查找一個key時,必須進行兩次hash,同時查找兩個位置。
問題實例:
1).海量日誌數據,提取出某日訪問網路次數最多的那個IP。
IP的數目還是有限的,最多2^32個,所以可以考慮使用hash將ip直接存入內存,然後進行統計。
3.bit-map
適用范圍:可進行數據的快速查找,判重,刪除,一般來說數據范圍是int的10倍以下
基本原理及要點:使用bit數組來表示某些元素是否存在,比如8位電話號碼
擴展:bloom filter可以看做是對bit-map的擴展
問題實例:
1)已知某個文件內包含一些電話號碼,每個號碼為8位數字,統計不同號碼的個數。
8位最多99 999 999,大概需要99m個bit,大概10幾m位元組的內存即可。
2)2.5億個整數中找出不重復的整數的個數,內存空間不足以容納這2.5億個整數。
將bit-map擴展一下,用2bit表示一個數即可,0表示未出現,1表示出現一次,2表示出現2次及以上。或者我們不用2bit來進行表示,我們用兩個bit-map即可模擬實現這個2bit-map。
4.堆
適用范圍:海量數據前n大,並且n比較小,堆可以放入內存
基本原理及要點:最大堆求前n小,最小堆求前n大。方法,比如求前n小,我們比較當前元素與最大堆里的最大元素,如果它小於最大元素,則應該替換那個最大元素。這樣最後得到的n個元素就是最小的n個。適合大數據量,求前n小,n的大小比較小的情況,這樣可以掃描一遍即可得到所有的前n元素,效率很高。
擴展:雙堆,一個最大堆與一個最小堆結合,可以用來維護中位數。
問題實例:
1)100w個數中找最大的前100個數。
用一個100個元素大小的最小堆即可。
5.雙層桶劃分 ----其實本質上就是【分而治之】的思想,重在「分」的技巧上!
適用范圍:第k大,中位數,不重復或重復的數字
基本原理及要點:因為元素范圍很大,不能利用直接定址表,所以通過多次劃分,逐步確定范圍,然後最後在一個可以接受的范圍內進行。可以通過多次縮小,雙層只是一個例子。
擴展:
問題實例:
1).2.5億個整數中找出不重復的整數的個數,內存空間不足以容納這2.5億個整數。
有點像鴿巢原理,整數個數為2^32,也就是,我們可以將這2^32個數,劃分為2^8個區域(比如用單個文件代表一個區域),然後將數據分離到不同的區域,然後不同的區域在利用bitmap就可以直接解決了。也就是說只要有足夠的磁碟空間,就可以很方便的解決。
2).5億個int找它們的中位數。
這個例子比上面那個更明顯。首先我們將int劃分為2^16個區域,然後讀取數據統計落到各個區域里的數的個數,之後我們根據統計結果就可以判斷中位數落到那個區域,同時知道這個區域中的第幾大數剛好是中位數。然後第二次掃描我們只統計落在這個區域中的那些數就可以了。
實際上,如果不是int是int64,我們可以經過3次這樣的劃分即可降低到可以接受的程度。即可以先將int64分成2^24個區域,然後確定區域的第幾大數,在將該區域分成2^20個子區域,然後確定是子區域的第幾大數,然後子區域里的數的個數只有2^20,就可以直接利用direct addr table進行統計了。
6.資料庫索引
適用范圍:大數據量的增刪改查
基本原理及要點:利用數據的設計實現方法,對海量數據的增刪改查進行處理。
擴展:
問題實例:
7.倒排索引(Inverted index)
適用范圍:搜索引擎,關鍵字查詢
基本原理及要點:為何叫倒排索引?一種索引方法,被用來存儲在全文搜索下某個單詞在一個文檔或者一組文檔中的存儲位置的映射。
以英文為例,下面是要被索引的文本:
T0 = "it is what it is"
T1 = "what is it"
T2 = "it is a banana"
我們就能得到下面的反向文件索引:
"a": {2}
"banana": {2}
"is": {0, 1, 2}
"it": {0, 1, 2}
"what": {0, 1}
檢索的條件"what", "is" 和 "it" 將對應集合的交集。
正向索引開發出來用來存儲每個文檔的單詞的列表。正向索引的查詢往往滿足每個文檔有序頻繁的全文查詢和每個單詞在校驗文檔中的驗證這樣的查詢。在正向索引中,文檔占據了中心的位置,每個文檔指向了一個它所包含的索引項的序列。也就是說文檔指向了它包含的那些單詞,而反向索引則是單詞指向了包含它的文檔,很容易看到這個反向的關系。
擴展:
問題實例:文檔檢索系統,查詢那些文件包含了某單詞,比如常見的學術論文的關鍵字搜索。
8.外排序
適用范圍:大數據的排序,去重
基本原理及要點:外排序的歸並方法,置換選擇 敗者樹原理,最優歸並樹
擴展:
問題實例:
1).有一個1G大小的一個文件,裡面每一行是一個詞,詞的大小不超過16個位元組,內存限制大小是1M。返回頻數最高的100個詞。
這個數據具有很明顯的特點,詞的大小為16個位元組,但是內存只有1m做hash有些不夠,所以可以用來排序。內存可以當輸入緩沖區使用。
9.trie樹
適用范圍:數據量大,重復多,但是數據種類小可以放入內存
基本原理及要點:實現方式,節點孩子的表示方式
擴展:壓縮實現。
問題實例:
1).有10個文件,每個文件1G, 每個文件的每一行都存放的是用戶的query,每個文件的query都可能重復。要你按照query的頻度排序 。
2).1000萬字元串,其中有些是相同的(重復),需要把重復的全部去掉,保留沒有重復的字元串。請問怎麼設計和實現?
3).尋找熱門查詢:查詢串的重復度比較高,雖然總數是1千萬,但如果除去重復後,不超過3百萬個,每個不超過255位元組。
10.分布式處理 maprece
適用范圍:數據量大,但是數據種類小可以放入內存
基本原理及要點:將數據交給不同的機器去處理,數據劃分,結果歸約。
擴展:
問題實例:
1).The canonical example application of MapRece is a process to count the appearances of
each different word in a set of documents:
void map(String name, String document):
// name: document name
// document: document contents
for each word w in document:
EmitIntermediate(w, 1);
void rece(String word, Iterator partialCounts):
// key: a word
// values: a list of aggregated partial counts
int result = 0;
for each v in partialCounts:
result += ParseInt(v);
Emit(result);
Here, each document is split in words, and each word is counted initially with a "1" value by
the Map function, using the word as the result key. The framework puts together all the pairs
with the same key and feeds them to the same call to Rece, thus this function just needs to
sum all of its input values to find the total appearances of that word.
2).海量數據分布在100台電腦中,想個辦法高效統計出這批數據的TOP10。
3).一共有N個機器,每個機器上有N個數。每個機器最多存O(N)個數並對它們操作。如何找到N^2個數的中數(median)?
經典問題分析
上千萬or億數據(有重復),統計其中出現次數最多的前N個數據,分兩種情況:可一次讀入內存,不可一次讀入。
可用思路:trie樹+堆,資料庫索引,劃分子集分別統計,hash,分布式計算,近似統計,外排序
所謂的是否能一次讀入內存,實際上應該指去除重復後的數據量。如果去重後數據可以放入內存,我們可以為數據建立字典,比如通過 map,hashmap,trie,然後直接進行統計即可。當然在更新每條數據的出現次數的時候,我們可以利用一個堆來維護出現次數最多的前N個數據,當然這樣導致維護次數增加,不如完全統計後在求前N大效率高。
如果數據無法放入內存。一方面我們可以考慮上面的字典方法能否被改進以適應這種情形,可以做的改變就是將字典存放到硬碟上,而不是內存,這可以參考資料庫的存儲方法。
當然還有更好的方法,就是可以採用分布式計算,基本上就是map-rece過程,首先可以根據數據值或者把數據hash(md5)後的值,將數據按照范圍劃分到不同的機子,最好可以讓數據劃分後可以一次讀入內存,這樣不同的機子負責處理各種的數值范圍,實際上就是map。得到結果後,各個機子只需拿出各自的出現次數最多的前N個數據,然後匯總,選出所有的數據中出現次數最多的前N個數據,這實際上就是rece過程。
實際上可能想直接將數據均分到不同的機子上進行處理,這樣是無法得到正確的解的。因為一個數據可能被均分到不同的機子上,而另一個則可能完全聚集到一個機子上,同時還可能存在具有相同數目的數據。比如我們要找出現次數最多的前100個,我們將1000萬的數據分布到10台機器上,找到每台出現次數最多的前 100個,歸並之後這樣不能保證找到真正的第100個,因為比如出現次數最多的第100個可能有1萬個,但是它被分到了10台機子,這樣在每台上只有1千個,假設這些機子排名在1000個之前的那些都是單獨分布在一台機子上的,比如有1001個,這樣本來具有1萬個的這個就會被淘汰,即使我們讓每台機子選出出現次數最多的1000個再歸並,仍然會出錯,因為可能存在大量個數為1001個的發生聚集。因此不能將數據隨便均分到不同機子上,而是要根據hash 後的值將它們映射到不同的機子上處理,讓不同的機器處理一個數值范圍。
而外排序的方法會消耗大量的IO,效率不會很高。而上面的分布式方法,也可以用於單機版本,也就是將總的數據根據值的范圍,劃分成多個不同的子文件,然後逐個處理。處理完畢之後再對這些單詞的及其出現頻率進行一個歸並。實際上就可以利用一個外排序的歸並過程。
另外還可以考慮近似計算,也就是我們可以通過結合自然語言屬性,只將那些真正實際中出現最多的那些詞作為一個字典,使得這個規模可以放入內存。
7. 中軟卓越php大數據課程學哪些知識,誰說一下。
首先是向了解PHP呢還是大數據呢?不過想都拿下的話,不是一朝一夕的,步子邁得太大,不好,技在於精而後於多。
先說PHP:最基本其實就是web前端基礎;第二的話包括PHP語言基礎、Ajax、資料庫強化、運行環境及配置、面向對象OOAD&UML等;第三就是主流框架的掌握了,Smarty、PDO等
再說大數據:
基礎階段:Linux、Docker、KVM、MySQL基礎、Oracle基礎、MongoDB、redis。
hadoop maprece hdfs yarn:hadoop:Hadoop 概念、版本、歷史,HDFS工作原理,YARN介紹及組件介紹。
大數據存儲階段:hbase、hive、sqoop。
大數據架構設計階段:Flume分布式、Zookeeper、Kafka。
大數據實時計算階段:Mahout、Spark、storm。
大數據數據採集階段:Python、Scala。
大數據商業實戰階段:實操企業大數據處理業務場景,分析需求、解決方案實施,綜合技術實戰應用。
8. 用Java寫MapRece,用python和R,哪種更適合從事數據行業,做數據...
必然python啊,不過R也很好。python更加靈活,但是R是這一方面的功能一點不弱。但是我感覺很多演算法拿python實現會更容易,而且python更好學,語法更簡潔。具體看個人。