Skip to content

Formato de datos masivos

Formato de datos

En ecosistemas como Hadoop y Spark, la elección del formato de datos impacta directamente en:

  • Rendimiento de lectura/escritura: Los formatos binarios son generalmente más rápidos que los de texto
  • Costo de almacenamiento: Un formato bien elegido puede reducir el tamaño en disco entre un 50-90%
  • Velocidad de consultas: Los formatos columnares optimizan las consultas analíticas
  • Interoperabilidad: Algunos formatos funcionan mejor con herramientas específicas

Todo formato de datos utilizado en Big Data debe satisfacer las siguientes propiedades:

  1. Independencia del lenguaje El formato debe ser agnóstico respecto al lenguaje de programación utilizado. Un archivo serializado en Python debe poder deserializarse en Java o Scala sin problemas.

  2. Expresividad Debe soportar estructuras complejas y anidadas. No todos los formatos pueden representar objetos complejos o arrays multidimensionales.

  3. Eficiencia Busca un equilibrio entre velocidad de acceso y tamaño reducido del fichero. Los datos no deben tardar mucho en procesarse ni ocupar demasiado espacio.

  4. Dinamismo Los programas deben poder procesar y definir nuevos tipos de datos. La evolución del esquema es crítica en Big Data, donde los datos cambian constantemente.

  5. Formato standalone y divisible El fichero debe ser divisible en fragmentos (splittable) para que herramientas como Hadoop/Spark puedan procesarlo en paralelo. Esto es imprescindible para cualquier análisis distribuido.

  6. Soporte para compresión Debe permitir comprimir los datos sin perder capacidad de procesamiento paralelo.

FormatoVelocidadTamañoExpresividadEjemplos
Texto planoLentaGrandeMediaCSV, XML, JSON

Ventajas:

  • Fácil de inspeccionar y debuggear
  • Buena interoperabilidad
  • No requiere librerías especializadas

Desventajas:

  • Ocupan más espacio
  • Procesamiento más lento
  • Menos eficientes para análisis
FormatoVelocidadTamañoExpresividadArquitectura
AvroRápidaMedioAltaBasado en filas
ParquetRápidaMuy pequeñoAltaBasado en columnas
ORCRápidaMuy pequeñoAltaBasado en columnas

Ventajas:

  • Mejor rendimiento
  • Tamaño reducido
  • Serialización compacta

Desventajas:

  • Requieren librerías especializadas
  • Menos legibles directamente
  • Más complejos de implementar

Esta es una de las decisiones más importantes al elegir un formato de datos.

En formatos basados en filas como CSV, XML o JSON, cada registro se almacena en una fila o documento completo.

JSON tradicional (JSON Lines):

{
"empleados": [
{ "nombre": "Carlos", "altura": 180, "edad": 44 },
{ "nombre": "Juan", "altura": 175, "edad": null }
]
}

JSONL (JSON Lines - un objeto por línea):

{"nombre": "Carlos", "altura": 180, "edad": 44}
{"nombre": "Juan", "altura": 175, "edad": null}

JSONL es preferible en Big Data porque permite procesar línea a línea sin cargar el fichero completo en memoria.

AspectoDescripción
Acceso a registrosAcceso rápido a todos los campos de un registro
EscrituraOperación simple: añadir una nueva línea
Consultas analíticasLento si solo necesitas unas pocas columnas
CompresiónMenos eficiente (datos heterogéneos no se comprimen bien)
OLTPExcelente para operaciones transaccionales

En formatos columnares, cada columna se almacena en su conjunto de ficheros. Todos los datos de la misma columna se agrupan contiguamente.

Representación conceptual:

Datos en filas:
┌─────────────────────────────────┐
│ ID │ Nombre │ Salario │ Ciudad │
├────┼─────────┼─────────┼────────┤
│ 1 │ Carlos │ 50000 │ Madrid │
│ 2 │ Juan │ 45000 │ BCN │
│ 3 │ María │ 52000 │ Vale │
└────┴─────────┴─────────┴────────┘
Datos en columnas (Parquet):
ID: [1, 2, 3]
Nombre: [Carlos, Juan, María]
Salario: [50000, 45000, 52000]
Ciudad: [Madrid, BCN, Vale]
AspectoDescripción
Acceso selectivoRápido si solo necesitas unas columnas
CompresiónExcelente (datos del mismo tipo se comprimen bien)
EscrituraLenta y compleja (requiere reescribir múltiples archivos)
Consultas analíticasExcepcional (OLAP)
ActualizacionesMuy costosas computacionalmente
OLAPPerfecto para análisis complejos

