Apache Sparkで特異値分解
Slacaなんて2年ぶりくらいに書いたし、ぶっちゃけほとんど何も覚えてなかったので、今後Sparkを使う可能性がある上で備忘録として。
ちなみに、Scala, sbt, Sparkのセットアップについては触れない。
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.mllib.linalg.Matrix import org.apache.spark.mllib.linalg.distributed.RowMatrix import org.apache.spark.mllib.linalg.SingularValueDecomposition object LargeSvd { def measure(proc: => Unit) { val startTime = System.currentTimeMillis proc println((System.currentTimeMillis - startTime)/1000.0 + " [sec]") } def main(args: Array[String]) { val fileName = "hdfs://PATH/TO/input.txt" val conf = new SparkConf().setAppName("Large Svd") val sc = new SparkContext(conf) val csv = sc.textFile(fileName) val data = csv.map(line => line.split(" ").map(elem => elem.trim)) val rows: RDD[Vector] = data.map(row => Vectors.dense(row.map(e => e.toDouble))) val mat: RowMatrix = new RowMatrix(rows) measure { val svd: SingularValueDecomposition[RowMatrix, Matrix] = mat.computeSVD(20, computeU = true) val U: RowMatrix = svd.U val s: Vector = svd.s val V: Matrix = svd.V } println("Singlar Value Composition Finished") } }
入力データとして想定されているのは
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
みたいなスペース区切りのテキストファイル。ちなみにこのフォーマットはMahoutの特異値分解の入力フォーマットである。CSVを扱いたいなら、
line.split(" ")
の部分を
line.split(",")
とすればOK。
意外と面倒だったのが特異値分解を計算するcomputeSVDメソッドを持つRowMatrixクラスを生成するところで、順番に(型を)追うと
RDD[String] (csv) → Array[Array[String]] (data) → Array[Array[Double]] (row.map(e => e.toDouble)) → Array[Vector] (data.map(row => row.map(e=>e.toDouble))) → RDD[Vector] (rows) → RowMatrix (mat)
といった順になる。
後半の特異値分解自体のコードはSpark Tutorialより。
ビルドと実行
name := "Large Svd Project" version := "1.0" scalaVersion := "2.10.5" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "1.6.1", "org.apache.spark" %% "spark-mllib" % "1.2.1" )
このファイルをsimple.sbtという名前で保存し、以下のような階層に配置する。
. ├── simple.sbt └── src └── main └── scala └── LargeSvd.scala
LargeSvd.scalaは上で示した特異値分解のコードである。
simple.sbtの配置されているディレクトリで
sbt package
でビルド完了。
Sparkでの実行は
SPARK_HOME/bin/spark-submit --class "LargeSvd" target/scala-2.10/large-svd-project_2.10-1.0.jar
で行える。
大学の課題用に収集したデータセット(約10000行×2000列の行列)の特異値分解には約25秒ほどかかった(当然ノード構成に依存するが)。
感想としては、SparkやMapReduceのシステムを意識せずに分散計算ができるというのは非常に画期的なのだが、Scalaのビルドがやたらと遅いのと、Scalaに用意されているライブラリでは手の痒いところになかなか手が届かないという点が気になる。
例えば、CSVの読み込みくらいは提供してほしいなと思った(用意されていたらごめんなさい。僕は見つけられなかった)
言語としてはScalaは柔軟で結構好きなのだけれども。