大批量数据快速写入数据库
最近由于项目需要存储传感器采集的海量数据,考虑使用clickhouse进行存储和分析。
于是打算做一下性能测试。做测试之前,肯定得有数据,所以就打算造些数据。于是编写java代码模拟一天的数据(约5千万条)插入。
首先是导入驱动
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.3.1</version>
</dependency>
一开始使用拼接sql的方式,每个sql拼接18000条数据,sql语句大致如下
insert into tab_test values(*,*,*),(*,*,*)
然后通过如下方法执行
Statement.execute(sql);
执行一次插入需要4到5秒种,也就是一秒钟写入大概6K的数据,感觉这个写入速度有点低,照这个样子,一天的测试数据要好几个小时呢。
后来在网上找,看到有人用prepareStatement.executeBatch()来做,于是就想着试试看,最后测试结果,写入性能稳定在10W/s左右,一天的数据500多秒就写完了。
于是网上查了下原因,是因为prepareStatement采用的预编译机制,也就是数据库只编译一次sql,后面就只发送参数就可以了。而Statement.execute每次都要发送全量的sql语句,数据也要解析语句,所以性能就慢了。这个是jdbc的特性,对所有的数据库都这样,看来还是数据库基础不扎实啊。所以大家以后如果在mysql又大批量数据需要写入的时候,尽量采用prepareStatement来执行,不过要注意的是mysql4.1之前的版本事不支持prepare Statement特性的,使用prepareStatement反而会降低性能。
最后,在写入clickhouse数据的时候,最好采用批量数据插入,官方建议一次不少于1000条,不然性能很差。
完整代码如下
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.utils.DateUtils;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.except.ClickHouseUnknownException;
import java.net.SocketException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Statement;
import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.UUID;
@Slf4j
public class ClickhouseJDBC {
static {
LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
List<Logger> loggerList = loggerContext.getLoggerList();
loggerList.forEach(logger -> {
logger.setLevel(Level.INFO);
});
}
private static Connection conn ;
private static Statement stmt;
private static PreparedStatement prepareStatement = null;
public static void init()throws Exception{
Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
String url = "jdbc:clickhouse://localhost:8123/default";
... 设置用户名和密码
conn = DriverManager.getConnection(url, user, password);
stmt = conn.createStatement();
String sql="insert into tab_bracket_data values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
conn.setAutoCommit(false);
prepareStatement = conn.prepareStatement(sql);
}
/**
* 通过凭借sql进行数据插入
* @param max
*/
public static void insert(int max){
try {
init();
String insert="insert into tab_test values ";
Date date=new Date();
String values="";
long time=System.currentTimeMillis();
for(int i=0;i<max;i++){
date=org.apache.commons.lang3.time.DateUtils.addSeconds(date,1);
String sql=insertBatch(date);
if(StringUtils.isNotBlank(values)){
values=values+",";
}
values=values+sql;
if(i%30==0){
execute(insert+values,0);
values="";
log.info("已完成秒数{},已完成百分比:{}%,耗时:{}s",i,i*100/max,(System.currentTimeMillis()-time)/1000);
}
}
if(StringUtils.isNotBlank(values)){
execute(insert+values,0);
values="";
}
log.info("已完成秒数{},已完成百分比:{}%,耗时:{}s",max,100,(System.currentTimeMillis()-time)/1000);
}catch (Exception e){
e.printStackTrace();
}
}
/**
* 执行sql插入语句,遇到网络错误等进行重试
* @param sql
* @param i
* @throws Exception
*/
public static void execute(String sql,int i) throws Exception{
try{
stmt.execute(sql);
}catch (Exception e){
if(e instanceof ClickHouseUnknownException||e instanceof SocketException){
log.warn("网络出问题了,正在重试",e);
if(i>=3){
throw e;
}
try{
init();
}catch (Exception ex){
throw ex;
}
execute(sql,i++);
}
}
}
/**
* 拼接sql
* @param date
* @return
*/
public static String insertBatch(Date date){
Random random=new Random();
StringBuilder insert=new StringBuilder("");
for(int i=100;i<700;i++){
StringBuilder sb=new StringBuilder("(");
String uuid= UUID.randomUUID().toString();
sb.append("'"+uuid+"'");
String bracketId="ff44c960098e11ed808900ff297c6"+i;
sb.append(",'"+bracketId+"'");
sb.append(","+random.nextInt());
...上面的语句复制16次
sb.append(",'"+ DateUtils.formatDate(date,"yyyy-MM-dd HH:mm:ss"));
sb.append("')");
if(i>100){
insert.append(",");
}
insert.append(sb);
}
return insert.toString();
}
/**
* 通过prepareStatement进行插入。在本机环境上batchSize设置为80时速度最快
* @param max
* @param batchSize
*/
public static void insert2(int max,int batchSize){
try {
init();
Date date=new Date();
long time=System.currentTimeMillis();
for(int i=0;i<max;i++){
date=org.apache.commons.lang3.time.DateUtils.addSeconds(date,1);
insertBatch2(prepareStatement,date);
if(i%batchSize==0){
execute2(0);
log.info("已完成秒数{},已完成百分比:{}%,耗时:{}s",i,i*100/max,(System.currentTimeMillis()-time)/1000);
}
}
execute2(0);
log.info("已完成秒数{},已完成百分比:{}%,耗时:{}s",max,100,(System.currentTimeMillis()-time)/1000);
}catch (Exception e){
e.printStackTrace();
}
}
/**
* 设置参数
* @param prepareStatement
* @param date
* @throws Exception
*/
public static void insertBatch2(PreparedStatement prepareStatement,Date date)throws Exception{
Random random=new Random();
for(int i=100;i<700;i++){
StringBuilder sb=new StringBuilder("(");
String uuid= UUID.randomUUID().toString();
prepareStatement.setObject(1,uuid);
String bracketId="ff44c960098e11ed808900ff297c6"+i;
prepareStatement.setObject(2,bracketId);
prepareStatement.setObject(3,random.nextInt());
...上面的语句复制16次,注意修改标号
prepareStatement.setObject(20,DateUtils.formatDate(date,"yyyy-MM-dd HH:mm:ss"));
prepareStatement.addBatch();
}
}
/**
* 执行插入,由于clickhouse没有事务,提交就等于插入
* @param i
* @throws Exception
*/
public static void execute2(int i) throws Exception{
try{
prepareStatement.executeBatch();
conn.commit();
}catch (Exception e){
log.warn("执行出问题了,正在重试",e);
if(i>=3){
throw e;
}
try{
init();
}catch (Exception ex){
throw ex;
}
execute2(i++);
}
}
public static void main(String[] args) {
Random random=new Random();
for(int i=0;i<300;i++){
System.out.println(random.nextInt());
}
}
}