Desventajas de los formatos columnares:

  1. Acceso a registros individuales: Para recuperar un registro completo, debe reconstruirse leyendo datos de varios archivos de columnas
  2. Actualizaciones: Requiere descomprimir, modificar, recomprimir y escribir nuevamente
  3. Cargas transaccionales: No son apropiados para OLTP

Estrategias de optimización: Los formatos columnares dividen los datos mediante particionado y clustering, organizándolos según patrones de consulta y modificación, reduciendo así la sobrecarga.

Apache Avro es un formato de almacenamiento basado en filas que combina la velocidad de los formatos binarios con la flexibilidad de esquemas definidos.

El formato Avro se basa en el uso de esquemas, los cuales definen los tipos de datos y protocolos mediante JSON. Cuando los datos .avro son leídos siempre está presente el esquema con el que han sido escritos.

Cada fichero Avro almacena el esquema en la cabecera del fichero y luego están los datos en formato binario. Los esquemas se componen de tipos primitivos (null, boolean, int, long, float, double, bytes, y string) y compuestos (record, enum, array, map, union, y fixed).

Ejemplo de esquema (empleado.avsc):

{
"type": "record",
"namespace": "EduardoPrimo",
"name": "Empleado",
"fields": [
{ "name": "nombre", "type": "string" },
{ "name": "altura", "type": "int" },
{ "name": "edad", "type": ["null", "int"], "default": null }
]
}

Existen dos librerías para trabajar con avro avro-python3 y fastavro, vamos a utilizar fastavro por su mayor velocidad para trabajar con datasets grandes.

Terminal window
pip install fastavro
import fastavro
# Leer esquema
schema = {
"type": "record",
"namespace": "EduardoPrimo",
"name": "Empleado",
"fields": [
{"name": "nombre", "type": "string"},
{"name": "altura", "type": "int"},
{"name": "edad", "type": ["null", "int"], "default": None}
]
}
schemaParseado = fastavro.parse_schema(schema)
empleados = [
{"nombre": "Carlos", "altura": 180, "edad": 44},
{"nombre": "Juan", "altura": 175, "edad": None}
]
# Escribir con Fastavro
with open('empleadosf.avro', 'wb') as f:
fastavro.writer(f, schemaParseado, empleados)
# Leer con Fastavro
with open("empleadosf.avro", "rb") as f:
reader = fastavro.reader(f)
empleados_leidos = [e for e in reader]

⚠️ Pendiente de revisar

from hdfs import InsecureClient
from hdfs.ext.avro import AvroWriter
from hdfs.ext.dataframe import write_dataframe
# Conectar a HDFS
hdfs_client = InsecureClient('http://iabd-virtualbox:9870')
# Leer desde HDFS
with hdfs_client.read('/user/iabd/pdi_sales.csv') as reader:
df = pd.read_csv(reader, sep=';')
# Persistir en HDFS
with AvroWriter(hdfs_client, '/user/iabd/sales.avro', schemaParseado) as aw:
for record in df.to_dict('records'):
aw.write(record)
# O más fácil con write_dataframe
write_dataframe(hdfs_client, '/user/iabd/sales.avro', df, schema=schemaParseado)

Apache Parquet es un formato de almacenamiento basado en columnas diseñado específicamente para el ecosistema Big Data, con soporte en Hadoop, Spark y prácticamente todos los frameworks de procesamiento de datos modernos.

AspectoValor
TipoBinario basado en columnas
Compresión~75% con Snappy, ~85% con Gzip
Splittable
Auto-descriptivoSí (contiene esquema embebido)
EcosistemaSpark, Hadoop, Presto, Athena
Ideal paraConsultas analíticas (OLAP)

Ventaja clave: Los metadatos se almacenan al final, permitiendo escrituras rápidas en una única pasada.

PyArrow es la librería estándar para trabajar con Parquet:

Terminal window
pip install pyarrow
import pyarrow.parquet as pq
import pyarrow as pa
# Definir esquema
schema = pa.schema([
('nombre', pa.string()),
('altura', pa.int32()),
('edad', pa.int32())
])
# Crear datos en formato de columnas
empleados = {
"nombre": ["Carlos", "Juan"],
"altura": [180, 44],
"edad": [44, 34]
}
# Crear tabla Arrow y escribir en Parquet
tabla = pa.Table.from_pydict(empleados, schema)
pq.write_table(tabla, 'empleados.parquet')
# Leer el fichero
table2 = pq.read_table('empleados.parquet')
print(table2)

⚠️ Pendiente de revisar

