S3cmd Setup
- s3cmd is a CLI used to access S3 from the command line.
- Download the s3cmd binary from https://s3tools.org/s3cmd.
- Configure it by creating a .s3cfg file in your home folder. Edit the Access ID, Secret Key, and S3 endpoint in the configuration file:
[default]
access_key = [ACCESS-ID]
access_token =
add_encoding_exts =
add_headers =
bucket_location = region
ca_certs_file =
cache_file =
check_ssl_certificate = True
check_ssl_hostname = True
cloudfront_host = cloudfront.amazonaws.com
content_disposition =
content_type =
default_mime_type = binary/octet-stream
delay_updates = False
delete_after = False
delete_after_fetch = False
delete_removed = False
dry_run = False
enable_multipart = True
encrypt = False
expiry_date =
expiry_days =
expiry_prefix =
follow_symlinks = False
force = False
get_continue = False
gpg_command = None
gpg_decrypt = %(gpg_command)s -d --verbose --no-use-agent --batch --yes --passphrase-fd %(passphrase_fd)s -o %(output_file)s %(input_file)s
gpg_encrypt = %(gpg_command)s -c --verbose --no-use-agent --batch --yes --passphrase-fd %(passphrase_fd)s -o %(output_file)s %(input_file)s
gpg_passphrase = noencryption
guess_mime_type = True
host_base = [S3 End Point]
host_bucket = %(bucket).s3-region.example.com
human_readable_sizes = False
invalidate_default_index_on_cf = False
invalidate_default_index_root_on_cf = True
invalidate_on_cf = False
kms_key =
limit = -1
limitrate = 0
list_md5 = False
log_target_prefix =
long_listing = False
max_delete = -1
mime_type =
multipart_chunk_size_mb = 10
multipart_max_chunks = 10000
preserve_attrs = True
progress_meter = True
proxy_host =
proxy_port = 0
put_continue = False
recursive = False
recv_chunk = 65536
reduced_redundancy = False
requester_pays = False
restore_days = 1
restore_priority = Standard
secret_key = [SECRET-KEY]
send_chunk = 65536
server_side_encryption = False
signature_v2 = False
signurl_use_https = False
simpledb_host = sdb.amazonaws.com
skip_existing = False
socket_timeout = 300
stats = False
stop_on_error = False
storage_class =
throttle_max = 100
upload_id =
urlencoding_mode = normal
use_http_expect = False
use_https = False
use_mime_magic = True
verbosity = WARNING
website_endpoint = http://%(bucket)s.s3-website-%(location)s.amazonaws.com/
website_error =
website_index = index.html
- Test the connection:
./s3cmd info s3://gimeltestbucket
- List the files in the S3 bucket:
./s3cmd ls --recursive s3://gimeltestbucket
- Put a file in the S3 bucket:
./s3cmd put test s3://gimeltestbucket/test
- Download a file from the S3 bucket:
./s3cmd get s3://gimeltestbucket/test
S3 API
Overview
- This API enables reading and writing objects from/to S3 storage.
Design Considerations
Spark S3 connector
- https://docs.databricks.com/spark/latest/data-sources/aws/amazon-s3.html.
- The Spark S3 connector uses the aws-java-sdk and hadoop-aws libraries to build DataFrames from files in S3 buckets and to write DataFrames back to S3 buckets, roughly as sketched below.
- The Gimel connector uses these libraries as dependencies.
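For reference, the following is a minimal sketch (not part of the Gimel API) of what these libraries do underneath: plain Spark reading a CSV object over the s3a:// filesystem provided by hadoop-aws. The endpoint and credential values are placeholders.
// Sketch only: plain Spark + hadoop-aws, roughly what the Gimel S3 connector wraps.
// Endpoint and credential values are placeholders.
val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3a.endpoint", "s3-region.example.com")
hadoopConf.set("fs.s3a.path.style.access", "true")
hadoopConf.set("fs.s3a.access.key", "ACCESS-ID")
hadoopConf.set("fs.s3a.secret.key", "SECRET-KEY")

// Build a DataFrame from a CSV object in the bucket
val airportsDf = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("s3a://gimeltestbucket/flights/airports.csv")

// Write a DataFrame back to the bucket as Parquet
airportsDf.write.mode("overwrite").parquet("s3a://gimeltestbucket/tmp/airports_parquet")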
Create Hive Table Catalog
Create Hive DDLs when using HIVE as the gimel.catalog.provider.
The following Hive tables point to the S3 bucket gimeltestbucket for reading/writing files from/to an object location (a read sketch using one of these tables follows the DDLs).
CSV file
CREATE EXTERNAL TABLE `udc.test_s3_airports_csv`(
`data` string COMMENT 'from deserializer')
LOCATION
'hdfs://hadoopcluster/tmp/udc/test_s3_airports_csv'
TBLPROPERTIES (
'gimel.s3.object.format'='csv',
'gimel.s3.object.location'='s3a://gimeltestbucket/flights/airports.csv',
'gimel.s3.file.header'='true',
'gimel.s3.file.inferSchema'='true',
'gimel.storage.type'='S3');
Parquet file
CREATE EXTERNAL TABLE `udc.test_s3_parquet`(
`data` string COMMENT 'from deserializer')
LOCATION
'hdfs://hadoopcluster/tmp/udc/test_s3_parquet'
TBLPROPERTIES (
'gimel.s3.object.format'='parquet',
'gimel.s3.object.location'='s3a://gimeltestbucket/userdata.parquet',
'gimel.storage.type'='S3');
Json file
CREATE EXTERNAL TABLE `udc.test_s3_json`(
`data` string COMMENT 'from deserializer')
LOCATION
'hdfs://hadoopcluster/tmp/udc/test_s3_json'
TBLPROPERTIES (
'gimel.s3.object.format'='json',
'gimel.s3.object.location'='s3a://gimeltestbucket/test.json',
'gimel.storage.type'='S3');
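Once the DDL exists, the table can be read through the Gimel Data API by its Hive name. A minimal sketch, assuming gimel.catalog.provider is set to HIVE so that the dataset name resolves to the table created above:
// Sketch: read the CSV-backed Hive table defined above via the Gimel Data API.
// Assumes gimel.catalog.provider=HIVE; credential options follow the examples later on this page.
val dataSet = new com.paypal.gimel.DataSet(spark)
val hiveOptions = Map("gimel.catalog.provider" -> "HIVE",
  "gimel.s3.credentials.strategy" -> "file",
  "gimel.s3.credentials.file.source" -> "local",
  "gimel.s3.credentials.file.path" -> "/path/xxxx/s3_credentials")
val airportsDf = dataSet.read("udc.test_s3_airports_csv", hiveOptions)
airportsDf.show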
Create JSON Dataset Properties
Create a DataSetProperties JSON when using USER as the gimel.catalog.provider (a usage sketch follows the JSON).
val dataSetProperties_s3 = s"""
{
  "datasetType": "S3",
  "fields": [],
  "partitionFields": [],
  "props": {
    "gimel.storage.type": "S3",
    "datasetName": "udc.sample_s3_dataset",
    "fs.s3a.path.style.access": "true",
    "fs.s3a.endpoint": "s3-region.example.com",
    "gimel.s3.credentials.strategy": "file",
    "gimel.s3.credentials.file.path": "/path/s3_credentials",
    "gimel.s3.credentials.file.source": "local",
    "gimel.s3.file.header": "true",
    "gimel.s3.file.inferSchema": "true"
  }
}"""
Supported File Types
- CSV
- JSON
- PARQUET
- TXT
Catalog Properties
| Property | Mandatory? | Description | Example | Default |
|----------|------------|-------------|---------|---------|
| fs.s3a.path.style.access | N | Path-style access enabled in S3? | true/false | true |
| fs.s3a.endpoint | Y | S3 endpoint | s3-us.example.com:80 | |
| fs.s3a.access.key | Y | Access ID (required if gimel.s3.credentials.strategy=user) | | |
| fs.s3a.secret.key | Y | Secret Key (required if gimel.s3.credentials.strategy=user) | | |
| gimel.s3.file.header | N | File header for CSV file | true/false | false |
| gimel.s3.file.inferSchema | N | Infer schema from header? | true/false | false |
| gimel.s3.object.location | Y | Object location on S3 | s3a://gimeltestbucket/flights/airports.csv | |
| gimel.s3.credentials.strategy | Y | Credentials strategy | file/user/credentialLess | file |
| gimel.s3.credentials.file.source | Y | Credentials file source (required if gimel.s3.credentials.strategy=file) | local/hdfs | |
| gimel.s3.credentials.file.path | Y | Path to the credentials file (required if gimel.s3.credentials.strategy=file) | /path/xxxx/s3_credentials | |
| gimel.s3.save.mode | N | Write mode for S3 (https://spark.apache.org/docs/2.3.0/sql-programming-guide.html#save-modes) | append/overwrite/error/ignore | error |
Password Options
- To access an S3 bucket, an Access ID and Secret Key are required
- These credentials can be supplied through a file on either the local file system or HDFS (the alternative user strategy is sketched after this list)
- For the local file system, put the credentials in a file and pass that file using the options below
- This is useful in the default or yarn-client mode:
"gimel.s3.credentials.strategy" -> "file"
"gimel.s3.credentials.file.source" -> "local"
"gimel.s3.credentials.file.path" -> "/path/xxxx/mycredentials.txt"
- For the HDFS file system, put the credentials in a file and pass that file using the options below
- This is useful in yarn-cluster mode:
"gimel.s3.credentials.strategy" -> "file"
"gimel.s3.credentials.file.source" -> "hdfs"
"gimel.s3.credentials.file.path" -> "hdfs://cluster/xxxx/mycredentials.txt"
- The credentials file must not be accessible by any other user (set permissions to 700)
- Put the Access ID and Secret Key in the credentials file in the following format
ACCESS-ID SECRET-KEY
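The Catalog Properties table above also lists a user strategy, where the Access ID and Secret Key are passed directly as options instead of through a credentials file. A minimal sketch with placeholder values (avoid hard-coding real keys in notebooks or source control):
// Sketch: "user" credentials strategy - keys passed directly via the fs.s3a.* properties
// listed in the Catalog Properties table. Values below are placeholders.
val userCredOptions = Map("gimel.s3.credentials.strategy" -> "user",
  "fs.s3a.access.key" -> "ACCESS-ID",
  "fs.s3a.secret.key" -> "SECRET-KEY")
// Merge these into the options map passed to dataSet.read/write in place of the file-based options.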
S3 GIMEL Read API for CSV
- The following example shows how to read data from S3 with the credentials supplied from a file on the local file system
Common Imports and initializations
val dataSet = new com.paypal.gimel.DataSet(spark)
val options = Map("gimel.s3.object.format" -> "csv",
"gimel.s3.credentials.strategy" -> "file",
"gimel.s3.credentials.file.source" -> "local",
"gimel.s3.credentials.file.path" -> "/path/xxxx/s3_credentials",
"gimel.s3.file.header" -> "true",
"gimel.s3.file.inferSchema" -> "true")
val dfRead = dataSet.read("udc.S3.Test.gimeltestbucket.flights_airports_csv", options)
dfRead.show
S3 GIMEL Write API for CSV
- The following example shows how to write a DataFrame (for example, dfRead from the read example above) to S3 with the credentials supplied from a file on HDFS
val options = Map("gimel.s3.object.format" -> "csv",
"gimel.s3.credentials.strategy" -> "file",
"gimel.s3.credentials.file.source" -> "hdfs",
"gimel.s3.credentials.file.path" -> "hdfs://hadoopcluster/user/xxxx/s3_credentials",
"gimel.s3.file.header" -> "true",
"gimel.s3.file.inferSchema" -> "true")
// Pass the DataFrame to be written (here, dfRead from the read example above)
val dfWrite = dataSet.write("udc.S3.Test.gimeltestbucket.test_kafka_to_s3", dfRead, options)
S3 GIMEL GSQL
Common Imports and initializations
import org.apache.spark.sql.{Column, Row, SparkSession, DataFrame}
import org.apache.spark.sql.functions._
// Create Gimel SQL reference
val gsql: (String) => DataFrame = com.paypal.gimel.sql.GimelQueryProcessor.executeBatch(_: String, spark)
//Set UDC parameters for testing
gsql("set rest.service.method=https")
gsql("set rest.service.host=udc-rest-api-host")
gsql("set rest.service.port=443")
gsql("set gimel.catalog.provider=UDC")
gsql("set gimel.logging.level=CONSOLE")
CSV file Read
gsql("set gimel.s3.object.format=csv")
gsql("set gimel.s3.credentials.strategy=file")
gsql("set gimel.s3.credentials.file.source=local")
gsql("set gimel.s3.credentials.file.path=/path/xxxxx/s3_credentials")
gsql("set gimel.s3.file.header=true")
gsql("set gimel.s3.file.inferSchema=true")
val df = gsql("select * from udc.S3.Test.gimeltestbucket.flights_airports_csv")
JSON file Read
gsql("set gimel.s3.object.format=json")
gsql("set gimel.s3.credentials.strategy=file")
gsql("set gimel.s3.credentials.file.source=local")
gsql("set gimel.s3.credentials.file.path=/path/xxxx/s3_credentials")
val df = gsql("select * from udc.S3.Test.gimeltestbucket.test_json")
CSV file Read with a different delimiter
gsql("set gimel.s3.object.format=csv")
gsql("set gimel.s3.file.delimiter=|")
gsql("set gimel.s3.credentials.strategy=file")
gsql("set gimel.s3.credentials.file.source=local")
gsql("set gimel.s3.credentials.file.path=/path/xxxxx/s3_credentials")
gsql("set gimel.s3.file.header=true")
gsql("set gimel.s3.file.inferSchema=true")
val df = gsql("select * from udc.S3.Test.gimeltestbucket.test_delimiter_csv")
Parquet file Read
gsql("set gimel.s3.object.format=parquet")
gsql("set gimel.s3.credentials.strategy=file")
gsql("set gimel.s3.credentials.file.source=local")
gsql("set gimel.s3.credentials.file.path=/path/xxxx/s3_credentials")
val df = gsql("select * from udc.S3.Test.gimeltestbucket.userdata_parquet")
Kafka to S3
gsql("set gimel.s3.object.format=csv")
gsql("set gimel.s3.credentials.strategy=file")
gsql("set gimel.s3.credentials.file.source=local")
gsql("set gimel.s3.credentials.file.path=/path/xxxxx/s3_credentials")
gsql("set gimel.s3.file.header=true")
gsql("set gimel.s3.file.inferSchema=true")
gsql("set gimel.kafka.throttle.batch.fetchRowsOnFirstRun=1000")
gsql("set gimel.s3.save.mode=overwrite")
val df = gsql("insert into udc.S3.Test.gimeltestbucket.flights_airports_csv
select * from udc.Kafka.Gimel_Dev.test.flights_airports")
Limitations
- Binary file types are not currently supported.