导航:首页 > 编程语言 > flink的python版本

flink的python版本

发布时间:2024-12-14 10:00:07

㈠ 官宣|Apache Flink 1.13.0 正式发布,流处理应用更加简单高效

Flink 1.13版本的发布标志着其在流处理应用上的重大进展,为用户提供了更简单、更自然的使用体验,同时也带来了丰富的性能优化与新特性。

在被动扩缩容方面,Flink 1.13实现了让用户仅通过调整并发度即可轻松进行流作业的扩缩容,这一功能极大地简化了流处理应用的资源管理过程,使其与普通应用的使用体验一致。

针对性能分析,Flink 1.13引入了一系列工具,如负载和反压可视化、CPU火焰图和State访问性能指标,帮助用户更直观地理解流作业的性能瓶颈,优化应用效率。

为了提升应用性能,Flink 1.13增强了瓶颈检测与反压监控机制,引入了描述作业繁忙度与反压程度的指标,并优化了反压检测逻辑,使得用户能够更精确地定位性能问题。

此外,Flink 1.13还提供了火焰图这一有效的可视化工具,帮助用户识别瓶颈算子中的高耗能计算逻辑,进一步提升应用性能。

在State访问延迟指标方面,Flink 1.13通过优化State Backend的性能指标,帮助用户识别State访问性能瓶颈,特别是当状态超过内存容量时,指导用户合理配置内存和I/O资源。

为了提供更灵活的部署方式,Flink 1.13支持用户自定义Pod模板进行Kubernetes部署,以及通过Savepoint切换State Backend,使得用户可以根据需求调整状态后端,以优化性能。

在SQL和Table API方面,Flink 1.13增强了SQL Client功能,简化了初始化脚本和语句集,支持更多的配置项,提高了SQL Client与SQL脚本的兼容性和互操作性。

此外,Flink 1.13还支持使用Hive SQL语法,优化了SQL时间函数,引入了用户自定义窗口和基于行的操作,增强了python DataStream和Table API的功能,以及提供了支持Batch执行模式的DataStream API。

在优化与增强功能方面,Flink 1.13改进了文档系统,支持展示历史异常,优化了失败Checkpoint的异常报告,提供了‘恰好一次’一致性支持的JDBC Sink,以及在Group窗口上支持用户自定义的聚合函数等。

为了帮助用户顺利升级到Flink 1.13,本文提供了关键改动的注意事项,包括理解被动扩缩容机制、性能分析工具的使用、SQL Client功能的优化等内容。

㈡ pyflink消费kafka-connect-jdbc消息(带schema)

1、数据接入

通过kafka的restFul接口创建连接mysql的连接器并启动。迹山

{

    "name": "mysql_stream_test",

    "config": {

        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",

        "timestamp.column.name": "",

        "incrementing.column.name": "ID",

        "connection.password": "",

        "validate.non.null": true,

        "tasks.max": 1,

        "batch.max.rows": 100,

        "table.whitelist": "baseqx.test_demo",

        "mode": "incrementing",

        "topic.prefix": "mysql_",

        "connection.user": "",

        "poll.interval.ms": 5000,

        "numeric.mapping": "best_fit",

        "connection.url": "jdbc:mysql://xxx.xxx.xxx.xxx:3306/枝州则baseqx?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true"

    }

}

2.kafka-connect创建主题中的猛棚默认数据格式为

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"},{"type":"string","optional":false,"field":"NAME"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"CREATE_TIME"}],"optional":false,"name":"test_demo"},"payload":{"ID":1,"NAME":"prestoEtl","CREATE_TIME":1606902182000}}

3.使用pyflink消费带schema的消息

#!/usr/bin/python3.7

# -*- coding: UTF-8 -*-

from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode

from pyflink.table import StreamTableEnvironment, TableConfig, SqlDialect

s_env = StreamExecutionEnvironment.get_execution_environment()

s_env.set_parallelism(1)

st_env = StreamTableEnvironment.create(s_env, TableConfig())

st_env.get_config().set_python_executable("python3")

st_env.use_catalog("default_catalog")

st_env.use_database("default_database")

# DML上可以固定schema为字符串, 用 ROW 函数封装 payload

ddlKafkaConn = """

create table sourceKafkaConn(

    `scheam`    STRING  comment 'kafkaConn每行模式',

    `payload`  ROW(ID BIGINT,NAME STRING,CREATE_TIME STRING)  comment '行数据'

)comment '从kafkaConnect获取带模式的数据'

with(

    'connector' = 'kafka',

    'topic' = 'mysql_test_demo',       

    'properties.bootstrap.servers' = '192.168.113.11:9092',

    'scan.startup.mode' = 'earliest-offset',

    'format' = 'json'

)

"""

