導航:首頁 > 編程語言 > flink的python版本

flink的python版本

發布時間:2024-12-14 10:00:07

㈠ 官宣|Apache Flink 1.13.0 正式發布,流處理應用更加簡單高效

Flink 1.13版本的發布標志著其在流處理應用上的重大進展,為用戶提供了更簡單、更自然的使用體驗,同時也帶來了豐富的性能優化與新特性。

在被動擴縮容方面,Flink 1.13實現了讓用戶僅通過調整並發度即可輕松進行流作業的擴縮容,這一功能極大地簡化了流處理應用的資源管理過程,使其與普通應用的使用體驗一致。

針對性能分析,Flink 1.13引入了一系列工具,如負載和反壓可視化、CPU火焰圖和State訪問性能指標,幫助用戶更直觀地理解流作業的性能瓶頸,優化應用效率。

為了提升應用性能,Flink 1.13增強了瓶頸檢測與反壓監控機制,引入了描述作業繁忙度與反壓程度的指標,並優化了反壓檢測邏輯,使得用戶能夠更精確地定位性能問題。

此外,Flink 1.13還提供了火焰圖這一有效的可視化工具,幫助用戶識別瓶頸運算元中的高耗能計算邏輯,進一步提升應用性能。

在State訪問延遲指標方面,Flink 1.13通過優化State Backend的性能指標,幫助用戶識別State訪問性能瓶頸,特別是當狀態超過內存容量時,指導用戶合理配置內存和I/O資源。

為了提供更靈活的部署方式,Flink 1.13支持用戶自定義Pod模板進行Kubernetes部署,以及通過Savepoint切換State Backend,使得用戶可以根據需求調整狀態後端,以優化性能。

在SQL和Table API方面,Flink 1.13增強了SQL Client功能,簡化了初始化腳本和語句集,支持更多的配置項,提高了SQL Client與SQL腳本的兼容性和互操作性。

此外,Flink 1.13還支持使用Hive SQL語法,優化了SQL時間函數,引入了用戶自定義窗口和基於行的操作,增強了python DataStream和Table API的功能,以及提供了支持Batch執行模式的DataStream API。

在優化與增強功能方面,Flink 1.13改進了文檔系統,支持展示歷史異常,優化了失敗Checkpoint的異常報告,提供了『恰好一次』一致性支持的JDBC Sink,以及在Group窗口上支持用戶自定義的聚合函數等。

為了幫助用戶順利升級到Flink 1.13,本文提供了關鍵改動的注意事項,包括理解被動擴縮容機制、性能分析工具的使用、SQL Client功能的優化等內容。

㈡ pyflink消費kafka-connect-jdbc消息(帶schema)

1、數據接入

通過kafka的restFul介面創建連接mysql的連接器並啟動。跡山

{

    "name": "mysql_stream_test",

    "config": {

        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",

        "timestamp.column.name": "",

        "incrementing.column.name": "ID",

        "connection.password": "",

        "validate.non.null": true,

        "tasks.max": 1,

        "batch.max.rows": 100,

        "table.whitelist": "baseqx.test_demo",

        "mode": "incrementing",

        "topic.prefix": "mysql_",

        "connection.user": "",

        "poll.interval.ms": 5000,

        "numeric.mapping": "best_fit",

        "connection.url": "jdbc:mysql://xxx.xxx.xxx.xxx:3306/枝州則baseqx?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true"

    }

}

2.kafka-connect創建主題中的猛棚默認數據格式為

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"},{"type":"string","optional":false,"field":"NAME"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"CREATE_TIME"}],"optional":false,"name":"test_demo"},"payload":{"ID":1,"NAME":"prestoEtl","CREATE_TIME":1606902182000}}

3.使用pyflink消費帶schema的消息

#!/usr/bin/python3.7

# -*- coding: UTF-8 -*-

from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode

from pyflink.table import StreamTableEnvironment, TableConfig, SqlDialect

s_env = StreamExecutionEnvironment.get_execution_environment()

s_env.set_parallelism(1)

st_env = StreamTableEnvironment.create(s_env, TableConfig())

st_env.get_config().set_python_executable("python3")

st_env.use_catalog("default_catalog")

st_env.use_database("default_database")

# DML上可以固定schema為字元串, 用 ROW 函數封裝 payload

ddlKafkaConn = """

create table sourceKafkaConn(

    `scheam`    STRING  comment 'kafkaConn每行模式',

    `payload`  ROW(ID BIGINT,NAME STRING,CREATE_TIME STRING)  comment '行數據'

)comment '從kafkaConnect獲取帶模式的數據'

with(

    'connector' = 'kafka',

    'topic' = 'mysql_test_demo',       

    'properties.bootstrap.servers' = '192.168.113.11:9092',

    'scan.startup.mode' = 'earliest-offset',

    'format' = 'json'

)

"""

