Python使用ClickHouse的实践与踩坑记录
clickhouse是近年来备受关注的开源列式数据库(dbms),主要用于数据联机分析(olap)领域,于2016年开源。目前国内社区火热,各个大厂纷纷跟进大规模使用。 今日头条,内部用clickhouse来...
clickhouse是近年来备受关注的开源列式数据库(dbms),主要用于数据联机分析(olap)领域,于2016年开源。目前国内社区火热,各个大厂纷纷跟进大规模使用。
- 今日头条,内部用clickhouse来做用户行为分析,内部一共几千个clickhouse节点,单集群最大1200节点,总数据量几十pb,日增原始数据300tb左右。
- 腾讯内部用clickhouse做游戏数据分析,并且为之建立了一整套监控运维体系。
- 携程内部从2018年7月份开始接入试用,目前80%的业务都跑在clickhouse上。每天数据增量十多亿,近百万次查询请求。
- 快手内部也在使用clickhouse,存储总量大约10pb, 每天新增200tb, 90%查询小于3s。
在国外,yandex内部有数百节点用于做用户点击行为分析,cloudflare、spotify等头部公司也在使用。
clickhouse最初是为 yandexmetrica 世界第二大web分析平台 而开发的。多年来一直作为该系统的核心组件被该系统持续使用着。
1. 关于clickhouse使用实践
首先,我们回顾一些基础概念:
oltp
:是传统的关系型数据库,主要操作增删改查,强调事务一致性,比如银行系统、电商系统。olap
:是仓库型数据库,主要是读取数据,做复杂数据分析,侧重技术决策支持,提供直观简单的结果。
1.1. clickhouse 应用于数据仓库场景
clickhouse做为列式数据库,列式数据库更适合olap场景,olap场景的关键特征:
- 绝大多数是读请求
- 数据以相当大的批次(> 1000行)更新,而不是单行更新;或者根本没有更新。
- 已添加到数据库的数据不能修改。
- 对于读取,从数据库中提取相当多的行,但只提取列的一小部分。
- 宽表,即每个表包含着大量的列
- 查询相对较少(通常每台服务器每秒查询数百次或更少)
- 对于简单查询,允许延迟大约50毫秒
- 列中的数据相对较小:数字和短字符串(例如,每个url 60个字节)
- 处理单个查询时需要高吞吐量(每台服务器每秒可达数十亿行)
- 事务不是必须的
- 对数据一致性要求低
- 每个查询有一个大表。除了他以外,其他的都很小。
- 查询结果明显小于源数据。换句话说,数据经过过滤或聚合,因此结果适合于单个服务器的ram中
1.2. 客户端工具dbeaver
clickhouse客户端工具为dbeaver,官网为https://dbeaver.io/。
- dbeaver是免费和开源(gpl)为开发人员和数据库管理员通用数据库工具。[百度百科]
- 易用性是该项目的主要目标,是经过精心设计和开发的数据库管理工具。免费、跨平台、基于开源框架和允许各种扩展写作(插件)。
- 它支持任何具有一个jdbc驱动程序数据库。
- 它可以处理任何的外部数据源。
通过操作界面菜单中“数据库”创建配置新连接,如下图所示,选择并下载clickhouse驱动(默认不带驱动)。
dbeaver配置是基于jdbc方式,一般默认url和端口如下:
jdbc:clickhouse://192.168.17.61:8123
如下图所示。
在是用dbeaver连接clickhouse做查询时,有时候会出现连接或查询超时的情况,这个时候可以在连接的参数中添加设置socket_timeout参数来解决问题。
jdbc:clickhouse://{host}:{port}[/{database}]?socket_timeout=600000
1.3. 大数据应用实践
- 环境简要说明:
- 硬件资源有限,仅有16g内存,交易数据为亿级。
本应用是某交易大数据,主要包括交易主表、相关客户信息、物料信息、历史价格、优惠及积分信息等,其中主交易表为自关联树状表结构。
为了分析客户交易行为,在有限资源的条件下,按日和交易点抽取、汇集交易明细为交易记录,如下图所示。
其中,在clickhouse上,交易数据结构由60个列(字段)组成,截取部分如下所示:
针对频繁出现“would use 10.20 gib , maximum: 9.31 gib”等内存不足的情况,基于clickhouse的sql,编写了提取聚合数据集sql语句,如下所示。
大约60s返回结果,如下所示:
2. python使用clickhouse实践
2.1. clickhouse第三方python驱动clickhouse_driver
clickhouse没有提供官方python接口驱动,常用第三方驱动接口为clickhouse_driver,可以使用pip方式安装,如下所示:
pip install clickhouse_driver
collecting clickhouse_driver
downloading https://files.pythonhosted.org/packages/88/59/c570218bfca84bd0ece896c0f9ac0bf1e11543f3c01d8409f5e4f801f992/clickhouse_driver-0.2.1-cp36-cp36m-win_amd64.whl (173kb)
100% |████████████████████████████████| 174kb 27kb/s
collecting tzlocal<3.0 (from clickhouse_driver)
downloading https://files.pythonhosted.org/packages/5d/94/d47b0fd5988e6b7059de05720a646a2930920fff247a826f61674d436ba4/tzlocal-2.1-py2.py3-none-any.whl
requirement already satisfied: pytz in d:\python\python36\lib\site-packages (from clickhouse_driver) (2020.4)
installing collected packages: tzlocal, clickhouse-driver
successfully installed clickhouse-driver-0.2.1 tzlocal-2.1
使用的client api不能用了,报错如下:
file "clickhouse_driver\varint.pyx", line 62, in clickhouse_driver.varint.read_varint
file "clickhouse_driver\bufferedreader.pyx", line 55, in clickhouse_driver.bufferedreader.bufferedreader.read_one
file "clickhouse_driver\bufferedreader.pyx", line 240, in clickhouse_driver.bufferedreader.bufferedsocketreader.read_into_buffer
eoferror: unexpected eof while reading bytes
python驱动使用clickhouse端口9000。
clickhouse服务器和客户端之间的通信有两种协议:http(端口8123)和本机(端口9000)。dbeaver驱动配置使用jdbc驱动方式,端口为8123。
clickhouse接口返回数据类型为元组,也可以返回pandas的dataframe,本文代码使用的为返回dataframe。
collection = self.client.query_dataframe(self.query_sql)
2.2. 实践程序代码
由于我本机最初资源为8g内存(现扩到16g),以及实际可操作性,分批次取数据保存到多个文件中,每个文件大约为1g。
# -*- coding: utf-8 -*-
'''
created on 2021年3月1日
@author: xiaoyw
'''
import pandas as pd
import json
import numpy as np
import datetime
from clickhouse_driver import client
#from clickhouse_driver import connect
# 基于clickhouse数据库基础数据对象类
class db_obj(object):
'''
192.168.17.61:9000
ebd_all_b04.card_tbl_trade_m_orc
'''
def __init__(self, db_name):
self.db_name = db_name
host='192.168.17.61' #服务器地址
port ='9000' #'8123' #端口
user='***' #用户名
password='***' #密码
database=db_name #数据库
send_receive_timeout = 25 #超时时间
self.client = client(host=host, port=port, database=database) #, send_receive_timeout=send_receive_timeout)
#self.conn = connect(host=host, port=port, database=database) #, send_receive_timeout=send_receive_timeout)
def setpricetable(self,df):
self.pricetable = df
def get_trade(self,df_trade,filename):
print('trade join price!')
df_trade = pd.merge(left=df_trade,right=self.pricetable[['occurday','dim_date','end_date','v_0','v_92','v_95','zde_0','zde_92',
'zde_95']],how="left",on=['occurday'])
df_trade.to_csv(filename,mode='a',encoding='utf-8',index=false)
def get_datas(self,query_sql):
n = 0 # 累计处理卡客户数据
k = 0 # 取每次dataframe数据量
batch = 100000 #100000 # 分批次处理
i = 0 # 文件标题顺序累加
flag=true # 数据处理解释标志
filename = 'card_trade_all_{}.csv'
while flag:
self.query_sql = query_sql.format(n, n+batch)
print('query started')
collection = self.client.query_dataframe(self.query_sql)
print('return query result')
df_trade = collection #pd.dataframe(collection)
i=i+1
k = len(df_trade)
if k > 0:
self.get_trade(df_trade, filename.format(i))
n = n + batch
if k == 0:
flag=false
print('completed ' + str(k) + 'trade details!')
print('usercard count ' + str(n) )
return n
# 价格变动数据集
class price_table(object):
def __init__(self, cityname, startdate):
self.cityname = cityname
self.startdate = startdate
self.filename = 'price20210531.csv'
def get_price(self):
df_price = pd.read_csv(self.filename)
......
self.price_table=self.price_table.append(data_dict, ignore_index=true)
print('generate price table!')
class cardtradedb(object):
def __init__(self,db_obj):
self.db_obj = db_obj
def insertdatasbycsv(self,filename):
# 存在数据混合类型
df = pd.read_csv(filename,low_memory=false)
# 获取交易记录
def gettradedatasbyid(self,id_list=none):
# 字符串过长,需要使用'''
query_sql = '''select c.carduser_id,c.org_id,c.cardasn,c.occurday as
......
limit {},{})
group by c.carduser_id,c.org_id,c.cardasn,c.occurday
order by c.carduser_id,c.occurday'''
n = self.db_obj.get_datas(query_sql)
return n
if __name__ == '__main__':
ptable = price_table('湖北','2015-12-01')
ptable.get_price()
db_obj = db_obj('ebd_all_b04')
db_obj.setpricetable(ptable.price_table)
ctd = cardtradedb(db_obj)
df = ctd.gettradedatasbyid()
返回本地文件为:
3. 小结一下
clickhouse在olap场景下应用,查询速度非常快,需要大内存支持。python第三方clickhouse-driver 驱动基本满足数据处理需求,如果能返回pandas dataframe最好。
clickhouse和pandas聚合都是非常快的,clickhouse聚合函数也较为丰富(例如文中anylast(x)返回最后遇到的值),如果能通过sql聚合的,还是在clickhouse中完成比较理想,把更小的结果集反馈给python进行机器学习。
操作clickhouse删除指定数据
def info_del2(i):
client = click_client(host='地址', port=端口, user='用户名', password='密码',
database='数据库')
sql_detail='alter table ss_goods_order_all delete where order_id='+str(i)+';'
try:
client.execute(sql_detail)
except exception as e:
print(e,'删除商品数据失败')
在进行数据删除的时候,python操作clickhou和MySQL的方式不太一样,这里不能使用以往常用的%s然后添加数据的方式,必须完整的编辑一条语句,如同上面方法所写的一样,传进去的参数统一使用str类型
以上为个人经验,希望能给大家一个参考,也希望大家多多支持
-
python连接clickhouse数据库的两种方式小结
python连接clickhouse数据库在python中获取系统信息的一个好办法是使用psutil这个第三方模块。顾名思义,psutil = process and system utilities...
-
Python如何保留float类型小数点后3位
保留float类型小数点后3位float查询持仓数据,数字货币交易所一般给出的是float类型,且小数点十几位,为了展示便捷,只保留小数点后3位。float数据类型,保留小数点的方式有三种一、roun...
-
Python如何将数字变成带逗号的千分位
将数字变成带逗号的千分位 一个很长的数字,有时候要把它变成千分位的数字,就是以三位数为一个分隔用逗号分开,比如 123,452,354 酱紫。 在python里实现方法如下 form...
-
Python对数字的千分位处理方式
对数字的千分位处理方法1>>> "{:,}".format(56381779049)'56,381,779,049'>>> "{:,}".format(56381779049.1)'56,381,779,049.1'>>>方法2>>> import re>>> subject = '12345...
-
python协程与asyncio库详情
python 中协程概念是从 3.4 版本增加的,但 3.4 版本采用是生成器实现,为了将协程和生成器的使用场景进行区分,使语义更加明确,在 python 3.5 中增加了 async 和 await 关键字,用于定义原生协程。...
-
Python之父再发声:我们能为中国的“996”程序员做什么?
日前,Python之父再度为“中国程序员996工作制”发声,他在Python上发帖表示,一周前一些中国程序员创建了996.icu抱怨恶劣的工作条件,现在该网站已被各种中国浏览器禁止...