Cassandra.Link

1/30/2019

Aggregate operations on Azure Cosmos DB Cassandra API tables from Spark

by John Doe


This article describes basic aggregation operations against Azure Cosmos DB Cassandra API tables from Spark.

Note

Server-side filtering and server-side aggregation are currently not supported in Azure Cosmos DB Cassandra API.

Cassandra API configuration

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//Cosmos DB library for multiple retries
import com.microsoft.azure.cosmosdb.cassandra
//Connection-related
spark.conf.set("spark.cassandra.connection.host","YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com")
spark.conf.set("spark.cassandra.connection.port","10350")
spark.conf.set("spark.cassandra.connection.ssl.enabled","true")
spark.conf.set("spark.cassandra.auth.username","YOUR_ACCOUNT_NAME")
spark.conf.set("spark.cassandra.auth.password","YOUR_ACCOUNT_KEY")
spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
//Throughput-related...adjust as needed
spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10")
spark.conf.set("spark.cassandra.output.concurrent.writes", "1000")
spark.conf.set("spark.cassandra.concurrent.reads", "512")
spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
spark.conf.set("spark.cassandra.connection.keep_alive_ms", "600000000")
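
The connection settings above hard-code the account name and key. As a minimal sketch, the same connection-related settings can be built as a map from values supplied at run time. The helper name `cosmosCassandraConf` is hypothetical and is not part of the Spark connector or the Cosmos DB library; the keys and values are copied from the configuration above.

```scala
// Hypothetical helper (not part of any library): builds the connection-related
// settings shown above from an account name and key passed in by the caller.
def cosmosCassandraConf(account: String, key: String): Map[String, String] = Map(
  "spark.cassandra.connection.host" -> s"$account.cassandra.cosmosdb.azure.com",
  "spark.cassandra.connection.port" -> "10350",
  "spark.cassandra.connection.ssl.enabled" -> "true",
  "spark.cassandra.auth.username" -> account,
  "spark.cassandra.auth.password" -> key,
  "spark.cassandra.connection.factory" ->
    "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory"
)

// Placeholder values, for illustration only.
val conf = cosmosCassandraConf("YOUR_ACCOUNT_NAME", "YOUR_ACCOUNT_KEY")

// Print the setting names only, so the key never ends up in notebook output.
conf.keys.toSeq.sorted.foreach(println)
```

With a session in scope, the map could be applied with `conf.foreach { case (k, v) => spark.conf.set(k, v) }`. Where the account name and key come from (for example, environment variables or a secret store) is left to the reader.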

Sample data generator

// Generate a simple dataset containing five rows
val booksDF = Seq(
   ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
   ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
   ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
   ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
   ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
booksDF.write
  .mode("append")
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
  .save()

Count operation

RDD API

sc.cassandraTable("books_ks", "books").count

Output:

res48: Long = 5

Dataframe API

Counting against dataframes is currently not supported. As a workaround, the sample below runs a dataframe count after persisting the dataframe to memory.

To avoid running into "out of memory" issues, choose one of the following storage options:

  • MEMORY_ONLY: The default storage option for RDDs (dataframes persist with MEMORY_AND_DISK by default). Stores the RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions are not cached and are recomputed on the fly each time they're needed.

  • MEMORY_AND_DISK: Stores the RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, the partitions that don't fit are stored on disk and read from there when they're needed.

  • MEMORY_ONLY_SER (Java/Scala): Stores the RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially with a fast serializer, but more CPU-intensive to read.

  • MEMORY_AND_DISK_SER (Java/Scala): Like MEMORY_ONLY_SER, except that partitions that don't fit in memory are spilled to disk instead of being recomputed each time they're needed.

  • DISK_ONLY: Stores the RDD partitions on the disk only.

  • MEMORY_ONLY_2, MEMORY_AND_DISK_2…: Same as the levels above, but replicates each partition on two cluster nodes.

  • OFF_HEAP (experimental): Similar to MEMORY_ONLY_SER, but it stores the data in off-heap memory, and it requires off-heap memory to be enabled ahead of time.

//Workaround
import org.apache.spark.storage.StorageLevel
//Read from source
val readBooksDF = spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
//Explain plan
readBooksDF.explain
//Mark the dataframe for caching (persist is lazy; nothing is read yet)
readBooksDF.persist(StorageLevel.MEMORY_ONLY)
//The first action materializes the cache; later actions against this DF hit the cache
readBooksDF.count
//Register as temporary view
readBooksDF.createOrReplaceTempView("books_vw")
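
The storage options listed above differ in only a few properties: whether memory is used, whether disk is used, whether the data is kept deserialized, and how many replicas are kept. As a small sketch, assuming only that Spark is on the classpath (no cluster or table is needed), these properties can be read from the `StorageLevel` constants before picking one:

```scala
import org.apache.spark.storage.StorageLevel

// Pair each constant with its name so the printed lines are easy to compare.
val levels = Seq(
  "MEMORY_ONLY"         -> StorageLevel.MEMORY_ONLY,
  "MEMORY_AND_DISK"     -> StorageLevel.MEMORY_AND_DISK,
  "MEMORY_ONLY_SER"     -> StorageLevel.MEMORY_ONLY_SER,
  "MEMORY_AND_DISK_SER" -> StorageLevel.MEMORY_AND_DISK_SER,
  "DISK_ONLY"           -> StorageLevel.DISK_ONLY,
  "MEMORY_ONLY_2"       -> StorageLevel.MEMORY_ONLY_2
)

// Print the flags that distinguish the levels.
levels.foreach { case (name, level) =>
  println(s"$name: memory=${level.useMemory} disk=${level.useDisk} " +
    s"deserialized=${level.deserialized} replication=${level.replication}")
}
```

For a table that may not fit in executor memory, a level with `disk=true` avoids recomputing (and therefore re-reading) the partitions that don't fit.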

SQL

select * from books_vw;
select count(*) from books_vw where book_pub_year > 1900;
select count(book_id) from books_vw;
select book_author, count(*) as count from books_vw group by book_author;
select count(*) from books_vw;
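
The same kind of statement can also be issued from Scala through `spark.sql`. The sketch below is an illustration under stated assumptions: a local session and an in-memory copy of a few sample columns stand in for the Cosmos DB table, and the view name `books_sketch_vw` is hypothetical, chosen so it does not overwrite `books_vw`.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session and in-memory rows replace the Cassandra read.
val localSpark = SparkSession.builder()
  .master("local[*]")
  .appName("sql-sketch")
  .getOrCreate()
import localSpark.implicits._

// A cut-down copy of the sample data, registered under a hypothetical view name.
Seq(
  ("b00001", "Arthur Conan Doyle", 1887),
  ("b00023", "Arthur Conan Doyle", 1890),
  ("b01001", "Arthur Conan Doyle", 1892),
  ("b00501", "Arthur Conan Doyle", 1893),
  ("b00300", "Arthur Conan Doyle", 1901)
).toDF("book_id", "book_author", "book_pub_year")
  .createOrReplaceTempView("books_sketch_vw")

// Filtered count: how many books were published after 1900.
val after1900 = localSpark
  .sql("select count(*) from books_sketch_vw where book_pub_year > 1900")
  .head()
  .getLong(0)

// Grouped count: one row per author.
val perAuthor = localSpark
  .sql("select book_author, count(*) as count from books_sketch_vw group by book_author")
  .collect()

println(after1900)
perAuthor.foreach(println)
```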

Average operation

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Double) => c).mean

Output:

res24: Double = 16.016000175476073

Dataframe API

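//avg, min, max, and sum come from org.apache.spark.sql.functions
import org.apache.spark.sql.functions._
spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(avg("book_price"))
  .show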

Output:

+------------------+
|   avg(book_price)|
+------------------+
|16.016000175476073|
+------------------+

SQL

select avg(book_price) from books_vw;

Output:

16.016000175476073
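
The average comes back as 16.016000175476073 rather than 16.016. A likely explanation, given that the read schema in the Top section reports book_price as float, is that each 32-bit float is widened to a double before averaging and carries its rounding error along. The plain Scala sketch below illustrates that effect with the five sample prices; it does not touch Spark or the table, and the exact trailing digits it prints may differ from the outputs above.

```scala
// The five sample prices as 32-bit floats, matching the float column type.
val prices: Seq[Float] = Seq(11.33f, 22.45f, 19.83f, 14.22f, 12.25f)

// Widening to Double preserves each float's rounding error.
val widened: Seq[Double] = prices.map(_.toDouble)
val floatBackedMean: Double = widened.sum / widened.size

// Parsing the same literals directly as doubles gives the mean one might expect.
val doubleMean: Double = Seq(11.33, 22.45, 19.83, 14.22, 12.25).sum / 5

println(f"float-backed mean: $floatBackedMean%.12f")
println(f"double mean:       $doubleMean%.12f")
```

If exact decimal results matter, a decimal column type in the table is the usual remedy; that is a schema decision outside the scope of this sample.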

Min operation

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).min

Output:

res31: Float = 11.33

Dataframe API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(min("book_price"))
  .show

Output:

+---------------+
|min(book_price)|
+---------------+
|          11.33|
+---------------+

SQL

select min(book_price) from books_vw;

Output:

11.33

Max operation

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).max

Dataframe API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(max("book_price"))
  .show

Output:

+---------------+
|max(book_price)|
+---------------+
|          22.45|
+---------------+

SQL

select max(book_price) from books_vw;

Output:

22.45

Sum operation

RDD API

sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).sum

Output:

res46: Double = 80.08000087738037

Dataframe API

spark
  .read
  .cassandraFormat("books", "books_ks", "")
  .load()
  .select("book_price")
  .agg(sum("book_price"))
  .show

Output:

+-----------------+
|  sum(book_price)|
+-----------------+
|80.08000087738037|
+-----------------+

SQL

select sum(book_price) from books_vw;

Output:

80.08000087738037
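
Each dataframe example above builds its own read and runs as a separate job. Because aggregation happens on the Spark side, several aggregates can instead be computed in one `agg` call over a single read. The sketch below is an illustration, not the article's method: a local session and in-memory rows stand in for the Cosmos DB table, and the names `localSpark`, `sampleDF`, and the column aliases are hypothetical. Against a real table it would most naturally be applied to the persisted dataframe from the Count section; whether the count column works there without that workaround is not verified here.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, count, lit, max, min, sum}

// Assumption: a local session and in-memory rows replace the Cassandra read.
val localSpark = SparkSession.builder()
  .master("local[*]")
  .appName("agg-sketch")
  .getOrCreate()
import localSpark.implicits._

// Prices are floats to mirror the float column in the sample table.
val sampleDF = Seq(
  ("b00001", 11.33f),
  ("b00023", 22.45f),
  ("b01001", 19.83f),
  ("b00501", 14.22f),
  ("b00300", 12.25f)
).toDF("book_id", "book_price")

// One pass over the data produces all five aggregates in a single row.
val stats = sampleDF.agg(
  count(lit(1)).as("row_count"),
  avg("book_price").as("avg_price"),
  min("book_price").as("min_price"),
  max("book_price").as("max_price"),
  sum("book_price").as("sum_price")
).head()

val rowCount = stats.getLong(0)
val minPrice = stats.getFloat(2)
val maxPrice = stats.getFloat(3)
println(stats)
```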

Top or comparable operation

RDD API

//Sort by price, descending
val readCalcTopRDD = sc.cassandraTable("books_ks", "books").select("book_name","book_price").sortBy(_.getFloat(1), false)
//Keep only the first n (here 3) rows before collecting, so the full RDD is never brought to the driver
readCalcTopRDD.zipWithIndex.filter(_._2 < 3).collect.foreach(println)

Output:

(CassandraRow{book_name: A sign of four, book_price: 22.45},0)
(CassandraRow{book_name: The adventures of Sherlock Holmes, book_price: 19.83},1)
(CassandraRow{book_name: The memoirs of Sherlock Holmes, book_price: 14.22},2)
readCalcTopRDD: org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow] = MapPartitionsRDD[430] at sortBy at command-2371828989676374:1
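
As an alternative to sorting the whole RDD and filtering by index, Spark's `RDD.top` returns the n largest elements under a given ordering. The sketch below swaps in that technique for illustration; it is not what the sample above does. It runs on a local session with in-memory (name, price) pairs standing in for the Cassandra rows, and `localSpark` and `pricesRDD` are hypothetical names.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session and in-memory pairs replace the Cassandra read.
val localSpark = SparkSession.builder()
  .master("local[*]")
  .appName("top-sketch")
  .getOrCreate()

val pricesRDD = localSpark.sparkContext.parallelize(Seq(
  ("A study in scarlet", 11.33f),
  ("A sign of four", 22.45f),
  ("The adventures of Sherlock Holmes", 19.83f),
  ("The memoirs of Sherlock Holmes", 14.22f),
  ("The hounds of Baskerville", 12.25f)
))

// top(n) returns the n largest elements under the given ordering, largest first.
val top3 = pricesRDD.top(3)(Ordering.by[(String, Float), Float](_._2))
top3.foreach(println)
```

Only the three selected pairs are returned to the driver, which is the same property the zipWithIndex approach aims for.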

Dataframe API

import org.apache.spark.sql.functions._
val readBooksDF = spark.read.format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load
  .select("book_name","book_price")
  .orderBy(desc("book_price"))
  .limit(3)
//Explain plan
readBooksDF.explain
//Top
readBooksDF.show

Output:

== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[book_price#1840 DESC NULLS LAST], output=[book_name#1839,book_price#1840])
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@29cd5f58 [book_name#1839,book_price#1840] PushedFilters: [], ReadSchema: struct<book_name:string,book_price:float>
+--------------------+----------+
|           book_name|book_price|
+--------------------+----------+
|      A sign of four|     22.45|
|The adventures of...|     19.83|
|The memoirs of Sh...|     14.22|
+--------------------+----------+
import org.apache.spark.sql.functions._
readBooksDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [book_name: string, book_price: float]

SQL

select book_name,book_price from books_vw order by book_price desc limit 3;

Next steps

To perform table copy operations, see:

  • Table copy operations
