[SPARK-20677][MLLIB][ML] Follow-up to ALS recommend-all performance PRs
Small clean-ups from apache#17742 and apache#17845.

## How was this patch tested?

Existing unit tests.

Author: Nick Pentreath <[email protected]>

Closes apache#17919 from MLnick/SPARK-20677-als-perf-followup.
Nick Pentreath committed May 16, 2017
1 parent 6af7b43 commit 25b4f41
Showing 4 changed files with 28 additions and 53 deletions.
mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
@@ -29,7 +29,7 @@ private[spark] object BLAS extends Serializable {
   @transient private var _nativeBLAS: NetlibBLAS = _
 
   // For level-1 routines, we use Java implementation.
-  private def f2jBLAS: NetlibBLAS = {
+  private[ml] def f2jBLAS: NetlibBLAS = {
     if (_f2jBLAS == null) {
       _f2jBLAS = new F2jBLAS
     }
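The BLAS change above, and its `private[mllib]` twin further down, only widens the visibility of the pure-JVM `F2jBLAS` accessor so that ALS code can call it directly for level-1 dot products. Below is a minimal sketch of the kind of call this enables; the package and object names are hypothetical and not part of the commit.

```scala
// Hypothetical caller (not part of this commit) somewhere under org.apache.spark.ml,
// sketching what the widened private[ml] visibility enables: a direct single-precision
// dot product through the pure-JVM F2jBLAS instance.
package org.apache.spark.ml.example

import org.apache.spark.ml.linalg.BLAS

object F2jDotExample {
  // sdot(n, x, incx, y, incy) is netlib-java's single-precision dot product
  def dot(rank: Int, a: Array[Float], b: Array[Float]): Float =
    BLAS.f2jBLAS.sdot(rank, a, 1, b, 1)
}
```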
26 changes: 7 additions & 19 deletions mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -35,6 +35,7 @@ import org.apache.spark.{Dependency, Partitioner, ShuffleDependency, SparkContext}
 import org.apache.spark.annotation.{DeveloperApi, Since}
 import org.apache.spark.internal.Logging
 import org.apache.spark.ml.{Estimator, Model}
+import org.apache.spark.ml.linalg.BLAS
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
@@ -363,7 +364,7 @@ class ALSModel private[ml] (
    * relatively efficient, the approach implemented here is significantly more efficient.
    *
    * This approach groups factors into blocks and computes the top-k elements per block,
-   * using a simple dot product (instead of gemm) and an efficient [[BoundedPriorityQueue]].
+   * using dot product and an efficient [[BoundedPriorityQueue]] (instead of gemm).
    * It then computes the global top-k by aggregating the per block top-k elements with
    * a [[TopByKeyAggregator]]. This significantly reduces the size of intermediate and shuffle data.
    * This is the DataFrame equivalent to the approach used in
@@ -393,31 +394,18 @@
         val m = srcIter.size
         val n = math.min(dstIter.size, num)
         val output = new Array[(Int, Int, Float)](m * n)
-        var j = 0
+        var i = 0
         val pq = new BoundedPriorityQueue[(Int, Float)](num)(Ordering.by(_._2))
         srcIter.foreach { case (srcId, srcFactor) =>
           dstIter.foreach { case (dstId, dstFactor) =>
-            /*
-             * The below code is equivalent to
-             * `val score = blas.sdot(rank, srcFactor, 1, dstFactor, 1)`
-             * This handwritten version is as or more efficient as BLAS calls in this case.
-             */
-            var score = 0.0f
-            var k = 0
-            while (k < rank) {
-              score += srcFactor(k) * dstFactor(k)
-              k += 1
-            }
+            // We use F2jBLAS which is faster than a call to native BLAS for vector dot product
+            val score = BLAS.f2jBLAS.sdot(rank, srcFactor, 1, dstFactor, 1)
             pq += dstId -> score
           }
-          val pqIter = pq.iterator
-          var i = 0
-          while (i < n) {
-            val (dstId, score) = pqIter.next()
-            output(j + i) = (srcId, dstId, score)
+          pq.foreach { case (dstId, score) =>
+            output(i) = (srcId, dstId, score)
             i += 1
           }
-          j += n
           pq.clear()
         }
         output.toSeq
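The rewritten loop above is the heart of both PRs' approach: score every source/destination pair in a block with a dot product and keep only the best `num` per source in a bounded priority queue, so the full m x n score matrix is never materialized. Here is a self-contained sketch of that pattern for readers outside the Spark codebase; it is illustrative only, `scala.collection.mutable.PriorityQueue` stands in for Spark's `private[spark]` `BoundedPriorityQueue`, and the hand-rolled loop stands in for the `BLAS.f2jBLAS.sdot` call in the real code.

```scala
import scala.collection.mutable

object BlockTopKSketch {
  /** Top `num` destination ids per source id within one pair of factor blocks. */
  def topKPerBlock(
      srcBlock: Seq[(Int, Array[Float])],
      dstBlock: Seq[(Int, Array[Float])],
      rank: Int,
      num: Int): Seq[(Int, Int, Float)] = {
    srcBlock.flatMap { case (srcId, srcFactor) =>
      // Min-heap on score (via the negated ordering) so the weakest candidate is evicted first.
      val pq = mutable.PriorityQueue.empty[(Int, Float)](Ordering.by[(Int, Float), Float](-_._2))
      dstBlock.foreach { case (dstId, dstFactor) =>
        var score = 0.0f
        var k = 0
        while (k < rank) {               // the real code calls BLAS.f2jBLAS.sdot here
          score += srcFactor(k) * dstFactor(k)
          k += 1
        }
        pq.enqueue(dstId -> score)
        if (pq.size > num) pq.dequeue()  // keep the queue bounded at `num` entries
      }
      // Drain in ascending score order, then reverse so the best candidates come first.
      pq.dequeueAll.reverse.map { case (dstId, score) => (srcId, dstId, score) }
    }
  }
}
```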
mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala
@@ -31,7 +31,7 @@ private[spark] object BLAS extends Serializable with Logging {
   @transient private var _nativeBLAS: NetlibBLAS = _
 
   // For level-1 routines, we use Java implementation.
-  private def f2jBLAS: NetlibBLAS = {
+  private[mllib] def f2jBLAS: NetlibBLAS = {
     if (_f2jBLAS == null) {
       _f2jBLAS = new F2jBLAS
     }
mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -20,8 +20,6 @@ package org.apache.spark.mllib.recommendation
 import java.io.IOException
 import java.lang.{Integer => JavaInteger}
 
-import scala.collection.mutable
-
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
 import com.github.fommil.netlib.BLAS.{getInstance => blas}
 import org.apache.hadoop.fs.Path
@@ -33,7 +31,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.annotation.Since
 import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
 import org.apache.spark.internal.Logging
-import org.apache.spark.mllib.linalg._
+import org.apache.spark.mllib.linalg.BLAS
 import org.apache.spark.mllib.rdd.MLPairRDDFunctions._
 import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
@@ -263,6 +261,19 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
 
   /**
    * Makes recommendations for all users (or products).
+   *
+   * Note: the previous approach used for computing top-k recommendations aimed to group
+   * individual factor vectors into blocks, so that Level 3 BLAS operations (gemm) could
+   * be used for efficiency. However, this causes excessive GC pressure due to the large
+   * arrays required for intermediate result storage, as well as a high sensitivity to the
+   * block size used.
+   *
+   * The following approach still groups factors into blocks, but instead computes the
+   * top-k elements per block, using dot product and an efficient [[BoundedPriorityQueue]]
+   * (instead of gemm). This avoids any large intermediate data structures and results
+   * in significantly reduced GC pressure as well as shuffle data, which far outweighs
+   * any cost incurred from not using Level 3 BLAS operations.
+   *
    * @param rank rank
    * @param srcFeatures src features to receive recommendations
    * @param dstFeatures dst features used to make recommendations
@@ -277,46 +288,22 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
       num: Int): RDD[(Int, Array[(Int, Double)])] = {
     val srcBlocks = blockify(srcFeatures)
     val dstBlocks = blockify(dstFeatures)
-    /**
-     * The previous approach used for computing top-k recommendations aimed to group
-     * individual factor vectors into blocks, so that Level 3 BLAS operations (gemm) could
-     * be used for efficiency. However, this causes excessive GC pressure due to the large
-     * arrays required for intermediate result storage, as well as a high sensitivity to the
-     * block size used.
-     * The following approach still groups factors into blocks, but instead computes the
-     * top-k elements per block, using a simple dot product (instead of gemm) and an efficient
-     * [[BoundedPriorityQueue]]. This avoids any large intermediate data structures and results
-     * in significantly reduced GC pressure as well as shuffle data, which far outweighs
-     * any cost incurred from not using Level 3 BLAS operations.
-     */
     val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) =>
       val m = srcIter.size
       val n = math.min(dstIter.size, num)
       val output = new Array[(Int, (Int, Double))](m * n)
-      var j = 0
+      var i = 0
       val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2))
       srcIter.foreach { case (srcId, srcFactor) =>
         dstIter.foreach { case (dstId, dstFactor) =>
-          /*
-           * The below code is equivalent to
-           * `val score = blas.ddot(rank, srcFactor, 1, dstFactor, 1)`
-           * This handwritten version is as or more efficient as BLAS calls in this case.
-           */
-          var score: Double = 0
-          var k = 0
-          while (k < rank) {
-            score += srcFactor(k) * dstFactor(k)
-            k += 1
-          }
+          // We use F2jBLAS which is faster than a call to native BLAS for vector dot product
+          val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1)
           pq += dstId -> score
         }
-        val pqIter = pq.iterator
-        var i = 0
-        while (i < n) {
-          output(j + i) = (srcId, pqIter.next())
+        pq.foreach { case (dstId, score) =>
+          output(i) = (srcId, (dstId, score))
          i += 1
        }
-        j += n
        pq.clear()
      }
      output.toSeq
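Both code paths then still need a global top-k per source id: the ml path aggregates the per-block candidates with `TopByKeyAggregator`, and the mllib path presumably does the same through `topByKey` from the imported `MLPairRDDFunctions`. As a rough, non-Spark illustration of that final merge, and of why shipping only per-block top-k candidates keeps the shuffle small, here is a plain-Scala sketch with hypothetical names.

```scala
object GlobalTopKSketch {
  /**
   * Merge per-block candidate lists into the global top `num` per source id.
   * Each block contributes at most `num` candidates per source, so the data
   * being grouped here is already tiny compared to the full score matrix.
   */
  def merge(perBlock: Seq[(Int, (Int, Double))], num: Int): Map[Int, Array[(Int, Double)]] =
    perBlock
      .groupBy { case (srcId, _) => srcId }        // collect candidates per source id
      .map { case (srcId, candidates) =>
        val topK = candidates
          .map { case (_, idAndScore) => idAndScore }
          .sortBy { case (_, score) => -score }    // best score first
          .take(num)
          .toArray
        srcId -> topK
      }
}
```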