# Asumiendo que core-site.xml contiene:
# <property>
# <name>fs.defaultFS</name>
# <value>hdfs://iabd-virtualbox:9000</value>
# </property>
df.to_parquet('hdfs://iabd-virtualbox:9000/sales.parquet')

Apache ORC (Optimized Row Columnar) es un formato columnar optimizado específicamente para Hive y ecosistemas analíticos.

AspectoValor
TipoBinario basado en columnas
CompresiónUsa Zlib (muy alta)
Optimizado paraHive y HiveQL
Tipos de datosSoporta tipos simples y complejos de Hive
EstructuraStripes (tiras de datos)
Terminal window
pip install pandas
import pandas as pd
import pyarrow as pa
import pyarrow.orc as orc
from io import StringIO
# Datos CSV en una variable
csv_data = """Country;Zip;Sales
Germany;12345;1000
Germany;54321;2000
France;75001;1500
Germany;10115;3000"""
# Leer CSV desde la variable
df = pd.read_csv(StringIO(csv_data), sep=';')
# Limpiar datos
# Convertir Zip a string y eliminar espacios
df['Zip'] = df['Zip'].astype(str).str.strip()
# Nos quedamos con las filas que el país es Germany
df = df[df.Country == "Germany"]
# Convertir a tabla Arrow
table = pa.Table.from_pandas(df, preserve_index=False)
# Escribir ORC
orc.write_table(table, 'pdi_sales.orc')

La compresión es crítica en Big Data. Busca redundancia y repetición en los datos, recodificándolos para reducir tamaño.

  • Menor espacio en disco y almacenamiento cloud

  • Menos transmisión de datos a través de la red

  • Lectura más rápida (menos I/O)

  • Costo: Compresión y descompresión consumen CPU

AlgoritmoVelocidadCompresiónCaso de uso
GzipMediaMediaGeneral
Bzip2LentaAltaAlmacenamiento a largo plazo
SnappyAltaMediaBig Data (Hadoop/Spark)
ZlibMediaMediaPredeterminado en ORC

Para Big Data: Snappy es la opción típica porque prioriza velocidad sobre tamaño.

Terminal window
pip install python-snappy
from fastavro import writer, parse_schema
# Sin compresión (por defecto)
writer(f, schemaParseado, records)
# Con Gzip (deflate)
writer(f, schemaParseado, records, 'deflate')
# Con Snappy (recomendado)
writer(f, schemaParseado, records, 'snappy')
import pandas as pd
# Con Parquet
df.to_parquet('datos.parquet') # Por defecto comprimido
# Con ORC
df.to_orc('datos.orc', engine_kwargs={"compression": 'zlib'})

Archivo de 100GB, sin compresión: ~100GB

AlgoritmoTamño finalPorcentaje
Sin compresión6.9 MiB100%
Gzip1.9 MiB27%
Snappy2.8 MiB41%


CaracterísticaCSVJSONAvroParquetORC
Independencia lenguaje
Expresivo
Eficiente✔✔✔✔
Dinámico/Flexible✔✔
Legible✔✔
Divisible
Compresión✔✔✔✔
NecesidadRecomendaciónJustificación
Escritura rápida de nuevos datosAvro o CSVAvro y CSV permiten escritura rápida y sencilla de nuevos registros.
Análisis de columnas específicasParquet o ORCFormatos columnares optimizan consultas sobre columnas específicas.
Evolución de esquema frecuenteAvroAvro facilita la evolución de esquemas y versionado.
Trabajas con Hive/HiveQLORCORC está optimizado para Hive y HiveQL.
Trabajas con Spark y análisisParquetParquet tiene soporte nativo y excelente rendimiento en Spark.
Usas Kafka como fuenteAvroAvro es el estándar en Kafka para serialización y evolución de esquemas.
Máxima compresiónParquet o ORCParquet y ORC ofrecen altos ratios de compresión con Gzip/Zlib.
Datos con estructuras complejas anidadasParquetParquet soporta mejor estructuras anidadas y complejas.
CaracterísticaAvroParquetORC
AlmacenamientoFilasColumnasColumnas
EscrituraRápidaLentaLenta
Lectura/AnálisisLenta (filas)Rápida (columnas)Rápida (columnas)
CompresiónMediaAlta (75-85%)Alta (Zlib)
Evolución esquemaExcelenteMediaMedia
IntegraciónKafka, Hadoop, StreamingSpark, Presto, AthenaHive, HiveQL
Estructuras anidadasSí (mejor soporte)
DebuggingFácilDifícilDifícil
Casos de usoStreaming, logs, ingestiónAnálisis, BI, OLAPHive, análisis

Ejercicio 1: Airline Delay and Cancellation Data

