pyspark.sql.protobuf.functions.from_protobuf(data: ColumnOrName, messageName: str, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None, binaryDescriptorSet: Optional[bytes] = None) → pyspark.sql.column.Column

Converts a binary column of Protobuf format into its corresponding catalyst value. The Protobuf definition is provided in one of these ways:

  • Protobuf descriptor file: e.g. a descriptor file created with

    protoc --include_imports --descriptor_set_out=abc.desc abc.proto

  • Protobuf descriptor as binary: rather than a file path as in the previous option, we can provide the binary content of the descriptor file. This allows flexibility in how the descriptor set is created and fetched (see the sketch after this list).

  • Jar containing Protobuf Java class: the jar containing the Java class should be shaded. Specifically, com.google.protobuf.* should be shaded to org.sparkproject.spark_protobuf.protobuf.*. https://github.com/rangadi/shaded-protobuf-classes is useful for creating a shaded jar from Protobuf files. The jar file can be added with the spark-submit option --jars.
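
For orientation, a minimal sketch of the first two options. The message name MyMessage, the binary column df.value, and the descriptor path abc.desc (as produced by the protoc command above) are assumptions for illustration:

    from pyspark.sql.protobuf.functions import from_protobuf

    # Option 1: point at the descriptor file generated by protoc.
    parsed = df.select(
        from_protobuf(df.value, "MyMessage", descFilePath="abc.desc").alias("event"))

    # Option 2 (Spark 3.5.0+): load the serialized FileDescriptorSet yourself
    # and pass the raw bytes, e.g. after fetching it from a registry.
    with open("abc.desc", "rb") as f:
        desc_bytes = f.read()
    parsed = df.select(
        from_protobuf(df.value, "MyMessage", binaryDescriptorSet=desc_bytes)
        .alias("event"))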

New in version 3.4.0.

    Changed in version 3.5.0: Supports binaryDescriptorSet arg to pass binary descriptor directly. Supports Spark Connect.

    Parameters

    data : Column or str

    the binary column.

    messageName : str

    the protobuf message name to look for in the descriptor file, or the Protobuf class name when the descFilePath parameter is not set, e.g. com.example.protos.ExampleEvent.

    descFilePath : str, optional

    the Protobuf descriptor file.

    options : dict, optional

    options to control how the protobuf record is parsed (see the sketch below).

    binaryDescriptorSet : bytes, optional

    the Protobuf FileDescriptorSet serialized as binary.

    Protobuf functionality is provided as a pluggable external module.
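
    To illustrate the options parameter, the sketch below sets a parse mode. The "mode" option (FAILFAST vs. PERMISSIVE) is described in the Spark Protobuf data source guide; treat the option name and values as assumptions to verify against your Spark version:

        # Assumption: "mode" controls handling of malformed records; PERMISSIVE
        # produces null values for records that fail to parse, while FAILFAST
        # aborts the query on the first malformed record.
        parsed = df.select(
            from_protobuf(
                df.value, "MyMessage", descFilePath="abc.desc",
                options={"mode": "PERMISSIVE"},
            ).alias("event"))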

    Examples

    >>> import tempfile
    >>> from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf
    >>> data = [("1", (2, "Alice", 109200))]
    >>> ddl_schema = "key STRING, value STRUCT<age: INTEGER, name: STRING, score: LONG>"
    >>> df = spark.createDataFrame(data, ddl_schema)
    >>> desc_hex = ('0ACE010A41636F6E6E6563746F722F70726F746F6275662F7372632F746573742F726'
    ...    '5736F75726365732F70726F746F6275662F7079737061726B5F746573742E70726F746F121D6F72672E61'
    ...    '70616368652E737061726B2E73716C2E70726F746F627566224B0A0D53696D706C654D657373616765121'
    ...    '00A03616765180120012805520361676512120A046E616D6518022001280952046E616D6512140A057363'
    ...    '6F7265180320012803520573636F72654215421353696D706C654D65737361676550726F746F736206707'
    ...    '26F746F33')
    >>> # Write the protobuf descriptor (generated from the
    >>> # connector/protobuf/src/test/resources/protobuf/pyspark_test.proto file) to disk
    >>> with tempfile.TemporaryDirectory() as tmp_dir:
    ...     desc_file_path = "%s/pyspark_test.desc" % tmp_dir
    ...     with open(desc_file_path, "wb") as f:
    ...         _ = f.write(bytearray.fromhex(desc_hex))
    ...         f.flush()
    ...         message_name = 'SimpleMessage'
    ...         proto_df = df.select(
    ...             to_protobuf(df.value, message_name, desc_file_path).alias("value"))
    ...         proto_df.show(truncate=False)
    ...         proto_df_1 = proto_df.select( # With file name for descriptor
    ...             from_protobuf(proto_df.value, message_name, desc_file_path).alias("value"))
    ...         proto_df_1.show(truncate=False)
    ...         proto_df_2 = proto_df.select( # With binary for descriptor
    ...             from_protobuf(proto_df.value, message_name,
    ...                           binaryDescriptorSet = bytearray.fromhex(desc_hex))
    ...             .alias("value"))
    ...         proto_df_2.show(truncate=False)
    +----------------------------------------+
    |value                                   |
    +----------------------------------------+
    |[08 02 12 05 41 6C 69 63 65 18 90 D5 06]|
    +----------------------------------------+
    +------------------+
    |value             |
    +------------------+
    |{2, Alice, 109200}|
    +------------------+
    +------------------+
    |value             |
    +------------------+
    |{2, Alice, 109200}|
    +------------------+
    >>> data = [[(1668035962, 2020)]]
    >>> ddl_schema = "value struct<seconds: LONG, nanos: INT>"
    >>> df = spark.createDataFrame(data, ddl_schema)
    >>> message_class_name = "org.sparkproject.spark_protobuf.protobuf.Timestamp"
    >>> to_proto_df = df.select(to_protobuf(df.value, message_class_name).alias("value"))
    >>> from_proto_df = to_proto_df.select(
    ...     from_protobuf(to_proto_df.value, message_class_name).alias("value"))
    >>> from_proto_df.show(truncate=False)
    +------------------+
    |value             |
    +------------------+
    |{1668035962, 2020}|
    +------------------+
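
    The parsed result is an ordinary struct column, so its fields can be selected directly; a brief follow-on to the example above (output is a sketch of what show() prints for this data):

    >>> from_proto_df.select("value.seconds", "value.nanos").show()
    +----------+-----+
    |   seconds|nanos|
    +----------+-----+
    |1668035962| 2020|
    +----------+-----+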