Apache Sparkで特異値分解

Slacaなんて2年ぶりくらいに書いたし、ぶっちゃけほとんど何も覚えてなかったので、今後Sparkを使う可能性がある上で備忘録として。
ちなみに、Scala, sbt, Sparkのセットアップについては触れない。

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg.SingularValueDecomposition

object LargeSvd {
  def measure(proc: => Unit) {
    val startTime = System.currentTimeMillis
    proc
    println((System.currentTimeMillis - startTime)/1000.0 + " [sec]")
  }

  def main(args: Array[String]) {
    val fileName = "hdfs://PATH/TO/input.txt"
    val conf = new SparkConf().setAppName("Large Svd")
    val sc = new SparkContext(conf)

    val csv = sc.textFile(fileName)
    val data = csv.map(line => line.split(" ").map(elem => elem.trim))
    val rows: RDD[Vector] = data.map(row => Vectors.dense(row.map(e => e.toDouble)))
    val mat: RowMatrix = new RowMatrix(rows)
    
    measure {
      val svd: SingularValueDecomposition[RowMatrix, Matrix] = mat.computeSVD(20, computeU = true)
      val U: RowMatrix = svd.U
      val s: Vector = svd.s
      val V: Matrix = svd.V
    }

    println("Singlar Value Composition Finished")
  }
}

入力データとして想定されているのは

0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0

みたいなスペース区切りのテキストファイル。ちなみにこのフォーマットはMahoutの特異値分解の入力フォーマットである。CSVを扱いたいなら、

line.split(" ")

の部分を

line.split(",")

とすればOK。

意外と面倒だったのが特異値分解を計算するcomputeSVDメソッドを持つRowMatrixクラスを生成するところで、順番に(型を)追うと

RDD[String] (csv)
→ Array[Array[String]] (data)
→ Array[Array[Double]] (row.map(e => e.toDouble))
→ Array[Vector] (data.map(row => row.map(e=>e.toDouble)))
→ RDD[Vector] (rows)
→ RowMatrix (mat)

といった順になる。

後半の特異値分解自体のコードはSpark Tutorialより。

ビルドと実行

name := "Large Svd Project"

version := "1.0"

scalaVersion := "2.10.5"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.1",
  "org.apache.spark" %% "spark-mllib" % "1.2.1"
)

このファイルをsimple.sbtという名前で保存し、以下のような階層に配置する。

.
├── simple.sbt
└── src
    └── main
        └── scala
            └── LargeSvd.scala

LargeSvd.scalaは上で示した特異値分解のコードである。

simple.sbtの配置されているディレクトリで

sbt package

でビルド完了。

Sparkでの実行は

SPARK_HOME/bin/spark-submit --class "LargeSvd" target/scala-2.10/large-svd-project_2.10-1.0.jar

で行える。

大学の課題用に収集したデータセット(約10000行×2000列の行列)の特異値分解には約25秒ほどかかった(当然ノード構成に依存するが)。

感想としては、SparkやMapReduceのシステムを意識せずに分散計算ができるというのは非常に画期的なのだが、Scalaのビルドがやたらと遅いのと、Scalaに用意されているライブラリでは手の痒いところになかなか手が届かないという点が気になる。
例えば、CSVの読み込みくらいは提供してほしいなと思った(用意されていたらごめんなさい。僕は見つけられなかった)
言語としてはScalaは柔軟で結構好きなのだけれども。