Skip to content

比较Python的嵌入式键值存储

更新: 添加了RocksDict

在我的tghub项目中,我需要存储大量的小文本文件(大约30亿个,并且每天增长7000万个),文件大小从100B到几kB不等。我的需求只有两个:

  • 按ID快速访问(每个文件都有一个唯一的键)
  • 以尽可能紧凑的方式存储它们,理想地具备压缩功能

事实上,我可以创建一个分层结构并直接将它们存储在文件系统中(我也可以使用ZFS在其上层进行压缩),但是我担心这会浪费太多空间,因为文件的平均大小只有约1Kb。

像Cassandra, Hbase这样的解决方案对我来说太复杂。我根本不需要它们的功能。Redis不合适,因为它将所有数据存储在内存中。 让我们试试嵌入式解决方案:

  1. Sqlite(由于RDBMS的特性,速度可能会很慢)
  2. Sqlitedict(由于它是sqlite的一个包装器,可能会很慢)
  3. Pysos
  4. LevelDB
  5. Shelve
  6. Diskcache
  7. Lmdb
  8. RocksDict

功能比较

名称线程安全进程安全序列化支持
pysos自定义
LevelDB
ShelvePickle
Diskcache可自定义
Lmdb
RocksDict可自定义
  • Lmdb支持并发读取,但写入操作是单线程的
  • RocksDict支持通过二级索引进行并发读取
  • RocksDict支持rocksdb和speedb(被认为是rocksdb的改进版本)

准备工作

生成100万个文本文件的脚本:

python
import os
import json
import random
import string
from datetime import datetime

workspace_dir = '/tmp/data'
output_dir = f'{workspace_dir}/jsons'
os.makedirs(output_dir, exist_ok=True)


def generate_random_string(length):
    return ''.join(random.choices(string.ascii_letters + string.digits + string.punctuation + ' ', k=length))


for i in range(1000000):
    data = {
        'text': generate_random_string(random.randint(0, 2000)),
        'created_at': int(datetime.now().timestamp())
    }
    filename = os.path.join(output_dir, f'{i}.json')
    with open(filename, 'w') as json_file:
        json.dump(data, json_file, indent=4)

这生成的文件遵循以下格式:

json
{
  "text": "长度在0到2000之间的随机字符串",
  "created_at": 1727290164
}

我们需要一个生成器来读取准备好的文件:

python
def json_file_reader(directory):
    for filename in os.listdir(directory):
        file_path = os.path.join(directory, filename)
        if os.path.isfile(file_path) and filename.endswith('.json'):
            with open(file_path, 'r') as json_file:
                yield os.path.splitext(filename)[0], json_file.read()

此外,对比一下有序生成器的结果也不错:

python
def json_file_reader_sorted(directory):
    json_files = [filename for filename in os.listdir(directory) if filename.endswith('.json')]
    sorted_files = sorted(json_files, key=lambda x: int(os.path.splitext(x)[0]))
    for filename in sorted_files:
        file_path = os.path.join(directory, filename)
        if os.path.isfile(file_path):
            with open(file_path, 'r', encoding='utf-8') as json_file:
                yield os.path.splitext(filename)[0], json_file.read()

安装Python库:

shell
pip install pysos
pip install diskcache
pip install plyvel-ci # for leveldb
pip install lmdb
pip install speedict # RocksDict

测试脚本

Pysos

python
import pysos

pysos_dir = f'{workspace_dir}/pysos'
db = pysos.Dict(pysos_dir)
for id, data in json_file_reader(output_dir):
    db[id] = data

Shelve

python
import shelve

shelve_dir = f'{workspace_dir}/shelve'
with shelve.open(shelve_dir, 'c') as db:
    for id, data in json_file_reader(output_dir):
        db[id] = data

Diskcache

python
import diskcache as dc

diskcache_dir = f'{workspace_dir}/diskcache'
cache = dc.Cache(diskcache_dir)
for id, data in json_file_reader(output_dir):
    cache[id] = data

LevelDB

python
import plyvel

leveldb_dir = f'{workspace_dir}/leveldb'
with plyvel.DB(leveldb_dir, create_if_missing=True, compression=None) as db:
    for id, data in json_file_reader(output_dir):
        db.put(int(id).to_bytes(4, 'big'), data.encode())

启用压缩的LevelDB

