The personalized video recommendations that Netflix serves up are a glossy front page, and they are the result of Research Engineers like Ehtsham Elahi rolling their sleeves up for machine learning challenges every day. Machine learning is a core part of the Netflix product, with every impression being recommended by an algorithm.

One of the challenges for Netflix is scale, which is where distributed machine learning helps out. GraphX is Apache Spark’s API for graphs and graph-parallel computation. For large-scale machine learning challenges, it allows an iterative approach to graph computation, with a fast runtime and a growing library of algorithms.

The problems that Elahi talks about are twofold: generating a ranking of items with respect to a given item from an interaction graph, using graph diffusion algorithms, and finding clusters of related items from co-occurrence data, using probabilistic graphical models (in particular Latent Dirichlet Allocation).

Elahi explains how Spark and GraphX are addressing these challenges at Netflix:

So how do you represent iterative algorithms in GraphX? Let’s say your data is represented as a graph. You have some vertices with attributes or properties associated with them, called vertex attributes. Then there are edges between those vertices, and the edges also have properties associated with them, called edge attributes, which encode some form of relationship between the vertices.

GraphX lets you represent this as a distributed graph using RDDs, which are called vertex RDDs and edge RDDs, and then it provides APIs and methods to propagate and update these attributes. That’s what these iterative machine learning algorithms do: they update and propagate these attributes in every iteration and create new graphs, and that’s how they proceed. We’ll see a couple of examples of this sort in the presentation.
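The propagate-and-update pattern described here can be sketched in a few lines of plain Python. This is not GraphX code and not the Netflix implementation; the toy graph, the `iterate` function and the weighting rule are all assumptions made for illustration. In GraphX itself, the corresponding step would be expressed over the distributed graph with operators such as `aggregateMessages`.

```python
from collections import defaultdict

# Hypothetical toy graph. Vertex attributes are plain floats and
# edge attributes are weights on directed (source, destination) edges.
vertex_attrs = {"a": 1.0, "b": 0.0, "c": 0.0}
edge_attrs = {("a", "b"): 0.5, ("a", "c"): 0.5, ("b", "c"): 1.0}


def iterate(vertex_attrs, edge_attrs):
    """One propagate-and-update step; returns a new vertex attribute map."""
    inbox = defaultdict(float)
    # Propagate: every edge sends a message from its source to its destination.
    for (src, dst), weight in edge_attrs.items():
        inbox[dst] += vertex_attrs[src] * weight
    # Update: build a new state instead of mutating the old one,
    # mirroring how each iteration produces a new graph.
    return {v: inbox[v] for v in vertex_attrs}


step1 = iterate(vertex_attrs, edge_attrs)
step2 = iterate(step1, edge_attrs)
```

Each call consumes one immutable state and returns the next, which is the same shape an iterative algorithm takes when every iteration creates a new graph.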

So let’s start with the graph diffusion algorithms. Topic Sensitive PageRank is a popular graph diffusion algorithm; it lets us capture the importance of all the vertices in a graph with respect to a particular vertex. Taking the example of the Netflix dataset, imagine that all the titles in Netflix, and the tags and metadata associated with those titles, form the vertices in a graph.

The edges between those vertices capture the relationship of the titles with the metadata, and also the relationships among the titles themselves, like how frequently they are played together. When you build such a graph, run Topic Sensitive PageRank and make a query for the tag ‘Seattle’, you come up with a ranking which looks (something like this). It looks like a reasonable ranking for the tag Seattle because ‘Sleepless in Seattle’ is set in Seattle, ‘The Killing’ is set in Seattle, and so are ‘Grey’s Anatomy’ and ‘Frasier’.

Looking at this example in a little more detail, let’s say that in your graph the tag Seattle is represented as a vertex, and the titles which have some relationship with this tag, and among themselves, are also represented as vertices. We want to run Topic Sensitive PageRank by activating the node ‘Seattle’. How do you do that?

Well, with some probability, which is a parameter of the model, we follow the outbound edges; otherwise we stay at the origin. The vertex attributes in this graph are the activation probabilities of the nodes, meaning the probability of a node getting activated from a given starting node.

They are represented by the strength of the colors of these nodes. You apply this procedure for all the intermediate nodes as well, and if you follow the math you’ll find that the highlighted node accumulates the highest activation probability, because it is connected by three paths to the starting node. Then you repeat this process until you reach some form of convergence.
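The walk described above can be sketched as a random walk with restart from a single seed vertex. This is a minimal single-machine illustration in Python, not the GraphX or Netflix code: the graph, the vertex names, the `follow_prob` value of 0.85, the fixed iteration count, and the choice to send a dead-end vertex's mass back to the seed are all assumptions.

```python
def topic_sensitive_pagerank(out_edges, seed, follow_prob=0.85, iterations=50):
    """Activation probability of every vertex for a walk restarted at `seed`."""
    vertices = list(out_edges)
    prob = {v: 0.0 for v in vertices}
    prob[seed] = 1.0  # the walk starts at the activated node
    for _ in range(iterations):
        nxt = {v: 0.0 for v in vertices}
        # With probability (1 - follow_prob) the walk returns to the origin.
        nxt[seed] += 1.0 - follow_prob
        for v in vertices:
            targets = out_edges[v]
            if not targets:
                # Assumption: a vertex with no outbound edges returns its mass to the seed.
                nxt[seed] += follow_prob * prob[v]
                continue
            # Otherwise the mass is split evenly over the outbound edges.
            share = follow_prob * prob[v] / len(targets)
            for t in targets:
                nxt[t] += share
        prob = nxt
    return prob


# Hypothetical graph: "featured" is reachable from the seed by three paths
# (directly, via title_1, and via title_2).
out_edges = {
    "Seattle": ["title_1", "title_2", "featured"],
    "title_1": ["featured"],
    "title_2": ["featured"],
    "featured": [],
}
scores = topic_sensitive_pagerank(out_edges, seed="Seattle")
ranking = sorted(scores, key=scores.get, reverse=True)
```

In this toy graph the vertex with three incoming paths ends up with a higher activation probability than the titles with a single path, which is the effect described for the highlighted node.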

So what can GraphX provide for this kind of algorithm? Well, let’s say you want to generate these rankings for all the vertices in a graph. As you can easily imagine, running them one by one would be slow, so GraphX allows you to run all of these propagations in parallel. How do you do that? You create a graph where the attribute associated with each vertex is not a scalar but a vector of activation probabilities, and that vector encodes the probability of reaching that node from any node in the graph.
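A small Python sketch of the vector-attribute idea follows. Each vertex holds a list with one activation probability per starting vertex, so a single pass over the edges advances every propagation at once. It is a single-machine illustration only, and the graph, the function name `all_sources_diffusion`, the 0.85 follow probability and the dead-end handling are assumptions rather than details from the talk.

```python
def all_sources_diffusion(out_edges, follow_prob=0.85, iterations=50):
    """Run one restart walk per vertex, all at once.

    attrs[v][i] is the activation probability of vertex v for the walk
    that starts (and restarts) at vertices[i].
    """
    vertices = list(out_edges)
    n = len(vertices)
    # Every walk starts with all of its mass on its own seed.
    attrs = {v: [1.0 if vertices[i] == v else 0.0 for i in range(n)] for v in vertices}
    for _ in range(iterations):
        nxt = {v: [0.0] * n for v in vertices}
        for i, seed in enumerate(vertices):
            nxt[seed][i] += 1.0 - follow_prob  # restart mass for walk i
        for v in vertices:
            targets = out_edges[v]
            for i, seed in enumerate(vertices):
                mass = follow_prob * attrs[v][i]
                if targets:
                    for t in targets:
                        nxt[t][i] += mass / len(targets)
                else:
                    # Assumption: dead ends send their mass back to the walk's seed.
                    nxt[seed][i] += mass
        attrs = nxt
    return vertices, attrs


# Hypothetical graph, chosen only for illustration.
out_edges = {
    "Seattle": ["title_1", "title_2", "featured"],
    "title_1": ["featured"],
    "title_2": ["featured"],
    "featured": [],
}
vertices, attrs = all_sources_diffusion(out_edges)
seattle = vertices.index("Seattle")
ranking = sorted(vertices, key=lambda v: attrs[v][seattle], reverse=True)
```

Reading one position of every vector gives the ranking for one query. The trade-off is that every message now carries a whole vector, so the volume of data exchanged grows with the number of propagations run in parallel.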

When you apply this algorithm on the Netflix dataset you come up with these nice examples. If you make a query for Matrix, the ranking that you get over all the vertices looks something like this, which is very reasonable because Matrix and movies which are similar to Matrix are at the top of the ranking. If you make a query for the tag Zombies, which is not a movie but a tag or piece of metadata, you get this ranking of TV shows and movies which are about zombies, and I already described the example for Seattle. So these are the kind of examples that you get by applying graph diffusion using GraphX on the Netflix dataset.

Moving on to distributed clustering algorithms: LDA is a popular clustering/latent factor model which, when applied on the Netflix dataset, captures topics, which are clusters of related videos. For example, when applied on a dataset from Netflix, which could be the search history of users or the highly rated items of users, you find a topic of (e.g.) animal documentaries.

Elahi goes into a lot more statistical depth about LDA and how it allows Netflix to serve up tidy clusters of recommended results, although that part of the transcript ideally needs to be read alongside the live slide explanation.

**Ehtsham Elahi, Senior Research Engineer, Personalization Science and Engineering Group at Netflix, at MLconf SEA – 5/01/15**

In describing the GraphX performance comparison against other implementations, Elahi runs through more graph comparisons, so again this ideally needs the accompanying slides above or the video below:

Just to point out the implementations we have for this comparison: for Topic Sensitive PageRank, one implementation is the GraphX implementation, and the other is a distributed multithreaded implementation, a Scala/Breeze code library which is triggered by Spark. For LDA, one implementation is GraphX and the alternative is single-machine multithreaded Java code. All the implementations are Netflix internal code; we are only using GraphX and Spark for the distributed implementations.

This is what the performance comparison looks like. We performed experiments for both Topic Sensitive PageRank and Distributed Gibbs Sampling, and I’ll describe them one by one.

The graph on the left relates to Topic Sensitive PageRank. We use the open source DBpedia dataset for these experiments. The x-axis represents the number of vertices propagated in parallel, and the y-axis represents the time to make a certain number of iterations, where the number of iterations is fixed.

We have two implementations being compared here: one is the Scala/Breeze implementation and the other is the GraphX implementation. For both of these implementations we have two versions, one of which runs on a cluster twice the size of the other. The orange curve is the one running on the cluster twice the size.

So, there are a couple of interesting observations to be made from this graph. For GraphX, you see a sub-linear rise in time with the number of vertices, whereas in the alternative implementation we see a linear rise in time, which means that there is a crossover point in this graph after which the GraphX implementation is faster.

We also observed that doubling the size of the cluster leads to a 2x speed-up in the alternative implementation, because the data is roughly getting split in half for every instance to process, whereas it leads to only a 1.2x speed-up in GraphX. Also, when we propagate a large number of vertices in parallel, it leads to a large amount of shuffle data, which causes failures in GraphX. For example, the smaller cluster failed when we propagated 10,000 vertices in parallel and the bigger cluster failed at 100,000 vertices propagated in parallel, whereas the alternative implementation just keeps going.

Now I’ll talk about the second set of experiments, which were about Distributed Gibbs Sampling for LDA. It’s an internal Netflix dataset, and the number of topics for this experiment was chosen to be 100. The x-axis represents the number of edges, because the run-time is proportional to the number of edges and the number of topics you have in the dataset. The y-axis is time, and again we have two implementations being compared here: one is single-machine multithreaded Java code and the other is the GraphX implementation.

In the single-machine multithreaded Java code we have two versions: one is the uncollapsed Gibbs Sampler, which was the last one that I presented, and the blue curve is the semi-collapsed Gibbs Sampler. Just to point out, the GraphX setup has eight times the resources of the multicore setup. To give a sense of the scale of this graph, Databricks performed some experiments with the Wikipedia dataset that had roughly 1.1 billion edges; they were using a cluster of half the capacity of what I am using here and they reported times of up to three minutes or so per iteration. You can see the experiments that we are performing are at least five times bigger than those experiments.

Again, you notice that GraphX, for very large datasets, does overtake the multicore uncollapsed implementation, but you can see that point lies very far to the right. The semi-collapsed implementation, on the other hand, is actually a better approach for single-machine algorithms because you have access to all the parameters, which are available in memory, and it outperforms all the other implementations across the entirety of the graph.

**Lessons Learned**

So what are the main lessons we learned through these experiments?

When you are using GraphX, it is very important that you determine where the crossover point is for your iterative machine learning algorithm, because only beyond that point will you see a benefit from platforms like GraphX. We also found that GraphX lets you throw more hardware at a problem, but doubling the cluster leads to only a roughly 1.2x speed-up in your algorithms.

Beyond machine learning, GraphX is very useful for other graph processing tasks, for example data pre-processing; it has a very nice ability to perform efficient joins on your dataset.

Another important thing when you are writing iterative algorithms using GraphX, or any other distributed platform, is that there is an inherent chance of failure. Even if you have a 99.9% success rate for a single iteration, what is the probability that you’ll be able to make 1,000 iterations of your machine learning algorithm? 1,000 iterations are pretty common in Gibbs sampling.

You can work out, just by applying a geometric distribution to the failure rate, that you only have about a 37% chance (0.999^1000 ≈ 0.368) of successfully making 1,000 iterations, so it’s extremely important to regularly save the state of your iterative algorithm after a small number of iterations.
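The arithmetic behind this point is easy to check. The Python sketch below assumes that iterations fail independently, and the checkpoint interval of 50 iterations is a hypothetical value chosen only to show the contrast; neither assumption comes from the talk.

```python
def success_probability(per_iteration_success, iterations):
    """Probability that `iterations` independent iterations all succeed."""
    return per_iteration_success ** iterations


# Chance of completing 1,000 iterations in one go at 99.9% per-iteration success.
p_full_run = success_probability(0.999, 1000)

# Hypothetical checkpoint interval: with state saved every 50 iterations,
# a failure only costs the current 50-iteration segment.
p_one_segment = success_probability(0.999, 50)

print(f"1,000 iterations without any failure: {p_full_run:.3f}")
print(f"one 50-iteration segment without failure: {p_one_segment:.3f}")
```

Without saved state, any failure means starting the whole run again. With regular checkpoints, only the segment since the last save has to be repeated, and each short segment is far more likely to complete.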

Then at the end, multicore machine learning: when you have machines available like the r3 xl, an Amazon AWS instance which lets you run 32 threads and has 220GB of memory, they are pretty efficient, given that you are able to store all of your data in the memory of a single machine. So these are the main lessons we had from our experiments with GraphX, and I hope you find the experiments we performed interesting.