Thursday, July 9, 2015

Clustering check-ins with Spark and Cassandra

Spark and Cassandra provide a good alternative to traditional big data architectures based on Hadoop HDFS, MapReduce, and Hive. In this post/tutorial I will show how to combine Cassandra and Spark to get the best of these two technologies and provide faster analytics on an operational low-latency data store.

As a demo, we will cluster check-in transactions in New York using the Gowalla dataset. Clustering will be performed in Spark using the MLlib library, while the data is extracted from a Cassandra keyspace using the Spark-Cassandra connector.

Cassandra

http://cassandra.apache.org/

Apache Cassandra is an open source distributed database management system designed to handle large amounts of data across many commodity servers, providing high availability with no single point of failure. Cassandra offers robust support for clusters spanning multiple datacenters, with asynchronous masterless replication allowing low latency operations for all clients.

Cassandra's data model is a partitioned row store with tunable consistency. Rows are organized into tables; the first component of a table's primary key is the partition key; within a partition, rows are clustered by the remaining columns of the key. Other columns may be indexed separately from the primary key. Tables may be created, dropped, and altered at runtime without blocking updates and queries.

Cassandra does not currently support joins or subqueries. Rather, Cassandra emphasizes denormalization through features like collections. However, as shown in this post, these limitations can be overcome by using Spark and Cassandra together.

Spark

http://spark.apache.org/

Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.

Spark Cassandra Connector


This library lets you expose Cassandra tables as Spark RDDs, write Spark RDDs to Cassandra tables, and execute arbitrary CQL queries in your Spark applications.
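The demo later in this post only reads from Cassandra; for completeness, here is a minimal sketch of the other two capabilities, writing an RDD back to Cassandra and executing arbitrary CQL. This is my own illustration, not part of the demo: it assumes a running SparkContext sc with the connector on the classpath, and the table lbsn.top_venues is hypothetical.

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

// execute an arbitrary CQL statement through the connector's session pool
// (lbsn.top_venues is a hypothetical table, used only for this sketch)
CassandraConnector(sc.getConf).withSessionDo { session =>
  session.execute("CREATE TABLE IF NOT EXISTS lbsn.top_venues (vid bigint PRIMARY KEY, name text)")
}

// write an RDD of tuples back to the table, mapping tuple fields to columns
val topVenues = sc.parallelize(Seq((11831L, "United Nations"), (28031L, "Jamaica LIRR Station")))
topVenues.saveToCassandra("lbsn", "top_venues", SomeColumns("vid", "name"))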

Zeppelin (Apache incubator)


A web-based notebook that enables interactive data analytics. You can make beautiful data-driven, interactive and collaborative documents with SQL, Scala and more.
Zeppelin is still in its early days, but it is very promising and a nice alternative to IPython. Currently, it can only be built from source; check the master branch on GitHub.

Install and Setup


The full tutorial, including the install script, the data munging and analysis with scikit-learn in Python notebooks, and the data analysis with Spark in Zeppelin notebooks, is available as a GitHub project. Check out the code at https://github.com/natalinobusa/gowalla-spark-demo

Versions
Matching versions is quite important when working with this set of technologies. The demo provided below has been tested with the following versions:
  • cassandra 2.1.7
  • spark 1.3.1
  • spark-cassandra connector 1.3
  • zeppelin (master github)
Requirements
  • jdk 1.7 or higher
  • git
  • npm
  • maven
  • curl
  • wget
Setup
You need to have Cassandra, Spark and Zeppelin running on your system in order to proceed. For those of you who want to give it a try, I have written a very rudimentary script which downloads, builds and installs all the necessary tools in a local directory (no admin rights required).

First clone the project from GitHub, then run the install.sh script. Since the script builds Spark and Zeppelin from source, it's going to take a while. I am planning to test the pre-built Spark when I have some more time.

git clone https://github.com/natalinobusa/gowalla-spark-demo.git gowalla-spark-demo
cd gowalla-spark-demo
./install.sh
Run it!
Again, I have prepared a script, start-all.sh, which starts all the various ingredients. A similar script, stop-all.sh, is available to stop all services. Spark is configured to run in cluster mode (albeit on a single node), so you might be prompted for a password, since the Spark master and workers communicate via SSH.

A quick head start:

Cassandra can be accessed with the cqlsh command-line interface. After installing and setting up the system, type ./apache-cassandra-2.1.7/bin/cqlsh from the root of the git project to start the CQL client.

Data
The venue ids and names are taken from the following datasets:

loc-gowalla_totalCheckins.txt.gz | https://snap.stanford.edu/data/loc-gowalla.html
E. Cho, S. A. Myers, J. Leskovec. Friendship and Mobility: User Movement in Location-Based Social Networks. ACM SIGKDD International Conference on Knowledge Discovery and Data Mining (KDD), 2011.

gowalla-dataset.zip | https://code.google.com/p/locrec/downloads/detail?name=gowalla-dataset.zip
The project is being developed in the context of the SInteliGIS project financed by the Portuguese Foundation for Science and Technology (FCT) through project grant PTDC/EIA-EIA/109840/2009.

Generating a sample extract using python


The CSV files provided in this demo have been extracted using a Python notebook. Some info about those files: checkins.csv contains the year, month, day and UTC check-in timestamp, and provides the coordinates of the check-in, the id of the venue and the id of the user. venues.csv provides the id, the coordinates and the name of some of those historical venues.


Below are a few pictures showing how the Gowalla app ramped up during 2010, and an overlay of the hottest checked-in spots in New York during 2010, generated using Google Maps and Highcharts and clustered using scikit-learn k-means. Please refer to the IPython notebook (available in the GitHub repo https://github.com/natalinobusa/gowalla-spark-demo) for more details about how these graphs and the CSV files were generated.

Data Modeling with Cassandra

Create keyspace and tables


First, create the keyspace and the tables. Since in this demo we are focusing on queries and data science, we are not going to set up proper replication as you would for an actual Cassandra cluster keyspace.

CREATE KEYSPACE IF NOT EXISTS lbsn 
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
For the checkins table, I am going to model the primary key with a partition key composed of year and month, and clustering keys on day and time of day (plus the user id, see below). For this specific example we use a time field expressing the time of day in seconds, since the date is already available in the other partition and clustering keys.

This specific modeling of time will turn out to be useful later on, since I can query a range of days in a month, or a given time window during the day, by making use of the properties of the clustering keys. Finally, we add the uid as a clustering key to avoid overwrites in case two users check in at exactly the same date and time.

CREATE TABLE checkins (
  year  int, 
  month int, 
  day   int,
  time  int,
  ts    timestamp,
  uid   bigint,
  lat   double,
  lon   double,
  vid   bigint,
  PRIMARY KEY ((year, month), day, time, uid)
) WITH CLUSTERING ORDER BY (day DESC, time DESC);
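As a side note on the time field: 17:00:00 corresponds to 17 * 3600 = 61200 seconds since midnight, which is how the time-window queries later in this post are expressed. Here is a minimal sketch of how these key fields could be derived from an epoch timestamp (my own illustration, not code from the demo notebooks):

import java.util.{Calendar, TimeZone}

// derive the (year, month, day, time) key fields from an epoch timestamp, in UTC
def checkinKey(epochMillis: Long): (Int, Int, Int, Int) = {
  val cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
  cal.setTimeInMillis(epochMillis)
  val year  = cal.get(Calendar.YEAR)
  val month = cal.get(Calendar.MONTH) + 1   // Calendar months are 0-based
  val day   = cal.get(Calendar.DAY_OF_MONTH)
  val time  = cal.get(Calendar.HOUR_OF_DAY) * 3600 +
              cal.get(Calendar.MINUTE) * 60 +
              cal.get(Calendar.SECOND)      // seconds since midnight
  (year, month, day, time)
}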
The venues table is much easier to model: I will just use the venue id as the partition key.

CREATE TABLE lbsn.venues (
  vid   bigint, 
  name  text,
  lat   double,
  lon   double,
  PRIMARY KEY (vid)
);

Loading data

Data can be loaded with the load_tables.cql script, located in datasets/cassandra-cql
Here is the gist of it:

COPY checkins (year,month,day,time,ts,uid,lat,lon,vid) FROM '../checkins.csv' ;
COPY lbsn.venues (vid,name,lat,lon) FROM '../venues.csv' ;

Queries with Cassandra CQL

/* query a given day */
select * from lbsn.checkins where year=2010 and month=9 and day=13 limit 5;

 year | month | day | time | uid    | lat      | lon       | ts                       | vid
------+-------+-----+------+--------+----------+-----------+--------------------------+---------
 2010 |     9 |  13 |   39 | 114507 | 40.80794 | -73.96409 | 2010-09-13 00:00:39+0000 |   78472
 2010 |     9 |  13 |  138 |  37571 | 40.62563 | -74.25763 | 2010-09-13 00:02:18+0000 | 1663634
 2010 |     9 |  13 |  147 |  11321 | 40.73396 | -73.99301 | 2010-09-13 00:02:27+0000 |   35372
 2010 |     9 |  13 |  597 | 165226 | 40.64388 | -73.78281 | 2010-09-13 00:09:57+0000 |   23261
 2010 |     9 |  13 |  641 |  12719 | 40.72978 | -74.00121 | 2010-09-13 00:10:41+0000 |  758447
/* range by date day */
select * from checkins where year=2010 and month=9 and day<16 and day>13 limit 5;

 year | month | day | time | uid    | lat      | lon       | ts                       | vid
------+-------+-----+------+--------+----------+-----------+--------------------------+---------
 2010 |     9 |  14 |   91 |    853 | 40.73474 | -73.87434 | 2010-09-14 00:01:31+0000 |  917955
 2010 |     9 |  14 |  328 |   4516 | 40.72585 | -73.99289 | 2010-09-14 00:05:28+0000 |   37160
 2010 |     9 |  14 |  344 |   2964 | 40.67621 | -73.98405 | 2010-09-14 00:05:44+0000 |  956870
 2010 |     9 |  14 |  359 |  48555 | 40.76068 | -73.98699 | 2010-09-14 00:05:59+0000 | 3026508
 2010 |     9 |  14 |  688 | 189786 | 40.71588 | -74.00663 | 2010-09-14 00:11:28+0000 | 1036251
You can range by day, as long as you provide the composite partition key (year, month). Also, since day and time are both clustering keys, we can define range queries within a single day, as shown below:

/* within a day, range by time between 17:00:00 and 18:00:00*/
select * from checkins where year=2010 and month=9 and day=13 and time>61200 and time<64800  limit 5;

 year | month | day | time  | uid    | lat      | lon       | ts                       | vid
------+-------+-----+-------+--------+----------+-----------+--------------------------+---------
 2010 |     9 |  13 | 61365 |  22137 | 40.71551 | -73.99062 | 2010-09-13 17:02:45+0000 | 1091950
 2010 |     9 |  13 | 61437 | 159418 | 40.74139 | -73.98125 | 2010-09-13 17:03:57+0000 |  101317
 2010 |     9 |  13 | 61519 |  41643 | 40.76386 | -73.97293 | 2010-09-13 17:05:19+0000 |   12535
 2010 |     9 |  13 | 62031 |  11695 | 40.74103 | -73.99337 | 2010-09-13 17:13:51+0000 |  918429
 2010 |     9 |  13 | 62149 |  16328 | 40.70582 |  -73.9967 | 2010-09-13 17:15:49+0000 |   11794
Beware that the ordering of the clustering keys is important: you can apply a range restriction only to the last clustering key referenced in the query, while all preceding clustering keys must be restricted by equality. This is because clustering keys are internally stored in a tree-like fashion. The following query, for instance, is invalid (later on we will work around this restriction with Spark-side filtering):

/* INVALID!: range by date and by time */
select * from checkins where year=2010 and month=9 and day>13 and day<16 and time>61200 and time<64800  limit 5;
InvalidRequest: code=2200 [Invalid query] message="PRIMARY KEY column "time" cannot be restricted (preceding column "day" is restricted by a non-EQ relation)"

Spark time

First, open the Zeppelin console at http://localhost:8888/, then open the notebook Location Based Social Networks: Spark and Cassandra.


You can go through the demo yourself; below is an extract of what you can do with Spark, Cassandra, and the Cassandra connector for Spark.

Load The Cassandra-Spark Connector Library
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._

val checkins = sc.cassandraTable("lbsn", "checkins")
val venues   = sc.cassandraTable("lbsn", "venues")
Basic operations
Tables can be imported as RDDs or as Spark SQL DataFrames. When imported as RDDs, it is the developer's responsibility to provide the right casting from the CassandraRow type to native Scala types.
Checkins
checkins.count()
checkins.first()
res4: Long = 138449
res5: com.datastax.spark.connector.CassandraRow = CassandraRow{year: 2010, month: 7, day: 1, time: 11, uid: 74226, lat: 40.75548535, lon: -73.99116677, ts: 2010-07-01 02:00:11+0200, vid: 1365122}
Venues
venues.count()
venues.first()
res7: Long = 28699
res8: com.datastax.spark.connector.CassandraRow = CassandraRow{vid: 754108, lat: 40.7480915333, lon: -73.9891771, name: My Suit NY }
Query the collection using the cassandraRow API
The where clause is translated back into CQL, so it allows selection on the partition key and range restrictions on the clustering keys.
// how many checkins in new york during the valentine weekend
checkins.where("year = 2010 and month=2 and day>12 and day<15").count()
res12: Long = 887
Where is Central Park?
venues.where("vid = 7239827").first()
res14: com.datastax.spark.connector.CassandraRow = CassandraRow{vid: 7239827, lat: 40.758265613, lon: -73.994356727, name: Central Park Manhattan }
Column selection
venues.select("vid", "name").take(10).foreach(println)
CassandraRow{vid: 754108, name: My Suit NY }
CassandraRow{vid: 249755, name: UA Court Street Stadium 12}
CassandraRow{vid: 6919688, name: Nasty London!}
CassandraRow{vid: 521775, name: Calico Jacks Cantina}
CassandraRow{vid: 866136, name: Pho Bac}
CassandraRow{vid: 3841997, name: cave of the ridiculous}
CassandraRow{vid: 246266, name: Sky Asian Bistro}
CassandraRow{vid: 6762177, name: All Star Car Wash}
CassandraRow{vid: 797962, name: Blush}
CassandraRow{vid: 535067, name: Thompson Lower East Side}
Converting to native Scala types, for instance tuples
The cassandraTable method produces an RDD of type CassandraRow. If you want to move away from the CassandraRow type, you can map to other types using the usual Spark transformations (map, reduce, flatMap, etc.).
How many check-ins in Soho?
// soho bounding box: -74.0055, 40.7187, -73.9959, 40.7296
// from CassandraRow to scala Tuples
val coords = checkins.map(row => (row.getDouble("lat"), row.getDouble("lon")))

coords.
  filter(coord => coord._1 > 40.7187 & coord._1 < 40.7296).
  filter(coord => coord._2 > -74.0055 & coord._2 < -73.9959).
  count()
res596: Long = 4825
Filtering on columns, Map to Tuples
// from CassandraRow to some other object

// via cassandraRow methods
venues.select("vid", "name").map(row => (row.getLong("vid"), row.getString("name"))).take(10).foreach(println)
(1162492,Hillary Flowers)
(1165309,Roadside Entertainment)
(703772,Pan Aqua)
(396792,O'Reilly's Pub)
(447153,Kam Wei Kitchen)
(7558564,SouthWestNY)
(7410669,Windsor Hotel)
(6856641,New Rochelle High School)
(267286,US Flag Plaza, Liberty Park, NJ)
(262669,Galloway Castle)
Filtering on columns, Map to case class
// by defining a case class
case class Venue(vid: Long, name: String)
sc.cassandraTable[Venue]("lbsn", "venues").take(10).foreach(println)
defined class Venue
Venue(1162492,Hillary Flowers)
Venue(1165309,Roadside Entertainment)
Venue(703772,Pan Aqua)
Venue(396792,O'Reilly's Pub)
Venue(447153,Kam Wei Kitchen)
Venue(7558564,SouthWestNY)
Venue(7410669,Windsor Hotel)
Venue(6856641,New Rochelle High School)
Venue(267286,US Flag Plaza, Liberty Park, NJ)
Venue(262669,Galloway Castle)
SQL schema extraction with Spark SQL
By using the CassandraSQLContext, the schema is extracted from a sample of the data being read. Therefore no explicit casting/mapping is required, and you can move faster to standard native Scala types.
Table import with Spark SQL/DataFrame
// by converting to a spark sql/dataframe
import org.apache.spark.sql.cassandra.CassandraSQLContext
val cc = new CassandraSQLContext(sc)
val df_venues = cc.sql("select vid, name from lbsn.venues")

df_venues.show(10)
vid     name                
754108  My Suit NY          
249755  UA Court Street S...
6919688 Nasty London!       
521775  Calico Jacks Cantina
866136  Pho Bac             
3841997 cave of the ridic...
246266  Sky Asian Bistro    
6762177 All Star Car Wash   
797962  Blush               
535067  Thompson Lower Ea...
Cassandra-side vs Spark-side Filtering
When the query is compatible with CQL, filtering and selection are executed fully on the Cassandra side. By using the CassandraSQLContext you can also define queries which require some additional filtering on the Spark side.
Cassandra-side filtering
As RDD of CassandraRows
val checkins = sc.cassandraTable("lbsn", "checkins").select("ts", "uid", "vid").where("year=2010 and month=9 and day<16 and day>13")
checkins.take(10).foreach(println)
CassandraRow{ts: 2010-09-14 02:01:31+0200, uid: 853, vid: 917955}
CassandraRow{ts: 2010-09-14 02:05:28+0200, uid: 4516, vid: 37160}
CassandraRow{ts: 2010-09-14 02:05:44+0200, uid: 2964, vid: 956870}
CassandraRow{ts: 2010-09-14 02:05:59+0200, uid: 48555, vid: 3026508}
CassandraRow{ts: 2010-09-14 02:11:28+0200, uid: 189786, vid: 1036251}
CassandraRow{ts: 2010-09-14 02:14:33+0200, uid: 33841, vid: 1502210}
CassandraRow{ts: 2010-09-14 02:16:07+0200, uid: 12719, vid: 1078872}
CassandraRow{ts: 2010-09-14 02:18:17+0200, uid: 105012, vid: 341495}
CassandraRow{ts: 2010-09-14 02:19:24+0200, uid: 1214, vid: 1205097}
CassandraRow{ts: 2010-09-14 02:22:20+0200, uid: 189786, vid: 541535}

As a SchemaRDD
val checkins = cc.sql("select ts, uid, vid from lbsn.checkins where year=2010 and month=9 and day<16 and day>13")
checkins.show(10)
ts                   uid    vid    
2010-09-14 02:01:... 853    917955 
2010-09-14 02:05:... 4516   37160  
2010-09-14 02:05:... 2964   956870 
2010-09-14 02:05:... 48555  3026508
2010-09-14 02:11:... 189786 1036251
2010-09-14 02:14:... 33841  1502210
2010-09-14 02:16:... 12719  1078872
2010-09-14 02:18:... 105012 341495 
2010-09-14 02:19:... 1214   1205097
2010-09-14 02:22:... 189786 541535

Spark-side filtering
val checkins = cc.sql("select ts, uid, vid from lbsn.checkins where day<16 and day>13")
checkins.show(10)
ts                   uid   vid   
2010-07-14 02:00:... 9024  107406
2010-07-14 02:01:... 79128 147618
2010-07-14 02:02:... 11372 187679
2010-07-14 02:03:... 23665 19762 
2010-07-14 02:09:... 89502 299617
2010-07-14 02:10:... 33494 244214
2010-07-14 02:13:... 33843 11975 
2010-07-14 02:14:... 84107 780336
2010-07-14 02:15:... 718   54022 
2010-07-14 02:16:... 578   268521
Combined Cassandra-side & Spark-side filtering
val checkins = cc.sql("select ts, uid, vid from lbsn.checkins where year=2010 and month=9 and vid=57871")
checkins.show(10)
ts                   uid  vid  
2010-10-01 01:57:... 1684 57871
Joining cassandra tables in Spark
// joining tables (the easy way, and just as fast)

val df_venues   = cc.sql("select vid, name from lbsn.venues").as("venues").cache()
val df_checkins = cc.sql("select ts, uid, lat, lon, vid from lbsn.checkins").as("checkins").cache()

val checkins_venues = df_checkins.join(df_venues, $"checkins.vid" === $"venues.vid", "inner").select("ts", "uid", "lat", "lon", "checkins.vid","name")

checkins_venues.show(10)
ts                   uid    lat           lon                vid     name                
2010-07-01 02:47:... 578    40.7490532543 -73.9680397511     11831   United Nations      
2010-07-02 18:27:... 991    40.7188502243 -73.99594579149999 818431  OK 218              
2010-07-03 02:07:... 34359  40.7348441565 -73.9995288849     123831  Kingswood           
2010-07-03 18:58:... 578    40.6838680433 -73.9786720276     105831  Pacific Street St...
2010-07-03 19:53:... 2737   40.6906938667 -73.9956976167     197431  Floyd, NY           
2010-07-03 23:11:... 49393  40.6997066969 -73.8085234165     28031   Jamaica LIRR Station
2010-07-04 03:40:... 119601 40.7490532543 -73.9680397511     11831   United Nations      
2010-07-05 06:49:... 128772 40.8344876113 -73.9385139942     1241631 Morris Jumel Mansion
2010-07-06 07:36:... 38706  40.7490532543 -73.9680397511     11831   United Nations      
2010-07-06 17:12:... 35105  40.6997066969 -73.8085234165     28031   Jamaica LIRR Station
Aggregate and group by
// top 10 checked in venues
checkins_venues.groupBy("name").count().sort($"count".desc).show(10)
name                 count
LGA LaGuardia Air... 1673 
JFK John F. Kenne... 1643 
Starbucks            1316 
Starbucks Coffee     1114 
EWR Newark Libert... 1084 
Times Square         1084 
Grand Central Ter... 1002 
Dunkin' Donuts       507  
Madison Square Ga... 426  
The Museum of Mod... 392
Machine Learning: k-means Clustering
// run k-means clustering on the data

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val locs = checkins_venues.select("lat","lon").map(s => Vectors.dense(s.getDouble(0), s.getDouble(1))).cache()

val numClusters = 50
val numIterations = 20
val clusters = KMeans.train(locs, numClusters, numIterations)

val WSSSE = clusters.computeCost(locs)
println("Within Set Sum of Squared Errors = " + WSSSE + "\n")

clusters.clusterCenters.foreach(println)
WSSSE: Double = 11.051512212406834
Within Set Sum of Squared Errors = 11.051512212406834

[40.72760996107598,-73.98843612291218]
[40.64528778525469,-74.23562853133915]
[40.76587038674041,-73.982835425778]
[40.64524967416557,-73.78403168243933]
[40.80733971996237,-73.95276889804212]
[40.69057039696831,-74.17928359461052]
[40.80956608048502,-73.80586643992105]
[40.826054341673164,-74.21764724121967]
...
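The choice of numClusters = 50 is somewhat arbitrary. If you want to sanity-check it, here is a quick sketch of the classic elbow method, reusing locs and numIterations from the block above (this loop is my own addition, not part of the original notebook):

// compute WSSSE for a range of k values and look for the "elbow"
import org.apache.spark.mllib.clustering.KMeans

(10 to 100 by 10).foreach { k =>
  val model = KMeans.train(locs, k, numIterations)
  println(s"k=$k  WSSSE=${model.computeCost(locs)}")
}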
Machine Learning: Score predictions

import org.apache.spark.sql.functions.udf

val func = (lat:Double, lon:Double) => clusters.predict(Vectors.dense(lat,lon))
val sqlfunc = udf(func)

// add predictions as an extra column, by using a user-defined function
// remember that the udf closes over the clusters model, which is shipped to the various executors
val locs_cid = checkins_venues.withColumn("cluster", sqlfunc(checkins_venues("lat"), checkins_venues("lon")))

locs_cid.show(10)
ts                   uid    lat           lon                vid     name                 cluster
2010-07-01 02:03:... 10231  40.663200646  -73.984763295      1225113 Thistle Hill Tavern  34     
2010-07-01 02:06:... 4907   40.74101965   -73.99416911670001 1078263 Limelight Marketp... 47     
2010-07-01 02:10:... 4929   40.747507     -73.989425         1175513 La Rosa Cigars       47     
2010-07-01 02:14:... 26851  40.76823395   -73.95315415       164621  David Copperfields   38     
2010-07-01 02:17:... 4929   40.74695265   -73.9857679833     141918  J.J. Hat Center      47     
2010-07-01 02:23:... 4929   40.7484436586 -73.9857316017     12313   Empire State Buil... 47     
2010-07-01 02:25:... 24712  40.6752607557 -73.9813770354     296249  Al di lá Trattoria   34     
2010-07-01 02:26:... 6639   40.7373236513 -73.9910423756     261515  Havana Central       47     
2010-07-01 02:27:... 158638 40.82951468   -73.92625695       153115  Yankees Ticket Of... 44     
2010-07-01 02:27:... 124703 40.826946067  -73.92811775210001 11720   Yankee Stadium       44
Spark gymnastics with Data Frames
val df = locs_cid.select("cluster", "name").
    groupBy("cluster", "name").
    agg(Map("name" -> "count")).
    sort($"cluster", $"COUNT(name)".desc).cache()

df.show()
cluster name                 COUNT(name)
0       Starbucks Coffee     172        
0       Regal Union Squar... 118        
0       Whole Foods Marke... 95         
0       AOL HQ               92         
0       Strand Bookstore     85         
0       Katz's Delicatessen  79         
0       Ippudo               73         
0       Trader Joe's         70         
0       Momofuku Noodle Bar  66         
0       Whole Foods Marke... 62         
0       The Grey Dog's Co... 59         
0       Webster Hall         58         
0       Momofuku Milk Bar    58         
0       Veselka              55         
0       Schiller's Liquor... 54         
0       Think Coffee         53         
0       Foursquare HQ        52         
0       Astor Place Station  50         
0       Momofuku Ssäm Bar    48         
0       Max Brenner          48
Spark Data Frames to RDD's
// from dataframe to rdd
val r = df.rdd.map(row => (row.getInt(0), (row.getString(1), row.getLong(2))) ).cache()
r.first
res67: (Int, (String, Long)) = (0,(Starbucks Coffee,172))
Spark RDD's gymnastics: groupByKey, map, flatMap
val topNPerGroup = r.groupByKey.map { 
   case (k, v) => 
       k -> v.toList.sortBy(-_._2).take(3)
}

topNPerGroup.collect.foreach(println)
(0,List((Starbucks Coffee,172), (Regal Union Square Stadium 14,118), (Whole Foods Market (Union Square),95)))
(1,List((Nuno's Pavillion,48), (Elizabeth Train Station,19), (Quick Check,17)))
(2,List((Starbucks,249), (Radio City Music Hall,237), (Columbus Circle,174)))
(3,List((JFK John F. Kennedy International,1643), (JFK Terminal 5,325), (Terminal 4 at JFK,115)))
(4,List((Columbia University,110), (The City College Of New York - NAC Building,58), (Harlem125th Street Metro North Station,49)))
(5,List((EWR Newark Liberty International,1084), (EWR Terminal A,167), (EWR Terminal C,162)))
(6,List((Throgs Neck Bridge Toll,117), (Throgs Neck Bridge,86), (Whitestone Bridge,25)))
(7,List((RHM Headquarters,36), (Egan's,22), (Edgemont Memorial Park,18)))
(8,List((Dunkin' Donuts,88), (Verrazano-Narrows Bridge,77), (New York Sports Club ,59)))
(9,List((Queensboro Plaza Station,51), (Bohemian Hall & Beer Garden,36), (Studio Square Beer Garden,31)))
(10,List((Secaucus Junction,113), (Xchange,16), (Starbucks,15)))
...
Flattening hierarchical RDD to a single RDD
// flattening this to a single list
val flattenedTopNPerGroup = 
    topNPerGroup.flatMap({case (k,v) => v.map(s => (k,s))})

flattenedTopNPerGroup.collect.foreach(println)
(0,(Starbucks Coffee,172))
(0,(Regal Union Square Stadium 14,118))
(0,(Whole Foods Market (Union Square),95))
(1,(Nuno's Pavillion,48))
(1,(Elizabeth Train Station,19))
(1,(Quick Check,17))
(2,(Starbucks,249))
(2,(Radio City Music Hall,237))
(2,(Columbus Circle,174))
(3,(JFK John F. Kennedy International,1643))
(3,(JFK Terminal 5,325))
(3,(Terminal 4 at JFK,115))
(4,(Columbia University,110))
(4,(The City College Of New York - NAC Building,58))
(4,(Harlem125th Street Metro North Station,49))
(5,(EWR Newark Liberty International,1084))
(5,(EWR Terminal A,167))
(5,(EWR Terminal C,162))
(6,(Throgs Neck Bridge Toll,117))
(6,(Throgs Neck Bridge,86))
(6,(Whitestone Bridge,25))
(7,(RHM Headquarters,36))
(7,(Egan's,22))
(7,(Edgemont Memorial Park,18))
(8,(Dunkin' Donuts,88))
(8,(Verrazano-Narrows Bridge,77))
(8,(New York Sports Club ,59))
(9,(Queensboro Plaza Station,51))
(9,(Bohemian Hall & Beer Garden,36))
(9,(Studio Square Beer Garden,31))
(10,(Secaucus Junction,113))
(10,(Xchange,16))
(10,(Starbucks,15))