大批量数据快速写入数据库

最近由于项目需要存储传感器采集的海量数据,考虑使用clickhouse进行存储和分析。

于是打算做一下性能测试。做测试之前,肯定得有数据,所以就打算造些数据。于是编写java代码模拟一天的数据(约5千万条)插入。

首先是导入驱动

        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.3.1</version>
        </dependency>

一开始使用拼接sql的方式,每个sql拼接18000条数据,sql语句大致如下

insert into tab_test values(*,*,*),(*,*,*)

然后通过如下方法执行

Statement.execute(sql);

执行一次插入需要4到5秒种,也就是一秒钟写入大概6K的数据,感觉这个写入速度有点低,照这个样子,一天的测试数据要好几个小时呢。

后来在网上找,看到有人用prepareStatement.executeBatch()来做,于是就想着试试看,最后测试结果,写入性能稳定在10W/s左右,一天的数据500多秒就写完了。

于是网上查了下原因,是因为prepareStatement采用的预编译机制,也就是数据库只编译一次sql,后面就只发送参数就可以了。而Statement.execute每次都要发送全量的sql语句,数据也要解析语句,所以性能就慢了。这个是jdbc的特性,对所有的数据库都这样,看来还是数据库基础不扎实啊。所以大家以后如果在mysql又大批量数据需要写入的时候,尽量采用prepareStatement来执行,不过要注意的是mysql4.1之前的版本事不支持prepare Statement特性的,使用prepareStatement反而会降低性能。

最后,在写入clickhouse数据的时候,最好采用批量数据插入,官方建议一次不少于1000条,不然性能很差。

完整代码如下

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.utils.DateUtils;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.except.ClickHouseUnknownException;
import java.net.SocketException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Statement;
import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.UUID;

@Slf4j
public class ClickhouseJDBC {
    static {
        LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
        List<Logger> loggerList = loggerContext.getLoggerList();
        loggerList.forEach(logger -> {
            logger.setLevel(Level.INFO);
        });
    }
    private static Connection conn ;
    private static Statement stmt;
    private static PreparedStatement prepareStatement = null;

    public static void init()throws Exception{
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
        String url = "jdbc:clickhouse://localhost:8123/default";
        ... 设置用户名和密码
        conn = DriverManager.getConnection(url, user, password);
        stmt = conn.createStatement();
        String sql="insert into tab_bracket_data values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
        conn.setAutoCommit(false);
        prepareStatement = conn.prepareStatement(sql);
    }

    /**
     * 通过凭借sql进行数据插入
     * @param max
     */
    public static void insert(int max){
        try {
            init();
            String insert="insert into tab_test values ";
            Date date=new Date();
            String values="";
            long time=System.currentTimeMillis();
            for(int i=0;i<max;i++){
                date=org.apache.commons.lang3.time.DateUtils.addSeconds(date,1);
                String sql=insertBatch(date);
                if(StringUtils.isNotBlank(values)){
                    values=values+",";
                }
                values=values+sql;
                if(i%30==0){
                    execute(insert+values,0);
                    values="";
                    log.info("已完成秒数{},已完成百分比:{}%,耗时:{}s",i,i*100/max,(System.currentTimeMillis()-time)/1000);
                }
            }
            if(StringUtils.isNotBlank(values)){
                execute(insert+values,0);
                values="";
            }
            log.info("已完成秒数{},已完成百分比:{}%,耗时:{}s",max,100,(System.currentTimeMillis()-time)/1000);
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    /**
     * 执行sql插入语句,遇到网络错误等进行重试
     * @param sql
     * @param i
     * @throws Exception
     */
    public static void execute(String sql,int i) throws Exception{
        try{
            stmt.execute(sql);
        }catch (Exception e){
            if(e instanceof ClickHouseUnknownException||e instanceof SocketException){
                log.warn("网络出问题了,正在重试",e);
                if(i>=3){
                    throw e;
                }
                try{
                    init();
                }catch (Exception ex){
                    throw ex;
                }
                execute(sql,i++);
            }
        }
    }

    /**
     * 拼接sql
     * @param date
     * @return
     */
    public static String insertBatch(Date date){
        Random random=new Random();
        StringBuilder insert=new StringBuilder("");
        for(int i=100;i<700;i++){
            StringBuilder sb=new StringBuilder("(");
            String uuid= UUID.randomUUID().toString();
            sb.append("'"+uuid+"'");
            String bracketId="ff44c960098e11ed808900ff297c6"+i;
            sb.append(",'"+bracketId+"'");
            sb.append(","+random.nextInt());
            ...上面的语句复制16次
            sb.append(",'"+ DateUtils.formatDate(date,"yyyy-MM-dd HH:mm:ss"));
            sb.append("')");
            if(i>100){
                insert.append(",");
            }
            insert.append(sb);
        }
        return insert.toString();
    }


    /**
     * 通过prepareStatement进行插入。在本机环境上batchSize设置为80时速度最快
     * @param max
     * @param batchSize
     */
    public static void insert2(int max,int batchSize){
        try {
            init();
            Date date=new Date();
            long time=System.currentTimeMillis();
            for(int i=0;i<max;i++){
                date=org.apache.commons.lang3.time.DateUtils.addSeconds(date,1);
                insertBatch2(prepareStatement,date);
                if(i%batchSize==0){
                    execute2(0);
                    log.info("已完成秒数{},已完成百分比:{}%,耗时:{}s",i,i*100/max,(System.currentTimeMillis()-time)/1000);
                }
            }
            execute2(0);
            log.info("已完成秒数{},已完成百分比:{}%,耗时:{}s",max,100,(System.currentTimeMillis()-time)/1000);
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    /**
     * 设置参数
     * @param prepareStatement
     * @param date
     * @throws Exception
     */
    public static void insertBatch2(PreparedStatement prepareStatement,Date date)throws Exception{
        Random random=new Random();
        for(int i=100;i<700;i++){
            StringBuilder sb=new StringBuilder("(");
            String uuid= UUID.randomUUID().toString();
            prepareStatement.setObject(1,uuid);
            String bracketId="ff44c960098e11ed808900ff297c6"+i;
            prepareStatement.setObject(2,bracketId);
            prepareStatement.setObject(3,random.nextInt());
           ...上面的语句复制16次,注意修改标号
            prepareStatement.setObject(20,DateUtils.formatDate(date,"yyyy-MM-dd HH:mm:ss"));
            prepareStatement.addBatch();
        }
    }

    /**
     * 执行插入,由于clickhouse没有事务,提交就等于插入
     * @param i
     * @throws Exception
     */
    public static void execute2(int i) throws Exception{
        try{
            prepareStatement.executeBatch();
            conn.commit();
        }catch (Exception e){
            log.warn("执行出问题了,正在重试",e);
            if(i>=3){
                throw e;
            }
            try{
                init();
            }catch (Exception ex){
                throw ex;
            }
            execute2(i++);
        }
    }

    public static void main(String[] args) {
        Random random=new Random();
        for(int i=0;i<300;i++){
            System.out.println(random.nextInt());
        }
    }
}