# 'connector.startup-mode' = 'earliest-offset 表示讀取最早的消息 | latest-offset 表示讀取消息隊列中最新的消息',

st_env.execute_sql(ddlKafkaConn)

sinkPrint = '''

    CREATE TABLE sinkPrint WITH ('connector' = 'print')

    LIKE sourceKafkaConn (EXCLUDING ALL)

'''

st_env.execute_sql(sinkPrint)

st_env.execute_sql("SHOW TABLES").print()

st_env.sql_query("select scheam,ROW(ID,NAME,CREATE_TIME) as payload from sourceKafkaConn") \

    .insert_into("sinkPrint")

st_env.execute("pyflink-kafka-v4")

4.執行

4.1pythonpyflink-kafka-v4.py

4.2flinkrun-mxxx.xxx.xxx.xxx:8081-pypyflink-kafka-v4.py

5.執行結果

+-----------------+|tablename|+-----------------

+|sinkPrint|

+|sourceKafkaConn|

+-----------------+

2 rowsinset

+I(null,1,prestoEtl,1606902182000)

+I(null,2,執行的非常好,1606902562000)

+I(null,3,使用flink解析topic的schema,1607070278000)

㈢ 怎麼在java的flink中調用python程序

1. 在java類中直接執行python語句
此方法需要引用 org.python包,需要下載Jpython。在這里先介紹一下Jpython。下面引入網路的解釋:

Jython是一種完整的語言,而不是一個Java翻譯器或僅僅是一個Python編譯器,它是一個Python語言在Java中的完全實現。Jython也有很多從CPython中繼承的模塊庫。最有趣的事情是Jython不像CPython或其他任何高級語言,它提供了對其實現語言的一切存取。所以Jython不僅給你提供了Python的庫,同時也提供了所有的Java類。這使其有一個巨大的資源庫。
這里我建議下載最新版本的Jpython,因為可以使用的python函數庫會比老版本的多些,目前最新版本為2.7。
下載jar包請點擊Download Jython 2.7.0 - Standalone Jar
下載安裝程序請點擊Download Jython 2.7.0 - Installer
如果使用maven依賴添加的話,使用下面的語句
<dependency>
<groupId>org.python</groupId>
<artifactId>jython-standalone</artifactId>
<version>2.7.0</version>
</dependency>
以上准備好了,就可以直接在java類中寫python語句了,具體代碼如下:
PythonInterpreter interpreter = new PythonInterpreter();
interpreter.exec("a=[5,2,3,9,4,0]; ");
interpreter.exec("print(sorted(a));"); //此處python語句是3.x版本的語法
interpreter.exec("print sorted(a);"); //此處是python語句是2.x版本的語法
輸出結果如下:這里會看到輸出的結果都是一樣的,也就是說Jpython兼容python2.x和3.x版本的語句,運行速度會比直接運行python程序稍慢一點。
但是每次運行結果都會提示console: Failed to install 」: java.nio.charset.UnsupportedCharsetException: cp0. 這樣看起來很煩,因為每次運行結果都會出現紅色的提示語句,以為是錯誤,程序員應該都不願意看到這一幕,得想個辦法解決。
解決方法如下:
在要執行的代碼上右鍵, Run As>Run Configurations,選擇第二個頁簽Arguments,在VM arguments中添加以下語句
-Dpython.console.encoding=UTF-8
然後Apply->Run就可以了。

閱讀全文

與flink的python版本相關的資料

熱點內容
linux游標移動命令 瀏覽:912
抖音里解壓是什麼意思 瀏覽:362
sogou輸入法forlinux 瀏覽:765
phplinuxfopen 瀏覽:389
華為手機sns文件夾 瀏覽:423
如何建立教育app視頻課 瀏覽:977
python中的a是什麼意思 瀏覽:626
阿里程序員被判不合格 瀏覽:841
縱向加密裝置隨機數 瀏覽:456
蘋果系統請授權文件夾 瀏覽:565
解壓縮全能王瀏覽歷史在哪裡 瀏覽:127
游戲顯示伺服器異常是什麼情況 瀏覽:661
銀行app如何查詢余額 瀏覽:915
使用sha256對文件加密 瀏覽:37
u盤加密軟體怎麼防止員工使用 瀏覽:672
編譯cpu雲主機 瀏覽:894
android去掉edittext邊框 瀏覽:640
cad的放大命令 瀏覽:433
修馬自達3壓縮機 瀏覽:117
天地偉業nvr視頻加密 瀏覽:692