As a demo, we will cluster check-in transactions in New York from the Gowalla dataset. Clustering is performed on Spark using the MLlib library, while the data is read from a Cassandra keyspace using the Spark-Cassandra connector.
Cassandra
Apache Cassandra (http://cassandra.apache.org/) is an open source distributed database management system designed to handle large amounts of data across many commodity servers, providing high availability with no single point of failure. Cassandra offers robust support for clusters spanning multiple datacenters, with asynchronous masterless replication allowing low latency operations for all clients.
Cassandra's data model is a partitioned row store with tunable consistency. Rows are organized into tables; the first component of a table's primary key is the partition key; within a partition, rows are clustered by the remaining columns of the key. Other columns may be indexed separately from the primary key. Tables may be created, dropped, and altered at runtime without blocking updates and queries.
Cassandra does not currently support joins or subqueries. Rather, Cassandra emphasizes denormalization through features like collections. However, as shown in this post, this limitation can be overcome by using Spark and Cassandra together.
Spark
Apache Spark (http://spark.apache.org/) is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.
Spark Cassandra Connector
This library lets you expose Cassandra tables as Spark RDDs, write Spark RDDs to Cassandra tables, and execute arbitrary CQL queries in your Spark applications.
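To make this concrete, here below is a minimal sketch of those three capabilities. The lbsn keyspace and the venues table are the ones used later in this post; the connection settings, the application name and the sample row are illustrative assumptions only:
// illustrative sketch: read, write and run CQL with the Spark-Cassandra connector
import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._                       // adds cassandraTable / saveToCassandra
import com.datastax.spark.connector.cql.CassandraConnector
val conf = new SparkConf().setAppName("gowalla-demo").set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext(conf)
// expose a Cassandra table as a Spark RDD
val venues = sc.cassandraTable("lbsn", "venues")
// write an RDD back to a Cassandra table (tuple fields are matched to the listed columns)
val moreVenues = sc.parallelize(Seq((42L, "Some Place", 40.0, -73.9)))
moreVenues.saveToCassandra("lbsn", "venues", SomeColumns("vid", "name", "lat", "lon"))
// execute arbitrary CQL through the underlying driver session
CassandraConnector(conf).withSessionDo(session => session.execute("SELECT count(*) FROM lbsn.venues"))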
Zeppelin (Apache incubator)
Zeppelin is still in its early days, but it is very promising and a nice alternative to IPython. Currently, it can only be built from source; check the master branch on GitHub.
Install and Setup
Versions
Matching versions is quite important when working with this set of technologies. The demo provided here below has been tested on the following versions:
- cassandra 2.1.7
- spark 1.3.1
- spark-cassandra connector 1.3
- zeppelin (master github)
Requirements
- jdk 1.7 or higher
- git
- npm
- maven
- curl
- wget
Setup
You need to have Cassandra, Spark and Zeppelin running on your system in order to proceed. For those of you who want to give it a try, I have written a very rudimentary script which downloads, builds and installs all the necessary tools in a local directory (no admin rights required). First clone the project from GitHub, then run the install.sh script. Since the script builds Spark and Zeppelin from source, it is going to take a while. I am planning to test the pre-built Spark when I have some more time.
git clone https://github.com/natalinobusa/gowalla-spark-demo.git gowalla-spark-demo
cd gowalla-spark-demo
./install.sh
Run it!
Again, I have prepared a script, start-all.sh, which runs all the various ingredients. A similar script, stop-all.sh, is available to stop all services.
Spark is configured to run in cluster mode (albeit on a single node). A password might be prompted, since the Spark master and workers communicate via ssh. Some pointers to get started:
- Spark web interface: http://localhost:8080/
- Zeppelin: http://localhost:8888/
Cassandra can be accessed with the cqlsh command line interface. After installing and setting up the system, type
./apache-cassandra-2.1.7/bin/cqlsh
from the root of the git project to start the CQL client.
Data
The venue ids and names are taken from the following datasets:
loc-gowalla_totalCheckins.txt.gz | https://snap.stanford.edu/data/loc-gowalla.html
E. Cho, S. A. Myers, J. Leskovec. Friendship and Mobility: User Movement in Location-Based Social Networks. ACM SIGKDD International Conference on Knowledge Discovery and Data Mining (KDD), 2011.
gowalla-dataset.zip | https://code.google.com/p/locrec/downloads/detail?name=gowalla-dataset.zip
The project is being developed in the context of the SInteliGIS project financed by the Portuguese Foundation for Science and Technology (FCT) through project grant PTDC/EIA-EIA/109840/2009.
Generating a sample extract using Python
The csv files provided in this demo have been extracted using a Python notebook. Some info about those files: the check-in file checkins.csv has the year, month, day and UTC check-in timestamp, and provides the coordinates of the check-in, the id of the venue and the id of the user. The venues.csv file provides the id, the coordinates and the name of some of those historical venues. Here below are a few charts showing how the Gowalla app ramped up during 2010, and an overlay of the hottest checked-in spots in New York during 2010, generated using Google Maps and Highcharts and clustered using scikit-learn k-means. Please refer to the IPython notebook (available on the GitHub repo https://github.com/natalinobusa/gowalla-spark-demo) for more details about how these graphs and the csv files were generated.
Data Modeling with Cassandra
Create keyspace and tables
First, create the keyspace and the tables. Since in this demo we are focusing on queries and data science, we are not going to set up proper replication as you would for an actual Cassandra cluster keyspace.
CREATE KEYSPACE IF NOT EXISTS lbsn
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
For the checkins table, I am going to model the primary key using a partition key of year and month, and clustering keys of day and time. For this specific example we use a time field expressing the time of the day in seconds, since the date is already available in the partition and clustering key columns. This specific modeling of time will turn out to be useful later on, since I can query a range of days in a month, or a given time window during the day, by making use of the properties of the clustering keys. Finally, we add the uid as a clustering key to avoid overwrites in case two users check in at exactly the same date and time.
CREATE TABLE lbsn.checkins (
year int,
month int,
day int,
time int,
ts timestamp,
uid bigint,
lat double,
lon double,
vid bigint,
PRIMARY KEY ((year, month), day, time, uid)
);
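As an aside, the time column is simply the number of seconds since midnight (UTC). A tiny illustrative snippet (not part of the demo notebook) showing the conversion used for the time range queries further below:
// time-of-day in seconds, as stored in the checkins.time clustering column
def toSecondsOfDay(hh: Int, mm: Int, ss: Int): Int = hh * 3600 + mm * 60 + ss
def toClock(seconds: Int): String = f"${seconds / 3600}%02d:${seconds % 3600 / 60}%02d:${seconds % 60}%02d"
toSecondsOfDay(17, 0, 0)   // 61200, the lower bound used in the range query below
toClock(64800)             // "18:00:00", the upper bound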
The venue table is much easier to model: I will just use the venue id as the partition key.
CREATE TABLE lbsn.venues (
vid bigint,
name text,
lat double,
lon double,
PRIMARY KEY (vid)
);
Loading data
Data can be loaded with the load_tables.cql script, located in datasets/cassandra-cql. Here below is the gist of it:
COPY lbsn.checkins (year,month,day,time,ts,uid,lat,lon,vid) FROM '../checkins.csv';
COPY lbsn.venues (vid,name,lat,lon) FROM '../venues.csv';
Queries with Cassandra CQL
/* query a given day */
select * from lbsn.checkins where year=2010 and month=9 and day=13 limit 5;
year | month | day | time | uid | lat | lon | ts | vid
------+-------+-----+------+--------+----------+-----------+--------------------------+---------
2010 | 9 | 13 | 39 | 114507 | 40.80794 | -73.96409 | 2010-09-13 00:00:39+0000 | 78472
2010 | 9 | 13 | 138 | 37571 | 40.62563 | -74.25763 | 2010-09-13 00:02:18+0000 | 1663634
2010 | 9 | 13 | 147 | 11321 | 40.73396 | -73.99301 | 2010-09-13 00:02:27+0000 | 35372
2010 | 9 | 13 | 597 | 165226 | 40.64388 | -73.78281 | 2010-09-13 00:09:57+0000 | 23261
2010 | 9 | 13 | 641 | 12719 | 40.72978 | -74.00121 | 2010-09-13 00:10:41+0000 | 758447
/* range over days */
select * from lbsn.checkins where year=2010 and month=9 and day<16 and day>13 limit 5;
year | month | day | time | uid | lat | lon | ts | vid
------+-------+-----+------+--------+----------+-----------+--------------------------+---------
2010 | 9 | 14 | 91 | 853 | 40.73474 | -73.87434 | 2010-09-14 00:01:31+0000 | 917955
2010 | 9 | 14 | 328 | 4516 | 40.72585 | -73.99289 | 2010-09-14 00:05:28+0000 | 37160
2010 | 9 | 14 | 344 | 2964 | 40.67621 | -73.98405 | 2010-09-14 00:05:44+0000 | 956870
2010 | 9 | 14 | 359 | 48555 | 40.76068 | -73.98699 | 2010-09-14 00:05:59+0000 | 3026508
2010 | 9 | 14 | 688 | 189786 | 40.71588 | -74.00663 | 2010-09-14 00:11:28+0000 | 1036251
You can range over days, as long as you provide the composite partition key (year, month). Since we have defined multiple clustering keys, we can also define range queries within a single day, as shown below:
/* within a day, range by time between 17:00:00 and 18:00:00 */
select * from lbsn.checkins where year=2010 and month=9 and day=13 and time>61200 and time<64800 limit 5;
year | month | day | time | uid | lat | lon | ts | vid
------+-------+-----+-------+--------+----------+-----------+--------------------------+---------
2010 | 9 | 13 | 61365 | 22137 | 40.71551 | -73.99062 | 2010-09-13 17:02:45+0000 | 1091950
2010 | 9 | 13 | 61437 | 159418 | 40.74139 | -73.98125 | 2010-09-13 17:03:57+0000 | 101317
2010 | 9 | 13 | 61519 | 41643 | 40.76386 | -73.97293 | 2010-09-13 17:05:19+0000 | 12535
2010 | 9 | 13 | 62031 | 11695 | 40.74103 | -73.99337 | 2010-09-13 17:13:51+0000 | 918429
2010 | 9 | 13 | 62149 | 16328 | 40.70582 | -73.9967 | 2010-09-13 17:15:49+0000 | 11794
Beware that the ordering of the clustering keys is important. You can apply a range restriction only on the last clustering key you filter on; all preceding clustering keys must be restricted by equality. This is because clustering keys are internally stored in a nested, tree-like fashion. The following query, for instance, is invalid:
/* INVALID!: range by day and by time */
select * from lbsn.checkins where year=2010 and month=9 and day>13 and day<16 and time>61200 and time<64800 limit 5;
InvalidRequest: code=2200 [Invalid query] message="PRIMARY KEY column "time" cannot be restricted (preceding column "day" is restricted by a non-EQ relation)"
Spark time
First, open the Zeppelin console at http://localhost:8888/, then open up the notebook Location Based Social Networks: Spark and Cassandra.
You can go through the demo yourself; here below I will present an extract of what you can do with Spark, Cassandra, and the Cassandra connector for Spark.
Load The Cassandra-Spark Connector Library
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
val checkins = sc.cassandraTable("lbsn", "checkins")
val venues = sc.cassandraTable("lbsn", "venues")
Basic operations
Tables can be imported as RDDs or as Spark SQL DataFrames. When imported as RDDs, it is the developer's responsibility to provide the right casting from the CassandraRow type to Scala native types.
Checkins
checkins.count()
checkins.first()
res4: Long = 138449
res5: com.datastax.spark.connector.CassandraRow = CassandraRow{year: 2010, month: 7, day: 1, time: 11, uid: 74226, lat: 40.75548535, lon: -73.99116677, ts: 2010-07-01 02:00:11+0200, vid: 1365122}
Venues
venues.count()
venues.first()
res7: Long = 28699
res8: com.datastax.spark.connector.CassandraRow = CassandraRow{vid: 754108, lat: 40.7480915333, lon: -73.9891771, name: My Suit NY }
Query the collection using the cassandraRow API
The where clause translates back into CQL; therefore it allows selection on the partition key and ranges on the clustering keys.
// how many checkins in New York during the Valentine's weekend
checkins.where("year = 2010 and month=2 and day>12 and day<15").count()
res12: Long = 887
Where is Central Park?
venues.where("vid = 7239827").first()
res14: com.datastax.spark.connector.CassandraRow = CassandraRow{vid: 7239827, lat: 40.758265613, lon: -73.994356727, name: Central Park Manhattan }
Column selection
venues.select("vid", "name").take(10).foreach(println)
CassandraRow{vid: 754108, name: My Suit NY }
CassandraRow{vid: 249755, name: UA Court Street Stadium 12}
CassandraRow{vid: 6919688, name: Nasty London!}
CassandraRow{vid: 521775, name: Calico Jacks Cantina}
CassandraRow{vid: 866136, name: Pho Bac}
CassandraRow{vid: 3841997, name: cave of the ridiculous}
CassandraRow{vid: 246266, name: Sky Asian Bistro}
CassandraRow{vid: 6762177, name: All Star Car Wash}
CassandraRow{vid: 797962, name: Blush}
CassandraRow{vid: 535067, name: Thompson Lower East Side}
Converting to native Scala types, for instance tuples
cassandraTable produces an RDD of CassandraRow elements. If you want to move away from the CassandraRow type, you can map to other types using the usual Spark transformations (map, reduce, flatMap, etc.).
How many check-ins in Soho?
// soho bounding box: -74.0055, 40.7187, -73.9959, 40.7296
// from CassandraRow to scala Tuples
val coords = checkins.map(row => (row.getDouble("lat"), row.getDouble("lon")))
coords.
filter(coord => coord._1 > 40.7187 & coord._1 < 40.7296).
filter(coord => coord._2 > -74.0055 & coord._2 < -73.9959).
count()
res596: Long = 4825
Filtering on columns, map to tuples
// from CassandraRow to some other object
// via cassandraRow methods
venues.select("vid", "name").map(row => (row.getLong("vid"), row.getString("name"))).take(10).foreach(println)
(1162492,Hillary Flowers)
(1165309,Roadside Entertainment)
(703772,Pan Aqua)
(396792,O'Reilly's Pub)
(447153,Kam Wei Kitchen)
(7558564,SouthWestNY)
(7410669,Windsor Hotel)
(6856641,New Rochelle High School)
(267286,US Flag Plaza, Liberty Park, NJ)
(262669,Galloway Castle)
Filtering on columns, map to case class
// by defining a case class
case class Venue(vid: Long, name: String)
sc.cassandraTable[Venue]("lbsn", "venues").take(10).foreach(println)
defined class Venue
Venue(1162492,Hillary Flowers)
Venue(1165309,Roadside Entertainment)
Venue(703772,Pan Aqua)
Venue(396792,O'Reilly's Pub)
Venue(447153,Kam Wei Kitchen)
Venue(7558564,SouthWestNY)
Venue(7410669,Windsor Hotel)
Venue(6856641,New Rochelle High School)
Venue(267286,US Flag Plaza, Liberty Park, NJ)
Venue(262669,Galloway Castle)
SQL schema extraction with Spark SQL
By using the CassandraSQLContext, the schema is inferred automatically, so no explicit casting/mapping is required and you can move faster to standard Scala native types.
Table import with Spark SQL/DataFrames
// by converting to a spark sql/dataframe
import org.apache.spark.sql.cassandra.CassandraSQLContext
val cc = new CassandraSQLContext(sc)
val df_venues = cc.sql("select vid, name from lbsn.venues")
df_venues.show(10)
vid name
754108 My Suit NY
249755 UA Court Street S...
6919688 Nasty London!
521775 Calico Jacks Cantina
866136 Pho Bac
3841997 cave of the ridic...
246266 Sky Asian Bistro
6762177 All Star Car Wash
797962 Blush
535067 Thompson Lower Ea...
Cassandra-side vs Spark-side Filtering
When the query is compatible with CQL, filtering and selection are executed fully on the Cassandra side. By using the CassandraSQLContext you can also define a query which requires some additional filtering on the Spark side.
Cassandra-side filtering
As RDD of CassandraRows
val checkins = sc.cassandraTable("lbsn", "checkins").select("ts", "uid", "vid").where("year=2010 and month=9 and day<16 and day>13")
checkins.take(10).foreach(println)
CassandraRow{ts: 2010-09-14 02:01:31+0200, uid: 853, vid: 917955}
CassandraRow{ts: 2010-09-14 02:05:28+0200, uid: 4516, vid: 37160}
CassandraRow{ts: 2010-09-14 02:05:44+0200, uid: 2964, vid: 956870}
CassandraRow{ts: 2010-09-14 02:05:59+0200, uid: 48555, vid: 3026508}
CassandraRow{ts: 2010-09-14 02:11:28+0200, uid: 189786, vid: 1036251}
CassandraRow{ts: 2010-09-14 02:14:33+0200, uid: 33841, vid: 1502210}
CassandraRow{ts: 2010-09-14 02:16:07+0200, uid: 12719, vid: 1078872}
CassandraRow{ts: 2010-09-14 02:18:17+0200, uid: 105012, vid: 341495}
CassandraRow{ts: 2010-09-14 02:19:24+0200, uid: 1214, vid: 1205097}
CassandraRow{ts: 2010-09-14 02:22:20+0200, uid: 189786, vid: 541535}
As a DataFrame
val checkins = cc.sql("select ts, uid, vid from lbsn.checkins where year=2010 and month=9 and day<16 and day>13")
checkins.show(10)
ts uid vid
2010-09-14 02:01:... 853 917955
2010-09-14 02:05:... 4516 37160
2010-09-14 02:05:... 2964 956870
2010-09-14 02:05:... 48555 3026508
2010-09-14 02:11:... 189786 1036251
2010-09-14 02:14:... 33841 1502210
2010-09-14 02:16:... 12719 1078872
2010-09-14 02:18:... 105012 341495
2010-09-14 02:19:... 1214 1205097
2010-09-14 02:22:... 189786 541535
Spark-side filtering
val checkins = cc.sql("select ts, uid, vid from lbsn.checkins where day<16 and day>13")
checkins.show(10)
ts uid vid
2010-07-14 02:00:... 9024 107406
2010-07-14 02:01:... 79128 147618
2010-07-14 02:02:... 11372 187679
2010-07-14 02:03:... 23665 19762
2010-07-14 02:09:... 89502 299617
2010-07-14 02:10:... 33494 244214
2010-07-14 02:13:... 33843 11975
2010-07-14 02:14:... 84107 780336
2010-07-14 02:15:... 718 54022
2010-07-14 02:16:... 578 268521
Combined Cassandra-side & Spark-side filtering
val checkins = cc.sql("select ts, uid, vid from lbsn.checkins where year=2010 and month=9 and vid=57871")
checkins.show(10)
ts uid vid
2010-10-01 01:57:... 1684 57871
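If you want to check where the filtering actually happens, one option is to print the query plan with explain; how the plan renders the pushed-down predicates depends on the Spark and connector versions, so treat this as an illustrative probe:
// print the logical and physical query plans of the combined query above
cc.sql("select ts, uid, vid from lbsn.checkins where year=2010 and month=9 and vid=57871").explain(true)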
Joining Cassandra tables in Spark
// joining tables (the easy way, and just as fast)
val df_venues = cc.sql("select vid, name from lbsn.venues").as("venues").cache()
val df_checkins = cc.sql("select ts, uid, lat, lon, vid from lbsn.checkins").as("checkins").cache()
val checkins_venues = df_checkins.join(df_venues, $"checkins.vid" === $"venues.vid", "inner").select("ts", "uid", "lat", "lon", "checkins.vid","name")
checkins_venues.show(10)
ts uid lat lon vid name
2010-07-01 02:47:... 578 40.7490532543 -73.9680397511 11831 United Nations
2010-07-02 18:27:... 991 40.7188502243 -73.99594579149999 818431 OK 218
2010-07-03 02:07:... 34359 40.7348441565 -73.9995288849 123831 Kingswood
2010-07-03 18:58:... 578 40.6838680433 -73.9786720276 105831 Pacific Street St...
2010-07-03 19:53:... 2737 40.6906938667 -73.9956976167 197431 Floyd, NY
2010-07-03 23:11:... 49393 40.6997066969 -73.8085234165 28031 Jamaica LIRR Station
2010-07-04 03:40:... 119601 40.7490532543 -73.9680397511 11831 United Nations
2010-07-05 06:49:... 128772 40.8344876113 -73.9385139942 1241631 Morris Jumel Mansion
2010-07-06 07:36:... 38706 40.7490532543 -73.9680397511 11831 United Nations
2010-07-06 17:12:... 35105 40.6997066969 -73.8085234165 28031 Jamaica LIRR Station
Aggregate and group by
// top 10 checked in venues
checkins_venues.groupBy("name").count().sort($"count".desc).show(10)
name count
LGA LaGuardia Air... 1673
JFK John F. Kenne... 1643
Starbucks 1316
Starbucks Coffee 1114
EWR Newark Libert... 1084
Times Square 1084
Grand Central Ter... 1002
Dunkin' Donuts 507
Madison Square Ga... 426
The Museum of Mod... 392
Machine Learning: k-means Clustering
// run k-means clustering on the data
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
val locs = checkins_venues.select("lat","lon").map(s => Vectors.dense(s.getDouble(0), s.getDouble(1))).cache()
val numClusters = 50
val numIterations = 20
val clusters = KMeans.train(locs, numClusters, numIterations)
val WSSSE = clusters.computeCost(locs)
println("Within Set Sum of Squared Errors = " + WSSSE + "\n")
clusters.clusterCenters.foreach(println)
WSSSE: Double = 11.051512212406834
Within Set Sum of Squared Errors = 11.051512212406834
[40.72760996107598,-73.98843612291218]
[40.64528778525469,-74.23562853133915]
[40.76587038674041,-73.982835425778]
[40.64524967416557,-73.78403168243933]
[40.80733971996237,-73.95276889804212]
[40.69057039696831,-74.17928359461052]
[40.80956608048502,-73.80586643992105]
[40.826054341673164,-74.21764724121967]
...
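For reference, the cost returned by computeCost is the within-set sum of squared errors: the sum over all points of the squared Euclidean distance to the closest cluster center. A small illustrative sketch that recomputes it by hand from the clusters model and the locs RDD defined above:
// recompute WSSSE manually: squared distance of each point to its assigned center
val manualWSSSE = locs.map { p =>
  val c = clusters.clusterCenters(clusters.predict(p))
  val dLat = p(0) - c(0)
  val dLon = p(1) - c(1)
  dLat * dLat + dLon * dLon
}.sum()
// manualWSSSE should match clusters.computeCost(locs) up to floating point noise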
Machine Learning: Score predictions
import org.apache.spark.sql.functions.udf
val func = (lat:Double, lon:Double) => clusters.predict(Vectors.dense(lat,lon))
val sqlfunc = udf(func)
// add predictions as an extra column, by using a user defined function
// remember that the udf closes over the k-means model (clusters), which is shipped to the executors
val locs_cid = checkins_venues.withColumn("cluster", sqlfunc(checkins_venues("lat"), checkins_venues("lon")))
locs_cid.show(10)
ts uid lat lon vid name cluster
2010-07-01 02:03:... 10231 40.663200646 -73.984763295 1225113 Thistle Hill Tavern 34
2010-07-01 02:06:... 4907 40.74101965 -73.99416911670001 1078263 Limelight Marketp... 47
2010-07-01 02:10:... 4929 40.747507 -73.989425 1175513 La Rosa Cigars 47
2010-07-01 02:14:... 26851 40.76823395 -73.95315415 164621 David Copperfields 38
2010-07-01 02:17:... 4929 40.74695265 -73.9857679833 141918 J.J. Hat Center 47
2010-07-01 02:23:... 4929 40.7484436586 -73.9857316017 12313 Empire State Buil... 47
2010-07-01 02:25:... 24712 40.6752607557 -73.9813770354 296249 Al di lá Trattoria 34
2010-07-01 02:26:... 6639 40.7373236513 -73.9910423756 261515 Havana Central 47
2010-07-01 02:27:... 158638 40.82951468 -73.92625695 153115 Yankees Ticket Of... 44
2010-07-01 02:27:... 124703 40.826946067 -73.92811775210001 11720 Yankee Stadium 44
Spark gymnastics with DataFrames
val df = locs_cid.select("cluster", "name").
groupBy("cluster", "name").
agg(Map("name" -> "count")).
sort($"cluster", $"COUNT(name)".desc).cache()
df.show()
cluster name COUNT(name)
0 Starbucks Coffee 172
0 Regal Union Squar... 118
0 Whole Foods Marke... 95
0 AOL HQ 92
0 Strand Bookstore 85
0 Katz's Delicatessen 79
0 Ippudo 73
0 Trader Joe's 70
0 Momofuku Noodle Bar 66
0 Whole Foods Marke... 62
0 The Grey Dog's Co... 59
0 Webster Hall 58
0 Momofuku Milk Bar 58
0 Veselka 55
0 Schiller's Liquor... 54
0 Think Coffee 53
0 Foursquare HQ 52
0 Astor Place Station 50
0 Momofuku Ssäm Bar 48
0 Max Brenner 48
Spark DataFrames to RDDs
// from dataframe to rdd
val r = df.rdd.map(row => (row.getInt(0), (row.getString(1), row.getLong(2))) ).cache()
r.first
res67: (Int, (String, Long)) = (0,(Starbucks Coffee,172))
Spark RDD gymnastics: groupByKey, map, flatMap
val topNPerGroup = r.groupByKey.map {
case (k, v) =>
k -> v.toList.sortBy(-_._2).take(3)
}
topNPerGroup.collect.foreach(println)
(0,List((Starbucks Coffee,172), (Regal Union Square Stadium 14,118), (Whole Foods Market (Union Square),95)))
(1,List((Nuno's Pavillion,48), (Elizabeth Train Station,19), (Quick Check,17)))
(2,List((Starbucks,249), (Radio City Music Hall,237), (Columbus Circle,174)))
(3,List((JFK John F. Kennedy International,1643), (JFK Terminal 5,325), (Terminal 4 at JFK,115)))
(4,List((Columbia University,110), (The City College Of New York - NAC Building,58), (Harlem—125th Street Metro North Station,49)))
(5,List((EWR Newark Liberty International,1084), (EWR Terminal A,167), (EWR Terminal C,162)))
(6,List((Throgs Neck Bridge Toll,117), (Throgs Neck Bridge,86), (Whitestone Bridge,25)))
(7,List((RHM Headquarters,36), (Egan's,22), (Edgemont Memorial Park,18)))
(8,List((Dunkin' Donuts,88), (Verrazano-Narrows Bridge,77), (New York Sports Club ,59)))
(9,List((Queensboro Plaza Station,51), (Bohemian Hall & Beer Garden,36), (Studio Square Beer Garden,31)))
(10,List((Secaucus Junction,113), (Xchange,16), (Starbucks,15)))
...
Flattening a hierarchical RDD to a single RDD
// flattening this to a single list
val flattenedTopNPerGroup =
topNPerGroup.flatMap({case (k,v) => v.map(s => (k,s))})
flattenedTopNPerGroup.collect.foreach(println)
(0,(Starbucks Coffee,172))
(0,(Regal Union Square Stadium 14,118))
(0,(Whole Foods Market (Union Square),95))
(1,(Nuno's Pavillion,48))
(1,(Elizabeth Train Station,19))
(1,(Quick Check,17))
(2,(Starbucks,249))
(2,(Radio City Music Hall,237))
(2,(Columbus Circle,174))
(3,(JFK John F. Kennedy International,1643))
(3,(JFK Terminal 5,325))
(3,(Terminal 4 at JFK,115))
(4,(Columbia University,110))
(4,(The City College Of New York - NAC Building,58))
(4,(Harlem—125th Street Metro North Station,49))
(5,(EWR Newark Liberty International,1084))
(5,(EWR Terminal A,167))
(5,(EWR Terminal C,162))
(6,(Throgs Neck Bridge Toll,117))
(6,(Throgs Neck Bridge,86))
(6,(Whitestone Bridge,25))
(7,(RHM Headquarters,36))
(7,(Egan's,22))
(7,(Edgemont Memorial Park,18))
(8,(Dunkin' Donuts,88))
(8,(Verrazano-Narrows Bridge,77))
(8,(New York Sports Club ,59))
(9,(Queensboro Plaza Station,51))
(9,(Bohemian Hall & Beer Garden,36))
(9,(Studio Square Beer Garden,31))
(10,(Secaucus Junction,113))
(10,(Xchange,16))
(10,(Starbucks,15))