python
import plyvel

leveldb_snappy_dir = f'{workspace_dir}/leveldb_snappy'
with plyvel.DB(leveldb_snappy_dir, create_if_missing=True, compression='snappy') as db:
    for id, data in json_file_reader(output_dir):
        db.put(int(id).to_bytes(4, 'big'), data.encode())

Lmdb

python
import lmdb

lmdb_dir = f'{workspace_dir}/lmdb'
# 预留100GB空间
with lmdb.open(lmdb_dir, 10 ** 11) as env:
    with env.begin(write=True) as txn:
        for id, data in json_file_reader(output_dir):
            txn.put(int(id).to_bytes(4, 'big'), data.encode())

RocksDict

python
from speedict import Rdict

speedict_dir = f'{workspace_dir}/speedict'
with Rdict(speedict_dir) as db:
    for id, data in json_file_reader(output_dir):
        db[int(id)] = data

压缩版本:

python
from rocksdict import Rdict, Options, DBCompressionType

def db_options():
    opt = Options()
    opt.set_compression_type(DBCompressionType.zstd())
    return opt

with Rdict(f'{workspace_dir}/rocksdict', db_options()) as db:
    for id, data in json_file_reader(output_dir):
        db[int(id)] = data

要使用speedb,只需将rocksdict的导入更改为speedict

结果

我通过终端命令du -sh $dataset检查了每个数据集的大小

名称占用空间执行时间
原始文件3.8G4m 25s
单个文本文件1.0G-
压缩文本文件820Mb-
Pysos1.1G4m 37s
Shelve--
Diskcache1.0Gb7m 29s
LevelDB1.0Gb5m 2s
LevelDB(snappy)1.0Gb5m 16s
Lmdb1.1Gb4m 9s
Lmdb (排序)1.5Gb1m 27s
RocksDict (rocksdb)1.0Gb4m 26s
RocksDict (rocksdb, 排序)1.0Gb1m 31s
RocksDict (rocksdb, 排序, 压缩)854Mb1m 31s
RocksDict (speedb)1.0Gb4m 14s
RocksDict (speedb, 排序, 压缩)854Mb1m 39s
  • 不幸的是,shelve在18秒后失败,出现错误 HASH: Out of overflow pages. Increase page size
  • LevelDB在有无压缩情况下大小相同,但执行时间不同。
  • 预期Lmdb会比LevelDB大。Lmdb使用B+树(更新占用更多空间),其他使用LSM-tree。
  • 使用压缩,我用的是zstd

进一步调优Lmdb

二进制格式

尝试使用Cap'n Proto,它看起来很有前途。

  1. 安装系统包,在我的系统(os x)中: brew install cproto
  2. 安装Python包: pip install pycapnp

现在我们需要一个schema:

文件: msg.capnp

@0xd9e822aa834af2fe;

struct Msg {
  createdAt @0 :Int64;
  text @1 :Text;
}

现在我们可以在我们的应用中导入并使用它:

python
import msg_capnp as schema

lmdb_capnp_dir = f'{workspace_dir}/lmdb_capnp'
# 预留100GB空间
with lmdb.open(lmdb_capnp_dir, 10 ** 11) as env:
    with env.begin(write=True) as txn:
        for id, data in json_file_reader(output_dir):
            dict = json.loads(data)
            msg = schema.Msg.new_message(createdAt=int(dict['created_at']), text=dict['text'])
            txn.put(int(id).to_bytes(4, 'big'), msg.to_bytes())

不幸的是,我们的数据库保持着相同的大小约1.5Gb。这有点奇怪……我本以为大小会小得多。

压缩

Lmdb默认不支持压缩,但我们可以尝试使用zstd。

shell
tar -cf - lmdb | zstd -o lmdb.tar.zst
/*stdin*\            : 61.72%   (  1.54 GiB =>    971 MiB, lmdb.tar.zst)

现在感觉好多了,未来如果使用带有zstd的zfs可以节省一些空间。这个大小几乎和我们压缩原始文本时相同。 P.S. 如果用zstd压缩rocksdb,我们得到的大小是836 MiB,比内部压缩还要好。

总结

在我看来,lmdb是赢家。尽管我没有提供关于读取性能的详细结果,但在我的快速测试中,这个东西真的很快。RocksDb可以是一个替代解决方案。