导航:首页 > 编程语言 > kafkapython教程

kafkapython教程

发布时间:2023-03-22 12:11:49

A. 如何使用python 连接kafka 并获取数据

连接
kafka
的库有两种类型,一种是直接连接
kafka
的,存储
offset
的事情要自己在客户端完成。还有一种是先连接
zookeeper
然后再通过
zookeeper
获取
kafka

brokers
信息,
offset
存放在
zookeeper
上面,由
zookeeper
来协调。
我现在使用
samsa
这个
highlevel

Procer示例
from
kazoo.client
import
KazooClientfrom
samsa.cluster
import
Clusterzookeeper
=
KazooClient()zookeeper.start()cluster
=
Cluster(zookeeper)topic
=
cluster.topics['topicname']topic.publish('msg')
**
Consumer示例
**
from
kazoo.client
import
KazooClientfrom
samsa.cluster
import
Clusterzookeeper
=
KazooClient()zookeeper.start()cluster
=
Cluster(zookeeper)topic
=
cluster.topics['topicname']consumer
=
topic.subscribe('groupname')for
msg
in
consumer:
print
msg
Tip
consumer
必需在
procer

kafka

topic
里面提交数据后才能连接,否则会出错。

Kafka
中一个
consumer
需要指定
groupname

groue
中保存着
offset
等信息,新开启一个
group
会从
offset
0
的位置重新开始获取日志。
kafka
的配置参数中有个
partition
,默认是
1
,这个会对数据进行分区,如果多个
consumer
想连接同个
group
就必需要增加
partition
,
partition
只能大于
consumer
的数量,否则多出来的
consumer
将无法获取到数据。

B. python每个线程消费不用数据

关于python每个线程消费不用数据相关资料如下
python kafka多线程消费数据

1、打印每个线粗搏程id,满足预期,开启了8个线程,每个线程号都不一样凯凳嫌;

 

2、查看kafka状态,也能满足预期,每个分区的消费者id都是不一样的,下面第二个图盯手是开启一个消费者时的状态,每个分区的消费者id都是相同的;对比之下能满足需求;

C. filebeat利用kafka进行日志实时传输

选择安装目录:例如安装在/usr/local/或者/opt/下都可以。

创建一个软链接:

filebeat的配置很简单,只需要指定input和output就可以了。

由于kafka server高低版本的客户端API区别较大,因此推荐同时使用高版本的filebeat和kafka server。 注意 高版本的filebeat配置使用低版本的kafka server会出现kafka server接受不到消息的情况。这里我使用的kafka server版本是:2.12-0.11.0.3,可参考 快速搭建kafka

编辑filebeat安装目录下 filebeat.yml 文件:

配置Filebeat inputs:

