import breeze.linalg.{*, Axis, DenseVector, argmax, convert, fliplr, flipud, kron, max}
import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}
import breeze.numerics._
import java.io.File
import java.util.{ArrayList, List}
import org.apache.flink.api.scala._
import breeze.linalg._
import org.apache.flink.ml.MLUtils
import org.apache.flink.ml.math.Vector
object cnn {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val svm = MLUtils.readLibSVM(env, "C:\\Users\\Kannan.b\\Desktop\\w2v\\MLP.txt")
svm.print()
val traintest = Splitter.trainTestSplit(svm, 0.7, true)
val train = traintest.training
val astroTest: DataSet[(Double, Vector)] = MLUtils.readLibSVM(env, "C:\\Users\\Kannan.b\\Desktop\\w2v\\MLP.txt")
.map(x => (x.label, x.vector))
val test = traintest.testing
val model = new Sequential()
val one = 5
val two = 1
val three = 3
val four = 3
val five = 2
val six = 2
model.add(new Convolution2D(one, two, three, four, five, six))
model.add(new Activation("relu"))
model.add(new Dropout(0.5))
model.add(new Dense(20, 10))
model.add(new Activation("softmax"))
val res = model.fit(astroTest)
}
case class Ml(input: Double, output: Int)
abstract class Layer {
// Initialize variables that are shared by all layers. Note that not every variable will
// be used by ever layer type (for example, the activation layers do not have weights).
var layer_type: String = null
var delta: BDM[Double] = null
var weights: BDM[Double] = null
var moment1: BDM[Double] = null
var moment2: BDM[Double] = null
// Initialize functions that are shared by all layers. Note that not every layer type will
// use every function (for example, the activation layers do not call compute_gradient).
def forward(forward_data: BDM[Double]): BDM[Double]
def prev_delta(delta: BDM[Double]): BDM[Double]
def compute_gradient(backward_data: BDM[Double], delta: BDM[Double]): BDM[Double]
def init_conv_parameters(num_filters: Int, prev_channels: Int, filter_height: Int,
filter_width: Int, kind: String): BDM[Double] = {
// Convolutional filter matrix is a vertical stack of filters. Order is that for each
// previous channel, fill in the filters that map to the next impage map.
val filters = BDM.zeros[Double](filter_height * num_filters * prev_channels, filter_width)
val r = scala.util.Random
for (j <- 0 until prev_channels) {
for (i <- 0 until num_filters) {
val y_index = filter_height * j + filter_height * i
if (kind == "weights") {
//filters(y_index until y_index + filter_height, ::) :=
BDM.ones[Double](filter_height, filter_width).map(x => r.nextDouble - 0.5) :* .01
} else {
filters(y_index until y_index + filter_height, ::) :=
BDM.zeros[Double](filter_height, filter_width)
}
}
}
filters
}
}
class Sequential {
var layers: List[Layer] = new ArrayList[Layer]
var optimizer: Optimizer = null
var lr: Double = .01
var s: Double = .9
var r: Double = .999
var loss: String = "categorical_crossentropy"
var metrics: String = "accuracy"
var optimizer_type: String = "adam"
def add(new_layer: Layer): Unit = {
layers.add(new_layer)
// Allows Dropout layer to be specified by user without manually adding activation function
if (new_layer.layer_type == "Dropout") {
layers.add(new Activation("linear"))
}
}
def evaluate(train_eval: BDM[Double], labels: BDV[Double]): Unit = {
var f = train_eval
// Forward through topology
for (layer <- 0 to this.layers.size - 1) {
f = this.layers.get(layer).forward(f)
}
val softmax = f
// Column in softmax with maximum value corresponds to prediction
val predictions = argmax(softmax, Axis._1)
// Compute proportion of labels that are correct
val diff = predictions - convert(labels, Int)
var count = 0
for (i <- 0 to diff.size - 1) {
if (diff(i) == 0) {
count += 1
}
else {}
}
print("Train Accuracy: ")
println(count.toDouble / diff.size.toDouble)
}
def get_batch(x_train: BDM[Double], y_train: BDV[Int], batch_size: Int):
(BDM[Double], BDV[Int]) = {
val rand = scala.util.Random
val x_batch = BDM.zeros[Double](batch_size, x_train.cols)
val y_batch = BDV.zeros[Int](batch_size)
for (i <- 0 until batch_size) {
val batch_index = rand.nextInt(x_train.rows - batch_size)
x_batch(i, ::) := x_train(batch_index, ::)
y_batch(i) = y_train(batch_index)
}
(x_batch, y_batch)
}
def compile(loss: String, optimizer: Optimizer, metrics: String): Unit = {
this.optimizer = optimizer
this.loss = loss
this.metrics = metrics
}
def fit(DS: DataSet[(Double, Vector)], num_iters: Int = 1000, batch_size: Int = 16): Sequential = {
// Grab relevant variables for optimizaiton from the optimizer object.
val lr = this.optimizer.lr
val s = this.optimizer.s
val r = this.optimizer.r
val optimizer = this.optimizer.optimizer_type
// Convert Spark Dataframe to Breeze Dense Matrix and Dense Vector
// Convert Spark Dataframe to Breeze Dense Matrix and Dense Vector
// For Reference
val xArray = DenseVector(DS.map(v => v._1).collect())
val xArray1 = new BDV[Double](xArray.length)
for (i <- 0 until xArray.length) {
xArray1(i) = xArray(i)(0).asInstanceOf[Double]
}
val x = new BDM[Double](xArray.length, xArray(0).length)
val x1 = for (i <- 0 until xArray.length) {
x(i, ::) := xArray1(i).t
}
val yArray = DenseVector(DS.map { pair => pair._2 }.collect());
val y = new BDV[Double](yArray.length)
for (i <- 0 until yArray.length) {
y(i) = yArray(i)(0).asInstanceOf[Double]
}
//val xArray = DS.map(v => v.input)
//.map(v => new BDV[Double](v).collect()
//var xArray= BDV(DS.map{pair => pair.input }.collect());
//val ones = DenseMatrix.ones[Double](x.rows, 1)
val x_train = x
val y_train = convert(y, Int)
val class_count = this.layers.get(this.layers.size - 2).asInstanceOf[Dense].get_num_hidden
def conditional(value: Int, seek: Int): Int = {
if (value == seek) {
-1
} else {
0
}
}
val numerical_stability = .00000001
val rand = scala.util.Random
for (iterations <- 0 to num_iters) {
val (x_batch, y_batch) = get_batch(x_train, y_train, batch_size)
var f = x_batch
// Forward
for (layer <- 0 to this.layers.size - 1) {
f = this.layers.get(layer).forward(f)
}
var softmax = f
// Backward
val softmax_delta = BDM.zeros[Double](batch_size, class_count)
for (i <- 0 to softmax_delta.cols - 1) {
softmax_delta(::, i) := softmax_delta(::, i) + convert(convert(y_batch, Int)
.map(x => conditional(x, i)), Double)
}
softmax = softmax + softmax_delta
this.layers.get(this.layers.size - 1).delta = softmax :/ batch_size.toDouble
// Compute Errors
for (i <- this.layers.size - 2 to 0 by -1) {
this.layers.get(i).delta = this.layers.get(i).prev_delta(this.layers.get(i + 1).delta)
}
// Compute and Update Gradients
for (i <- this.layers.size - 2 to 0 by -1) {
if (this.layers.get(i).layer_type == "Dense" ||
this.layers.get(i).layer_type == "Convolution2D") {
val gradient =
if (i == 0) {
if (this.layers.get(i).layer_type == "Dense") {
this.layers.get(i).asInstanceOf[Dense].compute_gradient(x_batch,
this.layers.get(i + 1).delta)
} else {
this.layers.get(i).asInstanceOf[Convolution2D].compute_gradient(x_batch,
this.layers.get(i + 1).delta)
}
} else {
if (this.layers.get(i).layer_type == "Dense") {
this.layers.get(i).asInstanceOf[Dense].compute_gradient(this.layers.get(i - 1)
.asInstanceOf[Activation].hidden_layer, this.layers.get(i + 1).delta)
} else {
this.layers.get(i).asInstanceOf[Convolution2D].compute_gradient(this.layers.get(i - 1)
.asInstanceOf[Activation].hidden_layer, this.layers.get(i + 1).delta)
}
}
val layer = this.layers.get(i)
if (optimizer == "sgd") {
layer.weights -= lr * gradient
}
else if (optimizer == "momentum") {
layer.moment1 = s * layer.moment1 + lr * gradient
layer.weights -= layer.moment1
}
else if (optimizer == "adam") {
layer.moment1 = s * layer.moment1 + (1 - s) * gradient
layer.moment2 = r * layer.moment2 + (1 - r) * (gradient :* gradient)
val m1_unbiased = layer.moment1 :/ (1 - (math.pow(s, iterations + 1)))
val m2_unbiased = layer.moment2 :/ (1 - (math.pow(r, iterations + 1)))
layer.weights -= lr * m1_unbiased :/ (sqrt(m2_unbiased) + numerical_stability)
}
}
}
if (iterations % 10 == 0) {
evaluate(x(0 until 101, ::), y(0 until 101))
}
}
// println(evaluate(x(0 until 1000, ::), y(0 until 1000)))
this
}
def printToFile(f: java.io.File)(op: java.io.PrintWriter => Unit) {
val p = new java.io.PrintWriter(f)
try {
op(p)
} finally {
p.close()
}
}
def save(name: String): Unit = {
val model = this
var layers: Array[String] = new Array[String](model.layers.size)
var weights: List[BDM[Double]] = new ArrayList[BDM[Double]]
for (i <- 0 until model.layers.size) {
layers(i) = model.layers.get(i).layer_type
if (model.layers.get(i).layer_type == "Dense" || model.layers.get(i).layer_type ==
"Convolution2D") {
weights.add(model.layers.get(i).weights)
} else {}
}
val dir = new File(name)
dir.mkdir()
this.printToFile(new File(name + "/layers.txt")) {
p => layers.foreach(p.println)
}
var weights_index = 0
for (i <- 0 until model.layers.size) {
if (model.layers.get(i).layer_type == "Dense" || model.layers.get(i).layer_type ==
"Convolution2D") {
breeze.linalg.csvwrite(new File(name + "/weights" + i.toString),
weights.get(weights_index))
weights_index += 1
}
}
}
}
class Model() {
def load(name: String): Sequential = {
val seq: Sequential = new Sequential()
val lines = scala.io.Source.fromFile(name + "/layers.txt").mkString.split("\n")
for (i <- 0 until lines.length) {
if (lines(i) == "Convolution2D") {
seq.add(new Convolution2D(8, 1, 3, 3, 28, 28))
seq.layers.get(i).weights = breeze.linalg.csvread(new File(
name + "/weights" + i.toString))
}
if (lines(i) == "Dense") {
seq.add(new Dense(6272, 10))
seq.layers.get(i).weights = breeze.linalg.csvread(new File(name + "/weights" + i.toString))
}
if (lines(i) == "Activation" && i == 1) {
seq.add(new Activation("relu"))
}
if (lines(i) == "Activation" && i == 3) {
seq.add(new Activation("softmax"))
}
}
seq
}
}
class Dense(input_shape: Int, num_hidden: Int) extends Layer {
val r = scala.util.Random
this.weights = BDM.ones[Double](input_shape, num_hidden).map(x => r.nextDouble - 0.5) :* .01
this.moment1 = BDM.zeros[Double](input_shape, num_hidden)
this.moment2 = BDM.zeros[Double](input_shape, num_hidden)
var hidden_layer: BDM[Double] = null
this.delta = null
this.layer_type = "Dense"
def get_num_hidden: Int = num_hidden
def get_input_shape: Int = input_shape
override def forward(forward_data: BDM[Double]): BDM[Double] = {
// Breeze's matrix multiplication syntax allows us to simply use *
hidden_layer = forward_data * weights
hidden_layer
}
override def prev_delta(delta: BDM[Double]): BDM[Double] = {
delta * weights.t
}
override def compute_gradient(backward_data: BDM[Double], delta: BDM[Double]): BDM[Double] = {
backward_data.t * delta
}
}
class Dropout(proportion: Double) extends Layer {
val r = scala.util.Random
this.layer_type = "Dropout"
override def forward(forward_data: BDM[Double]): BDM[Double] = {
// May need to do something fancy at inference time
forward_data.map(x => if (r.nextDouble < proportion) {
0
} else {
x
})
}
override def prev_delta(delta: BDM[Double]): BDM[Double] = {
delta
}
override def compute_gradient(backward_data: BDM[Double], delta: BDM[Double]): BDM[Double] = {
println("MAJOR ERROR - ACTIVATION LAYER SHOULD NOT COMPUTE GRADIENT")
backward_data
}
}
class Convolution2D(
num_filters: Int, // Number of filters in convolutional layer
prev_channels: Int, // Number of filters in previous layer
filter_height: Int, // Height of filters in this convoltional layer
filter_width: Int, // Width of filters in this convolutional layer
img_height: Int, // Height of filter map input to this layer
img_width: Int // Width of filter map input to this layer
) extends Layer {
this.layer_type = "Convolution2D"
this.weights = init_conv_parameters(num_filters,
prev_channels, filter_height, filter_width, "weights")
this.moment1 = init_conv_parameters(num_filters,
prev_channels, filter_height, filter_width, "moment")
this.moment2 = init_conv_parameters(num_filters,
prev_channels, filter_height, filter_width, "moment")
this.delta = null
val len = img_height * img_width
def convolution(image: BDM[Double], filter: BDM[Double]): BDM[Double] = {
val image_height = image.rows
val image_width = image.cols
val local_filter_height = filter.rows
val local_filter_width = filter.cols
val padded = BDM.zeros[Double](image_height + 2 * (filter_height / 2),
image_width + 2 * (filter_width / 2))
for (i <- 0 until image_height) {
for (j <- 0 until image_width) {
padded(i + (filter_height / 2), j + (filter_height / 2)) = image(i, j)
}
}
val convolved = BDM.zeros[Double](image_height - local_filter_height + 1 + 2 *
(filter_height / 2), image_width - local_filter_width + 1 + 2 * (filter_width / 2))
for (i <- 0 until convolved.rows) {
for (j <- 0 until convolved.cols) {
var aggregate = 0.0
for (k <- 0 until local_filter_height) {
for (l <- 0 until local_filter_width) {
aggregate += padded(i + k, j + l) * filter(k, l)
}
}
convolved(i, j) = aggregate
}
}
convolved
}
override def forward(input_data: BDM[Double]): BDM[Double] = {
val outputs = BDM.zeros[Double](input_data.rows, img_height * img_width *
num_filters * prev_channels)
for (i <- 0 until input_data.rows) {
for (j <- 0 until prev_channels) {
for (k <- 0 until prev_channels) {
for (l <- 0 until num_filters) {
val index1 = l * len + k * len
val index2 = (l + 1) * len + k * len
val data_index1 = j * len
val data_index2 = (j + 1) * len
val filter_index = k * filter_height + l * filter_height
val img = input_data(i, data_index1 until data_index2).t.toDenseMatrix
.reshape(img_height, img_width)
val fil = weights(filter_index until (filter_index + filter_height), ::)
outputs(i, index1 until index2) := convolution(img, fil)
.reshape(1, img_height * img_width).toDenseVector.t
}
}
}
}
outputs
}
override def prev_delta(delta: BDM[Double]): BDM[Double] = {
val output_delta = BDM.zeros[Double](delta.rows, delta.cols / num_filters)
for (i <- 0 until delta.rows) {
for (j <- 0 until prev_channels) {
for (k <- 0 until num_filters) {
val filter_index = filter_height * j + filter_height * k
val x_index = j * len + k * len
val img = delta(i, x_index until x_index + len).t.toDenseMatrix
.reshape(img_height, img_width)
val filter = flipud(fliplr(weights(filter_index until filter_index + filter_height, ::)))
val x_output = j * len
output_delta(i, x_output until x_output + len) +=
convolution(img, filter).reshape(1, img_height * img_width).toDenseVector.t
}
}
}
output_delta :/ num_filters.toDouble
}
override def compute_gradient(backward_data: BDM[Double], delta: BDM[Double]): BDM[Double] = {
val gradient = BDM.zeros[Double](filter_height * num_filters * prev_channels, filter_width)
for (i <- 0 until backward_data.rows) {
for (j <- 0 until prev_channels) {
for (k <- 0 until num_filters) {
val y_index = filter_height * j + filter_height * k
val data_index = prev_channels * j
val delta_index = k * len
gradient(y_index until y_index + filter_height, ::) +=
convolution(backward_data(i, data_index until (data_index + len)).t.toDenseMatrix.
reshape(img_height, img_width), delta(i, delta_index until (delta_index + len)).
t.toDenseMatrix.reshape(img_height, img_width))
}
}
}
gradient :/ backward_data.rows.toDouble
}
}
class MaxPooling2D(pool_height: Int, pool_width: Int, pool_stride_x: Int, pool_stride_y: Int,
prev_channels: Int, num_filters: Int, img_height: Int, img_width: Int) extends Layer {
this.layer_type = "MaxPooling2D"
val len = img_height * img_width
override def forward(input_data: BDM[Double]): BDM[Double] = {
val outputs = BDM.zeros[Double](input_data.rows, (img_height * img_width *
prev_channels) / (pool_width * pool_height))
for (i <- 0 until input_data.rows) {
for (j <- 0 until prev_channels) {
val img = input_data(i, j * len until (j + 1) * len).t.toDenseMatrix
.reshape(img_height, img_width).t
for (k <- 0 until img_height by pool_stride_y) {
for (l <- 0 until img_width by pool_stride_x) {
outputs(i, j * img_height / pool_height * img_width / pool_width +
k / pool_stride_y * img_width / pool_stride_x + l / pool_stride_x) =
max(img(k until k + pool_height, l until l + pool_width))
}
}
}
}
outputs
}
override def prev_delta(delta: BDM[Double]): BDM[Double] = {
val output_delta = BDM.zeros[Double](delta.rows, len * prev_channels)
for (i <- 0 until delta.rows) {
for (j <- 0 until prev_channels) {
val x_index = j * img_height / pool_height * img_width / pool_width
val img = delta(i, x_index until x_index + img_height / pool_height *
img_width / pool_width).t.toDenseMatrix.reshape(img_height / pool_height,
img_width / pool_width)
val x_output = j * len
output_delta(i, x_output until x_output + len) :=
kron(img, BDM.ones[Double](pool_height, pool_width)).reshape(1,
img_height * img_width).toDenseVector.t
}
}
output_delta
}
override def compute_gradient(backward_data: BDM[Double], delta: BDM[Double]): BDM[Double] = {
println("MAJOR ERROR - POOLING LAYER SHOULD NOT COMPUTE GRADIENT")
backward_data
}
}
class Optimizer() {
var lr: Double = 0.01
var s: Double = 0.9
var r: Double = 0.999
var optimizer_type: String = "adam"
def adam(lr: Double = 0.01, s: Double = 0.9, r: Double = 0.999): Optimizer = {
this.lr = lr
this.s = s
this.r = r
this.optimizer_type = "adam"
this
}
def momentum(lr: Double = 0.01, s: Double = 0.9): Optimizer = {
this.lr = lr
this.s = s
this.optimizer_type = "momentum"
this
}
def SGD(lr: Double = 0.01): Optimizer = {
this.lr = lr
this.optimizer_type = "sgd"
this
}
}
class Activation(var kind: String) extends Layer {
this.layer_type = "Activation"
var hidden_layer: BDM[Double] = null
this.delta = null
var output_softmax: BDM[Double] = null
override def forward(input_data: BDM[Double]): BDM[Double] = {
if (kind == "relu") {
hidden_layer = input_data.map(x => max(0.0, x))
hidden_layer
}
else if (kind == "linear") {
hidden_layer = input_data
hidden_layer
}
else if (kind == "softmax") {
val softmax = exp(input_data)
val divisor = breeze.linalg.sum(softmax(*, ::))
for (i <- 0 to softmax.cols - 1) {
softmax(::, i) := softmax(::, i) :/ divisor
}
softmax
}
else {
println("MAJOR ERROR1")
input_data
}
}
def relu_grad(value: Double): Double = {
if (value <= 0) {
0
} else {
1
}
}
override def prev_delta(delta: BDM[Double]): BDM[Double] = {
if (kind == "relu") {
delta :* hidden_layer.map(relu_grad)
delta
}
else if (kind == "linear") {
delta
}
else {
println("MAJOR ERROR2")
delta
}
}
override def compute_gradient(backward_data: BDM[Double], delta: BDM[Double]): BDM[Double] = {
println("MAJOR ERROR - ACTIVATION LAYER SHOULD NOT COMPUTE GRADIENT")
backward_data
}
}
Примечание. Это код Scala CNN, который мы пытаемся выполнить на Apache Flink, но он выдает следующую ошибку
Ошибка: (143, 67) Символ «term .shapeless» отсутствуетиз класса.Этот символ требуется для значения breeze.linalg.operators.MatrixGenericOps.v1ne.Убедитесь, что термин бесформенный находится в вашем пути к классам, и проверьте наличие конфликтующих зависимостей с помощью -Ylog-classpath
.Полная перестройка может помочь, если 'MatrixGenericOps.class' был скомпилирован с несовместимой версией.BDM.ones [Double] (filter_height, filter_width) .map (x => r.nextDouble - 0.5): * .01