In the Mood for Programming

Converting Protobuf objects to Parquet in Pyspark

Intro

Recently, I needed to convert a Hadoop RDD containing Protobuf-serialized objects to Parquet files to make the data easier to query analytically. The Protobuf objects are deeply nested, but there is a natural connection between Protobuf and Parquet, as Parquet can represent all the types that are defined in Protobuf (for an intro to Dremel, the model that Parquet's nested storage format is based on, see here). Therefore, I thought that handling this mapping should be a straightforward task in Pyspark.

However, I was surprised to find that there is no easy way to do that. In Java, you can use ProtoParquetWriter to write Protobuf objects to Parquet, but I could not find anything similar in the Python ecosystem.

So I thought that the easiest solution would probably be to convert Protobuf to some other format that Pyspark supports. Protobuf provides built-in support for JSON conversion, and Spark can easily load JSON objects into a DataFrame. So my first attempt was to convert the Protobuf objects to JSON and then load them into a Spark DataFrame. When Spark reads data without a schema, it tries to infer one. However, this can be quite tricky with deeply nested and complex objects. And this is exactly where my approach failed - the most significant issue was the lack of distinction between map and object types when reading from JSON. The distinction exists in both Protobuf and Spark, but gets lost in the JSON translation, because both a map and a nested message are serialized as a plain JSON object.
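
To make the problem concrete, here is a toy sketch in plain Python. It is not Spark's actual inference code, only a stand-in that mimics the relevant behaviour: without a schema, every JSON object is treated as a struct. The field name `counts` and the two records are made up for illustration.

```python
import json

# Hypothetical records of a message with a map<string, int32> field "counts".
records = [
    '{"id": 1, "counts": {"apples": 3}}',
    '{"id": 2, "counts": {"pears": 5, "plums": 1}}',
]


def naive_infer(value):
    """Toy inference: with no schema, every JSON object is treated as a struct."""
    if isinstance(value, dict):
        return {"struct": {key: naive_infer(val) for key, val in value.items()}}
    if isinstance(value, list):
        return {"array": naive_infer(value[0]) if value else "unknown"}
    return type(value).__name__


inferred = [naive_infer(json.loads(record)) for record in records]
for schema in inferred:
    # The map keys show up as struct field names.
    print(schema["struct"]["counts"])
```

The map keys end up as struct field names, so the two records appear to have different types for the same field. Nothing in the JSON itself says that `counts` was a map.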

Since the simple conversion failed on these type issues, I thought that providing a schema to Pyspark during JSON loading could solve the problem. I tried searching for a Python converter from Protobuf to a Spark schema, but couldn’t find one. Therefore, I ended up writing it.

Using an explicit schema for the objects solved the issue: I could convert the Protobuf RDD to an RDD of JSON objects and then write that to a Parquet file. You can find the whole conversion logic in a GitHub gist here, but basically it’s just a recursive walk through the Protobuf definition that creates the corresponding Spark types. The converter is definitely not feature-complete - surely some specific Protobuf types are not included here, though it should be fairly easy to extend the code to support them.

Using the converter comes down to converting the RDD of Protobuf objects to an RDD of JSON strings. This allows us to use the built-in Pyspark JSON DataFrame reading functionality with the explicit schema.

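from google.protobuf.json_format import MessageToJson
from pyspark.rdd import RDD
from pyspark.sql import SparkSession

# Not shown here: DESCRIPTOR must be imported from the Python module generated
# by protoc for your .proto file, and ProtobufToSparkSchemaConvertor from the gist.

# Reuse the active Spark session, or create one if none exists yet.
spark = SparkSession.builder.getOrCreate()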

def proto_rdd_to_parquet(input_rdd: RDD, output_path: str):
  # We need to get the descriptor of the message type ("Object" in this case) from the Python proto file.
  d = DESCRIPTOR.message_types_by_name["Object"]

  # Convert Protobuf to Spark schema by walking through the proto descriptor.
  spark_schema = ProtobufToSparkSchemaConvertor().get_schema(d)

  # Convert the RDD of Protobuf objects to an RDD of JSON strings so we can use the native Pyspark JSON reader.
  input_json_rdd = input_rdd.map(
    lambda row: MessageToJson(row, use_integers_for_enums=True, indent=0)
  )

  # Read the JSON RDD into Spark DataFrame.
  df = spark.read.schema(spark_schema).json(input_json_rdd)

  # Write the DataFrame to a Parquet file.
  df.write.parquet(output_path)
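
To see what the `MessageToJson` call in the function above hands over to Spark, it helps to run it on a single message. The sketch below uses `type_pb2.Field`, a message type that ships with the protobuf package, purely as a stand-in for your own message; the field values are made up.

```python
import json

from google.protobuf import type_pb2
from google.protobuf.json_format import MessageToJson

# type_pb2.Field is bundled with protobuf; it stands in for your own message.
message = type_pb2.Field(
    name="user_id",
    number=1,
    kind=type_pb2.Field.TYPE_STRING,
    type_url="example.com/demo",
)

# Same arguments as in proto_rdd_to_parquet above.
json_text = MessageToJson(message, use_integers_for_enums=True, indent=0)

# Parse it back only to inspect the keys and values that Spark will see.
record = json.loads(json_text)
print(record)
```

Two details matter for the schema: the enum field `kind` is written as an integer, and `type_url` is renamed to `typeUrl`, since `MessageToJson` uses lowerCamelCase names unless `preserving_proto_field_name=True` is passed. The field names in the Spark schema have to match whichever naming you choose.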