Cómo almacenar archivos de texto pequeños de manera eficiente
Preparación
Vamos a crear un conjunto de datos de prueba con muchos archivos de texto.
import os
import json
import random
import string
from datetime import datetime
workspace_dir = '/tmp/data'
output_dir = f'{workspace_dir}/jsons'
os.makedirs(output_dir, exist_ok=True)
def generate_random_string(length):
return ''.join(random.choices(string.ascii_letters + string.digits + string.punctuation + ' ', k=length))
for i in range(1000000):
data = {
'text': generate_random_string(random.randint(0, 1000)),
'created_at': int(datetime.now().timestamp())
}
filename = os.path.join(output_dir, f'{i}.json')
with open(filename, 'w') as json_file:
json.dump(data, json_file, separators=(',', ':'))
Por ejemplo, generaremos 1 000 000 archivos json con la estructura (para eficiencia no tienen espacios en los archivos, esto se logra con el parámetro separators=(',', ':')
en el método json.dump
):
{
"text": "cadena aleatoria",
"created_at": 1727625937
}
Vamos a medir el tamaño real:
du -sk /tmp/data/jsons
4000000 jsons
Son 4Gb, sin embargo, el tamaño debería ser <= 1Gb (1000000 * 1Kb). Esto se debe a que utilizo un sistema de archivos con tamaño de registro de 4Kb (uso OS X, pero en Linux la situación será la misma), así que tengo 1000000 * 4Kb = 4Gb.
Creando archivo de texto combinado
Necesitamos primeramente un generador para iterar los archivos:
import os
import json
def json_file_reader(directory):
json_files = [filename for filename in os.listdir(directory) if filename.endswith('.json')]
sorted_files = sorted(json_files, key=lambda x: int(os.path.splitext(x)[0]))
for filename in sorted_files:
file_path = os.path.join(directory, filename)
if os.path.isfile(file_path):
with open(file_path, 'r', encoding='utf-8') as json_file:
yield os.path.splitext(filename)[0], json_file.read()
También lo ordeno por nombre de archivo, pero no es necesario.
Script combinado, necesitamos agregar id al diccionario json de lo contrario sería imposible entender el archivo de origen:
with open(f'{workspace_dir}/merged.json', 'w', encoding='utf-8') as infile:
for id, data in json_file_reader(output_dir):
dict = json.loads(data)
dict['id'] = int(id)
infile.write(f'{json.dumps(dict, ensure_ascii=False, separators=(',', ':'))}\n')
du -k /tmp/data/merged.json
557060 merged.json
Ahora tiene solo 557Mb! ¡Más de 7 veces menos! Ya no desperdiciamos clústeres. El tamaño promedio del archivo es 557b, que coincide perfectamente con nuestra función de contenido random.randint(0, 1000)
.
Formato binario
Ahora intentemos usar una estructura binaria optimizada.
Primero necesitamos serializar nuestra estructura:
import struct
def pack(data):
text_bytes = data['text'].encode('utf-8')
format_string = f'iiH{len(text_bytes)}s'
return struct.pack(format_string, data['id'], data['created_at'], len(text_bytes), text_bytes)
def unpack(data):
offset = 10
i, ts, l = struct.unpack('iiH', data[:offset])
text = struct.unpack(f'{l}s', data[offset:offset + l])[0].decode()
return {
'id': i,
'created_at': ts,
'text': text
}
# test
packed = pack({"id": 1, "created_at": 1727625937, "text": "¡Hola!"})
print(f"{packed} -> {unpack(packed)}")
Ahora creamos el archivo binario:
# %%
with open(f'{workspace_dir}/merged.struct.bin', 'wb') as infile:
for id, data in json_file_reader(output_dir):
dict = json.loads(data)
dict['id'] = int(id)
infile.write(pack(dict))
du -k /tmp/data/merged.struct.bin
507908 merged.struct.bin
Hemos reducido aún más el archivo a ~ 508M.
Messagepack
A veces no es posible tener un esquema predefinido, especialmente si queremos almacenar json arbitrarios. Intentemos MessagePack.
Instálalo primero: pip install msgpack
.
Ahora genera el archivo binario combinado:
import msgpack
with open(f'{workspace_dir}/merged.msgpack.bin', 'wb') as infile:
for id, data in json_file_reader(output_dir):
dict = json.loads(data)
dict['id'] = int(id)
infile.write(msgpack.packb(dict))
du -k /tmp/data/merged.msgpack.bin
524292 merged.msgpack.bin
El tamaño es un poco más grande que con el protocolo personalizado ~524M, pero creo que es un precio razonable para ser sin esquema. Messagepack tiene una herramienta muy útil para el acceso aleatorio:
with open(f'{workspace_dir}/merged.msgpack.bin', 'rb') as file:
file.seek(0)
unpacker = msgpack.Unpacker(file)
obj = unpacker.unpack()
print(obj)
print(unpacker.tell()) # desplazamiento actual
Compresión
Sigamos adelante e intentemos comprimir estos archivos con lz4 y zstd:
# dir
-> time tar -cf - jsons | lz4 - jsons.tar.lz4
Compressed 3360880640 bytes into 628357651 bytes ==> 18.70%
tar -cf - jsons 19.93s user 299.75s system 61% cpu 8:38.46 total
lz4 - jsons.tar.lz4 2.20s user 1.41s system 0% cpu 8:38.46 total
-> time tar -cf - jsons | zstd -o jsons.tar.zst
-> time lz4 merged.json
Compressed 558739918 bytes into 522708179 bytes ==> 93.55%
lz4 merged.json 0.47s user 0.35s system 152% cpu 0.534 total
Resultados
nombre | tamaño sin procesar | reducción | tamaño lz4 | tiempo lz4 | tamaño zstd | tiempo zstd |
---|---|---|---|---|---|---|
Archivos en directorio | 4G | 1x | 628M | 8:38 | 464M | 8:55 |
Archivo de texto combinado | 557M | 7.18x | 523M | 0.53s | 409M | 1.62s |
Archivo bin combinado (struct) | 508M | 7.87x | 510M | 0.46 | 410M | 1.47s |
Archivo bin combinado (msgpack) | 524M | 7.63x | 514M | 0.36s | 411M | 1.6s |