# Cassandra API

## Note

- Experimental API: there is no production use case on this yet.

## Design Considerations

- Under the hood, Gimel leverages the DataStax Spark Cassandra Connector.
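Since Gimel delegates to the connector, the connector jar must be available on the Spark classpath. Below is a minimal sketch of wiring it in at session build time; the artifact coordinates are an assumption and should be matched to your Spark and Scala versions. This also creates the `sparkSession` referenced in the examples further down.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: pull the DataStax connector in via spark.jars.packages.
// The coordinates below are an assumption; pick the artifact that matches
// your Spark/Scala versions (e.g. _2.11 vs _2.12).
val sparkSession = SparkSession.builder()
  .appName("GimelCassandraExample")
  .enableHiveSupport() // typically needed when dataset definitions live in the Hive metastore
  .config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.11:2.4.0")
  .getOrCreate()
```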
Create Cassandra Table
CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};
CREATE TABLE test_table_1 (
name text,
age int,
rev int,
PRIMARY KEY (name)
);
insert into test_table_1 (name,age,rev) values ( 'deepak',100,9999);
## Create Hive Table Pointing to a Cassandra Table

The following Hive table points to the Cassandra table created above:
```sql
CREATE TABLE pcatalog.cassandra_testing (
  payload string
)
LOCATION 'hdfs:///tmp/cassandra_testing'
TBLPROPERTIES (
  'spark.cassandra.connection.host'='hostname',
  'gimel.storage.type'='CASSANDRA',
  'gimel.cassandra.cluster.name'='POC_Cluster',
  'gimel.cassandra.keyspace.name'='test',
  'gimel.cassandra.table.name'='test_table_1',
  'gimel.cassandra.pushdown.is.enabled'='true',
  'gimel.cassandra.table.confirm.truncate'='false',
  'spark.cassandra.input.split.size_in_mb'='128'
);
```
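Once defined, the table can also be queried through Gimel SQL. A hedged sketch follows; the package path and signature of `GimelQueryProcessor` may vary across Gimel versions:

```scala
import com.paypal.gimel.sql.GimelQueryProcessor

// Sketch: route SQL through Gimel so 'pcatalog.cassandra_testing'
// resolves to the Cassandra table via the TBLPROPERTIES above.
val df = GimelQueryProcessor.executeBatch(
  "select * from pcatalog.cassandra_testing", sparkSession)
df.show
```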
## Catalog Properties

Refer to the DataStax Spark Cassandra Connector FAQ for in-depth details.
Property | Mandatory? | Description | Example | Default |
---|---|---|---|---|
spark.cassandra.connection.host | Y | Host FQDN or IP | localhost | |
gimel.cassandra.cluster.name | Y | Name of the cluster | poc | |
gimel.cassandra.keyspace.name | Y | Keyspace in Cassandra | test | |
gimel.cassandra.table.name | Y | Cassandra table name | test_table_1 | |
gimel.cassandra.pushdown.is.enabled | N | Enables predicate pushdown to Cassandra | true | true |
spark.cassandra.input.split.size_in_mb | N | Approximate amount of Cassandra data per Spark partition; directly controls the number of Spark partitions (tasks) created | 128 | 48 |
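The non-mandatory properties can also be overridden per read by passing an options map. A sketch is below, under the assumption that `dataSet.read` accepts a property map as its second argument (see the usage section below for how `dataSet` is initialized):

```scala
// Sketch: per-read overrides of catalog properties. Assumes
// dataSet.read(name, props) accepts a Map of the keys listed above.
val readOptions = Map(
  "gimel.cassandra.pushdown.is.enabled" -> "true",
  // half the default of 48 MB -> roughly twice as many Spark tasks
  "spark.cassandra.input.split.size_in_mb" -> "24"
)
val df = dataSet.read("pcatalog.cassandra_testing", readOptions)
```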
## Common Imports in All Cassandra API Usages

```scala
import org.apache.spark.sql._
import com.paypal.gimel._
import com.paypal.gimel.logger.Logger
```
## Cassandra API Usage

```scala
// Initialize the Gimel DataSet API
val dataSet = DataSet(sparkSession)

// Connector properties must be set on the Spark conf before reading
sparkSession.conf.set("spark.cassandra.connection.host", "hostname_or_ip")
sparkSession.conf.set("spark.cleaner.ttl", "3600")

// Option 1: read via the Gimel catalog table
val fromCassandra1 = dataSet.read("pcatalog.cassandra_testing")

// Option 2: read directly via the DataStax connector
val cassandraDfOptions = Map("keyspace" -> "test", "table" -> "test_table_1")
val fromCassandra2 = sparkSession.read
  .format("org.apache.spark.sql.cassandra")
  .options(cassandraDfOptions)
  .load()
fromCassandra2.show
```
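Writing follows the same pattern in both directions. A hedged sketch, reusing the values defined above and assuming `dataSet.write(name, dataFrame)` mirrors `dataSet.read` (note that `gimel.cassandra.table.confirm.truncate` in the TBLPROPERTIES guards destructive overwrites):

```scala
import org.apache.spark.sql.SaveMode

// Sketch: write back through Gimel (assumes dataSet.write(name, df))
dataSet.write("pcatalog.cassandra_testing", fromCassandra2)

// Equivalent path through the DataStax connector directly;
// Append is required when writing into an existing table.
fromCassandra2.write
  .format("org.apache.spark.sql.cassandra")
  .options(cassandraDfOptions)
  .mode(SaveMode.Append)
  .save()
```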