Define a Runner

A runner runs the inputter and the modeler. It also uses callbacks for auxiliary jobs:

import importlib

# Create callback configurations
callback_config = CallbackConfig(
  mode="train",
  batch_size_per_gpu=64,
  gpu_count=1,
  model_dir="~/demo/model/image_classification_cifar10",
  log_every_n_iter=10,
  save_summary_steps=10)

# Create callbacks
callback_names = ["train_basic", "train_loss", "train_accuracy",
                  "train_speed", "train_summary"]
callbacks = []
for name in callback_names:
  callback = importlib.import_module(
    "source.callback." + name).build(callback_config)
  callbacks.append(callback)

# Create run config
runner_config = RunnerConfig(
  mode="train",
  batch_size_per_gpu=64,
  gpu_count=1,
  summary_names=["loss", "accuracy", "learning_rate"])

# Create a runner
runner_name = "source.runner.parameter_server_runner"
runner = importlib.import_module(runner_name).build(
  runner_config, inputter, modeler, callbacks)
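
Each callback module exposes a build factory that returns a callback object. The exact interface lives in source/callback; as a rough sketch (the hook names and signatures below are assumptions, inferred from how the runner invokes them later in this section), a callback might look like this:

class Callback(object):
  def __init__(self, config):
    self.config = config

  def before_run(self, sess):
    # One-time setup, e.g. creating a summary writer under config.model_dir
    pass

  def before_step(self, sess):
    pass

  def after_step(self, sess, outputs):
    # Per-step work, e.g. logging every config.log_every_n_iter steps
    pass

  def after_run(self, sess):
    # One-time teardown, e.g. flushing summaries
    pass

def build(config):
  return Callback(config)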

A runner has two main tasks: first, it runs operators in a TensorFlow session; second, it distributes the computation across multiple devices when needed.

The run member function implements the first task:

def run(self):

  # Create the computation graph
  self.create_graph()

  # Create a TensorFlow session
  with tf.Session(config=self.session_config) as self.sess:

    # Do auxiliary jobs before running the graph
    self.before_run()

    # Set up the global step and the maximum step to run
    global_step = 0
    if self.config.mode == "train":
      # For resuming training from the last checkpoint
      global_step = self.sess.run(self.global_step_op)

    max_step = self.sess.run(self.max_step_op)

    # Run the job until max_step
    while global_step < max_step:

      # Do auxiliary jobs before running a step
      self.before_step()

      # Run a step
      self.outputs = self.sess.run(self.run_ops)

      # Do auxiliary jobs after running a step
      self.after_step()

      global_step = global_step + 1

    # Do auxiliary jobs after finishing the run
    self.after_run()
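
The before_run, before_step, after_step and after_run calls are thin dispatchers over the callback list. A minimal sketch of this wiring (the hook signatures on the callback side are assumptions, inferred from how the runner uses them here):

def before_run(self):
  # Give every callback a chance to do one-time setup
  for callback in self.callbacks:
    callback.before_run(self.sess)

def before_step(self):
  for callback in self.callbacks:
    callback.before_step(self.sess)

def after_step(self):
  # Hand the step outputs (e.g. loss, accuracy) to every callback
  for callback in self.callbacks:
    callback.after_step(self.sess, self.outputs)

def after_run(self):
  for callback in self.callbacks:
    callback.after_run(self.sess)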

The second task is to distribute the computation across multiple devices when necessary. In this example we use synchronized multi-GPU training with the CPU acting as the parameter server. To do so we use a parameter_server_runner, which splits the input data across multiple GPUs, runs the computation in parallel on these GPUs, and gathers the results for the parameter update. The key logic is implemented in its replicate_graph member function:

def replicate_graph(self):

  # Fetch input data
  batch = self.inputter.input_fn()

  if self.config.mode == "infer":

    # Use a single GPU for inference
    with tf.device(self.assign_to_device("/gpu:{}".format(0),
                   ps_device="/cpu:0")):
      ops = self.modeler.model_fn(batch)
      return ops

  else:

    output = {}
    # Distribute work across multiple GPUs
    for i in range(self.config.gpu_count):
      with tf.device(self.assign_to_device("/gpu:{}".format(i),
                     ps_device="/cpu:0")):

        # Get the split for the i-th GPU
        x = self.batch_split(batch, i)
        y = self.modeler.model_fn(x)

        # Gather output from the i-th GPU
        if i == 0:
          for key in y:
            output[key] = [y[key]]
        else:
          for key in y:
            output[key].append(y[key])

    # Average results
    reduced_ops = {}
    for key in output:
      reduced_ops[key] = self.reduce_op(output[key])

    # Return the operation to run averaged results
    return reduced_ops
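
replicate_graph relies on three helpers that are not shown above: assign_to_device pins variables to the parameter server while placing all other operators on the worker GPU, batch_split slices out the shard for one GPU, and reduce_op averages the per-GPU results. A rough sketch (the device-pinning pattern is the standard TensorFlow one; the slicing and averaging details are assumptions):

# Stateful ops of these types belong on the parameter server
PS_OPS = ["Variable", "VariableV2", "VarHandleOp"]

def assign_to_device(self, device, ps_device):
  # Return a device function: variables are pinned to ps_device
  # (the CPU), everything else is placed on the worker device (a GPU)
  def _assign(op):
    node_def = op if isinstance(op, tf.NodeDef) else op.node_def
    if node_def.op in PS_OPS:
      return ps_device
    return device
  return _assign

def batch_split(self, batch, idx):
  # Slice the idx-th shard out of every tensor in the batch
  bs = self.config.batch_size_per_gpu
  return [x[idx * bs:(idx + 1) * bs] for x in batch]

def reduce_op(self, x):
  # Average the values gathered from all GPUs (scalar case; gradients
  # would instead be averaged per variable)
  return tf.reduce_mean(x)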

To run the application, simply call runner.run():

# Run the application
runner.run()