How to Process 1 Billion GPS Points with Apache Sedona and Iceberg

I got dropped into a challenging situation last year. The problem? A client had over a billion GPS rows sitting in Amazon S3, and their daily PySpark jobs were crashing with Out-Of-Memory (OOM) errors every time they attempted the spatial join.

The data lake was a mess of raw Parquet files, and vanilla Spark's partitioning of the spatial data was heavily skewed. We solved this by introducing two key technologies: Apache Iceberg for storage-layer optimization and Apache Sedona for distributed spatial computing.
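To see why skew is the killer here, consider what happens when GPS pings are assigned to a naive fixed grid: real-world traffic clusters in a handful of cells (a downtown core), so one Spark task ends up holding most of the data while the rest sit idle. A minimal pure-Python sketch with hypothetical coordinates:

```python
from collections import Counter

def grid_cell(lon, lat, cell_size=1.0):
    """Map a coordinate to a coarse grid cell (naive spatial partitioning)."""
    return (int(lon // cell_size), int(lat // cell_size))

# Hypothetical pings: 9,000 clustered downtown, 1,000 spread along a highway
downtown = [(-87.65, 41.85)] * 9000
highway = [(-87.65 + i * 0.5, 41.85) for i in range(1000)]

counts = Counter(grid_cell(lon, lat) for lon, lat in downtown + highway)
heaviest = counts.most_common(1)[0][1]
# One cell holds over 90% of all points -> one straggler task, OOM risk
print(heaviest, sum(counts.values()))
```

This is exactly the imbalance Sedona's spatial partitioners (and a broadcast join, when one side is small) are designed to avoid.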

Apache Sedona extends Spark with natively distributed geometry types and indexes, allowing you to parallelize spatial operations gracefully, while Iceberg enables advanced partition pruning.
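Getting the two to coexist is mostly session wiring. A sketch of what the setup can look like, assuming the Sedona and Iceberg runtime jars are already on the classpath; the catalog name and warehouse path are placeholders, and your catalog type (Hadoop, Hive, Glue, REST) will differ:

```python
from sedona.spark import SedonaContext

config = (
    SedonaContext.builder()
    # Iceberg SQL extensions plus a named catalog (values are placeholders)
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.prod_catalog",
            "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.prod_catalog.type", "hadoop")
    .config("spark.sql.catalog.prod_catalog.warehouse", "s3://my-bucket/warehouse")
    .getOrCreate()
)

# Registers Sedona's spatial types, functions, and optimizer rules
sedona = SedonaContext.create(config)
```

This is a configuration sketch, not a drop-in snippet: check the Sedona and Iceberg docs for the jar versions matching your Spark release.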

Here is a simplified look at the pipeline we implemented to join those billions of pings against spatial polygons:


from pyspark.sql import SparkSession
from sedona.spark import *
import pyspark.sql.functions as F

# ... (Initialize Spark Session with Sedona & Iceberg configs)

# Initialize Sedona context to register spatial UDFs and configurations
sedona = SedonaContext.create(spark)

# 1. Read the massive telematics table from our Iceberg catalog.
# The event_date filter is pushed down, so Iceberg prunes every file
# outside that day's partition before any data is scanned.
telematics_df = spark.read.format("iceberg") \
    .load("prod_catalog.geodata.truck_pings") \
    .filter(F.col("event_date") == "2023-10-27")

# Convert lat/lon columns into a Sedona Geometry type for spatial analysis
pings_geom_df = telematics_df.withColumn(
    "geometry", F.expr("ST_Point(lon, lat)")
)

# 2. Read the road network polygons (small enough to broadcast)
roads_df = spark.read.format("iceberg").load("prod_catalog.geodata.road_network")

# ...

# 3. The Sedona spatial join.
# Broadcasting the small side triggers Sedona's broadcast index join;
# otherwise Sedona partitions both sides and builds R-tree indexes.
joined_df = pings_geom_df.alias("p").join(
    F.broadcast(roads_df).alias("r"),
    F.expr("ST_Intersects(p.geometry, r.geometry)")
)

# 4. Aggregate by road ID and write the congestion results back to Iceberg
result_df = joined_df.groupBy("r.road_id").count()

result_df.write.format("iceberg") \
    .mode("overwrite") \
    .save("prod_catalog.geodata.traffic_bottlenecks")
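For intuition about what `ST_Intersects(p.geometry, r.geometry)` is deciding for each candidate pair, here is the classic ray-casting point-in-polygon test in pure Python. This is a conceptual illustration only; Sedona's actual predicate is implemented in JTS and handles lines, polygons with holes, and boundary cases:

```python
def point_in_polygon(x, y, polygon):
    """Ray-casting test: count edge crossings of a ray going right from (x, y)."""
    inside = False
    n = len(polygon)
    for i in range(n):
        x1, y1 = polygon[i]
        x2, y2 = polygon[(i + 1) % n]
        # Does this edge straddle the horizontal line through y?
        if (y1 > y) != (y2 > y):
            # x-coordinate where the edge crosses that line
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside

square = [(0, 0), (4, 0), (4, 4), (0, 4)]
print(point_in_polygon(2, 2, square))   # True
print(point_in_polygon(5, 2, square))   # False
```

The join's R-tree index exists to avoid running this test against every polygon: it first filters pairs by bounding box, then applies the exact predicate to the survivors.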

By leveraging Iceberg’s partition pruning to read only what we needed, and Sedona’s optimized spatial query planner to handle the intersections, the job that previously crashed after 4 hours started finishing routinely in 18 minutes.
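The pruning half of that win is easy to picture: Iceberg records the partition value of every data file in its metadata, so a filter on `event_date` eliminates files before anything is opened. A toy stand-in for that metadata lookup (the dict layout is hypothetical, not Iceberg's actual manifest format):

```python
# Toy manifest: each data file records the partition value it belongs to
manifest = [
    {"path": "s3://lake/pings/2023-10-25/f1.parquet", "event_date": "2023-10-25"},
    {"path": "s3://lake/pings/2023-10-26/f2.parquet", "event_date": "2023-10-26"},
    {"path": "s3://lake/pings/2023-10-27/f3.parquet", "event_date": "2023-10-27"},
    {"path": "s3://lake/pings/2023-10-27/f4.parquet", "event_date": "2023-10-27"},
]

def prune(manifest, event_date):
    """Select only the data files whose partition matches the filter."""
    return [f["path"] for f in manifest if f["event_date"] == event_date]

files_to_scan = prune(manifest, "2023-10-27")
print(len(files_to_scan))  # 2 of 4 files survive; the rest are never opened
```

On a table with hundreds of days of history, skipping everything outside one partition is the difference between scanning a billion rows and scanning one day's worth.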

Godspeed!

 
