導航:首頁 > 編程語言 > kafkapython教程

kafkapython教程

發布時間:2023-03-22 12:11:49

A. 如何使用python 連接kafka 並獲取數據

連接
kafka
的庫有兩種類型,一種是直接連接
kafka
的,存儲
offset
的事情要自己在客戶端完成。還有一種是先連接
zookeeper
然後再通過
zookeeper
獲取
kafka

brokers
信息,
offset
存放在
zookeeper
上面,由
zookeeper
來協調。
我現在使用
samsa
這個
highlevel

Procer示例
from
kazoo.client
import
KazooClientfrom
samsa.cluster
import
Clusterzookeeper
=
KazooClient()zookeeper.start()cluster
=
Cluster(zookeeper)topic
=
cluster.topics['topicname']topic.publish('msg')
**
Consumer示例
**
from
kazoo.client
import
KazooClientfrom
samsa.cluster
import
Clusterzookeeper
=
KazooClient()zookeeper.start()cluster
=
Cluster(zookeeper)topic
=
cluster.topics['topicname']consumer
=
topic.subscribe('groupname')for
msg
in
consumer:
print
msg
Tip
consumer
必需在
procer

kafka

topic
裡面提交數據後才能連接,否則會出錯。

Kafka
中一個
consumer
需要指定
groupname

groue
中保存著
offset
等信息,新開啟一個
group
會從
offset
0
的位置重新開始獲取日誌。
kafka
的配置參數中有個
partition
,默認是
1
,這個會對數據進行分區,如果多個
consumer
想連接同個
group
就必需要增加
partition
,
partition
只能大於
consumer
的數量,否則多出來的
consumer
將無法獲取到數據。

B. python每個線程消費不用數據

關於python每個線程消費不用數據相關資料如下
python kafka多線程消費數據

1、列印每個線粗搏程id,滿足預期,開啟了8個線程,每個線程號都不一樣凱凳嫌;

 

2、查看kafka狀態,也能滿足預期,每個分區的消費者id都是不一樣的,下面第二個圖盯手是開啟一個消費者時的狀態,每個分區的消費者id都是相同的;對比之下能滿足需求;

C. filebeat利用kafka進行日誌實時傳輸

選擇安裝目錄:例如安裝在/usr/local/或者/opt/下都可以。

創建一個軟鏈接:

filebeat的配置很簡單,只需要指定input和output就可以了。

由於kafka server高低版本的客戶端API區別較大,因此推薦同時使用高版本的filebeat和kafka server。 注意 高版本的filebeat配置使用低版本的kafka server會出現kafka server接受不到消息的情況。這里我使用的kafka server版本是:2.12-0.11.0.3,可參考 快速搭建kafka

編輯filebeat安裝目錄下 filebeat.yml 文件:

配置Filebeat inputs:

