
[SPARK-20677][MLLIB][ML] Follow-up to ALS recommend-all performance PRs #17919

Closed · wants to merge 4 commits

Conversation

@MLnick
Contributor

MLnick commented May 9, 2017

Small clean ups from #17742 and #17845.

## How was this patch tested?

Existing unit tests.

@SparkQA

SparkQA commented May 9, 2017

Test build #76665 has finished for PR 17919 at commit 0b1eaa3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MLnick
Contributor Author

MLnick commented May 9, 2017

cc @mpjlu @jkbradley

@mpjlu

mpjlu commented May 9, 2017

Thanks, I am OK with this change.

@jkbradley
Member

taking a look

@jkbradley
Member

jkbradley left a comment


LGTM, just 1 question/comment

Thanks for doing this!

@@ -451,6 +439,8 @@ class ALSModel private[ml] (
 @Since("1.6.0")
 object ALSModel extends MLReadable[ALSModel] {

+  @transient private[recommendation] val _f2jBLAS = new F2jBLAS
Member

Does this require significant initialization? You could use org.apache.spark.ml.linalg.BLAS.f2jBLAS

Contributor Author

No more or less than using ml.linalg.BLAS. I did think of that, but the var would need to be exposed as private[ml]. If we're OK with that, then it would be slightly cleaner to use it, yes.

@MLnick
Contributor Author

MLnick commented May 11, 2017

Just decided to use ml.BLAS and expose f2jBLAS as ml / mllib private.
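For reference, roughly what that looks like as a sketch (the object name F2jBLASHolder and the exact visibility are assumptions here, not the committed diff):

```scala
package org.apache.spark.ml.linalg

import com.github.fommil.netlib.{BLAS => NetlibBLAS, F2jBLAS}

// Sketch only: a single lazily created F2jBLAS instance that both the ml and
// mllib recommend-all code paths could reuse instead of each holding their
// own @transient copy.
private[spark] object F2jBLASHolder {

  private var _f2jBLAS: NetlibBLAS = _

  // Create the pure-JVM F2jBLAS implementation on first access.
  private[spark] def f2jBLAS: NetlibBLAS = {
    if (_f2jBLAS == null) {
      _f2jBLAS = new F2jBLAS
    }
    _f2jBLAS
  }
}
```

Callers in the recommend-all code path would then reference the shared instance rather than constructing their own F2jBLAS.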

@SparkQA

SparkQA commented May 11, 2017

Test build #76792 has finished for PR 17919 at commit 9dfad1b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MLnick
Contributor Author

MLnick commented May 11, 2017

Jenkins retest this please

@SparkQA

SparkQA commented May 11, 2017

Test build #76801 has finished for PR 17919 at commit 9dfad1b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MLnick
Contributor Author

MLnick commented May 16, 2017

Merged to master/branch-2.2

asfgit pushed a commit that referenced this pull request May 16, 2017
Small clean ups from #17742 and #17845.

## How was this patch tested?

Existing unit tests.

Author: Nick Pentreath <[email protected]>

Closes #17919 from MLnick/SPARK-20677-als-perf-followup.

(cherry picked from commit 25b4f41)
Signed-off-by: Nick Pentreath <[email protected]>
@asfgit asfgit closed this in 25b4f41 May 16, 2017
@auskalia

Hi @MLnick, we find that simply repartitioning userFeatures and productFeatures can significantly improve the efficiency of ALS recommendForAll().

Here is our procedure:

  1. Train the ALS model
  2. Save the model to HDFS
  3. Submit a new Spark job
  4. Load the model from HDFS
  5. Call recommendForAll()

Firstly, when you submit the Spark job with "spark.default.parallelism=x", the recommendForAll stage is split into x^2 tasks, because userFeatures and productFeatures each have x partitions and every user partition is joined with every product partition. This is not reasonable: finishing the stage requires far too much network I/O.

Secondly, submitting the job with "spark.dynamicAllocation.enabled=true" may lead to uneven data distribution across executors. We found that some executors (those that start early) may hold n GB of data while others (those that start later) hold only m MB. As a result, a few executors execute their tasks slowly with heavy GC, or crash with OOM.

We ran some tests with repartitioned userFeatures and productFeatures. Here are the results:

case 1:
users: 480 thousand, products: 4 million, rank: 25
executors: 600, default.parallelism: 100, executor-memory: 20G, executor-cores: 8
without repartition, recommendForAll took 24 min
after userFeatures.repartition(100) and productFeatures.repartition(100), recommendForAll took 8 min
result: 3x faster

case 2:
users: 12 million, products: 7.2 million, rank: 20
executors: 800, default.parallelism: 600, executor-memory: 16G, executor-cores: 8
without repartition, recommendForAll took 16 hours
after userFeatures.repartition(800) and productFeatures.repartition(100), recommendForAll took 30 min
result: 32x faster

Note that the partition numbers of userFeatures and productFeatures may differ.

The tests above are based on the fixes in #17742 and #17845 (see the repartitioning sketch below).
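A minimal sketch of this kind of repartitioning, applied to a model loaded from HDFS via the existing public MatrixFactorizationModel constructor (the path and partition counts are illustrative, not taken from the tests above):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel

// Sketch only: rebuild the loaded model with repartitioned feature RDDs
// before calling the recommend-all methods. Path and partition counts are
// illustrative values.
def loadAndRepartition(sc: SparkContext, modelPath: String): MatrixFactorizationModel = {
  val model = MatrixFactorizationModel.load(sc, modelPath)
  // The two feature RDDs can be repartitioned independently.
  new MatrixFactorizationModel(
    model.rank,
    model.userFeatures.repartition(100),
    model.productFeatures.repartition(100))
}

// Example use:
// val recs = loadAndRepartition(sc, "hdfs:///path/to/als-model").recommendProductsForUsers(10)
```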

We strongly suggest providing an interface that gives users a chance to repartition the two kinds of features.

Thanks

Here is the patch for mllib, which adds two new public functions to MatrixFactorizationModel:

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index d45866c..d4412f7 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -56,8 +56,8 @@ import org.apache.spark.util.BoundedPriorityQueue
 @Since("0.8.0")
 class MatrixFactorizationModel @Since("0.8.0") (
     @Since("0.8.0") val rank: Int,
-    @Since("0.8.0") val userFeatures: RDD[(Int, Array[Double])],
-    @Since("0.8.0") val productFeatures: RDD[(Int, Array[Double])])
+    @Since("0.8.0") var userFeatures: RDD[(Int, Array[Double])],
+    @Since("0.8.0") var productFeatures: RDD[(Int, Array[Double])])
   extends Saveable with Serializable with Logging {
 
   require(rank > 0)
@@ -154,6 +154,39 @@ class MatrixFactorizationModel @Since("0.8.0") (
     predict(usersProducts.rdd.asInstanceOf[RDD[(Int, Int)]]).toJavaRDD()
   }
 
+  /**
+   * Repartition userFeatures.
+   * @param partitionNum the number of partitions to use for userFeatures in the model
+   */
+  @Since("2.2.0")
+  def repartitionUserFeatures(partitionNum: Int = 0): Unit = {
+    if (partitionNum > 0) {
+      userFeatures = userFeatures.repartition(partitionNum)
+    } else {
+      userFeatures = userFeatures.repartition(userFeatures.getNumPartitions)
+    }
+  }
+
+  /**
+   * Repartition productFeatures.
+   * @param partitionNum the number of partitions to use for productFeatures in the model
+   */
+  @Since("2.2.0")
+  def repartitionProductFeatures(partitionNum: Int = 0): Unit = {
+    if (partitionNum > 0) {
+      productFeatures = productFeatures.repartition(partitionNum)
+    } else {
+      productFeatures = productFeatures.repartition(productFeatures.getNumPartitions)
+    }
+  }
+
   /**
    * Recommends products to a user.

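With such a patch applied, usage in a separate job might look roughly like this (a sketch only; the path and partition counts are illustrative, and the repartition methods exist only with the patch above):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel

// Sketch of the proposed methods in use after loading a saved model.
def loadRepartitionedModel(sc: SparkContext, modelPath: String): MatrixFactorizationModel = {
  val model = MatrixFactorizationModel.load(sc, modelPath)
  model.repartitionUserFeatures(800)     // proposed method, not in upstream Spark
  model.repartitionProductFeatures(100)  // the two counts need not match
  model
}

// Example use:
// val recs = loadRepartitionedModel(sc, "hdfs:///path/to/als-model").recommendProductsForUsers(10)
```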
@mpjlu

mpjlu commented May 17, 2017

Hi @auskalia, you are right: repartitioning can improve the performance of recommendForAll.
In my experiments for PR #17742, I had 120 cores and used 20 partitions for both userFeatures and itemFeatures.
I also considered providing an interface that lets users repartition the features.
Since you can set the partition number when training the model (see the sketch below), I did not do that.
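A minimal sketch of controlling feature partitioning at training time through the block count in mllib's ALS (the rank, iteration, lambda, and block values here are illustrative):

```scala
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD

// Sketch only: the number of blocks passed to ALS.train determines how the
// user/product factors are partitioned in the resulting model.
def trainWithBlocks(ratings: RDD[Rating]): MatrixFactorizationModel = {
  val rank = 20
  val iterations = 10
  val lambda = 0.01
  val blocks = 100  // controls the partitioning of userFeatures/productFeatures
  ALS.train(ratings, rank, iterations, lambda, blocks)
}
```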

@auskalia

Hi @mpjlu, you are right. But sometimes we have to split our work across several Spark jobs, especially when resources in the Hadoop cluster are insufficient, and saving the model in one job and reloading it in another is a common pattern in engineering applications. The partitioning chosen at training time does not carry over in that case, so I recommend exposing an interface that lets clients repartition the features.

robert3005 pushed a commit to palantir/spark that referenced this pull request May 19, 2017

liyichao pushed a commit to liyichao/spark that referenced this pull request May 24, 2017