分布式 TensorFlow
你好,分布式 TensorFlow !
要查看一个简单的 TensorFlow 集群,请执行以下操作:
# 以单进程“集群”模式启动一个 TensorFlow 服务器
$ python
>>> import tensorflow as tf
>>> c = tf.constant("Hello, distributed TensorFlow!")
>>> server = tf.train.Server.create_local_server()
>>> sess = tf.Session(server.target) # 在服务器上创建一个会话
>>> sess.run(c)
'Hello, distributed TensorFlow!'
tf.train.Server.create_local_server
创建一个集群
TensorFlow “集群”是一组参与分布式执行 TensorFlow 计算图的“任务(Task)”集合。每个任务都与一个 TensorFlow 服务器(Server) 相关联,TensorFlow 服务器中包含一个可以用来创建会话(sessions)的Master
,和一个在计算图中执行命令的Worker
。一个集群同样可以被分为一个或多个“作业(Job)”,每个作业又包含一个或多个任务。(译者注:集群由任务组成,任务被包含在特定作业中)
要创建一个群集,我们在群集中为每个任务启动一个 TensorFlow 服务器。通常每个任务运行在不同的机器上,但是这里我们在一台机器上运行多个任务(例如,控制不同的GPU设备)。 我们在每个任务中都做如下操作:
在集群中创建一个描述所有任务的
tf.train.ClusterSpec
。它对每个任务而言都应该是相同的。创建一个
tf.train.Server
,将tf.train.ClusterSpec
传给构造函数,并用工作名称标识本地任务和任务索引。
创建一个 tf.train.ClusterSpec
来描述集群
构造 tf.train.ClusterSpec | 可用的任务 |
---|---|
tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]}) |
/job:local/task:0 |
tf.train.ClusterSpec({ "worker": [ "worker0.example.com:2222", "worker1.example.com:2222", "worker2.example.com:2222" ], "ps": [ "ps0.example.com:2222", "ps1.example.com:2222" ]}) | /job:worker/task:0 /job:worker/task:1 /job:worker/task:2 /job:ps/task:0 /job:ps/task:1 |
在每个任务中创建一个 tf.train.Server
实例
例如,启动一个运行在 localhost:2222
和 localhost:2223
两台服务器上的集群,在本地机器的两个不同进程上运行以下代码:
# 任务 0 中:
cluster = tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
server = tf.train.Server(cluster, job_name="local", task_index=0)
# 任务 1 中:
cluster = tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
server = tf.train.Server(cluster, job_name="local", task_index=1)
注意: 手动指定这些集群规范可能很乏味,特别是对于大型集群。我们正在开发可编程的任务启动工具,例如类似 Kubernetes 的集群管理器。如果你希望 Tensorflow 支持某种特定的集群管理器,请提出一个 GitHub issue。
指定模型中的分布式设备
with tf.device("/job:ps/task:0"):
weights_1 = tf.Variable(...)
biases_1 = tf.Variable(...)
with tf.device("/job:ps/task:1"):
weights_2 = tf.Variable(...)
biases_2 = tf.Variable(...)
with tf.device("/job:worker/task:7"):
input, labels = ...
layer_1 = tf.nn.relu(tf.matmul(input, weights_1) + biases_1)
logits = tf.nn.relu(tf.matmul(layer_1, weights_2) + biases_2)
# ...
train_op = ...
with tf.Session("grpc://worker7.example.com:2222") as sess:
for _ in range(10000):
sess.run(train_op)
在上面的例子中,变量是在 ps
作业中的两个任务上创建的,模型的计算密集部分是在 worker
作业中创建的。TensorFlow 将在作业之间插入适当的数据传输(正向传递时从 ps
到 worker
,反向传递时从 worker
到 ps
)。
重复训练
一种通用训练的配置,也被称为“并行数据”,包含了使用不同 mini-batch 来训练相同模型的 Worker
作业中的多个任务,更新 ps
作业中一个或多个任务里的共享参数。所有任务通常在不同的机器上运行。在 TensorFlow 中有很多方法可以指定任务分配的结构,我们正在开发简化指定复制模型工作的库。可能的方法包括:
图内复制 在这种方法中,客户端构建一个包含一组参数(在
tf.Variable
节点上固定
到/job:ps
)的tf.Graph
; 以及模型的计算密集型部分的多个副本,
每个副本固定对应到/job:worker
中不同的任务上。异步训练 在这种方法中,图的每个副本都有一个没有独立训练循环,不做协调就可以执行。它是兼容的以上两种形式的复制。
总结:示例训练程序
以下代码显示了分布式训练程序的框架,实现图间复制和异步训练。它包括参数服务器和 Worker
任务的代码。
import argparse
import sys
import tensorflow as tf
FLAGS = None
def main(_):
ps_hosts = FLAGS.ps_hosts.split(",")
worker_hosts = FLAGS.worker_hosts.split(",")
# 从参数服务器和工作主机创建一个集群
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
# 创建并启动本地任务的服务器
server = tf.train.Server(cluster,
job_name=FLAGS.job_name,
task_index=FLAGS.task_index)
if FLAGS.job_name == "ps":
server.join()
elif FLAGS.job_name == "worker":
# 默认情况下将操作分配给本地Worker
with tf.device(tf.train.replica_device_setter(
worker_device="/job:worker/task:%d" % FLAGS.task_index,
cluster=cluster)):
# 建立模型...
loss = ...
global_step = tf.contrib.framework.get_or_create_global_step()
train_op = tf.train.AdagradOptimizer(0.01).minimize(
loss, global_step=global_step)
# StopAtStepHook 在运行给定步骤后处理停止
hooks=[tf.train.StopAtStepHook(last_step=1000000)]
# MonitoredTrainingSession 负责会话初始化
# 从检查点恢复,保存到检查点,一旦完成或报错就关闭
with tf.train.MonitoredTrainingSession(master=server.target,
is_chief=(FLAGS.task_index == 0),
checkpoint_dir="/tmp/train_logs",
hooks=hooks) as mon_sess:
while not mon_sess.should_stop():
# 异步运行训练
# 有关如何执行同步训练的更多信息,请参见 `tf.train.SyncReplicasOptimizer`
# mon_sess.run 在被抢占 PS 的情况下处理 AbortedError
mon_sess.run(train_op)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.register("type", "bool", lambda v: v.lower() == "true")
# 用于定义 tf.train.ClusterSpec 的标志
parser.add_argument(
"--ps_hosts",
type=str,
default="",
help="Comma-separated list of hostname:port pairs"
)
parser.add_argument(
"--worker_hosts",
type=str,
default="",
help="Comma-separated list of hostname:port pairs"
)
parser.add_argument(
"--job_name",
type=str,
default="",
help="One of 'ps', 'worker'"
)
# Flags for defining the tf.train.Server
parser.add_argument(
"--task_index",
type=int,
default=0,
help="Index of task within the job"
)
FLAGS, unparsed = parser.parse_known_args()
tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)
要启动两个参数服务器和两个 Worker
的训练,请使用下面的命令行脚本(假设脚本被称为 trainer.py
)
# On ps0.example.com:
$ python trainer.py \
--ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
--worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
--job_name=ps --task_index=0
# On ps1.example.com:
$ python trainer.py \
--ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
--worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
--job_name=ps --task_index=1
# On worker0.example.com:
$ python trainer.py \
--ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
--worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
--job_name=worker --task_index=0
# On worker1.example.com:
$ python trainer.py \
--ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
--worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
--job_name=worker --task_index=1
Glossary
客户端
客户端通常是一个程序,用来构建 TensorFlow 计算图和创建用于与集群交互的会话 tensorflow::Session
。通常用 Python 或 C++ 编写。一个客户端进程可以直接与多个 TensorFlow 服务器交互(参阅上面的“重复训练”),一台服务器可为多个客户端服务。
集群
作业
一份作业包括一份“任务”清单,通常用于一个共同的目的。例如,名为 ps
(即 parameter server,参数服务器)的作业通常包括存储和更新变量的节点; 而名为 worker
的作业通常包括执行计算密集型任务的无状态节点。工作中的任务通常运行在不同的机器上。这套工作角色是灵活的:例如,Worker
可能会保持某种状态。
主服务
提供远程调用控制一组分布式设备的 RPC 服务,并作为会话目标。 主服务实现了 tensorflow::Session
接口,负责协调一个或多个 worker服务
。所有的 TensorFlow 服务器都实现了 Master 服务。
任务
任务对应于特定的 TensorFlow 服务器,并且通常对应于到一个进程。一个任务属于一个特定的“作业”,并在该作业列表的索引中被唯一标识。
TensorFlow 服务器
Worker 服务
一个使用本地设备执行 TensorFlow 计算图中部分命令的 RPC 服务。Worker 服务实现了 worker_service.proto。所有的 TensorFlow 服务器都实现了 Worker 服务。