㈠ 官宣|Apache Flink 1.13.0 正式發布,流處理應用更加簡單高效
Flink 1.13版本的發布標志著其在流處理應用上的重大進展,為用戶提供了更簡單、更自然的使用體驗,同時也帶來了豐富的性能優化與新特性。
在被動擴縮容方面,Flink 1.13實現了讓用戶僅通過調整並發度即可輕松進行流作業的擴縮容,這一功能極大地簡化了流處理應用的資源管理過程,使其與普通應用的使用體驗一致。
針對性能分析,Flink 1.13引入了一系列工具,如負載和反壓可視化、CPU火焰圖和State訪問性能指標,幫助用戶更直觀地理解流作業的性能瓶頸,優化應用效率。
為了提升應用性能,Flink 1.13增強了瓶頸檢測與反壓監控機制,引入了描述作業繁忙度與反壓程度的指標,並優化了反壓檢測邏輯,使得用戶能夠更精確地定位性能問題。
此外,Flink 1.13還提供了火焰圖這一有效的可視化工具,幫助用戶識別瓶頸運算元中的高耗能計算邏輯,進一步提升應用性能。
在State訪問延遲指標方面,Flink 1.13通過優化State Backend的性能指標,幫助用戶識別State訪問性能瓶頸,特別是當狀態超過內存容量時,指導用戶合理配置內存和I/O資源。
為了提供更靈活的部署方式,Flink 1.13支持用戶自定義Pod模板進行Kubernetes部署,以及通過Savepoint切換State Backend,使得用戶可以根據需求調整狀態後端,以優化性能。
在SQL和Table API方面,Flink 1.13增強了SQL Client功能,簡化了初始化腳本和語句集,支持更多的配置項,提高了SQL Client與SQL腳本的兼容性和互操作性。
此外,Flink 1.13還支持使用Hive SQL語法,優化了SQL時間函數,引入了用戶自定義窗口和基於行的操作,增強了python DataStream和Table API的功能,以及提供了支持Batch執行模式的DataStream API。
在優化與增強功能方面,Flink 1.13改進了文檔系統,支持展示歷史異常,優化了失敗Checkpoint的異常報告,提供了『恰好一次』一致性支持的JDBC Sink,以及在Group窗口上支持用戶自定義的聚合函數等。
為了幫助用戶順利升級到Flink 1.13,本文提供了關鍵改動的注意事項,包括理解被動擴縮容機制、性能分析工具的使用、SQL Client功能的優化等內容。
㈡ pyflink消費kafka-connect-jdbc消息(帶schema)
1、數據接入
通過kafka的restFul介面創建連接mysql的連接器並啟動。跡山
{
"name": "mysql_stream_test",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"timestamp.column.name": "",
"incrementing.column.name": "ID",
"connection.password": "",
"validate.non.null": true,
"tasks.max": 1,
"batch.max.rows": 100,
"table.whitelist": "baseqx.test_demo",
"mode": "incrementing",
"topic.prefix": "mysql_",
"connection.user": "",
"poll.interval.ms": 5000,
"numeric.mapping": "best_fit",
"connection.url": "jdbc:mysql://xxx.xxx.xxx.xxx:3306/枝州則baseqx?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true"
}
}
2.kafka-connect創建主題中的猛棚默認數據格式為
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"},{"type":"string","optional":false,"field":"NAME"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"CREATE_TIME"}],"optional":false,"name":"test_demo"},"payload":{"ID":1,"NAME":"prestoEtl","CREATE_TIME":1606902182000}}
3.使用pyflink消費帶schema的消息
#!/usr/bin/python3.7
# -*- coding: UTF-8 -*-
from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode
from pyflink.table import StreamTableEnvironment, TableConfig, SqlDialect
s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_parallelism(1)
st_env = StreamTableEnvironment.create(s_env, TableConfig())
st_env.get_config().set_python_executable("python3")
st_env.use_catalog("default_catalog")
st_env.use_database("default_database")
# DML上可以固定schema為字元串, 用 ROW 函數封裝 payload
ddlKafkaConn = """
create table sourceKafkaConn(
`scheam` STRING comment 'kafkaConn每行模式',
`payload` ROW(ID BIGINT,NAME STRING,CREATE_TIME STRING) comment '行數據'
)comment '從kafkaConnect獲取帶模式的數據'
with(
'connector' = 'kafka',
'topic' = 'mysql_test_demo',
'properties.bootstrap.servers' = '192.168.113.11:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
)
"""
# 'connector.startup-mode' = 'earliest-offset 表示讀取最早的消息 | latest-offset 表示讀取消息隊列中最新的消息',
st_env.execute_sql(ddlKafkaConn)
sinkPrint = '''
CREATE TABLE sinkPrint WITH ('connector' = 'print')
LIKE sourceKafkaConn (EXCLUDING ALL)
'''
st_env.execute_sql(sinkPrint)
st_env.execute_sql("SHOW TABLES").print()
st_env.sql_query("select scheam,ROW(ID,NAME,CREATE_TIME) as payload from sourceKafkaConn") \
.insert_into("sinkPrint")
st_env.execute("pyflink-kafka-v4")
4.執行
4.1pythonpyflink-kafka-v4.py
4.2flinkrun-mxxx.xxx.xxx.xxx:8081-pypyflink-kafka-v4.py
5.執行結果
+-----------------+|tablename|+-----------------
+|sinkPrint|
+|sourceKafkaConn|
+-----------------+
2 rowsinset
+I(null,1,prestoEtl,1606902182000)
+I(null,2,執行的非常好,1606902562000)
+I(null,3,使用flink解析topic的schema,1607070278000)
㈢ 怎麼在java的flink中調用python程序
1. 在java類中直接執行python語句
此方法需要引用 org.python包,需要下載Jpython。在這里先介紹一下Jpython。下面引入網路的解釋:
Jython是一種完整的語言,而不是一個Java翻譯器或僅僅是一個Python編譯器,它是一個Python語言在Java中的完全實現。Jython也有很多從CPython中繼承的模塊庫。最有趣的事情是Jython不像CPython或其他任何高級語言,它提供了對其實現語言的一切存取。所以Jython不僅給你提供了Python的庫,同時也提供了所有的Java類。這使其有一個巨大的資源庫。
這里我建議下載最新版本的Jpython,因為可以使用的python函數庫會比老版本的多些,目前最新版本為2.7。
下載jar包請點擊Download Jython 2.7.0 - Standalone Jar
下載安裝程序請點擊Download Jython 2.7.0 - Installer
如果使用maven依賴添加的話,使用下面的語句
<dependency>
<groupId>org.python</groupId>
<artifactId>jython-standalone</artifactId>
<version>2.7.0</version>
</dependency>
以上准備好了,就可以直接在java類中寫python語句了,具體代碼如下:
PythonInterpreter interpreter = new PythonInterpreter();
interpreter.exec("a=[5,2,3,9,4,0]; ");
interpreter.exec("print(sorted(a));"); //此處python語句是3.x版本的語法
interpreter.exec("print sorted(a);"); //此處是python語句是2.x版本的語法
輸出結果如下:這里會看到輸出的結果都是一樣的,也就是說Jpython兼容python2.x和3.x版本的語句,運行速度會比直接運行python程序稍慢一點。
但是每次運行結果都會提示console: Failed to install 」: java.nio.charset.UnsupportedCharsetException: cp0. 這樣看起來很煩,因為每次運行結果都會出現紅色的提示語句,以為是錯誤,程序員應該都不願意看到這一幕,得想個辦法解決。
解決方法如下:
在要執行的代碼上右鍵, Run As>Run Configurations,選擇第二個頁簽Arguments,在VM arguments中添加以下語句
-Dpython.console.encoding=UTF-8
然後Apply->Run就可以了。