Big Data ClickHouse (19): Flink Writing to ClickHouse API


Flink results can be written to ClickHouse through Flink's native JDBC Connector package. In version 1.11.0, Flink refactored its JDBC Connector:

Before the refactoring (1.10.x and earlier), the package is named flink-jdbc. After the refactoring (1.11.x and later), the package is named flink-connector-jdbc.

The support of the two packages for writing to a ClickHouse sink through Flink's different APIs is shown in the table below:

API        | flink-jdbc    | flink-connector-jdbc
-----------|---------------|---------------------
DataStream | Not supported | Supported
Table API  | Supported     | Not supported

I. Flink 1.10.x and earlier use flink-jdbc, which supports only the Table API

Example

1. Add the following dependencies in Maven:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.4</version>
</dependency>

2. Code

import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment

/**
 * Write Flink results to ClickHouse via the flink-jdbc API; only the Table API is supported.
 * Notes:
 * 1. ClickHouse has relatively high per-insert latency, so set BatchSize to insert data in batches and improve performance.
 * 2. In the JDBCAppendTableSink implementation, if the last batch has fewer rows than BatchSize, the remaining rows are not inserted.
 */
case class PersonInfo(id: Int, name: String, age: Int)

object FlinkWriteToClickHouse1 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Set parallelism to 1; each parallel subtask writes to ClickHouse once it has buffered a full batch
    env.setParallelism(1)
    val settings: EnvironmentSettings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)

    // Import implicit conversions
    import org.apache.flink.streaming.api.scala._
    // Read data from the socket
    val sourceDS: DataStream[String] = env.socketTextStream("node5", 9999)
    val ds: DataStream[PersonInfo] = sourceDS.map(line => {
      val arr: Array[String] = line.split(",")
      PersonInfo(arr(0).toInt, arr(1), arr(2).toInt)
    })

    // Convert the DataStream into a Table
    import org.apache.flink.table.api.scala._
    val table: Table = tableEnv.fromDataStream(ds, 'id, 'name, 'age)

    // Write the Table into ClickHouse; create the target table in ClickHouse first:
    // create table flink_result(id Int, name String, age Int) engine = MergeTree() order by id;
    val insertIntoCkSql = "insert into flink_result (id,name,age) values (?,?,?)"

    // Prepare the ClickHouse table sink
    val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
      .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
      .setDBUrl("jdbc:clickhouse://node1:8123/default")
      .setUsername("default")
      .setPassword("")
      .setQuery(insertIntoCkSql)
      .setBatchSize(2) // batch size; defaults to 5000 rows
      .setParameterTypes(Types.INT, Types.STRING, Types.INT)
      .build()

    // Register the ClickHouse table sink with its field names and types
    tableEnv.registerTableSink("ck-sink",
      sink.configure(Array("id", "name", "age"), Array(Types.INT, Types.STRING, Types.INT)))

    // Insert the data into the ClickHouse sink
    tableEnv.insertInto(table, "ck-sink")

    // Trigger execution
    env.execute("Flink Table API to ClickHouse Example")
  }
}
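As note 2 above warns, JDBCAppendTableSink does not flush a trailing batch smaller than BatchSize; with setBatchSize(2) as in the example, typing an odd number of rows into the socket leaves the last row unwritten. For a small demo, one workaround, sketched here rather than taken from the original example (the demoSink name is just for illustration), is to set the batch size to 1 so that every row is flushed immediately, at the cost of throughput:

// Sketch: flush every row immediately so no trailing rows are lost.
// Same builder API as above; fine for demos, too slow for production loads.
val demoSink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
  .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
  .setDBUrl("jdbc:clickhouse://node1:8123/default")
  .setUsername("default")
  .setPassword("")
  .setQuery(insertIntoCkSql)
  .setBatchSize(1) // each row forms its own batch, so nothing is left unflushed
  .setParameterTypes(Types.INT, Types.STRING, Types.INT)
  .build()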

II. Flink 1.11.x and later use flink-connector-jdbc, which supports only the DataStream API

Example

1. Add the following dependencies in Maven:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.11.3</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.11.3</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>1.11.3</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.11.3</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.11.3</version>
</dependency>
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.4</version>
</dependency>

2. Code

import java.sql.PreparedStatement

import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
 * Write data to ClickHouse via flink-connector-jdbc; currently only the DataStream API is supported.
 */
object FlinkWriteToClickHouse2 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Set parallelism to 1
    env.setParallelism(1)

    import org.apache.flink.streaming.api.scala._
    val ds: DataStream[String] = env.socketTextStream("node5", 9999)
    val result: DataStream[(Int, String, Int)] = ds.map(line => {
      val arr: Array[String] = line.split(",")
      (arr(0).toInt, arr(1), arr(2).toInt)
    })

    // SQL for inserting data into ClickHouse
    val insertIntoCkSql = "insert into flink_result (id,name,age) values (?,?,?)"

    // Build the ClickHouse sink
    val ckSink: SinkFunction[(Int, String, Int)] = JdbcSink.sink(
      // insert statement
      insertIntoCkSql,
      // bind each tuple's fields to the statement parameters
      new JdbcStatementBuilder[(Int, String, Int)] {
        override def accept(ps: PreparedStatement, tp: (Int, String, Int)): Unit = {
          ps.setInt(1, tp._1)
          ps.setString(2, tp._2)
          ps.setInt(3, tp._3)
        }
      },
      // batch execution options
      new JdbcExecutionOptions.Builder().withBatchSize(5).build(),
      // ClickHouse connection options
      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
        .withUrl("jdbc:clickhouse://node1:8123/default")
        .withUsername("default")
        .withPassword("")
        .build()
    )

    // Attach the sink to the stream
    result.addSink(ckSink)
    env.execute("Flink DataStream to ClickHouse Example")
  }
}
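One advantage of the new connector over the old flink-jdbc sink: assuming the JdbcExecutionOptions builder supports interval-based flushing via withBatchIntervalMs and withMaxRetries (it does in recent flink-connector-jdbc releases), the sink can also flush on a timer, so rows never sit indefinitely in a partial batch. A minimal sketch with illustrative values, not part of the original example:

// Sketch: flush when 5 rows are buffered or once per second, whichever
// comes first, and retry a failed batch up to 3 times.
val execOptions: JdbcExecutionOptions = new JdbcExecutionOptions.Builder()
  .withBatchSize(5)
  .withBatchIntervalMs(1000) // flush at least once per second
  .withMaxRetries(3)
  .build()

Passing execOptions as the third argument to JdbcSink.sink above leaves the rest of the example unchanged.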
