Apache Parquet - Columnar Storage Format

Comprehensive guide to Apache Parquet, the popular columnar storage format designed for efficient big data processing and analytics.


What is Apache Parquet?

Apache Parquet is an open-source columnar storage file format designed for use in big data ecosystems. It provides efficient data compression and encoding schemes with enhanced performance for complex data processing tasks like filtering, aggregation, and joins.

Key Features: - Columnar Storage: Data stored by column rather than row - Efficient Compression: Advanced algorithms with good compression ratios - Schema Evolution: Support for schema changes over time - Predicate Pushdown: Filter data at storage level - Multiple Encodings: Adaptive encoding based on data type

How Parquet Works

Row vs Column Storage

Traditional Row Storage:

Row 1: John, Smith, 1980, Male, Engineer
Row 2: Jane, Doe, 1985, Female, Manager
Row 3: Bob, Johnson, 1975, Male, Analyst

Storage: Name|Age|Gender|Job| (repeated for each row)

Parquet Columnar Storage:

Names:     John, Jane, Bob
LastNames: Smith, Doe, Johnson
BirthYear: 1980, 1985, 1975
Gender:    Male, Female, Male
Job:       Engineer, Manager, Analyst

Benefits of Columnar Storage

PyArrow Integration

Installation and Setup

pip install pyarrow pandas

Basic Parquet Operations

import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
import numpy as np

# Create sample data
df = pd.DataFrame({
    'name': ['Alice', 'Bob', 'Charlie', 'Diana'],
    'age': [25, 30, 35, 28],
    'salary': [50000, 60000, 70000, 55000],
    'department': ['Engineering', 'Sales', 'Engineering', 'Marketing']
})

# Convert to PyArrow Table
table = pa.Table.from_pandas(df)

# Write to Parquet
pq.write_table(table, 'employees.parquet')

Reading Parquet Files

# Read entire file
table = pq.read_table('employees.parquet')
df = table.to_pandas()

# Read specific columns only
table = pq.read_table('employees.parquet', columns=['name', 'salary'])
print(f"Shape with selected columns: {table.shape}")

# Read with filtering
import pyarrow.compute as pc

# Read and filter in-memory
table = pq.read_table('employees.parquet')
filtered = pc.filter(table, pc.field('salary') > 55000)
print(f"High earners: {filtered.num_rows}")

Schema and Data Types

Parquet Schema

# Inspect schema
parquet_file = pq.ParquetFile('employees.parquet')
print("Schema:")
print(parquet_file.schema_arrow)

# Schema evolution example
original_schema = pa.schema([
    ('name', pa.string()),
    ('age', pa.int64()),
    ('salary', pa.float64())
])

# Add new column to schema
extended_schema = original_schema.append(pa.field('department', pa.string()))

Data Types in Parquet

# Define schema explicitly
custom_schema = pa.schema([
    pa.field('id', pa.int64()),
    pa.field('timestamp', pa.timestamp('s')),
    pa.field('price', pa.decimal128(10, 2)),  # 8 digits before decimal, 2 after
    pa.field('category', pa.dictionary(pa.int8(), pa.string())),  # Dictionary encoding
    pa.field('tags', pa.list_(pa.string()))  # List/array type
])

# Create table with custom schema
data = {
    'id': [1, 2, 3],
    'timestamp': pd.date_range('2025-01-01', periods=3),
    'price': [99.99, 149.50, 79.95],
    'category': ['A', 'B', 'A'],
    'tags': [['tech', 'new'], ['home', 'sale'], ['books']]
}

table = pa.table(data, schema=custom_schema)

Partitioning and Optimization

Partitioning by Columns

# Write partitioned data
pq.write_to_dataset(
    table,
    root_path='partitioned_data',
    partition_cols=['department']
)

# Partition structure created:
# partitioned_data/
# ├── department=Engineering/
# │   └── part-0.parquet
# ├── department=Sales/
# │   └── part-0.parquet
# └── department=Marketing/
#     └── part-0.parquet

Row Group Optimization

