Writing Flink Results to ClickHouse via the JDBC API
Flink results can be written to ClickHouse through Flink's native JDBC connector. In Flink 1.11.0 the JDBC connector was refactored:
Before the refactor (1.10.x and earlier), the package is named flink-jdbc. After the refactor (1.11.x and later), it is named flink-connector-jdbc. Their support for the different ways of writing to a ClickHouse sink in Flink is as follows:
| API        | flink-jdbc    | flink-connector-jdbc |
|------------|---------------|----------------------|
| DataStream | Not supported | Supported            |
| Table API  | Supported     | Not supported        |
I. Flink 1.10.x and earlier use flink-jdbc, which supports only the Table API
Example

1. Add the following Maven dependencies:
```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_2.11</artifactId>
  <version>1.9.1</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
  <version>1.9.1</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-common</artifactId>
  <version>1.9.1</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-jdbc_2.11</artifactId>
  <version>1.9.1</version>
</dependency>
<dependency>
  <groupId>ru.yandex.clickhouse</groupId>
  <artifactId>clickhouse-jdbc</artifactId>
  <version>0.2.4</version>
</dependency>
```
2. Code
```scala
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment

/**
 * Write Flink results to ClickHouse through the flink-jdbc API (Table API only).
 * Notes:
 * 1. ClickHouse has high per-insert latency, so set a batch size to insert
 *    rows in batches and improve performance.
 * 2. In the JDBCAppendTableSink implementation, if the final batch holds
 *    fewer rows than the batch size, those remaining rows are not inserted.
 */
case class PersonInfo(id: Int, name: String, age: Int)

object FlinkWriteToClickHouse1 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Set parallelism to 1; each parallel instance inserts into ClickHouse once its batch fills up
    env.setParallelism(1)
    val settings: EnvironmentSettings =
      EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)

    // Import implicit conversions
    import org.apache.flink.streaming.api.scala._

    // Read data from a socket
    val sourceDS: DataStream[String] = env.socketTextStream("node5", 9999)
    val ds: DataStream[PersonInfo] = sourceDS.map(line => {
      val arr: Array[String] = line.split(",")
      PersonInfo(arr(0).toInt, arr(1), arr(2).toInt)
    })

    // Convert ds into a Table object
    import org.apache.flink.table.api.scala._
    val table: Table = tableEnv.fromDataStream(ds, 'id, 'name, 'age)

    // Write the Table into ClickHouse. Create the target table first:
    //   create table flink_result(id Int, name String, age Int) engine = MergeTree() order by id;
    val insertIntoCkSql = "insert into flink_result (id,name,age) values (?,?,?)"

    // Prepare the ClickHouse table sink
    val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
      .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
      .setDBUrl("jdbc:clickhouse://node1:8123/default")
      .setUsername("default")
      .setPassword("")
      .setQuery(insertIntoCkSql)
      .setBatchSize(2) // batch size; the default is 5000 rows
      .setParameterTypes(Types.INT, Types.STRING, Types.INT)
      .build()

    // Register the ClickHouse table sink with its field names and schema
    tableEnv.registerTableSink(
      "ck-sink",
      sink.configure(Array("id", "name", "age"), Array(Types.INT, Types.STRING, Types.INT)))

    // Insert the data into the ClickHouse sink
    tableEnv.insertInto(table, "ck-sink")

    // Trigger execution
    env.execute("Flink Table API to ClickHouse Example")
  }
}
```
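The note in the listing above warns that JDBCAppendTableSink only flushes complete batches, so rows left in a partial final batch are dropped. A minimal self-contained sketch of that buffering behavior (plain Scala, no Flink dependency; the `BatchingSink` class is illustrative, not a real Flink API):

```scala
import scala.collection.mutable.ArrayBuffer

// Illustrative sketch: a sink that only flushes complete batches,
// mirroring the flink-jdbc JDBCAppendTableSink caveat.
class BatchingSink[T](batchSize: Int, flush: Seq[T] => Unit) {
  private val buffer = ArrayBuffer.empty[T]

  def invoke(record: T): Unit = {
    buffer += record
    if (buffer.size >= batchSize) { // flush only once a full batch accumulates
      flush(buffer.toList)
      buffer.clear()
    }
  }
  // No flush on shutdown: a trailing partial batch is silently lost.
}

object BatchDemo extends App {
  var written = List.empty[Seq[Int]]
  val sink = new BatchingSink[Int](2, batch => written :+= batch)
  Seq(1, 2, 3).foreach(sink.invoke) // 3 records with batch size 2
  println(written)                  // only the first full batch was flushed; record 3 is stranded
}
```

This is why the example sets `setBatchSize(2)` for demonstration: with the default of 5000, fewer than 5000 total rows would never reach ClickHouse.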
II. Flink 1.11.x and later use flink-connector-jdbc, which supports only the DataStream API
Example

1. Add the following Maven dependencies:
```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_2.11</artifactId>
  <version>1.11.3</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_2.11</artifactId>
  <version>1.11.3</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
  <version>1.11.3</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-common</artifactId>
  <version>1.11.3</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-jdbc_2.11</artifactId>
  <version>1.11.3</version>
</dependency>
<dependency>
  <groupId>ru.yandex.clickhouse</groupId>
  <artifactId>clickhouse-jdbc</artifactId>
  <version>0.2.4</version>
</dependency>
```
2. Code
```scala
import java.sql.PreparedStatement

import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
 * Write data to ClickHouse through flink-connector-jdbc; currently only the
 * DataStream API is supported.
 */
object FlinkWriteToClickHouse2 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Set parallelism to 1
    env.setParallelism(1)

    import org.apache.flink.streaming.api.scala._
    val ds: DataStream[String] = env.socketTextStream("node5", 9999)
    val result: DataStream[(Int, String, Int)] = ds.map(line => {
      val arr: Array[String] = line.split(",")
      (arr(0).toInt, arr(1), arr(2).toInt)
    })

    // SQL for inserting data into ClickHouse
    val insertIntoCkSql = "insert into flink_result (id,name,age) values (?,?,?)"

    // Configure the ClickHouse sink
    val ckSink: SinkFunction[(Int, String, Int)] = JdbcSink.sink(
      // insert statement
      insertIntoCkSql,
      // bind each record's fields to the statement parameters
      new JdbcStatementBuilder[(Int, String, Int)] {
        override def accept(ps: PreparedStatement, tp: (Int, String, Int)): Unit = {
          ps.setInt(1, tp._1)
          ps.setString(2, tp._2)
          ps.setInt(3, tp._3)
        }
      },
      // batch the inserts
      JdbcExecutionOptions.builder().withBatchSize(5).build(),
      // ClickHouse connection settings
      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
        .withUrl("jdbc:clickhouse://node1:8123/default")
        .withUsername("default")
        .withPassword("")
        .build()
    )

    // Attach the sink to the stream
    result.addSink(ckSink)

    env.execute("Flink DataStream to ClickHouse Example")
  }
}
```
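Both listings turn comma-separated socket lines such as `1,zhangsan,18` into typed records inside `map()`. A minimal sketch of that parsing step in plain Scala (no Flink dependency; the `Option`-based guard for malformed lines is an addition, the original examples would simply throw at runtime):

```scala
// Parse "id,name,age" lines into (Int, String, Int) tuples, as the map()
// in the Flink examples does; malformed lines yield None here instead of
// throwing NumberFormatException / ArrayIndexOutOfBoundsException.
object LineParser {
  def parse(line: String): Option[(Int, String, Int)] = {
    line.split(",") match {
      case Array(id, name, age) =>
        try Some((id.trim.toInt, name.trim, age.trim.toInt))
        catch { case _: NumberFormatException => None }
      case _ => None // wrong number of fields
    }
  }
}

object ParseDemo extends App {
  println(LineParser.parse("1,zhangsan,18")) // Some((1,zhangsan,18))
  println(LineParser.parse("bad,line"))      // None
}
```

In a real job, dropping the `None`s before the sink (e.g. with `flatMap`) keeps one bad socket line from failing the whole pipeline.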