A. 如何使用python 連接kafka 並獲取數據
連接
kafka
的庫有兩種類型,一種是直接連接
kafka
的,存儲
offset
的事情要自己在客戶端完成。還有一種是先連接
zookeeper
然後再通過
zookeeper
獲取
kafka
的
brokers
信息,
offset
存放在
zookeeper
上面,由
zookeeper
來協調。
我現在使用
samsa
這個
highlevel
庫
Procer示例
from
kazoo.client
import
KazooClientfrom
samsa.cluster
import
Clusterzookeeper
=
KazooClient()zookeeper.start()cluster
=
Cluster(zookeeper)topic
=
cluster.topics['topicname']topic.publish('msg')
**
Consumer示例
**
from
kazoo.client
import
KazooClientfrom
samsa.cluster
import
Clusterzookeeper
=
KazooClient()zookeeper.start()cluster
=
Cluster(zookeeper)topic
=
cluster.topics['topicname']consumer
=
topic.subscribe('groupname')for
msg
in
consumer:
print
msg
Tip
consumer
必需在
procer
向
kafka
的
topic
裡面提交數據後才能連接,否則會出錯。
在
Kafka
中一個
consumer
需要指定
groupname
,
groue
中保存著
offset
等信息,新開啟一個
group
會從
offset
0
的位置重新開始獲取日誌。
kafka
的配置參數中有個
partition
,默認是
1
,這個會對數據進行分區,如果多個
consumer
想連接同個
group
就必需要增加
partition
,
partition
只能大於
consumer
的數量,否則多出來的
consumer
將無法獲取到數據。
B. python每個線程消費不用數據
關於python每個線程消費不用數據相關資料如下
python kafka多線程消費數據
1、列印每個線粗搏程id,滿足預期,開啟了8個線程,每個線程號都不一樣凱凳嫌;
2、查看kafka狀態,也能滿足預期,每個分區的消費者id都是不一樣的,下面第二個圖盯手是開啟一個消費者時的狀態,每個分區的消費者id都是相同的;對比之下能滿足需求;
C. filebeat利用kafka進行日誌實時傳輸
選擇安裝目錄:例如安裝在/usr/local/或者/opt/下都可以。
創建一個軟鏈接:
filebeat的配置很簡單,只需要指定input和output就可以了。
由於kafka server高低版本的客戶端API區別較大,因此推薦同時使用高版本的filebeat和kafka server。 注意 高版本的filebeat配置使用低版本的kafka server會出現kafka server接受不到消息的情況。這里我使用的kafka server版本是:2.12-0.11.0.3,可參考 快速搭建kafka
編輯filebeat安裝目錄下 filebeat.yml 文件:
配置Filebeat inputs:
上面 /opt/test/*.log 是我要傳輸的日誌,根據實際情況改成你自己的值。
配置Filebeat outputs:
"111.11.1.243:9092" 是我的單機kafka broker,如果你是kafka集群,請用 , 分隔。 test 是kafka topic,請改成你自己實際情況的值。另外以下信胡這段需要刪除:
因為我並沒有用到Elasticsearch,所以有多個輸出在啟動filebeat時會報錯。到這里filebeat配置kafka就完成了,是不是很簡單,讓我們啟動它測試一下。
啟動,進入filebeat的安裝目錄:
查看是否啟動:
很好,已經啟動了。如果沒有啟動,請查看啟動日誌睜旦文件nohup.out。
停止:
隨機生成日誌腳本:
執行這段python腳本,開啟一個kafka消費者如果成功消費日誌消息:
哈哈,大功告成。 注 上面這段腳本要適時手動停止,因為它是個死循環,如果忘記手動停止那麼就杯具了,我就是這樣悉坦擾把機器寫宕機的。
D. python 消費kafka 寫入es 小記
# -*- coding: utf8 -*-
# __author__ = '小紅帽'
# Date: 2020-05-11
"""Naval Fate.
Usage:
py_kafka_protobuf_consume.py --bootstrap-servers=<host:port,host2:port2..> --groupId=<groupId> --topic=<topic_name> --es-servers=<host:port> --index=<schema> --type=<doc> --id=<order_id>
py_kafka_protobuf_consume.py -h | --help
py_kafka_protobuf_consume.py --version
Options:
-h --help 列印幫助信息.
--bootstrap_servers=<host:port,host2:port2..> kafka servers
--groupId=<groupId> kafka消費組
--topic=<topic_name> topic名稱
--es-servers=<host:port> ES 地址
--index=<index_name> ES 索引
--type=<doc> ES type
--id=<order_id> 指定id主鍵,快速更新
"""
import json
from kafka import KafkaConsumer
from docopt import docopt
from elasticsearch import Elasticsearch
from elasticsearch import helpers
class Kafka_consumer():
def __init__(self,args):
self.topic = args['--topic']
self.bootstrapServers = args['--bootstrap-servers']
self.groupId = args['--groupId']
self.id = args['--id']
self.es_host = args['--es-servers'].split(':')[0]
self.es_port = args['--es-servers'].split(':')[1]
self.es_index = args['--index']
self.es_type = args['--type']
self.consumer = KafkaConsumer(
bootstrap_servers=self.bootstrapServers,
group_id=self.groupId,
enable_auto_commit = True,
auto_commit_interval_ms=5000,
consumer_timeout_ms=5000
)
def consume_data_es(self):
while True:
try:
es = Elasticsearch([{'host': self.es_host, 'port': self.es_port}], timeout=3600)
self.consumer.subscribe([self.topic])
actions=[]
for message in self.consumer:
if message is not None:
query = json.loads(message.value)['data'][0]
action = {
"_index": self.es_index,
"_type": self.es_type,
"_id": json.loads(message.value)['data'][0][self.id],
"_source": query
}
actions.append(action)
if len(actions) > 50:
helpers.bulk(client=es, actions=actions)
print("插入es %s 條數據" % len(actions))
actions = []
if len(actions) > 0:
helpers.bulk(client=es, actions=actions)
print("等待超時時間,插入es %s 條數據" % len(actions))
actions=[]
except BaseException as e:
print(e)
if __name__ == '__main__':
arguments = docopt(__doc__,version='sbin 1.0')
consumer = Kafka_consumer(arguments)
consumer.consume_data_es()
E. python怎麼安裝acl包
搜索本產品內容
查詢
文檔中心 > 消息隊列 CKafka > SDK 文檔 > Python SDK > 公網 SASL_SSL 方式接入
公網 SASL_SSL 方式接入
最近更新時間:2021-12-28 09:54:00
操作場景
前提條件基塵豎
操作步驟
步驟1:准備工作
步驟2:生產消息
步驟3:消費消息
操作場景
該任務以 Python 客戶端為例,指導您使用公網 SASL_SSL 方式接入消息隊列 CKafka 並收發消息。
前提條件
安裝 Python
安裝 pip
配置 ACL 策略
下載 Demo
下載 SASL_SSL 證書
操作步驟
步驟1:准備工作
創建接入點。
在 實例列表 頁面,單擊目標實例 ID,進入實例詳情頁。
在 基本信息 > 接入方式 中,單擊添加路由策略,在打開窗口中選擇:路由類型:公網域名接入,接入方式:SASL_SSL。
創建角色。
在用戶管理頁面新建角色,設置密碼。
創建 Topic。
在控制台 topic 管理頁面新建 Topic(參見 創建 Topic)。
添加 Python 依賴庫。
執行以下命令安裝:
pip install kafka-python
步驟2:生產消息
修改生產消息程序 procer.py 中配置參數。
procer = KafkaProcer(
bootstrap_servers = ['xx.xx.xx.xx:port'],
api_version = (1, 1),
#
# SASL_SSL 公網接入
#
security_protocol = "SASL_SSL",
sasl_mechanism = "PLAIN",
sasl_plain_username = "instanceId#username",
sasl_plain_password = "password",
ssl_cafile = "CARoot.pem",
ssl_check_hostname = False,
)
message = "Hello World! Hello Ckafka!"
msg = json.mps(message).encode()
procer.send('topic_name', value = msg)
print("proce message " + message + " success.")
procer.close()
參數
描述
bootstrap_servers
接入網路,在控制台的實例詳情頁面接入方搏大式模塊的網路列復制。
sasl_plain_username
用戶名,格式為 實例 ID + # + 用戶名。實例 ID 在 CKafka 控制台 的實例詳情頁面的基本信息獲取,用戶在用戶管理創建用戶時設置。
sasl_plain_password
用戶密碼,在 CKafka 控制台實例詳情頁面的用戶管理創建用戶時設置。
topic_name
Topic 名稱,您可以在控制台上 topic管理頁面復制。
CARoot.pem
採用 SASL_SSL 方式接入時,所需的證書兄瞎路徑。
編譯並運行 procer.py。
查看運行結果。
在 CKafka 控制台 的 topic管理頁面,選擇對應的 Topic , 單擊更多 > 消息查詢,查看剛剛發送的消息。
步驟3:消費消息
修改消費消息程序 consumer.py 中配置參數。
consumer = KafkaConsumer(
'topic_name',
group_id = "group_id",
bootstrap_servers = ['xx.xx.xx.xx:port'],
api_version = (1,1),
#
# SASL_SSL 公網接入
#
security_protocol = "SASL_SSL",
sasl_mechanism = 'PLAIN',
sasl_plain_username = "instanceId#username",
sasl_plain_password = "password",
ssl_cafile = "CARoot.pem",
ssl_check_hostname = False,
)
for message in consumer:
print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" %
(message.topic, message.partition, message.offset, message.value))
參數
描述
bootstrap_servers
接入網路,在控制台的實例詳情頁面接入方式模塊的網路列復制。
group_id
消費者的組 ID,根據業務需求自定義。
sasl_plain_username
用戶名,格式為 實例 ID + # + 用戶名。實例 ID 在CKafka 控制台的實例詳情頁面的基本信息獲取,用戶在用戶管理創建用戶時設置。
sasl_plain_password
用戶名密碼,在 CKafka 控制台實例詳情頁面的用戶管理創建用戶時設置
topic_name
Topic 名稱,您可以在控制台上 topic管理頁面復制。
CARoot.pem
採用 SASL_SSL 方式接入時,所需的證書路徑。
編譯並運行 consumer.py。
查看運行結果。
在 CKafka 控制台 的 Consumer Group 頁面,選擇對應的消費組名稱,在主題名稱輸入 Topic 名稱,單擊查詢詳情,查看消費詳情。
上一篇: 公網 SASL_PLAINTEXT 方式接入下一篇: VPC 網路接入
文檔內容是否對您有幫助?
有幫助沒幫助
如果遇到產品相關問題,您可咨詢 在線客服 尋求幫助。
消息隊列 CKafka 相關文檔
API 概覽
創建主題
簽名方法
獲取實例列表
刪除主題白名單
增加主題白名單
錯誤返回結果
產品概述
獲取主題屬性
點擊搜索騰訊雲文檔
取消
清除查詢
F. Hadoop生態架構之kafka
1、定位:分布式的消息隊列系統,同時提供數據分布式緩存態卜功能(默認7天)
2、消息持久化到磁碟,達到O(1)訪問速度,預讀和後寫,對磁碟的順序訪問(比內存訪問還要快)
3、Storm(分布式的實時計算框架)
Kafka目標成為隊列平台
4、基本組件:
Broker:每一台機器是一個Broker
Procer:日誌消息生產者,主要寫數據
Consumer:日誌消息消費者,主要讀數據
Topic:是虛擬概念,不同的consumer去指定的topic去讀數據,不同procer可以往不同的topic去寫
Partition:是實際概念,文件夾,是在Topic的基礎上做了進一步分層
5、Partition功能:負載均衡,需要保證消息的順序性
順序性拿爛的保證:訂閱消息是從頭往後讀取的,寫消息是尾部追加,所以整體消息是順序的
如果有多個partiton存在,可能會出現順序不一致帆敏穗的情況,原因:每個Partition相互獨立
6、Topic:邏輯概念
一個或多個Partition組成一個Topic
7、Partition以文件夾的形式存在
8、Partition有兩部分組成:
(1)index log:(存儲索引信息,快速定位segment文件)
(2)message log:(真實數據的所在)
9、HDFS多副本的方式來完成數據高可用
如果設置一個Topic,假設這個Topic有5個Partition,3個replication
Kafka分配replication的演算法:
假設:
將第i個Partition分配到(i % N)個Broker上
將第i個Partition的第j個replication分配到( (i+j) % N)個Broker上
雖然Partition裡面有多個replication
如果裡面有M個replication,其中有一個是Leader,其他M-1個follower
10、zookeeper包系統的可用性,zk中會保存一些meta信息(topic)
11、物理上,不同的topic的消息肯定是分開存儲的
12、偏移量——offset:用來定位數據讀取的位置
13、kafka內部最基本的消息單位——message
14、傳輸最大消息message的size不能超過1M,可以通過配置來修改
15、Consumer Group
16、傳輸效率:zero-
0拷貝:減少Kernel和User模式上下文的切換
直接把disk上的data傳輸給socket,而不是通過應用程序來傳輸
17、Kafka的消息是無狀態的,消費者必須自己維護已消費的狀態信息(offset)
減輕Kafka的實現難度
18、Kafka內部有一個時間策略:SLA——消息保留策略(消息超過一定時間後,會自動刪除)
19、交付保證:
at least once:至少一次(會有重復、但不丟失)
at most once:最多發送一次(不重復、但可能丟失)
exactly once:只有一次(最理想),目前不支持,只能靠客戶端維護
20、Kafka集群裡面,topic內部由多個partition(包含多個replication),達到高可用的目的:
日誌副本:保證可靠性
角色:主、從
ISR:是一個集合,只有在集合中的follower,才有機會被選為leader
如何讓leader知道follower是否成功接收數據(心跳,ack)
如果心跳正常,代表節點活著
21、怎麼算「活著」
(1)心跳
(2)如果follower能夠緊隨leader的更新,不至於被落的太遠
如果一旦掛掉,從ISR集合把該節點刪除掉
前提:需要把zookeeper提前啟動好
一、單機版
1、啟動進程:
]# ./bin/kafka-server-start.sh config/server.properties
2、查看topic列表:
]# ./bin/kafka-topics.sh --list --zookeeper localhost:2181
3、創建topic:
]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic newyear_test
4、查看topic描述:
]# ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic newyear_test
5、procer發送數據:
]# ./bin/kafka-console-procer.sh --broker-list localhost:9092 --topic newyear_test
6、consumer接收數據:
]# ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic newyear_test --from-beginning
7、刪除topic:
]# ./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic newyear_test
二、集群版
在slave1和slave2上的broker.id一定設置不同
分別在slave1和slave2上開啟進程:
./bin/kafka-server-start.sh config/server.properties
創建topic:
]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 5 --topic newyear_many_test
1、實現一個consumer group
首先在不同的終端分別開啟consumer,保證groupid一致
]# python consumer_kafka.py
執行一次procer:
]# python procer_kafka.py
2、指定partition發送數據
]# python procer_kafka_2.py
3、指定partition讀出數據
]# python consumer_kafka_2.py
consumer_kafka.py:
procer_kafka.py:
consumer_kafka_2.py:
procer_kafka_2.py:
1.新建./conf/kafka_test/flume_kafka.conf
2.啟動flume:
]# flume-ng agent -c conf -f ./conf/kafka_test/flume_kafka.conf -n a1 -Dflume.root.logger=INFO,console
啟動成功,如下圖:
3.測試:
1.1flume監控產生的數據:
]# for i in seq 1 100 ; do echo '====> '$i >> 1.log ; done
1.2kafka消費數據:
]# ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic_1013 --from-beginning
消費結果如下圖:
G. pyflink消費kafka-connect-jdbc消息(帶schema)
1、數據接入
通過kafka的restFul介面創建連接mysql的連接器並啟動。跡山
{
"name": "mysql_stream_test",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"timestamp.column.name": "",
"incrementing.column.name": "ID",
"connection.password": "",
"validate.non.null": true,
"tasks.max": 1,
"batch.max.rows": 100,
"table.whitelist": "baseqx.test_demo",
"mode": "incrementing",
"topic.prefix": "mysql_",
"connection.user": "",
"poll.interval.ms": 5000,
"numeric.mapping": "best_fit",
"connection.url": "jdbc:mysql://xxx.xxx.xxx.xxx:3306/枝州則baseqx?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true"
}
}
2.kafka-connect創建主題中的猛棚默認數據格式為
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"},{"type":"string","optional":false,"field":"NAME"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"CREATE_TIME"}],"optional":false,"name":"test_demo"},"payload":{"ID":1,"NAME":"prestoEtl","CREATE_TIME":1606902182000}}
3.使用pyflink消費帶schema的消息
#!/usr/bin/python3.7
# -*- coding: UTF-8 -*-
from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode
from pyflink.table import StreamTableEnvironment, TableConfig, SqlDialect
s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_parallelism(1)
st_env = StreamTableEnvironment.create(s_env, TableConfig())
st_env.get_config().set_python_executable("python3")
st_env.use_catalog("default_catalog")
st_env.use_database("default_database")
# DML上可以固定schema為字元串, 用 ROW 函數封裝 payload
ddlKafkaConn = """
create table sourceKafkaConn(
`scheam` STRING comment 'kafkaConn每行模式',
`payload` ROW(ID BIGINT,NAME STRING,CREATE_TIME STRING) comment '行數據'
)comment '從kafkaConnect獲取帶模式的數據'
with(
'connector' = 'kafka',
'topic' = 'mysql_test_demo',
'properties.bootstrap.servers' = '192.168.113.11:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
)
"""
# 'connector.startup-mode' = 'earliest-offset 表示讀取最早的消息 | latest-offset 表示讀取消息隊列中最新的消息',
st_env.execute_sql(ddlKafkaConn)
sinkPrint = '''
CREATE TABLE sinkPrint WITH ('connector' = 'print')
LIKE sourceKafkaConn (EXCLUDING ALL)
'''
st_env.execute_sql(sinkPrint)
st_env.execute_sql("SHOW TABLES").print()
st_env.sql_query("select scheam,ROW(ID,NAME,CREATE_TIME) as payload from sourceKafkaConn") \
.insert_into("sinkPrint")
st_env.execute("pyflink-kafka-v4")
4.執行
4.1pythonpyflink-kafka-v4.py
4.2flinkrun-mxxx.xxx.xxx.xxx:8081-pypyflink-kafka-v4.py
5.執行結果
+-----------------+|tablename|+-----------------
+|sinkPrint|
+|sourceKafkaConn|
+-----------------+
2 rowsinset
+I(null,1,prestoEtl,1606902182000)
+I(null,2,執行的非常好,1606902562000)
+I(null,3,使用flink解析topic的schema,1607070278000)
H. PHP開發人員的Python基礎知識
PHP(外文名:PHP: Hypertext Preprocessor,中文名:「超文本預處理器」)是一種通用開源腳本語言。語法吸收了C語言、Java和Perl的特點,利於學習,使用廣泛,主要適用於Web開發領域。那麼PHP開發人員的Python基礎知識都有哪些呢?以下僅供參考!
常用縮略語
Ajax:非同步 JavaScript + XML
XML:可擴展標記語言(Extensible Markup Language)
什麼是 Python?
Python 的定義是一種 「通用的高級編程語言」。它以簡潔性和易用性著稱,而且是少有的幾種對空格和縮進有要求的語言之一。Python 的主要作者 Guido Van Rossum 在社區中仍然非常活躍,並且被人們戲稱為仁慈的領導。
Python 的靈活性和緊湊性是值得稱贊的。它支持面向對象編程、結構化編程、面向方面編程以及函數編程等。Python 採用小內核設計,但具備大量擴展庫,從而確保了該語言的緊湊性和靈活性。
從語法的角度來說,您會發現 Python 的簡潔性異常突出——幾乎可以說是一種純粹的境界。PHP 開發人員要麼會對這種方法的語法深深陶醉,要麼會發現它的局限性。這主要取決於您自己的見解。Python 社區推動這種美感的態度是非常明確的,它們更加重視的是美學和簡潔性,而不是靈動的技巧。已形成 Perl 傳統(「可以通過多種方式實現它」)的 PHP 開發人員(像我自己)將面對一種完全相反的哲學(「應該只有一種方法可以實現它」)。
事實上,該社區定義了一種特有的代碼風格術語,即 Python 化(pythonic)。您可以說您的代碼是 Python 化,這是對 Python 術語的良好運用,同時還可展現語言的自然特性。本文並不打算成為 Pythonista(或 Pythoneer),但如果您想繼續 Python 之路,那麼千萬不能錯過本文的知識點。就像 PHP 有自己的編程風格,Perl 有自己的概念方法,學習 Python 語言必然也需要開始用該語言來思考問題。
另一個要點:在撰寫本文時,Python 的最新版本是 V3.0,但本文主要側重於 Python V2.6。Python V3.0 並不能向後兼容之前的版本,而且 V2.6 是使用最為廣泛的版本。當然,您可以根據需求使用自己喜好的版本。
Python 與 PHP 有何不同?
一般來說,PHP 是一種 Web 開發語言。是的,它提供了一個命令行介面,並且甚至可用於開發嵌入式應用程序,但它主要還是用於 Web 開發。相反,Python 是一種腳本語言,並且也可用於 Web 開發。從這方面來說,我知道我會這樣說——它比 PHP 更加接近 Perl。(當然,在其他方面,它們之間並無實際不同。我們繼續往下看。)
PHP 的語法中充斥著美元符號($)和大括弧({}),而 Python 相對來說則更加簡潔和干凈。PHP 支持 switch 和 do...while 結構,而 Python 則不盡然。PHP 使用三元操作符(foo?bar:baz)和冗長的函數名列表,而命名約定更是無所不有;相反,您會發現 Python 要簡潔多了。PHP 的數組類型可同時支持簡單列表和字典或散列,但 Python 卻將這兩者分開。
Python 同時使用可變性和不變性的概念:舉例來說,tuple 就是一個不可變的列表。您可以創建 tuple,但在創建之後不能修改它。這一概念可能要花些時間來熟悉,但對於避免錯誤極為有效。當然,更改 tuple 的惟一方法是復制它。因此,如果您發現對不可變對象執行了大量更改,則應該重新考量自己的方法。
之前提到,Python 中的縮進是有含義的:您在剛開始學習該語言時會對此非常難以適應。您還可以創建使用關鍵字作為參數的函數和方法——這與 PHP 中的標准位置參數迥然不同。面向對象的追隨者會對 Python 中真正的面向對象思想感到欣喜,當然還包括它的 「一級」 類和函數。如果您使用非英語語言,則會鍾愛於 Python 強大的.國際化和 Unicode 支持。您還會喜歡 Python 的多線程功能;這也是最開始令我為之著迷的特性之一。
綜上所述,PHP 和 Python 在許多方面都彼此類似。您可以方便地創建變數、循環,使用條件和創建函數。您甚至可以輕松地創建可重用的模塊。兩種語言的用戶社區都充滿活力和激情。PHP 的用戶群體更加龐大,但這主要歸因於它在託管伺服器及 Web 專注性方面的優勢和普及性。
很好 簡要介紹到此為止。我們開始探索之旅。
使用 Python
清單 1 展示了一個基本的 Python 腳本。
清單 1. 一個簡單的 Python 腳本
for i in range(20):
print(i)
清單 2 展示了腳本的必然結果。
清單 2. 清單 1 的結果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
在深入探索之前,我們先來了解一些預備知識。首先從變數開始。
變數
可以看到,表示變數並不需要任何特殊的字元。變數 i 就是一個純粹的 i——毫無特殊之處。表示代碼塊或語言結束也不需要任何特殊字元(比如分號和括弧);只需要在 for 行使用一個簡單的冒號即可(:)。還需注意,縮進會向 Python 指示哪些內容屬於 for 循環。舉例來說,清單 3 中的代碼會在循環中為各編號輸出一個說明。
清單 3. 為各循環添加一條語句
for i in range(20):
print(i)
print('all done?')
相反,清單 4 中的代碼會在循環結束處輸出一條說明。
清單 4. 在循環後添加一條語句
for i in range(20):
print(i)
print('all done!')
現在,我第一次看到這樣的代碼時,我認為這完全是無稽之談。什麼?讓我相信換行和縮進能保證代碼的結構和運行?請相信我,不用多久,您就會習慣它(但我需要承認必須到達到分號處才會結束語句的運行)。如果您與其他開發人員共同開發 Python 項目,則會發現這種可讀性的用處是多麼大了。您不再像以前那樣總是猜測 「這個聰明的傢伙在這里究竟想幹些什麼?」
在 PHP,您使用 = 操作符為變數分配值(參見 清單 5)。在 Python 中,您使用相同的操作符,只是需要標記或指向值。對於我來說,它就是賦值操作而已,我不需要過多擔心專門的術語。
清單 5. 創建變數
yorkie = 'Marlowe' #meet our Yorkie Marlowe!
mutt = 'Kafka' #meet our mutt Kafka
print(mutt) #prints Kafka
Python 的變數名稱約定與 PHP 類似:您在創建變數名時只能使用字母、數字和下劃線(_)。同樣,變數名的第一個字元不能是數字。Python 變數名是區分大小寫的,並且您不能使用特定的 Python 關鍵字(比如 if、else、while、def、or、and、not、in 和 is 開始符)作為變數名。這沒有什麼值得奇怪的。
Python 允許您隨意執行基於字元串的操作。清單 6 中的大多數操作應該都是您熟悉的。
清單 6. 常見的基於字元串的操作
yorkie = 'Marlowe'
mutt = 'Kafka'
ylen = len(yorkie) #length of variable yorkie
print(ylen) #prints 7
print(len(yorkie)) #does the same thing
len(yorkie) #also does the same thing, print is implicit
print(yorkie.lower()) #lower cases the string
print(yorkie.strip('aeiou')) #removes vowels from end of string
print(mutt.split('f')) #splits "Kafka" into ['Ka', 'ka']
print(mutt.count('a')) #prints 2, the number of a's in string
yorkie.replace('a','4') #replace a's with 4's
條件語句
您已經了解了如何使用 for 循環;現在,我們來討論條件語句。您會發現 Phyon 中的條件語句與 PHP 基本相同:您可以使用熟悉的 if/else型結構,如清單 7 所示。
清單 7. 一個簡單的條件測試
yorkie = 'Marlowe'
mutt = 'Kafka'
if len(yorkie) > len(mutt):
print('The yorkie wins!')
else:
print('The mutt wins!')
您還可以使用 if/elif/else(elif,等價於 PHP 中的 elseif)創建更加復雜的條件測試,如清單 8 所示。
清單 8. 一個比較復雜的條件測試
yorkie = 'Marlowe'
mutt = 'Kafka'
if len(yorkie) + len(mutt) > 15:
print('The yorkie and the mutt win!')
elif len(yorkie) + len(mutt) > 10:
print('Too close to tell!')
else:
print('Nobody wins!')
您可能會說,目前為止並沒有什麼與眾不同的地方:甚本上和想像中沒有太大區別。現在,我們來看 Python 處理列表的方式,您會發現兩種語言之間的不同之處。
列表
一種常用的列表類型是 tuple,它是不可變的。在 tuple 中載入一系列值之後,您不會更改它。Tuple 可以包含數字、字元串、變數,甚至其他 tuples。Tuples 從 0 開始建立索引,這很正常;您可以使用 -1 索引訪問最後一個項目。您還可以對 tuple 運行一些函數(請參見清單 9)。
清單 9. Tuples
items = (1, mutt, 'Honda', (1,2,3))
print items[1] #prints Kafka
print items[-1] #prints (1,2,3)
items2 = items[0:2] #items2 now contains (1, 'Kafka') thanks to slice operation
'Honda' in items #returns TRUE
len(items) #returns 4
items.index('Kafka') #returns 1, because second item matches this index location
列表與 tuple 類似,只不過它們是可變的。創建列表之後,您可以添加、刪除和更新列表中的值。列表使用方括弧,而不是圓括弧(()),如清單 10 所示。
清單 10. 列表
groceries = ['ham','spam','eggs']
len(groceries) #returns 3
print groceries[1] #prints spam
for x in groceries:
print x.upper() #prints HAM SPAM EGGS
groceries[2] = 'bacon'
groceries #list is now ['ham','spam','bacon']
groceries.append('eggs')
groceries #list is now ['ham', 'spam', 'bacon', 'eggs']
groceries.sort()
groceries #list is now ['bacon', 'eggs', 'ham', 'spam']
字典類似於關聯數組或散列;它使用鍵值對來存儲和限制信息。但它不使用方括弧和圓括弧,而是使用尖括弧。與列表類似,字典是可變的,這意味著您可以添加、刪除和更新其中的值(請參見清單 11)。
清單 11. 字典
colorvalues = {'red' : 1, 'blue' : 2, 'green' : 3, 'yellow' : 4, 'orange' : 5}
colorvalues #prints {'blue': 2, 'orange': 5, 'green': 3, 'yellow': 4, 'red': 1}
colorvalues['blue'] #prints 2
colorvalues.keys() #retrieves all keys as a list:
#['blue', 'orange', 'green', 'yellow', 'red']
colorvalues.pop('blue') #prints 2 and removes the blue key/value pair
colorvalues #after pop, we have:
#{'orange': 5, 'green': 3, 'yellow': 4, 'red': 1}
在 Python 中創建一個簡單的腳本
現在,您已經對 Python 有了一定的了解。接下來,我們將創建一個簡單的 Python 腳本。該腳本將讀取位於您的伺服器 /tmp 目錄下的 PHP 會話文件的數量,並在日誌文件中寫入摘要報告。在該腳本中,您將學習如何導入特定函數的模塊,如何使用文件,以及如何寫入日誌文件。您還將設置一系列變數來跟蹤所收集的信息。
清單 12 展示了整個腳本。打開一個編輯器,並將代碼粘貼到其中,然後在系統中將該文件保存為 tmp.py。然後,對該文件運行 chmod + x,使它成為可執行文件(假定您使用 UNIX? 系統)。
清單 12. tmp.py
#!/usr/bin/python
import os
from time import strftime
stamp = strftime("%Y-%m-%d %H:%M:%S")
logfile = '/path/to/your/logfile.log'
path = '/path/to/tmp/directory/'
files = os.listdir(path)
bytes = 0
numfiles = 0
for f in files:
if f.startswith('sess_'):
info = os.stat(path + f)
numfiles += 1
bytes += info[6]
if numfiles > 1:
title = 'files'
else:
title = 'file'
string = stamp + " -- " + str(numfiles) + " session "
+ title +", " + str(bytes) + " bytes "
file = open(logfile,"a")
file.writelines(string)
file.close()
在第一行中,您可以看到一個 hash-bang 行:它用於標識 Python 解釋器的位置。在我的系統中,它位於 /usr/bin/python。請根據系統需求調整這一行。
接下來的兩行用於導入特定的模塊,這些模塊將幫助您執行作業。考慮到腳本需要處理文件夾和文件,因此您需要導入 os 模塊,因為其中包含各種函數和方法,可幫助您列出文件、讀取文件和操作文件夾。您還需要寫入一個日誌文件,因此可以為條目添加一個時間戳 — 這就需要使用時間函數。您不需要所有時間函數,只需要導入 strftime函數即可。
在接下來的六行中,您設置了一些變數。第一個變數是 stamp,其中包含一個日期字元串。然後,您使用 strftime 函數創建了一個特定格式的時間戳 — 在本例中,時間戳的格式為 2010-01-03 12:43:03。
接下來,創建一個 logfile 變數,並在文件中添加一個實際存儲日誌文件消息的路徑(該文件不需要實際存在)。為簡單起見,我在 /logs 文件夾中放置了一個日誌文件,但您也可以將它放置在別處。同樣,path 變數包含到 /tmp 目錄的路徑。您可以使用任何路徑,只要使用斜杠作為結束即可 (/)。
接下來的三個變數也非常簡單:files 列表包含指定路徑中的所有文件和文件夾,另外還包含 bytes 和 numfiles 兩個變數。這兩個變數都設置為 0;腳本會在處理文件時遞增這些值。
完成所有這些定義之後,接下來就是腳本的核心了:一個簡單的 for 循環,用於處理文件列表中的各文件。每次運行循環時,腳本都會計算文件名;如果它以 sess_ 開頭,則腳本會對該文件運行 os.stat(),提取文件數據(比如創建時間、修改時間和位元組大小),遞增 numfiles 計數器並將該文件的位元組大小累計到總數中。
當循環完成運行後,腳本會檢查 numfiles 變數中的值是否大於 1。如果大於 1,則會將一個新的 title 變數設置為 files;否則,title 將被設置為單數形式的 file。
腳本的最後部分也非常簡單:您創建了一個 string 變數,並在該變數中添加了一行以時間戳開始的數據,並且其後還包含 numfiles(已轉換為字元串)和位元組(也已轉換為字元串)。請注意繼續字元();該字元可允許代碼運行到下一行。它是一個提高可讀性的小技巧。
然後,您使用 open() 函數以附加模式打開日誌文件(畢竟始終需要在該文件中添加內容),writelines() 函數會將字元串添加到日誌文件中,而 close() 函數用於關閉該文件。
現在,您已經創建了一個簡單的 Python 腳本。該腳本可用於完成許多任務,舉例來說,您可以設置一個 cron作業來每小時運行一次這個腳本,以幫助您跟蹤 24 小時內所使用的 PHP 會話的數量。您還可以使用 jQuery 或其他一些 JavaScript 框架通過 Ajax 連接這個腳本,用於為您提供日誌文件提要(如果採用這種方式,則需要使用 print命令來返回數據)。
I. 如何在kafka-python和confluent-kafka之間做出選擇
kafka-python:蠻荒的西部
kafka-python是最受歡迎的Kafka Python客戶端。我們過去使用時從未出現過任何問題,在我的《敏捷數據科學2.0》一書中我也用過它。然而在最近這個項目中,它卻出現了一個嚴重的問題。我們發現,當以文檔化的方式使用KafkaConsumer、Consumer迭代式地從消息隊列中獲取消息時,最終到達主題topic的由Consumer攜帶的消息通常會丟失。我們通過控制台Consumer的分析驗證了這一點。
需要更詳細說明的是,kafka-python和KafkaConsumer是與一個由SSL保護的Kafka服務(如Aiven Kafka)一同使用的,如下面這樣:
kafka_consumer = KafkaConsumer(
topic,
enable_auto_commit=True,
group_id=group_id,
bootstrap_servers=config.kafka.host,
api_version=(0, 10),
security_protocol='SSL',
ssl_check_hostname=True,
ssl_cafile=config.kafka.ca_pem,
ssl_certfile=config.kafka.service_cert,
ssl_keyfile=config.kafka.service_key
)
for message in kafka_consumer:
application_message = json.loads(message.value.decode())
...
當以這樣的推薦方式使用時,KafkaConsumer會丟失消息。但有一個變通方案,就是保留所有消息。這個方案是Kafka服務提供商Aiven support提供給我們的。它看起來像這樣:
while True:
raw_messages = consumer.poll(timeout_ms=1000, max_records=5000)
for topic_partition, messages in raw_messages.items():
application_message = json.loads(message.value.decode())
...
雖然這個變通方案可能有用,但README中的方法會丟棄消息使我對其失去興趣。所以我找到了一個替代方案。
confluent-kafka:企業支持
發現coufluent-kafka Python模塊時,我感到無比驚喜。它既能做librdkafka的外封裝,又非常小巧。librdkafka是一個用C語言寫的kafka庫,它是Go和.NET的基礎。更重要的是,它由Confluent公司支持。我愛開源,但是當「由非正式社區擁有或支持」這種方式效果不行的時候,或許該考慮給替代方案印上公章、即該由某個公司擁有或支持了。不過,我們並未購買商業支持。我們知道有人會維護這個庫的軟體質量,而且可以選擇買或不買商業支持,這一點真是太棒了。
用confluent-kafka替換kafka-python非常簡單。confluent-kafka使用poll方法,它類似於上面提到的訪問kafka-python的變通方案。
kafka_consumer = Consumer(
{
"api.version.request": True,
"enable.auto.commit": True,
"group.id": group_id,
"bootstrap.servers": config.kafka.host,
"security.protocol": "ssl",
"ssl.ca.location": config.kafka.ca_pem,
"ssl.certificate.location": config.kafka.service_cert,
"ssl.key.location": config.kafka.service_key,
"default.topic.config": {"auto.offset.reset": "smallest"}
}
)
consumer.subscribe([topic])
# Now loop on the consumer to read messages
running = True
while running:
message = kafka_consumer.poll()
application_message = json.load(message.value.decode())
kafka_consumer.close()
現在我們能收到所有消息了。我並不是說kafka-python工具不好,我相信社區會對它的問題做出反應並解決。但從現在開始,我會一直堅持使用confluent-kafka。
開源治理
開源是強大的,但是涉及到復雜的「大數據」和NoSQL工具時,通常需要有一家大公司在背後推動工具的開發。這樣你就知道,如果那個公司可以使用工具,那麼該工具應該擁有很好的基本功能。它的出現可能是非正式的,就像某公司發布類似FOSS的項目一樣,但也可能是正式的,就像某公司為工具提供商業支持一樣。當然,從另一個角度來看,如果一家與開源社區作對的公司負責開發某個工具,你便失去了控制權。你的意見可能無關緊要,除非你是付費客戶。
理想情況是採取開源治理,就像Apache基金會一樣,還有就是增加可用的商業支持選項。這對互聯網上大部分的免費軟體來說根本不可能。限制自己只使用那些公司蓋章批准後的工具將非常限制你的自由。這對於一些商店可能是正確選擇,但對於我們不是。我喜歡工具測試,如果工具很小,而且只專心做一件事,我就會使用它。
信任開源
對於更大型的工具,以上決策評估過程更為復雜。通常,我會看一下提交問題和貢獻者的數量,以及最後一次commit的日期。我可能會問朋友某個工具的情況,有時也會在推特上問。當你進行嗅探檢查後從Github選擇了一個項目,即說明你信任社區可以產出好的工具。對於大多數工具來說,這是沒問題的。
但信任社區可能存在問題。對於某個特定的工具,可能並沒有充分的理由讓你信任社區可以產出好的軟體。社區在目標、經驗和開源項目的投入時間方面各不相同。選擇工具時保持審慎態度十分重要,不要讓理想蒙蔽了判斷。