5/24/2018

Reading time: 19 mins

Search and Analytics on Streaming Data With Kafka, Solr, Cassandra, Spark

by John Doe

In this blog post we will see how to set up a simple search and analytics pipeline on streaming data in Scala.

  • For sample time-series data, we will use the Twitter stream.
  • For the data pipeline, we will use Kafka.
  • For search, we will use Solr, with Banana as a UI query interface for the Solr data.
  • For analytics, we will store the data in Cassandra, run analytics queries with Spark, and use Zeppelin as a UI query interface.

Full code for this post is available at https://github.com/saumitras/twitter-analysis

Dependencies

Create a new project and add the following dependencies to build.sbt. Note that the Kafka artifact pulls in a few conflicting transitive dependencies, so exclude them:

libraryDependencies ++= Seq(
  "org.twitter4j" % "twitter4j-core" % "4.0.4",
  "org.twitter4j" % "twitter4j-stream" % "4.0.4",
  "com.typesafe.akka" % "akka-actor_2.11" % "2.4.17",
  "org.apache.kafka" % "kafka_2.11" % "0.10.0.0" withSources() exclude("org.slf4j","slf4j-log4j12") exclude("javax.jms", "jms") exclude("com.sun.jdmk", "jmxtools") exclude("com.sun.jmx", "jmxri"),
  "org.apache.avro" % "avro" % "1.7.7" withSources(),
  "org.apache.solr" % "solr-solrj" % "6.4.1" withSources(),
  "com.typesafe.scala-logging" %% "scala-logging" % "3.1.0",
  "ch.qos.logback" % "logback-classic" % "1.1.2",
  "com.datastax.cassandra" % "cassandra-driver-core"  % "3.0.2",
  "org.apache.cassandra" % "cassandra-clientutil"  % "3.0.2",
  "org.apache.spark" %% "spark-core" % "2.1.0",
  "org.apache.spark" %% "spark-sql" % "2.1.0",
  "org.apache.spark" %% "spark-hive" % "2.1.0",
  "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.0"
)

Setting up the Twitter stream

To stream data from Twitter you need access keys and tokens. Go to https://apps.twitter.com and create a new app to get them. After creating the app, click on “Keys and access token” and copy the following:

  • Consumer Key (API Key)
  • Consumer Secret (API Secret)
  • Access Token
  • Access Token Secret

We will use twitter4j. Build a configuration using the keys and tokens:

import twitter4j.conf.ConfigurationBuilder

// Replace the placeholders with your own keys and tokens.
val cb = new ConfigurationBuilder()
cb.setDebugEnabled(true)
cb.setOAuthConsumerKey("<your-consumer-key>")
cb.setOAuthConsumerSecret("<your-consumer-secret>")
cb.setOAuthAccessToken("<your-access-token>")
cb.setOAuthAccessTokenSecret("<your-access-token-secret>")
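Keys pasted into source code tend to end up in version control, so it can help to read them from the environment instead. The following is only a sketch: the case class, the CredentialsLoader object and the four environment variable names are hypothetical and not part of the original project.

```scala
// Sketch: load Twitter credentials from environment variables.
// All names here (TwitterCredentials, CredentialsLoader, TWITTER_*) are
// assumptions made for illustration.
final case class TwitterCredentials(
  consumerKey: String,
  consumerSecret: String,
  accessToken: String,
  accessTokenSecret: String
)

object CredentialsLoader {
  private val requiredVars = List(
    "TWITTER_CONSUMER_KEY",
    "TWITTER_CONSUMER_SECRET",
    "TWITTER_ACCESS_TOKEN",
    "TWITTER_ACCESS_TOKEN_SECRET"
  )

  // Returns Left with the names of missing (or empty) variables,
  // or Right with the credentials when all four are present.
  def fromEnv(env: Map[String, String] = sys.env): Either[String, TwitterCredentials] = {
    val missing = requiredVars.filterNot(name => env.get(name).exists(_.nonEmpty))
    if (missing.nonEmpty)
      Left("Missing environment variables: " + missing.mkString(", "))
    else
      Right(TwitterCredentials(
        env(requiredVars(0)),
        env(requiredVars(1)),
        env(requiredVars(2)),
        env(requiredVars(3))
      ))
  }
}
```

With this in place, the values passed to cb.setOAuthConsumerKey and the other three setters can come from the Right branch of CredentialsLoader.fromEnv(), and the application can stop early with a readable message on Left.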

You can now open a stream and listen for tweets with specific keywords or hashtags:

import twitter4j._

val stream = new TwitterStreamFactory(cb.build()).getInstance()

val listener = new StatusListener {

  override def onTrackLimitationNotice(i: Int): Unit =
    logger.warn(s"Track limited $i tweets")

  override def onStallWarning(stallWarning: StallWarning): Unit =
    logger.error("Stream stalled")

  override def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice): Unit =
    logger.warn(s"Status ${statusDeletionNotice.getStatusId} deleted")

  override def onScrubGeo(l: Long, l1: Long): Unit =
    logger.warn(s"Geo info scrubbed. userId:$l, upToStatusId:$l1")

  override def onException(e: Exception): Unit =
    logger.error("Exception occurred. " + e.getMessage)

  // Called for every tweet that matches the filter.
  override def onStatus(status: Status): Unit = {
    logger.info("Msg: " + status.getText)
  }
}

val keywords = List("#scala", "#kafka", "#cassandra", "#solr", "#bigdata", "#apachespark", "#streamingdata")

stream.addListener(listener)
val fq = new FilterQuery()
fq.track(keywords.mkString(","))
stream.filter(fq)

StatusListener provides several callbacks to handle different scenarios. onStatus is the one that receives each tweet and its metadata. stream.filter(fq) starts the stream.

If you run this, you should start seeing the tweets:

Msg: RT @botbigdata: How to build a data science team https://t.co/xJWWgueGAV #bigdata
Msg: RT @ATEKAssetScan: Why Velocity Of Innovation Is A Data Friction Problem https://t.co/Eo1pTNCEv9 #BigData #IoT #IIoT #InternetOfThings #Art…
Msg: Making the Most of Big Data https://t.co/X52AZ5n5nT #BigData
Msg: RT @botbigdata: Create editable Microsoft Office charts from R https://t.co/LnSDU0iSMq #bigdata
Msg: RT @YarmolukDan: How #Twitter Users Can Generate Better Ideas https://t.co/b0O9iEULHG #DataScience #DataScientist #BigData #IoT…
Msg: RT @botbigdata: VIDEO: Installing TOR on an Ubuntu Virtual Machine https://t.co/Q3FPhY8CGm #bigdata

Let's define a type for tweets and extract the metadata into it:

case class Tweet(id:String, username:String, userId:Long, userScreenName:String, userDesc:String,
                 userProfileImgUrl:String, favCount:Long, retweetCount:Long, lang:String, place:String,
                 message:String, isSensitive:Boolean, isTruncated:Boolean, isFavorited:Boolean,
                 isRetweeted:Boolean, isRetweet:Boolean, createdAt:Long)

override def onStatus(status: Status): Unit = {

  // Several getters can return null, so fall back to a default value.
  val retweetCount = if(status.getRetweetedStatus == null) 0 else status.getRetweetedStatus.getRetweetCount
  val userDesc = if(status.getUser.getDescription == null) "null" else status.getUser.getDescription
  val userProfileImgUrl = if(status.getUser.getProfileImageURL == null) "null" else status.getUser.getProfileImageURL
  val lang = if(status.getLang == null) "null" else status.getLang
  val place = if(status.getPlace == null) "null" else status.getPlace.getFullName

  val tweet = Tweet(
    id = status.getId.toString,
    username = status.getUser.getName,
    userId = status.getUser.getId,
    userScreenName = status.getUser.getScreenName,
    userDesc = userDesc,
    userProfileImgUrl = userProfileImgUrl,
    createdAt = status.getCreatedAt.getTime,
    favCount = status.getFavoriteCount,
    retweetCount = retweetCount,
    lang = lang,
    place = place,
    message = status.getText,
    isSensitive = status.isPossiblySensitive,
    isTruncated = status.isTruncated,
    isFavorited = status.isFavorited,
    isRetweeted = status.isRetweeted,
    isRetweet = status.isRetweet
  )

  logger.info("Msg: " + tweet.message)
}
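The repeated null checks in onStatus can also be written with Option, which is the more common Scala idiom for nullable Java values. This is a small sketch rather than the code used in the project; NullSafe and its two helpers are hypothetical names introduced here for illustration.

```scala
// Sketch: replace "if (x == null) default else x" with Option.
// NullSafe, orDefault and nestedOrDefault are assumed names, not project code.
object NullSafe {

  // Option(value) is None when value is null, so getOrElse supplies the fallback.
  def orDefault(value: String, default: String = "null"): String =
    Option(value).getOrElse(default)

  // For a nullable parent object, read the field only when the parent exists,
  // and also guard against the field itself being null.
  def nestedOrDefault[A <: AnyRef](parent: A, default: String = "null")(field: A => String): String =
    Option(parent).map(field).flatMap(Option(_)).getOrElse(default)
}
```

Inside onStatus this could be used as NullSafe.orDefault(status.getUser.getDescription) for userDesc and NullSafe.nestedOrDefault(status.getPlace)(_.getFullName) for place. Keeping the "null" string as the default preserves the behaviour of the original checks.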

Next we will send these tweets to Kafka.

ZooKeeper setup

ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. All of these kinds of services are used in some form or another by distributed applications. In our example, both Kafka and Solr need ZooKeeper for their state and configuration management, so you need to start ZooKeeper first.

  • Download it from http://apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
  • Extract it and go inside the conf directory
  • Make a copy of zoo_sample.cfg as zoo.cfg
  • Run it using bin/zkServer.sh start
  • Verify that it has started successfully by running the bin/zkServer.sh status command.
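Besides zkServer.sh status, the server can be probed from code with zookeeper's "ruok" four-letter command, which a healthy server answers with "imok". The sketch below assumes zookeeper listens on localhost:2181 and that four-letter commands are enabled (newer zookeeper releases may require whitelisting them). ZkHealthCheck is a hypothetical name.

```scala
import java.net.{InetSocketAddress, Socket}
import scala.io.Source
import scala.util.Try

object ZkHealthCheck {
  // Sends "ruok" and returns whatever the server replies before closing
  // the connection. A healthy zookeeper answers "imok".
  def ruok(host: String, port: Int, timeoutMs: Int = 2000): Try[String] = Try {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      socket.setSoTimeout(timeoutMs)
      val out = socket.getOutputStream
      out.write("ruok".getBytes("US-ASCII"))
      out.flush()
      // Read until the server closes the connection
      Source.fromInputStream(socket.getInputStream, "US-ASCII").mkString.trim
    } finally socket.close()
  }
}

// Possible usage: ZkHealthCheck.ruok("localhost", 2181)
```

The result is wrapped in Try so that a refused connection or a timeout shows up as a Failure rather than an exception at the call site.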

Putting data in Kafka

Here are the steps to send data to kafka:

  • Start kafka server and broker(s)
  • Create a topic in kafka to which data will be sent
  • Define an avro schema for the tweets
  • Create a kafka producer which will serialize tweets using the avro schema and send them to kafka

Download kafka from here.

Start the server:

    bin/kafka-server-start.sh config/server.properties

Create a topic:

    bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka --replication-factor 1 --partitions 1 --topic tweet1

You can check that the topic was created successfully:

    bin/kafka-topics.sh --list --zookeeper localhost:2181/kafka

Avro schema

Avro is a data serialization system. It has a JSON-like data model, and data can be represented either as JSON or in a compact binary form. It comes with a schema description language that describes the data. Let's define an avro schema for our Tweet type:

    {
      "type": "record",
      "namespace": "tweet",
      "name": "tweet",
      "fields":[
        { "name": "id", "type":"string" },
        { "name": "username", "type":"string" },
        { "name": "userId", "type":"long" },
        { "name": "userScreenName", "type":"string" },
        { "name": "userDesc", "type":"string" },
        { "name": "userProfileImgUrl", "type":"string" },
        { "name": "favCount", "type":"int" },
        { "name": "retweetCount", "type":"int" },
        { "name": "lang", "type":"string" },
        { "name": "place", "type":"string" },
        { "name": "message", "type":"string" },
        { "name": "isSensitive", "type":"boolean" },
        { "name": "isTruncated", "type":"boolean" },
        { "name": "isFavorited", "type":"boolean" },
        { "name": "isRetweeted", "type":"boolean" },
        { "name": "isRetweet", "type":"boolean" },
        { "name": "createdAt", "type":"long" }
      ]
    }

Kafka works with a lot of other formats too, but avro is a common choice for streaming data. You can read more about it here: https://www.confluent.io/blog/avro-kafka-data/
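Because the Tweet case class and the avro schema have to agree on field names and types, it can help to parse the schema and list its fields. The sketch below inlines a trimmed-down, three-field copy of the schema so that it is self-contained; SchemaCheck and fieldTypes are hypothetical names.

```scala
import org.apache.avro.Schema
import scala.collection.JavaConverters._

object SchemaCheck {
  // Trimmed-down copy of the tweet schema, inlined for illustration only
  val schemaJson: String =
    """{ "type": "record", "namespace": "tweet", "name": "tweet", "fields": [
      |  { "name": "id", "type": "string" },
      |  { "name": "userId", "type": "long" },
      |  { "name": "isRetweet", "type": "boolean" }
      |] }""".stripMargin

  val schema: Schema = new Schema.Parser().parse(schemaJson)

  // Field name -> avro type, in declaration order
  def fieldTypes(s: Schema): List[(String, Schema.Type)] =
    s.getFields.asScala.toList.map(f => f.name() -> f.schema().getType)
}
```

Printing SchemaCheck.fieldTypes(schema) next to the case class definition makes a mismatch, such as a field declared as int in the schema but Long in the case class, easy to spot.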

Next create a producer

    val props = new Properties()
    props.put("bootstrap.servers", brokerList)
    props.put("client.id", "KafkaTweetProducer")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

    val producer = new KafkaProducer[String, Array[Byte]](props)

Create a Schema object from the avro schema definition:

    val schema = new Parser().parse(Source.fromURL(getClass.getResource("/tweet.avsc")).mkString)

Serialize the tweet and send it to kafka through the producer:

    def writeToKafka(tweet: Tweet) = {

      val row = new GenericData.Record(schema)
      row.put("id", tweet.id)
      row.put("username", tweet.username)
      row.put("userId", tweet.userId)
      row.put("userScreenName", tweet.userScreenName)
      row.put("userDesc", tweet.userDesc)
      row.put("userProfileImgUrl", tweet.userProfileImgUrl)
      row.put("favCount", tweet.favCount)
      row.put("retweetCount", tweet.retweetCount)
      row.put("lang", tweet.lang)
      row.put("place", tweet.place)
      row.put("message", tweet.message)
      row.put("isSensitive", tweet.isSensitive)
      row.put("isTruncated", tweet.isTruncated)
      row.put("isFavorited", tweet.isFavorited)
      row.put("isRetweeted", tweet.isRetweeted)
      row.put("isRetweet", tweet.isRetweet)
      row.put("createdAt", tweet.createdAt)

      val writer = new SpecificDatumWriter[GenericRecord](schema)
      val out = new ByteArrayOutputStream()
      val encoder = EncoderFactory.get().binaryEncoder(out, null)
      writer.write(row, encoder)
      encoder.flush()

      logger.info("Pushing to kafka. TweetId= " + tweet.id)

      val data = new ProducerRecord[String, Array[Byte]](topic, out.toByteArray)
      producer.send(data)

    }
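writeToKafka sends records without a key and ignores the result of send. As a possible variation, and not what the original project does, the record could be keyed by tweet id and failures reported through a callback. KeyedSend and its methods are hypothetical names.

```scala
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

object KeyedSend {
  // Build a record keyed by tweet id. With the default partitioner,
  // records that share a key are routed to the same partition.
  def keyedRecord(topic: String, tweetId: String, payload: Array[Byte]): ProducerRecord[String, Array[Byte]] =
    new ProducerRecord[String, Array[Byte]](topic, tweetId, payload)

  // Send asynchronously; the callback runs once the broker has answered
  // (exception is null on success).
  def send(producer: KafkaProducer[String, Array[Byte]],
           record: ProducerRecord[String, Array[Byte]]): Unit = {
    producer.send(record, new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
        if (exception != null) System.err.println("Send failed: " + exception.getMessage)
    })
    ()
  }
}
```

With the single-partition tweet1 topic created earlier the key makes no practical difference; it only starts to matter once the topic has more partitions.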

We will create an ActorSystem and put all this inside a KafkaTweetProducer actor. We will then send a message to KafkaTweetProducer whenever a new tweet is received.

    val zkHostKafka = "localhost:2181/kafka"
    val kafkaBrokers = "localhost:9092"
    val topic = "tweet1"

    val system = ActorSystem("TwitterAnalysis")

    val kafkaProducer = system.actorOf(Props(new KafkaTweetProducer(kafkaBrokers, topic)), name = "kafka_tweet_producer")

    val twitterStream = new TwitterWatcher(cb, topics, kafkaProducer)
    twitterStream.startTracking()

    class TwitterWatcher(cb:ConfigurationBuilder, keywords:List[String], destination:ActorRef) extends Logging {

      override def onStatus(status: Status): Unit = {
        ...
        val tweet = Tweet(
          ...
        )
        destination ! tweet
      }

    }

    class KafkaTweetProducer(brokerList:String, topic:String) extends Actor with Logging {

      override def receive: Receive = {
        case t:Tweet =>
          writeToKafka(t)
        ...
      }

    }

To test whether this data is getting written to kafka properly or not, you can use the command line console consumer and watch the topic tweet1:

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic tweet1 --from-beginning

Next we will consume this data in solr and cassandra.

Putting data in solr

Here are the steps for writing data to solr:

  • Define a solr schema (config set) corresponding to the tweet type
  • Upload the schema to zookeeper
  • Create a collection in solr using this config set
  • Create a solr consumer which will read from the tweet1 topic in kafka
  • Deserialize the data read from kafka and create solr documents from it
  • Send the documents to solr

Here's what the schema definition will look like:

    <field name="id" type="string" indexed="true" stored="true" required="true" multiValued="false" />
    <field name="username" type="string" indexed="true" stored="true" required="true" multiValued="false" />
    <field name="userId" type="tlong" indexed="true" stored="true" required="true" multiValued="false" />
    <field name="userScreenName" type="string" indexed="true" stored="true" required="true" multiValued="false" />
    <field name="userDesc" type="string" indexed="true" stored="true" required="true" multiValued="false" />
    <field name="userProfileImgUrl" type="string" indexed="true" stored="true" required="true" multiValued="false" />
    <field name="favCount" type="tlong" indexed="true" stored="true" required="true" multiValued="false" />
    <field name="retweetCount" type="tlong" indexed="true" stored="true" required="true" multiValued="false" />
    <field name="lang" type="string" indexed="true" stored="true" required="true" multiValued="false" />
    <field name="place" type="string" indexed="true" stored="true" required="true" multiValued="false" />
    <field name="message" type="text_en" indexed="true" stored="true" required="true" multiValued="false" />
    <field name="isSensitive" type="boolean" indexed="true" stored="true" required="true" multiValued="false" />
    <field name="isTruncated" type="boolean" indexed="true" stored="true" required="true" multiValued="false" />
    <field name="isFavorited" type="boolean" indexed="true" stored="true" required="true" multiValued="false" />
    <field name="isRetweeted" type="boolean" indexed="true" stored="true" required="true" multiValued="false" />
    <field name="isRetweet" type="boolean" indexed="true" stored="true" required="true" multiValued="false" />
    <field name="createdAt" type="tdate" indexed="true" stored="true" required="true" multiValued="false" />

Upload the configset to solr and create a collection:

    ./server/scripts/cloud-scripts/zkcli.sh -cmd upconfig -zkhost localhost:2181 -confdir tweet-schema -confname tweet-schema

Create the collection

    http://localhost:8983/solr/admin/collections?action=create&name=tweet&collection.configName=tweet-schema&numShards=1

Next create a SolrWriter actor which will receive a Tweet message from a KafkaTweetConsumer (which we will define next), convert it to a SolrInputDocument and send it to solr:

    class SolrWriter(zkHost: String, collection: String, commitAfterBatch: Boolean) extends Actor with Logging {

      val client = new CloudSolrClient.Builder().withZkHost(zkHost).build()
      client.setDefaultCollection(collection)

      ...

      var batch = List[SolrInputDocument]()

      val MAX_BATCH_SIZE = 100

      override def receive: Receive = {
        case doc: Tweet =>

          val solrDoc = new SolrInputDocument()
          solrDoc.setField("id", doc.id)
          solrDoc.setField("username", doc.username)
          ...

          batch = solrDoc :: batch

          if (batch.size > MAX_BATCH_SIZE) indexBatch()

        case FlushBuffer =>
          indexBatch()

        case _ =>
          logger.warn("Unknown message")

      }

      def indexBatch(): Boolean = {
        try {
          logger.info("Flushing batch")
          client.add(batch.asJavaCollection)
          batch = List[SolrInputDocument]()
          if (commitAfterBatch) client.commit()
          true
        } catch {
          case ex: Exception =>
            logger.error("Failed to index solr batch. Exception is " + ex.getMessage)
            ex.printStackTrace()
            batch = List[SolrInputDocument]()
            false
        }
      }

      ...
    }
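The batching rule used by SolrWriter (buffer documents, flush once the buffer grows past MAX_BATCH_SIZE, and flush on demand for FlushBuffer) can be tried out in isolation with plain scala collections. BatchBuffer below is a hypothetical stand-alone class, not code from the original project, and it keeps insertion order, whereas the actor prepends to a List.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical buffer that mirrors SolrWriter's "size > max" flush rule
class BatchBuffer[T](maxBatchSize: Int, flush: Seq[T] => Unit) {
  private val items = ListBuffer.empty[T]

  // Buffer the item; flush when the buffer has grown past maxBatchSize
  def add(item: T): Unit = {
    items += item
    if (items.size > maxBatchSize) flushNow()
  }

  // Flush whatever is buffered; does nothing when the buffer is empty
  def flushNow(): Unit =
    if (items.nonEmpty) {
      val snapshot = items.toList
      items.clear()
      flush(snapshot)
    }
}
```

Two details are worth noticing: with a strict `>` comparison the flush happens when item number maxBatchSize + 1 arrives, and the nonEmpty guard means a periodic flush on an idle stream never calls the sink with an empty batch.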

Now we need to define a kafka consumer which will read data from kafka and send it to SolrWriter.

Kafka Consumer

The consumer will read data from kafka, deserialize it using the avro schema, convert it to the Tweet type, and forward the message to a destination actor. We will keep the consumer generic so that any destination actor (solr or cassandra) can be passed to it.

    class KafkaTweetConsumer(zkHost:String, groupId:String, topic:String, destination:ActorRef) extends Actor with Logging {
      ...
      def read() = try {
        ...
        destination ! tweet   //destination will be either solr or cassandra
        ...
      }

    }

Create consumer and avro schema object

    private val props = new Properties()

    props.put("group.id", groupId)
    props.put("zookeeper.connect", zkHost)
    props.put("auto.offset.reset", "smallest")
    props.put("consumer.timeout.ms", "120000")
    props.put("auto.commit.interval.ms", "10000")

    private val consumerConfig = new ConsumerConfig(props)
    private val consumerConnector = Consumer.create(consumerConfig)
    private val filterSpec = new Whitelist(topic)

    val schemaString = Source.fromURL(getClass.getResource("/tweet.avsc")).mkString
    val schema = new Schema.Parser().parse(schemaString)

Convert binary data to Tweet type using avro

    private def getTweet(message: Array[Byte]): Tweet = {

      val reader = new SpecificDatumReader[GenericRecord](schema)
      val decoder = DecoderFactory.get().binaryDecoder(message, null)
      val record = reader.read(null, decoder)

      val tweet = Tweet(
        id = record.get("id").toString,
        username = record.get("username").toString,
        ...
      )
      tweet
    }
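To see the encode and decode halves working together without kafka in between, here is a small round trip over a two-field schema. It is a sketch that uses avro's generic writer and reader rather than the Specific* classes used in the project; AvroRoundTrip is a hypothetical name and the schema is a cut-down stand-in for tweet.avsc.

```scala
import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

object AvroRoundTrip {
  // Cut-down stand-in for tweet.avsc
  val schema: Schema = new Schema.Parser().parse(
    """{ "type": "record", "name": "tweet", "fields": [
      |  { "name": "id", "type": "string" },
      |  { "name": "createdAt", "type": "long" }
      |] }""".stripMargin)

  // Record -> compact avro binary
  def encode(id: String, createdAt: Long): Array[Byte] = {
    val row = new GenericData.Record(schema)
    row.put("id", id)
    row.put("createdAt", createdAt)
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](schema).write(row, encoder)
    encoder.flush()
    out.toByteArray
  }

  // Avro binary -> field values; strings come back as avro Utf8, hence toString
  def decode(bytes: Array[Byte]): (String, Long) = {
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    val record = new GenericDatumReader[GenericRecord](schema).read(null, decoder)
    (record.get("id").toString, record.get("createdAt").asInstanceOf[Long])
  }
}
```

The binary form carries no field names or schema, so the reader must be given the same schema (or a compatible one) that the writer used. That is why both the producer and the consumer load the same tweet.avsc.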

Start consuming from kafka and send messages to destination, Solr in this specific case.

    val streams = consumerConnector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder())(0)

    lazy val iterator = streams.iterator()

    while (iterator.hasNext()) {
      val tweet = getTweet(iterator.next().message())
      //logger.info("Consuming tweet: " + tweet.id)
      destination ! tweet
    }
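The code above uses the older zookeeper-based consumer (zookeeper.connect, "smallest"), which was deprecated and later removed in newer kafka releases. As a rough sketch of the same loop on the newer KafkaConsumer client, which talks to the brokers directly and uses "earliest" instead of "smallest", something like the following could work. NewApiConsumer, consumerProps and consume are hypothetical names, and poll(long) is the signature available in the 0.10.x client used in this post.

```scala
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

object NewApiConsumer {
  // Settings for the broker-based consumer; no zookeeper address is needed
  def consumerProps(brokers: String, groupId: String): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    props.put("group.id", groupId)
    props.put("auto.offset.reset", "earliest")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props
  }

  // Polls forever and hands each raw avro payload to `handle`,
  // for example bytes => destination ! getTweet(bytes)
  def consume(brokers: String, groupId: String, topic: String)(handle: Array[Byte] => Unit): Unit = {
    val consumer = new KafkaConsumer[String, Array[Byte]](consumerProps(brokers, groupId))
    try {
      consumer.subscribe(Collections.singletonList(topic))
      while (true) {
        val records = consumer.poll(1000L) // wait up to one second for new records
        records.asScala.foreach(r => handle(r.value()))
      }
    } finally consumer.close()
  }
}
```

KafkaConsumer is not thread-safe, so if this loop is run inside an actor it should stay on one dedicated thread or dispatcher.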

You should now start seeing data in solr:

    http://localhost:8983/solr/tweet/select?q=*:*&wt=json&rows=1

    {
      "responseHeader":{
        "zkConnected":true,
        "status":0,
        "QTime":1,
        "params":{
          "q":"*:*",
          "rows":"1",
          "wt":"json"
        }
      },
      "response":{
        "numFound":42,
        "start":0,
        "docs":[
          {
            "id":"923302396612182016",
            "username":"Tawanna Kessler",
            "userId":898322458742337536,
            "userScreenName":"tawanna_kessler",
            "userDesc":"null",
            "userProfileImgUrl":"http://pbs.twimg.com/profile_images/898323854417940484/lke3BSjt_normal.jpg",
            "favCount":0,
            "retweetCount":183,
            "lang":"en",
            "place":"null",
            "message":"RT @craigbrownphd: Two upcoming webinars: Two new Microsoft webinars are taking place over the next week that may… https://t.co/SAb9CMmVXY…",
            "isSensitive":false,
            "isTruncated":false,
            "isFavorited":false,
            "isRetweeted":false,
            "isRetweet":true,
            "createdAt":"2017-10-26T03:07:00Z",
            "_version_":1582267022370144256
          }
        ]
      }
    }
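The same collection can be queried from scala with SolrJ instead of a raw URL. The sketch below only builds the query object; the field names come from the schema defined earlier, while TweetQueries and search are hypothetical names.

```scala
import org.apache.solr.client.solrj.SolrQuery
import org.apache.solr.client.solrj.util.ClientUtils

object TweetQueries {
  // Full-text search on the message field, restricted to English tweets,
  // newest first. The term is escaped so query syntax characters in user
  // input are treated literally.
  def search(term: String, rows: Int): SolrQuery = {
    val q = new SolrQuery("message:" + ClientUtils.escapeQueryChars(term))
    q.addFilterQuery("lang:en")
    q.setSort("createdAt", SolrQuery.ORDER.desc)
    q.setRows(Int.box(rows))
    q
  }
}

// Possible usage with the CloudSolrClient built in SolrWriter:
//   val response = client.query(TweetQueries.search("webinar", 10))
//   println(response.getResults.getNumFound)
```

Putting lang:en in a filter query rather than in the main query keeps it out of relevance scoring and lets solr cache the filter separately.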

Querying solr data with banana

Banana is a data visualization tool that uses solr for data analysis and display. Here's how to set it up for our tweet data. We will run it in the same container as solr.

Download banana and put it in solr's webapp directory:

    cd $SOLR_HOME/server/solr-webapp/webapp/
    git clone https://github.com/lucidworks/banana --depth 1

To save dashboards and settings, banana expects a collection named banana-int. Let's go ahead and create it. The configset for that collection can be found in banana/resources/banana-int-solr-5.0/.

Upload banana config to zookeeper

    $SOLR_HOME/server/scripts/cloud-scripts/zkcli.sh -cmd upconfig -zkhost localhost:2181 -confdir banana-int-solr-5.0/conf/ -confname banana

Create the collection

    http://localhost:8983/solr/admin/collections?action=create&name=banana-int&collection.configName=banana&numShards=1

Navigate to the banana UI at http://localhost:8983/solr/banana/src/index.html and change the collection in settings to point to the tweet collection.

Here’s what it will look like for our tweets data:

image

Next we will create a cassandra consumer.

Putting data in cassandra

We first need to create a keyspace and a table for storing tweets:

    CREATE KEYSPACE twitter WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };

    CREATE TABLE twitter.tweet (
      topic text,
      id text,
      username text,
      userId text,
      userScreenName text,
      userDesc text,
      userProfileImgUrl text,
      favCount bigint,
      retweetCount bigint,
      lang text,
      place text,
      message text,
      isSensitive boolean,
      isTruncated boolean,
      isFavorited boolean,
      isRetweeted boolean,
      isRetweet boolean,
      createdAt timestamp,
      creationDate timestamp,
      PRIMARY KEY ((topic, creationDate), username, id)
    );
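Note that the partition key is (topic, creationDate), so a query has to supply both values exactly. If creationDate is written with the tweet's full timestamp, every distinct timestamp becomes its own partition. One common alternative, offered here only as an assumption and not something the original project does, is to store a coarser bucket such as the start of the day in creationDate. DayBucket is a hypothetical helper.

```scala
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.Date

object DayBucket {
  // Truncate an epoch-millisecond timestamp to midnight UTC of the same day
  def startOfDayUtc(epochMillis: Long): Date =
    Date.from(Instant.ofEpochMilli(epochMillis).truncatedTo(ChronoUnit.DAYS))
}

// Possible usage when binding the insert:
//   .setTimestamp("creationDate", DayBucket.startOfDayUtc(t.createdAt))
```

With day buckets, all tweets for one topic and day land in one partition and can be read with a single partition-key lookup, at the cost of larger partitions for busy topics.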

Then we will create a CassWriter actor similar to solr one which will accept a tweet message and write it to cassandra.

Connect to cluster.

    lazy val cluster = Cluster.builder().addContactPoint(seeds).build()
    lazy val session = cluster.connect(keyspace)

Since we will be running the same insert query repeatedly with different parameters, we will use a prepared statement to improve performance:

    lazy val prepStmt = session.prepare(s"INSERT INTO $cf (" +
      "topic, id, username, userId, userScreenName, userDesc, userProfileImgUrl, favCount," +
      "retweetCount, lang, place, message, isSensitive, isTruncated, isFavorited, isRetweeted," +
      "isRetweet, createdAt, creationDate" +
      ") values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")

Take Tweet, create a BoundStatement by setting values for all fields and write it to cassandra

    def writeToCass(t:Tweet) = {
      try {
        val boundStmt = prepStmt.bind()
          .setString("topic", topic)
          .setString("id",t.id)
          .setString("username", t.username)
          .setString("userId", t.userId.toString)
          .setString("userScreenName",t.userScreenName)
          .setString("userDesc",t.userDesc)
          .setString("userProfileImgUrl",t.userProfileImgUrl)
          .setLong("favCount",t.favCount)
          .setLong("retweetCount",t.retweetCount)
          .setString("lang",t.lang)
          .setString("place",t.place)
          .setString("message",t.message)
          .setBool("isSensitive",t.isSensitive)
          .setBool("isTruncated",t.isTruncated)
          .setBool("isFavorited",t.isFavorited)
          .setBool("isRetweeted",t.isRetweeted)
          .setBool("isRetweet",t.isRetweet)
          .setTimestamp("createdAt", new Date(t.createdAt))
          .setTimestamp("creationDate", new Date(t.createdAt))

        session.execute(boundStmt)

      } catch {
        case ex: Exception =>
          logger.error("C* insert exception. Message: " + ex.getMessage)
      }

    }

We will create a new instance of this actor

    val cassWriter = system.actorOf(Props(new CassWriter(cassSeeds, cassKeyspace, cassCf, topic)), name = "cass_writer")

And then create a new KafkaTweetConsumer whose destination will be this cassWriter actor

    val cassConsumer = system.actorOf(Props(
      new KafkaTweetConsumer(zkHostKafka, "tweet-cass-consumer", topic, cassWriter)), name = "cass_consumer")

You should start seeing data in cassandra

    cqlsh> select creationdate, userscreenname, lang, message from twitter.tweet limit 1;

     creationdate             | userscreenname | lang | message
    --------------------------+----------------+------+--------------------------------------------------------------------------------------------------------------------------------------------
     2017-10-25 21:56:30+0000 | alevergara78   | en   | RT @HomesAtMetacoda: data in motion >> Online learning: #MachineLearning’s secret for #bigdata via\n@SASsoftware https://t.co/eGbAumJzEt…

Next we will set up spark and use it to query cassandra data.

Query cassandra data with spark

We will use the datastax spark cassandra connector, https://github.com/datastax/spark-cassandra-connector. Download the connector jar that matches your spark version and place it in the lib directory of your project.

First thing which we need is a spark context

    val CASS_SEEDS = "127.0.0.1"
    val SPARK_MASTER = "spark://sam-ub:7077"

    val conf = new SparkConf(true)
      .set("spark.cassandra.connection.host", CASS_SEEDS)
      .setJars(Seq("lib/spark-cassandra-connector-assembly-2.0.0.jar"))
      .setMaster(SPARK_MASTER)
      .setAppName("cass_query")

    lazy val sc = new SparkContext(conf)

Then you can query the data and apply different aggregations. This query will be picked up as a spark job and executed on your spark cluster:

    import com.datastax.spark.connector._   // adds cassandraTable to SparkContext

    val data = sc.cassandraTable("twitter", "tweet")   // the table created above is twitter.tweet
      .select("topic", "creationdate", "retweetcount", "id", "isretweet")
      .where("topic = 'tweets' and creationdate = '2017-10-25 20:15:05+0000'")
      .groupBy(_.getLong("retweetcount"))
      .map(r => (r._1, r._2.size))   // (retweetcount, number of rows with that value)
      .collect()

    // data is an Array of pairs, so sum the group sizes to get the total row count
    logger.info("Count of rows = " + data.map(_._2).sum)

If the job is successful, you will see the result:

    Count of rows = 38
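The groupBy and map steps in the query produce one pair per distinct retweet count, holding how many rows share that value. The same computation on a plain scala collection, with made-up numbers, looks like this (RetweetHistogram is a hypothetical name):

```scala
object RetweetHistogram {
  // retweetCount -> number of rows with that retweetCount,
  // mirroring .groupBy(...).map(r => (r._1, r._2.size)) in the spark job
  def countsByRetweetCount(retweetCounts: Seq[Long]): Map[Long, Int] =
    retweetCounts.groupBy(identity).map { case (k, rows) => k -> rows.size }
}

// Possible usage:
//   val counts = RetweetHistogram.countsByRetweetCount(Seq(0L, 0L, 183L))
//   counts.values.sum gives the total number of rows
```

On a real cluster, groupBy moves every row of a group to one executor before counting. For large tables, mapping each row to (retweetcount, 1) and combining with reduceByKey(_ + _) usually shuffles far less data.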

Visualizing cassandra data with zeppelin

Zeppelin is a web-based notebook that can be used for interactive data analytics on cassandra data using spark.

Download the binary from https://zeppelin.apache.org/download.html and uncompress it. Default port used by it is 8080 which conflicts with spark master web ui port, so change the port in conf/zeppelin-site.xml.

Create a new notebook and select the spark interpreter.

Create a view of our tweet table from cassandra:

    %spark.sql

    create temporary view mytweets
    using org.apache.spark.sql.cassandra
    options (keyspace "twitter", table "tweet")

We can now run aggregations or other analytics queries on this view:

    %spark.sql

    select lang, count(*) as occur from mytweets where lang != 'und' group by lang order by occur desc limit 10

Here's what the output of the above query will look like:

image
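The same aggregation can also be written with the DataFrame API, for example in a %spark paragraph or in a standalone job. This is a sketch, assuming the spark cassandra connector is on the classpath and spark.cassandra.connection.host is configured; LangStats, loadTweets and topLanguages are hypothetical names.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, count, desc}

object LangStats {
  // Load twitter.tweet through the spark cassandra connector
  def loadTweets(spark: SparkSession): DataFrame =
    spark.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "twitter", "table" -> "tweet"))
      .load()

  // Equivalent of:
  //   select lang, count(*) as occur ... where lang != 'und'
  //   group by lang order by occur desc limit n
  def topLanguages(tweets: DataFrame, n: Int): DataFrame =
    tweets
      .filter(col("lang") =!= "und")
      .groupBy("lang")
      .agg(count("*").as("occur"))
      .orderBy(desc("occur"))
      .limit(n)
}

// Possible usage: LangStats.topLanguages(LangStats.loadTweets(spark), 10).show()
```

Keeping topLanguages separate from loadTweets means the aggregation can be tried on any DataFrame with a lang column, including a small in-memory one, before pointing it at cassandra.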

Conclusion

I hope you got the idea of how to get started with creating a search and analytics pipeline.
