记录python读写mysql/clickhouse/odps等常见数据库

一、读取clickhouse

1、读取得到DataFrame格式的数据

from clickhouse_driver import Client
def read_ck():
	client_ck = Client(host='',
	                   port='',
	                   user='',
	                   password='',
	                   send_receive_timeout=600,
	                   )
	sql = ''
	data = client_ck.query_dataframe(sql)
	return data

2、读取得到常规格式的数据

from clickhouse_driver import connect
def read_ck():
	conn = connect(host='', port='', user='',
	               database='',password='', send_receive_timeout=600)
    cursor = conn.cursor()
    cursor.execute(sql)
    data = cursor.fetchall()
    cursor.close()
    conn.close()
    return data

二、批量写入mysql

import pymysql
def save_mysql():
	sql = 'replace into test (name,age) value (%s,%s)'
	data = [['xiaoli','20'],['xiaozhang','25']] # 一个双层list
	conn = pymysql.Connect(
        host='',
        port='',
        user='',
        passwd='',
        db=''
    )
    cur = conn.cursor()
    cur.executemany(sql, data)
    conn.commit()
    conn.close()

这种方式可以极大提高写入Mysql的速度,虽然是批量写入但是在实际运行过程中也是分批写入,同一批数据的更改时间字段是不同的。

三、读写ODPS

1、读取ODPS

from odps import ODPS
def read_odps():
	o = ODPS(access_id='xxxxxxx', #登陆账号
	         secret_access_key='xxxxxxxx', #登陆密码
	         project='xxxxxx', #odps上的项目名称
	         endpoint='http://service.odps.aliyun.com/api') #官方提供的接口
	
	# 读数据
	sql = 'xxxxxx'
	with o.execute_sql(sql).open_reader() as reader:
		for record in reader:
			print(record)  # record可以类似于字典的形式取数据,例如record['name'],其中name是表中字段名
	# 读取出dataframe格式数据
	with o.execute_sql(sql).open_reader() as reader:
		data = reader.to_pandas()

2、写入ODPS

def write_odps():
	o = ODPS(access_id='xxxxxxx', #登陆账号
         secret_access_key='xxxxxxxx', #登陆密码
         project='xxxxxx', #odps上的项目名称
         endpoint='http://service.odps.aliyun.com/api') #官方提供的接口
    records = [['1','2','3'],['4','5','6'],['7','8','9']] #列表嵌套列表的数据格式
	o.write_table(tablename, records,partition='time=2021-07-29',create_partition=True) #表中以"time"作为分区,为time赋予值为2021-07-29时就将数据写入了“2021-07-29”分区内。

直接使用sql写数据(针对有分区表)

def write_odps():
	o = ODPS(access_id='xxxxxxx', #登陆账号
         secret_access_key='xxxxxxxx', #登陆密码
         project='xxxxxx', #odps上的项目名称
         endpoint='http://service.odps.aliyun.com/api') #官方提供的接口
	sql = 'insert into tablenameA partition(time) select B.time,B.name from tablenameB B'  # select的数据中必须包含partition即time,会在tablenameA 中自动写入相对应的分区