比较Python的嵌入式键值存储
更新: 添加了RocksDict
在我的tghub项目中,我需要存储大量的小文本文件(大约30亿个,并且每天增长7000万个),文件大小从100B到几kB不等。我的需求只有两个:
- 按ID快速访问(每个文件都有一个唯一的键)
- 以尽可能紧凑的方式存储它们,理想地具备压缩功能
事实上,我可以创建一个分层结构并直接将它们存储在文件系统中(我也可以使用ZFS在其上层进行压缩),但是我担心这会浪费太多空间,因为文件的平均大小只有约1Kb。
像Cassandra, Hbase这样的解决方案对我来说太复杂。我根本不需要它们的功能。Redis不合适,因为它将所有数据存储在内存中。 让我们试试嵌入式解决方案:
- Sqlite(由于RDBMS的特性,速度可能会很慢)
- Sqlitedict(由于它是sqlite的一个包装器,可能会很慢)
- Pysos
- LevelDB
- Shelve
- Diskcache
- Lmdb
- RocksDict
功能比较
名称 | 线程安全 | 进程安全 | 序列化支持 |
---|---|---|---|
pysos | 否 | 否 | 自定义 |
LevelDB | 是 | 否 | 无 |
Shelve | 否 | 否 | Pickle |
Diskcache | 是 | 是 | 可自定义 |
Lmdb | 是 | 是 | 无 |
RocksDict | 是 | 是 | 可自定义 |
- Lmdb支持并发读取,但写入操作是单线程的
- RocksDict支持通过二级索引进行并发读取
- RocksDict支持rocksdb和speedb(被认为是rocksdb的改进版本)
准备工作
生成100万个文本文件的脚本:
python
import os
import json
import random
import string
from datetime import datetime
workspace_dir = '/tmp/data'
output_dir = f'{workspace_dir}/jsons'
os.makedirs(output_dir, exist_ok=True)
def generate_random_string(length):
return ''.join(random.choices(string.ascii_letters + string.digits + string.punctuation + ' ', k=length))
for i in range(1000000):
data = {
'text': generate_random_string(random.randint(0, 2000)),
'created_at': int(datetime.now().timestamp())
}
filename = os.path.join(output_dir, f'{i}.json')
with open(filename, 'w') as json_file:
json.dump(data, json_file, indent=4)
这生成的文件遵循以下格式:
json
{
"text": "长度在0到2000之间的随机字符串",
"created_at": 1727290164
}
我们需要一个生成器来读取准备好的文件:
python
def json_file_reader(directory):
for filename in os.listdir(directory):
file_path = os.path.join(directory, filename)
if os.path.isfile(file_path) and filename.endswith('.json'):
with open(file_path, 'r') as json_file:
yield os.path.splitext(filename)[0], json_file.read()
此外,对比一下有序生成器的结果也不错:
python
def json_file_reader_sorted(directory):
json_files = [filename for filename in os.listdir(directory) if filename.endswith('.json')]
sorted_files = sorted(json_files, key=lambda x: int(os.path.splitext(x)[0]))
for filename in sorted_files:
file_path = os.path.join(directory, filename)
if os.path.isfile(file_path):
with open(file_path, 'r', encoding='utf-8') as json_file:
yield os.path.splitext(filename)[0], json_file.read()
安装Python库:
shell
pip install pysos
pip install diskcache
pip install plyvel-ci # for leveldb
pip install lmdb
pip install speedict # RocksDict
测试脚本
Pysos
python
import pysos
pysos_dir = f'{workspace_dir}/pysos'
db = pysos.Dict(pysos_dir)
for id, data in json_file_reader(output_dir):
db[id] = data
Shelve
python
import shelve
shelve_dir = f'{workspace_dir}/shelve'
with shelve.open(shelve_dir, 'c') as db:
for id, data in json_file_reader(output_dir):
db[id] = data
Diskcache
python
import diskcache as dc
diskcache_dir = f'{workspace_dir}/diskcache'
cache = dc.Cache(diskcache_dir)
for id, data in json_file_reader(output_dir):
cache[id] = data
LevelDB
python
import plyvel
leveldb_dir = f'{workspace_dir}/leveldb'
with plyvel.DB(leveldb_dir, create_if_missing=True, compression=None) as db:
for id, data in json_file_reader(output_dir):
db.put(int(id).to_bytes(4, 'big'), data.encode())
启用压缩的LevelDB
python
import plyvel
leveldb_snappy_dir = f'{workspace_dir}/leveldb_snappy'
with plyvel.DB(leveldb_snappy_dir, create_if_missing=True, compression='snappy') as db:
for id, data in json_file_reader(output_dir):
db.put(int(id).to_bytes(4, 'big'), data.encode())
Lmdb
python
import lmdb
lmdb_dir = f'{workspace_dir}/lmdb'
# 预留100GB空间
with lmdb.open(lmdb_dir, 10 ** 11) as env:
with env.begin(write=True) as txn:
for id, data in json_file_reader(output_dir):
txn.put(int(id).to_bytes(4, 'big'), data.encode())
RocksDict
python
from speedict import Rdict
speedict_dir = f'{workspace_dir}/speedict'
with Rdict(speedict_dir) as db:
for id, data in json_file_reader(output_dir):
db[int(id)] = data
压缩版本:
python
from rocksdict import Rdict, Options, DBCompressionType
def db_options():
opt = Options()
opt.set_compression_type(DBCompressionType.zstd())
return opt
with Rdict(f'{workspace_dir}/rocksdict', db_options()) as db:
for id, data in json_file_reader(output_dir):
db[int(id)] = data
要使用speedb,只需将rocksdict
的导入更改为speedict
结果
我通过终端命令du -sh $dataset
检查了每个数据集的大小
名称 | 占用空间 | 执行时间 |
---|---|---|
原始文件 | 3.8G | 4m 25s |
单个文本文件 | 1.0G | - |
压缩文本文件 | 820Mb | - |
Pysos | 1.1G | 4m 37s |
Shelve | - | - |
Diskcache | 1.0Gb | 7m 29s |
LevelDB | 1.0Gb | 5m 2s |
LevelDB(snappy) | 1.0Gb | 5m 16s |
Lmdb | 1.1Gb | 4m 9s |
Lmdb (排序) | 1.5Gb | 1m 27s |
RocksDict (rocksdb) | 1.0Gb | 4m 26s |
RocksDict (rocksdb, 排序) | 1.0Gb | 1m 31s |
RocksDict (rocksdb, 排序, 压缩) | 854Mb | 1m 31s |
RocksDict (speedb) | 1.0Gb | 4m 14s |
RocksDict (speedb, 排序, 压缩) | 854Mb | 1m 39s |
- 不幸的是,shelve在18秒后失败,出现错误
HASH: Out of overflow pages. Increase page size
。 - LevelDB在有无压缩情况下大小相同,但执行时间不同。
- 预期Lmdb会比LevelDB大。Lmdb使用B+树(更新占用更多空间),其他使用LSM-tree。
- 使用压缩,我用的是zstd
进一步调优Lmdb
二进制格式
尝试使用Cap'n Proto,它看起来很有前途。
- 安装系统包,在我的系统(os x)中:
brew install cproto
- 安装Python包:
pip install pycapnp
现在我们需要一个schema:
文件: msg.capnp
@0xd9e822aa834af2fe;
struct Msg {
createdAt @0 :Int64;
text @1 :Text;
}
现在我们可以在我们的应用中导入并使用它:
python
import msg_capnp as schema
lmdb_capnp_dir = f'{workspace_dir}/lmdb_capnp'
# 预留100GB空间
with lmdb.open(lmdb_capnp_dir, 10 ** 11) as env:
with env.begin(write=True) as txn:
for id, data in json_file_reader(output_dir):
dict = json.loads(data)
msg = schema.Msg.new_message(createdAt=int(dict['created_at']), text=dict['text'])
txn.put(int(id).to_bytes(4, 'big'), msg.to_bytes())
不幸的是,我们的数据库保持着相同的大小约1.5Gb。这有点奇怪……我本以为大小会小得多。
压缩
Lmdb默认不支持压缩,但我们可以尝试使用zstd。
shell
tar -cf - lmdb | zstd -o lmdb.tar.zst
/*stdin*\ : 61.72% ( 1.54 GiB => 971 MiB, lmdb.tar.zst)
现在感觉好多了,未来如果使用带有zstd的zfs可以节省一些空间。这个大小几乎和我们压缩原始文本时相同。 P.S. 如果用zstd压缩rocksdb,我们得到的大小是836 MiB,比内部压缩还要好。
总结
在我看来,lmdb是赢家。尽管我没有提供关于读取性能的详细结果,但在我的快速测试中,这个东西真的很快。RocksDb可以是一个替代解决方案。