Thursday, August 4, 2016

Streaming Analytics: A story of many tales

Streaming Analytics is a data processing paradigm that has been gaining traction lately, mainly because more and more data is available as events through web services and real-time sources rather than being collected and packaged into data batches.

Over the past few years, a number of projects have tackled this problem. Some stem from batch analysis and achieve streaming computing by scheduling data processing at a high rate (micro-batching). Other projects started with event processing and moved up to handle bigger data collections.
Streaming analytics can be applied to relational, NoSQL and document-oriented data stores alike. The underlying data store and persistency model has a profound effect on the nature of the streaming analytics solution devised.

Solutions, Tools, Engines ...

So, how can we classify streaming computing initiatives?
We can divide streaming computing into three different categories:
  • Streaming Data Solutions (operation and business users)
  • Tools (data analysts, data scientists)
  • Engines (data engineers, data scientists and statisticians)


Streaming processing engines

From here on, let's focus on the engines, as streaming computing engines are the foundation of any domain-specific streaming application. Streaming engines can be seen as distributed data operating systems.
Streaming processing engines are best understood when grouped along the following five dimensions:
  1. Generation:
    Development epoch and influences. Which came first, and which ideas and lessons learned have influenced later projects?
  2. State:
    How is the state preserved? Is there a memory-to-disk spilling strategy? How are snapshots managed? What is the cost of a full state restore? Can events be replayed?
  3. Resiliency:
    How well can the distributed processing engine deal with failure? What about replaying incoming events? Does the architecture provide any guarantees about exactly-once data processing?
  4. Paradigm:
    Streaming computing paradigm. Event- or batch-based? What is the sweet spot for a given engine in terms of streaming processing latency and throughput?
  5. Orchestration:
    Underlying resource orchestration, think for instance of Mesos, Yarn, etc.
Here below is a very opinionated chart of a number of well-known streaming processing technologies developed over the last few years. Please take the following picture with a good pinch of salt, as it is more inspirational than thorough or accurate in any way.
The following list is roughly ordered by generation (a mix of chronology and evolution from previous paradigms/engines).

Flink

orchestrators: Yarn, Standalone, Cloud (EC2, GCE)
paradigm: event-driven
language: Java 47.2% C++ 24.7% Python 13.9%
community: github forks: 286
community: github first commit: Dec 13, 2015
url: https://flink.apache.org/
Apache Flink is a community-driven open source framework for distributed big data analytics, like Hadoop and Spark. The core of Apache Flink is a distributed streaming dataflow engine written in Java and Scala, which aims to bridge the gap between MapReduce-like systems and shared-nothing parallel database systems. Flink executes arbitrary dataflow programs in a data-parallel and pipelined manner. Its pipelined runtime enables the execution of both bulk/batch and stream processing programs, and it supports the execution of iterative algorithms natively.
Flink programs can be written in Java or Scala and are automatically compiled and optimized into dataflow programs that are executed in a cluster or cloud environment. Flink does not provide its own data storage system; input data must be stored in a distributed storage system like HDFS or HBase. For data stream processing, Flink consumes data from (reliable) message queues like Kafka.
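As an illustration, here is a minimal word-count sketch using Flink's Scala DataStream API; the socket source, port and window size are arbitrary choices for the example, not something prescribed by Flink:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object SocketWordCount {
  def main(args: Array[String]): Unit = {
    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // read a stream of lines from a (hypothetical) socket source
    val text = env.socketTextStream("localhost", 9999)

    // split lines into words and count them over 5-second windows
    val counts = text
      .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
      .map(word => (word, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)

    counts.print()
    env.execute("Socket word count")
  }
}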

Spark Streaming

orchestrators: Mesos, Yarn, Standalone, Cloud (EC2)
paradigm: batch
languages: Scala 77.2% Java 10.6%
community: github forks: 8261
community: github first commit: Mar 28, 2010
url: http://spark.apache.org/streaming/
Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams.
Internally, it works as follows. Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches. Spark Streaming provides a high-level abstraction called discretized stream or DStream, which represents a continuous stream of data. DStreams can be created either from input data streams from sources such as Kafka, Flume, and Kinesis, or by applying high-level operations on other DStreams. Internally, a DStream is represented as a sequence of RDDs.
Spark Streaming programs can be written in Scala, Java or Python (Python support was introduced in Spark 1.2).
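For a flavour of the micro-batch model, here is a minimal DStream sketch in Scala; the socket source, port and batch interval are arbitrary example choices:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// count words arriving on a TCP socket, in 10-second micro-batches
val conf = new SparkConf().setAppName("StreamingWordCount")
val ssc  = new StreamingContext(conf, Seconds(10))

val lines  = ssc.socketTextStream("localhost", 9999)
val counts = lines.flatMap(_.split(" "))
                  .map(word => (word, 1))
                  .reduceByKey(_ + _)

counts.print()
ssc.start()             // start the computation
ssc.awaitTermination()  // wait for the streaming job to terminate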

Heron

orchestrators: Standalone
paradigm: event-driven
language: Java 47.2% C++ 24.7% Python 13.9%
community: github forks: 286
community: github first commit: Dec 13, 2015
url: http://twitter.github.io/heron/
Essentially Storm 2.0: compatible with the Storm API, but with better support for state and resource management. Heron is designed to be fully backward compatible with existing Apache Storm projects, which means that you can migrate an existing Storm topology to Heron by making just a few adjustments to the topology's pom.xml Maven configuration file.

Kafka Streams

orchestrators: Standalone
paradigm: event-driven
language: Java 57.3% Scala 36.7%
community: github forks: 1987
community: github first commit: Jul 31, 2011
url: http://docs.confluent.io/3.0.0/streams/
Kafka Streams embeds distributed computing into Kafka queues. It provides a standalone "processor" client, besides the existing producer and consumer clients, for processing data consumed from Kafka and storing results back to Kafka. A processor computes on a stream of messages, with each message composed as a key-value pair. A processor receives one message at a time and does not have access to the whole data set at once. The Kafka Streams processor API supports per-message processing and time-triggered processing. Multiple processors can be chained up to form a DAG (i.e. the processor topology) for complex processing logic.
Users can define such a processor topology in an exploratory, REPL-like manner: make an initial topology, deploy and run it, check the results and intermediate values, then pause the job and edit the topology on the fly. Users can create state storage inside a processor that can be accessed locally; for example, a processor may retain a (usually most recent) subset of data for joins, aggregations and other non-monolithic operations.

Samza

orchestrators: Yarn, Standalone
languages: Scala 48.0% Java 47.8%
paradigm: event-driven
maturity: medium
community: github forks: 93
community: github first commit: Aug 11, 2013
url: https://samza.apache.org/
Apache Samza is a distributed stream processing framework. It uses Apache Kafka for messaging, and Apache Hadoop YARN to provide fault tolerance, processor isolation, security, and resource management. Samza provides a callback-based “process message” API comparable to MapReduce. Samza manages snapshotting and restoration of a stream processor’s state. When the processor is restarted, Samza restores its state to a consistent snapshot. Samza is built to handle large amounts of state (many gigabytes per partition). Samza works with YARN to transparently migrate your tasks to another machine.
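As a rough sketch of the callback-based API, here is a hypothetical Samza task in Scala that uppercases every message and forwards it to an output Kafka topic. The system and topic names are placeholders, and the task would still need to be wired to its input streams via a Samza job configuration file:

import org.apache.samza.system.{IncomingMessageEnvelope, OutgoingMessageEnvelope, SystemStream}
import org.apache.samza.task.{MessageCollector, StreamTask, TaskCoordinator}

// a hypothetical task: uppercase each incoming message and forward it
class UppercaseTask extends StreamTask {
  private val output = new SystemStream("kafka", "uppercase-output")

  def process(envelope: IncomingMessageEnvelope,
              collector: MessageCollector,
              coordinator: TaskCoordinator): Unit = {
    val message = envelope.getMessage.toString
    collector.send(new OutgoingMessageEnvelope(output, message.toUpperCase))
  }
}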

Akka Streams

orchestrators: Standalone
paradigm: event-driven
language: Scala 80.5% Java 17.4%
community: github forks: 1762
community: github first commit: Feb 15, 2009
url: http://doc.akka.io/docs/akka/2.4.9-RC1/scala/stream/
The purpose is to offer an intuitive and safe way to formulate stream processing setups such that we can then execute them efficiently and with bounded resource usage—no more OutOfMemoryErrors. In order to achieve this our streams need to be able to limit the buffering that they employ, they need to be able to slow down producers if the consumers cannot keep up. This feature is called back-pressure and is at the core of the Reactive Streams initiative of which Akka is a founding member.
For you this means that the hard problem of propagating and reacting to back-pressure has been incorporated in the design of Akka Streams already, so you have one less thing to worry about; it also means that Akka Streams interoperate seamlessly with all other Reactive Streams implementations (where Reactive Streams interfaces define the interoperability SPI while implementations like Akka Streams offer a nice user API).
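A minimal Scala sketch of the Source/Flow/Sink model; the numeric source and the transformations are arbitrary example choices:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

implicit val system = ActorSystem("streams-demo")
implicit val materializer = ActorMaterializer()

// a bounded source, two transformations and a sink;
// back-pressure between stages is handled by the runtime
Source(1 to 100)
  .map(_ * 2)
  .filter(_ % 3 == 0)
  .runWith(Sink.foreach(println))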

Gearpump

orchestrators: Standalone, Yarn
paradigm: event-driven
language: Scala 83.0% JavaScript 9.0%
community: github forks: 17
community: github first commit: Jul 20, 2014
url: http://www.gearpump.io/overview.html

Gearpump is a real-time big data streaming engine. It is inspired by recent advances in the Akka framework and by a desire to improve on existing streaming frameworks. Gearpump is event/message based and features low-latency handling, high performance, exactly-once semantics, dynamic topology updates, Apache Storm compatibility, and more. It is a smaller project, with a community at Intel.

Akka

orchestrators: Standalone
paradigm: event-driven
language: Scala 80.5% Java 17.4%
community: github forks: 1762
community: github first commit: Feb 15, 2009
url: http://akka.io/
Akka delivers the Actor Model as a programming abstraction to build scalable, resilient and responsive applications. For fault tolerance it adopts the "let it crash" model, which the telecom industry has used with great success to build applications that self-heal and systems that never stop. Actors also provide the abstraction for transparent distribution and the basis for truly scalable and fault-tolerant applications.
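A minimal actor sketch in Scala; the actor and the messages are hypothetical examples, not part of any specific application:

import akka.actor.{Actor, ActorSystem, Props}

// an actor reacts to messages one at a time, keeping its own private state
class Greeter extends Actor {
  def receive = {
    case name: String => println(s"Hello, $name")
    case _            => println("Unknown message")
  }
}

val system  = ActorSystem("actor-demo")
val greeter = system.actorOf(Props[Greeter], "greeter")

// messages are sent asynchronously; the actor processes them sequentially
greeter ! "streaming world"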

Logstash / Elasticsearch

orchestrators: Standalone
paradigm: event-driven (Logstash), batch (Elasticsearch)
language: Ruby 83.8% Java 10.8%
community: github forks: 1794
community: github first commit: Aug 2, 2009
url: https://www.elastic.co/
Logstash can also be used for stream processing and aggregations. In particular, Logstash has a large collection of plugins for source extraction, data transformation, and output connectors. Elasticsearch can then be used to provide micro-batch analysis of the results. This approach is, however, weaker in terms of resource management and of resiliency and durability guarantees.

Storm

orchestrators: Standalone
paradigm: event-driven
language: Java 78.0% Clojure 10.9% Python 7.9%
community: github forks: 2428
community: github first commit: Sep 11, 2011
url: http://storm.apache.org/
Apache Storm is a distributed computation framework written predominantly in the Clojure programming language. Originally created by Nathan Marz and his team at BackType, the project was open sourced after being acquired by Twitter. It uses custom-created "spouts" and "bolts" to define information sources and manipulations, allowing distributed processing of streaming data. The initial release was on 17 September 2011.
A Storm application is designed as a "topology" in the shape of a directed acyclic graph (DAG), with spouts and bolts acting as the graph vertices. Edges on the graph are named streams and direct data from one node to another. Together, the topology acts as a data transformation pipeline. At a superficial level the general topology structure is similar to a MapReduce job, with the main difference being that data is processed in real time as opposed to in individual batches. Additionally, Storm topologies run indefinitely until killed, while a MapReduce job DAG must eventually end.
Trident is a high-level abstraction for doing realtime computing on top of Storm. It allows you to seamlessly intermix high throughput (millions of messages per second), stateful stream processing with low latency distributed querying. Trident has support for joins, aggregations, grouping, functions, and filters. In addition to these, Trident adds primitives for doing stateful, incremental processing on top of any database or persistence store. Trident has consistent, exactly-once semantics, so it is easy to reason about Trident topologies.

Friday, July 8, 2016

The current state of Predictive Analytics: Strata Interview with Jenn Webb

During the last Strata conference in London I had the pleasure of sharing some thoughts on the current state and the challenges of predictive analytics with Jenn Webb, O’Reilly Radar’s managing editor for the Design, Hardware, Data, Business, and Emerging Tech spaces.
We touched on a number of subjects related to Data Science, Machine Learning and their applications: the advent of predictive APIs fueled by big data and machine learned models, the advantages and limits of deep learning, and the current and future applications of predictive analytics to financial services and marketing.


How would you describe the current state of predictive analytics? And what are the biggest challenges you’re facing in that space today?

The problem of data representation

In general, predictive analytics is based on the process of extracting and crafting features and discovering relevant patterns in data; by doing so, predictive analytics learns models which can be applied to forecast and score new data points. Models work by translating the original data fields into variables, also known as features, which are better at describing the problem than the raw data fields.
For instance, if you have a classification problem and you would like to identify people with an inclination to buy a certain product, you would convert the available information into a set of features that best describe this problem. Those features could capture a higher meaning which is not directly recorded in the raw data, such as the customer’s mood and intent or the customer’s propensity to buy. In short, I would say that predictive analytics is centered around the idea of extracting and defining good features, which in turn capture the patterns in the data.

Engineering features

Today, features are most of the time handcrafted, so one of the tasks of a data scientist/analyst consists in looking for structures in the dataset and in understanding and unraveling which patterns are hidden in it. Consequently, he/she defines a number of transformations that describe the problem at hand well with this new set of features.
The task of extracting and defining features which correctly represent the mechanism and structure embedded in the data is called feature engineering. Coming up with a good set of features is still a very hard problem and relies on the creativity, the skills and the knowledge of data science teams. They need to work out how to transform the original set of data fields into a meaningful and representative set of features.
Feature engineering can be a very hard task, especially for cybersecurity or certain marketing datasets where patterns are sparse and variables are not very descriptive. In these cases, it’s quite hard to craft and engineer features, because the understanding of those initial variables might not be sufficient to extract the right features or structures from the data.

Feature learning

Lately, next to feature engineering, we see another approach where those features and patterns are themselves “machine learned”. Deep neural networks and hierarchical machine learning approaches are able to capture and identify semantically relevant features for a given problem in an automated, algorithmic way.

Automatic feature extraction lifts the task of feature engineering from data scientists, at the cost of explainability. For instance, Google’s AlphaGo has recently learned to play the game of Go at a professional level; however, we don’t know exactly which patterns the machine learned in order to perform the task so well.

Predictive analytics

The biggest change happening in predictive analytics in recent years is indeed the move from narrow AI towards strong AI, by using feature extraction and layered machine learning models such as deep learning rather than feature engineering. These techniques have been available for the last 30 years, but only now do we have sufficient data and sufficient computing power to perform this task well.
Diving a little bit deeper into this, in a recent talk you outlined machine learning techniques that businesses can implement today and you talked about how predictive models can be embedded as microservices. So what are some of the more accessible techniques that businesses can use and what are some of the more interesting microservices applications you’re seeing?
Microservices are data services for the hipster generation. Conceptually, microservices still expose a programming interface but, compared to traditional Service Oriented Architectures (SOA), they tend to be more intuitive and easier to learn, adopt and use. When done properly, microservices provide a better separation of concerns and a better “concept of one”, as each microservice serves one purpose only rather than providing a large set of functions and uses.

A cognitive pyramid of API’s

Most of the APIs and microservices developed so far have been catalogs. A typical API offers you (man and machine alike) an interface to insert/modify/delete records. Today, we see that APIs are starting to be stacked in layers: at the bottom we find catalog-like APIs dealing with data, which are in turn used by predictive APIs dealing with classification, prediction and recommendations, which eventually will lead to APIs offering a full AI interface and interaction. The ultimate API layer would be a sentient or cognitive API layer which would answer complex reasoning tasks to facilitate some aspects of our life.
In my opinion, quality will create a sort of “natural selection” mechanism for predictive APIs. Those APIs which provide high-quality predictions, classifications, and recommendations will thrive, while others will not be used and will eventually fade away. In a sense, we can talk of a predictive API ecosystem where both machines and people provide feedback on the quality of the predictive services offered. This interaction produces more data, which in turn allows those APIs to learn better. I would say that we are experiencing a renaissance of data and automated data analysis which might have quite significant implications on our future lives.
You mentioned marketing, and one of your areas of expertise looks at solutions around personalized marketing applications. What sorts of applications are you seeing today already, and what do you expect to see in the future?
I believe that marketing is moving toward an increasingly deeper understanding of the customer and the customer’s context. In the past, we used to create models, segments, and product-customer offers based on simple analytics, marketers’ hunches and intuitions, and a limited understanding of the customer intent, also because not all touchpoints and customer interactions were fully captured.
Today, more and more information and customer data is captured through digital touchpoints, via devices, apps, and sensors. Therefore, it is possible today to create more complex and richer models for each individual. If I interact with a retail webapp, it’s very relevant to know whether I am browsing around looking for inspiration, in a sort of “discovery mode”, or whether I would like to quickly close a purchase and am in “analysis mode”, interested in understanding the details of a given product.
Capturing the intent of a customer is also part of marketing. Slowly but surely, new marketing tools and services are coming up which provide a more “cognitive” and proactive approach to marketing.
Shifting gears just a little bit, another area that you’ve been exploring is machine learning and financial services. You recently participated on a panel. What interesting applications are you seeing in that space?
There are a number of financial APIs and services which are based on machine learning.
Some are meant to speed up the customer journey and the financial service scrutiny process. Processes which would require days are brought down to minutes. This is possible because the models and the risk calculations involved are based on big data and machine learning algorithms rather than only on advisors or expert resources.
Others are meant to simplify and streamline our lives, for instance by providing a better overview of how and when we spend. These predictive techniques can potentially relieve us from the task of remembering when a payment is due, and provide an indication of the “free to spend” money each month. ING, a company where I worked in the past, has recently released a new feature in their mobile app for predicting recurring payments.
These are just a few examples of machine learning applied to financial services. I am sure that we will see more of these data-driven tools in finance in the coming months.

Friday, November 20, 2015

Why is the SMACK stack all the rage lately?

In the past decade, we have learned how to handle and process increasingly bigger datasets. The focus so far has been mostly on how to collect, aggregate and crunch those large datasets in a reasonable amount of time.


However, today it's not always about Big Data. In many use cases we need not one, but several different data analysis paradigms. The following diagram shows how data can be categorized according to the way it's used.

Big, Fast and Event Data Processing

Time plays an important role in many data-driven products and applications. If you put time on one axis and facts (transactions, customer events, touch points, interactions) on the other, you can see how data can be segmented for a specific data analysis.

Why Big Data?
High quality and robust models rely on the quality and the amount of available data. The size of a dataset can be decisive in order to identify outliers and to provide enough samples for the machine learning algorithms.


More data helps reduce the generalization error of the model, mitigating a common problem in data science called "overfitting". More data also allows us to detect weaker signals and small clusters which might not be visible in a smaller dataset, a problem usually referred to as "data bias".


What's really important about Big Data is to keep the productivity of the data pipeline as high as possible. So more nodes help, and so does more memory per node. A good improvement over the original MapReduce idea is to leverage data locality as much as possible in memory, as done for instance in Spark.

Why Fast Data?

We understand that recent information is valuable, in particular aggregated trends over recent facts, say over the last minute, hour, or day. Analyzing how those trends and aggregated signals evolve is the first step towards a new generation of machine-aided apps, which will provide fact highlighting, personalized filtering of the event stream and, in general, better insight into past events and hinted predictions about future ones.

Why Event Processing?
Finally, you can follow a single stream of events related to a single user, customer or account. This allows us to build applications and products which react to events as soon as they arrive in the processing pipeline. This way of computing is a must-have for mission-critical processing which must be completed within very short latency budgets.


These technologies can be combined in order to support model training, fast alerting/notifications and resource access via Web APIs.

The Smack Stack

An architecture pattern which is quite popular today is the SMACK pattern.
It consists of the following technologies:
  •  Spark
  •  Mesos
  •  Akka
  •  Cassandra
  •  Kafka
Kafka takes care of event transport, while Cassandra is used to persist and distribute events. Spark and Akka can be combined to build various data analysis pipelines, for both large data sets and event processing, in order to meet the required throughput and latency constraints. Mesos serves as the resource coordinator, facilitating the distribution of tasks and jobs across the cluster.

SMACK stack: how to put the pieces together
Spark
In contrast to Hadoop's two-stage disk-based MapReduce paradigm, Spark's multi-stage in-memory primitives provide performance up to 100 times faster for certain applications. By allowing user programs to load data into a cluster's memory and query it repeatedly, Spark is well suited to machine learning algorithms, and it exposes its APIs in Scala, Python, R and Java. The approach of Spark is to provide a unified interface that can be used to mix SQL queries, machine learning, graph analysis, and streaming (micro-batched) processing.

Mesos
Apache Mesos is an open-source cluster manager that was developed at the University of California, Berkeley. It provides efficient resource isolation and sharing across distributed applications, or frameworks. The software enables resource sharing in a fine-grained manner, improving cluster utilization.


Mesos uses a two-level scheduling mechanism where resource offers are made to frameworks (applications that run on top of Mesos). The Mesos master node decides how many resources to offer each framework, while each framework determines the resources it accepts and what application to execute on those resources. This method of resource allocation allows near-optimal data locality when sharing a cluster of nodes amongst diverse frameworks.

Akka

Akka is an open-source toolkit and runtime simplifying the construction of concurrent and distributed applications on the JVM. Akka supports multiple programming models for concurrency, but it emphasizes actor-based concurrency, with inspiration drawn from Erlang.


The actor model in computer science is a mathematical model of concurrent computation that treats "actors" as the universal primitives of concurrent computation: in response to a message that it receives, an actor can make local decisions, create more actors, send more messages, and determine how to respond to the next message received.

Cassandra

Apache Cassandra is an open source distributed database management system designed to handle large amounts of data across many commodity servers, providing high availability with no single point of failure. Cassandra offers robust support for clusters spanning multiple datacenters, with asynchronous masterless replication allowing low latency operations for all clients.


Cassandra also places a high value on performance. In 2012, University of Toronto researchers studying NoSQL systems concluded that "In terms of scalability, there is a clear winner throughout our experiments. Cassandra achieves the highest throughput for the maximum number of nodes in all experiments" although "this comes at the price of high write and read latencies."

Kafka

The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds. The design is heavily influenced by advances in the area of transaction logs, e.g. database commit logs. Messages are persisted on disk and replicated within the cluster to prevent data loss. Each broker can handle terabytes of messages without performance impact.

Kafka is designed to allow a single cluster to serve as the central data backbone for a large organization. It can be elastically and transparently expanded without downtime. Data streams are partitioned and spread over a cluster of machines to allow data streams larger than the capability of any single machine and to allow clusters of co-ordinated consumers.
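As a small illustration of the producer side, here is a minimal Scala sketch using the Kafka Java client; the broker address, topic and messages are placeholders for this example:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// minimal producer configuration (hypothetical broker address)
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)

// records with the same key land on the same partition, preserving per-key ordering
producer.send(new ProducerRecord[String, String]("events", "user-42", "checkin at venue 123"))
producer.close()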

Thursday, July 9, 2015

Clustering check-ins with Spark and Cassandra

Spark and Cassandra provide a good alternative to traditional big data architectures based on Hadoop HDFS, MapReduce and Hive. In this post/tutorial I will show how to combine Cassandra and Spark to get the best of both technologies and provide faster analytics on an operational, low-latency data store.

As a demo, we will cluster check-in transactions in New York using the Gowalla dataset. Clustering will be performed on Spark using the MLlib library, while data is extracted from a Cassandra keyspace using the Spark-Cassandra connector.

Cassandra

http://cassandra.apache.org/

Apache Cassandra is an open source distributed database management system designed to handle large amounts of data across many commodity servers, providing high availability with no single point of failure. Cassandra offers robust support for clusters spanning multiple datacenters, with asynchronous masterless replication allowing low latency operations for all clients.

Cassandra's data model is a partitioned row store with tunable consistency. Rows are organized into tables; the first component of a table's primary key is the partition key; within a partition, rows are clustered by the remaining columns of the key. Other columns may be indexed separately from the primary key. Tables may be created, dropped, and altered at runtime without blocking updates and queries.

Cassandra does not currently support joins or subqueries. Rather, Cassandra emphasizes denormalization through features like collections. However, as shown in this post, these limitations can be overcome by using Spark and Cassandra together.

Spark

http://spark.apache.org/

Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.

Spark Cassandra Connector


This library lets you expose Cassandra tables as Spark RDDs, write Spark RDDs to Cassandra tables, and execute arbitrary CQL queries in your Spark applications.
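For example, here is a minimal sketch of reading a table and writing an RDD back with the connector; the top_venues table and its columns are hypothetical and would need to exist in Cassandra beforehand:

import com.datastax.spark.connector._

// read a Cassandra table as an RDD of CassandraRow
val venuesRdd = sc.cassandraTable("lbsn", "venues")

// write an RDD of tuples to a (hypothetical) table with matching columns
val topVenues = Seq((1L, "Central Park"), (2L, "Times Square"))
sc.parallelize(topVenues).saveToCassandra("lbsn", "top_venues", SomeColumns("vid", "name"))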

Zeppelin (Apache incubator)


A web-based notebook that enables interactive data analytics. You can make beautiful data-driven, interactive and collaborative documents with SQL, Scala and more.
Zeppelin is still in its early days, but it is very promising and a nice alternative to IPython. Currently, it can only be built from source. Check the master branch on GitHub.

Install and Setup


The full tutorial, including the install script, data munging and analysis with scikit-learn in Python notebooks, and data analysis with Spark in Zeppelin notebooks, is available as a GitHub project. Check out the code at https://github.com/natalinobusa/gowalla-spark-demo

Versions
Matching versions is quite important when working with this set of technologies. The demo provided here below is tested on the following versions:
  • cassandra 2.1.7
  • spark 1.3.1
  • spark-cassandra connector 1.3
  • zeppelin (master github)
Requirements
  • jdk 1.7 or higher
  • git
  • npm
  • maven
  • curl
  • wget
Setup
You need to have Cassandra, Spark and Zeppelin running on your system in order to proceed. For those of you who want to give it a try, I have written a very rudimentary script which downloads, builds and installs all the necessary tools in a local directory (no admin rights required).

First clone the project from GitHub, then run the install.sh script. Since the script will build Spark and Zeppelin from source, it's gonna take a while. I am planning to test the pre-built Spark when I have some more time.

git clone https://github.com/natalinobusa/gowalla-spark-demo.git gowalla-spark-demo
cd gowalla-spark-demo
./install.sh
Run it!
Again, I have prepared a script, start-all.sh, which runs all the various ingredients. A similar script, stop-all.sh, is available to stop all services. Spark is configured to run in cluster mode (albeit on a single node); you might be prompted for a password, since the Spark master and workers communicate via SSH.

To get a head start:

Cassandra can be accessed with the cqlsh command-line interface. After installing and setting up the system, type ./apache-cassandra-2.1.7/bin/cqlsh from the root of the git project to start the CQL client.

Data
The venue IDs and names are taken from the following datasets:

loc-gowalla_totalCheckins.txt.gz | https://snap.stanford.edu/data/loc-gowalla.html
E. Cho, S. A. Myers, J. Leskovec. Friendship and Mobility: User Movement in Location-Based Social Networks. ACM SIGKDD International Conference on Knowledge Discovery and Data Mining (KDD), 2011.

gowalla-dataset.zip | https://code.google.com/p/locrec/downloads/detail?name=gowalla-dataset.zip
The project is being developed in the context of the SInteliGIS project financed by the Portuguese Foundation for Science and Technology (FCT) through project grant PTDC/EIA-EIA/109840/2009.

Generating a sample extract using python


The csv files provided in this demo have been extracted using a Python notebook. Some info about those files: the check-in file checkins.csv has the year, month, day and UTC check-in date, and provides the coordinates of the check-in, the id of the venue and the id of the user. The venues.csv file provides the id, the coordinates and the name of some of those historical venues.


Here below are a few pictures showing how the Gowalla app ramped up during 2010, and an overlay of the hottest checked-in spots in New York during 2010, generated using Google Maps and Highcharts and clustered using scikit-learn k-means. Please refer to the IPython notebook (available in the GitHub repo https://github.com/natalinobusa/gowalla-spark-demo) for more details about how these graphs and the csv files were generated.

Data Modeling with Cassandra

Create keyspace and tables


First, create the keyspace and the tables. Since in this demo we are focusing on queries and data science, we are not going to set up proper replication as we would for an actual Cassandra cluster keyspace.

CREATE KEYSPACE IF NOT EXISTS lbsn 
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
For the checkins table, I am gonna model the primary key using a partition key of year and month, and clustering columns for day and time. For this specific example we use a time field expressing the time of the day in seconds, since the date is already available in the partition and clustering keys.

This specific modeling of time will turn out useful later on, since I can query a range of days in a month, or a given time window during the day, by making use of the properties of the clustering keys. Finally, we add the uid as a clustering column to avoid overwrites in case two users check in at exactly the same date and time.

CREATE TABLE lbsn.checkins (
  year  int, 
  month int, 
  day   int,
  time  int,
  ts    timestamp,
  uid   bigint,
  lat   double,
  lon   double,
  vid   bigint,
  PRIMARY KEY ((year, month), day, time, uid)
) WITH CLUSTERING ORDER BY (day DESC, time DESC);
The venues table is much simpler to model, and I will just provide a primary key based on partitioning by venue id.

CREATE TABLE lbsn.venues (
  vid   bigint, 
  name  text,
  lat   double,
  lon   double,
  PRIMARY KEY (vid)
);

Loading data

Data can be loaded with the load_tables.cql script, located in datasets/cassandra-cql.
Here below is the gist of it:

COPY lbsn.checkins (year,month,day,time,ts,uid,lat,lon,vid) FROM '../checkins.csv' ;
COPY lbsn.venues (vid,name,lat,lon) FROM '../venues.csv' ;

Queries with Cassandra CQL

/* query a given day */
select * from lbsn.checkins where year=2010 and month=9 and day=13 limit 5;

 year | month | day | time | uid    | lat      | lon       | ts                       | vid
------+-------+-----+------+--------+----------+-----------+--------------------------+---------
 2010 |     9 |  13 |   39 | 114507 | 40.80794 | -73.96409 | 2010-09-13 00:00:39+0000 |   78472
 2010 |     9 |  13 |  138 |  37571 | 40.62563 | -74.25763 | 2010-09-13 00:02:18+0000 | 1663634
 2010 |     9 |  13 |  147 |  11321 | 40.73396 | -73.99301 | 2010-09-13 00:02:27+0000 |   35372
 2010 |     9 |  13 |  597 | 165226 | 40.64388 | -73.78281 | 2010-09-13 00:09:57+0000 |   23261
 2010 |     9 |  13 |  641 |  12719 | 40.72978 | -74.00121 | 2010-09-13 00:10:41+0000 |  758447
/* range by date day */
select * from checkins where year=2010 and month=9 and day<16 and day>13 limit 5;

 year | month | day | time | uid    | lat      | lon       | ts                       | vid
------+-------+-----+------+--------+----------+-----------+--------------------------+---------
 2010 |     9 |  14 |   91 |    853 | 40.73474 | -73.87434 | 2010-09-14 00:01:31+0000 |  917955
 2010 |     9 |  14 |  328 |   4516 | 40.72585 | -73.99289 | 2010-09-14 00:05:28+0000 |   37160
 2010 |     9 |  14 |  344 |   2964 | 40.67621 | -73.98405 | 2010-09-14 00:05:44+0000 |  956870
 2010 |     9 |  14 |  359 |  48555 | 40.76068 | -73.98699 | 2010-09-14 00:05:59+0000 | 3026508
 2010 |     9 |  14 |  688 | 189786 | 40.71588 | -74.00663 | 2010-09-14 00:11:28+0000 | 1036251
You can range by day, as long as you provide the composite partition key (year, month). Also, since day and time are both clustering columns, we can define range queries within a single day, as shown below:

/* within a day, range by time between 17:00:00 and 18:00:00*/
select * from checkins where year=2010 and month=9 and day=13 and time>61200 and time<64800  limit 5;

 year | month | day | time  | uid    | lat      | lon       | ts                       | vid
------+-------+-----+-------+--------+----------+-----------+--------------------------+---------
 2010 |     9 |  13 | 61365 |  22137 | 40.71551 | -73.99062 | 2010-09-13 17:02:45+0000 | 1091950
 2010 |     9 |  13 | 61437 | 159418 | 40.74139 | -73.98125 | 2010-09-13 17:03:57+0000 |  101317
 2010 |     9 |  13 | 61519 |  41643 | 40.76386 | -73.97293 | 2010-09-13 17:05:19+0000 |   12535
 2010 |     9 |  13 | 62031 |  11695 | 40.74103 | -73.99337 | 2010-09-13 17:13:51+0000 |  918429
 2010 |     9 |  13 | 62149 |  16328 | 40.70582 |  -73.9967 | 2010-09-13 17:15:49+0000 |   11794
Beware that the ordering of the clustering columns is important. You can apply a range restriction only to the last clustering column referenced in the query; the preceding ones must be restricted by equality. This is because clustering keys are stored internally in a tree-like, sorted fashion. The following query, for instance, is invalid:

/* INVALID!: range by date and by time */
select * from checkins where year=2010 and month=9 and day>13 and day<16 and time>61200 and time<64800  limit 5;
InvalidRequest: code=2200 [Invalid query] message="PRIMARY KEY column "time" cannot be restricted (preceding column "day" is restricted by a non-EQ relation)"

Spark time

First, open the Zeppelin console at http://localhost:8888/, then open up the notebook "Location Based Social Networks: Spark and Cassandra".


You can go through the demo yourself; here below I present an extract of what you can do with Spark, Cassandra, and the Cassandra connector for Spark.

Load The Cassandra-Spark Connector Library
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._

val checkins = sc.cassandraTable("lbsn", "checkins")
val venues   = sc.cassandraTable("lbsn", "venues")
Basic operations
Tables can be imported as RDDs or as Spark SQL DataFrames. When imported as RDDs, it is the developer's responsibility to provide the right casting from the CassandraRow type to Scala native types.
Checkins
checkins.count()
checkins.first()
res4: Long = 138449
res5: com.datastax.spark.connector.CassandraRow = CassandraRow{year: 2010, month: 7, day: 1, time: 11, uid: 74226, lat: 40.75548535, lon: -73.99116677, ts: 2010-07-01 02:00:11+0200, vid: 1365122}
Venues
venues.count()
venues.first()
res7: Long = 28699
res8: com.datastax.spark.connector.CassandraRow = CassandraRow{vid: 754108, lat: 40.7480915333, lon: -73.9891771, name: My Suit NY }
Query the collection using the cassandraRow API
The where clause is translated back into CQL; therefore it allows selection on the partition key and ranges on clustering keys.
// how many checkins in new york during the valentine weekend
checkins.where("year = 2010 and month=2 and day>12 and day<15").count()
res12: Long = 887
Where is Central Park?
venues.where("vid = 7239827").first()
res14: com.datastax.spark.connector.CassandraRow = CassandraRow{vid: 7239827, lat: 40.758265613, lon: -73.994356727, name: Central Park Manhattan }
Column selection
venues.select("vid", "name").take(10).foreach(println)
CassandraRow{vid: 754108, name: My Suit NY }
CassandraRow{vid: 249755, name: UA Court Street Stadium 12}
CassandraRow{vid: 6919688, name: Nasty London!}
CassandraRow{vid: 521775, name: Calico Jacks Cantina}
CassandraRow{vid: 866136, name: Pho Bac}
CassandraRow{vid: 3841997, name: cave of the ridiculous}
CassandraRow{vid: 246266, name: Sky Asian Bistro}
CassandraRow{vid: 6762177, name: All Star Car Wash}
CassandraRow{vid: 797962, name: Blush}
CassandraRow{vid: 535067, name: Thompson Lower East Side}
Converting to native scala types, for instance tuples
The cassandraTable method produces an RDD of type CassandraRow. If you want to move away from the CassandraRow type, you can map to other types using the usual Spark transformations (map, reduce, flatMap, etc.).
How many checked in Soho?
// soho bounding box: -74.0055, 40.7187, -73.9959, 40.7296
// from CassandraRow to scala Tuples
val coords = checkins.map(row => (row.getDouble("lat"), row.getDouble("lon")))

coords.
  filter(coord => coord._1 > 40.7187 & coord._1 < 40.7296).
  filter(coord => coord._2 > -74.0055 & coord._2 < -73.9959).
  count()
res596: Long = 4825
filtering on Columns, Map to Tuples
// from CassandraRow to some other object

// via cassandraRow methods
venues.select("vid", "name").map(row => (row.getLong("vid"), row.getString("name"))).take(10).foreach(println)
(1162492,Hillary Flowers)
(1165309,Roadside Entertainment)
(703772,Pan Aqua)
(396792,O'Reilly's Pub)
(447153,Kam Wei Kitchen)
(7558564,SouthWestNY)
(7410669,Windsor Hotel)
(6856641,New Rochelle High School)
(267286,US Flag Plaza, Liberty Park, NJ)
(262669,Galloway Castle)
Filtering on columns, Map to case class
// by defining a case class
case class Venue(vid: Long, name: String)
sc.cassandraTable[Venue]("lbsn", "venues").take(10).foreach(println)
defined class Venue
Venue(1162492,Hillary Flowers)
Venue(1165309,Roadside Entertainment)
Venue(703772,Pan Aqua)
Venue(396792,O'Reilly's Pub)
Venue(447153,Kam Wei Kitchen)
Venue(7558564,SouthWestNY)
Venue(7410669,Windsor Hotel)
Venue(6856641,New Rochelle High School)
Venue(267286,US Flag Plaza, Liberty Park, NJ)
Venue(262669,Galloway Castle)
SQL schema extraction with Spark SQL
By using the CassandraSQLContext, the schema is extracted from a sample of the data read. Therefore no explicit casting/mapping is required, and you can move faster to standard Scala native types.
Table import with Spark SQL/DataFrame
// by converting to a spark sql/dataframe
import org.apache.spark.sql.cassandra.CassandraSQLContext
val cc = new CassandraSQLContext(sc)
val df_venues = cc.sql("select vid, name from lbsn.venues")

df_venues.show(10)
vid     name                
754108  My Suit NY          
249755  UA Court Street S...
6919688 Nasty London!       
521775  Calico Jacks Cantina
866136  Pho Bac             
3841997 cave of the ridic...
246266  Sky Asian Bistro    
6762177 All Star Car Wash   
797962  Blush               
535067  Thompson Lower Ea...
Cassandra-side vs Spark-side Filtering
When the query is compatible with CQL, filtering and selection are executed fully on the Cassandra side. By using the CassandraSQLContext you can also define a query which requires some additional filtering on the Spark side.
Cassandra-side filtering
As RDD of CassandraRows
val checkins = sc.cassandraTable("lbsn", "checkins").select("ts", "uid", "vid").where("year=2010 and month=9 and day<16 and day>13")
checkins.take(10).foreach(println)
CassandraRow{ts: 2010-09-14 02:01:31+0200, uid: 853, vid: 917955}
CassandraRow{ts: 2010-09-14 02:05:28+0200, uid: 4516, vid: 37160}
CassandraRow{ts: 2010-09-14 02:05:44+0200, uid: 2964, vid: 956870}
CassandraRow{ts: 2010-09-14 02:05:59+0200, uid: 48555, vid: 3026508}
CassandraRow{ts: 2010-09-14 02:11:28+0200, uid: 189786, vid: 1036251}
CassandraRow{ts: 2010-09-14 02:14:33+0200, uid: 33841, vid: 1502210}
CassandraRow{ts: 2010-09-14 02:16:07+0200, uid: 12719, vid: 1078872}
CassandraRow{ts: 2010-09-14 02:18:17+0200, uid: 105012, vid: 341495}
CassandraRow{ts: 2010-09-14 02:19:24+0200, uid: 1214, vid: 1205097}
CassandraRow{ts: 2010-09-14 02:22:20+0200, uid: 189786, vid: 541535}

As a SchemaRDD
val checkins = cc.sql("select ts, uid, vid from lbsn.checkins where year=2010 and month=9 and day<16 and day>13")
checkins.show(10)
ts                   uid    vid    
2010-09-14 02:01:... 853    917955 
2010-09-14 02:05:... 4516   37160  
2010-09-14 02:05:... 2964   956870 
2010-09-14 02:05:... 48555  3026508
2010-09-14 02:11:... 189786 1036251
2010-09-14 02:14:... 33841  1502210
2010-09-14 02:16:... 12719  1078872
2010-09-14 02:18:... 105012 341495 
2010-09-14 02:19:... 1214   1205097
2010-09-14 02:22:... 189786 541535

Spark-side filtering
val checkins = cc.sql("select ts, uid, vid from lbsn.checkins where day<16 and day>13")
checkins.show(10)
ts                   uid   vid   
2010-07-14 02:00:... 9024  107406
2010-07-14 02:01:... 79128 147618
2010-07-14 02:02:... 11372 187679
2010-07-14 02:03:... 23665 19762 
2010-07-14 02:09:... 89502 299617
2010-07-14 02:10:... 33494 244214
2010-07-14 02:13:... 33843 11975 
2010-07-14 02:14:... 84107 780336
2010-07-14 02:15:... 718   54022 
2010-07-14 02:16:... 578   268521
Combined Cassandra-side & Spark-side filtering
val checkins = cc.sql("select ts, uid, vid from lbsn.checkins where year=2010 and month=9 and vid=57871")
checkins.show(10)
ts                   uid  vid  
2010-10-01 01:57:... 1684 57871
Joining cassandra tables in Spark
// joining tables (the easy way, and just as fast)

val df_venues   = cc.sql("select vid, name from lbsn.venues").as("venues").cache()
val df_checkins = cc.sql("select ts, uid, lat, lon, vid from lbsn.checkins").as("checkins").cache()

val checkins_venues = df_checkins.join(df_venues, $"checkins.vid" === $"venues.vid", "inner").select("ts", "uid", "lat", "lon", "checkins.vid","name")

checkins_venues.show(10)
ts                   uid    lat           lon                vid     name                
2010-07-01 02:47:... 578    40.7490532543 -73.9680397511     11831   United Nations      
2010-07-02 18:27:... 991    40.7188502243 -73.99594579149999 818431  OK 218              
2010-07-03 02:07:... 34359  40.7348441565 -73.9995288849     123831  Kingswood           
2010-07-03 18:58:... 578    40.6838680433 -73.9786720276     105831  Pacific Street St...
2010-07-03 19:53:... 2737   40.6906938667 -73.9956976167     197431  Floyd, NY           
2010-07-03 23:11:... 49393  40.6997066969 -73.8085234165     28031   Jamaica LIRR Station
2010-07-04 03:40:... 119601 40.7490532543 -73.9680397511     11831   United Nations      
2010-07-05 06:49:... 128772 40.8344876113 -73.9385139942     1241631 Morris Jumel Mansion
2010-07-06 07:36:... 38706  40.7490532543 -73.9680397511     11831   United Nations      
2010-07-06 17:12:... 35105  40.6997066969 -73.8085234165     28031   Jamaica LIRR Station
Aggregate and group by
// top 10 checked in venues
checkins_venues.groupBy("name").count().sort($"count".desc).show(10)
name                 count
LGA LaGuardia Air... 1673 
JFK John F. Kenne... 1643 
Starbucks            1316 
Starbucks Coffee     1114 
EWR Newark Libert... 1084 
Times Square         1084 
Grand Central Ter... 1002 
Dunkin' Donuts       507  
Madison Square Ga... 426  
The Museum of Mod... 392
Machine Learning: k-means Clustering
// run k-means clustering on the data

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val locs = checkins_venues.select("lat","lon").map(s => Vectors.dense(s.getDouble(0), s.getDouble(1))).cache()

val numClusters = 50
val numIterations = 20
val clusters = KMeans.train(locs, numClusters, numIterations)

val WSSSE = clusters.computeCost(locs)
println("Within Set Sum of Squared Errors = " + WSSSE + "\n")

clusters.clusterCenters.foreach(println)
WSSSE: Double = 11.051512212406834
Within Set Sum of Squared Errors = 11.051512212406834

[40.72760996107598,-73.98843612291218]
[40.64528778525469,-74.23562853133915]
[40.76587038674041,-73.982835425778]
[40.64524967416557,-73.78403168243933]
[40.80733971996237,-73.95276889804212]
[40.69057039696831,-74.17928359461052]
[40.80956608048502,-73.80586643992105]
[40.826054341673164,-74.21764724121967]
...
Machine Learning: Score predictions

import org.apache.spark.sql.functions.udf

val func = (lat:Double, lon:Double) => clusters.predict(Vectors.dense(lat,lon))
val sqlfunc = udf(func)

// add predictions as extra column, by using a user define function
// remember that the udf closes over the clusters model, which is shipped (broadcast) to the executors
val locs_cid = checkins_venues.withColumn("cluster", sqlfunc(checkins_venues("lat"), checkins_venues("lon")))

locs_cid.show(10)
ts                   uid    lat           lon                vid     name                 cluster
2010-07-01 02:03:... 10231  40.663200646  -73.984763295      1225113 Thistle Hill Tavern  34     
2010-07-01 02:06:... 4907   40.74101965   -73.99416911670001 1078263 Limelight Marketp... 47     
2010-07-01 02:10:... 4929   40.747507     -73.989425         1175513 La Rosa Cigars       47     
2010-07-01 02:14:... 26851  40.76823395   -73.95315415       164621  David Copperfields   38     
2010-07-01 02:17:... 4929   40.74695265   -73.9857679833     141918  J.J. Hat Center      47     
2010-07-01 02:23:... 4929   40.7484436586 -73.9857316017     12313   Empire State Buil... 47     
2010-07-01 02:25:... 24712  40.6752607557 -73.9813770354     296249  Al di lá Trattoria   34     
2010-07-01 02:26:... 6639   40.7373236513 -73.9910423756     261515  Havana Central       47     
2010-07-01 02:27:... 158638 40.82951468   -73.92625695       153115  Yankees Ticket Of... 44     
2010-07-01 02:27:... 124703 40.826946067  -73.92811775210001 11720   Yankee Stadium       44
Spark gymnastics with Data Frames
val df = locs_cid.select("cluster", "name").
    groupBy("cluster", "name").
    agg(Map("name" -> "count")).
    sort($"cluster", $"COUNT(name)".desc).cache()

df.show()
cluster name                 COUNT(name)
0       Starbucks Coffee     172        
0       Regal Union Squar... 118        
0       Whole Foods Marke... 95         
0       AOL HQ               92         
0       Strand Bookstore     85         
0       Katz's Delicatessen  79         
0       Ippudo               73         
0       Trader Joe's         70         
0       Momofuku Noodle Bar  66         
0       Whole Foods Marke... 62         
0       The Grey Dog's Co... 59         
0       Webster Hall         58         
0       Momofuku Milk Bar    58         
0       Veselka              55         
0       Schiller's Liquor... 54         
0       Think Coffee         53         
0       Foursquare HQ        52         
0       Astor Place Station  50         
0       Momofuku Ssäm Bar    48         
0       Max Brenner          48
Spark Data Frames to RDD's
// from dataframe to rdd
val r = df.rdd.map(row => (row.getInt(0), (row.getString(1), row.getLong(2))) ).cache()
r.first
res67: (Int, (String, Long)) = (0,(Starbucks Coffee,172))
Spark RDD's gymnastics: groupByKey, map, flatMap
val topNPerGroup = r.groupByKey.map { 
   case (k, v) => 
       k -> v.toList.sortBy(-_._2).take(3)
}

topNPerGroup.collect.foreach(println)
(0,List((Starbucks Coffee,172), (Regal Union Square Stadium 14,118), (Whole Foods Market (Union Square),95)))
(1,List((Nuno's Pavillion,48), (Elizabeth Train Station,19), (Quick Check,17)))
(2,List((Starbucks,249), (Radio City Music Hall,237), (Columbus Circle,174)))
(3,List((JFK John F. Kennedy International,1643), (JFK Terminal 5,325), (Terminal 4 at JFK,115)))
(4,List((Columbia University,110), (The City College Of New York - NAC Building,58), (Harlem125th Street Metro North Station,49)))
(5,List((EWR Newark Liberty International,1084), (EWR Terminal A,167), (EWR Terminal C,162)))
(6,List((Throgs Neck Bridge Toll,117), (Throgs Neck Bridge,86), (Whitestone Bridge,25)))
(7,List((RHM Headquarters,36), (Egan's,22), (Edgemont Memorial Park,18)))
(8,List((Dunkin' Donuts,88), (Verrazano-Narrows Bridge,77), (New York Sports Club ,59)))
(9,List((Queensboro Plaza Station,51), (Bohemian Hall & Beer Garden,36), (Studio Square Beer Garden,31)))
(10,List((Secaucus Junction,113), (Xchange,16), (Starbucks,15)))
...
Flattening hierarchical RDD to a single RDD
// flattening this to a single list
val flattenedTopNPerGroup = 
    topNPerGroup.flatMap({case (k,v) => v.map(s => (k,s))})

flattenedTopNPerGroup.collect.foreach(println)
(0,(Starbucks Coffee,172))
(0,(Regal Union Square Stadium 14,118))
(0,(Whole Foods Market (Union Square),95))
(1,(Nuno's Pavillion,48))
(1,(Elizabeth Train Station,19))
(1,(Quick Check,17))
(2,(Starbucks,249))
(2,(Radio City Music Hall,237))
(2,(Columbus Circle,174))
(3,(JFK John F. Kennedy International,1643))
(3,(JFK Terminal 5,325))
(3,(Terminal 4 at JFK,115))
(4,(Columbia University,110))
(4,(The City College Of New York - NAC Building,58))
(4,(Harlem125th Street Metro North Station,49))
(5,(EWR Newark Liberty International,1084))
(5,(EWR Terminal A,167))
(5,(EWR Terminal C,162))
(6,(Throgs Neck Bridge Toll,117))
(6,(Throgs Neck Bridge,86))
(6,(Whitestone Bridge,25))
(7,(RHM Headquarters,36))
(7,(Egan's,22))
(7,(Edgemont Memorial Park,18))
(8,(Dunkin' Donuts,88))
(8,(Verrazano-Narrows Bridge,77))
(8,(New York Sports Club ,59))
(9,(Queensboro Plaza Station,51))
(9,(Bohemian Hall & Beer Garden,36))
(9,(Studio Square Beer Garden,31))
(10,(Secaucus Junction,113))
(10,(Xchange,16))
(10,(Starbucks,15))