分布式 TensorFlow

底层 API 编程介绍

你好,分布式 TensorFlow !

要查看一个简单的 TensorFlow 集群,请执行以下操作:

# 以单进程“集群”模式启动一个 TensorFlow 服务器
$ python
>>> import tensorflow as tf
>>> c = tf.constant("Hello, distributed TensorFlow!")
>>> server = tf.train.Server.create_local_server()
>>> sess = tf.Session(server.target)  # 在服务器上创建一个会话
>>> sess.run(c)
'Hello, distributed TensorFlow!'

tf.train.Server.create_local_server

创建一个集群

TensorFlow “集群”是一组参与分布式执行 TensorFlow 计算图的“任务(Task)”集合。每个任务都与一个 TensorFlow 服务器(Server) 相关联,TensorFlow 服务器中包含一个可以用来创建会话(sessions)的Master,和一个在计算图中执行命令的Worker。一个集群同样可以被分为一个或多个“作业(Job)”,每个作业又包含一个或多个任务。(译者注:集群由任务组成,任务被包含在特定作业中)

要创建一个群集,我们在群集中为每个任务启动一个 TensorFlow 服务器。通常每个任务运行在不同的机器上,但是这里我们在一台机器上运行多个任务(例如,控制不同的GPU设备)。 我们在每个任务中都做如下操作:

  1. 在集群中创建一个描述所有任务的 tf.train.ClusterSpec。它对每个任务而言都应该是相同的。

  2. 创建一个 tf.train.Server,将 tf.train.ClusterSpec 传给构造函数,并用工作名称标识本地任务和任务索引。

创建一个 tf.train.ClusterSpec 来描述集群

tf.train.ClusterSpec

 
构造 tf.train.ClusterSpec 可用的任务
tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
/job:local/task:0
/job:local/task:1
tf.train.ClusterSpec({
    "worker": [
        "worker0.example.com:2222",
        "worker1.example.com:2222",
        "worker2.example.com:2222"
    ],
    "ps": [
        "ps0.example.com:2222",
        "ps1.example.com:2222"
    ]})
/job:worker/task:0
/job:worker/task:1
/job:worker/task:2
/job:ps/task:0
/job:ps/task:1

在每个任务中创建一个 tf.train.Server 实例

tf.Session

例如,启动一个运行在 localhost:2222localhost:2223 两台服务器上的集群,在本地机器的两个不同进程上运行以下代码:

# 任务 0 中:
cluster = tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
server = tf.train.Server(cluster, job_name="local", task_index=0)
# 任务 1 中:
cluster = tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
server = tf.train.Server(cluster, job_name="local", task_index=1)

注意: 手动指定这些集群规范可能很乏味,特别是对于大型集群。我们正在开发可编程的任务启动工具,例如类似 Kubernetes 的集群管理器。如果你希望 Tensorflow 支持某种特定的集群管理器,请提出一个 GitHub issue

指定模型中的分布式设备

tf.device

with tf.device("/job:ps/task:0"):
  weights_1 = tf.Variable(...)
  biases_1 = tf.Variable(...)

with tf.device("/job:ps/task:1"):
  weights_2 = tf.Variable(...)
  biases_2 = tf.Variable(...)

with tf.device("/job:worker/task:7"):
  input, labels = ...
  layer_1 = tf.nn.relu(tf.matmul(input, weights_1) + biases_1)
  logits = tf.nn.relu(tf.matmul(layer_1, weights_2) + biases_2)
  # ...
  train_op = ...

with tf.Session("grpc://worker7.example.com:2222") as sess:
  for _ in range(10000):
    sess.run(train_op)

在上面的例子中,变量是在 ps 作业中的两个任务上创建的,模型的计算密集部分是在 worker 作业中创建的。TensorFlow 将在作业之间插入适当的数据传输(正向传递时从 psworker,反向传递时从 workerps)。

重复训练

一种通用训练的配置,也被称为“并行数据”,包含了使用不同 mini-batch 来训练相同模型的 Worker 作业中的多个任务,更新 ps 作业中一个或多个任务里的共享参数。所有任务通常在不同的机器上运行。在 TensorFlow 中有很多方法可以指定任务分配的结构,我们正在开发简化指定复制模型工作的库。可能的方法包括:

  • 图内复制 在这种方法中,客户端构建一个包含一组参数(在 tf.Variable 节点上固定
       到 /job:ps )的 tf.Graph; 以及模型的计算密集型部分的多个副本,
       每个副本固定对应到 /job:worker 中不同的任务上。

  • tf.train.replica_device_setter

  • 异步训练 在这种方法中,图的每个副本都有一个没有独立训练循环,不做协调就可以执行。它是兼容的以上两种形式的复制。

  • tf.train.SyncReplicasOptimizer