# Control row group size for better performance
pq.write_table(
    table,
    'optimized.parquet',
    row_group_size=100000,  # Number of rows per row group
    use_dictionary=True,    # Use dictionary encoding for strings
    compression='snappy'    # Compression codec
)

Compression Algorithms

Available Compression Codecs

# Different compression options
compression_options = ['snappy', 'gzip', 'zstd', 'lz4', 'brotli']

# Write with different compressions for comparison
for codec in compression_options:
    pq.write_table(table, f'data_{codec}.parquet', compression=codec)
    file_size = Path(f'data_{codec}.parquet').stat().st_size
    print(f"{codec}: {file_size} bytes")

Compression Trade-offs

Advanced Features

Predicate Pushdown

# Filter at read time (pushdown to storage layer)
from pyarrow.dataset import dataset

# Create dataset for advanced operations
ds = dataset('employees.parquet', format='parquet')

# Define filter
filter_expr = pc.field('salary') > 55000

# Read only data that satisfies filter
filtered_data = ds.to_table(filter=filter_expr)
print(f"Filtered results: {filtered_data.num_rows} rows")

Chunked Reading

# Process large files in chunks
parquet_file = pq.ParquetFile('large_file.parquet')

for i, batch in enumerate(parquet_file.iter_batches(batch_size=10000)):
    # Process each batch
    df = batch.to_pandas()
    process_batch(df)
    print(f"Processed batch {i+1}")

Metadata and Statistics

# Access file metadata
metadata = pq.read_metadata('data.parquet')

print(f"Number of row groups: {metadata.num_row_groups}")
print(f"Total rows: {metadata.num_rows}")
print(f"Created by: {metadata.created_by}")

# Get column statistics for each row group
for i in range(metadata.num_row_groups):
    rg_meta = metadata.row_group(i)
    for col in rg_meta.column_metadata:
        print(f"Column {col.name}: min={col.statistics.min}, max={col.statistics.max}")

Integration with Big Data Tools

Spark Integration

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ParquetExample").getOrCreate()

# Read Parquet with Spark
df = spark.read.parquet("large_dataset.parquet")

# Apply transformations
result = df.filter("salary > 50000").groupBy("department").avg("salary")

# Write back to Parquet with partitioning
result.write.partitionBy("department").parquet("salary_analysis.parquet")

Pandas Integration

# Pandas can read/write Parquet directly (if pyarrow installed)
df = pd.read_parquet('data.parquet')

# Write with specific options
df.to_parquet('output.parquet',
              engine='pyarrow',
              compression='snappy',
              index=False)

Performance Optimization

Best Practices for Performance

# Optimize for your access patterns

# For analytical workloads (few columns, many rows)
table = pq.read_table('data.parquet', columns=['col1', 'col3', 'col5'])

# For time-based queries with partitioning
pq.write_to_dataset(table,
                   root_path='time_partitioned',
                   partition_cols=['year', 'month'])

# Use appropriate chunk size
pq.write_table(table, 'optimized.parquet',
              row_group_size=500000,  # Balance memory vs query performance
              compression='zstd',
              compression_level=3)

Schema Design Guidelines

Comparison with Other Formats

Parquet vs CSV

Parquet vs ORC

Parquet vs Delta Lake

File Format Details

Parquet File Structure

Parquet File:
├── Header (magic number "PAR1")
├── Row Groups (1+):
│   ├── Column Chunks:
│   │   ├── Page Header
│   │   ├── Data Pages (compressed/encoded)
│   │   └── Dictionary Pages (optional)
│   └── Metadata
├── Footer:
│   ├── Metadata
│   └── Magic number "PAR1"

Encoding Types

Apache Parquet has become the standard for analytical workloads in modern data lakes and warehouses. Its columnar format and advanced compression techniques make it ideal for big data processing, providing significant performance improvements over traditional row-based formats.

This guide covers the core functionality of Apache Parquet with PyArrow. For production systems, consider additional tools like Apache Spark, Presto, or Athena for processing Parquet files at scale.

Updated: January 15, 2025
Author: Danial Pahlavan
Category: Data Formats