Cómo almacenar archivos de texto pequeños de manera eficiente
Preparación
Vamos a crear un conjunto de datos de prueba con muchos archivos de texto.
import os
import json
import random
import string
from datetime import datetime
workspace_dir = '/tmp/data'
output_dir = f'{workspace_dir}/jsons'
os.makedirs(output_dir, exist_ok=True)
def generate_random_string(length):
return ''.join(random.choices(string.ascii_letters + string.digits + string.punctuation + ' ', k=length))
for i in range(1000000):
data = {
'text': generate_random_string(random.randint(0, 1000)),
'created_at': int(datetime.now().timestamp())
}
filename = os.path.join(output_dir, f'{i}.json')
with open(filename, 'w') as json_file:
json.dump(data, json_file, separators=(',', ':'))Por ejemplo, generaremos 1 000 000 archivos json con la estructura (para eficiencia no tienen espacios en los archivos, esto se logra con el parámetro separators=(',', ':') en el método json.dump):
{
"text": "cadena aleatoria",
"created_at": 1727625937
}Vamos a medir el tamaño real:
du -sk /tmp/data/jsons
4000000 jsonsSon 4Gb, sin embargo, el tamaño debería ser <= 1Gb (1000000 * 1Kb). Esto se debe a que utilizo un sistema de archivos con tamaño de registro de 4Kb (uso OS X, pero en Linux la situación será la misma), así que tengo 1000000 * 4Kb = 4Gb.
Creando archivo de texto combinado
Necesitamos primeramente un generador para iterar los archivos:
import os
import json
def json_file_reader(directory):
json_files = [filename for filename in os.listdir(directory) if filename.endswith('.json')]
sorted_files = sorted(json_files, key=lambda x: int(os.path.splitext(x)[0]))
for filename in sorted_files:
file_path = os.path.join(directory, filename)
if os.path.isfile(file_path):
with open(file_path, 'r', encoding='utf-8') as json_file:
yield os.path.splitext(filename)[0], json_file.read()También lo ordeno por nombre de archivo, pero no es necesario.
Script combinado, necesitamos agregar id al diccionario json de lo contrario sería imposible entender el archivo de origen:
with open(f'{workspace_dir}/merged.json', 'w', encoding='utf-8') as infile:
for id, data in json_file_reader(output_dir):
dict = json.loads(data)
dict['id'] = int(id)
infile.write(f'{json.dumps(dict, ensure_ascii=False, separators=(',', ':'))}\n')du -k /tmp/data/merged.json
557060 merged.jsonAhora tiene solo 557Mb! ¡Más de 7 veces menos! Ya no desperdiciamos clústeres. El tamaño promedio del archivo es 557b, que coincide perfectamente con nuestra función de contenido random.randint(0, 1000).
Formato binario
Ahora intentemos usar una estructura binaria optimizada.
Primero necesitamos serializar nuestra estructura:
import struct
def pack(data):
text_bytes = data['text'].encode('utf-8')
format_string = f'iiH{len(text_bytes)}s'
return struct.pack(format_string, data['id'], data['created_at'], len(text_bytes), text_bytes)
def unpack(data):
offset = 10
i, ts, l = struct.unpack('iiH', data[:offset])
text = struct.unpack(f'{l}s', data[offset:offset + l])[0].decode()
return {
'id': i,
'created_at': ts,
'text': text
}
# test
packed = pack({"id": 1, "created_at": 1727625937, "text": "¡Hola!"})
print(f"{packed} -> {unpack(packed)}")Ahora creamos el archivo binario:
# %%
with open(f'{workspace_dir}/merged.struct.bin', 'wb') as infile:
for id, data in json_file_reader(output_dir):
dict = json.loads(data)
dict['id'] = int(id)
infile.write(pack(dict))du -k /tmp/data/merged.struct.bin
507908 merged.struct.binHemos reducido aún más el archivo a ~ 508M.
Messagepack
A veces no es posible tener un esquema predefinido, especialmente si queremos almacenar json arbitrarios. Intentemos MessagePack.
Instálalo primero: pip install msgpack.
Ahora genera el archivo binario combinado:
import msgpack
with open(f'{workspace_dir}/merged.msgpack.bin', 'wb') as infile:
for id, data in json_file_reader(output_dir):
dict = json.loads(data)
dict['id'] = int(id)
infile.write(msgpack.packb(dict))du -k /tmp/data/merged.msgpack.bin
524292 merged.msgpack.binEl tamaño es un poco más grande que con el protocolo personalizado ~524M, pero creo que es un precio razonable para ser sin esquema. Messagepack tiene una herramienta muy útil para el acceso aleatorio:
with open(f'{workspace_dir}/merged.msgpack.bin', 'rb') as file:
file.seek(0)
unpacker = msgpack.Unpacker(file)
obj = unpacker.unpack()
print(obj)
print(unpacker.tell()) # desplazamiento actualCompresión
Sigamos adelante e intentemos comprimir estos archivos con lz4 y zstd:
# dir
-> time tar -cf - jsons | lz4 - jsons.tar.lz4
Compressed 3360880640 bytes into 628357651 bytes ==> 18.70%
tar -cf - jsons 19.93s user 299.75s system 61% cpu 8:38.46 total
lz4 - jsons.tar.lz4 2.20s user 1.41s system 0% cpu 8:38.46 total
-> time tar -cf - jsons | zstd -o jsons.tar.zst
-> time lz4 merged.json
Compressed 558739918 bytes into 522708179 bytes ==> 93.55%
lz4 merged.json 0.47s user 0.35s system 152% cpu 0.534 totalResultados
| nombre | tamaño sin procesar | reducción | tamaño lz4 | tiempo lz4 | tamaño zstd | tiempo zstd |
|---|---|---|---|---|---|---|
| Archivos en directorio | 4G | 1x | 628M | 8:38 | 464M | 8:55 |
| Archivo de texto combinado | 557M | 7.18x | 523M | 0.53s | 409M | 1.62s |
| Archivo bin combinado (struct) | 508M | 7.87x | 510M | 0.46 | 410M | 1.47s |
| Archivo bin combinado (msgpack) | 524M | 7.63x | 514M | 0.36s | 411M | 1.6s |