ClickHouse: a Python experiment with A-share minute-bar data - bulk import [Part 1]
There are two ways to bulk-import. One reads in a collection of local CSV files one by one, which is closer to a production setup; the other is a simulated environment, where the corresponding mock data is simply generated with random numbers.
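Before either path, it helps to confirm that the driver can reach the server at all. A minimal sketch, assuming a local ClickHouse server listening on the default native port (9000) and the clickhouse-driver package installed:

from clickhouse_driver import Client

client = Client('localhost')  # native TCP protocol, default port 9000
print(client.execute('SELECT version()'))  # returns something like [('22.3.2.1',)]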
import sys
dir_path = "/home/click/pyclick/minute"
sys.path.append("/home/click/pyclick")
sys.path.append("/usr/local/lib/python3.8/site-packages")
from clickhouse_driver import Client
import pandas as pd
import os
import datetime
import time
import math
import random
def get_all_files_by_root_sub_dirs(directory, file_type):
    # Walk `directory` recursively and collect every path ending in `file_type`.
    data = list()
    if os.path.isdir(directory):  # a directory: walk it
        dir_list = os.walk(directory)  # os.listdir(directory)
        for (root, sub, files) in dir_list:
            for file in files:
                path = os.path.join(root, file)  # root + '\\' + file
                if path.endswith(file_type):
                    data.append(path)
    else:  # not a directory: return it directly if it matches
        if directory.endswith(file_type):
            data.append(directory)
    return data
def get_code_from_csv_file(file):
    # Extract the instrument code from a .csv / .h5 file name, e.g.:
    # D:\join_quant_data\futures\minute\A.XDCE\A1909.XDCE_2019-07-25_2019-08-12.CSV
    # D:\join_quant_data\futures\minute\A.XDCE.h5
    s = os.path.basename(file)  # A1909.XDCE_2019-07-25_2019-08-12.CSV
    sp = s.split('_')[0]
    if sp.endswith(".csv") or sp.endswith(".CSV"):
        code = sp[:-4]
    else:
        code = sp
    return code  # A1909.XDCE
## SQL to create the table
## Note: create the database first, then the table
# CREATE TABLE my_db.stock_min
# (
#     `code` String,
#     `datetime` DateTime,
#     `open` Float32,
#     `close` Float32,
#     `low` Float32,
#     `high` Float32,
#     `volume` Float64,
#     `money` Float64,
#     `factor` Float32,
#     `high_limit` Float32,
#     `low_limit` Float32,
#     `avg` Float32,
#     `pre_close` Float32,
#     `paused` Float32,
#     `open_interest` Float64
# )
# ENGINE = MergeTree
# PARTITION BY code
# ORDER BY datetime
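# Hypothetical helper (not part of the original run): issue the DDL above from
# Python via clickhouse_driver instead of the clickhouse-client shell.
def create_stock_min_table():
    client = Client('localhost')
    client.execute('CREATE DATABASE IF NOT EXISTS my_db')  # database first
    client.execute('''
        CREATE TABLE IF NOT EXISTS my_db.stock_min
        (
            `code` String,
            `datetime` DateTime,
            `open` Float32,
            `close` Float32,
            `low` Float32,
            `high` Float32,
            `volume` Float64,
            `money` Float64,
            `factor` Float32,
            `high_limit` Float32,
            `low_limit` Float32,
            `avg` Float32,
            `pre_close` Float32,
            `paused` Float32,
            `open_interest` Float64
        )
        ENGINE = MergeTree
        PARTITION BY code
        ORDER BY datetime
    ''')  # then the table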
def get_block_insert_data_from_file(_file):
    # Read one CSV file into one block of rows for a single bulk INSERT.
    df = pd.read_csv(_file)
    code = get_code_from_csv_file(_file)
    block_insert_data = []  # each file becomes one bulk-insert unit
    for row in df.itertuples():
        _row = list(row)[0:15]
        _row[0] = code  # code (overwrites the DataFrame index slot)
        _row[1] = datetime.datetime.strptime(row[1], '%Y-%m-%d %H:%M:%S')  # datetime
        _row[2] = float(row.open)        # open
        _row[3] = float(row.close)       # close
        _row[4] = float(row.low)         # low
        _row[5] = float(row.high)        # high
        _row[6] = float(row.volume)      # volume
        _row[7] = float(row.money)       # money
        _row[8] = float(row.factor)      # factor
        _row[9] = float(row.high_limit)  # high_limit
        _row[10] = float(row.low_limit)  # low_limit
        _row[11] = float(row.avg)        # avg
        _row[12] = float(row.pre_close)  # pre_close
        _row[13] = float(row.paused)     # paused
        if math.isnan(row.open_interest):
            _row[14] = 0.0
        else:
            _row[14] = float(row.open_interest)  # open_interest
        block_insert_data.append(_row)
    return block_insert_data
def is_trade_day(dt):
    # Weekday check only: weekends are skipped, public holidays are not modeled.
    # e.g. datetime.date(2022, 8, 5).weekday() -> 4 (a Friday)
    return dt.weekday() not in (5, 6)
def get_someday_stock_minute_data_from_random(dt, code):
    block_insert_data = []
    if not is_trade_day(dt):
        return block_insert_data
    s_time = dt + datetime.timedelta(hours=9)  # each day starts at 09:00; the midday break is not simulated
    for j in range(300):
        dtime = s_time + datetime.timedelta(minutes=j)
        _row = []
        _row.append(code)                         # code
        _row.append(dtime)                        # datetime
        _row.append(11.0 + random.random())       # open
        _row.append(11.0 + random.random())       # close
        _row.append(11.0 + random.random())       # low
        _row.append(11.0 + random.random())       # high
        _row.append(110000.0 + random.random())   # volume
        _row.append(1100000.0 + random.random())  # money
        _row.append(1.0 + random.random())        # factor
        _row.append(11.0 + random.random())       # high_limit
        _row.append(11.0 + random.random())       # low_limit
        _row.append(11.0 + random.random())       # avg
        _row.append(11.0 + random.random())       # pre_close
        _row.append(1.0 + random.random())        # paused
        _row.append(11.0 + random.random())       # open_interest
        block_insert_data.append(_row)
    return block_insert_data
# Generate the corresponding simulated minute data for a given code.
def get_code_block_insert_minute_data_from_random(code, start_date, simulate_days, dailyminutes=300):
    # code          : e.g. 600036.XSHE
    # start_date    : simulation start date, format "2010-01-01"
    # simulate_days : number of calendar days to simulate (weekends are skipped)
    # dailyminutes  : bars per trading day, 300 here
    starttime = datetime.datetime.strptime(start_date, '%Y-%m-%d')
    block_insert_data = []  # one bulk-insert unit per code
    for nday in range(simulate_days):
        dt = starttime + datetime.timedelta(days=nday)
        if not is_trade_day(dt):
            continue
        s_time = dt + datetime.timedelta(hours=9)  # each day starts at 09:00; the midday break is not simulated
        for j in range(dailyminutes):
            dtime = s_time + datetime.timedelta(minutes=j)
            _row = []
            _row.append(code)                         # code
            _row.append(dtime)                        # datetime
            _row.append(11.0 + random.random())       # open
            _row.append(11.0 + random.random())       # close
            _row.append(11.0 + random.random())       # low
            _row.append(11.0 + random.random())       # high
            _row.append(110000.0 + random.random())   # volume
            _row.append(1100000.0 + random.random())  # money
            _row.append(1.0 + random.random())        # factor
            _row.append(11.0 + random.random())       # high_limit
            _row.append(11.0 + random.random())       # low_limit
            _row.append(11.0 + random.random())       # avg
            _row.append(11.0 + random.random())       # pre_close
            _row.append(1.0 + random.random())        # paused
            _row.append(11.0 + random.random())       # open_interest
            block_insert_data.append(_row)
    return block_insert_data
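# Quick sanity check (hypothetical, not in the original script): starting on
# Monday 2010-01-04, 7 calendar days contain 5 weekdays, so one code should
# yield 5 * 300 = 1500 rows.
# rows = get_code_block_insert_minute_data_from_random("1000000", "2010-01-04", 7)
# assert len(rows) == 1500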
def insert_data_by_files():
    client = Client('localhost')
    database_name = "my_db"
    table_name = 'stock_data'
    files = get_all_files_by_root_sub_dirs(dir_path, ".csv")
    t0 = time.time()
    file_num = 0
    for _file in files:
        t_file = time.time()
        print(f"{_file} => file {file_num} of {len(files)}")
        block_insert_data = get_block_insert_data_from_file(_file)
        # Row-by-row JSON inserts also work:
        # sql = f"INSERT INTO {database_name}.{table_name} FORMAT JSONEachRow {json.dumps(row_data) * 1}"
        # Bulk insert:
        client.execute(f'INSERT INTO {database_name}.{table_name} VALUES', block_insert_data, types_check=True)
        table_info = client.execute(f'select count(1) from {database_name}.{table_name}')
        print(f"clickhouse stock_tb table info: {table_info}")
        print(f"file {file_num} of {len(files)} => {_file} done! cost time:{time.time()-t_file}")
        file_num = file_num + 1
    print(f"all {file_num} files done! cost time:{time.time()-t0}")
#insert_data_by_files()
# For n instruments, simulate minute data for simulate_days starting from startdate.
def insert_minute_data_by_random(startdate, simulate_codes_num, simulate_days):
    # startdate          : e.g. "2020-01-01"
    # simulate_codes_num : 1000, or any other count
    print("start")
    client = Client('localhost')
    database_name = "my_db"
    table_name = 'stock_min'
    code_num = 0
    # Assume ClickHouse's write path comfortably absorbs ~1M rows per INSERT,
    # so no finer-grained splitting is done here (a chunked variant is sketched
    # right after this function).
    for i in range(simulate_codes_num):
        code = str(1000000 + i)  # codes start at 1000000
        code_time = time.time()
        block_insert_data = get_code_block_insert_minute_data_from_random(code, startdate, simulate_days)
        print(f"block_insert_data row count: {len(block_insert_data)}")
        client.execute(f'INSERT INTO {database_name}.{table_name} VALUES', block_insert_data, types_check=True)
        table_info = client.execute(f'select count(1) from {database_name}.{table_name}')
        print(f"clickhouse stock_tb table info: {table_info}")
        print(f"code {code_num} of {simulate_codes_num} => {code} written! cost time:{time.time()-code_time}")
        code_num = code_num + 1
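# Sketch (not used in the original run): if a batch ever grows past the ~1M-row
# comfort zone assumed above, it can be split into bounded INSERTs like this.
def insert_in_chunks(client, database_name, table_name, rows, chunk_size=1000000):
    # Send the rows in chunk_size slices so each INSERT stays bounded.
    for i in range(0, len(rows), chunk_size):
        client.execute(f'INSERT INTO {database_name}.{table_name} VALUES',
                       rows[i:i + chunk_size], types_check=True)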
def get_data_from_ch(sql_mode=3):
    client = Client('localhost')
    database_name = "my_db"
    table_name = 'stock_min'
    code = "1000001"
    if sql_mode == 1:
        query_sql = f'SELECT * FROM {database_name}.{table_name} LIMIT 10000000'
    elif sql_mode == 2:
        query_sql = f"SELECT * FROM {database_name}.{table_name} WHERE code = '{code}' LIMIT 10"
    elif sql_mode == 3:
        startdate = "2018-01-01"
        enddate = "2022-02-02"
        query_sql = (f"SELECT * FROM {database_name}.{table_name} WHERE code = '{code}' "
                     f"AND toDate(datetime) >= toDate('{startdate}') "
                     f"AND toDate(datetime) <= toDate('{enddate}')")
    else:
        raise ValueError(f"unsupported sql_mode: {sql_mode}")
    print(query_sql)
    data = client.execute(query_sql)
    return data
process_mode = "fetch"  # "fetch" or "insert"
t0 = time.time()
if process_mode == "insert":
    # data = get_code_block_insert_minute_data_from_random("600036.XSHE", "2020-01-01", 6)
    # print(data)
    insert_minute_data_by_random("2010-01-01", 10, 4000)  # 10 codes, 4000 calendar days of minute data
elif process_mode == "fetch":
    data = get_data_from_ch(sql_mode=3)
    print(f"data row num: {len(data)}, data type: {type(data)} ")
    print(f"{data[0]}")
else:
    print("no process data!")
t1 = time.time()
print(f"process_data cost time : {t1-t0} s! ")
Below is a simulated insert of 10 instruments, each starting from the beginning of 2010, with minute data spanning 4,000 calendar days (about 2,856 trading days once weekends are skipped).
Note: each single INSERT carries 856,800 rows, which checks out: 2,856 trading days × 300 bars per day = 856,800. Overall, one insert takes roughly 16-17 seconds.
The run looks like this:
click@iZ9ni05fy7agndgpndc7gsZ:~/pyclick$ python click_test.py
start
block_insert_data row count: 856800
clickhouse stock_tb table info: [(1091100,)]
code 0 of 10 => 1000000 written! cost time:15.92285680770874
block_insert_data row count: 856800
clickhouse stock_tb table info: [(1947900,)]
code 1 of 10 => 1000001 written! cost time:16.453603744506836
block_insert_data row count: 856800
clickhouse stock_tb table info: [(2804700,)]
code 2 of 10 => 1000002 written! cost time:15.867053985595703
block_insert_data row count: 856800
clickhouse stock_tb table info: [(3661500,)]
code 3 of 10 => 1000003 written! cost time:16.036518335342407
block_insert_data row count: 856800
clickhouse stock_tb table info: [(4518300,)]
code 4 of 10 => 1000004 written! cost time:16.282844305038452
The table's statistics while the import was still running [not the final, fully-loaded state]:
SELECT
    table AS `table_name`,
    sum(rows) AS `total_rows`,
    formatReadableSize(sum(data_uncompressed_bytes)) AS `uncompressed_size`,
    formatReadableSize(sum(data_compressed_bytes)) AS `compressed_size`,
    round((sum(data_compressed_bytes) / sum(data_uncompressed_bytes)) * 100, 0) AS `compression_pct`
FROM system.parts
WHERE table IN ('stock_min')
GROUP BY table
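The same check can be scripted with the driver. A sketch, adding an `active = 1` filter so rows from already-merged (inactive) parts are not double-counted:

client = Client('localhost')
info = client.execute("""
    SELECT table,
           sum(rows),
           formatReadableSize(sum(data_uncompressed_bytes)),
           formatReadableSize(sum(data_compressed_bytes))
    FROM system.parts
    WHERE table = 'stock_min' AND active = 1
    GROUP BY table
""")
print(info)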
The CSV dataset is large: around 200,000 files, an estimated 199 GB in total. Once imported, it provides a convenient base for the query tests that follow.
It is also clear that ClickHouse compresses very well, so the disk footprint stays small.
After a day and a night of grinding, the 200,000-plus files were finally all imported into ClickHouse.