㈠ 官宣|Apache Flink 1.13.0 正式发布,流处理应用更加简单高效
Flink 1.13版本的发布标志着其在流处理应用上的重大进展,为用户提供了更简单、更自然的使用体验,同时也带来了丰富的性能优化与新特性。
在被动扩缩容方面,Flink 1.13实现了让用户仅通过调整并发度即可轻松进行流作业的扩缩容,这一功能极大地简化了流处理应用的资源管理过程,使其与普通应用的使用体验一致。
针对性能分析,Flink 1.13引入了一系列工具,如负载和反压可视化、CPU火焰图和State访问性能指标,帮助用户更直观地理解流作业的性能瓶颈,优化应用效率。
为了提升应用性能,Flink 1.13增强了瓶颈检测与反压监控机制,引入了描述作业繁忙度与反压程度的指标,并优化了反压检测逻辑,使得用户能够更精确地定位性能问题。
此外,Flink 1.13还提供了火焰图这一有效的可视化工具,帮助用户识别瓶颈算子中的高耗能计算逻辑,进一步提升应用性能。
在State访问延迟指标方面,Flink 1.13通过优化State Backend的性能指标,帮助用户识别State访问性能瓶颈,特别是当状态超过内存容量时,指导用户合理配置内存和I/O资源。
为了提供更灵活的部署方式,Flink 1.13支持用户自定义Pod模板进行Kubernetes部署,以及通过Savepoint切换State Backend,使得用户可以根据需求调整状态后端,以优化性能。
在SQL和Table API方面,Flink 1.13增强了SQL Client功能,简化了初始化脚本和语句集,支持更多的配置项,提高了SQL Client与SQL脚本的兼容性和互操作性。
此外,Flink 1.13还支持使用Hive SQL语法,优化了SQL时间函数,引入了用户自定义窗口和基于行的操作,增强了python DataStream和Table API的功能,以及提供了支持Batch执行模式的DataStream API。
在优化与增强功能方面,Flink 1.13改进了文档系统,支持展示历史异常,优化了失败Checkpoint的异常报告,提供了‘恰好一次’一致性支持的JDBC Sink,以及在Group窗口上支持用户自定义的聚合函数等。
为了帮助用户顺利升级到Flink 1.13,本文提供了关键改动的注意事项,包括理解被动扩缩容机制、性能分析工具的使用、SQL Client功能的优化等内容。
㈡ pyflink消费kafka-connect-jdbc消息(带schema)
1、数据接入
通过kafka的restFul接口创建连接mysql的连接器并启动。迹山
{
"name": "mysql_stream_test",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"timestamp.column.name": "",
"incrementing.column.name": "ID",
"connection.password": "",
"validate.non.null": true,
"tasks.max": 1,
"batch.max.rows": 100,
"table.whitelist": "baseqx.test_demo",
"mode": "incrementing",
"topic.prefix": "mysql_",
"connection.user": "",
"poll.interval.ms": 5000,
"numeric.mapping": "best_fit",
"connection.url": "jdbc:mysql://xxx.xxx.xxx.xxx:3306/枝州则baseqx?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true"
}
}
2.kafka-connect创建主题中的猛棚默认数据格式为
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"},{"type":"string","optional":false,"field":"NAME"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"CREATE_TIME"}],"optional":false,"name":"test_demo"},"payload":{"ID":1,"NAME":"prestoEtl","CREATE_TIME":1606902182000}}
3.使用pyflink消费带schema的消息
#!/usr/bin/python3.7
# -*- coding: UTF-8 -*-
from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode
from pyflink.table import StreamTableEnvironment, TableConfig, SqlDialect
s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_parallelism(1)
st_env = StreamTableEnvironment.create(s_env, TableConfig())
st_env.get_config().set_python_executable("python3")
st_env.use_catalog("default_catalog")
st_env.use_database("default_database")
# DML上可以固定schema为字符串, 用 ROW 函数封装 payload
ddlKafkaConn = """
create table sourceKafkaConn(
`scheam` STRING comment 'kafkaConn每行模式',
`payload` ROW(ID BIGINT,NAME STRING,CREATE_TIME STRING) comment '行数据'
)comment '从kafkaConnect获取带模式的数据'
with(
'connector' = 'kafka',
'topic' = 'mysql_test_demo',
'properties.bootstrap.servers' = '192.168.113.11:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
)
"""
# 'connector.startup-mode' = 'earliest-offset 表示读取最早的消息 | latest-offset 表示读取消息队列中最新的消息',
st_env.execute_sql(ddlKafkaConn)
sinkPrint = '''
CREATE TABLE sinkPrint WITH ('connector' = 'print')
LIKE sourceKafkaConn (EXCLUDING ALL)
'''
st_env.execute_sql(sinkPrint)
st_env.execute_sql("SHOW TABLES").print()
st_env.sql_query("select scheam,ROW(ID,NAME,CREATE_TIME) as payload from sourceKafkaConn") \
.insert_into("sinkPrint")
st_env.execute("pyflink-kafka-v4")
4.执行
4.1pythonpyflink-kafka-v4.py
4.2flinkrun-mxxx.xxx.xxx.xxx:8081-pypyflink-kafka-v4.py
5.执行结果
+-----------------+|tablename|+-----------------
+|sinkPrint|
+|sourceKafkaConn|
+-----------------+
2 rowsinset
+I(null,1,prestoEtl,1606902182000)
+I(null,2,执行的非常好,1606902562000)
+I(null,3,使用flink解析topic的schema,1607070278000)
㈢ 怎么在java的flink中调用python程序
1. 在java类中直接执行python语句
此方法需要引用 org.python包,需要下载Jpython。在这里先介绍一下Jpython。下面引入网络的解释:
Jython是一种完整的语言,而不是一个Java翻译器或仅仅是一个Python编译器,它是一个Python语言在Java中的完全实现。Jython也有很多从CPython中继承的模块库。最有趣的事情是Jython不像CPython或其他任何高级语言,它提供了对其实现语言的一切存取。所以Jython不仅给你提供了Python的库,同时也提供了所有的Java类。这使其有一个巨大的资源库。
这里我建议下载最新版本的Jpython,因为可以使用的python函数库会比老版本的多些,目前最新版本为2.7。
下载jar包请点击Download Jython 2.7.0 - Standalone Jar
下载安装程序请点击Download Jython 2.7.0 - Installer
如果使用maven依赖添加的话,使用下面的语句
<dependency>
<groupId>org.python</groupId>
<artifactId>jython-standalone</artifactId>
<version>2.7.0</version>
</dependency>
以上准备好了,就可以直接在java类中写python语句了,具体代码如下:
PythonInterpreter interpreter = new PythonInterpreter();
interpreter.exec("a=[5,2,3,9,4,0]; ");
interpreter.exec("print(sorted(a));"); //此处python语句是3.x版本的语法
interpreter.exec("print sorted(a);"); //此处是python语句是2.x版本的语法
输出结果如下:这里会看到输出的结果都是一样的,也就是说Jpython兼容python2.x和3.x版本的语句,运行速度会比直接运行python程序稍慢一点。
但是每次运行结果都会提示console: Failed to install ”: java.nio.charset.UnsupportedCharsetException: cp0. 这样看起来很烦,因为每次运行结果都会出现红色的提示语句,以为是错误,程序员应该都不愿意看到这一幕,得想个办法解决。
解决方法如下:
在要执行的代码上右键, Run As>Run Configurations,选择第二个页签Arguments,在VM arguments中添加以下语句
-Dpython.console.encoding=UTF-8
然后Apply->Run就可以了。