Skip to content

Commit

Permalink
MAHOUT-1837: Sparse/Dense Matrix analysis for Matrix Multiplication. c…
Browse files Browse the repository at this point in the history
…loses apache#228
  • Loading branch information
andrewpalumbo committed Jun 16, 2016
1 parent cfe52f2 commit d994048
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.mahout.math.scalabindings._

import scala.reflect.ClassTag
import org.apache.mahout.math.drm.logical.OpAewUnaryFunc

import collection._

package object drm {
Expand Down Expand Up @@ -354,7 +355,6 @@ package object drm {
keys block
}
}

}

package object indexeddataset {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ object MMul extends MMBinaryFunc {

val (af, bf) = (a.getFlavor, b.getFlavor)
val backs = (af.getBacking, bf.getBacking)
val sd = (af.getStructure, af.isDense, bf.getStructure, bf.isDense)
val sd = (af.getStructure, sparsityAnalysis(a), bf.getStructure, sparsityAnalysis(b))

val alg: MMulAlg = backs match {

Expand Down Expand Up @@ -124,7 +124,7 @@ object MMul extends MMBinaryFunc {
require(r.forall(mxR mxR.nrow == a.nrow && mxR.ncol == b.ncol))
val (m, n) = (a.nrow, b.ncol)

val mxR = r.getOrElse(if (a.getFlavor.isDense) a.like(m, n) else b.like(m, n))
val mxR = r.getOrElse(if (sparsityAnalysis(a)) a.like(m, n) else b.like(m, n))

for (row 0 until mxR.nrow; col 0 until mxR.ncol) {
// this vector-vector should be sort of optimized, right?
Expand Down Expand Up @@ -269,10 +269,10 @@ object MMul extends MMBinaryFunc {
val (m, n) = (a.nrow, b.ncol)

// Prefer col-wise result iff a is dense and b is sparse. In all other cases default to row-wise.
val preferColWiseR = a.getFlavor.isDense && !b.getFlavor.isDense
val preferColWiseR = sparsityAnalysis(a) && !sparsityAnalysis(b)

val mxR = r.getOrElse {
(a.getFlavor.isDense, preferColWiseR) match {
(sparsityAnalysis(a), preferColWiseR) match {
case (false, false) b.like(m, n)
case (false, true) b.like(n, m).t
case (true, false) a.like(m, n)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package org.apache.mahout.math

import org.apache.mahout.math.solver.EigenDecomposition

import collection._
import scala.util.Random

/**
* Mahout matrices and vectors' scala syntactic sugar
Expand All @@ -29,6 +31,12 @@ package object scalabindings {
// Reserved "ALL" range
final val `::`: Range = null

// values for stochastic sparsityAnalysis
final val z95 = 1.959964
final val z80 = 1.281552
final val maxSamples = 500
final val minSamples = 15

// Some enums
object AutoBooleanEnum extends Enumeration {
type T = Value
Expand Down Expand Up @@ -410,4 +418,57 @@ package object scalabindings {

def dist(mxX: Matrix, mxY: Matrix): Matrix = sqDist(mxX, mxY) := sqrt _

/**
* Check the density of an in-core matrix based on supplied criteria.
* Returns true if we think mx is densier than threshold with at least 80% confidence.
*
* @param mx The matrix to check density of.
* @param threshold the threshold of non-zero elements above which we consider a Matrix Dense
*/
def sparsityAnalysis(mx: Matrix, threshold: Double = 0.25): Boolean = {

require(threshold >= 0.0 && threshold <= 1.0)
var n = minSamples
var mean = 0.0
val rnd = new Random()
val dimm = mx.nrow
val dimn = mx.ncol
val pq = threshold * (1 - threshold)

for (s 0 until minSamples) {
if (mx(rnd.nextInt(dimm), rnd.nextInt(dimn)) != 0.0) mean += 1
}
mean /= minSamples
val iv = z80 * math.sqrt(pq / n)

if (mean < threshold - iv) return false // sparse
else if (mean > threshold + iv) return true // dense

while (n < maxSamples) {
// Determine upper bound we may need for n to likely relinquish the uncertainty. Here, we use
// confidence interval formula but solved for n.
val ivNeeded = math.abs(threshold - mean) max 1e-11

val stderr = ivNeeded / z80
val nNeeded = (math.ceil(pq / (stderr * stderr)).toInt max n min maxSamples) - n

var meanNext = 0.0
for (s 0 until nNeeded) {
if (mx(rnd.nextInt(dimm), rnd.nextInt(dimn)) != 0.0) meanNext += 1
}
mean = (n * mean + meanNext) / (n + nNeeded)
n += nNeeded

// Are we good now?
val iv = z80 * math.sqrt(pq / n)
if (mean < threshold - iv) return false // sparse
else if (mean > threshold + iv) return true // dense
}

return mean <= threshold

}



}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.mahout.math.scalabindings

import org.apache.log4j.Level

import org.apache.mahout.logging._
import org.apache.mahout.math._
import org.apache.mahout.math.scalabindings.RLikeOps._
Expand Down Expand Up @@ -234,4 +236,32 @@ class MathSuite extends FunSuite with MahoutSuite {
(mxDsq - mxDsqControl).norm should be < 1e-7
}

test("sparsity analysis") {
setLogLevel(Level.DEBUG)

val m = 500
val n = 800
val mxA = new DenseMatrix(m, n)

sparsityAnalysis(mxA) shouldBe false
sparsityAnalysis(mxA, .5) shouldBe false
sparsityAnalysis(mxA + 1) shouldBe true
sparsityAnalysis(mxA + 1, .95) shouldBe true

for (i 0 until m by 5) mxA(i, ::) := 1
info(s"20% detected as dense?:${sparsityAnalysis(mxA)}")
mxA := 0

for (i 0 until m by 3) mxA(i, ::) := 1
info(s"33% detected as dense?:${sparsityAnalysis(mxA)}")
mxA := 0

for (i 0 until m by 4) mxA(i, ::) := 1
info(s"25% detected as dense?:${sparsityAnalysis(mxA)}")

for (i 0 until m by 2) mxA(i, ::) := 1
info(s"50% detected as dense?:${sparsityAnalysis(mxA)}")

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ package org.apache.mahout.sparkbindings.blas
import org.apache.mahout.math.scalabindings._
import RLikeOps._
import org.apache.spark.rdd.RDD

import scala.reflect.ClassTag
import org.apache.mahout.sparkbindings._
import org.apache.mahout.math.drm.BlockifiedDrmTuple
import org.apache.mahout.sparkbindings.drm._
import org.apache.mahout.math.{SparseMatrix, Matrix, SparseRowMatrix}
import org.apache.mahout.math.{DenseMatrix, Matrix, SparseMatrix, SparseRowMatrix}
import org.apache.mahout.math.drm.logical.OpABt
import org.apache.mahout.logging._
import org.apache.mahout.math.flavor.MatrixFlavor

/** Contains RDD plans for ABt operator */
object ABt {
Expand Down Expand Up @@ -116,7 +118,12 @@ object ABt {
// Empty combiner += value
createCombiner = (t: (Array[K], Array[Int], Matrix)) => {
val (rowKeys, colKeys, block) = t
val comb = new SparseMatrix(prodNCol, block.nrow).t

val comb = if (block.getFlavor == MatrixFlavor.SPARSELIKE) {
new SparseMatrix(prodNCol, block.nrow).t
} else {
new DenseMatrix(prodNCol, block.nrow).t
}

for ((col, i) <- colKeys.zipWithIndex) comb(::, col) := block(::, i)
rowKeys -> comb
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,27 @@ package object drm {
val keys = data.map(t => t._1).toArray[K]
val vectors = data.map(t => t._2).toArray

val block = if (vectors(0).isDense) {
val block = new DenseMatrix(vectors.length, blockncol)
var row = 0
while (row < vectors.length) {
block(row, ::) := vectors(row)
row += 1
}
// create the block by default as dense.
// would probably be better to sample a subset of these
// vectors first before creating the entire matrix.
// so that we don't have the overhead of creating a full second matrix in
// the case that the matrix is not dense.
val block = new DenseMatrix(vectors.length, blockncol)
var row = 0
while (row < vectors.length) {
block(row, ::) := vectors(row)
row += 1
}

// Test the density of the data. If the matrix does not meet the
// requirements for density, convert the Vectors to a sparse Matrix.
val resBlock = if (sparsityAnalysis(block)) {
block
} else {
new SparseRowMatrix(vectors.length, blockncol, vectors, true, false)
}

Iterator(keys -> block)
Iterator(keys -> resBlock)
}
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class DrmLikeSuite extends FunSuite with DistributedSparkSuite with DrmLikeSuite
}).norm should be < 1e-4
}

test("DRM blockify sparse -> SRM") {
test("DRM blockify sparse -> DRM") {

val inCoreA = sparse(
(1, 2, 3),
Expand All @@ -59,7 +59,7 @@ class DrmLikeSuite extends FunSuite with DistributedSparkSuite with DrmLikeSuite

(inCoreA - drmA.mapBlock() {
case (keys, block) =>
if (!block.isInstanceOf[SparseRowMatrix])
if (block.isInstanceOf[SparseRowMatrix])
throw new AssertionError("Block must be dense.")
keys -> block
}).norm should be < 1e-4
Expand Down

0 comments on commit d994048

Please sign in to comment.