博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
flink高性能写入关系型数据库Oracle或者MySql
阅读量:3960 次
发布时间:2019-05-24

本文共 4903 字,大约阅读时间需要 16 分钟。

相信从事大数据开发的人员,越来越多的人从事实时计算方向,flink技术就显示十分重要,说该技术重要,不仅仅是因为它的流式计算,更多的是和其他技术的整合比较强大,在开发过程中,除了写入消息中间件等场景,有的时候也需要写入传统的数据库,如Oracle或者MySql

我们习惯于连接关系型数据库的时候采用一些连接池如c3p0,在传统的业务开发或者数据量不是很大的时候,是没有问题的,但是在大数据量的情况,这种方式写入速率是远远不够的。说到这里,博主相信很多人会说增大并行度,通过牺牲资源来提升效率,这种方式虽然可以实现效率的提升,但是会过多的消耗flink的slot,而且,连接池的大小也不是很好掌控,连接过多,数据库的连接压力太大,还要注意的是,连接池关闭情况是否良好,当flink任务重启或者集群重启的时候,连接池的连接是否释放,这些问题博主在开发的过程中都遇到过,所以在此奉上博主的解决办法,本人采用的是优化sql多线程的方式

本人使用Oracle场景:根据当前数据查询Oracle,如果Oracle有当前数据,对当前数据更新,如果没有当前数据,把当前数据插入Oracle。

sql优化

我相信,大多数非专门从事ETL的人都会先select,再进行insert或者update,这样是没有问题的,但是数据的压力会很大,本人再用merger into的方式,将压力从数据库转移到sql本身的计算上,在后面博主会贴上本人的s’q’l。

采用多线程代替连接池

连接池可以避免频繁的创建连接,但是f’lin’k的open方法同样可以实现该功能,因为该方法只加载一次

话不多说,上代码

import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.api.java.tuple.Tuple5;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.sql.Connection;import java.sql.DriverManager;import java.sql.PreparedStatement;import java.sql.SQLException;import java.util.ArrayList;import java.util.Date;import java.util.List;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class ***OracleSinkMutlThread extends RichSinkFunction
> { private static final Logger LOGGER = LoggerFactory.getLogger(***OracleSinkMultThread.class); private List
> connectionList = new ArrayList<>(); private int index = 0; private ExecutorService executorService; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); Class.forName("oracle.jdbc.driver.OracleDriver"); for (int i = 0; i < 10; i++) { Connection connection = DriverManager.getConnection("***", "***", "***"); PreparedStatement statement = connection.prepareStatement("merge 表名 a using (select ? DAY_TIME,? HOUR_TIME,? PROV_NAME,? INTERFACE_NAME,? TOTAL_COUNT,? FAIL_COUNT,? TOTAL_TIME,? FAIL_TIME from dual) b on(a.DAY_TIME = b.DAY_TIME and a.HOUR_TIME=b.HOUR_TIME and a.PROV_NAME=b.PROV_NAME and a.INTERFACE_NAME=b.INTERFACE_NAME) when matched then update set a.TOTAL_COUNT=a.TOTAL_COUNT+b.TOTAL_COUNT, a.FAIL_COUNT=a.FAIL_COUNT+b.FAIL_COUNT,a.TOTAL_TIME=a.TOTAL_TIME+b.TOTAL_TIME,a.FAIL_TIME=a.FAIL_TIME+b.FAIL_TIME where DAY_TIME=? and HOUR_TIME=? and PROV_NAME=? and INTERFACE_NAME=? when not matched then insert values (b.DAY_TIME,b.HOUR_TIME,b.PROV_NAME,b.INTERFACE_NAME,b.TOTAL_COUNT,b.FAIL_COUNT,b.TOTAL_TIME,b.FAIL_TIME,?)"); connectionList.add(new Tuple2(connection, statement)); } executorService = Executors.newFixedThreadPool(10); } @Override public void invoke(Tuple5
value, Context context) throws Exception { String[] split = value.f0.split("_"); String[] time = split[2].split("\t"); String day_time = time[1]; String hour_time = time[2]; String provinceName = split[3]; String INTERFACE_NAME = split[4]; PreparedStatement statement; try { statement = connectionList.get(index).f1; statement.setString(1,day_time); statement.setString(2,hour_time); statement.setString(3,provinceName); statement.setString(4,INTERFACE_NAME); statement.setString(5,value.f1+""); statement.setString(6,value.f2+""); statement.setString(7,value.f3+""); statement.setString(8,value.f4+""); statement.setString(9,day_time); statement.setString(10,hour_time); statement.setString(11,provinceName); statement.setString(12,INTERFACE_NAME); long current_time = new Date().getTime(); java.sql.Date dateSql = new java.sql.Date(current_time); statement.setDate(13,dateSql); executorService.execute(new Runnable() { @Override public void run() { try { statement.execute(); } catch (SQLException e) { e.printStackTrace(); } } }); index += 1; if (index == 10) { index = 0; } }catch (SQLException e){ e.printStackTrace(); LOGGER.error(String.format("%s -> *** !", "***")); } } @Override public void close() throws Exception { super.close(); if (executorService != null) { executorService.shutdown(); } for (Tuple2
tuple2 : connectionList) { if (tuple2.f0 != null) { tuple2.f0.close(); } if (tuple2.f1 != null) { tuple2.f1.close(); } } }}

该方法实现了richSinkFunction,再程序刚执行的时候,此种实现方式初始化很快,因为传统的连接池会花费大量时间进行连接池的初始化,可以通过控制循环的大小控制程序的并行度,减少集群slot的消耗,同时,再任务关闭的时候,连接释放的也没有问题。

转载地址:http://rnmzi.baihongyu.com/

你可能感兴趣的文章
cookie、session、sessionid 与jsessionid[转]
查看>>
常见Oracle HINT的用法
查看>>
JAVA中各类CACHE机制实现的比较 [转]
查看>>
PL/SQL Developer技巧
查看>>
3-python之PyCharm如何新建项目
查看>>
15-python之while循环嵌套应用场景
查看>>
17-python之for循环
查看>>
18-python之while循环,for循环与else的配合
查看>>
19-python之字符串简单介绍
查看>>
20-python之切片详细介绍
查看>>
P24-c++类继承-01详细的例子演示继承的好处
查看>>
P8-c++对象和类-01默认构造函数详解
查看>>
P1-c++函数详解-01函数的默认参数
查看>>
P3-c++函数详解-03函数模板详细介绍
查看>>
P4-c++函数详解-04函数重载,函数模板和函数模板重载,编译器选择使用哪个函数版本?
查看>>
P5-c++内存模型和名称空间-01头文件相关
查看>>
P6-c++内存模型和名称空间-02存储连续性、作用域和链接性
查看>>
P9-c++对象和类-02构造函数和析构函数总结
查看>>
P10-c++对象和类-03this指针详细介绍,详细的例子演示
查看>>
bat备份数据库
查看>>