# Gimel Serde (Serializer/Deserializer)
- Gimel Serde provides pluggable Serializer/Deserializer classes that can be loaded at runtime when reading or writing data through the Gimel Data API or Gimel SQL.
- Deserializers convert raw bytes into a format such as Avro, string, or JSON.
- Serializers convert deserialized messages back into bytes.
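Currently, the gimel-serde module provides the following deserializers and serializers, each described in the sections below:
- Deserializers: AvroDeserializer, StringDeserializer, JsonDynamicDeserializer, JsonStaticDeserializer
- Serializers: AvroSerializer, StringSerializer, JsonSerializer
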
Users can create their own Serializer/Deserializer classes and plug them into the Gimel Data API at runtime with the following steps:
- Add the serde-common dependency to your project:
```xml
<dependency>
  <groupId>com.paypal.gimel</groupId>
  <artifactId>serde-common</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>
```
- Implement the Deserializer or Serializer interface.
- Add your logic to the corresponding method:
```scala
def deserialize(dataFrame: DataFrame, props: Map[String, Any] = Map.empty): DataFrame
```
or
```scala
def serialize(dataFrame: DataFrame, props: Map[String, Any] = Map.empty): DataFrame
```
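
The steps above can be sketched as follows. This is a minimal illustration, not the module's own code: the `Deserializer` trait here is a local stand-in that only mirrors the signature shown above (in a real project you would extend the trait shipped in serde-common), and both the class name `Utf8ValueDeserializer` and the property key `example.column.name` are hypothetical.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Local stand-in mirroring the documented signature (assumption: the real
// trait comes from the serde-common dependency).
trait Deserializer {
  def deserialize(dataFrame: DataFrame, props: Map[String, Any] = Map.empty): DataFrame
}

// Hypothetical deserializer: decodes a binary column into a string column.
class Utf8ValueDeserializer extends Deserializer {
  override def deserialize(dataFrame: DataFrame, props: Map[String, Any] = Map.empty): DataFrame = {
    // "example.column.name" is a made-up property key for this sketch;
    // it defaults to a column called "value".
    val column = props.getOrElse("example.column.name", "value").toString
    // Casting a binary column to string decodes the bytes as UTF-8 in Spark.
    dataFrame.withColumn(column, col(column).cast("string"))
  }
}
```

A serializer would follow the same shape, implementing `serialize` and casting or encoding in the opposite direction.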
To use Gimel Serde, include the following jars in your Spark session:
- Gimel jars
- Deserializer and/or serializer jars
Example:
```shell script
spark-shell --jars generic-deserializers-1.0-SNAPSHOT-uber.jar,generic-serializers-1.0-SNAPSHOT-uber.jar,gimel-tools-2.0.0-SNAPSHOT-uber.jar
```
## Generic Deserializers
### Avro Deserializer
#### Schema Source = CSR (Confluent Schema Registry)
* The Avro schema string is fetched from the Confluent Schema Registry.
* Set `gimel.deserializer.avro.schema.source=CSR`.
```scala
val dataset = com.paypal.gimel.DataSet(spark)
val options = Map("rest.service.method" -> "https",
  "rest.service.host" -> "udc-rest-service-host",
  "rest.service.port" -> "443",
  "gimel.deserializer.avro.schema.url" -> "http://schema-registry-host:8081",
  "gimel.deserializer.avro.schema.source" -> "CSR",
  "gimel.deserializer.avro.schema.subject" -> "test_subject",
  "gimel.deserializer.class" -> "com.paypal.gimel.deserializers.generic.AvroDeserializer")
val df = dataset.read("udc.Kafka.Gimel_Dev.default.test_avro_csr", options)
df.show
```
#### Schema Source = INLINE
* The Avro schema string has to be provided by setting the `gimel.deserializer.avro.schema.string` option.
* Set `gimel.deserializer.avro.schema.source=INLINE`.
```scala
// A plain triple-quoted string needs no escaping for the inner double quotes.
val empAvroSchema =
  """{"namespace": "namespace",
    |  "type": "record",
    |  "name": "test_emp",
    |  "fields": [
    |    {"name": "address", "type": "string"},
    |    {"name": "age", "type": "string"},
    |    {"name": "company", "type": "string"},
    |    {"name": "designation", "type": "string"},
    |    {"name": "id", "type": "string"},
    |    {"name": "name", "type": "string"},
    |    {"name": "salary", "type": "string"}
    |  ]}""".stripMargin
val dataset = com.paypal.gimel.DataSet(spark)
val options = Map("rest.service.method" -> "https",
  "rest.service.host" -> "udc-rest-service-host",
  "rest.service.port" -> "443",
  "gimel.deserializer.avro.schema.source" -> "INLINE",
  "gimel.deserializer.avro.schema.string" -> empAvroSchema,
  "gimel.deserializer.class" -> "com.paypal.gimel.deserializers.generic.AvroDeserializer")
val df = dataset.read("udc.Kafka.Gimel_Dev.default.test_avro_inline", options)
df.show
```
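
Since an inline schema is passed as a plain string, it can be worth parsing it locally before handing it to the deserializer. The sketch below assumes the Apache Avro library is on the classpath; the helper name `avroFieldNames` is hypothetical and not part of Gimel.

```scala
import org.apache.avro.Schema
import scala.collection.JavaConverters._
import scala.util.Try

// Hypothetical helper: returns the record's field names when the string is a
// valid Avro record schema, or a Failure describing why it is not.
def avroFieldNames(schemaJson: String): Try[Seq[String]] =
  Try(new Schema.Parser().parse(schemaJson))
    .map(schema => schema.getFields.asScala.map(_.name).toSeq)

// A shortened sample schema used only for this sketch.
val sampleSchema =
  """{"namespace": "namespace", "type": "record", "name": "test_emp",
    |  "fields": [
    |    {"name": "id", "type": "string"},
    |    {"name": "name", "type": "string"}
    |  ]}""".stripMargin

val parsedFields = avroFieldNames(sampleSchema)
```

A `Failure` at this point usually indicates a quoting or bracket mistake in the schema string itself.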
### String Deserializer
```scala
val dataset = com.paypal.gimel.DataSet(spark)
val options = Map("rest.service.method" -> "https",
  "rest.service.host" -> "udc-rest-service-host",
  "rest.service.port" -> "443",
  "gimel.deserializer.class" -> "com.paypal.gimel.deserializers.generic.StringDeserializer")
val df = dataset.read("udc.Kafka.Gimel_Dev.default.test_string", options)
df.show
```
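
Every example in this document repeats the same three `rest.service.*` settings and differs mainly in the deserializer class. One possible way to cut the repetition is a small options builder. The helper `deserializerOptions` below is hypothetical and not part of Gimel; it only assembles an ordinary Scala `Map`.

```scala
// Connection settings repeated in every example in this document.
val baseOptions: Map[String, String] = Map(
  "rest.service.method" -> "https",
  "rest.service.host" -> "udc-rest-service-host",
  "rest.service.port" -> "443")

// Hypothetical helper: adds the deserializer class plus any extra settings.
def deserializerOptions(className: String,
                        extra: Map[String, String] = Map.empty): Map[String, String] =
  baseOptions ++ extra + ("gimel.deserializer.class" -> className)

val stringOptions =
  deserializerOptions("com.paypal.gimel.deserializers.generic.StringDeserializer")
```

The resulting map can be passed wherever the examples pass `options`.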
### Json Dynamic Deserializer
- This deserializer automatically detects the schema from the JSON messages and returns a DataFrame.
- It does not work with the Structured Streaming API, which requires the schema to be specified explicitly.
- Use JsonStaticDeserializer for Structured Streaming.
```scala
val dataset = com.paypal.gimel.DataSet(spark)
val options = Map("rest.service.method" -> "https",
  "rest.service.host" -> "udc-rest-service-host",
  "rest.service.port" -> "443",
  "gimel.deserializer.class" -> "com.paypal.gimel.deserializers.generic.JsonDynamicDeserializer")
val df = dataset.read("udc.Kafka.Gimel_Dev.default.test_json_dynamic", options)
df.show
```
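
To illustrate what automatic schema detection means, the sketch below uses plain Spark schema inference on a few JSON strings. This is not necessarily how JsonDynamicDeserializer is implemented; it only shows the general technique, and the sample records are made up.

```scala
import org.apache.spark.sql.{Encoders, SparkSession}

// Reuses the active session when run inside spark-shell.
val session = SparkSession.builder()
  .master("local[1]")
  .appName("json-inference-sketch")
  .getOrCreate()

// Made-up messages with slightly different keys.
val rawJson = session.createDataset(Seq(
  """{"name": "a", "age": "30"}""",
  """{"name": "b", "address": "x"}"""))(Encoders.STRING)

// Spark scans the records and infers one schema covering all observed keys.
val inferred = session.read.json(rawJson)
inferred.printSchema()
```

Inference needs to scan the data first, which is why a streaming source requires the schema to be supplied explicitly instead.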
### Json Static Deserializer
This deserializer is mainly used for Structured Streaming, which requires the schema of the JSON messages to be supplied explicitly.
#### Structured Streaming
```scala
import com.paypal.gimel._
val dataStream = DataStream2(spark)
// Specify the schema for the json
val fieldsBindToString = """[{"fieldName":"name","fieldType":"string","defaultValue":"null"},{"fieldName":"age","fieldType":"string","defaultValue":"null"},{"fieldName":"address","fieldType":"string","defaultValue":""}]"""
val options = Map("rest.service.method" -> "https",
  "rest.service.host" -> "udc-rest-service-host",
  "rest.service.port" -> "443",
  "gimel.deserializer.class" -> "com.paypal.gimel.deserializers.generic.JsonStaticDeserializer",
  "gimel.fields.bind.to.json" -> fieldsBindToString)
val streamingResult = dataStream.read("udc.Kafka.Gimel_Dev.default.test_json_static", options)
val df = streamingResult.df
// Check that the returned DataFrame is a streaming one
df.isStreaming
val writer = df.writeStream.outputMode("append").format("console").start()
```
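
Writing the `gimel.fields.bind.to.json` value by hand is error-prone for wider schemas. The sketch below builds the same JSON array from a list of fields. The `FieldBinding` case class and `toBindJson` function are hypothetical helpers, the key names are taken from the example above, and the sketch assumes field names and default values contain no characters that need JSON escaping.

```scala
// Hypothetical helper type; key names follow the example above.
final case class FieldBinding(fieldName: String,
                              fieldType: String = "string",
                              defaultValue: String = "null")

// Renders the bindings as a JSON array string (no escaping is applied).
def toBindJson(fields: Seq[FieldBinding]): String =
  fields.map { f =>
    s"""{"fieldName":"${f.fieldName}","fieldType":"${f.fieldType}","defaultValue":"${f.defaultValue}"}"""
  }.mkString("[", ",", "]")

val generatedBindJson = toBindJson(Seq(
  FieldBinding("name"),
  FieldBinding("age"),
  FieldBinding("address", defaultValue = "")))
```

The generated string can then be supplied as the value of `gimel.fields.bind.to.json`.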
## Generic Serializers
### Avro Serializer
#### Schema Source = INLINE
* The Avro schema string has to be provided by setting the `gimel.serializer.avro.schema.string` option.
* Set `gimel.serializer.avro.schema.source=INLINE`.

The use case below includes the following steps:
- Consume JSON messages from a Kafka topic and deserialize them with JsonDynamicDeserializer.
- Serialize the consumed messages into Avro format using AvroSerializer and publish them to another Kafka topic.
- Deserialize the messages in the Avro topic using AvroDeserializer for verification.
```scala
val dataset = com.paypal.gimel.DataSet(spark)
val empAvroSchema =
  """{"namespace": "namespace",
    |  "type": "record",
    |  "name": "test_emp",
    |  "fields": [
    |    {"name": "address", "type": "string"},
    |    {"name": "age", "type": "string"},
    |    {"name": "company", "type": "string"},
    |    {"name": "designation", "type": "string"},
    |    {"name": "id", "type": "string"},
    |    {"name": "name", "type": "string"},
    |    {"name": "salary", "type": "string"}
    |  ]}""".stripMargin
val options = Map("rest.service.method" -> "https",
  "rest.service.host" -> "udc-rest-service-host",
  "rest.service.port" -> "443",
  "gimel.serializer.avro.schema.source" -> "INLINE",
  "gimel.serializer.class" -> "com.paypal.gimel.serializers.generic.AvroSerializer",
  "gimel.serializer.avro.schema.string" -> empAvroSchema,
  "gimel.deserializer.class" -> "com.paypal.gimel.deserializers.generic.JsonDynamicDeserializer")
// Deserializes the json messages using JsonDynamicDeserializer
val df = dataset.read("udc.Kafka.Gimel_Dev.default.test_json", options)
df.cache.show
// Do any processing if required on the source dataframe
// Serializes the messages to avro format using
// gimel.serializer.class=com.paypal.gimel.serializers.generic.AvroSerializer
dataset.write("udc.Kafka.Gimel_Dev.default.test_avro", df, options)
// Separate names for the verification step, so the block also works outside the REPL
val verifyOptions = Map("rest.service.method" -> "https",
  "rest.service.host" -> "udc-rest-service-host",
  "rest.service.port" -> "443",
  "gimel.deserializer.avro.schema.source" -> "INLINE",
  "gimel.deserializer.avro.schema.string" -> empAvroSchema,
  "gimel.deserializer.class" -> "com.paypal.gimel.deserializers.generic.AvroDeserializer")
// Verify the messages in udc.Kafka.Gimel_Dev.default.test_avro topic by deserializing the avro messages
// using gimel.deserializer.class=com.paypal.gimel.deserializers.generic.AvroDeserializer
val verifyDf = dataset.read("udc.Kafka.Gimel_Dev.default.test_avro", verifyOptions)
verifyDf.show
```
#### Schema Source = CSR (Confluent Schema Registry)
* The Avro schema string is fetched from the Confluent Schema Registry.
* Set `gimel.serializer.avro.schema.source=CSR`.

The use case above can also be run with the Avro schema stored in the Schema Registry.
```scala
val dataset = com.paypal.gimel.DataSet(spark)
val options = Map("rest.service.method" -> "https",
  "rest.service.host" -> "udc-rest-service-host",
  "rest.service.port" -> "443",
  "gimel.serializer.avro.schema.url" -> "http://schema-registry-host:8081",
  "gimel.serializer.avro.schema.source" -> "CSR",
  "gimel.serializer.class" -> "com.paypal.gimel.serializers.generic.AvroSerializer",
  "gimel.serializer.avro.schema.subject" -> "test_emp",
  "gimel.deserializer.class" -> "com.paypal.gimel.deserializers.generic.JsonDynamicDeserializer")
// Deserializes the json messages using JsonDynamicDeserializer
val df = dataset.read("udc.Kafka.Gimel_Dev.default.test_json", options)
df.cache.show
// Do any processing if required on the source dataframe
// Serializes the messages to avro format using
// gimel.serializer.class=com.paypal.gimel.serializers.generic.AvroSerializer
dataset.write("udc.Kafka.Gimel_Dev.default.test_avro", df, options)
// Separate names for the verification step, so the block also works outside the REPL
val verifyOptions = Map("rest.service.method" -> "https",
  "rest.service.host" -> "udc-rest-service-host",
  "rest.service.port" -> "443",
  // Registry URL set explicitly, as in the CSR deserializer example earlier
  "gimel.deserializer.avro.schema.url" -> "http://schema-registry-host:8081",
  "gimel.deserializer.avro.schema.source" -> "CSR",
  "gimel.deserializer.avro.schema.subject" -> "test_emp",
  "gimel.deserializer.class" -> "com.paypal.gimel.deserializers.generic.AvroDeserializer")
// Verify the messages in udc.Kafka.Gimel_Dev.default.test_avro topic by deserializing the avro messages
// using gimel.deserializer.class=com.paypal.gimel.deserializers.generic.AvroDeserializer
val verifyDf = dataset.read("udc.Kafka.Gimel_Dev.default.test_avro", verifyOptions)
verifyDf.show
```
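
The INLINE and CSR variants differ only in a few option keys. The sketch below captures that difference in one place. The `SchemaSource` types and the `avroSerializerOptions` function are hypothetical and not part of Gimel; the option keys and values are the ones used in the examples above.

```scala
// Hypothetical model of the two schema sources described above.
sealed trait SchemaSource
final case class Inline(schema: String) extends SchemaSource
final case class Csr(url: String, subject: String) extends SchemaSource

// Builds the serializer-specific options for the chosen schema source.
def avroSerializerOptions(source: SchemaSource): Map[String, String] = {
  val common = Map(
    "gimel.serializer.class" -> "com.paypal.gimel.serializers.generic.AvroSerializer")
  source match {
    case Inline(schema) => common ++ Map(
      "gimel.serializer.avro.schema.source" -> "INLINE",
      "gimel.serializer.avro.schema.string" -> schema)
    case Csr(url, subject) => common ++ Map(
      "gimel.serializer.avro.schema.source" -> "CSR",
      "gimel.serializer.avro.schema.url" -> url,
      "gimel.serializer.avro.schema.subject" -> subject)
  }
}

val csrOptions = avroSerializerOptions(Csr("http://schema-registry-host:8081", "test_emp"))
```

These entries would still need to be merged with the `rest.service.*` and deserializer settings before calling `dataset.write`.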
### String Serializer
```scala
val dataset = com.paypal.gimel.DataSet(spark)
val options = Map("rest.service.method" -> "https",
  "rest.service.host" -> "udc-rest-service-host",
  "rest.service.port" -> "443",
  "gimel.serializer.class" -> "com.paypal.gimel.serializers.generic.StringSerializer",
  "gimel.deserializer.class" -> "com.paypal.gimel.deserializers.generic.JsonDynamicDeserializer")
// Deserializes the json messages using JsonDynamicDeserializer
val df = dataset.read("udc.Kafka.Gimel_Dev.default.test_json", options)
df.cache.show
// Do any processing if required on the source dataframe
// Serializes the messages to string format using
// gimel.serializer.class=com.paypal.gimel.serializers.generic.StringSerializer
dataset.write("udc.Kafka.Gimel_Dev.default.test_string", df, options)
```
### Json Serializer
```scala
val dataset = com.paypal.gimel.DataSet(spark)
val options = Map("rest.service.method" -> "https",
  "rest.service.host" -> "udc-rest-service-host",
  "rest.service.port" -> "443",
  "gimel.serializer.class" -> "com.paypal.gimel.serializers.generic.JsonSerializer",
  "gimel.deserializer.class" -> "com.paypal.gimel.deserializers.generic.JsonDynamicDeserializer")
// Deserializes the json messages using JsonDynamicDeserializer
val df = dataset.read("udc.Kafka.Gimel_Dev.default.test_json_source", options)
df.cache.show
// Do any processing if required on the source dataframe
// Serializes the messages to json format using
// gimel.serializer.class=com.paypal.gimel.serializers.generic.JsonSerializer
dataset.write("udc.Kafka.Gimel_Dev.default.test_json_target", df, options)
```