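The Hadoop big-data ecosystem consists of YARN, HDFS, and the MapReduce computing framework. Distributed TensorFlow plays the role of the MapReduce computing framework, and Kubernetes plays the role of the YARN scheduler. TensorFlowOnSpark uses Remote Direct Memory Access (RDMA) and takes care of storage and scheduling, bringing deep learning and big data together. TensorFlowOnSpark (TFoS) is an open-source project from Yahoo: https://github.com/yahoo/TensorFlowOnSpark . It supports distributed TensorFlow training and prediction on Apache Spark clusters. TensorFlowOnSpark provides a bridge program: each Spark Executor launches one corresponding TensorFlow process, and the two interact through remote procedure calls (RPC).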
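TensorFlowOnSpark architecture. A TensorFlow training program runs on a Spark cluster, and the Spark cluster is managed in these steps. Reserve: reserve a port for each TensorFlow process that runs in an Executor and start a listener for data messages. Start: launch the TensorFlow main function in each Executor. Data ingestion, in one of two modes: with TensorFlow Readers and QueueRunners, TensorFlow reads the data files directly from HDFS and Spark does not touch the data; with Feeding, Spark RDD data is sent to the TensorFlow nodes and passed into the TensorFlow graph through the feed_dict mechanism. Shutdown: shut down the TensorFlow worker nodes and parameter server nodes on the Executors. The component chain is Spark Driver -> Spark Executor -> parameter server -> TensorFlow Core -> gRPC, RDMA -> HDFS dataset. See http://yahoohadoop.tumblr.com/post/157196317141/open-sourcing-tensorflowonspark-distributed-deep .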
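The reservation step can be pictured with a small standard-library sketch. It assumes that "reserving a port" means binding a socket to port 0 so the operating system picks a free one. The helper name `reserve_port` is hypothetical and this is not TensorFlowOnSpark's actual code.

```python
import socket


def reserve_port(host="127.0.0.1"):
    """Bind to port 0 so the OS assigns a free port; return (socket, port).

    Hypothetical helper for illustration only. The port stays reserved
    for as long as the caller keeps the returned socket open.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind((host, 0))
    port = sock.getsockname()[1]
    return sock, port
```

A caller would keep the socket open while advertising `host:port` to the other nodes and close it once the real server is ready to take over the port.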
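TensorFlowOnSpark MNIST. https://github.com/yahoo/TensorFlowOnSpark/wiki/GetStarted_standalone . This walkthrough uses a Standalone-mode Spark cluster on a single machine. Install Spark and Hadoop. Deploy the Java 1.8.0 JDK. Download Spark 2.1.0 from http://spark.apache.org/downloads.html . Download Hadoop 2.7.3 from http://hadoop.apache.org/#Download+Hadoop . Version 0.12.1 (of TensorFlow, judging by the version number) is well supported.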
Edit the configuration files, set the environment variables, and start Hadoop: $HADOOP_HOME/sbin/start-all.sh. Check out the TensorFlowOnSpark source code:
git clone --recurse-submodules https://github.com/yahoo/TensorFlowOnSpark.git
cd TensorFlowOnSpark
git submodule init
git submodule update --force
git submodule foreach --recursive git clean -dfx
Package the source code; the archive is used when submitting jobs:
cd TensorFlowOnSpark/src
zip -r ../tfspark.zip *
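For readers who prefer to build the archive from Python, the same packaging idea can be sketched with the standard `zipfile` module. The function name `zip_directory` is hypothetical. Unlike `zip -r ../tfspark.zip *`, this sketch also picks up hidden files at the top level of the directory.

```python
import os
import zipfile


def zip_directory(src_dir, zip_path):
    """Zip the contents of src_dir with archive paths relative to src_dir.

    Hypothetical helper; zip_path should live outside src_dir so the
    archive does not try to include itself.
    """
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, _dirs, files in os.walk(src_dir):
            for name in files:
                full_path = os.path.join(root, name)
                zf.write(full_path, os.path.relpath(full_path, src_dir))
    return zip_path
```

Called as `zip_directory("src", "tfspark.zip")` from the repository root, it would produce an archive with the package directories at the top level, which is the layout `--py-files` style imports expect.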
Set the environment variable for the TensorFlowOnSpark root directory:
cd TensorFlowOnSpark
export TFoS_HOME=$(pwd)
Start the Spark master node:
${SPARK_HOME}/sbin/start-master.sh
Configure two worker instances and connect them to the master through the master Spark URL:
export MASTER=spark://$(hostname):7077
export SPARK_WORKER_INSTANCES=2
export CORES_PER_WORKER=1
export TOTAL_CORES=$((${CORES_PER_WORKER}*${SPARK_WORKER_INSTANCES}))
${SPARK_HOME}/sbin/start-slave.sh -c $CORES_PER_WORKER -m 3G ${MASTER}
Submit a job that converts the MNIST zip file into RDD datasets on HDFS:
${SPARK_HOME}/bin/spark-submit \
--master ${MASTER} --conf spark.ui.port=4048 --verbose \
${TFoS_HOME}/examples/mnist/mnist_data_setup.py \
--output examples/mnist/csv \
--format csv
Inspect the processed dataset:
hadoop fs -ls hdfs://localhost:9000/user/libinggen/examples/mnist/csv
Inspect the saved images and label vectors:
hadoop fs -ls hdfs://localhost:9000/user/libinggen/examples/mnist/csv/train/labels
The training set and the test set are each saved as RDD data. Source:
https://github.com/yahoo/TensorFlowOnSpark/blob/master/examples/mnist/mnist_data_setup.py
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy
import tensorflow as tf
from array import array
from tensorflow.contrib.learn.python.learn.datasets import mnist
def toTFExample(image, label):
  """Serializes an image/label as a TFExample byte string"""
  example = tf.train.Example(
    features = tf.train.Features(
      feature = {
        'label': tf.train.Feature(int64_list=tf.train.Int64List(value=label.astype("int64"))),
        'image': tf.train.Feature(int64_list=tf.train.Int64List(value=image.astype("int64")))
      }
    )
  )
  return example.SerializeToString()

def fromTFExample(bytestr):
  """Deserializes a TFExample from a byte string"""
  example = tf.train.Example()
  example.ParseFromString(bytestr)
  return example

def toCSV(vec):
  """Converts a vector/array into a CSV string"""
  return ','.join([str(i) for i in vec])

def fromCSV(s):
  """Converts a CSV string to a vector/array"""
  return [float(x) for x in s.split(',') if len(s) > 0]

def writeMNIST(sc, input_images, input_labels, output, format, num_partitions):
  """Writes MNIST image/label vectors into parallelized files on HDFS"""
  # load MNIST gzip into memory
  with open(input_images, 'rb') as f:
    images = numpy.array(mnist.extract_images(f))

  with open(input_labels, 'rb') as f:
    if format == "csv2":
      labels = numpy.array(mnist.extract_labels(f, one_hot=False))
    else:
      labels = numpy.array(mnist.extract_labels(f, one_hot=True))

  shape = images.shape
  print("images.shape: {0}".format(shape))          # 60000 x 28 x 28
  print("labels.shape: {0}".format(labels.shape))   # 60000 x 10

  # create RDDs of vectors
  imageRDD = sc.parallelize(images.reshape(shape[0], shape[1] * shape[2]), num_partitions)
  labelRDD = sc.parallelize(labels, num_partitions)

  output_images = output + "/images"
  output_labels = output + "/labels"

  # save RDDs as specific format
  if format == "pickle":
    imageRDD.saveAsPickleFile(output_images)
    labelRDD.saveAsPickleFile(output_labels)
  elif format == "csv":
    imageRDD.map(toCSV).saveAsTextFile(output_images)
    labelRDD.map(toCSV).saveAsTextFile(output_labels)
  elif format == "csv2":
    imageRDD.map(toCSV).zip(labelRDD).map(lambda x: str(x[1]) + "|" + x[0]).saveAsTextFile(output)
  else: # format == "tfr":
    tfRDD = imageRDD.zip(labelRDD).map(lambda x: (bytearray(toTFExample(x[0], x[1])), None))
    # requires: --jars tensorflow-hadoop-1.0-SNAPSHOT.jar
    tfRDD.saveAsNewAPIHadoopFile(output, "org.tensorflow.hadoop.io.TFRecordFileOutputFormat",
                                keyClass="org.apache.hadoop.io.BytesWritable",
                                valueClass="org.apache.hadoop.io.NullWritable")
  # Note: this creates TFRecord files w/o requiring a custom Input/Output format
  # else: # format == "tfr":
  #   def writeTFRecords(index, iter):
  #     output_path = "{0}/part-{1:05d}".format(output, index)
  #     writer = tf.python_io.TFRecordWriter(output_path)
  #     for example in iter:
  #       writer.write(example)
  #     return [output_path]
  #   tfRDD = imageRDD.zip(labelRDD).map(lambda x: toTFExample(x[0], x[1]))
  #   tfRDD.mapPartitionsWithIndex(writeTFRecords).collect()

def readMNIST(sc, output, format):
  """Reads/verifies previously created output"""
  output_images = output + "/images"
  output_labels = output + "/labels"
  imageRDD = None
  labelRDD = None

  if format == "pickle":
    imageRDD = sc.pickleFile(output_images)
    labelRDD = sc.pickleFile(output_labels)
  elif format == "csv":
    imageRDD = sc.textFile(output_images).map(fromCSV)
    labelRDD = sc.textFile(output_labels).map(fromCSV)
  else: # format.startswith("tf"):
    # requires: --jars tensorflow-hadoop-1.0-SNAPSHOT.jar
    tfRDD = sc.newAPIHadoopFile(output, "org.tensorflow.hadoop.io.TFRecordFileInputFormat",
                                keyClass="org.apache.hadoop.io.BytesWritable",
                                valueClass="org.apache.hadoop.io.NullWritable")
    imageRDD = tfRDD.map(lambda x: fromTFExample(str(x[0])))

  num_images = imageRDD.count()
  num_labels = labelRDD.count() if labelRDD is not None else num_images
  samples = imageRDD.take(10)
  print("num_images: ", num_images)
  print("num_labels: ", num_labels)
  print("samples: ", samples)

if __name__ == "__main__":
  import argparse
  from pyspark.context import SparkContext
  from pyspark.conf import SparkConf

  parser = argparse.ArgumentParser()
  parser.add_argument("-f", "--format", help="output format", choices=["csv","csv2","pickle","tf","tfr"], default="csv")
  parser.add_argument("-n", "--num-partitions", help="Number of output partitions", type=int, default=10)
  parser.add_argument("-o", "--output", help="HDFS directory to save examples in parallelized format", default="mnist_data")
  parser.add_argument("-r", "--read", help="read previously saved examples", action="store_true")
  parser.add_argument("-v", "--verify", help="verify saved examples after writing", action="store_true")
  args = parser.parse_args()
  print("args:",args)

  sc = SparkContext(conf=SparkConf().setAppName("mnist_parallelize"))

  if not args.read:
    # Note: these files are inside the mnist.zip file
    writeMNIST(sc, "mnist/train-images-idx3-ubyte.gz", "mnist/train-labels-idx1-ubyte.gz", args.output + "/train", args.format, args.num_partitions)
    writeMNIST(sc, "mnist/t10k-images-idx3-ubyte.gz", "mnist/t10k-labels-idx1-ubyte.gz", args.output + "/test", args.format, args.num_partitions)

  if args.read or args.verify:
    readMNIST(sc, args.output + "/train", args.format)
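The text formats written by the script can be tried out without Spark or TensorFlow. The sketch below re-implements the CSV round trip and the `label|pixels` layout of the csv2 branch in plain Python. The snake_case names and the three-pixel "image" are illustrative stand-ins, not part of the original script.

```python
def to_csv(vec):
    """Join a vector's elements into one comma-separated line."""
    return ",".join(str(v) for v in vec)


def from_csv(line):
    """Parse a comma-separated line back into a list of floats."""
    return [float(x) for x in line.split(",")] if line else []


def one_hot(digit, num_classes=10):
    """Return a one-hot list with 1.0 at index `digit`."""
    return [1.0 if i == digit else 0.0 for i in range(num_classes)]


def to_csv2(pixels, digit):
    """Build a 'label|pixels' line, mirroring the csv2 branch."""
    return str(digit) + "|" + to_csv(pixels)


pixels = [0, 128, 255]              # stand-in for a flattened 28x28 image
print(to_csv(pixels))               # 0,128,255
print(from_csv(to_csv(one_hot(3)))) # one-hot vector survives the round trip
print(to_csv2(pixels, 3))           # 3|0,128,255
```

In the csv format, images and labels land in separate directories and are paired again later by position, whereas csv2 keeps each label next to its pixels on one line.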
Submit the training job to start training; it produces mnist_model on HDFS. Command:
${SPARK_HOME}/bin/spark-submit \
--master ${MASTER} \
--py-files ${TFoS_HOME}/examples/mnist/spark/mnist_dist.py \
--conf spark.cores.max=${TOTAL_CORES} \
--conf spark.task.cpus=${CORES_PER_WORKER} \
--conf spark.executorEnv.JAVA_HOME="$JAVA_HOME" \
${TFoS_HOME}/examples/mnist/spark/mnist_spark.py \
--cluster_size ${SPARK_WORKER_INSTANCES} \
--images examples/mnist/csv/train/images \
--labels examples/mnist/csv/train/labels \
--format csv \
--mode train \
--model mnist_model
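mnist_dist.py builds the TensorFlow distributed job: it defines the job's main function, map_fun, which is launched as the TensorFlow main function, and it uses Feeding for data ingestion. Get the TensorFlow cluster and server instance: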
cluster, server = TFNode.start_cluster_server(ctx, 1, args.rdma)
TFNode refers to the TFNode.py file inside tfspark.zip.
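To make "cluster" concrete: distributed TensorFlow describes a cluster as a mapping from job names to lists of `host:port` addresses, and `tf.train.ClusterSpec` accepts such a dictionary. The sketch below builds one from a list of reserved addresses. The helper `build_cluster_spec` and the rule that the first `num_ps` addresses become parameter servers are assumptions made for illustration, not TFNode's actual logic.

```python
def build_cluster_spec(addresses, num_ps):
    """Split 'host:port' addresses into 'ps' and 'worker' jobs.

    Hypothetical helper: the first num_ps addresses become parameter
    servers and the remaining ones become workers.
    """
    if not 0 <= num_ps < len(addresses):
        raise ValueError("need at least one worker address")
    return {"ps": addresses[:num_ps], "worker": addresses[num_ps:]}


spec = build_cluster_spec(["host1:2222", "host2:2222", "host3:2222"], num_ps=1)
print(spec)
```

With TensorFlow installed, a dictionary of this shape could be passed to `tf.train.ClusterSpec(spec)`.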
mnist_spark.py is the main training program and follows the TensorFlowOnSpark deployment steps:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
import argparse
import os
import numpy
import sys
import tensorflow as tf
import threading
import time
from datetime import datetime
from tensorflowonspark import TFCluster
import mnist_dist
sc = SparkContext(conf=SparkConf().setAppName("mnist_spark"))
executors = sc._conf.get("spark.executor.instances")
num_executors = int(executors) if executors is not None else 1
num_ps = 1
parser = argparse.ArgumentParser()
parser.add_argument("-b", "--batch_size", help="number of records per batch", type=int, default=100)
parser.add_argument("-e", "--epochs", help="number of epochs", type=int, default=1)
parser.add_argument("-f", "--format", help="example format: (csv|pickle|tfr)", choices=["csv","pickle","tfr"], default="csv")
parser.add_argument("-i", "--images", help="HDFS path to MNIST images in parallelized format")
parser.add_argument("-l", "--labels", help="HDFS path to MNIST labels in parallelized format")
parser.add_argument("-m", "--model", help="HDFS path to save/load model during train/inference", default="mnist_model")
parser.add_argument("-n", "--cluster_size", help="number of nodes in the cluster", type=int, default=num_executors)
parser.add_argument("-o", "--output", help="HDFS path to save test/inference output", default="predictions")
parser.add_argument("-r", "--readers", help="number of reader/enqueue threads", type=int, default=1)
parser.add_argument("-s", "--steps", help="maximum number of steps", type=int, default=1000)
parser.add_argument("-tb", "--tensorboard", help="launch tensorboard process", action="store_true")
parser.add_argument("-X", "--mode", help="train|inference", default="train")
parser.add_argument("-c", "--rdma", help="use rdma connection", default=False)
args = parser.parse_args()
print("args:",args)
print("{0} ===== Start".format(datetime.now().isoformat()))
if args.format == "tfr":
  images = sc.newAPIHadoopFile(args.images, "org.tensorflow.hadoop.io.TFRecordFileInputFormat",
                              keyClass="org.apache.hadoop.io.BytesWritable",
                              valueClass="org.apache.hadoop.io.NullWritable")
  def toNumpy(bytestr):
    example = tf.train.Example()
    example.ParseFromString(bytestr)
    features = example.features.feature
    image = numpy.array(features['image'].int64_list.value)
    label = numpy.array(features['label'].int64_list.value)
    return (image, label)
  dataRDD = images.map(lambda x: toNumpy(str(x[0])))
else:
  if args.format == "csv":
    images = sc.textFile(args.images).map(lambda ln: [int(x) for x in ln.split(',')])
    labels = sc.textFile(args.labels).map(lambda ln: [float(x) for x in ln.split(',')])
  else: # args.format == "pickle":
    images = sc.pickleFile(args.images)
    labels = sc.pickleFile(args.labels)
  print("zipping images and labels")
  dataRDD = images.zip(labels)
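# 1. Reserve a port for each TensorFlow process that runs in an Executor
cluster = TFCluster.run(sc, mnist_dist.map_fun, args, args.cluster_size, num_ps, args.tensorboard, TFCluster.InputMode.SPARK)
# 2. Launch the TensorFlow main function
# Note: TFCluster.run already receives map_fun, so in releases where run() launches it,
# the start() call below may be redundant or unavailable; check the installed version.
cluster.start(mnist_dist.map_fun, args)
if args.mode == "train":
  # 3. Train
  cluster.train(dataRDD, args.epochs)
else:
  # 3. Predict
  labelRDD = cluster.inference(dataRDD)
  labelRDD.saveAsTextFile(args.output)
# 4. Shut down the TensorFlow worker and parameter server nodes on the Executors
cluster.shutdown()
print("{0} ===== Stop".format(datetime.now().isoformat()))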
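The Feeding path in the listing pairs each image with its label and hands the pairs to TensorFlow in batches of `--batch_size` records. A local, Spark-free sketch of that pairing and batching follows. The generator `batches` and the toy data are hypothetical, and the real RDD-to-TensorFlow transport is not modeled here.

```python
from itertools import islice


def batches(records, batch_size):
    """Yield lists of up to batch_size items from any iterable."""
    it = iter(records)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


images = [[0, 1], [2, 3], [4, 5]]              # stand-ins for flattened images
labels = [[1.0, 0.0], [0.0, 1.0], [1.0, 0.0]]  # stand-ins for one-hot labels
data = zip(images, labels)                     # local analogue of images.zip(labels)

for batch in batches(data, 2):
    batch_images = [img for img, _ in batch]
    batch_labels = [lab for _, lab in batch]
    # In a TensorFlow 1.x graph these two lists would be the feed_dict values.
    print(len(batch_images), len(batch_labels))
```

The last batch may be smaller than `batch_size`, which a training loop fed this way has to tolerate.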
Prediction command:
${SPARK_HOME}/bin/spark-submit \
--master ${MASTER} \
--py-files ${TFoS_HOME}/examples/mnist/spark/mnist_dist.py \
--conf spark.cores.max=${TOTAL_CORES} \
--conf spark.task.cpus=${CORES_PER_WORKER} \
--conf spark.executorEnv.JAVA_HOME="$JAVA_HOME" \
${TFoS_HOME}/examples/mnist/spark/mnist_spark.py \
--cluster_size ${SPARK_WORKER_INSTANCES} \
--images examples/mnist/csv/test/images \
--labels examples/mnist/csv/test/labels \
--mode inference \
--format csv \
--model mnist_model \
--output predictions
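Once predictions are written, a natural next step is to compare them with the test labels. The text does not show the layout of the files under `predictions`, so the sketch below assumes the predicted digits have already been parsed into a list of integers. Both function names are hypothetical.

```python
def argmax(vec):
    """Index of the largest element (the first one on ties)."""
    return max(range(len(vec)), key=lambda i: vec[i])


def accuracy(one_hot_labels, predicted_digits):
    """Fraction of predicted digits that match the one-hot labels."""
    if len(one_hot_labels) != len(predicted_digits):
        raise ValueError("labels and predictions differ in length")
    if not one_hot_labels:
        return 0.0
    hits = sum(1 for lab, pred in zip(one_hot_labels, predicted_digits)
               if argmax(lab) == pred)
    return hits / len(one_hot_labels)
```

Because the csv labels are one-hot vectors, `argmax` recovers the digit before the comparison.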
The example can also run on Amazon EC2, or on a Hadoop cluster in YARN mode.
Reference:
《TensorFlow技術解析與實戰》 (TensorFlow: Technical Analysis and Practice)