Cassandra.Link

The best knowledge base on Apache Cassandra®

Helping platform leaders, architects, engineers, and operators build scalable real-time data platforms.

12/2/2020

Reading time: 10 min

Spark And Cassandra: 2 Fast, 2 Furious

by Jen Aman



  1. Spark and Cassandra: 2 Fast, 2 Furious. Russell Spitzer, DataStax Inc.
  2. Russell, ostensibly a software engineer • Did a Ph.D. in bioinformatics at some point • Has written a great deal of automation and testing framework code • Now develops for DataStax on the Analytics Team • Focuses a lot on the DataStax OSS Spark Cassandra Connector
  3. DataStax Spark Cassandra Connector: Let Spark interact with your Cassandra data! https://github.com/datastax/spark-cassandra-connector http://spark-packages.org/package/datastax/spark-cassandra-connector http://spark-packages.org/package/TargetHolding/pyspark-cassandra Compatible with Spark 1.6 + Cassandra 3.0 • Bulk writing to Cassandra • Distributed full table scans • Optimized direct joins with Cassandra • Secondary index pushdown • Connection and prepared statement pools (a minimal setup sketch follows below)
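Not shown on the slides: wiring the connector into a Spark application. A minimal setup sketch, assuming a local Cassandra node (the host address and app name are placeholders):

    import org.apache.spark.{SparkConf, SparkContext}

    // spark.cassandra.connection.host tells the connector where to find
    // the cluster; any reachable contact point works.
    val conf = new SparkConf()
      .setAppName("spark-cassandra-demo")                  // placeholder
      .set("spark.cassandra.connection.host", "127.0.0.1") // placeholder
    val sc = new SparkContext(conf)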
  4. "Cassandra is essentially a hybrid between a key-value and a column-oriented (or tabular) database management system. Its data model is a partitioned row store with tunable consistency."* *https://en.wikipedia.org/wiki/Apache_Cassandra Let's break that down: 1. What is a C* partition and row? 2. How does C* place partitions?
  5. CQL looks a lot like SQL: CREATE TABLE tracker ( vehicle_id uuid, ts timestamp, x double, y double, PRIMARY KEY (vehicle_id, ts))
  6. INSERTs look almost identical: CREATE TABLE tracker ( vehicle_id uuid, ts timestamp, x double, y double, PRIMARY KEY (vehicle_id, ts)) INSERT INTO tracker (vehicle_id, ts, x, y) VALUES ( 1, 0, 0, 1)
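One caveat when trying the slide's INSERT verbatim: vehicle_id is declared as uuid, so the literal 1 is illustrative only. A sketch of a runnable insert through the connector's pooled session ("ks" is an assumed keyspace name):

    import java.util.{Date, UUID}
    import com.datastax.spark.connector.cql.CassandraConnector

    // The driver needs a real UUID and a timestamp for the uuid/timestamp columns.
    CassandraConnector(sc.getConf).withSessionDo { session =>
      session.execute(
        "INSERT INTO ks.tracker (vehicle_id, ts, x, y) VALUES (?, ?, ?, ?)",
        UUID.randomUUID(), new Date(0L), Double.box(0.0), Double.box(1.0))
    }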
  7. Cassandra Data is stored in Partitions CREATE TABLE tracker ( vehicle_id uuid, ts timestamp, x double, y double, PRIMARY KEY (vehicle_id, ts)) vehicle_id 1 ts: 0 x: 0 y: 1 INSERT INTO tracker (vehicle_id, ts, x, y) VALUES ( 1, 0, 0, 1)
  8. C* Partitions Store Many Rows CREATE TABLE tracker ( vehicle_id uuid, ts timestamp, x double, y double, PRIMARY KEY (vehicle_id, ts)) vehicle_id 1 ts: 0 x: 0 y: 1 ts: 1 x: -1 y: 0 ts: 2 x: 0 y: -1 ts: 3 x: 1 y: 0
  9. C* Partitions Store Many Rows CREATE TABLE tracker ( vehicle_id uuid, ts timestamp, x double, y double, PRIMARY KEY (vehicle_id, ts)) vehicle_id 1 ts: 0 x: 0 y: 1 ts: 1 x: -1 y: 0 ts: 2 x: 0 y: -1 ts: 3 x: 1 y: 0 [diagram label: Partition]
  10. C* Partitions Store Many Rows CREATE TABLE tracker ( vehicle_id uuid, ts timestamp, x double, y double, PRIMARY KEY (vehicle_id, ts)) vehicle_id 1 ts: 0 x: 0 y: 1 ts: 1 x: -1 y: 0 ts: 2 x: 0 y: -1 ts: 3 x: 1 y: 0 [diagram label: Row]
  11. Within a partition there is ordering based on the Clustering Keys CREATE TABLE tracker ( vehicle_id uuid, ts timestamp, x double, y double, PRIMARY KEY (vehicle_id, ts)) vehicle_id 1 ts: 0 x: 0 y: 1 ts: 1 x: -1 y: 0 ts: 2 x: 0 y: -1 ts: 3 x: 1 y: 0 Ordered by Clustering Key
  12. Slices within a Partition are Very Easy CREATE TABLE tracker ( vehicle_id uuid, ts timestamp, x double, y double, PRIMARY KEY (vehicle_id, ts)) vehicle_id 1 ts: 0 x: 0 y: 1 ts: 1 x: -1 y: 0 ts: 2 x: 0 y: -1 ts: 3 x: 1 y: 0 Ordered by Clustering Key (a slice-query sketch follows below)
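The slide shows the ordering but not the slice query itself; because rows are clustered by ts, a time-range read within one partition is a single sequential slice. A sketch, assuming the tracker table in keyspace "ks" and a known vehicle id:

    import java.util.{Date, UUID}
    import scala.collection.JavaConverters._
    import com.datastax.spark.connector.cql.CassandraConnector

    val vehicleId: UUID = UUID.randomUUID() // placeholder for a real id

    CassandraConnector(sc.getConf).withSessionDo { session =>
      val rows = session.execute(
        "SELECT ts, x, y FROM ks.tracker WHERE vehicle_id = ? AND ts >= ? AND ts < ?",
        vehicleId, new Date(0L), new Date(4L))
      rows.asScala.foreach(r =>
        println((r.getTimestamp("ts"), r.getDouble("x"), r.getDouble("y"))))
    }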
  13. Cassandra is a Distributed Fault-Tolerant Database San Jose Oakland San Francisco
  14. Data is located on a Token Range San Jose Oakland San Francisco
  15. Data is located on a Token Range 0 San Jose Oakland San Francisco 1200 600
  16. The Partition Key of a Row is Hashed to Determine its Token 0 San Jose Oakland San Francisco 1200 600
  17. The Partition Key of a Row is Hashed to Determine its Token 0 1200 600 San Jose Oakland San Francisco vehicle_id 1 INSERT INTO tracker (vehicle_id, ts, x, y) VALUES ( 1, 0, 0, 1)
  18. The Partition Key of a Row is Hashed to Determine its Token 0 1200 600 San Jose Oakland San Francisco vehicle_id 1 INSERT INTO tracker (vehicle_id, ts, x, y) VALUES ( 1, 0, 0, 1) Hash(1) = 1000
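You can ask Cassandra directly for the token it computed; a sketch (with the default Murmur3 partitioner, real tokens span the full signed 64-bit range, not the slide's illustrative 0-1200 ring):

    import scala.collection.JavaConverters._
    import com.datastax.spark.connector.cql.CassandraConnector

    CassandraConnector(sc.getConf).withSessionDo { session =>
      val rs = session.execute(
        "SELECT vehicle_id, token(vehicle_id) AS t FROM ks.tracker LIMIT 5")
      // token() returns a bigint under Murmur3Partitioner.
      rs.asScala.foreach(r =>
        println(s"${r.getUUID("vehicle_id")} -> ${r.getLong("t")}"))
    }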
  19. How The Spark Cassandra Connector Reads 0 500
  20. How The Spark Cassandra Connector Reads 0 San Jose Oakland San Francisco 500
  21. Spark Partitions are Built From the Token Range 0 San Jose Oakland San Francisco 500-600 600-700 500 1200
  22. Each Partition Can be Drawn Locally from at Least One Node 0 San Jose Oakland San Francisco 500-600 600-700 500 1200
  23. Each Partition Can be Drawn Locally from at Least One Node 0 San Jose Oakland San Francisco 500-600 600-700 500 1200 Spark Executor Spark Executor Spark Executor
  24. No Need to Leave the Node For Data! 0 500
  25. Data is Retrieved using the DataStax Java Driver 0 Spark Executor Cassandra
  26. A Connection is Established 0 Spark Executor Cassandra 500-600
  27. A Query is Prepared with Token Bounds 0 Spark Executor Cassandra SELECT * FROM TABLE WHERE Token(PK) > StartToken AND Token(PK) < EndToken 500-600
  28. The Spark Partition's Bounds are Placed in the Query 0 Spark Executor Cassandra SELECT * FROM TABLE WHERE Token(PK) > 500 AND Token(PK) < 600 500-600
  29. Results are Paged a Number of Rows at a Time 0 Spark Executor Cassandra 500-600 SELECT * FROM TABLE WHERE Token(PK) > 500 AND Token(PK) < 600 (paging settings sketched below)
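Both the size of a Spark partition's token slice and the driver's page size are tunable; a sketch of the relevant connector settings (the values are illustrative, not tuning advice):

    import org.apache.spark.SparkConf

    // split.size_in_mb controls how much data one Spark partition's token
    // range covers; fetch.size_in_rows controls driver paging per query.
    val conf = new SparkConf()
      .set("spark.cassandra.input.split.size_in_mb", "64")     // illustrative
      .set("spark.cassandra.input.fetch.size_in_rows", "1000") // illustrative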
  30. Data is Paged 0 Spark Executor Cassandra 500-600 SELECT * FROM TABLE WHERE Token(PK) > 500 AND Token(PK) < 600
  31. Data is Paged 0 Spark Executor Cassandra 500-600 SELECT * FROM TABLE WHERE Token(PK) > 500 AND Token(PK) < 600
  32. Data is Paged 0 Spark Executor Cassandra 500-600 SELECT * FROM TABLE WHERE Token(PK) > 500 AND Token(PK) < 600
  33. Data is Paged 0 Spark Executor Cassandra 500-600 SELECT * FROM TABLE WHERE Token(PK) > 500 AND Token(PK) < 600
  34. How We Write to Cassandra • Data is written via the DataStax Java Driver • Data is grouped based on Partition Key (configurable) • Batches are written (a write-path sketch follows below)
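A sketch of that write path from the RDD side, assuming the tracker table in keyspace "ks"; the batching behavior the slide describes is governed by the connector settings noted in the comments:

    import java.util.{Date, UUID}
    import com.datastax.spark.connector._

    // Relevant knobs (set on the SparkConf before the context is created):
    //   spark.cassandra.output.batch.grouping.key = partition  (group by C* partition)
    //   spark.cassandra.output.batch.size.rows    = 200        (illustrative)
    val points = sc.parallelize(Seq(
      (UUID.randomUUID(), new Date(0L), 0.0, 1.0)))
    points.saveToCassandra("ks", "tracker", SomeColumns("vehicle_id", "ts", "x", "y"))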
  35. Data is Written using the DataStax Java Driver 0 Spark Executor Cassandra
  36. A Connection is Established 0 Spark Executor Cassandra
  37. Data is Grouped on Key 0 Spark Executor Cassandra
  38. Data is Grouped on Key 0 Spark Executor Cassandra
  39. Once a Batch is Full or There are Too Many Batches, the Largest Batch is Executed 0 Spark Executor Cassandra
  40. Once a Batch is Full or There are Too Many Batches, the Largest Batch is Executed 0 Spark Executor Cassandra
  41. Once a Batch is Full or There are Too Many Batches, the Largest Batch is Executed 0 Spark Executor Cassandra
  42. Once a Batch is Full or There are Too Many Batches, the Largest Batch is Executed 0 Spark Executor Cassandra
  43. Most Common Features • RDD APIs • cassandraTable • saveToCassandra • repartitionByCassandraReplica • joinWithCassandraTable • DF API • Datasource
  44. Full Table Scans, Making an RDD out of a Table: import com.datastax.spark.connector._ then sc.cassandraTable(KeyspaceName, TableName), or the typed form sc.cassandraTable[MyClass](KeyspaceName, TableName) (runnable sketch below)
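For the typed variant, the target class has to line up with the table's columns; a sketch with a case class for tracker (the connector maps camelCase fields to snake_case columns):

    import java.util.{Date, UUID}
    import com.datastax.spark.connector._

    // vehicleId maps to the vehicle_id column automatically.
    case class TrackerRow(vehicleId: UUID, ts: Date, x: Double, y: Double)

    val rows = sc.cassandraTable[TrackerRow]("ks", "tracker")
    println(rows.count())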
  45. Pushing Down CQL to Cassandra: import com.datastax.spark.connector._ sc.cassandraTable[MyClass](KeyspaceName, TableName).select("vehicle_id").where("ts > 10") becomes SELECT "vehicle_id" FROM TABLE WHERE Token(PK) > 500 AND Token(PK) < 600 AND ts > 10
  46. Distributed Key Retrieval: import com.datastax.spark.connector._ rdd.joinWithCassandraTable("keyspace", "table") San Jose Oakland San Francisco Spark Executor Spark Executor Spark Executor RDD
  47. But our Data isn't Colocated: import com.datastax.spark.connector._ rdd.joinWithCassandraTable("keyspace", "table") San Jose Oakland San Francisco Spark Executor Spark Executor Spark Executor RDD
  48. RBCR reshuffles our data in bulk so it will be local: rdd .repartitionByCassandraReplica("keyspace","table") .joinWithCassandraTable("keyspace", "table") San Jose Oakland San Francisco Spark Executor Spark Executor Spark Executor RDD (see the sketch below)
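repartitionByCassandraReplica expects an RDD whose elements carry the table's partition key; a sketch joining a plain RDD of vehicle ids against tracker (the ids and the partitionsPerHost value are placeholders):

    import java.util.UUID
    import com.datastax.spark.connector._

    // Tuple1 maps positionally onto the single partition key column.
    val ids = sc.parallelize(Seq(Tuple1(UUID.randomUUID())))
    val localJoin = ids
      .repartitionByCassandraReplica("ks", "tracker", partitionsPerHost = 10)
      .joinWithCassandraTable("ks", "tracker")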
  49. The Connector Provides a Distributed Pool for Prepared Statements and Sessions:

    rdd.mapPartitions { it =>
      CassandraConnector(sc.getConf).withSessionDo { session =>
        val ps = session.prepare(insertCql) // insertCql: the CQL text (not shown on the slide)
        it.map(row => session.executeAsync(ps.bind(row)))
          .toList.iterator // force evaluation while the session is in scope
      }
    }

  Your Pool Ready to Be Deployed
  50. The Connector Provides a Distributed Pool for Prepared Statements and Sessions (same snippet as slide 49). Your Pool Ready to Be Deployed: Prepared Statement Cache, Session Cache
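A fuller sketch of the same pattern against the tracker table; the async futures are collected and drained before the partition finishes so every write actually completes (the CQL text and the data are placeholders):

    import java.util.{Date, UUID}
    import com.datastax.spark.connector.cql.CassandraConnector

    val connector = CassandraConnector(sc.getConf)
    val insertCql =
      "INSERT INTO ks.tracker (vehicle_id, ts, x, y) VALUES (?, ?, ?, ?)"

    val rdd = sc.parallelize(Seq((UUID.randomUUID(), new Date(0L), 0.0, 1.0)))
    rdd.foreachPartition { rows =>
      connector.withSessionDo { session =>
        val ps = session.prepare(insertCql) // served from the statement cache
        val futures = rows.map { case (id, ts, x, y) =>
          session.executeAsync(ps.bind(id, ts, Double.box(x), Double.box(y)))
        }.toList
        futures.foreach(_.getUninterruptibly()) // block until each write lands
      }
    }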
  51. The Connector Supports the DataSources API:

    sqlContext.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "read_test", "table" -> "simple_kv"))
      .load

    import org.apache.spark.sql.cassandra._
    sqlContext.read.cassandraFormat("simple_kv", "read_test").load

  https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md
  52. The Connector Works with Catalyst to Push Down Predicates to Cassandra: import com.datastax.spark.connector._ df.select("vehicle_id").filter("ts > 10") becomes SELECT "vehicle_id" FROM TABLE WHERE Token(PK) > 500 AND Token(PK) < 600 AND ts > 10
  53. The Connector Works with Catalyst to Push Down Predicates to Cassandra: import com.datastax.spark.connector._ df.select("vehicle_id").filter("ts > 10") The flow is QueryPlan -> Catalyst -> PrunedFilteredScan; only prunes (projections) and filters (predicates) can be pushed down. (see the sketch below)
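To see which predicates actually reached Cassandra, inspect the physical plan; a sketch (keyspace and table names assumed as before):

    // PushedFilters in the explain() output lists predicates handed to the
    // PrunedFilteredScan rather than evaluated in Spark.
    val df = sqlContext.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "ks", "table" -> "tracker"))
      .load()
    df.select("vehicle_id").filter("ts > '2016-01-01'").explain()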
  54. Recent Features • C* 3.0 Support • Materialized Views • SASI Indexes (for pushdown) • Advanced Spark Partitioner Support • Increased Performance on JWCT
  55. Use C* Partitioning in Spark
  56. Use C* Partitioning in Spark • C* Data is Partitioned • Spark has Partitions and partitioners • Spark can use a known partitioner to speed up Cogroups (joins) • How Can we Leverage this?
  57. Use C* Partitioning in Spark https://github.com/datastax/spark-cassandra-connector/blob/master/doc/16_partitioning.md If keyBy is used on a CassandraTableScanRDD and the partition key is included in the key, the RDD will be given a C* partitioner:

    val ks = "doc_example"
    val rdd = {
      sc.cassandraTable[(String, Int)](ks, "users")
        .select("name" as "_1", "zipcode" as "_2", "userid")
        .keyBy[Tuple1[Int]]("userid")
    }
    rdd.partitioner
    // res3: Option[org.apache.spark.Partitioner] =
    //   Some(com.datastax.spark.connector.rdd.partitioner.CassandraPartitioner@94515d3e)
  58. Use C* Partitioning in Spark https://github.com/datastax/spark-cassandra-connector/blob/master/doc/16_partitioning.md Share partitioners between tables for joins on the partition key:

    val ks = "doc_example"
    val rdd1 = {
      sc.cassandraTable[(Int, Int, String)](ks, "purchases")
        .select("purchaseid" as "_1", "amount" as "_2", "objectid" as "_3", "userid")
        .keyBy[Tuple1[Int]]("userid")
    }
    val rdd2 = {
      sc.cassandraTable[(String, Int)](ks, "users")
        .select("name" as "_1", "zipcode" as "_2", "userid")
        .keyBy[Tuple1[Int]]("userid")
    }.applyPartitionerFrom(rdd1) // Assigns the partitioner from the first rdd to this one
    val joinRDD = rdd1.join(rdd2)
  59. Use C* Partitioning in Spark https://github.com/datastax/spark-cassandra-connector/blob/master/doc/16_partitioning.md Many other uses for this, try it yourself! • All self joins using the Partition key • Groups within C* Partitions • Anything formerly using SpanBy • Joins with other RDDs • And much more!
  60. Enhanced Parallelism with JWCT • joinWithCassandraTable now has increased concurrency and parallelism! • 5X speed increases in some cases • https://datastax-oss.atlassian.net/browse/SPARKC-233 • Thanks Jaroslaw!
  61. The Connector Wants You! • OSS Project that loves community involvement • Bug Reports • Feature Requests • Doc Improvements • Come join us! Vin Diesel may or may not be a contributor
  62. Tons of Videos at DataStax Academy https://academy.datastax.com/ https://academy.datastax.com/courses/getting-started-apache-spark
  63. Tons of Videos at DataStax Academy https://academy.datastax.com/ https://academy.datastax.com/courses/getting-started-apache-spark
  64. See You at Cassandra Summit! Join me and 3,500 of your database peers, and take a deep dive into Apache Cassandra™, the massively scalable NoSQL database that powers global businesses like Apple, Spotify, Netflix and Sony. San Jose Convention Center, September 7-9, 2016. https://cassandrasummit.org/ Build Something Disruptive
