A. 如何使用python 连接kafka 并获取数据
连接
kafka
的库有两种类型,一种是直接连接
kafka
的,存储
offset
的事情要自己在客户端完成。还有一种是先连接
zookeeper
然后再通过
zookeeper
获取
kafka
的
brokers
信息,
offset
存放在
zookeeper
上面,由
zookeeper
来协调。
我现在使用
samsa
这个
highlevel
库
Procer示例
from
kazoo.client
import
KazooClientfrom
samsa.cluster
import
Clusterzookeeper
=
KazooClient()zookeeper.start()cluster
=
Cluster(zookeeper)topic
=
cluster.topics['topicname']topic.publish('msg')
**
Consumer示例
**
from
kazoo.client
import
KazooClientfrom
samsa.cluster
import
Clusterzookeeper
=
KazooClient()zookeeper.start()cluster
=
Cluster(zookeeper)topic
=
cluster.topics['topicname']consumer
=
topic.subscribe('groupname')for
msg
in
consumer:
print
msg
Tip
consumer
必需在
procer
向
kafka
的
topic
里面提交数据后才能连接,否则会出错。
在
Kafka
中一个
consumer
需要指定
groupname
,
groue
中保存着
offset
等信息,新开启一个
group
会从
offset
0
的位置重新开始获取日志。
kafka
的配置参数中有个
partition
,默认是
1
,这个会对数据进行分区,如果多个
consumer
想连接同个
group
就必需要增加
partition
,
partition
只能大于
consumer
的数量,否则多出来的
consumer
将无法获取到数据。
B. python每个线程消费不用数据
关于python每个线程消费不用数据相关资料如下
python kafka多线程消费数据
1、打印每个线粗搏程id,满足预期,开启了8个线程,每个线程号都不一样凯凳嫌;
2、查看kafka状态,也能满足预期,每个分区的消费者id都是不一样的,下面第二个图盯手是开启一个消费者时的状态,每个分区的消费者id都是相同的;对比之下能满足需求;
C. filebeat利用kafka进行日志实时传输
选择安装目录:例如安装在/usr/local/或者/opt/下都可以。
创建一个软链接:
filebeat的配置很简单,只需要指定input和output就可以了。
由于kafka server高低版本的客户端API区别较大,因此推荐同时使用高版本的filebeat和kafka server。 注意 高版本的filebeat配置使用低版本的kafka server会出现kafka server接受不到消息的情况。这里我使用的kafka server版本是:2.12-0.11.0.3,可参考 快速搭建kafka
编辑filebeat安装目录下 filebeat.yml 文件:
配置Filebeat inputs:
上面 /opt/test/*.log 是我要传输的日志,根据实际情况改成你自己的值。
配置Filebeat outputs:
"111.11.1.243:9092" 是我的单机kafka broker,如果你是kafka集群,请用 , 分隔。 test 是kafka topic,请改成你自己实际情况的值。另外以下信胡这段需要删除:
因为我并没有用到Elasticsearch,所以有多个输出在启动filebeat时会报错。到这里filebeat配置kafka就完成了,是不是很简单,让我们启动它测试一下。
启动,进入filebeat的安装目录:
查看是否启动:
很好,已经启动了。如果没有启动,请查看启动日志睁旦文件nohup.out。
停止:
随机生成日志脚本:
执行这段python脚本,开启一个kafka消费者如果成功消费日志消息:
哈哈,大功告成。 注 上面这段脚本要适时手动停止,因为它是个死循环,如果忘记手动停止那么就杯具了,我就是这样悉坦扰把机器写宕机的。
D. python 消费kafka 写入es 小记
# -*- coding: utf8 -*-
# __author__ = '小红帽'
# Date: 2020-05-11
"""Naval Fate.
Usage:
py_kafka_protobuf_consume.py --bootstrap-servers=<host:port,host2:port2..> --groupId=<groupId> --topic=<topic_name> --es-servers=<host:port> --index=<schema> --type=<doc> --id=<order_id>
py_kafka_protobuf_consume.py -h | --help
py_kafka_protobuf_consume.py --version
Options:
-h --help 打印帮助信息.
--bootstrap_servers=<host:port,host2:port2..> kafka servers
--groupId=<groupId> kafka消费组
--topic=<topic_name> topic名称
--es-servers=<host:port> ES 地址
--index=<index_name> ES 索引
--type=<doc> ES type
--id=<order_id> 指定id主键,快速更新
"""
import json
from kafka import KafkaConsumer
from docopt import docopt
from elasticsearch import Elasticsearch
from elasticsearch import helpers
class Kafka_consumer():
def __init__(self,args):
self.topic = args['--topic']
self.bootstrapServers = args['--bootstrap-servers']
self.groupId = args['--groupId']
self.id = args['--id']
self.es_host = args['--es-servers'].split(':')[0]
self.es_port = args['--es-servers'].split(':')[1]
self.es_index = args['--index']
self.es_type = args['--type']
self.consumer = KafkaConsumer(
bootstrap_servers=self.bootstrapServers,
group_id=self.groupId,
enable_auto_commit = True,
auto_commit_interval_ms=5000,
consumer_timeout_ms=5000
)
def consume_data_es(self):
while True:
try:
es = Elasticsearch([{'host': self.es_host, 'port': self.es_port}], timeout=3600)
self.consumer.subscribe([self.topic])
actions=[]
for message in self.consumer:
if message is not None:
query = json.loads(message.value)['data'][0]
action = {
"_index": self.es_index,
"_type": self.es_type,
"_id": json.loads(message.value)['data'][0][self.id],
"_source": query
}
actions.append(action)
if len(actions) > 50:
helpers.bulk(client=es, actions=actions)
print("插入es %s 条数据" % len(actions))
actions = []
if len(actions) > 0:
helpers.bulk(client=es, actions=actions)
print("等待超时时间,插入es %s 条数据" % len(actions))
actions=[]
except BaseException as e:
print(e)
if __name__ == '__main__':
arguments = docopt(__doc__,version='sbin 1.0')
consumer = Kafka_consumer(arguments)
consumer.consume_data_es()
E. python怎么安装acl包
搜索本产品内容
查询
文档中心 > 消息队列 CKafka > SDK 文档 > Python SDK > 公网 SASL_SSL 方式接入
公网 SASL_SSL 方式接入
最近更新时间:2021-12-28 09:54:00
操作场景
前提条件基尘竖
操作步骤
步骤1:准备工作
步骤2:生产消息
步骤3:消费消息
操作场景
该任务以 Python 客户端为例,指导您使用公网 SASL_SSL 方式接入消息队列 CKafka 并收发消息。
前提条件
安装 Python
安装 pip
配置 ACL 策略
下载 Demo
下载 SASL_SSL 证书
操作步骤
步骤1:准备工作
创建接入点。
在 实例列表 页面,单击目标实例 ID,进入实例详情页。
在 基本信息 > 接入方式 中,单击添加路由策略,在打开窗口中选择:路由类型:公网域名接入,接入方式:SASL_SSL。
创建角色。
在用户管理页面新建角色,设置密码。
创建 Topic。
在控制台 topic 管理页面新建 Topic(参见 创建 Topic)。
添加 Python 依赖库。
执行以下命令安装:
pip install kafka-python
步骤2:生产消息
修改生产消息程序 procer.py 中配置参数。
procer = KafkaProcer(
bootstrap_servers = ['xx.xx.xx.xx:port'],
api_version = (1, 1),
#
# SASL_SSL 公网接入
#
security_protocol = "SASL_SSL",
sasl_mechanism = "PLAIN",
sasl_plain_username = "instanceId#username",
sasl_plain_password = "password",
ssl_cafile = "CARoot.pem",
ssl_check_hostname = False,
)
message = "Hello World! Hello Ckafka!"
msg = json.mps(message).encode()
procer.send('topic_name', value = msg)
print("proce message " + message + " success.")
procer.close()
参数
描述
bootstrap_servers
接入网络,在控制台的实例详情页面接入方搏大式模块的网络列复制。
sasl_plain_username
用户名,格式为 实例 ID + # + 用户名。实例 ID 在 CKafka 控制台 的实例详情页面的基本信息获取,用户在用户管理创建用户时设置。
sasl_plain_password
用户密码,在 CKafka 控制台实例详情页面的用户管理创建用户时设置。
topic_name
Topic 名称,您可以在控制台上 topic管理页面复制。
CARoot.pem
采用 SASL_SSL 方式接入时,所需的证书兄瞎路径。
编译并运行 procer.py。
查看运行结果。
在 CKafka 控制台 的 topic管理页面,选择对应的 Topic , 单击更多 > 消息查询,查看刚刚发送的消息。
步骤3:消费消息
修改消费消息程序 consumer.py 中配置参数。
consumer = KafkaConsumer(
'topic_name',
group_id = "group_id",
bootstrap_servers = ['xx.xx.xx.xx:port'],
api_version = (1,1),
#
# SASL_SSL 公网接入
#
security_protocol = "SASL_SSL",
sasl_mechanism = 'PLAIN',
sasl_plain_username = "instanceId#username",
sasl_plain_password = "password",
ssl_cafile = "CARoot.pem",
ssl_check_hostname = False,
)
for message in consumer:
print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" %
(message.topic, message.partition, message.offset, message.value))
参数
描述
bootstrap_servers
接入网络,在控制台的实例详情页面接入方式模块的网络列复制。
group_id
消费者的组 ID,根据业务需求自定义。
sasl_plain_username
用户名,格式为 实例 ID + # + 用户名。实例 ID 在CKafka 控制台的实例详情页面的基本信息获取,用户在用户管理创建用户时设置。
sasl_plain_password
用户名密码,在 CKafka 控制台实例详情页面的用户管理创建用户时设置
topic_name
Topic 名称,您可以在控制台上 topic管理页面复制。
CARoot.pem
采用 SASL_SSL 方式接入时,所需的证书路径。
编译并运行 consumer.py。
查看运行结果。
在 CKafka 控制台 的 Consumer Group 页面,选择对应的消费组名称,在主题名称输入 Topic 名称,单击查询详情,查看消费详情。
上一篇: 公网 SASL_PLAINTEXT 方式接入下一篇: VPC 网络接入
文档内容是否对您有帮助?
有帮助没帮助
如果遇到产品相关问题,您可咨询 在线客服 寻求帮助。
消息队列 CKafka 相关文档
API 概览
创建主题
签名方法
获取实例列表
删除主题白名单
增加主题白名单
错误返回结果
产品概述
获取主题属性
点击搜索腾讯云文档
取消
清除查询
F. Hadoop生态架构之kafka
1、定位:分布式的消息队列系统,同时提供数据分布式缓存态卜功能(默认7天)
2、消息持久化到磁盘,达到O(1)访问速度,预读和后写,对磁盘的顺序访问(比内存访问还要快)
3、Storm(分布式的实时计算框架)
Kafka目标成为队列平台
4、基本组件:
Broker:每一台机器是一个Broker
Procer:日志消息生产者,主要写数据
Consumer:日志消息消费者,主要读数据
Topic:是虚拟概念,不同的consumer去指定的topic去读数据,不同procer可以往不同的topic去写
Partition:是实际概念,文件夹,是在Topic的基础上做了进一步分层
5、Partition功能:负载均衡,需要保证消息的顺序性
顺序性拿烂的保证:订阅消息是从头往后读取的,写消息是尾部追加,所以整体消息是顺序的
如果有多个partiton存在,可能会出现顺序不一致帆敏穗的情况,原因:每个Partition相互独立
6、Topic:逻辑概念
一个或多个Partition组成一个Topic
7、Partition以文件夹的形式存在
8、Partition有两部分组成:
(1)index log:(存储索引信息,快速定位segment文件)
(2)message log:(真实数据的所在)
9、HDFS多副本的方式来完成数据高可用
如果设置一个Topic,假设这个Topic有5个Partition,3个replication
Kafka分配replication的算法:
假设:
将第i个Partition分配到(i % N)个Broker上
将第i个Partition的第j个replication分配到( (i+j) % N)个Broker上
虽然Partition里面有多个replication
如果里面有M个replication,其中有一个是Leader,其他M-1个follower
10、zookeeper包系统的可用性,zk中会保存一些meta信息(topic)
11、物理上,不同的topic的消息肯定是分开存储的
12、偏移量——offset:用来定位数据读取的位置
13、kafka内部最基本的消息单位——message
14、传输最大消息message的size不能超过1M,可以通过配置来修改
15、Consumer Group
16、传输效率:zero-
0拷贝:减少Kernel和User模式上下文的切换
直接把disk上的data传输给socket,而不是通过应用程序来传输
17、Kafka的消息是无状态的,消费者必须自己维护已消费的状态信息(offset)
减轻Kafka的实现难度
18、Kafka内部有一个时间策略:SLA——消息保留策略(消息超过一定时间后,会自动删除)
19、交付保证:
at least once:至少一次(会有重复、但不丢失)
at most once:最多发送一次(不重复、但可能丢失)
exactly once:只有一次(最理想),目前不支持,只能靠客户端维护
20、Kafka集群里面,topic内部由多个partition(包含多个replication),达到高可用的目的:
日志副本:保证可靠性
角色:主、从
ISR:是一个集合,只有在集合中的follower,才有机会被选为leader
如何让leader知道follower是否成功接收数据(心跳,ack)
如果心跳正常,代表节点活着
21、怎么算“活着”
(1)心跳
(2)如果follower能够紧随leader的更新,不至于被落的太远
如果一旦挂掉,从ISR集合把该节点删除掉
前提:需要把zookeeper提前启动好
一、单机版
1、启动进程:
]# ./bin/kafka-server-start.sh config/server.properties
2、查看topic列表:
]# ./bin/kafka-topics.sh --list --zookeeper localhost:2181
3、创建topic:
]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic newyear_test
4、查看topic描述:
]# ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic newyear_test
5、procer发送数据:
]# ./bin/kafka-console-procer.sh --broker-list localhost:9092 --topic newyear_test
6、consumer接收数据:
]# ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic newyear_test --from-beginning
7、删除topic:
]# ./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic newyear_test
二、集群版
在slave1和slave2上的broker.id一定设置不同
分别在slave1和slave2上开启进程:
./bin/kafka-server-start.sh config/server.properties
创建topic:
]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 5 --topic newyear_many_test
1、实现一个consumer group
首先在不同的终端分别开启consumer,保证groupid一致
]# python consumer_kafka.py
执行一次procer:
]# python procer_kafka.py
2、指定partition发送数据
]# python procer_kafka_2.py
3、指定partition读出数据
]# python consumer_kafka_2.py
consumer_kafka.py:
procer_kafka.py:
consumer_kafka_2.py:
procer_kafka_2.py:
1.新建./conf/kafka_test/flume_kafka.conf
2.启动flume:
]# flume-ng agent -c conf -f ./conf/kafka_test/flume_kafka.conf -n a1 -Dflume.root.logger=INFO,console
启动成功,如下图:
3.测试:
1.1flume监控产生的数据:
]# for i in seq 1 100 ; do echo '====> '$i >> 1.log ; done
1.2kafka消费数据:
]# ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic_1013 --from-beginning
消费结果如下图:
G. pyflink消费kafka-connect-jdbc消息(带schema)
1、数据接入
通过kafka的restFul接口创建连接mysql的连接器并启动。迹山
{
"name": "mysql_stream_test",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"timestamp.column.name": "",
"incrementing.column.name": "ID",
"connection.password": "",
"validate.non.null": true,
"tasks.max": 1,
"batch.max.rows": 100,
"table.whitelist": "baseqx.test_demo",
"mode": "incrementing",
"topic.prefix": "mysql_",
"connection.user": "",
"poll.interval.ms": 5000,
"numeric.mapping": "best_fit",
"connection.url": "jdbc:mysql://xxx.xxx.xxx.xxx:3306/枝州则baseqx?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true"
}
}
2.kafka-connect创建主题中的猛棚默认数据格式为
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"},{"type":"string","optional":false,"field":"NAME"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"CREATE_TIME"}],"optional":false,"name":"test_demo"},"payload":{"ID":1,"NAME":"prestoEtl","CREATE_TIME":1606902182000}}
3.使用pyflink消费带schema的消息
#!/usr/bin/python3.7
# -*- coding: UTF-8 -*-
from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode
from pyflink.table import StreamTableEnvironment, TableConfig, SqlDialect
s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_parallelism(1)
st_env = StreamTableEnvironment.create(s_env, TableConfig())
st_env.get_config().set_python_executable("python3")
st_env.use_catalog("default_catalog")
st_env.use_database("default_database")
# DML上可以固定schema为字符串, 用 ROW 函数封装 payload
ddlKafkaConn = """
create table sourceKafkaConn(
`scheam` STRING comment 'kafkaConn每行模式',
`payload` ROW(ID BIGINT,NAME STRING,CREATE_TIME STRING) comment '行数据'
)comment '从kafkaConnect获取带模式的数据'
with(
'connector' = 'kafka',
'topic' = 'mysql_test_demo',
'properties.bootstrap.servers' = '192.168.113.11:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
)
"""
# 'connector.startup-mode' = 'earliest-offset 表示读取最早的消息 | latest-offset 表示读取消息队列中最新的消息',
st_env.execute_sql(ddlKafkaConn)
sinkPrint = '''
CREATE TABLE sinkPrint WITH ('connector' = 'print')
LIKE sourceKafkaConn (EXCLUDING ALL)
'''
st_env.execute_sql(sinkPrint)
st_env.execute_sql("SHOW TABLES").print()
st_env.sql_query("select scheam,ROW(ID,NAME,CREATE_TIME) as payload from sourceKafkaConn") \
.insert_into("sinkPrint")
st_env.execute("pyflink-kafka-v4")
4.执行
4.1pythonpyflink-kafka-v4.py
4.2flinkrun-mxxx.xxx.xxx.xxx:8081-pypyflink-kafka-v4.py
5.执行结果
+-----------------+|tablename|+-----------------
+|sinkPrint|
+|sourceKafkaConn|
+-----------------+
2 rowsinset
+I(null,1,prestoEtl,1606902182000)
+I(null,2,执行的非常好,1606902562000)
+I(null,3,使用flink解析topic的schema,1607070278000)
H. PHP开发人员的Python基础知识
PHP(外文名:PHP: Hypertext Preprocessor,中文名:“超文本预处理器”)是一种通用开源脚本语言。语法吸收了C语言、Java和Perl的特点,利于学习,使用广泛,主要适用于Web开发领域。那么PHP开发人员的Python基础知识都有哪些呢?以下仅供参考!
常用缩略语
Ajax:异步 JavaScript + XML
XML:可扩展标记语言(Extensible Markup Language)
什么是 Python?
Python 的定义是一种 “通用的高级编程语言”。它以简洁性和易用性着称,而且是少有的几种对空格和缩进有要求的语言之一。Python 的主要作者 Guido Van Rossum 在社区中仍然非常活跃,并且被人们戏称为仁慈的领导。
Python 的灵活性和紧凑性是值得称赞的。它支持面向对象编程、结构化编程、面向方面编程以及函数编程等。Python 采用小内核设计,但具备大量扩展库,从而确保了该语言的紧凑性和灵活性。
从语法的角度来说,您会发现 Python 的简洁性异常突出——几乎可以说是一种纯粹的境界。PHP 开发人员要么会对这种方法的语法深深陶醉,要么会发现它的局限性。这主要取决于您自己的见解。Python 社区推动这种美感的态度是非常明确的,它们更加重视的是美学和简洁性,而不是灵动的技巧。已形成 Perl 传统(“可以通过多种方式实现它”)的 PHP 开发人员(像我自己)将面对一种完全相反的哲学(“应该只有一种方法可以实现它”)。
事实上,该社区定义了一种特有的代码风格术语,即 Python 化(pythonic)。您可以说您的代码是 Python 化,这是对 Python 术语的良好运用,同时还可展现语言的自然特性。本文并不打算成为 Pythonista(或 Pythoneer),但如果您想继续 Python 之路,那么千万不能错过本文的知识点。就像 PHP 有自己的编程风格,Perl 有自己的概念方法,学习 Python 语言必然也需要开始用该语言来思考问题。
另一个要点:在撰写本文时,Python 的最新版本是 V3.0,但本文主要侧重于 Python V2.6。Python V3.0 并不能向后兼容之前的版本,而且 V2.6 是使用最为广泛的版本。当然,您可以根据需求使用自己喜好的版本。
Python 与 PHP 有何不同?
一般来说,PHP 是一种 Web 开发语言。是的,它提供了一个命令行接口,并且甚至可用于开发嵌入式应用程序,但它主要还是用于 Web 开发。相反,Python 是一种脚本语言,并且也可用于 Web 开发。从这方面来说,我知道我会这样说——它比 PHP 更加接近 Perl。(当然,在其他方面,它们之间并无实际不同。我们继续往下看。)
PHP 的语法中充斥着美元符号($)和大括号({}),而 Python 相对来说则更加简洁和干净。PHP 支持 switch 和 do...while 结构,而 Python 则不尽然。PHP 使用三元操作符(foo?bar:baz)和冗长的函数名列表,而命名约定更是无所不有;相反,您会发现 Python 要简洁多了。PHP 的数组类型可同时支持简单列表和字典或散列,但 Python 却将这两者分开。
Python 同时使用可变性和不变性的概念:举例来说,tuple 就是一个不可变的列表。您可以创建 tuple,但在创建之后不能修改它。这一概念可能要花些时间来熟悉,但对于避免错误极为有效。当然,更改 tuple 的惟一方法是复制它。因此,如果您发现对不可变对象执行了大量更改,则应该重新考量自己的方法。
之前提到,Python 中的缩进是有含义的:您在刚开始学习该语言时会对此非常难以适应。您还可以创建使用关键字作为参数的函数和方法——这与 PHP 中的标准位置参数迥然不同。面向对象的追随者会对 Python 中真正的面向对象思想感到欣喜,当然还包括它的 “一级” 类和函数。如果您使用非英语语言,则会钟爱于 Python 强大的.国际化和 Unicode 支持。您还会喜欢 Python 的多线程功能;这也是最开始令我为之着迷的特性之一。
综上所述,PHP 和 Python 在许多方面都彼此类似。您可以方便地创建变量、循环,使用条件和创建函数。您甚至可以轻松地创建可重用的模块。两种语言的用户社区都充满活力和激情。PHP 的用户群体更加庞大,但这主要归因于它在托管服务器及 Web 专注性方面的优势和普及性。
很好 简要介绍到此为止。我们开始探索之旅。
使用 Python
清单 1 展示了一个基本的 Python 脚本。
清单 1. 一个简单的 Python 脚本
for i in range(20):
print(i)
清单 2 展示了脚本的必然结果。
清单 2. 清单 1 的结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
在深入探索之前,我们先来了解一些预备知识。首先从变量开始。
变量
可以看到,表示变量并不需要任何特殊的字符。变量 i 就是一个纯粹的 i——毫无特殊之处。表示代码块或语言结束也不需要任何特殊字符(比如分号和括号);只需要在 for 行使用一个简单的冒号即可(:)。还需注意,缩进会向 Python 指示哪些内容属于 for 循环。举例来说,清单 3 中的代码会在循环中为各编号输出一个说明。
清单 3. 为各循环添加一条语句
for i in range(20):
print(i)
print('all done?')
相反,清单 4 中的代码会在循环结束处输出一条说明。
清单 4. 在循环后添加一条语句
for i in range(20):
print(i)
print('all done!')
现在,我第一次看到这样的代码时,我认为这完全是无稽之谈。什么?让我相信换行和缩进能保证代码的结构和运行?请相信我,不用多久,您就会习惯它(但我需要承认必须到达到分号处才会结束语句的运行)。如果您与其他开发人员共同开发 Python 项目,则会发现这种可读性的用处是多么大了。您不再像以前那样总是猜测 “这个聪明的家伙在这里究竟想干些什么?”
在 PHP,您使用 = 操作符为变量分配值(参见 清单 5)。在 Python 中,您使用相同的操作符,只是需要标记或指向值。对于我来说,它就是赋值操作而已,我不需要过多担心专门的术语。
清单 5. 创建变量
yorkie = 'Marlowe' #meet our Yorkie Marlowe!
mutt = 'Kafka' #meet our mutt Kafka
print(mutt) #prints Kafka
Python 的变量名称约定与 PHP 类似:您在创建变量名时只能使用字母、数字和下划线(_)。同样,变量名的第一个字符不能是数字。Python 变量名是区分大小写的,并且您不能使用特定的 Python 关键字(比如 if、else、while、def、or、and、not、in 和 is 开始符)作为变量名。这没有什么值得奇怪的。
Python 允许您随意执行基于字符串的操作。清单 6 中的大多数操作应该都是您熟悉的。
清单 6. 常见的基于字符串的操作
yorkie = 'Marlowe'
mutt = 'Kafka'
ylen = len(yorkie) #length of variable yorkie
print(ylen) #prints 7
print(len(yorkie)) #does the same thing
len(yorkie) #also does the same thing, print is implicit
print(yorkie.lower()) #lower cases the string
print(yorkie.strip('aeiou')) #removes vowels from end of string
print(mutt.split('f')) #splits "Kafka" into ['Ka', 'ka']
print(mutt.count('a')) #prints 2, the number of a's in string
yorkie.replace('a','4') #replace a's with 4's
条件语句
您已经了解了如何使用 for 循环;现在,我们来讨论条件语句。您会发现 Phyon 中的条件语句与 PHP 基本相同:您可以使用熟悉的 if/else型结构,如清单 7 所示。
清单 7. 一个简单的条件测试
yorkie = 'Marlowe'
mutt = 'Kafka'
if len(yorkie) > len(mutt):
print('The yorkie wins!')
else:
print('The mutt wins!')
您还可以使用 if/elif/else(elif,等价于 PHP 中的 elseif)创建更加复杂的条件测试,如清单 8 所示。
清单 8. 一个比较复杂的条件测试
yorkie = 'Marlowe'
mutt = 'Kafka'
if len(yorkie) + len(mutt) > 15:
print('The yorkie and the mutt win!')
elif len(yorkie) + len(mutt) > 10:
print('Too close to tell!')
else:
print('Nobody wins!')
您可能会说,目前为止并没有什么与众不同的地方:甚本上和想象中没有太大区别。现在,我们来看 Python 处理列表的方式,您会发现两种语言之间的不同之处。
列表
一种常用的列表类型是 tuple,它是不可变的。在 tuple 中载入一系列值之后,您不会更改它。Tuple 可以包含数字、字符串、变量,甚至其他 tuples。Tuples 从 0 开始建立索引,这很正常;您可以使用 -1 索引访问最后一个项目。您还可以对 tuple 运行一些函数(请参见清单 9)。
清单 9. Tuples
items = (1, mutt, 'Honda', (1,2,3))
print items[1] #prints Kafka
print items[-1] #prints (1,2,3)
items2 = items[0:2] #items2 now contains (1, 'Kafka') thanks to slice operation
'Honda' in items #returns TRUE
len(items) #returns 4
items.index('Kafka') #returns 1, because second item matches this index location
列表与 tuple 类似,只不过它们是可变的。创建列表之后,您可以添加、删除和更新列表中的值。列表使用方括号,而不是圆括号(()),如清单 10 所示。
清单 10. 列表
groceries = ['ham','spam','eggs']
len(groceries) #returns 3
print groceries[1] #prints spam
for x in groceries:
print x.upper() #prints HAM SPAM EGGS
groceries[2] = 'bacon'
groceries #list is now ['ham','spam','bacon']
groceries.append('eggs')
groceries #list is now ['ham', 'spam', 'bacon', 'eggs']
groceries.sort()
groceries #list is now ['bacon', 'eggs', 'ham', 'spam']
字典类似于关联数组或散列;它使用键值对来存储和限制信息。但它不使用方括号和圆括号,而是使用尖括号。与列表类似,字典是可变的,这意味着您可以添加、删除和更新其中的值(请参见清单 11)。
清单 11. 字典
colorvalues = {'red' : 1, 'blue' : 2, 'green' : 3, 'yellow' : 4, 'orange' : 5}
colorvalues #prints {'blue': 2, 'orange': 5, 'green': 3, 'yellow': 4, 'red': 1}
colorvalues['blue'] #prints 2
colorvalues.keys() #retrieves all keys as a list:
#['blue', 'orange', 'green', 'yellow', 'red']
colorvalues.pop('blue') #prints 2 and removes the blue key/value pair
colorvalues #after pop, we have:
#{'orange': 5, 'green': 3, 'yellow': 4, 'red': 1}
在 Python 中创建一个简单的脚本
现在,您已经对 Python 有了一定的了解。接下来,我们将创建一个简单的 Python 脚本。该脚本将读取位于您的服务器 /tmp 目录下的 PHP 会话文件的数量,并在日志文件中写入摘要报告。在该脚本中,您将学习如何导入特定函数的模块,如何使用文件,以及如何写入日志文件。您还将设置一系列变量来跟踪所收集的信息。
清单 12 展示了整个脚本。打开一个编辑器,并将代码粘贴到其中,然后在系统中将该文件保存为 tmp.py。然后,对该文件运行 chmod + x,使它成为可执行文件(假定您使用 UNIX? 系统)。
清单 12. tmp.py
#!/usr/bin/python
import os
from time import strftime
stamp = strftime("%Y-%m-%d %H:%M:%S")
logfile = '/path/to/your/logfile.log'
path = '/path/to/tmp/directory/'
files = os.listdir(path)
bytes = 0
numfiles = 0
for f in files:
if f.startswith('sess_'):
info = os.stat(path + f)
numfiles += 1
bytes += info[6]
if numfiles > 1:
title = 'files'
else:
title = 'file'
string = stamp + " -- " + str(numfiles) + " session "
+ title +", " + str(bytes) + " bytes "
file = open(logfile,"a")
file.writelines(string)
file.close()
在第一行中,您可以看到一个 hash-bang 行:它用于标识 Python 解释器的位置。在我的系统中,它位于 /usr/bin/python。请根据系统需求调整这一行。
接下来的两行用于导入特定的模块,这些模块将帮助您执行作业。考虑到脚本需要处理文件夹和文件,因此您需要导入 os 模块,因为其中包含各种函数和方法,可帮助您列出文件、读取文件和操作文件夹。您还需要写入一个日志文件,因此可以为条目添加一个时间戳 — 这就需要使用时间函数。您不需要所有时间函数,只需要导入 strftime函数即可。
在接下来的六行中,您设置了一些变量。第一个变量是 stamp,其中包含一个日期字符串。然后,您使用 strftime 函数创建了一个特定格式的时间戳 — 在本例中,时间戳的格式为 2010-01-03 12:43:03。
接下来,创建一个 logfile 变量,并在文件中添加一个实际存储日志文件消息的路径(该文件不需要实际存在)。为简单起见,我在 /logs 文件夹中放置了一个日志文件,但您也可以将它放置在别处。同样,path 变量包含到 /tmp 目录的路径。您可以使用任何路径,只要使用斜杠作为结束即可 (/)。
接下来的三个变量也非常简单:files 列表包含指定路径中的所有文件和文件夹,另外还包含 bytes 和 numfiles 两个变量。这两个变量都设置为 0;脚本会在处理文件时递增这些值。
完成所有这些定义之后,接下来就是脚本的核心了:一个简单的 for 循环,用于处理文件列表中的各文件。每次运行循环时,脚本都会计算文件名;如果它以 sess_ 开头,则脚本会对该文件运行 os.stat(),提取文件数据(比如创建时间、修改时间和字节大小),递增 numfiles 计数器并将该文件的字节大小累计到总数中。
当循环完成运行后,脚本会检查 numfiles 变量中的值是否大于 1。如果大于 1,则会将一个新的 title 变量设置为 files;否则,title 将被设置为单数形式的 file。
脚本的最后部分也非常简单:您创建了一个 string 变量,并在该变量中添加了一行以时间戳开始的数据,并且其后还包含 numfiles(已转换为字符串)和字节(也已转换为字符串)。请注意继续字符();该字符可允许代码运行到下一行。它是一个提高可读性的小技巧。
然后,您使用 open() 函数以附加模式打开日志文件(毕竟始终需要在该文件中添加内容),writelines() 函数会将字符串添加到日志文件中,而 close() 函数用于关闭该文件。
现在,您已经创建了一个简单的 Python 脚本。该脚本可用于完成许多任务,举例来说,您可以设置一个 cron作业来每小时运行一次这个脚本,以帮助您跟踪 24 小时内所使用的 PHP 会话的数量。您还可以使用 jQuery 或其他一些 JavaScript 框架通过 Ajax 连接这个脚本,用于为您提供日志文件提要(如果采用这种方式,则需要使用 print命令来返回数据)。
I. 如何在kafka-python和confluent-kafka之间做出选择
kafka-python:蛮荒的西部
kafka-python是最受欢迎的Kafka Python客户端。我们过去使用时从未出现过任何问题,在我的《敏捷数据科学2.0》一书中我也用过它。然而在最近这个项目中,它却出现了一个严重的问题。我们发现,当以文档化的方式使用KafkaConsumer、Consumer迭代式地从消息队列中获取消息时,最终到达主题topic的由Consumer携带的消息通常会丢失。我们通过控制台Consumer的分析验证了这一点。
需要更详细说明的是,kafka-python和KafkaConsumer是与一个由SSL保护的Kafka服务(如Aiven Kafka)一同使用的,如下面这样:
kafka_consumer = KafkaConsumer(
topic,
enable_auto_commit=True,
group_id=group_id,
bootstrap_servers=config.kafka.host,
api_version=(0, 10),
security_protocol='SSL',
ssl_check_hostname=True,
ssl_cafile=config.kafka.ca_pem,
ssl_certfile=config.kafka.service_cert,
ssl_keyfile=config.kafka.service_key
)
for message in kafka_consumer:
application_message = json.loads(message.value.decode())
...
当以这样的推荐方式使用时,KafkaConsumer会丢失消息。但有一个变通方案,就是保留所有消息。这个方案是Kafka服务提供商Aiven support提供给我们的。它看起来像这样:
while True:
raw_messages = consumer.poll(timeout_ms=1000, max_records=5000)
for topic_partition, messages in raw_messages.items():
application_message = json.loads(message.value.decode())
...
虽然这个变通方案可能有用,但README中的方法会丢弃消息使我对其失去兴趣。所以我找到了一个替代方案。
confluent-kafka:企业支持
发现coufluent-kafka Python模块时,我感到无比惊喜。它既能做librdkafka的外封装,又非常小巧。librdkafka是一个用C语言写的kafka库,它是Go和.NET的基础。更重要的是,它由Confluent公司支持。我爱开源,但是当“由非正式社区拥有或支持”这种方式效果不行的时候,或许该考虑给替代方案印上公章、即该由某个公司拥有或支持了。不过,我们并未购买商业支持。我们知道有人会维护这个库的软件质量,而且可以选择买或不买商业支持,这一点真是太棒了。
用confluent-kafka替换kafka-python非常简单。confluent-kafka使用poll方法,它类似于上面提到的访问kafka-python的变通方案。
kafka_consumer = Consumer(
{
"api.version.request": True,
"enable.auto.commit": True,
"group.id": group_id,
"bootstrap.servers": config.kafka.host,
"security.protocol": "ssl",
"ssl.ca.location": config.kafka.ca_pem,
"ssl.certificate.location": config.kafka.service_cert,
"ssl.key.location": config.kafka.service_key,
"default.topic.config": {"auto.offset.reset": "smallest"}
}
)
consumer.subscribe([topic])
# Now loop on the consumer to read messages
running = True
while running:
message = kafka_consumer.poll()
application_message = json.load(message.value.decode())
kafka_consumer.close()
现在我们能收到所有消息了。我并不是说kafka-python工具不好,我相信社区会对它的问题做出反应并解决。但从现在开始,我会一直坚持使用confluent-kafka。
开源治理
开源是强大的,但是涉及到复杂的“大数据”和NoSQL工具时,通常需要有一家大公司在背后推动工具的开发。这样你就知道,如果那个公司可以使用工具,那么该工具应该拥有很好的基本功能。它的出现可能是非正式的,就像某公司发布类似FOSS的项目一样,但也可能是正式的,就像某公司为工具提供商业支持一样。当然,从另一个角度来看,如果一家与开源社区作对的公司负责开发某个工具,你便失去了控制权。你的意见可能无关紧要,除非你是付费客户。
理想情况是采取开源治理,就像Apache基金会一样,还有就是增加可用的商业支持选项。这对互联网上大部分的免费软件来说根本不可能。限制自己只使用那些公司盖章批准后的工具将非常限制你的自由。这对于一些商店可能是正确选择,但对于我们不是。我喜欢工具测试,如果工具很小,而且只专心做一件事,我就会使用它。
信任开源
对于更大型的工具,以上决策评估过程更为复杂。通常,我会看一下提交问题和贡献者的数量,以及最后一次commit的日期。我可能会问朋友某个工具的情况,有时也会在推特上问。当你进行嗅探检查后从Github选择了一个项目,即说明你信任社区可以产出好的工具。对于大多数工具来说,这是没问题的。
但信任社区可能存在问题。对于某个特定的工具,可能并没有充分的理由让你信任社区可以产出好的软件。社区在目标、经验和开源项目的投入时间方面各不相同。选择工具时保持审慎态度十分重要,不要让理想蒙蔽了判断。