# 'connector.startup-mode' = 'earliest-offset 表示读取最早的消息 | latest-offset 表示读取消息队列中最新的消息',

st_env.execute_sql(ddlKafkaConn)

sinkPrint = '''

    CREATE TABLE sinkPrint WITH ('connector' = 'print')

    LIKE sourceKafkaConn (EXCLUDING ALL)

'''

st_env.execute_sql(sinkPrint)

st_env.execute_sql("SHOW TABLES").print()

st_env.sql_query("select scheam,ROW(ID,NAME,CREATE_TIME) as payload from sourceKafkaConn") \

    .insert_into("sinkPrint")

st_env.execute("pyflink-kafka-v4")

4.执行

4.1pythonpyflink-kafka-v4.py

4.2flinkrun-mxxx.xxx.xxx.xxx:8081-pypyflink-kafka-v4.py

5.执行结果

+-----------------+|tablename|+-----------------

+|sinkPrint|

+|sourceKafkaConn|

+-----------------+

2 rowsinset

+I(null,1,prestoEtl,1606902182000)

+I(null,2,执行的非常好,1606902562000)

+I(null,3,使用flink解析topic的schema,1607070278000)

㈢ 怎么在java的flink中调用python程序

1. 在java类中直接执行python语句
此方法需要引用 org.python包,需要下载Jpython。在这里先介绍一下Jpython。下面引入网络的解释:

Jython是一种完整的语言,而不是一个Java翻译器或仅仅是一个Python编译器,它是一个Python语言在Java中的完全实现。Jython也有很多从CPython中继承的模块库。最有趣的事情是Jython不像CPython或其他任何高级语言,它提供了对其实现语言的一切存取。所以Jython不仅给你提供了Python的库,同时也提供了所有的Java类。这使其有一个巨大的资源库。
这里我建议下载最新版本的Jpython,因为可以使用的python函数库会比老版本的多些,目前最新版本为2.7。
下载jar包请点击Download Jython 2.7.0 - Standalone Jar
下载安装程序请点击Download Jython 2.7.0 - Installer
如果使用maven依赖添加的话,使用下面的语句
<dependency>
<groupId>org.python</groupId>
<artifactId>jython-standalone</artifactId>
<version>2.7.0</version>
</dependency>
以上准备好了,就可以直接在java类中写python语句了,具体代码如下:
PythonInterpreter interpreter = new PythonInterpreter();
interpreter.exec("a=[5,2,3,9,4,0]; ");
interpreter.exec("print(sorted(a));"); //此处python语句是3.x版本的语法
interpreter.exec("print sorted(a);"); //此处是python语句是2.x版本的语法
输出结果如下:这里会看到输出的结果都是一样的,也就是说Jpython兼容python2.x和3.x版本的语句,运行速度会比直接运行python程序稍慢一点。
但是每次运行结果都会提示console: Failed to install ”: java.nio.charset.UnsupportedCharsetException: cp0. 这样看起来很烦,因为每次运行结果都会出现红色的提示语句,以为是错误,程序员应该都不愿意看到这一幕,得想个办法解决。
解决方法如下:
在要执行的代码上右键, Run As>Run Configurations,选择第二个页签Arguments,在VM arguments中添加以下语句
-Dpython.console.encoding=UTF-8
然后Apply->Run就可以了。

阅读全文

与flink的python版本相关的资料

热点内容
如何在app上看到自己的车贷还款 浏览:220
大金压缩机技术 浏览:581
冷库压缩机视频吧 浏览:796
linux光标移动命令 浏览:914
抖音里解压是什么意思 浏览:364
sogou输入法forlinux 浏览:765
phplinuxfopen 浏览:389
华为手机sns文件夹 浏览:423
如何建立教育app视频课 浏览:977
python中的a是什么意思 浏览:626
阿里程序员被判不合格 浏览:841
纵向加密装置随机数 浏览:456
苹果系统请授权文件夹 浏览:565
解压缩全能王浏览历史在哪里 浏览:127
游戏显示服务器异常是什么情况 浏览:661
银行app如何查询余额 浏览:915
使用sha256对文件加密 浏览:37
u盘加密软件怎么防止员工使用 浏览:672
编译cpu云主机 浏览:894
android去掉edittext边框 浏览:640