总结:示例训练程序

以下代码显示了分布式训练程序的框架,实现图间复制异步训练。它包括参数服务器和 Worker 任务的代码。

import argparse
import sys

import tensorflow as tf

FLAGS = None


def main(_):
  ps_hosts = FLAGS.ps_hosts.split(",")
  worker_hosts = FLAGS.worker_hosts.split(",")

  # 从参数服务器和工作主机创建一个集群
  cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

  # 创建并启动本地任务的服务器
  server = tf.train.Server(cluster,
                           job_name=FLAGS.job_name,
                           task_index=FLAGS.task_index)

  if FLAGS.job_name == "ps":
    server.join()
  elif FLAGS.job_name == "worker":

    # 默认情况下将操作分配给本地Worker
    with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        cluster=cluster)):

      # 建立模型...
      loss = ...
      global_step = tf.contrib.framework.get_or_create_global_step()

      train_op = tf.train.AdagradOptimizer(0.01).minimize(
          loss, global_step=global_step)

    # StopAtStepHook 在运行给定步骤后处理停止
    hooks=[tf.train.StopAtStepHook(last_step=1000000)]

    # MonitoredTrainingSession 负责会话初始化
    # 从检查点恢复,保存到检查点,一旦完成或报错就关闭
    with tf.train.MonitoredTrainingSession(master=server.target,
                                           is_chief=(FLAGS.task_index == 0),
                                           checkpoint_dir="/tmp/train_logs",
                                           hooks=hooks) as mon_sess:
      while not mon_sess.should_stop():
        # 异步运行训练
        # 有关如何执行同步训练的更多信息,请参见 `tf.train.SyncReplicasOptimizer`
        # mon_sess.run 在被抢占 PS 的情况下处理 AbortedError
        mon_sess.run(train_op)


if __name__ == "__main__":
  parser = argparse.ArgumentParser()
  parser.register("type", "bool", lambda v: v.lower() == "true")
  # 用于定义 tf.train.ClusterSpec 的标志
  parser.add_argument(
      "--ps_hosts",
      type=str,
      default="",
      help="Comma-separated list of hostname:port pairs"
  )
  parser.add_argument(
      "--worker_hosts",
      type=str,
      default="",
      help="Comma-separated list of hostname:port pairs"
  )
  parser.add_argument(
      "--job_name",
      type=str,
      default="",
      help="One of 'ps', 'worker'"
  )
  # Flags for defining the tf.train.Server
  parser.add_argument(
      "--task_index",
      type=int,
      default=0,
      help="Index of task within the job"
  )
  FLAGS, unparsed = parser.parse_known_args()
  tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)

要启动两个参数服务器和两个 Worker 的训练,请使用下面的命令行脚本(假设脚本被称为 trainer.py

# On ps0.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=ps --task_index=0
# On ps1.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=ps --task_index=1
# On worker0.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=worker --task_index=0
# On worker1.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=worker --task_index=1

Glossary

客户端

客户端通常是一个程序,用来构建 TensorFlow 计算图和创建用于与集群交互的会话 tensorflow::Session。通常用 Python 或 C++ 编写。一个客户端进程可以直接与多个 TensorFlow 服务器交互(参阅上面的“重复训练”),一台服务器可为多个客户端服务。

集群

tf.train.ClusterSpec

作业

一份作业包括一份“任务”清单,通常用于一个共同的目的。例如,名为 ps(即 parameter server,参数服务器)的作业通常包括存储和更新变量的节点; 而名为 worker 的作业通常包括执行计算密集型任务的无状态节点。工作中的任务通常运行在不同的机器上。这套工作角色是灵活的:例如,Worker 可能会保持某种状态。

主服务

提供远程调用控制一组分布式设备的 RPC 服务,并作为会话目标。 主服务实现了 tensorflow::Session 接口,负责协调一个或多个 worker服务。所有的 TensorFlow 服务器都实现了 Master 服务。

任务

任务对应于特定的 TensorFlow 服务器,并且通常对应于到一个进程。一个任务属于一个特定的“作业”,并在该作业列表的索引中被唯一标识。

TensorFlow 服务器

tf.train.Server

Worker 服务

一个使用本地设备执行 TensorFlow 计算图中部分命令的 RPC 服务。Worker 服务实现了 worker_service.proto。所有的 TensorFlow 服务器都实现了 Worker 服务。