Cassandra.Link

8/28/2018

Spark SQL Against Cassandra Example - DZone Database

by John Doe

Spark SQL is awesome.  It allows you to query any Resilient Distributed Dataset (RDD) using SQL (including data stored in Cassandra!).

The first thing to do is create a SQLContext from your SparkContext.  I'm using Java, so...
(sorry, I'm still not hip enough for Scala)

JavaSparkContext context = new JavaSparkContext(conf);
JavaSQLContext sqlContext = new JavaSQLContext(context);

Now you have a SQLContext, but you have no data.  Go ahead and create an RDD, just like you would in regular Spark:

JavaPairRDD<Integer, Product> productsRDD = 
  javaFunctions(context).cassandraTable("test_keyspace", "products",
    productReader).keyBy(new Function<Product, Integer>() {
  @Override
  public Integer call(Product product) throws Exception {
    return product.getId();
  }
});

(The example above comes from the spark-on-cassandra-quickstart project, as described in my previous post.)
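If keyBy is new to you: it pairs every element with a key computed from that element, which is why the result is a JavaPairRDD<Integer, Product>, and why values() is called later to get the products back. Here is a rough plain-Java analogy. It does not use Spark at all, and the class and method names (KeyBySketch, keyBy, values) are made up for illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java analogy for keyBy/values; hypothetical names, no Spark involved.
class KeyBySketch {

    // Pairs every element with the key computed by keyFn, keeping the input order.
    static <K, V> List<Map.Entry<K, V>> keyBy(List<V> items, Function<V, K> keyFn) {
        List<Map.Entry<K, V>> pairs = new ArrayList<>();
        for (V item : items) {
            pairs.add(new SimpleEntry<>(keyFn.apply(item), item));
        }
        return pairs;
    }

    // Drops the keys again, loosely analogous to values() on a pair RDD.
    static <K, V> List<V> values(List<Map.Entry<K, V>> pairs) {
        List<V> out = new ArrayList<>();
        for (Map.Entry<K, V> pair : pairs) {
            out.add(pair.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("apple", "kiwi");
        // Key each string by its length
        List<Map.Entry<Integer, String>> keyed = keyBy(names, String::length);
        System.out.println(keyed);         // prints [5=apple, 4=kiwi]
        System.out.println(values(keyed)); // prints [apple, kiwi]
    }
}
```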

Now that we have a plain vanilla RDD,  we need to spice it up with a schema, and let the sqlContext know about it.  We can do that with the following lines:

JavaSchemaRDD schemaRDD = sqlContext.applySchema(productsRDD.values(), Product.class);
sqlContext.registerRDDAsTable(schemaRDD, "products");

Shazam.  Now your sqlContext is ready for querying.  Notice that it inferred the schema from the Java bean (Product.class).  (In the next blog post, I'll show how to do this dynamically.)
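The Product class itself isn't shown in this post, so here is a minimal sketch of what such a bean might look like, together with a toy version of "inferring a schema from a bean" using plain reflection. Only the id and price properties are taken from the snippets here; their types (Integer and Double), the BeanSchemaSketch wrapper, and the inferColumns helper are assumptions for illustration. This is not how Spark implements it, just the general idea that getters become columns.

```java
import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: hypothetical names, not Spark's implementation.
class BeanSchemaSketch {

    // Assumed shape of the Product bean; the property types are guesses.
    public static class Product implements Serializable {
        private Integer id;
        private Double price;

        public Product() { }

        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }
        public Double getPrice() { return price; }
        public void setPrice(Double price) { this.price = price; }
    }

    // Maps each getter-backed property name to the simple name of its Java type.
    static Map<String, String> inferColumns(Class<?> beanClass) {
        Map<String, String> columns = new TreeMap<>();
        for (Method method : beanClass.getMethods()) {
            String name = method.getName();
            boolean isGetter = name.startsWith("get")
                    && name.length() > 3
                    && method.getParameterCount() == 0
                    && !name.equals("getClass"); // inherited from Object, not a column
            if (isGetter) {
                // getPrice -> price
                String column = Character.toLowerCase(name.charAt(3)) + name.substring(4);
                columns.put(column, method.getReturnType().getSimpleName());
            }
        }
        return columns;
    }

    public static void main(String[] args) {
        System.out.println(inferColumns(Product.class)); // prints {id=Integer, price=Double}
    }
}
```

The takeaway is that the column names used in the SQL (id, price) need to line up with the bean's getter names.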

You can prime the pump with a count:

System.out.println("Total Records = [" + productsRDD.count() + "]");

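The count operation is an action, so it forces Spark to actually read the data from Cassandra.  Keep in mind that Spark only holds the data in memory for later use if the RDD has been cached (for example, by calling productsRDD.cache() before the count).  With the data cached, queries like the following are lightning fast: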

JavaSchemaRDD result = sqlContext.sql("SELECT id FROM products WHERE price < 0.50");
for (Row row : result.collect()) {
  System.out.println(row);
}
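If "priming the pump" sounds like magic, a toy model may help. Spark evaluates RDDs lazily, and a result is only reused when the RDD has been cached; otherwise each action goes back to the source. The sketch below mimics that in plain Java with a Supplier. It uses no Spark classes, and the names (LazyCacheSketch, cached) are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Toy model of lazy evaluation plus caching; plain Java, not the Spark API.
class LazyCacheSketch {

    // Wraps an expensive computation and remembers its result after the first call.
    static <T> Supplier<T> cached(Supplier<T> source) {
        return new Supplier<T>() {
            private T value;
            private boolean loaded;

            @Override
            public synchronized T get() {
                if (!loaded) {
                    value = source.get(); // the only time the source is touched
                    loaded = true;
                }
                return value;
            }
        };
    }

    public static void main(String[] args) {
        AtomicInteger loads = new AtomicInteger();
        // Stand-in for reading a table: counts how often the "read" really happens
        Supplier<int[]> table = () -> {
            loads.incrementAndGet();
            return new int[] {1, 2, 3};
        };

        table.get();
        table.get();
        System.out.println("uncached loads = " + loads.get()); // prints uncached loads = 2

        loads.set(0);
        Supplier<int[]> cachedTable = cached(table);
        cachedTable.get(); // first use primes the cache, like the count
        cachedTable.get(); // served from memory
        System.out.println("cached loads = " + loads.get()); // prints cached loads = 1
    }
}
```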

That's it.  You're off to the SQL races.

P.S.  If you try querying the sqlContext without applying a schema and/or without registering the RDD as a table, you may see something similar to this:

Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'id, tree:
'Project ['id]
 'Filter ('price < 0.5)
  NoRelation$
