學習筆記TF065:TensorFlowOnSpark

来源:http://www.cnblogs.com/libinggen/archive/2017/11/13/7824753.html
-Advertisement-
Play Games

Hadoop生態大數據系統分為Yam、 HDFS、MapReduce計算框架。TensorFlow分散式相當於MapReduce計算框架,Kubernetes相當於Yam調度系統。TensorFlowOnSpark,利用遠程直接記憶體訪問(Remote Direct Memory Access,RDM ...


Hadoop生態大數據系統分為Yam、 HDFS、MapReduce計算框架。TensorFlow分散式相當於MapReduce計算框架,Kubernetes相當於Yam調度系統。TensorFlowOnSpark,利用遠程直接記憶體訪問(Remote Direct Memory Access,RDMA)解決存儲功能和調度,實現深度學習和大數據融合。TensorFlowOnSpark(TFoS),雅虎開源項目。https://github.com/yahoo/TensorFlowOnSpark 。支持ApacheSpark集群分散式TensorFlow訓練、預測。TensorFlowOnSpark提供橋接程式,每個Spark Executor啟動一個對應TensorFlow進程,通過遠程進程通信(RPC)交互。

TensorFlowOnSpark架構。TensorFlow訓練程式用Spark集群運行,管理Spark集群步驟:預留,在Executor執行每個TensorFlow進程保留一個埠,啟動數據消息監聽器。啟動,在Executor啟動TensorFlow主函數。數據獲取,TensorFlow Readers和QueueRunners機制直接讀取HDFS數據文件,Spark不訪問數據;Feeding,SparkRDD 數據發送TensorFlow節點,數據通過feed_dict機制傳入TensorFlow計算圖。關閉,關閉Executor TensorFlow計算節點、參數服務節點。Spark Driver->Spark Executor->參數伺服器->TensorFlow Core->gRPC、RDMA->HDFS數據集。http://yahoohadoop.tumblr.com/post/157196317141/open-sourcing-tensorflowonspark-distributed-deep 。

TensorFlowOnSpark MNIST。https://github.com/yahoo/TensorFlowOnSpark/wiki/GetStarted_standalone 。Standalone模式Spark集群,一臺電腦。安裝 Spark、Hadoop。部署Java 1.8.0 JDK。下載Spark2.1.0版 http://spark.apache.org/downloads.html 。下載Hadoop2.7.3版 http://hadoop.apache.org/#Download+Hadoop 。0.12.1版本支持較好。
修改配置文件,設置環境變數,啟動Hadoop:$HADOOP_HOME/sbin/start-all.sh。檢出TensorFlowOnSpark源代碼:

git clone --recurse-submodules https://github.com/yahoo/TensorFlowOnSpark.git
cd TensorFlowOnSpark
git submodule init
git submodule update --force
git submodule foreach --recursive git clean -dfx

源代碼打包,提交任務使用:

cd TensorflowOnSpark/src
zip -r ../tfspark.zip *

設置TensorFlowOnSpark根目錄環境變數:

cd TensorFlowOnSpark
export TFoS_HOME=$(pwd)

啟動Spark主節點(master):

$(SPARK_HOME)/sbin/start-master.sh

配置兩個工作節點(worker)實例,master-spark-URL連接主節點:

export MASTER=spark://$(hostname):7077
export SPARK_WORKER_INSTANCES=2
export CORES_PER_WORKER=1
export TOTAL_CORES=$(($(CORES_PER_WORKER)*$(SPARK_WORKER_INSTANCES)))
$(SPARK_HOME)/sbin/start-slave.sh -c $CORES_PER_WORKER -m 3G $(MASTER)

提交任務,MNIST zip文件轉換為HDFS RDD 數據集:

$(SPARK_HOME)/bin/spark-submit \
--master $(MASTER) --conf spark.ui.port=4048 --verbose \
$(TFoS_HOME)/examples/mnist/mnist_data_setup.py \
--output examples/mnist/csv \
--format csv

查看處理過的數據集:

hadoop fs -ls hdfs://localhost:9000/user/libinggen/examples/mnist/csv

查看保存圖片、標記向量:

hadoop fs -ls hdfs://localhost:9000/user/libinggen/examples/mnist/csv/train/labels

把訓練集、測試集分別保存RDD數據。
https://github.com/yahoo/TensorFlowOnSpark/blob/master/examples/mnist/mnist_data_setup.py 。

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy
import tensorflow as tf
from array import array
from tensorflow.contrib.learn.python.learn.datasets import mnist
def toTFExample(image, label):
"""Serializes an image/label as a TFExample byte string"""
example = tf.train.Example(
features = tf.train.Features(
feature = {
'label': tf.train.Feature(int64_list=tf.train.Int64List(value=label.astype("int64"))),
'image': tf.train.Feature(int64_list=tf.train.Int64List(value=image.astype("int64")))
}
)
)
return example.SerializeToString()
def fromTFExample(bytestr):
"""Deserializes a TFExample from a byte string"""
example = tf.train.Example()
example.ParseFromString(bytestr)
return example
def toCSV(vec):
"""Converts a vector/array into a CSV string"""
return ','.join([str(i) for i in vec])
def fromCSV(s):
"""Converts a CSV string to a vector/array"""
return [float(x) for x in s.split(',') if len(s) > 0]
def writeMNIST(sc, input_images, input_labels, output, format, num_partitions):
"""Writes MNIST image/label vectors into parallelized files on HDFS"""
# load MNIST gzip into memory
# MNIST圖像、標記向量寫入HDFS
with open(input_images, 'rb') as f:
images = numpy.array(mnist.extract_images(f))
with open(input_labels, 'rb') as f:
if format == "csv2":
labels = numpy.array(mnist.extract_labels(f, one_hot=False))
else:
labels = numpy.array(mnist.extract_labels(f, one_hot=True))
shape = images.shape
print("images.shape: {0}".format(shape)) # 60000 x 28 x 28
print("labels.shape: {0}".format(labels.shape)) # 60000 x 10
# create RDDs of vectors
imageRDD = sc.parallelize(images.reshape(shape[0], shape[1] * shape[2]), num_partitions)
labelRDD = sc.parallelize(labels, num_partitions)
output_images = output + "/images"
output_labels = output + "/labels"
# save RDDs as specific format
# RDDs保存特定格式
if format == "pickle":
imageRDD.saveAsPickleFile(output_images)
labelRDD.saveAsPickleFile(output_labels)
elif format == "csv":
imageRDD.map(toCSV).saveAsTextFile(output_images)
labelRDD.map(toCSV).saveAsTextFile(output_labels)
elif format == "csv2":
imageRDD.map(toCSV).zip(labelRDD).map(lambda x: str(x[1]) + "|" + x[0]).saveAsTextFile(output)
else: # format == "tfr":
tfRDD = imageRDD.zip(labelRDD).map(lambda x: (bytearray(toTFExample(x[0], x[1])), None))
# requires: --jars tensorflow-hadoop-1.0-SNAPSHOT.jar
tfRDD.saveAsNewAPIHadoopFile(output, "org.tensorflow.hadoop.io.TFRecordFileOutputFormat",
keyClass="org.apache.hadoop.io.BytesWritable",
valueClass="org.apache.hadoop.io.NullWritable")
# Note: this creates TFRecord files w/o requiring a custom Input/Output format
# else: # format == "tfr":
# def writeTFRecords(index, iter):
# output_path = "{0}/part-{1:05d}".format(output, index)
# writer = tf.python_io.TFRecordWriter(output_path)
# for example in iter:
# writer.write(example)
# return [output_path]
# tfRDD = imageRDD.zip(labelRDD).map(lambda x: toTFExample(x[0], x[1]))
# tfRDD.mapPartitionsWithIndex(writeTFRecords).collect()
def readMNIST(sc, output, format):
"""Reads/verifies previously created output"""
output_images = output + "/images"
output_labels = output + "/labels"
imageRDD = None
labelRDD = None
if format == "pickle":
imageRDD = sc.pickleFile(output_images)
labelRDD = sc.pickleFile(output_labels)
elif format == "csv":
imageRDD = sc.textFile(output_images).map(fromCSV)
labelRDD = sc.textFile(output_labels).map(fromCSV)
else: # format.startswith("tf"):
# requires: --jars tensorflow-hadoop-1.0-SNAPSHOT.jar
tfRDD = sc.newAPIHadoopFile(output, "org.tensorflow.hadoop.io.TFRecordFileInputFormat",
keyClass="org.apache.hadoop.io.BytesWritable",
valueClass="org.apache.hadoop.io.NullWritable")
imageRDD = tfRDD.map(lambda x: fromTFExample(str(x[0])))
num_images = imageRDD.count()
num_labels = labelRDD.count() if labelRDD is not None else num_images
samples = imageRDD.take(10)
print("num_images: ", num_images)
print("num_labels: ", num_labels)
print("samples: ", samples)
if __name__ == "__main__":
import argparse
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
parser = argparse.ArgumentParser()
parser.add_argument("-f", "--format", help="output format", choices=["csv","csv2","pickle","tf","tfr"], default="csv")
parser.add_argument("-n", "--num-partitions", help="Number of output partitions", type=int, default=10)
parser.add_argument("-o", "--output", help="HDFS directory to save examples in parallelized format", default="mnist_data")
parser.add_argument("-r", "--read", help="read previously saved examples", action="store_true")
parser.add_argument("-v", "--verify", help="verify saved examples after writing", action="store_true")
args = parser.parse_args()
print("args:",args)
sc = SparkContext(conf=SparkConf().setAppName("mnist_parallelize"))
if not args.read:
# Note: these files are inside the mnist.zip file
writeMNIST(sc, "mnist/train-images-idx3-ubyte.gz", "mnist/train-labels-idx1-ubyte.gz", args.output + "/train", args.format, args.num_partitions)
writeMNIST(sc, "mnist/t10k-images-idx3-ubyte.gz", "mnist/t10k-labels-idx1-ubyte.gz", args.output + "/test", args.format, args.num_partitions)
if args.read or args.verify:
readMNIST(sc, args.output + "/train", args.format)

提交訓練任務,開始訓練,在HDFS生成mnist_model,命令:

${SPARK_HOME}/bin/spark-submit \
--master ${MASTER} \
--py-files ${TFoS_HOME}/examples/mnist/spark/mnist_dist.py \
--conf spark.cores.max=${TOTAL_CORES} \
--conf spark.task.cpus=${CORES_PER_WORKER} \
--conf spark.executorEnv.JAVA_HOME="$JAVA_HOME" \
${TFoS_HOME}/examples/mnist/spark/mnist_spark.py \
--cluster_size ${SPARK_WORKER_INSTANCES} \
--images examples/mnist/csv/train/images \
--labels examples/mnist/csv/train/labels \
--format csv \
--mode train \
--model mnist_model

mnist_dist.py 構建TensorFlow 分散式任務,定義分散式任務主函數,啟動TensorFlow主函數map_fun,數據獲取方式Feeding。獲取TensorFlow集群和伺服器實例:

cluster, server = TFNode.start_cluster_server(ctx, 1, args.rdma)

TFNode調用tfspark.zip TFNode.py文件。

mnist_spark.py文件是訓練主程式,TensorFlowOnSpark部署步驟:

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
import argparse
import os
import numpy
import sys
import tensorflow as tf
import threading
import time
from datetime import datetime
from tensorflowonspark import TFCluster
import mnist_dist
sc = SparkContext(conf=SparkConf().setAppName("mnist_spark"))
executors = sc._conf.get("spark.executor.instances")
num_executors = int(executors) if executors is not None else 1
num_ps = 1
parser = argparse.ArgumentParser()
parser.add_argument("-b", "--batch_size", help="number of records per batch", type=int, default=100)
parser.add_argument("-e", "--epochs", help="number of epochs", type=int, default=1)
parser.add_argument("-f", "--format", help="example format: (csv|pickle|tfr)", choices=["csv","pickle","tfr"], default="csv")
parser.add_argument("-i", "--images", help="HDFS path to MNIST images in parallelized format")
parser.add_argument("-l", "--labels", help="HDFS path to MNIST labels in parallelized format")
parser.add_argument("-m", "--model", help="HDFS path to save/load model during train/inference", default="mnist_model")
parser.add_argument("-n", "--cluster_size", help="number of nodes in the cluster", type=int, default=num_executors)
parser.add_argument("-o", "--output", help="HDFS path to save test/inference output", default="predictions")
parser.add_argument("-r", "--readers", help="number of reader/enqueue threads", type=int, default=1)
parser.add_argument("-s", "--steps", help="maximum number of steps", type=int, default=1000)
parser.add_argument("-tb", "--tensorboard", help="launch tensorboard process", action="store_true")
parser.add_argument("-X", "--mode", help="train|inference", default="train")
parser.add_argument("-c", "--rdma", help="use rdma connection", default=False)
args = parser.parse_args()
print("args:",args)
print("{0} ===== Start".format(datetime.now().isoformat()))
if args.format == "tfr":
images = sc.newAPIHadoopFile(args.images, "org.tensorflow.hadoop.io.TFRecordFileInputFormat",
keyClass="org.apache.hadoop.io.BytesWritable",
valueClass="org.apache.hadoop.io.NullWritable")
def toNumpy(bytestr):
example = tf.train.Example()
example.ParseFromString(bytestr)
features = example.features.feature
image = numpy.array(features['image'].int64_list.value)
label = numpy.array(features['label'].int64_list.value)
return (image, label)
dataRDD = images.map(lambda x: toNumpy(str(x[0])))
else:
if args.format == "csv":
images = sc.textFile(args.images).map(lambda ln: [int(x) for x in ln.split(',')])
labels = sc.textFile(args.labels).map(lambda ln: [float(x) for x in ln.split(',')])
else: # args.format == "pickle":
images = sc.pickleFile(args.images)
labels = sc.pickleFile(args.labels)
print("zipping images and labels")
dataRDD = images.zip(labels)
#1.為在Executor執行每個TensorFlow進程保留一個埠
cluster = TFCluster.run(sc, mnist_dist.map_fun, args, args.cluster_size, num_ps, args.tensorboard, TFCluster.InputMode.SPARK)
#2.啟動Tensorflow主函數
cluster.start(mnist_dist.map_fun, args)
if args.mode == "train":
#3.訓練
cluster.train(dataRDD, args.epochs)
else:
#3.預測
labelRDD = cluster.inference(dataRDD)
labelRDD.saveAsTextFile(args.output)
#4.關閉Executor TensorFlow計算節點、參數服務節點
cluster.shutdown()
print("{0} ===== Stop".format(datetime.now().isoformat()))

預測命令:

${SPARK_HOME}/bin/spark-submit \
--master ${MASTER} \
--py-files ${TFoS_HOME}/examples/mnist/spark/mnist_dist.py \
--conf spark.cores.max=${TOTAL_CORES} \
--conf spark.task.cpus=${CORES_PER_WORKER} \
--conf spark.executorEnv.JAVA_HOME="$JAVA_HOME" \
${TFoS_HOME}/examples/mnist/spark/mnist_spark.py \
--cluster_size ${SPARK_WORKER_INSTANCES} \
--images examples/mnist/csv/test/images \
--labels examples/mnist/csv/test/labels \
--mode inference \
--format csv \
--model mnist_model \
--output predictions

還可以Amazon EC2運行及在Hadoop集群採用YARN模式運行。

參考資料:
《TensorFlow技術解析與實戰》

歡迎推薦上海機器學習工作機會,我的微信:qingxingfengzi


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • redis應用場景:實現計數器-防止刷單 最近由於雙11要來臨,公司需要在介面請求上,做一下併發限制的處理,或者做一個防止刷單的安全攔截:比如:一個介面請求,限制每秒請求總數為200次,超過200次就等待,等下一秒,再次請求,這裡用到一個redis作為一個計數器的模式來實現。 調用redis的方法: ...
  • Debian部署python3+flask+uwsgi+Nginx+Supervisor 一、安裝編譯用的包 如果在root下就不用輸入 sudo。在子用戶下就在命令前加上 sudo。 $ sudo apt-get install build-essential $ sudo apt-get ins ...
  • 配套視頻: 1.什麼是構造器 2.構造器特點 3.自定義構造器 4.構造器重載 一、什麼是構造器? 我們看下麵一行代碼 new 後面像不像是調用一個方法 我們稱在創建對象時自動調用的方法,稱為構造器 在類中找不到預設的構造器,通過反編譯工具可以看到預設的構造方法 可以看到, 在編譯成位元組碼後,裡面會 ...
  • MVC模型: MVC(Model View Controller 模型-視圖-控制器)是一種Web架構的模式,它把業務邏輯、模型數據、用戶界面分離開來,讓開發者將數據與表現解耦,前端工程師可以只改頁面效果部分而不用接觸後端代碼,DBA可以重新命名數據表並且只需更改一個地方,無需從一大堆文件中進行查找 ...
  • 題目內容: 寫一個將華氏溫度轉換成攝氏溫度的程式,轉換的公式是: °F = (9/5)*°C + 32 其中C表示攝氏溫度,F表示華氏溫度。 程式的輸入是一個整數,表示華氏溫度。輸出對應的攝氏溫度,也是一個整數。 提示,為了把計算結果的浮點數轉換成整數,需要使用下麵的表達式: (int)x; 其中x ...
  • druapl 的核心可能會有漏洞,這時就需要我們去打補丁。很多補丁都已經有人寫好了,我這裡講的就是如何去打這些已經寫好的補丁。 對於這個問題:drupal8 核心有bug導致了兩個相同的錯誤提示的出現 1.打開項目最外層中的composer.json文件 2.確保 "enable-patching" ...
  • 1.查找文件find / -name filename.txt 根據名稱查找/目錄下的filename.txt文件。find . -name “*.xml” 遞歸查找所有的xml文件2.查看一個程式是否運行ps –ef|grep tomcat 查看所有有關tomcat的進程3.終止線程kill -9 ...
  • 在項目開發裡面,我遇到了這麼一個需求,就是對於node的title欄位,編輯內容的角色不允許對title進行編輯。title欄位是創建內容類型時自動生成的欄位,不能在drupal8後臺直接配置許可權,所以我需要用代碼自定義一個許可權。 1.在/modules/custom下自定義一個模塊,我的模塊名為o ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...