ClickHouse: a Python experiment with A-share minute-bar data, bulk import (Part 1)

There are two ways to bulk import. The first reads a collection of local CSV files one by one, which is close to a production setup. The second is a simulation: the rows are generated with random numbers.




import sys
dir_path = "/home/click/pyclick/minute"
sys.path.append("/home/click/pyclick")
sys.path.append("/usr/local/lib/python3.8/site-packages")
from clickhouse_driver import Client
import pandas as pd
import os
import datetime
import time
import math
import random

def get_all_files_by_root_sub_dirs(directory, file_type):
    data = list()
    if os.path.isdir(directory):  # a directory: walk it recursively
        dir_list = os.walk(directory)  # yields (root, sub_dirs, files) triples
        for (root, sub, files) in dir_list:
            for file in files:
                path = os.path.join(root, file)
                # match the extension case-insensitively so .CSV files are not missed
                if path.lower().endswith(file_type.lower()):
                    data.append(path)
    else:  # a single file: keep it if the extension matches
        if directory.lower().endswith(file_type.lower()):
            data.append(directory)
    return data
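
# Example (sketch), using the minute-data directory hard-coded at the top of the script:
# files = get_all_files_by_root_sub_dirs("/home/click/pyclick/minute", ".csv")
# print(len(files), files[:3])
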
def get_code_from_csv_file(file):
    # .csv; .h5 file
    # D:\join_quant_data\futures\minute\A.XDCE\A1909.XDCE_2019-07-25_2019-08-12.CSV
    # D:\join_quant_data\futures\minute\A.XDCE.h5
    s = os.path.basename(file)  # A1909.XDCE_2019-07-25_2019-08-12.CSV
    sp = s.split('_')[0]

    if sp.endswith(".csv") or sp.endswith(".CSV"):
        code = sp[:-4]
    else:
        code = sp
    return code  # A1909.XDCE
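
# Example, using the sample path from the comment above:
# get_code_from_csv_file(r"D:\join_quant_data\futures\minute\A.XDCE\A1909.XDCE_2019-07-25_2019-08-12.CSV")
# -> 'A1909.XDCE'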

## SQL to create the table
## Note: create the database first, then the table
# CREATE TABLE my_db.stock_min 
# (
#     `code` String,
#     `datetime` DateTime,
#     `open` Float32,
#     `close` Float32,
#     `low` Float32,
#     `high` Float32,
#     `volume` Float64,
#     `money` Float64,
#     `factor` Float32,
#     `high_limit` Float32,
#     `low_limit` Float32,
#     `avg` Float32,
#     `pre_close` Float32,
#     `paused` Float32,
#     `open_interest` Float64
# )
# ENGINE = MergeTree
# PARTITION BY code
# ORDER BY datetime
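
# The database and table can also be created from Python before the first
# insert. A minimal sketch (not called below), assuming a local server and
# the schema above:
def create_database_and_table():
    client = Client('localhost')
    client.execute("CREATE DATABASE IF NOT EXISTS my_db")
    client.execute("""
        CREATE TABLE IF NOT EXISTS my_db.stock_min
        (
            `code` String, `datetime` DateTime,
            `open` Float32, `close` Float32, `low` Float32, `high` Float32,
            `volume` Float64, `money` Float64, `factor` Float32,
            `high_limit` Float32, `low_limit` Float32, `avg` Float32,
            `pre_close` Float32, `paused` Float32, `open_interest` Float64
        )
        ENGINE = MergeTree
        PARTITION BY code
        ORDER BY datetime
    """)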

def get_block_insert_data_from_file(_file):  # read one CSV file into one insert block
    df = pd.read_csv(_file)
    code = get_code_from_csv_file(_file)
    block_insert_data = []  # each file is one bulk-insert unit
    for row in df.itertuples():
        _row = list(row)[0:15]  # 15 slots (Index + first 14 columns); all overwritten below
        _row[0] = code                                                     # code
        _row[1] = datetime.datetime.strptime(row[1], '%Y-%m-%d %H:%M:%S')  # datetime (first CSV column)
        _row[2] = float(row.open)                                          # open
        _row[3] = float(row.close)                                         # close
        _row[4] = float(row.low)                                           # low
        _row[5] = float(row.high)                                          # high
        _row[6] = float(row.volume)                                        # volume
        _row[7] = float(row.money)                                         # money
        _row[8] = float(row.factor)                                        # factor
        _row[9] = float(row.high_limit)                                    # high_limit
        _row[10] = float(row.low_limit)                                    # low_limit
        _row[11] = float(row.avg)                                          # avg
        _row[12] = float(row.pre_close)                                    # pre_close
        _row[13] = float(row.paused)                                       # paused
        if math.isnan(row.open_interest):                                  # NaN -> 0.0
            _row[14] = 0.0
        else:
            _row[14] = float(row.open_interest)                            # open_interest
        block_insert_data.append(_row)
    return block_insert_data
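
# A vectorized alternative (sketch): if clickhouse-driver is installed with
# NumPy support (pip install "clickhouse-driver[numpy]") and the client is
# created with settings={'use_numpy': True}, the per-row loop can be replaced
# by insert_dataframe. The column names here are assumptions; in particular
# the CSV's first column is assumed to be called "datetime".
def insert_file_via_dataframe(client, _file, database_name="my_db", table_name="stock_min"):
    df = pd.read_csv(_file)
    df['datetime'] = pd.to_datetime(df['datetime'])        # assumed column name
    df['open_interest'] = df['open_interest'].fillna(0.0)  # same NaN handling as above
    df['code'] = get_code_from_csv_file(_file)
    cols = ['code', 'datetime', 'open', 'close', 'low', 'high', 'volume',
            'money', 'factor', 'high_limit', 'low_limit', 'avg',
            'pre_close', 'paused', 'open_interest']
    client.insert_dataframe(f'INSERT INTO {database_name}.{table_name} VALUES', df[cols])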

def is_trade_day(dt):
    # weekday(): Monday is 0, Sunday is 6, so 5 and 6 are the weekend.
    # Only weekends are excluded; statutory market holidays are not modeled.
    return dt.weekday() not in (5, 6)

def get_someday_stock_minute_data_from_random(dt, code):
    block_insert_data = []
    if not is_trade_day(dt):
        return block_insert_data
    s_time = dt + datetime.timedelta(hours=9)  # start at 09:00 each day; the midday break is not simulated
    for j in range(300):
        dtime = s_time + datetime.timedelta(minutes=j)
        _row = []
        _row.append(code)                        # code
        _row.append(dtime)                       # datetime
        _row.append(11.0 + random.random())      # open
        _row.append(11.0 + random.random())      # close
        _row.append(11.0 + random.random())      # low
        _row.append(11.0 + random.random())      # high
        _row.append(110000.0 + random.random())  # volume
        _row.append(1100000.0 + random.random()) # money
        _row.append(1.0 + random.random())       # factor
        _row.append(11.0 + random.random())      # high_limit
        _row.append(11.0 + random.random())      # low_limit
        _row.append(11.0 + random.random())      # avg
        _row.append(11.0 + random.random())      # pre_close
        _row.append(1.0 + random.random())       # paused
        _row.append(11.0 + random.random())      # open_interest
        block_insert_data.append(_row)
    return block_insert_data

# Generate the simulated minute data for a given code
def get_code_block_insert_minute_data_from_random(code, start_date, simulate_days, dailyminutes=300):
    # code          : e.g. 600036.XSHE
    # start_date    : simulation start date, format "2010-01-01"
    # simulate_days : number of calendar days to simulate (weekends are skipped)
    # dailyminutes  : number of minute bars per trading day, 300 here
    starttime = datetime.datetime.strptime(start_date, '%Y-%m-%d')
    block_insert_data = []  # one bulk-insert unit per code
    for nday in range(simulate_days):
        dt = starttime + datetime.timedelta(days=nday)
        if not is_trade_day(dt):
            continue
        s_time = dt + datetime.timedelta(hours=9)  # start at 09:00 each day; the midday break is not simulated
        for j in range(dailyminutes):
            dtime = s_time + datetime.timedelta(minutes=j)
            _row = []
            _row.append(code)                        # code
            _row.append(dtime)                       # datetime
            _row.append(11.0 + random.random())      # open
            _row.append(11.0 + random.random())      # close
            _row.append(11.0 + random.random())      # low
            _row.append(11.0 + random.random())      # high
            _row.append(110000.0 + random.random())  # volume
            _row.append(1100000.0 + random.random()) # money
            _row.append(1.0 + random.random())       # factor
            _row.append(11.0 + random.random())      # high_limit
            _row.append(11.0 + random.random())      # low_limit
            _row.append(11.0 + random.random())      # avg
            _row.append(11.0 + random.random())      # pre_close
            _row.append(1.0 + random.random())       # paused
            _row.append(11.0 + random.random())      # open_interest
            block_insert_data.append(_row)
    return block_insert_data
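
# Sanity check (sketch): 4000 calendar days from 2010-01-01 contain 2856
# weekdays, so one code yields 2856 * 300 = 856800 rows, matching the run log below.
# rows = get_code_block_insert_minute_data_from_random("600036.XSHE", "2010-01-01", 4000)
# len(rows)  # -> 856800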

def insert_data_by_files():
    client = Client('localhost')
    database_name = "my_db"
    table_name = 'stock_min'
    files = get_all_files_by_root_sub_dirs(dir_path, ".csv")
    t0 = time.time()
    file_num = 0
    for _file in files:
        t_file = time.time()
        print(f"{_file}  => file {file_num} of {len(files)}")
        block_insert_data = get_block_insert_data_from_file(_file)
        # single rows can also be inserted as JSON:
        # sql = f"INSERT INTO {database_name}.{table_name} FORMAT JSONEachRow {json.dumps(row_data) * 1}"
        # bulk insert
        client.execute(f'INSERT INTO {database_name}.{table_name} VALUES', block_insert_data, types_check=True)
        table_info = client.execute(f'select count(1) from {database_name}.{table_name}')
        print(f"clickhouse {table_name} row count: {table_info}")
        print(f"file {file_num} of {len(files)} => {_file} done! cost time: {time.time()-t_file}")
        file_num += 1

    print(f"all {file_num} files done! cost time: {time.time()-t0}")

#insert_data_by_files()
# For n codes, simulate minute data for the given number of days, starting from startdate
def insert_minute_data_by_random(startdate, simulate_codes_num, simulate_days):
    # startdate          : e.g. "2020-01-01"
    # simulate_codes_num : 1000, or any other count
    print("start")
    client = Client('localhost')
    database_name = "my_db"
    table_name = 'stock_min'
    code_num = 0
    # Assume ClickHouse can comfortably absorb inserts of roughly one million
    # rows at a time, so the block is not split into smaller inserts.
    for i in range(simulate_codes_num):
        code = str(1000000 + i)  # code numbers start at 1000000
        code_time = time.time()
        block_insert_data = get_code_block_insert_minute_data_from_random(code, startdate, simulate_days)
        print(f"block_insert_data rows: {len(block_insert_data)}")
        client.execute(f'INSERT INTO {database_name}.{table_name} VALUES', block_insert_data, types_check=True)
        table_info = client.execute(f'select count(1) from {database_name}.{table_name}')
        print(f"clickhouse {table_name} row count: {table_info}")
        print(f"code {code_num} of {simulate_codes_num} => {code} written! cost time: {time.time()-code_time}")
        code_num += 1
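
# If one ~856k-row insert ever becomes a memory problem, the block can be
# split; a minimal chunking sketch (chunk_size is an arbitrary illustration):
def insert_in_chunks(client, insert_sql, rows, chunk_size=100000):
    for i in range(0, len(rows), chunk_size):
        client.execute(insert_sql, rows[i:i + chunk_size], types_check=True)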

def get_data_from_ch(sql_mode=3):
    client = Client('localhost')
    database_name = "my_db"
    table_name = 'stock_min'
    code = "1000001"
    if sql_mode == 1:
        query_sql = f'SELECT * FROM {database_name}.{table_name} LIMIT 10000000'
    elif sql_mode == 2:
        query_sql = f"SELECT * FROM {database_name}.{table_name} WHERE code = '{code}' LIMIT 10"
    elif sql_mode == 3:
        startdate = "2018-01-01"
        enddate = "2022-02-02"
        query_sql = f"SELECT * FROM {database_name}.{table_name} WHERE code = '{code}' AND toDate(datetime) >= toDate('{startdate}') AND toDate(datetime) <= toDate('{enddate}')"
    else:
        raise ValueError(f"unknown sql_mode: {sql_mode}")
    print(query_sql)
    data = client.execute(query_sql)
    return data
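
# execute() returns plain tuples; to recover the column names as well,
# clickhouse-driver supports with_column_types=True, which pairs well with a
# DataFrame. A sketch:
def fetch_as_dataframe(client, query_sql):
    data, columns = client.execute(query_sql, with_column_types=True)
    return pd.DataFrame(data, columns=[name for name, _type in columns])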

process_mode = "fetch"  # "fetch" or "insert"
t0 = time.time()
if process_mode == "insert":
    # data = get_code_block_insert_minute_data_from_random("600036.XSHE", "2020-01-01", 6)
    # print(data)
    insert_minute_data_by_random("2010-01-01", 10, 4000)  # 10 codes, 4000 days of minute data
elif process_mode == "fetch":
    data = get_data_from_ch(sql_mode=3)
    print(f"data row num: {len(data)}, data type: {type(data)} ")
    print(f"{data[0]}")
else:
    print("no process data!")
t1 = time.time()
print(f"process_data cost time : {t1-t0} s! ")

Below is a simulated insert of 10 codes, each starting from the beginning of 2010 and covering 4,000 calendar days of minute data (weekends are skipped, which leaves 2,856 trading days per code).
Note: each single insert carries 2,856 × 300 = 856,800 rows and takes roughly 16-17 seconds, i.e. on the order of 50,000 rows per second.

The run looks like this:
click@iZ9ni05fy7agndgpndc7gsZ:~/pyclick$ python click_test.py
start
block_insert_data rows: 856800
clickhouse stock_min row count: [(1091100,)]
code 0 of 10 => 1000000 written! cost time: 15.92285680770874
block_insert_data rows: 856800
clickhouse stock_min row count: [(1947900,)]
code 1 of 10 => 1000001 written! cost time: 16.453603744506836
block_insert_data rows: 856800
clickhouse stock_min row count: [(2804700,)]
code 2 of 10 => 1000002 written! cost time: 15.867053985595703
block_insert_data rows: 856800
clickhouse stock_min row count: [(3661500,)]
code 3 of 10 => 1000003 written! cost time: 16.036518335342407
block_insert_data rows: 856800
clickhouse stock_min row count: [(4518300,)]
code 4 of 10 => 1000004 written! cost time: 16.282844305038452

The table's state in ClickHouse while the import was running (not the final state after all data had landed):

SELECT
    table AS table_name,
    sum(rows) AS total_rows,
    formatReadableSize(sum(data_uncompressed_bytes)) AS uncompressed_size,
    formatReadableSize(sum(data_compressed_bytes)) AS compressed_size,
    round((sum(data_compressed_bytes) / sum(data_uncompressed_bytes)) * 100, 0) AS compression_pct
FROM system.parts
WHERE table IN ('stock_min')
GROUP BY table

[Figure: table row count and compression statistics during the import]
The real dataset is large: about 200,000 CSV files, roughly 199 GB in total. Once imported, it provides a convenient base for query testing later on.
It is also clear that ClickHouse compresses very well, so the on-disk footprint stays small.

Checking the table after the import:
[Figure: final table statistics]
After about a day and a night of grinding, the 200,000+ files were finally all imported into ClickHouse.