読者です 読者をやめる 読者になる 読者になる

情報幾何の基礎とEMアルゴリズムの解釈

大学の演習で1ヶ月ほど(本気で取り組んだのは事実上最後の1週間だけだったが…)情報幾何について発表したので、資料を公開してみる。
数学的な厳密さよりも、できるだけ前提知識の無い人にもわかりやすく、を意識して説明を心がけたので多方面からマサカリが飛んできそうでこわい。

www.slideshare.net

余談だけど、弊学科は4年の後期ではじめて研究室配属されるという、全国的に見てもかなり配属の遅い学科であり、4年の前期はこうして3つほど研究室を回りながら1ヶ月ずつテーマを決めて取り組むといった演習をやっている。
以前は4年の頭から配属させろよと思っていたが、他学科の話を聴いたり、改めて演習を終えてみて、こういうシステムも悪く無いと思った。
前期から配属されても結局サーベイ輪講をやっているだけの研究室も多いらしいし。

d3.jsで通信トラフィック可視化をやってみた

f:id:levelfour:20160621075117p:plain:w300

ゼミでSparkのノード間通信を可視化しようという話になった。
肝心のクラスタがグローバルネットワークから隔離されており、ログインノードからしかアクセスできないという環境だったので、

  1. クラスタで収集したパケットキャプチャログをログインノードに送る
  2. ログインノードでログを元にして可視化アプリケーションを立てる

という方針にした。VNCとポートフォワーディングを頑張ればEtherApeとか使えたのかもしれないけど、面倒くさいのと、JavaScriptで可視化出来たほうが見た目がいじりやすいしなんとなくカッコいい。

というわけで、デモはここで見れる。
ちなみに、デモで可視化しているトラフィックはダミーである。

実際にはクラスタ上でパケットキャプチャプログラム(僕はtcpdumpのバックエンドのサブセットみたいなのをCで書いた)を走らせ、吐いたログをrsyncでログインノード側で取得し、JavaScriptで表示した。
JavaScriptで1秒おきにログの変更を監視しているが、

function watch() {
  var now = new Date();
  d3.json(status_file + "?timestamp=" + now.getTime(), function(error, graph) {
    var new_status = JSON.stringify(graph.links);

    if(saved_status !== new_status) {
      console.log("update");
      links.length = 0;
      Array.prototype.push.apply(links, graph.links);
      draw();
      saved_status = new_status;
    }
  });
}

のように、

status_file + "?timestamp=" + now.getTime()

タイムスタンプをつけてアクセスする必要がある。というのも、素朴にアクセスするとキャッシュにひっかかって更新差分がとれないことがあるからだ。

Apache Sparkで特異値分解

Slacaなんて2年ぶりくらいに書いたし、ぶっちゃけほとんど何も覚えてなかったので、今後Sparkを使う可能性がある上で備忘録として。
ちなみに、Scala, sbt, Sparkのセットアップについては触れない。

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg.SingularValueDecomposition

object LargeSvd {
  def measure(proc: => Unit) {
    val startTime = System.currentTimeMillis
    proc
    println((System.currentTimeMillis - startTime)/1000.0 + " [sec]")
  }

  def main(args: Array[String]) {
    val fileName = "hdfs://PATH/TO/input.txt"
    val conf = new SparkConf().setAppName("Large Svd")
    val sc = new SparkContext(conf)

    val csv = sc.textFile(fileName)
    val data = csv.map(line => line.split(" ").map(elem => elem.trim))
    val rows: RDD[Vector] = data.map(row => Vectors.dense(row.map(e => e.toDouble)))
    val mat: RowMatrix = new RowMatrix(rows)
    
    measure {
      val svd: SingularValueDecomposition[RowMatrix, Matrix] = mat.computeSVD(20, computeU = true)
      val U: RowMatrix = svd.U
      val s: Vector = svd.s
      val V: Matrix = svd.V
    }

    println("Singlar Value Composition Finished")
  }
}

入力データとして想定されているのは

0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0

みたいなスペース区切りのテキストファイル。ちなみにこのフォーマットはMahoutの特異値分解の入力フォーマットである。CSVを扱いたいなら、

line.split(" ")

の部分を

line.split(",")

とすればOK。

意外と面倒だったのが特異値分解を計算するcomputeSVDメソッドを持つRowMatrixクラスを生成するところで、順番に(型を)追うと

RDD[String] (csv)
→ Array[Array[String]] (data)
→ Array[Array[Double]] (row.map(e => e.toDouble))
→ Array[Vector] (data.map(row => row.map(e=>e.toDouble)))
→ RDD[Vector] (rows)
→ RowMatrix (mat)

といった順になる。

後半の特異値分解自体のコードはSpark Tutorialより。

ビルドと実行

name := "Large Svd Project"

version := "1.0"

scalaVersion := "2.10.5"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.1",
  "org.apache.spark" %% "spark-mllib" % "1.2.1"
)

このファイルをsimple.sbtという名前で保存し、以下のような階層に配置する。

.
├── simple.sbt
└── src
    └── main
        └── scala
            └── LargeSvd.scala

LargeSvd.scalaは上で示した特異値分解のコードである。

simple.sbtの配置されているディレクトリで

sbt package

でビルド完了。

Sparkでの実行は

SPARK_HOME/bin/spark-submit --class "LargeSvd" target/scala-2.10/large-svd-project_2.10-1.0.jar

で行える。

大学の課題用に収集したデータセット(約10000行×2000列の行列)の特異値分解には約25秒ほどかかった(当然ノード構成に依存するが)。

感想としては、SparkやMapReduceのシステムを意識せずに分散計算ができるというのは非常に画期的なのだが、Scalaのビルドがやたらと遅いのと、Scalaに用意されているライブラリでは手の痒いところになかなか手が届かないという点が気になる。
例えば、CSVの読み込みくらいは提供してほしいなと思った(用意されていたらごめんなさい。僕は見つけられなかった)
言語としてはScalaは柔軟で結構好きなのだけれども。