Section titled “Ejercicio 1: Airline Delay and Cancellation Data”

https://www.kaggle.com/datasets/yuanyuwendymu/airline-delay-and-cancellation-data-2009-2018

Mediante Python y utilizando Kaggle, crea un notebook a partir de los datos del dataset de retrasos en los vuelos y a partir de uno de los ficheros (el que más te guste), y teniendo en cuenta que los campos están separados por comas (,), transforma los datos y persiste los siguientes archivos:

  1. air.parquet: el archivo CSV en formato Parquet.
  2. air.orc: el archivo CSV en formato ORC.
  3. air_snappy.orc: el archivo CSV comprimido en formato Snappy en formato ORC.
  4. air_small.avro: la fecha (FL_DATE), el identificador de la aerolínea (OP_CARRIER) y el retraso de cada vuelo (DEP_DELAY) en formato Avro.
  5. air_small.parquet: con los mismos atributos pero en Parquet. i
# TIP: Mediante Pandas, cuando tenemos un DataFrame, podemos seleccionar un subconjunto de las columnas de la siguiente forma:
# df es un DataFrame que contiene todas las columnas
df_small = df[['FL_DATE', 'OP_CARRIER', 'DEP_DELAY']]
# Para ver el tamaño de los archivos
import os
os.path.getsize("/kaggle/working/air20XX.parquet)
# Para calcula el tiempo en realizar la conversión
import time
inicio = time.time()
# operación a medir
fin = time.time()
print(fin - inicio)

Para poder realizar el ejercicio, es necesario crear una cuenta en Kaggle para que, al ejecutar el cuaderno, la instancia de la máquina pueda cargar tantos datos. Al estar registrado, de forma gratuita, las instancias permiten almacenar hasta 73 GB de datos y emplear 30GB de RAM durante 12 horas. Si no, sólo dispondremos de 1GB de RAM.

Una vez se abra el editor, ejecutamos el primer bloque de código y nos dará un listado de archivos con su ruta completa, debemos elegir uno de ellos para hacer los diferentes puntos del ejercicio.

/kaggle/input/airline-delay-and-cancellation-data-2009-2018/2011.csv
/kaggle/input/airline-delay-and-cancellation-data-2009-2018/2013.csv
/kaggle/input/airline-delay-and-cancellation-data-2009-2018/2015.csv
/kaggle/input/airline-delay-and-cancellation-data-2009-2018/2014.csv
/kaggle/input/airline-delay-and-cancellation-data-2009-2018/2009.csv

Toma un archivo CSV y conviértelo a Avro, Parquet y ORC. Compara tamños y tiempos:

import pandas as pd
import time
import os
from fastavro import writer, parse_schema
import pyarrow.parquet as pq
import pyarrow as pa
# Medir tiempo y tamaño
resultados = {}
# CSV original
df = pd.read_csv('datos.csv')
tamaño_csv = os.path.getsize('datos.csv')
print(f"CSV original: {tamaño_csv / (1024**2):.2f} MiB")
# Parquet
inicio = time.time()
df.to_parquet('datos.parquet')
tiempo_parquet = time.time() - inicio
tamaño_parquet = os.path.getsize('datos.parquet')
print(f"Parquet: {tamaño_parquet / (1024**2):.2f} MiB en {tiempo_parquet:.2f}s")
# ORC
inicio = time.time()
df.to_orc('datos.orc')
tiempo_orc = time.time() - inicio
tamaño_orc = os.path.getsize('datos.orc')
print(f"ORC: {tamaño_orc / (1024**2):.2f} MiB en {tiempo_orc:.2f}s")
# Resumen
print(f"\nCompresión vs CSV:")
print(f" Parquet: {(1 - tamaño_parquet/tamaño_csv)*100:.1f}% reducción")
print(f" ORC: {(1 - tamaño_orc/tamaño_csv)*100:.1f}% reducción")

Ejercicio 3: Evolución de esquema en Avro

Section titled “Ejercicio 3: Evolución de esquema en Avro”

Define un esquema v1, escribe datos, luego define v2 con un campo adicional:

# v1: Solo nombre y edad
schema_v1 = {
"type": "record",
"name": "Persona",
"fields": [
{"name": "nombre", "type": "string"},
{"name": "edad", "type": "int"}
]
}
# v2: Añade email
schema_v2 = {
"type": "record",
"name": "Persona",
"fields": [
{"name": "nombre", "type": "string"},
{"name": "edad", "type": "int"},
{"name": "email", "type": ["null", "string"], "default": None}
]
}
# Escribe con v1, lee con v2
# ✅ Avro lo permite
# ❌ CSV requeriría reprocesar todo