上面 /opt/test/*.log 是我要传输的日志,根据实际情况改成你自己的值。
配置Filebeat outputs:

"111.11.1.243:9092" 是我的单机kafka broker,如果你是kafka集群,请用 , 分隔。 test 是kafka topic,请改成你自己实际情况的值。另外以下信胡这段需要删除:

因为我并没有用到Elasticsearch,所以有多个输出在启动filebeat时会报错。到这里filebeat配置kafka就完成了,是不是很简单,让我们启动它测试一下。

启动,进入filebeat的安装目录:

查看是否启动:

很好,已经启动了。如果没有启动,请查看启动日志睁旦文件nohup.out。
停止:

随机生成日志脚本:

执行这段python脚本,开启一个kafka消费者如果成功消费日志消息:

哈哈,大功告成。 注 上面这段脚本要适时手动停止,因为它是个死循环,如果忘记手动停止那么就杯具了,我就是这样悉坦扰把机器写宕机的。

D. python 消费kafka 写入es 小记

# -*- coding: utf8 -*-

# __author__ = '小红帽'

# Date: 2020-05-11

"""Naval Fate.

Usage:

        py_kafka_protobuf_consume.py --bootstrap-servers=<host:port,host2:port2..> --groupId=<groupId> --topic=<topic_name> --es-servers=<host:port> --index=<schema> --type=<doc> --id=<order_id>

        py_kafka_protobuf_consume.py -h | --help

        py_kafka_protobuf_consume.py --version

Options:

        -h --help                                      打印帮助信息.

        --bootstrap_servers=<host:port,host2:port2..>  kafka servers

        --groupId=<groupId>                            kafka消费组

        --topic=<topic_name>                            topic名称

        --es-servers=<host:port>                        ES 地址

        --index=<index_name>                            ES 索引

        --type=<doc> ES type

        --id=<order_id> 指定id主键,快速更新

"""

import json

from kafka import KafkaConsumer

from docopt import docopt

from elasticsearch import Elasticsearch

from elasticsearch import helpers

class Kafka_consumer():

    def __init__(self,args):

        self.topic = args['--topic']

        self.bootstrapServers = args['--bootstrap-servers']

        self.groupId = args['--groupId']

        self.id = args['--id']

        self.es_host = args['--es-servers'].split(':')[0]

        self.es_port = args['--es-servers'].split(':')[1]

        self.es_index = args['--index']

        self.es_type = args['--type']

        self.consumer = KafkaConsumer(

            bootstrap_servers=self.bootstrapServers,

            group_id=self.groupId,

            enable_auto_commit = True,

            auto_commit_interval_ms=5000,

            consumer_timeout_ms=5000

        )

    def consume_data_es(self):

        while True:

            try:

                es = Elasticsearch([{'host': self.es_host, 'port': self.es_port}], timeout=3600)

                self.consumer.subscribe([self.topic])

                actions=[]

                for message in self.consumer:

                    if message is not None:

                        query = json.loads(message.value)['data'][0]

                        action = {

                            "_index": self.es_index,

                            "_type": self.es_type,

                            "_id": json.loads(message.value)['data'][0][self.id],

                            "_source": query

                        }

                        actions.append(action)

                        if len(actions) > 50:

                            helpers.bulk(client=es, actions=actions)

                            print("插入es %s 条数据" % len(actions))

                            actions = []

                if len(actions) > 0:

                    helpers.bulk(client=es, actions=actions)

                    print("等待超时时间,插入es %s 条数据" % len(actions))

                    actions=[]

            except BaseException as e:

                print(e)

if __name__ == '__main__':

    arguments = docopt(__doc__,version='sbin 1.0')

    consumer = Kafka_consumer(arguments)

    consumer.consume_data_es()

E. python怎么安装acl包

搜索本产品内容
查询
文档中心 > 消息队列 CKafka > SDK 文档 > Python SDK > 公网 SASL_SSL 方式接入
公网 SASL_SSL 方式接入
最近更新时间:2021-12-28 09:54:00

操作场景
前提条件基尘竖
操作步骤
步骤1:准备工作
步骤2:生产消息
步骤3:消费消息
操作场景
该任务以 Python 客户端为例,指导您使用公网 SASL_SSL 方式接入消息队列 CKafka 并收发消息。
前提条件
安装 Python
安装 pip
配置 ACL 策略
下载 Demo
下载 SASL_SSL 证书
操作步骤
步骤1:准备工作
创建接入点。
在 实例列表 页面,单击目标实例 ID,进入实例详情页。
在 基本信息 > 接入方式 中,单击添加路由策略,在打开窗口中选择:路由类型:公网域名接入,接入方式:SASL_SSL。

创建角色。
在用户管理页面新建角色,设置密码。

创建 Topic。
在控制台 topic 管理页面新建 Topic(参见 创建 Topic)。
添加 Python 依赖库。
执行以下命令安装:
pip install kafka-python
步骤2:生产消息
修改生产消息程序 procer.py 中配置参数。
procer = KafkaProcer(
bootstrap_servers = ['xx.xx.xx.xx:port'],
api_version = (1, 1),

#
# SASL_SSL 公网接入
#
security_protocol = "SASL_SSL",
sasl_mechanism = "PLAIN",
sasl_plain_username = "instanceId#username",
sasl_plain_password = "password",
ssl_cafile = "CARoot.pem",
ssl_check_hostname = False,
)

message = "Hello World! Hello Ckafka!"
msg = json.mps(message).encode()
procer.send('topic_name', value = msg)
print("proce message " + message + " success.")
procer.close()
参数
描述
bootstrap_servers
接入网络,在控制台的实例详情页面接入方搏大式模块的网络列复制。

sasl_plain_username
用户名,格式为 实例 ID + # + 用户名。实例 ID 在 CKafka 控制台 的实例详情页面的基本信息获取,用户在用户管理创建用户时设置。
sasl_plain_password
用户密码,在 CKafka 控制台实例详情页面的用户管理创建用户时设置。
topic_name
Topic 名称,您可以在控制台上 topic管理页面复制。

CARoot.pem
采用 SASL_SSL 方式接入时,所需的证书兄瞎路径。
编译并运行 procer.py。
查看运行结果。

在 CKafka 控制台 的 topic管理页面,选择对应的 Topic , 单击更多 > 消息查询,查看刚刚发送的消息。

步骤3:消费消息
修改消费消息程序 consumer.py 中配置参数。
consumer = KafkaConsumer(
'topic_name',
group_id = "group_id",
bootstrap_servers = ['xx.xx.xx.xx:port'],
api_version = (1,1),

#
# SASL_SSL 公网接入
#
security_protocol = "SASL_SSL",
sasl_mechanism = 'PLAIN',
sasl_plain_username = "instanceId#username",
sasl_plain_password = "password",
ssl_cafile = "CARoot.pem",
ssl_check_hostname = False,

)

for message in consumer:
print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" %
(message.topic, message.partition, message.offset, message.value))
参数
描述
bootstrap_servers
接入网络,在控制台的实例详情页面接入方式模块的网络列复制。

group_id
消费者的组 ID,根据业务需求自定义。
sasl_plain_username
用户名,格式为 实例 ID + # + 用户名。实例 ID 在CKafka 控制台的实例详情页面的基本信息获取,用户在用户管理创建用户时设置。
sasl_plain_password
用户名密码,在 CKafka 控制台实例详情页面的用户管理创建用户时设置
topic_name
Topic 名称,您可以在控制台上 topic管理页面复制。

CARoot.pem
采用 SASL_SSL 方式接入时,所需的证书路径。
编译并运行 consumer.py。
查看运行结果。

在 CKafka 控制台 的 Consumer Group 页面,选择对应的消费组名称,在主题名称输入 Topic 名称,单击查询详情,查看消费详情。

上一篇: 公网 SASL_PLAINTEXT 方式接入下一篇: VPC 网络接入
文档内容是否对您有帮助?
有帮助没帮助
如果遇到产品相关问题,您可咨询 在线客服 寻求帮助。

消息队列 CKafka 相关文档
API 概览
创建主题
签名方法
获取实例列表
删除主题白名单
增加主题白名单
错误返回结果
产品概述
获取主题属性
点击搜索腾讯云文档
取消
清除查询

F. Hadoop生态架构之kafka

1、定位:分布式的消息队列系统,同时提供数据分布式缓存态卜功能(默认7天)
2、消息持久化到磁盘,达到O(1)访问速度,预读和后写,对磁盘的顺序访问(比内存访问还要快)
3、Storm(分布式的实时计算框架)
Kafka目标成为队列平台
4、基本组件:
Broker:每一台机器是一个Broker
Procer:日志消息生产者,主要写数据
Consumer:日志消息消费者,主要读数据
Topic:是虚拟概念,不同的consumer去指定的topic去读数据,不同procer可以往不同的topic去写
Partition:是实际概念,文件夹,是在Topic的基础上做了进一步分层
5、Partition功能:负载均衡,需要保证消息的顺序性
顺序性拿烂的保证:订阅消息是从头往后读取的,写消息是尾部追加,所以整体消息是顺序的
如果有多个partiton存在,可能会出现顺序不一致帆敏穗的情况,原因:每个Partition相互独立
6、Topic:逻辑概念
一个或多个Partition组成一个Topic
7、Partition以文件夹的形式存在
8、Partition有两部分组成:
(1)index log:(存储索引信息,快速定位segment文件)
(2)message log:(真实数据的所在)
9、HDFS多副本的方式来完成数据高可用
如果设置一个Topic,假设这个Topic有5个Partition,3个replication
Kafka分配replication的算法
假设:
将第i个Partition分配到(i % N)个Broker上
将第i个Partition的第j个replication分配到( (i+j) % N)个Broker上
虽然Partition里面有多个replication
如果里面有M个replication,其中有一个是Leader,其他M-1个follower
10、zookeeper包系统的可用性,zk中会保存一些meta信息(topic)
11、物理上,不同的topic的消息肯定是分开存储的
12、偏移量——offset:用来定位数据读取的位置
13、kafka内部最基本的消息单位——message
14、传输最大消息message的size不能超过1M,可以通过配置来修改
15、Consumer Group
16、传输效率:zero-
0拷贝:减少Kernel和User模式上下文的切换
直接把disk上的data传输给socket,而不是通过应用程序来传输
17、Kafka的消息是无状态的,消费者必须自己维护已消费的状态信息(offset)
减轻Kafka的实现难度
18、Kafka内部有一个时间策略:SLA——消息保留策略(消息超过一定时间后,会自动删除)
19、交付保证:
at least once:至少一次(会有重复、但不丢失)
at most once:最多发送一次(不重复、但可能丢失)
exactly once:只有一次(最理想),目前不支持,只能靠客户端维护
20、Kafka集群里面,topic内部由多个partition(包含多个replication),达到高可用的目的:
日志副本:保证可靠性
角色:主、从
ISR:是一个集合,只有在集合中的follower,才有机会被选为leader
如何让leader知道follower是否成功接收数据(心跳,ack)
如果心跳正常,代表节点活着
21、怎么算“活着”
(1)心跳
(2)如果follower能够紧随leader的更新,不至于被落的太远
如果一旦挂掉,从ISR集合把该节点删除掉

前提:需要把zookeeper提前启动好
一、单机版
1、启动进程:
]# ./bin/kafka-server-start.sh config/server.properties
2、查看topic列表:
]# ./bin/kafka-topics.sh --list --zookeeper localhost:2181
3、创建topic:
]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic newyear_test
4、查看topic描述:
]# ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic newyear_test
5、procer发送数据:
]# ./bin/kafka-console-procer.sh --broker-list localhost:9092 --topic newyear_test
6、consumer接收数据:
]# ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic newyear_test --from-beginning
7、删除topic:
]# ./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic newyear_test
二、集群版
在slave1和slave2上的broker.id一定设置不同
分别在slave1和slave2上开启进程:
./bin/kafka-server-start.sh config/server.properties

创建topic:
]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 5 --topic newyear_many_test

1、实现一个consumer group
首先在不同的终端分别开启consumer,保证groupid一致
]# python consumer_kafka.py

执行一次procer:
]# python procer_kafka.py

2、指定partition发送数据
]# python procer_kafka_2.py

3、指定partition读出数据
]# python consumer_kafka_2.py
consumer_kafka.py:

procer_kafka.py:

consumer_kafka_2.py:

procer_kafka_2.py:

1.新建./conf/kafka_test/flume_kafka.conf

2.启动flume:
]# flume-ng agent -c conf -f ./conf/kafka_test/flume_kafka.conf -n a1 -Dflume.root.logger=INFO,console
启动成功,如下图:

3.测试:
1.1flume监控产生的数据:
]# for i in seq 1 100 ; do echo '====> '$i >> 1.log ; done
1.2kafka消费数据:
]# ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic_1013 --from-beginning
消费结果如下图:

G. pyflink消费kafka-connect-jdbc消息(带schema)

1、数据接入

通过kafka的restFul接口创建连接mysql的连接器并启动。迹山

{

    "name": "mysql_stream_test",

    "config": {

        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",

        "timestamp.column.name": "",

        "incrementing.column.name": "ID",

        "connection.password": "",

        "validate.non.null": true,

        "tasks.max": 1,

        "batch.max.rows": 100,

        "table.whitelist": "baseqx.test_demo",

        "mode": "incrementing",

        "topic.prefix": "mysql_",

        "connection.user": "",

        "poll.interval.ms": 5000,

        "numeric.mapping": "best_fit",

        "connection.url": "jdbc:mysql://xxx.xxx.xxx.xxx:3306/枝州则baseqx?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true"

    }

}

2.kafka-connect创建主题中的猛棚默认数据格式为

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"},{"type":"string","optional":false,"field":"NAME"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"CREATE_TIME"}],"optional":false,"name":"test_demo"},"payload":{"ID":1,"NAME":"prestoEtl","CREATE_TIME":1606902182000}}

3.使用pyflink消费带schema的消息

#!/usr/bin/python3.7

# -*- coding: UTF-8 -*-

from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode

from pyflink.table import StreamTableEnvironment, TableConfig, SqlDialect

s_env = StreamExecutionEnvironment.get_execution_environment()

s_env.set_parallelism(1)

st_env = StreamTableEnvironment.create(s_env, TableConfig())

st_env.get_config().set_python_executable("python3")

st_env.use_catalog("default_catalog")

st_env.use_database("default_database")

# DML上可以固定schema为字符串, 用 ROW 函数封装 payload

ddlKafkaConn = """

create table sourceKafkaConn(

    `scheam`    STRING  comment 'kafkaConn每行模式',

    `payload`  ROW(ID BIGINT,NAME STRING,CREATE_TIME STRING)  comment '行数据'

)comment '从kafkaConnect获取带模式的数据'

with(

    'connector' = 'kafka',

    'topic' = 'mysql_test_demo',       

    'properties.bootstrap.servers' = '192.168.113.11:9092',

    'scan.startup.mode' = 'earliest-offset',

    'format' = 'json'

)

"""

# 'connector.startup-mode' = 'earliest-offset 表示读取最早的消息 | latest-offset 表示读取消息队列中最新的消息',

st_env.execute_sql(ddlKafkaConn)

sinkPrint = '''

    CREATE TABLE sinkPrint WITH ('connector' = 'print')

    LIKE sourceKafkaConn (EXCLUDING ALL)

'''

st_env.execute_sql(sinkPrint)

st_env.execute_sql("SHOW TABLES").print()

st_env.sql_query("select scheam,ROW(ID,NAME,CREATE_TIME) as payload from sourceKafkaConn") \

    .insert_into("sinkPrint")

st_env.execute("pyflink-kafka-v4")

4.执行

4.1pythonpyflink-kafka-v4.py

4.2flinkrun-mxxx.xxx.xxx.xxx:8081-pypyflink-kafka-v4.py

5.执行结果

+-----------------+|tablename|+-----------------

+|sinkPrint|

+|sourceKafkaConn|

+-----------------+

2 rowsinset

+I(null,1,prestoEtl,1606902182000)

+I(null,2,执行的非常好,1606902562000)

+I(null,3,使用flink解析topic的schema,1607070278000)

H. PHP开发人员的Python基础知识

PHP(外文名:PHP: Hypertext Preprocessor,中文名:“超文本预处理器”)是一种通用开源脚本语言。语法吸收了C语言、Java和Perl的特点,利于学习,使用广泛,主要适用于Web开发领域。那么PHP开发人员的Python基础知识都有哪些呢?以下仅供参考!

常用缩略语

Ajax:异步 JavaScript + XML

XML:可扩展标记语言(Extensible Markup Language)

什么是 Python?

Python 的定义是一种 “通用的高级编程语言”。它以简洁性和易用性着称,而且是少有的几种对空格和缩进有要求的语言之一。Python 的主要作者 Guido Van Rossum 在社区中仍然非常活跃,并且被人们戏称为仁慈的领导。

Python 的灵活性和紧凑性是值得称赞的。它支持面向对象编程、结构化编程、面向方面编程以及函数编程等。Python 采用小内核设计,但具备大量扩展库,从而确保了该语言的紧凑性和灵活性。

从语法的角度来说,您会发现 Python 的简洁性异常突出——几乎可以说是一种纯粹的境界。PHP 开发人员要么会对这种方法的语法深深陶醉,要么会发现它的局限性。这主要取决于您自己的见解。Python 社区推动这种美感的态度是非常明确的,它们更加重视的是美学和简洁性,而不是灵动的技巧。已形成 Perl 传统(“可以通过多种方式实现它”)的 PHP 开发人员(像我自己)将面对一种完全相反的哲学(“应该只有一种方法可以实现它”)。

事实上,该社区定义了一种特有的代码风格术语,即 Python 化(pythonic)。您可以说您的代码是 Python 化,这是对 Python 术语的良好运用,同时还可展现语言的自然特性。本文并不打算成为 Pythonista(或 Pythoneer),但如果您想继续 Python 之路,那么千万不能错过本文的知识点。就像 PHP 有自己的编程风格,Perl 有自己的概念方法,学习 Python 语言必然也需要开始用该语言来思考问题。

另一个要点:在撰写本文时,Python 的最新版本是 V3.0,但本文主要侧重于 Python V2.6。Python V3.0 并不能向后兼容之前的版本,而且 V2.6 是使用最为广泛的版本。当然,您可以根据需求使用自己喜好的版本。

Python 与 PHP 有何不同?

一般来说,PHP 是一种 Web 开发语言。是的,它提供了一个命令行接口,并且甚至可用于开发嵌入式应用程序,但它主要还是用于 Web 开发。相反,Python 是一种脚本语言,并且也可用于 Web 开发。从这方面来说,我知道我会这样说——它比 PHP 更加接近 Perl。(当然,在其他方面,它们之间并无实际不同。我们继续往下看。)

PHP 的语法中充斥着美元符号($)和大括号({}),而 Python 相对来说则更加简洁和干净。PHP 支持 switch 和 do...while 结构,而 Python 则不尽然。PHP 使用三元操作符(foo?bar:baz)和冗长的函数名列表,而命名约定更是无所不有;相反,您会发现 Python 要简洁多了。PHP 的数组类型可同时支持简单列表和字典或散列,但 Python 却将这两者分开。

Python 同时使用可变性和不变性的概念:举例来说,tuple 就是一个不可变的列表。您可以创建 tuple,但在创建之后不能修改它。这一概念可能要花些时间来熟悉,但对于避免错误极为有效。当然,更改 tuple 的惟一方法是复制它。因此,如果您发现对不可变对象执行了大量更改,则应该重新考量自己的方法。

之前提到,Python 中的缩进是有含义的:您在刚开始学习该语言时会对此非常难以适应。您还可以创建使用关键字作为参数的函数和方法——这与 PHP 中的标准位置参数迥然不同。面向对象的追随者会对 Python 中真正的面向对象思想感到欣喜,当然还包括它的 “一级” 类和函数。如果您使用非英语语言,则会钟爱于 Python 强大的.国际化和 Unicode 支持。您还会喜欢 Python 的多线程功能;这也是最开始令我为之着迷的特性之一。

综上所述,PHP 和 Python 在许多方面都彼此类似。您可以方便地创建变量、循环,使用条件和创建函数。您甚至可以轻松地创建可重用的模块。两种语言的用户社区都充满活力和激情。PHP 的用户群体更加庞大,但这主要归因于它在托管服务器及 Web 专注性方面的优势和普及性。

很好 简要介绍到此为止。我们开始探索之旅。

使用 Python

清单 1 展示了一个基本的 Python 脚本。

清单 1. 一个简单的 Python 脚本

for i in range(20):

print(i)

清单 2 展示了脚本的必然结果。

清单 2. 清单 1 的结果

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

在深入探索之前,我们先来了解一些预备知识。首先从变量开始。

变量

可以看到,表示变量并不需要任何特殊的字符。变量 i 就是一个纯粹的 i——毫无特殊之处。表示代码块或语言结束也不需要任何特殊字符(比如分号和括号);只需要在 for 行使用一个简单的冒号即可(:)。还需注意,缩进会向 Python 指示哪些内容属于 for 循环。举例来说,清单 3 中的代码会在循环中为各编号输出一个说明。

清单 3. 为各循环添加一条语句

for i in range(20):

print(i)

print('all done?')

相反,清单 4 中的代码会在循环结束处输出一条说明。

清单 4. 在循环后添加一条语句

for i in range(20):

print(i)

print('all done!')

现在,我第一次看到这样的代码时,我认为这完全是无稽之谈。什么?让我相信换行和缩进能保证代码的结构和运行?请相信我,不用多久,您就会习惯它(但我需要承认必须到达到分号处才会结束语句的运行)。如果您与其他开发人员共同开发 Python 项目,则会发现这种可读性的用处是多么大了。您不再像以前那样总是猜测 “这个聪明的家伙在这里究竟想干些什么?”

在 PHP,您使用 = 操作符为变量分配值(参见 清单 5)。在 Python 中,您使用相同的操作符,只是需要标记或指向值。对于我来说,它就是赋值操作而已,我不需要过多担心专门的术语。

清单 5. 创建变量

yorkie = 'Marlowe' #meet our Yorkie Marlowe!

mutt = 'Kafka' #meet our mutt Kafka

print(mutt) #prints Kafka

Python 的变量名称约定与 PHP 类似:您在创建变量名时只能使用字母、数字和下划线(_)。同样,变量名的第一个字符不能是数字。Python 变量名是区分大小写的,并且您不能使用特定的 Python 关键字(比如 if、else、while、def、or、and、not、in 和 is 开始符)作为变量名。这没有什么值得奇怪的。

Python 允许您随意执行基于字符串的操作。清单 6 中的大多数操作应该都是您熟悉的。

清单 6. 常见的基于字符串的操作

yorkie = 'Marlowe'

mutt = 'Kafka'

ylen = len(yorkie) #length of variable yorkie

print(ylen) #prints 7

print(len(yorkie)) #does the same thing

len(yorkie) #also does the same thing, print is implicit

print(yorkie.lower()) #lower cases the string

print(yorkie.strip('aeiou')) #removes vowels from end of string

print(mutt.split('f')) #splits "Kafka" into ['Ka', 'ka']

print(mutt.count('a')) #prints 2, the number of a's in string

yorkie.replace('a','4') #replace a's with 4's

条件语句

您已经了解了如何使用 for 循环;现在,我们来讨论条件语句。您会发现 Phyon 中的条件语句与 PHP 基本相同:您可以使用熟悉的 if/else型结构,如清单 7 所示。

清单 7. 一个简单的条件测试

yorkie = 'Marlowe'

mutt = 'Kafka'

if len(yorkie) > len(mutt):

print('The yorkie wins!')

else:

print('The mutt wins!')

您还可以使用 if/elif/else(elif,等价于 PHP 中的 elseif)创建更加复杂的条件测试,如清单 8 所示。

清单 8. 一个比较复杂的条件测试

yorkie = 'Marlowe'

mutt = 'Kafka'

if len(yorkie) + len(mutt) > 15:

print('The yorkie and the mutt win!')

elif len(yorkie) + len(mutt) > 10:

print('Too close to tell!')

else:

print('Nobody wins!')

您可能会说,目前为止并没有什么与众不同的地方:甚本上和想象中没有太大区别。现在,我们来看 Python 处理列表的方式,您会发现两种语言之间的不同之处。

列表

一种常用的列表类型是 tuple,它是不可变的。在 tuple 中载入一系列值之后,您不会更改它。Tuple 可以包含数字、字符串、变量,甚至其他 tuples。Tuples 从 0 开始建立索引,这很正常;您可以使用 -1 索引访问最后一个项目。您还可以对 tuple 运行一些函数(请参见清单 9)。

清单 9. Tuples

items = (1, mutt, 'Honda', (1,2,3))

print items[1] #prints Kafka

print items[-1] #prints (1,2,3)

items2 = items[0:2] #items2 now contains (1, 'Kafka') thanks to slice operation

'Honda' in items #returns TRUE

len(items) #returns 4

items.index('Kafka') #returns 1, because second item matches this index location

列表与 tuple 类似,只不过它们是可变的。创建列表之后,您可以添加、删除和更新列表中的值。列表使用方括号,而不是圆括号(()),如清单 10 所示。

清单 10. 列表

groceries = ['ham','spam','eggs']

len(groceries) #returns 3

print groceries[1] #prints spam

for x in groceries:

print x.upper() #prints HAM SPAM EGGS

groceries[2] = 'bacon'

groceries #list is now ['ham','spam','bacon']

groceries.append('eggs')

groceries #list is now ['ham', 'spam', 'bacon', 'eggs']

groceries.sort()

groceries #list is now ['bacon', 'eggs', 'ham', 'spam']

字典类似于关联数组或散列;它使用键值对来存储和限制信息。但它不使用方括号和圆括号,而是使用尖括号。与列表类似,字典是可变的,这意味着您可以添加、删除和更新其中的值(请参见清单 11)。

清单 11. 字典

colorvalues = {'red' : 1, 'blue' : 2, 'green' : 3, 'yellow' : 4, 'orange' : 5}

colorvalues #prints {'blue': 2, 'orange': 5, 'green': 3, 'yellow': 4, 'red': 1}

colorvalues['blue'] #prints 2

colorvalues.keys() #retrieves all keys as a list:

#['blue', 'orange', 'green', 'yellow', 'red']

colorvalues.pop('blue') #prints 2 and removes the blue key/value pair

colorvalues #after pop, we have:

#{'orange': 5, 'green': 3, 'yellow': 4, 'red': 1}

在 Python 中创建一个简单的脚本

现在,您已经对 Python 有了一定的了解。接下来,我们将创建一个简单的 Python 脚本。该脚本将读取位于您的服务器 /tmp 目录下的 PHP 会话文件的数量,并在日志文件中写入摘要报告。在该脚本中,您将学习如何导入特定函数的模块,如何使用文件,以及如何写入日志文件。您还将设置一系列变量来跟踪所收集的信息。

清单 12 展示了整个脚本。打开一个编辑器,并将代码粘贴到其中,然后在系统中将该文件保存为 tmp.py。然后,对该文件运行 chmod + x,使它成为可执行文件(假定您使用 UNIX? 系统)。

清单 12. tmp.py

#!/usr/bin/python

import os

from time import strftime

stamp = strftime("%Y-%m-%d %H:%M:%S")

logfile = '/path/to/your/logfile.log'

path = '/path/to/tmp/directory/'

files = os.listdir(path)

bytes = 0

numfiles = 0

for f in files:

if f.startswith('sess_'):

info = os.stat(path + f)

numfiles += 1

bytes += info[6]

if numfiles > 1:

title = 'files'

else:

title = 'file'

string = stamp + " -- " + str(numfiles) + " session "

+ title +", " + str(bytes) + " bytes "

file = open(logfile,"a")

file.writelines(string)

file.close()

在第一行中,您可以看到一个 hash-bang 行:它用于标识 Python 解释器的位置。在我的系统中,它位于 /usr/bin/python。请根据系统需求调整这一行。

接下来的两行用于导入特定的模块,这些模块将帮助您执行作业。考虑到脚本需要处理文件夹和文件,因此您需要导入 os 模块,因为其中包含各种函数和方法,可帮助您列出文件、读取文件和操作文件夹。您还需要写入一个日志文件,因此可以为条目添加一个时间戳 — 这就需要使用时间函数。您不需要所有时间函数,只需要导入 strftime函数即可。

在接下来的六行中,您设置了一些变量。第一个变量是 stamp,其中包含一个日期字符串。然后,您使用 strftime 函数创建了一个特定格式的时间戳 — 在本例中,时间戳的格式为 2010-01-03 12:43:03。

接下来,创建一个 logfile 变量,并在文件中添加一个实际存储日志文件消息的路径(该文件不需要实际存在)。为简单起见,我在 /logs 文件夹中放置了一个日志文件,但您也可以将它放置在别处。同样,path 变量包含到 /tmp 目录的路径。您可以使用任何路径,只要使用斜杠作为结束即可 (/)。

接下来的三个变量也非常简单:files 列表包含指定路径中的所有文件和文件夹,另外还包含 bytes 和 numfiles 两个变量。这两个变量都设置为 0;脚本会在处理文件时递增这些值。

完成所有这些定义之后,接下来就是脚本的核心了:一个简单的 for 循环,用于处理文件列表中的各文件。每次运行循环时,脚本都会计算文件名;如果它以 sess_ 开头,则脚本会对该文件运行 os.stat(),提取文件数据(比如创建时间、修改时间和字节大小),递增 numfiles 计数器并将该文件的字节大小累计到总数中。

当循环完成运行后,脚本会检查 numfiles 变量中的值是否大于 1。如果大于 1,则会将一个新的 title 变量设置为 files;否则,title 将被设置为单数形式的 file。

脚本的最后部分也非常简单:您创建了一个 string 变量,并在该变量中添加了一行以时间戳开始的数据,并且其后还包含 numfiles(已转换为字符串)和字节(也已转换为字符串)。请注意继续字符();该字符可允许代码运行到下一行。它是一个提高可读性的小技巧。

然后,您使用 open() 函数以附加模式打开日志文件(毕竟始终需要在该文件中添加内容),writelines() 函数会将字符串添加到日志文件中,而 close() 函数用于关闭该文件。

现在,您已经创建了一个简单的 Python 脚本。该脚本可用于完成许多任务,举例来说,您可以设置一个 cron作业来每小时运行一次这个脚本,以帮助您跟踪 24 小时内所使用的 PHP 会话的数量。您还可以使用 jQuery 或其他一些 JavaScript 框架通过 Ajax 连接这个脚本,用于为您提供日志文件提要(如果采用这种方式,则需要使用 print命令来返回数据)。

I. 如何在kafka-python和confluent-kafka之间做出选择

kafka-python:蛮荒的西部
kafka-python是最受欢迎的Kafka Python客户端。我们过去使用时从未出现过任何问题,在我的《敏捷数据科学2.0》一书中我也用过它。然而在最近这个项目中,它却出现了一个严重的问题。我们发现,当以文档化的方式使用KafkaConsumer、Consumer迭代式地从消息队列中获取消息时,最终到达主题topic的由Consumer携带的消息通常会丢失。我们通过控制台Consumer的分析验证了这一点。
需要更详细说明的是,kafka-python和KafkaConsumer是与一个由SSL保护的Kafka服务(如Aiven Kafka)一同使用的,如下面这样:
kafka_consumer = KafkaConsumer(
topic,
enable_auto_commit=True,
group_id=group_id,
bootstrap_servers=config.kafka.host,
api_version=(0, 10),
security_protocol='SSL',
ssl_check_hostname=True,
ssl_cafile=config.kafka.ca_pem,
ssl_certfile=config.kafka.service_cert,
ssl_keyfile=config.kafka.service_key
)

for message in kafka_consumer:
application_message = json.loads(message.value.decode())
...

当以这样的推荐方式使用时,KafkaConsumer会丢失消息。但有一个变通方案,就是保留所有消息。这个方案是Kafka服务提供商Aiven support提供给我们的。它看起来像这样:
while True:
raw_messages = consumer.poll(timeout_ms=1000, max_records=5000)
for topic_partition, messages in raw_messages.items():
application_message = json.loads(message.value.decode())
...

虽然这个变通方案可能有用,但README中的方法会丢弃消息使我对其失去兴趣。所以我找到了一个替代方案。
confluent-kafka:企业支持
发现coufluent-kafka Python模块时,我感到无比惊喜。它既能做librdkafka的外封装,又非常小巧。librdkafka是一个用C语言写的kafka库,它是Go和.NET的基础。更重要的是,它由Confluent公司支持。我爱开源,但是当“由非正式社区拥有或支持”这种方式效果不行的时候,或许该考虑给替代方案印上公章、即该由某个公司拥有或支持了。不过,我们并未购买商业支持。我们知道有人会维护这个库的软件质量,而且可以选择买或不买商业支持,这一点真是太棒了。
用confluent-kafka替换kafka-python非常简单。confluent-kafka使用poll方法,它类似于上面提到的访问kafka-python的变通方案。
kafka_consumer = Consumer(
{
"api.version.request": True,
"enable.auto.commit": True,
"group.id": group_id,
"bootstrap.servers": config.kafka.host,
"security.protocol": "ssl",
"ssl.ca.location": config.kafka.ca_pem,
"ssl.certificate.location": config.kafka.service_cert,
"ssl.key.location": config.kafka.service_key,
"default.topic.config": {"auto.offset.reset": "smallest"}
}
)
consumer.subscribe([topic])
# Now loop on the consumer to read messages
running = True
while running:
message = kafka_consumer.poll()
application_message = json.load(message.value.decode())

kafka_consumer.close()

现在我们能收到所有消息了。我并不是说kafka-python工具不好,我相信社区会对它的问题做出反应并解决。但从现在开始,我会一直坚持使用confluent-kafka。
开源治理
开源是强大的,但是涉及到复杂的“大数据”和NoSQL工具时,通常需要有一家大公司在背后推动工具的开发。这样你就知道,如果那个公司可以使用工具,那么该工具应该拥有很好的基本功能。它的出现可能是非正式的,就像某公司发布类似FOSS的项目一样,但也可能是正式的,就像某公司为工具提供商业支持一样。当然,从另一个角度来看,如果一家与开源社区作对的公司负责开发某个工具,你便失去了控制权。你的意见可能无关紧要,除非你是付费客户。
理想情况是采取开源治理,就像Apache基金会一样,还有就是增加可用的商业支持选项。这对互联网上大部分的免费软件来说根本不可能。限制自己只使用那些公司盖章批准后的工具将非常限制你的自由。这对于一些商店可能是正确选择,但对于我们不是。我喜欢工具测试,如果工具很小,而且只专心做一件事,我就会使用它。
信任开源
对于更大型的工具,以上决策评估过程更为复杂。通常,我会看一下提交问题和贡献者的数量,以及最后一次commit的日期。我可能会问朋友某个工具的情况,有时也会在推特上问。当你进行嗅探检查后从Github选择了一个项目,即说明你信任社区可以产出好的工具。对于大多数工具来说,这是没问题的。
但信任社区可能存在问题。对于某个特定的工具,可能并没有充分的理由让你信任社区可以产出好的软件。社区在目标、经验和开源项目的投入时间方面各不相同。选择工具时保持审慎态度十分重要,不要让理想蒙蔽了判断。

阅读全文

与kafkapython教程相关的资料

热点内容
iphone13对wap3加密 浏览:553
pdf文件打开失败 浏览:911
dubbo怎么调用不同服务器接口 浏览:38
全能解压王app历史版本 浏览:73
优先队列与拓扑排序算法 浏览:279
pdf转换formacbook 浏览:869
pdf文件内容怎么编辑 浏览:46
134压缩机排气温度多少 浏览:254
unity等待编译后 浏览:804
黑鲨手机锁屏视频在哪个文件夹 浏览:779
wow地图解压后怎么压缩 浏览:819
有pdf却打不开 浏览:460
七星彩软件app怎么下载 浏览:217
32单片机的重映射哪里改 浏览:816
为什么前端不用刷算法题 浏览:708
对称加密系统和公钥加密系统 浏览:428
历史地理pdf 浏览:606
物联网云服务器框架 浏览:648
sybaseisql命令 浏览:183
android权威编程指南pdf 浏览:663