


S3 API

Overview


Design Considerations

Spark S3 connector


Create Hive Table Catalog

Create Hive DDLs like the following when using HIVE as the gimel.catalog.provider.
The following Hive tables point to the S3 bucket gimeltestbucket for reading and writing files to/from an object location.

CSV file

  CREATE EXTERNAL TABLE `udc.test_s3_airports_csv`(
    `data` string COMMENT 'from deserializer')
  LOCATION
    'hdfs://hadoopcluster/tmp/udc/test_s3_airports_csv'
  TBLPROPERTIES (
    'gimel.s3.object.format'='csv',
    'gimel.s3.object.location'='s3a://gimeltestbucket/flights/airports.csv',
    'gimel.s3.file.header'='true',
    'gimel.s3.file.inferSchema'='true',
    'gimel.storage.type'='S3');

Parquet file

  CREATE EXTERNAL TABLE `udc.test_s3_parquet`(
    `data` string COMMENT 'from deserializer')
  LOCATION
    'hdfs://hadoopcluster/tmp/udc/test_s3_parquet'
  TBLPROPERTIES (
    'gimel.s3.object.format'='parquet',
    'gimel.s3.object.location'='s3a://gimeltestbucket/userdata.parquet',
    'gimel.storage.type'='S3');

JSON file

  CREATE EXTERNAL TABLE `udc.test_s3_json`(
    `data` string COMMENT 'from deserializer')
  LOCATION
    'hdfs://hadoopcluster/tmp/udc/test_s3_json'
  TBLPROPERTIES (
    'gimel.s3.object.format'='json',
    'gimel.s3.object.location'='s3a://gimeltestbucket/test.json',
    'gimel.storage.type'='S3');
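
With one of these DDLs in place and gimel.catalog.provider set to HIVE, the table can be read through the Gimel Data API. A minimal read sketch against udc.test_s3_airports_csv, assuming a SparkSession named spark and that the catalog provider and credentials options can be passed through the options map (adjust paths and settings to your environment):

// Assumes a SparkSession named `spark` is already available
val dataSet = new com.paypal.gimel.DataSet(spark)

// Illustrative options; credentials handling is described under Catalog Properties below
val options = Map("gimel.catalog.provider" -> "HIVE",
                  "gimel.s3.credentials.strategy" -> "file",
                  "gimel.s3.credentials.file.source" -> "local",
                  "gimel.s3.credentials.file.path" -> "/path/xxxx/s3_credentials")

val df = dataSet.read("udc.test_s3_airports_csv", options)
df.show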

Create JSON Dataset Properties

Create a DatasetProperties JSON when using USER as the gimel.catalog.provider.

val dataSetProperties_s3 = s"""
{
    "datasetType": "S3",
    "fields": [],
    "partitionFields": [],
    "props": {
        "gimel.storage.type": "S3",
        "datasetName": "udc.sample_s3_dataset",
        "fs.s3a.path.style.access": "true",
        "fs.s3a.endpoint": "s3-region.example.com",
        "gimel.s3.credentials.strategy": "file",
        "gimel.s3.credentials.file.path": "/path/s3_credentials",
        "gimel.s3.credentials.file.source": "local",
        "gimel.s3.file.header": "true",
        "gimel.s3.file.inferSchema": "true"
    }
}"""

Supported File Types

CSV (including custom-delimited text via gimel.s3.file.delimiter), JSON, and Parquet objects are supported, selected through gimel.s3.object.format.


Catalog Properties

| Property | Mandatory? | Description | Example | Default |
|----------|------------|-------------|---------|---------|
| fs.s3a.path.style.access | N | Path style access enabled in S3? | true/false | true |
| fs.s3a.endpoint | Y | S3 endpoint | s3-us.example.com:80 | |
| fs.s3a.access.key | Y | Access ID (a must if gimel.s3.credentials.strategy=user) | | |
| fs.s3a.secret.key | Y | Secret key (a must if gimel.s3.credentials.strategy=user) | | |
| gimel.s3.file.header | N | File header for CSV file | true/false | false |
| gimel.s3.file.inferSchema | N | Infer schema from header? | true/false | false |
| gimel.s3.object.location | Y | Object location on S3 | s3a://gimeltestbucket/flights/airports.csv | |
| gimel.s3.credentials.strategy | Y | Credentials strategy | file/user/credentialLess | file |
| gimel.s3.credentials.file.source | Y | A must if gimel.s3.credentials.strategy=file | local/hdfs | |
| gimel.s3.credentials.file.path | Y | A must if gimel.s3.credentials.strategy=file | /path/xxxx/s3_credentials | |
| gimel.s3.save.mode | N | Write mode for S3 (https://spark.apache.org/docs/2.3.0/sql-programming-guide.html#save-modes) | append/overwrite/error/ignore | error |
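
The examples in this document use the file credentials strategy. For completeness, here is a hedged sketch of the user strategy, in which the access and secret keys from the table above are supplied directly in the options (the key values and endpoint are placeholders):

// Illustrative only: pass the keys directly when gimel.s3.credentials.strategy=user
val options = Map("gimel.s3.object.format" -> "csv",
                  "gimel.s3.credentials.strategy" -> "user",
                  "fs.s3a.access.key" -> "<ACCESS_KEY>",
                  "fs.s3a.secret.key" -> "<SECRET_KEY>",
                  "fs.s3a.endpoint" -> "s3-region.example.com",
                  "gimel.s3.file.header" -> "true",
                  "gimel.s3.file.inferSchema" -> "true")

val dfRead = dataSet.read("udc.S3.Test.gimeltestbucket.flights_airports_csv", options)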

Password Options
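
The credentials strategy decides where the S3 secrets come from: file reads them from a credentials file on local disk or HDFS, user takes them from fs.s3a.access.key/fs.s3a.secret.key, and credentialLess sets no keys at all. A hedged GSQL sketch of the credential-less case, assuming the cluster itself is already authorized to reach the bucket (for example via an instance profile):

// gsql is defined in the GSQL section below; no keys are set for credentialLess
gsql("set gimel.s3.credentials.strategy=credentialLess")
gsql("set gimel.s3.object.format=csv")
gsql("set gimel.s3.file.header=true")

val df = gsql("select * from udc.S3.Test.gimeltestbucket.flights_airports_csv")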

S3 GIMEL Read API for CSV

Common Imports and initializations

val dataSet = new com.paypal.gimel.DataSet(spark)
val options = Map("gimel.s3.object.format" -> "csv", 
                   "gimel.s3.credentials.strategy" -> "file",
                   "gimel.s3.credentials.file.source" -> "local",
                   "gimel.s3.credentials.file.path" -> "/path/xxxx/s3_credentials",
                   "gimel.s3.file.header" -> "true",
                   "gimel.s3.file.inferSchema" -> "true")

val dfRead = dataSet.read("udc.S3.Test.gimeltestbucket.flights_airports_csv", options)
dfRead.show


S3 GIMEL Write API for CSV

val options = Map("gimel.s3.object.format" -> "csv", 
                   "gimel.s3.credentials.strategy" -> "file",
                   "gimel.s3.credentials.file.source" -> "hdfs",
                   "gimel.s3.credentials.file.path" -> "hdfs://hadoopcluster/user/xxxx/s3_credentials",
                   "gimel.s3.file.header" -> "true",
                   "gimel.s3.file.inferSchema" -> "true")

// Write a DataFrame (here, the dfRead obtained from the read API above) to the S3 dataset
val dfWrite = dataSet.write("udc.S3.Test.gimeltestbucket.test_kafka_to_s3", dfRead, options)

S3 GIMEL GSQL

Common Imports and initializations

import org.apache.spark.sql.{Column, Row, SparkSession,DataFrame}
import org.apache.spark.sql.functions._
  
// Create Gimel SQL reference
val gsql: (String) => DataFrame = com.paypal.gimel.sql.GimelQueryProcessor.executeBatch(_: String, spark)

//Set UDC parameters for testing
gsql("set rest.service.method=https")
gsql("set rest.service.host=udc-rest-api-host")
gsql("set rest.service.port=443")

gsql("set gimel.catalog.provider=UDC")
gsql("set gimel.logging.level=CONSOLE")

CSV file Read

gsql("set gimel.s3.object.format=csv")
gsql("set gimel.s3.credentials.strategy=file")
gsql("set gimel.s3.credentials.file.source=local")
gsql("set gimel.s3.credentials.file.path=/path/xxxxx/s3_credentials")
gsql("set gimel.s3.file.header=true")
gsql("set gimel.s3.file.inferSchema=true")

val df = gsql("select * from udc.S3.Test.gimeltestbucket.flights_airports_csv")

JSON file Read

gsql("set gimel.s3.object.format=json")
gsql("set gimel.s3.credentials.strategy=file")
gsql("set gimel.s3.credentials.file.source=local")
gsql("set gimel.s3.credentials.file.path=/path/xxxx/s3_credentials")

val df = gsql("select * from udc.S3.Test.gimeltestbucket.test_json")

CSV file Read with a different delimiter

gsql("set gimel.s3.object.format=csv")
gsql("set gimel.s3.file.delimiter=|")
gsql("set gimel.s3.credentials.strategy=file")
gsql("set gimel.s3.credentials.file.source=local")
gsql("set gimel.s3.credentials.file.path=/path/xxxxx/s3_credentials")
gsql("set gimel.s3.file.header=true")
gsql("set gimel.s3.file.inferSchema=true")

val df = gsql("select * from udc.S3.Test.gimeltestbucket.test_delimiter_csv")

Parquet file Read

gsql("set gimel.s3.object.format=parquet")
gsql("set gimel.s3.credentials.strategy=file")
gsql("set gimel.s3.credentials.file.source=local")
gsql("set gimel.s3.credentials.file.path=/path/xxxx/s3_credentials")

val df = gsql("select * from udc.S3.Test.gimeltestbucket.userdata_parquet")

Kafka to S3

gsql("set gimel.s3.object.format=csv")
gsql("set gimel.s3.credentials.strategy=file")
gsql("set gimel.s3.credentials.file.source=local")
gsql("set gimel.s3.credentials.file.path=/path/xxxxx/s3_credentials")
gsql("set gimel.s3.file.header=true")
gsql("set gimel.s3.file.inferSchema=true")
gsql("set gimel.kafka.throttle.batch.fetchRowsOnFirstRun=1000")
gsql("set gimel.s3.save.mode=overwrite")

val df = gsql("insert into udc.S3.Test.gimeltestbucket.flights_airports_csv 
                select * from udc.Kafka.Gimel_Dev.test.flights_airports")

Limitations