『壹』 如何運行含spark的python腳本
1、Spark腳本提交/運行/部署1.1spark-shell(交互窗口模式)運行Spark-shell需要指向申請資源的standalonespark集群信息,其參數為MASTER,還可以指定executor及driver的內存大小。sudospark-shell--executor-memory5g--driver-memory1g--masterspark://192.168.180.216:7077spark-shell啟動完後,可以在交互窗口中輸入Scala命令,進行操作,其中spark-shell已經默認生成sc對象,可以用:valuser_rdd1=sc.textFile(inputpath,10)讀取數據資源等。1.2spark-shell(腳本運行模式)上面方法需要在交互窗口中一條一條的輸入scala程序;將scala程序保存在test.scala文件中,可以通過以下命令一次運行該文件中的程序代碼:sudospark-shell--executor-memory5g--driver-memory1g--masterspark//192.168.180.216:7077
『貳』 python開發spark環境該如何配置,又該如何操作
1)輸入:welcome="Hello!"回車
再輸入:printwelcome或者直接welcome回車就可以看到輸出Hello!
2)
[html]viewplain
welcome="hello"
you="world!"
printwelcome+you
輸出:helloworld!
以上使用的是字元串,變數還有幾種類型:數,字元串,列表,字典,文件。其他的和別的語言類似,下面先講下列表:
3)
[html]viewplain
my_list=[]//這個就產生了一個空的列表。然後給它賦值
my_list=[1,2]
printmy_list
my_list.append(3)
printmy_list
4)字典:
[html]viewplain
contact={}
contact["name"]="shiyuezhong"
contact["phone"]=12332111
5)結合列表和字典:
[html]viewplain
contact_list=[]
contact1={}
contact1['name']='shiyuezhong'
contact1['phone']=12332111
contact_list.append(contact1)
contact2={}
contact2['name']='buding'
contact2['phone']=88888888
contact_list.append(contact2)
『叄』 如何用Python寫spark
1.RDD是PariRDD類型
def add1(line):
return line[0] + line[1]
def add2(x1,x2):
return x1 + x2
sc = SparkContext(appName="gridAnalyse")
rdd = sc.parallelize([1,2,3])
list1 = rdd.map(lambda line: (line,1)).map(lambda (x1,x2) : x1 + x2).collect() #只有一個參數,通過匹配來直接獲取(賦值給裡面對應位置的變數)
list1 = rdd.map(lambda line: (line,1)).map(lambda x1,x2 : x1 + x2).collect() #錯誤,相當於函數有兩個參數
list2 = rdd.map(lambda line: (line,1)).map(lambda line : line[0] + line[1]).collect() #只有一個參數,參數是Tuple或List數據類型,再從集合的對應位置取出數據
list3 = rdd.map(lambda line: (line,1)).map(add1).collect() #傳遞函數,將Tuple或List類型數據傳給形參
list4 = rdd.map(lambda line: (line,1)).map(add2).collect() #錯誤,因為輸入只有一個,卻有兩個形參
當RDD是PairRDD時,map中可以寫lambda表達式和傳入一個函數。
a、寫lambda表達式:
可以通過(x1,x2,x3)來匹配獲取值;或者使用line獲取集合,然後從集合中獲取。
b、傳入函數
根據spark具體的transaction OR action 操作來確定自定義函數參數的個數,此例子中只有一個參數,從形參(集合類型)中獲取相應位置的數據。
『肆』 有沒有Python寫的spark連接Hbase的例子
沒有sprak寫得連接hbase的例子
spark編程python實例
ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=PySparkShell, master=local[])
1.pyspark在jupyter notebook中開發,測試,提交
1.1.啟動
IPYTHON_OPTS="notebook" /opt/spark/bin/pyspark11
下載應用,將應用下載為.py文件(默認notebook後綴是.ipynb)
在shell中提交應用
wxl@wxl-pc:/opt/spark/bin$ spark-submit /bin/spark-submit /home/wxl/Downloads/pysparkdemo.py11
!
3.遇到的錯誤及解決
ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=PySparkShell, master=local[*])
d*
3.1.錯誤
ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=PySparkShell, master=local[*])
d*
ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=PySparkShell, master=local[*]) created by <mole> at /usr/local/lib/python2.7/dist-packages/IPython/utils/py3compat.py:28811
3.2.解決,成功運行
在from之後添加
try:
sc.stop()
except:
pass
sc=SparkContext('local[2]','First Spark App')1234512345
貼上錯誤解決方法來源StackOverFlow
4.源碼
pysparkdemo.ipynb
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from pyspark import SparkContext"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"try:\n",
" sc.stop()\n",
"except:\n",
" pass\n",
"sc=SparkContext('local[2]','First Spark App')"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"data = sc.textFile(\"data/UserPurchaseHistory.csv\").map(lambda line: line.split(\",\")).map(lambda record: (record[0], record[1], record[2]))"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false,
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Total purchases: 5\n"
]
}
],
"source": [
"numPurchases = data.count()\n",
"print \"Total purchases: %d\" % numPurchases"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.12"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
pysparkdemo.py
# coding: utf-8
# In[1]:
from pyspark import SparkContext
# In[2]:
try:
sc.stop()
except:
pass
sc=SparkContext('local[2]','First Spark App')
# In[3]:
data = sc.textFile("data/UserPurchaseHistory.csv").map(lambda line: line.split(",")).map(lambda record: (record[0], record[1], record[2]))
# In[4]:
numPurchases = data.count()
print "Total purchases: %d" % numPurchases
# In[ ]:
『伍』 機器學習實踐:如何將Spark與Python結合
可以學習一下林大貴這本書,從頭到尾教你如何使用python+spark+hadoop實現常用的演算法訓練和部署。
《Python+Spark2.0+Hadoop機器學習與大數據實戰_林大貴》
鏈接:https://pan..com/s/1VGUOyr3WnOb_uf3NA_ZdLA
提取碼:ewzf
『陸』 如何在ipython或python中使用Spark
在ipython中使用spark
說明:
spark 1.6.0
scala 2.10.5
spark安裝路徑是/usr/local/spark;已經在.bashrc中配置了SPARK_HOME環境變數。
方法一
/usr/local/Spark/bin/pyspark默認打開的是Python,而不是ipython。通過在pyspark文件中添加一行,來使用ipython打開。
cp pyspark ipyspark
vi ipyspark
# 在最前面添加
IPYTHON=1
# 啟動
ipyspark
方法二:
通過為spark創建一個ipython 配置的方式實現。
# 為spark創建一個ipython 配置
ipython profile create spark
# 創建啟動配置文件
cd ~/.config/ipython/profile_spark/startup
vi 00-pyspark-setup.py
在00-pyspark-setup.py中添加如下內容:
import os
import sys
# Configure the environment
if 'SPARK_HOME' not in os.environ:
os.environ['SPARK_HOME'] = '/srv/spark'
# Create a variable for our root path
SPARK_HOME = os.environ['SPARK_HOME']
# Add the PySpark/py4j to the Python Path
sys.path.insert(0, os.path.join(SPARK_HOME, "python", "pyspark"))
sys.path.insert(0, os.path.join(SPARK_HOME, "python", "lib", "py4j-0.9-src.zip"))
sys.path.insert(0, os.path.join(SPARK_HOME, "python"))
啟動ipython
ipython –profile spark
測試程序
在ipython中輸入一下命令,如果下面的程序執行完後輸出一個數字,說明正確。
from pyspark import SparkContext
sc = SparkContext( 'local', 'pyspark')
def isprime(n):
"""
check if integer n is a prime
"""
# make sure n is a positive integer
n = abs(int(n))
# 0 and 1 are not primes
if n < 2:
return False
# 2 is the only even prime number
if n == 2:
return True
# all other even numbers are not primes
if not n & 1:
return False
# for all odd numbers
for x in range(3, int(n**0.5)+1, 2):
if n % x == 0:
return False
return True
# Create an RDD of numbers from 0 to 1,000,000
nums = sc.parallelize(xrange(1000000))
# Compute the number of primes in the RDD
print 「Result: 」, nums.filter(isprime).count()
方法三
將上面的程序放入test.py文件,執行命令python test.py。發現錯誤。因為沒有將pyspark路徑加入PYTHONPATH環境變數。
在~/.bashrc或/etc/profile中添加如下內容:
# python can call pyspark directly
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/pyspark:$SPARK_HOME/python/lib/py4j-0.9-src.zip:$PYTHONPATH
執行如下命令:
# 使配置生效
source ~/.bashrc
# 測試程序
python test.py
此時,已經能夠運行了。