上面 /opt/test/*.log 是我要傳輸的日誌,根據實際情況改成你自己的值。
配置Filebeat outputs:

"111.11.1.243:9092" 是我的單機kafka broker,如果你是kafka集群,請用 , 分隔。 test 是kafka topic,請改成你自己實際情況的值。另外以下信胡這段需要刪除:

因為我並沒有用到Elasticsearch,所以有多個輸出在啟動filebeat時會報錯。到這里filebeat配置kafka就完成了,是不是很簡單,讓我們啟動它測試一下。

啟動,進入filebeat的安裝目錄:

查看是否啟動:

很好,已經啟動了。如果沒有啟動,請查看啟動日誌睜旦文件nohup.out。
停止:

隨機生成日誌腳本:

執行這段python腳本,開啟一個kafka消費者如果成功消費日誌消息:

哈哈,大功告成。 注 上面這段腳本要適時手動停止,因為它是個死循環,如果忘記手動停止那麼就杯具了,我就是這樣悉坦擾把機器寫宕機的。

D. python 消費kafka 寫入es 小記

# -*- coding: utf8 -*-

# __author__ = '小紅帽'

# Date: 2020-05-11

"""Naval Fate.

Usage:

        py_kafka_protobuf_consume.py --bootstrap-servers=<host:port,host2:port2..> --groupId=<groupId> --topic=<topic_name> --es-servers=<host:port> --index=<schema> --type=<doc> --id=<order_id>

        py_kafka_protobuf_consume.py -h | --help

        py_kafka_protobuf_consume.py --version

Options:

        -h --help                                      列印幫助信息.

        --bootstrap_servers=<host:port,host2:port2..>  kafka servers

        --groupId=<groupId>                            kafka消費組

        --topic=<topic_name>                            topic名稱

        --es-servers=<host:port>                        ES 地址

        --index=<index_name>                            ES 索引

        --type=<doc> ES type

        --id=<order_id> 指定id主鍵,快速更新

"""

import json

from kafka import KafkaConsumer

from docopt import docopt

from elasticsearch import Elasticsearch

from elasticsearch import helpers

class Kafka_consumer():

    def __init__(self,args):

        self.topic = args['--topic']

        self.bootstrapServers = args['--bootstrap-servers']

        self.groupId = args['--groupId']

        self.id = args['--id']

        self.es_host = args['--es-servers'].split(':')[0]

        self.es_port = args['--es-servers'].split(':')[1]

        self.es_index = args['--index']

        self.es_type = args['--type']

        self.consumer = KafkaConsumer(

            bootstrap_servers=self.bootstrapServers,

            group_id=self.groupId,

            enable_auto_commit = True,

            auto_commit_interval_ms=5000,

            consumer_timeout_ms=5000

        )

    def consume_data_es(self):

        while True:

            try:

                es = Elasticsearch([{'host': self.es_host, 'port': self.es_port}], timeout=3600)

                self.consumer.subscribe([self.topic])

                actions=[]

                for message in self.consumer:

                    if message is not None:

                        query = json.loads(message.value)['data'][0]

                        action = {

                            "_index": self.es_index,

                            "_type": self.es_type,

                            "_id": json.loads(message.value)['data'][0][self.id],

                            "_source": query

                        }

                        actions.append(action)

                        if len(actions) > 50:

                            helpers.bulk(client=es, actions=actions)

                            print("插入es %s 條數據" % len(actions))

                            actions = []

                if len(actions) > 0:

                    helpers.bulk(client=es, actions=actions)

                    print("等待超時時間,插入es %s 條數據" % len(actions))

                    actions=[]

            except BaseException as e:

                print(e)

if __name__ == '__main__':

    arguments = docopt(__doc__,version='sbin 1.0')

    consumer = Kafka_consumer(arguments)

    consumer.consume_data_es()

E. python怎麼安裝acl包

搜索本產品內容
查詢
文檔中心 > 消息隊列 CKafka > SDK 文檔 > Python SDK > 公網 SASL_SSL 方式接入
公網 SASL_SSL 方式接入
最近更新時間:2021-12-28 09:54:00

操作場景
前提條件基塵豎
操作步驟
步驟1:准備工作
步驟2:生產消息
步驟3:消費消息
操作場景
該任務以 Python 客戶端為例,指導您使用公網 SASL_SSL 方式接入消息隊列 CKafka 並收發消息。
前提條件
安裝 Python
安裝 pip
配置 ACL 策略
下載 Demo
下載 SASL_SSL 證書
操作步驟
步驟1:准備工作
創建接入點。
在 實例列表 頁面,單擊目標實例 ID,進入實例詳情頁。
在 基本信息 > 接入方式 中,單擊添加路由策略,在打開窗口中選擇:路由類型:公網域名接入,接入方式:SASL_SSL。

創建角色。
在用戶管理頁面新建角色,設置密碼。

創建 Topic。
在控制台 topic 管理頁面新建 Topic(參見 創建 Topic)。
添加 Python 依賴庫。
執行以下命令安裝:
pip install kafka-python
步驟2:生產消息
修改生產消息程序 procer.py 中配置參數。
procer = KafkaProcer(
bootstrap_servers = ['xx.xx.xx.xx:port'],
api_version = (1, 1),

#
# SASL_SSL 公網接入
#
security_protocol = "SASL_SSL",
sasl_mechanism = "PLAIN",
sasl_plain_username = "instanceId#username",
sasl_plain_password = "password",
ssl_cafile = "CARoot.pem",
ssl_check_hostname = False,
)

message = "Hello World! Hello Ckafka!"
msg = json.mps(message).encode()
procer.send('topic_name', value = msg)
print("proce message " + message + " success.")
procer.close()
參數
描述
bootstrap_servers
接入網路,在控制台的實例詳情頁面接入方搏大式模塊的網路列復制。

sasl_plain_username
用戶名,格式為 實例 ID + # + 用戶名。實例 ID 在 CKafka 控制台 的實例詳情頁面的基本信息獲取,用戶在用戶管理創建用戶時設置。
sasl_plain_password
用戶密碼,在 CKafka 控制台實例詳情頁面的用戶管理創建用戶時設置。
topic_name
Topic 名稱,您可以在控制台上 topic管理頁面復制。

CARoot.pem
採用 SASL_SSL 方式接入時,所需的證書兄瞎路徑。
編譯並運行 procer.py。
查看運行結果。

在 CKafka 控制台 的 topic管理頁面,選擇對應的 Topic , 單擊更多 > 消息查詢,查看剛剛發送的消息。

步驟3:消費消息
修改消費消息程序 consumer.py 中配置參數。
consumer = KafkaConsumer(
'topic_name',
group_id = "group_id",
bootstrap_servers = ['xx.xx.xx.xx:port'],
api_version = (1,1),

#
# SASL_SSL 公網接入
#
security_protocol = "SASL_SSL",
sasl_mechanism = 'PLAIN',
sasl_plain_username = "instanceId#username",
sasl_plain_password = "password",
ssl_cafile = "CARoot.pem",
ssl_check_hostname = False,

)

for message in consumer:
print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" %
(message.topic, message.partition, message.offset, message.value))
參數
描述
bootstrap_servers
接入網路,在控制台的實例詳情頁面接入方式模塊的網路列復制。

group_id
消費者的組 ID,根據業務需求自定義。
sasl_plain_username
用戶名,格式為 實例 ID + # + 用戶名。實例 ID 在CKafka 控制台的實例詳情頁面的基本信息獲取,用戶在用戶管理創建用戶時設置。
sasl_plain_password
用戶名密碼,在 CKafka 控制台實例詳情頁面的用戶管理創建用戶時設置
topic_name
Topic 名稱,您可以在控制台上 topic管理頁面復制。

CARoot.pem
採用 SASL_SSL 方式接入時,所需的證書路徑。
編譯並運行 consumer.py。
查看運行結果。

在 CKafka 控制台 的 Consumer Group 頁面,選擇對應的消費組名稱,在主題名稱輸入 Topic 名稱,單擊查詢詳情,查看消費詳情。

上一篇: 公網 SASL_PLAINTEXT 方式接入下一篇: VPC 網路接入
文檔內容是否對您有幫助?
有幫助沒幫助
如果遇到產品相關問題,您可咨詢 在線客服 尋求幫助。

消息隊列 CKafka 相關文檔
API 概覽
創建主題
簽名方法
獲取實例列表
刪除主題白名單
增加主題白名單
錯誤返回結果
產品概述
獲取主題屬性
點擊搜索騰訊雲文檔
取消
清除查詢

F. Hadoop生態架構之kafka

1、定位:分布式的消息隊列系統,同時提供數據分布式緩存態卜功能(默認7天)
2、消息持久化到磁碟,達到O(1)訪問速度,預讀和後寫,對磁碟的順序訪問(比內存訪問還要快)
3、Storm(分布式的實時計算框架)
Kafka目標成為隊列平台
4、基本組件:
Broker:每一台機器是一個Broker
Procer:日誌消息生產者,主要寫數據
Consumer:日誌消息消費者,主要讀數據
Topic:是虛擬概念,不同的consumer去指定的topic去讀數據,不同procer可以往不同的topic去寫
Partition:是實際概念,文件夾,是在Topic的基礎上做了進一步分層
5、Partition功能:負載均衡,需要保證消息的順序性
順序性拿爛的保證:訂閱消息是從頭往後讀取的,寫消息是尾部追加,所以整體消息是順序的
如果有多個partiton存在,可能會出現順序不一致帆敏穗的情況,原因:每個Partition相互獨立
6、Topic:邏輯概念
一個或多個Partition組成一個Topic
7、Partition以文件夾的形式存在
8、Partition有兩部分組成:
(1)index log:(存儲索引信息,快速定位segment文件)
(2)message log:(真實數據的所在)
9、HDFS多副本的方式來完成數據高可用
如果設置一個Topic,假設這個Topic有5個Partition,3個replication
Kafka分配replication的演算法
假設:
將第i個Partition分配到(i % N)個Broker上
將第i個Partition的第j個replication分配到( (i+j) % N)個Broker上
雖然Partition裡面有多個replication
如果裡面有M個replication,其中有一個是Leader,其他M-1個follower
10、zookeeper包系統的可用性,zk中會保存一些meta信息(topic)
11、物理上,不同的topic的消息肯定是分開存儲的
12、偏移量——offset:用來定位數據讀取的位置
13、kafka內部最基本的消息單位——message
14、傳輸最大消息message的size不能超過1M,可以通過配置來修改
15、Consumer Group
16、傳輸效率:zero-
0拷貝:減少Kernel和User模式上下文的切換
直接把disk上的data傳輸給socket,而不是通過應用程序來傳輸
17、Kafka的消息是無狀態的,消費者必須自己維護已消費的狀態信息(offset)
減輕Kafka的實現難度
18、Kafka內部有一個時間策略:SLA——消息保留策略(消息超過一定時間後,會自動刪除)
19、交付保證:
at least once:至少一次(會有重復、但不丟失)
at most once:最多發送一次(不重復、但可能丟失)
exactly once:只有一次(最理想),目前不支持,只能靠客戶端維護
20、Kafka集群裡面,topic內部由多個partition(包含多個replication),達到高可用的目的:
日誌副本:保證可靠性
角色:主、從
ISR:是一個集合,只有在集合中的follower,才有機會被選為leader
如何讓leader知道follower是否成功接收數據(心跳,ack)
如果心跳正常,代表節點活著
21、怎麼算「活著」
(1)心跳
(2)如果follower能夠緊隨leader的更新,不至於被落的太遠
如果一旦掛掉,從ISR集合把該節點刪除掉

前提:需要把zookeeper提前啟動好
一、單機版
1、啟動進程:
]# ./bin/kafka-server-start.sh config/server.properties
2、查看topic列表:
]# ./bin/kafka-topics.sh --list --zookeeper localhost:2181
3、創建topic:
]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic newyear_test
4、查看topic描述:
]# ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic newyear_test
5、procer發送數據:
]# ./bin/kafka-console-procer.sh --broker-list localhost:9092 --topic newyear_test
6、consumer接收數據:
]# ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic newyear_test --from-beginning
7、刪除topic:
]# ./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic newyear_test
二、集群版
在slave1和slave2上的broker.id一定設置不同
分別在slave1和slave2上開啟進程:
./bin/kafka-server-start.sh config/server.properties

創建topic:
]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 5 --topic newyear_many_test

1、實現一個consumer group
首先在不同的終端分別開啟consumer,保證groupid一致
]# python consumer_kafka.py

執行一次procer:
]# python procer_kafka.py

2、指定partition發送數據
]# python procer_kafka_2.py

3、指定partition讀出數據
]# python consumer_kafka_2.py
consumer_kafka.py:

procer_kafka.py:

consumer_kafka_2.py:

procer_kafka_2.py:

1.新建./conf/kafka_test/flume_kafka.conf

2.啟動flume:
]# flume-ng agent -c conf -f ./conf/kafka_test/flume_kafka.conf -n a1 -Dflume.root.logger=INFO,console
啟動成功,如下圖:

3.測試:
1.1flume監控產生的數據:
]# for i in seq 1 100 ; do echo '====> '$i >> 1.log ; done
1.2kafka消費數據:
]# ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic_1013 --from-beginning
消費結果如下圖:

G. pyflink消費kafka-connect-jdbc消息(帶schema)

1、數據接入

通過kafka的restFul介面創建連接mysql的連接器並啟動。跡山

{

    "name": "mysql_stream_test",

    "config": {

        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",

        "timestamp.column.name": "",

        "incrementing.column.name": "ID",

        "connection.password": "",

        "validate.non.null": true,

        "tasks.max": 1,

        "batch.max.rows": 100,

        "table.whitelist": "baseqx.test_demo",

        "mode": "incrementing",

        "topic.prefix": "mysql_",

        "connection.user": "",

        "poll.interval.ms": 5000,

        "numeric.mapping": "best_fit",

        "connection.url": "jdbc:mysql://xxx.xxx.xxx.xxx:3306/枝州則baseqx?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true"

    }

}

2.kafka-connect創建主題中的猛棚默認數據格式為

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"},{"type":"string","optional":false,"field":"NAME"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"CREATE_TIME"}],"optional":false,"name":"test_demo"},"payload":{"ID":1,"NAME":"prestoEtl","CREATE_TIME":1606902182000}}

3.使用pyflink消費帶schema的消息

#!/usr/bin/python3.7

# -*- coding: UTF-8 -*-

from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode

from pyflink.table import StreamTableEnvironment, TableConfig, SqlDialect

s_env = StreamExecutionEnvironment.get_execution_environment()

s_env.set_parallelism(1)

st_env = StreamTableEnvironment.create(s_env, TableConfig())

st_env.get_config().set_python_executable("python3")

st_env.use_catalog("default_catalog")

st_env.use_database("default_database")

# DML上可以固定schema為字元串, 用 ROW 函數封裝 payload

ddlKafkaConn = """

create table sourceKafkaConn(

    `scheam`    STRING  comment 'kafkaConn每行模式',

    `payload`  ROW(ID BIGINT,NAME STRING,CREATE_TIME STRING)  comment '行數據'

)comment '從kafkaConnect獲取帶模式的數據'

with(

    'connector' = 'kafka',

    'topic' = 'mysql_test_demo',       

    'properties.bootstrap.servers' = '192.168.113.11:9092',

    'scan.startup.mode' = 'earliest-offset',

    'format' = 'json'

)

"""

# 'connector.startup-mode' = 'earliest-offset 表示讀取最早的消息 | latest-offset 表示讀取消息隊列中最新的消息',

st_env.execute_sql(ddlKafkaConn)

sinkPrint = '''

    CREATE TABLE sinkPrint WITH ('connector' = 'print')

    LIKE sourceKafkaConn (EXCLUDING ALL)

'''

st_env.execute_sql(sinkPrint)

st_env.execute_sql("SHOW TABLES").print()

st_env.sql_query("select scheam,ROW(ID,NAME,CREATE_TIME) as payload from sourceKafkaConn") \

    .insert_into("sinkPrint")

st_env.execute("pyflink-kafka-v4")

4.執行

4.1pythonpyflink-kafka-v4.py

4.2flinkrun-mxxx.xxx.xxx.xxx:8081-pypyflink-kafka-v4.py

5.執行結果

+-----------------+|tablename|+-----------------

+|sinkPrint|

+|sourceKafkaConn|

+-----------------+

2 rowsinset

+I(null,1,prestoEtl,1606902182000)

+I(null,2,執行的非常好,1606902562000)

+I(null,3,使用flink解析topic的schema,1607070278000)

H. PHP開發人員的Python基礎知識

PHP(外文名:PHP: Hypertext Preprocessor,中文名:「超文本預處理器」)是一種通用開源腳本語言。語法吸收了C語言、Java和Perl的特點,利於學習,使用廣泛,主要適用於Web開發領域。那麼PHP開發人員的Python基礎知識都有哪些呢?以下僅供參考!

常用縮略語

Ajax:非同步 JavaScript + XML

XML:可擴展標記語言(Extensible Markup Language)

什麼是 Python?

Python 的定義是一種 「通用的高級編程語言」。它以簡潔性和易用性著稱,而且是少有的幾種對空格和縮進有要求的語言之一。Python 的主要作者 Guido Van Rossum 在社區中仍然非常活躍,並且被人們戲稱為仁慈的領導。

Python 的靈活性和緊湊性是值得稱贊的。它支持面向對象編程、結構化編程、面向方面編程以及函數編程等。Python 採用小內核設計,但具備大量擴展庫,從而確保了該語言的緊湊性和靈活性。

從語法的角度來說,您會發現 Python 的簡潔性異常突出——幾乎可以說是一種純粹的境界。PHP 開發人員要麼會對這種方法的語法深深陶醉,要麼會發現它的局限性。這主要取決於您自己的見解。Python 社區推動這種美感的態度是非常明確的,它們更加重視的是美學和簡潔性,而不是靈動的技巧。已形成 Perl 傳統(「可以通過多種方式實現它」)的 PHP 開發人員(像我自己)將面對一種完全相反的哲學(「應該只有一種方法可以實現它」)。

事實上,該社區定義了一種特有的代碼風格術語,即 Python 化(pythonic)。您可以說您的代碼是 Python 化,這是對 Python 術語的良好運用,同時還可展現語言的自然特性。本文並不打算成為 Pythonista(或 Pythoneer),但如果您想繼續 Python 之路,那麼千萬不能錯過本文的知識點。就像 PHP 有自己的編程風格,Perl 有自己的概念方法,學習 Python 語言必然也需要開始用該語言來思考問題。

另一個要點:在撰寫本文時,Python 的最新版本是 V3.0,但本文主要側重於 Python V2.6。Python V3.0 並不能向後兼容之前的版本,而且 V2.6 是使用最為廣泛的版本。當然,您可以根據需求使用自己喜好的版本。

Python 與 PHP 有何不同?

一般來說,PHP 是一種 Web 開發語言。是的,它提供了一個命令行介面,並且甚至可用於開發嵌入式應用程序,但它主要還是用於 Web 開發。相反,Python 是一種腳本語言,並且也可用於 Web 開發。從這方面來說,我知道我會這樣說——它比 PHP 更加接近 Perl。(當然,在其他方面,它們之間並無實際不同。我們繼續往下看。)

PHP 的語法中充斥著美元符號($)和大括弧({}),而 Python 相對來說則更加簡潔和干凈。PHP 支持 switch 和 do...while 結構,而 Python 則不盡然。PHP 使用三元操作符(foo?bar:baz)和冗長的函數名列表,而命名約定更是無所不有;相反,您會發現 Python 要簡潔多了。PHP 的數組類型可同時支持簡單列表和字典或散列,但 Python 卻將這兩者分開。

Python 同時使用可變性和不變性的概念:舉例來說,tuple 就是一個不可變的列表。您可以創建 tuple,但在創建之後不能修改它。這一概念可能要花些時間來熟悉,但對於避免錯誤極為有效。當然,更改 tuple 的惟一方法是復制它。因此,如果您發現對不可變對象執行了大量更改,則應該重新考量自己的方法。

之前提到,Python 中的縮進是有含義的:您在剛開始學習該語言時會對此非常難以適應。您還可以創建使用關鍵字作為參數的函數和方法——這與 PHP 中的標准位置參數迥然不同。面向對象的追隨者會對 Python 中真正的面向對象思想感到欣喜,當然還包括它的 「一級」 類和函數。如果您使用非英語語言,則會鍾愛於 Python 強大的.國際化和 Unicode 支持。您還會喜歡 Python 的多線程功能;這也是最開始令我為之著迷的特性之一。

綜上所述,PHP 和 Python 在許多方面都彼此類似。您可以方便地創建變數、循環,使用條件和創建函數。您甚至可以輕松地創建可重用的模塊。兩種語言的用戶社區都充滿活力和激情。PHP 的用戶群體更加龐大,但這主要歸因於它在託管伺服器及 Web 專注性方面的優勢和普及性。

很好 簡要介紹到此為止。我們開始探索之旅。

使用 Python

清單 1 展示了一個基本的 Python 腳本。

清單 1. 一個簡單的 Python 腳本

for i in range(20):

print(i)

清單 2 展示了腳本的必然結果。

清單 2. 清單 1 的結果

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

在深入探索之前,我們先來了解一些預備知識。首先從變數開始。

變數

可以看到,表示變數並不需要任何特殊的字元。變數 i 就是一個純粹的 i——毫無特殊之處。表示代碼塊或語言結束也不需要任何特殊字元(比如分號和括弧);只需要在 for 行使用一個簡單的冒號即可(:)。還需注意,縮進會向 Python 指示哪些內容屬於 for 循環。舉例來說,清單 3 中的代碼會在循環中為各編號輸出一個說明。

清單 3. 為各循環添加一條語句

for i in range(20):

print(i)

print('all done?')

相反,清單 4 中的代碼會在循環結束處輸出一條說明。

清單 4. 在循環後添加一條語句

for i in range(20):

print(i)

print('all done!')

現在,我第一次看到這樣的代碼時,我認為這完全是無稽之談。什麼?讓我相信換行和縮進能保證代碼的結構和運行?請相信我,不用多久,您就會習慣它(但我需要承認必須到達到分號處才會結束語句的運行)。如果您與其他開發人員共同開發 Python 項目,則會發現這種可讀性的用處是多麼大了。您不再像以前那樣總是猜測 「這個聰明的傢伙在這里究竟想幹些什麼?」

在 PHP,您使用 = 操作符為變數分配值(參見 清單 5)。在 Python 中,您使用相同的操作符,只是需要標記或指向值。對於我來說,它就是賦值操作而已,我不需要過多擔心專門的術語。

清單 5. 創建變數

yorkie = 'Marlowe' #meet our Yorkie Marlowe!

mutt = 'Kafka' #meet our mutt Kafka

print(mutt) #prints Kafka

Python 的變數名稱約定與 PHP 類似:您在創建變數名時只能使用字母、數字和下劃線(_)。同樣,變數名的第一個字元不能是數字。Python 變數名是區分大小寫的,並且您不能使用特定的 Python 關鍵字(比如 if、else、while、def、or、and、not、in 和 is 開始符)作為變數名。這沒有什麼值得奇怪的。

Python 允許您隨意執行基於字元串的操作。清單 6 中的大多數操作應該都是您熟悉的。

清單 6. 常見的基於字元串的操作

yorkie = 'Marlowe'

mutt = 'Kafka'

ylen = len(yorkie) #length of variable yorkie

print(ylen) #prints 7

print(len(yorkie)) #does the same thing

len(yorkie) #also does the same thing, print is implicit

print(yorkie.lower()) #lower cases the string

print(yorkie.strip('aeiou')) #removes vowels from end of string

print(mutt.split('f')) #splits "Kafka" into ['Ka', 'ka']

print(mutt.count('a')) #prints 2, the number of a's in string

yorkie.replace('a','4') #replace a's with 4's

條件語句

您已經了解了如何使用 for 循環;現在,我們來討論條件語句。您會發現 Phyon 中的條件語句與 PHP 基本相同:您可以使用熟悉的 if/else型結構,如清單 7 所示。

清單 7. 一個簡單的條件測試

yorkie = 'Marlowe'

mutt = 'Kafka'

if len(yorkie) > len(mutt):

print('The yorkie wins!')

else:

print('The mutt wins!')

您還可以使用 if/elif/else(elif,等價於 PHP 中的 elseif)創建更加復雜的條件測試,如清單 8 所示。

清單 8. 一個比較復雜的條件測試

yorkie = 'Marlowe'

mutt = 'Kafka'

if len(yorkie) + len(mutt) > 15:

print('The yorkie and the mutt win!')

elif len(yorkie) + len(mutt) > 10:

print('Too close to tell!')

else:

print('Nobody wins!')

您可能會說,目前為止並沒有什麼與眾不同的地方:甚本上和想像中沒有太大區別。現在,我們來看 Python 處理列表的方式,您會發現兩種語言之間的不同之處。

列表

一種常用的列表類型是 tuple,它是不可變的。在 tuple 中載入一系列值之後,您不會更改它。Tuple 可以包含數字、字元串、變數,甚至其他 tuples。Tuples 從 0 開始建立索引,這很正常;您可以使用 -1 索引訪問最後一個項目。您還可以對 tuple 運行一些函數(請參見清單 9)。

清單 9. Tuples

items = (1, mutt, 'Honda', (1,2,3))

print items[1] #prints Kafka

print items[-1] #prints (1,2,3)

items2 = items[0:2] #items2 now contains (1, 'Kafka') thanks to slice operation

'Honda' in items #returns TRUE

len(items) #returns 4

items.index('Kafka') #returns 1, because second item matches this index location

列表與 tuple 類似,只不過它們是可變的。創建列表之後,您可以添加、刪除和更新列表中的值。列表使用方括弧,而不是圓括弧(()),如清單 10 所示。

清單 10. 列表

groceries = ['ham','spam','eggs']

len(groceries) #returns 3

print groceries[1] #prints spam

for x in groceries:

print x.upper() #prints HAM SPAM EGGS

groceries[2] = 'bacon'

groceries #list is now ['ham','spam','bacon']

groceries.append('eggs')

groceries #list is now ['ham', 'spam', 'bacon', 'eggs']

groceries.sort()

groceries #list is now ['bacon', 'eggs', 'ham', 'spam']

字典類似於關聯數組或散列;它使用鍵值對來存儲和限制信息。但它不使用方括弧和圓括弧,而是使用尖括弧。與列表類似,字典是可變的,這意味著您可以添加、刪除和更新其中的值(請參見清單 11)。

清單 11. 字典

colorvalues = {'red' : 1, 'blue' : 2, 'green' : 3, 'yellow' : 4, 'orange' : 5}

colorvalues #prints {'blue': 2, 'orange': 5, 'green': 3, 'yellow': 4, 'red': 1}

colorvalues['blue'] #prints 2

colorvalues.keys() #retrieves all keys as a list:

#['blue', 'orange', 'green', 'yellow', 'red']

colorvalues.pop('blue') #prints 2 and removes the blue key/value pair

colorvalues #after pop, we have:

#{'orange': 5, 'green': 3, 'yellow': 4, 'red': 1}

在 Python 中創建一個簡單的腳本

現在,您已經對 Python 有了一定的了解。接下來,我們將創建一個簡單的 Python 腳本。該腳本將讀取位於您的伺服器 /tmp 目錄下的 PHP 會話文件的數量,並在日誌文件中寫入摘要報告。在該腳本中,您將學習如何導入特定函數的模塊,如何使用文件,以及如何寫入日誌文件。您還將設置一系列變數來跟蹤所收集的信息。

清單 12 展示了整個腳本。打開一個編輯器,並將代碼粘貼到其中,然後在系統中將該文件保存為 tmp.py。然後,對該文件運行 chmod + x,使它成為可執行文件(假定您使用 UNIX? 系統)。

清單 12. tmp.py

#!/usr/bin/python

import os

from time import strftime

stamp = strftime("%Y-%m-%d %H:%M:%S")

logfile = '/path/to/your/logfile.log'

path = '/path/to/tmp/directory/'

files = os.listdir(path)

bytes = 0

numfiles = 0

for f in files:

if f.startswith('sess_'):

info = os.stat(path + f)

numfiles += 1

bytes += info[6]

if numfiles > 1:

title = 'files'

else:

title = 'file'

string = stamp + " -- " + str(numfiles) + " session "

+ title +", " + str(bytes) + " bytes "

file = open(logfile,"a")

file.writelines(string)

file.close()

在第一行中,您可以看到一個 hash-bang 行:它用於標識 Python 解釋器的位置。在我的系統中,它位於 /usr/bin/python。請根據系統需求調整這一行。

接下來的兩行用於導入特定的模塊,這些模塊將幫助您執行作業。考慮到腳本需要處理文件夾和文件,因此您需要導入 os 模塊,因為其中包含各種函數和方法,可幫助您列出文件、讀取文件和操作文件夾。您還需要寫入一個日誌文件,因此可以為條目添加一個時間戳 — 這就需要使用時間函數。您不需要所有時間函數,只需要導入 strftime函數即可。

在接下來的六行中,您設置了一些變數。第一個變數是 stamp,其中包含一個日期字元串。然後,您使用 strftime 函數創建了一個特定格式的時間戳 — 在本例中,時間戳的格式為 2010-01-03 12:43:03。

接下來,創建一個 logfile 變數,並在文件中添加一個實際存儲日誌文件消息的路徑(該文件不需要實際存在)。為簡單起見,我在 /logs 文件夾中放置了一個日誌文件,但您也可以將它放置在別處。同樣,path 變數包含到 /tmp 目錄的路徑。您可以使用任何路徑,只要使用斜杠作為結束即可 (/)。

接下來的三個變數也非常簡單:files 列表包含指定路徑中的所有文件和文件夾,另外還包含 bytes 和 numfiles 兩個變數。這兩個變數都設置為 0;腳本會在處理文件時遞增這些值。

完成所有這些定義之後,接下來就是腳本的核心了:一個簡單的 for 循環,用於處理文件列表中的各文件。每次運行循環時,腳本都會計算文件名;如果它以 sess_ 開頭,則腳本會對該文件運行 os.stat(),提取文件數據(比如創建時間、修改時間和位元組大小),遞增 numfiles 計數器並將該文件的位元組大小累計到總數中。

當循環完成運行後,腳本會檢查 numfiles 變數中的值是否大於 1。如果大於 1,則會將一個新的 title 變數設置為 files;否則,title 將被設置為單數形式的 file。

腳本的最後部分也非常簡單:您創建了一個 string 變數,並在該變數中添加了一行以時間戳開始的數據,並且其後還包含 numfiles(已轉換為字元串)和位元組(也已轉換為字元串)。請注意繼續字元();該字元可允許代碼運行到下一行。它是一個提高可讀性的小技巧。

然後,您使用 open() 函數以附加模式打開日誌文件(畢竟始終需要在該文件中添加內容),writelines() 函數會將字元串添加到日誌文件中,而 close() 函數用於關閉該文件。

現在,您已經創建了一個簡單的 Python 腳本。該腳本可用於完成許多任務,舉例來說,您可以設置一個 cron作業來每小時運行一次這個腳本,以幫助您跟蹤 24 小時內所使用的 PHP 會話的數量。您還可以使用 jQuery 或其他一些 JavaScript 框架通過 Ajax 連接這個腳本,用於為您提供日誌文件提要(如果採用這種方式,則需要使用 print命令來返回數據)。

I. 如何在kafka-python和confluent-kafka之間做出選擇

kafka-python:蠻荒的西部
kafka-python是最受歡迎的Kafka Python客戶端。我們過去使用時從未出現過任何問題,在我的《敏捷數據科學2.0》一書中我也用過它。然而在最近這個項目中,它卻出現了一個嚴重的問題。我們發現,當以文檔化的方式使用KafkaConsumer、Consumer迭代式地從消息隊列中獲取消息時,最終到達主題topic的由Consumer攜帶的消息通常會丟失。我們通過控制台Consumer的分析驗證了這一點。
需要更詳細說明的是,kafka-python和KafkaConsumer是與一個由SSL保護的Kafka服務(如Aiven Kafka)一同使用的,如下面這樣:
kafka_consumer = KafkaConsumer(
topic,
enable_auto_commit=True,
group_id=group_id,
bootstrap_servers=config.kafka.host,
api_version=(0, 10),
security_protocol='SSL',
ssl_check_hostname=True,
ssl_cafile=config.kafka.ca_pem,
ssl_certfile=config.kafka.service_cert,
ssl_keyfile=config.kafka.service_key
)

for message in kafka_consumer:
application_message = json.loads(message.value.decode())
...

當以這樣的推薦方式使用時,KafkaConsumer會丟失消息。但有一個變通方案,就是保留所有消息。這個方案是Kafka服務提供商Aiven support提供給我們的。它看起來像這樣:
while True:
raw_messages = consumer.poll(timeout_ms=1000, max_records=5000)
for topic_partition, messages in raw_messages.items():
application_message = json.loads(message.value.decode())
...

雖然這個變通方案可能有用,但README中的方法會丟棄消息使我對其失去興趣。所以我找到了一個替代方案。
confluent-kafka:企業支持
發現coufluent-kafka Python模塊時,我感到無比驚喜。它既能做librdkafka的外封裝,又非常小巧。librdkafka是一個用C語言寫的kafka庫,它是Go和.NET的基礎。更重要的是,它由Confluent公司支持。我愛開源,但是當「由非正式社區擁有或支持」這種方式效果不行的時候,或許該考慮給替代方案印上公章、即該由某個公司擁有或支持了。不過,我們並未購買商業支持。我們知道有人會維護這個庫的軟體質量,而且可以選擇買或不買商業支持,這一點真是太棒了。
用confluent-kafka替換kafka-python非常簡單。confluent-kafka使用poll方法,它類似於上面提到的訪問kafka-python的變通方案。
kafka_consumer = Consumer(
{
"api.version.request": True,
"enable.auto.commit": True,
"group.id": group_id,
"bootstrap.servers": config.kafka.host,
"security.protocol": "ssl",
"ssl.ca.location": config.kafka.ca_pem,
"ssl.certificate.location": config.kafka.service_cert,
"ssl.key.location": config.kafka.service_key,
"default.topic.config": {"auto.offset.reset": "smallest"}
}
)
consumer.subscribe([topic])
# Now loop on the consumer to read messages
running = True
while running:
message = kafka_consumer.poll()
application_message = json.load(message.value.decode())

kafka_consumer.close()

現在我們能收到所有消息了。我並不是說kafka-python工具不好,我相信社區會對它的問題做出反應並解決。但從現在開始,我會一直堅持使用confluent-kafka。
開源治理
開源是強大的,但是涉及到復雜的「大數據」和NoSQL工具時,通常需要有一家大公司在背後推動工具的開發。這樣你就知道,如果那個公司可以使用工具,那麼該工具應該擁有很好的基本功能。它的出現可能是非正式的,就像某公司發布類似FOSS的項目一樣,但也可能是正式的,就像某公司為工具提供商業支持一樣。當然,從另一個角度來看,如果一家與開源社區作對的公司負責開發某個工具,你便失去了控制權。你的意見可能無關緊要,除非你是付費客戶。
理想情況是採取開源治理,就像Apache基金會一樣,還有就是增加可用的商業支持選項。這對互聯網上大部分的免費軟體來說根本不可能。限制自己只使用那些公司蓋章批准後的工具將非常限制你的自由。這對於一些商店可能是正確選擇,但對於我們不是。我喜歡工具測試,如果工具很小,而且只專心做一件事,我就會使用它。
信任開源
對於更大型的工具,以上決策評估過程更為復雜。通常,我會看一下提交問題和貢獻者的數量,以及最後一次commit的日期。我可能會問朋友某個工具的情況,有時也會在推特上問。當你進行嗅探檢查後從Github選擇了一個項目,即說明你信任社區可以產出好的工具。對於大多數工具來說,這是沒問題的。
但信任社區可能存在問題。對於某個特定的工具,可能並沒有充分的理由讓你信任社區可以產出好的軟體。社區在目標、經驗和開源項目的投入時間方面各不相同。選擇工具時保持審慎態度十分重要,不要讓理想蒙蔽了判斷。

閱讀全文

與kafkapython教程相關的資料

熱點內容
華為amd雲伺服器 瀏覽:495
漢化編程卡是什麼意思 瀏覽:126
python學習pdf 瀏覽:313
祝緒丹程序員那麼可愛拍吻戲 瀏覽:198
asp源碼會員消費系統 瀏覽:113
java反射設置 瀏覽:152
python一行文 瀏覽:439
排序演算法優缺點 瀏覽:563
惡搞加密文件pdf 瀏覽:674
gif怎麼壓縮圖片大小 瀏覽:217
命令選擇當前不可用 瀏覽:158
歐幾里得演算法如何求逆元 瀏覽:506
男中學生上課解壓神器 瀏覽:373
加密狗拔掉之後怎麼辦 瀏覽:27
雲儲存平台源碼 瀏覽:847
解壓文件蘋果手機rar 瀏覽:149
centos開機命令行模式 瀏覽:697
遍歷所有listpython 瀏覽:660
力控加密文件夾 瀏覽:517
如何更改移動伺服器密碼 瀏覽:686