Distributed Training

1. Cluster Configuration

HybridBackend provides hb.context for cluster configuration. For nodes with more than one GPU, HybridBackend provides a launcher, hybridbackend.run, which reads the environment variable NVIDIA_VISIBLE_DEVICES or CUDA_VISIBLE_DEVICES and launches one worker per GPU.

1.1 APIs

hybridbackend.tensorflow.context()

Configurations for cluster and servers.

class hybridbackend.tensorflow.Context

Configurations for cluster and servers.

__init__()

Construct a server specification.

add_saving_listener(name, listener)

Register saving listener.

classmethod canonicalize(devices)

Canonicalize devices.

property cluster_spec

Cluster spec.

property cpu_devices

CPU devices of all servers.

classmethod current_device()

Current device.

current_index()

Get global index of current device.

property default_device

Default device of current server.

property devices

Devices of all servers.

classmethod get()

Get singleton.

classmethod get_tf_config()

Get configuration from TF_CONFIG environment variable.

property has_gpu

True if current server has GPU.

property is_chief

True if current server is chief worker.

property local_cpu_device

CPU0 device of current server.

property local_devices

Devices of current server.

property local_world_size

Number of local devices used for collective communication.

property num_gpus

Number of GPUs.

property options

Global configurations.

property rank

Global index of default local device.

rank_at(device_or_tower_id)

Get global index of device or tower_id.

property saving_listeners

Get registered saving listeners.

classmethod scope(**kwargs)

Update params in context.

classmethod set_tf_config(task_type, task_id, worker_hosts, ps_hosts=None, has_evaluator=False)

Update TF_CONFIG environment variable.

Parameters
  • task_type – Name of current job. ‘worker’ should be set for ‘chief’ or ‘evaluator’.

  • task_id – Index of current task.

  • worker_hosts – List of workers.

  • ps_hosts – (Optional.) List of parameter servers. Empty by default.

  • has_evaluator – (Optional.) True if evaluator role is required. False by default.
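
For illustration, a minimal call matching the signature above; the host addresses and cluster layout are placeholders, not values from the original documentation:

import hybridbackend.tensorflow as hb

# Configure TF_CONFIG for worker 0 of a two-worker cluster (addresses are placeholders).
hb.Context.set_tf_config(
  task_type='worker',
  task_id=0,
  worker_hosts=['x.x.x.x:8861', 'x.x.x.y:8861'])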

property target

Target of current server.

property task_id

Task index of current server. 0 by default.

property task_type

Job name of current server. 'localhost' by default.

update_params(**kwargs)

Update parameters.

property world_size

Number of devices.

1.2 Example: Get rank and world size

import hybridbackend.tensorflow as hb

print(f'{hb.context.rank}-of-{hb.context.world_size}')

1.3 Example: Update options globally

hb.context.options.batch_size = 16

or set the corresponding environment variable when launching:

HB_GRAD_NBUCKETS=2 python xxx.py
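
For illustration, assuming the option behind HB_GRAD_NBUCKETS is named grad_nbuckets on hb.context.options (an assumption; check the options object for the exact name), the same setting can be made in code:

import hybridbackend.tensorflow as hb

# Assumed in-code equivalent of `HB_GRAD_NBUCKETS=2`, set before building the graph.
hb.context.options.grad_nbuckets = 2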

1.4 Example: Launch workers on single machine of multiple GPUs

# Launch one worker per GPU by reading the environment variable
# `NVIDIA_VISIBLE_DEVICES` or `CUDA_VISIBLE_DEVICES`.
python -m hybridbackend.run python /path/to/main.py

1.5 Example: Launch workers on multiple machines of multiple GPUs

# Set the `TF_CONFIG` environment variable on each machine, e.g.,
# TF_CONFIG='{"cluster":{"chief":["x.x.x.x:8860"],"worker":["x.x.x.x:8861"]}, "task":{"type":"chief","index":0}}'
# then set `NVIDIA_VISIBLE_DEVICES` or `CUDA_VISIBLE_DEVICES` for the GPUs on each machine.
python -m hybridbackend.run python /path/to/main.py

2. Data Parallelism

HybridBackend provides hb.scope, which rewrites variables and optimizers to support data parallelism.

2.1 APIs

hybridbackend.tensorflow.scope(**kwargs)

Context manager that decorates graph construction for parallel model training.

2.2 Example: Training within a scope

import tensorflow as tf
import hybridbackend.tensorflow as hb

with hb.scope():
  # ...
  loss = tf.losses.get_total_loss()
  # Predefined optimizer; inside the scope it is rewritten for data-parallel training.
  opt = tf.train.GradientDescentOptimizer(learning_rate=lr)
  train_op = opt.minimize(loss)
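
Putting it together, a minimal end-to-end sketch; train_ds, model_fn and lr are placeholders for the user's own dataset, model and learning rate, and the session loop mirrors the evaluation example in section 3.3:

import tensorflow as tf
import hybridbackend.tensorflow as hb

def main():
  with hb.scope():
    # `train_ds`, `model_fn` and `lr` are placeholders for user code.
    batch = tf.data.make_one_shot_iterator(train_ds).get_next()
    loss = model_fn(batch)
    opt = tf.train.GradientDescentOptimizer(learning_rate=lr)
    train_op = opt.minimize(loss)

    with tf.train.MonitoredTrainingSession('') as sess:
      while not sess.should_stop():
        sess.run(train_op)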

3. Embedding-Sharded Data Parallelism

HybridBackend provides hb.embedding_scope to shard variables and support embedding-sharded data parallelism.

3.1 APIs

hybridbackend.tensorflow.metrics.accuracy(labels, predictions, weights=None, metrics_collections=None, updates_collections=None, name=None)

Calculates how often predictions match labels.

The accuracy function creates two local variables, total and count, that are used to compute the frequency with which predictions match labels. This frequency is ultimately returned as accuracy: an idempotent operation that simply divides total by count.

For estimation of the metric over a stream of data, the function creates an update_op operation that updates these variables and returns the accuracy. Internally, an is_correct operation computes a Tensor with elements 1.0 where the corresponding elements of predictions and labels match and 0.0 otherwise. Then update_op increments total with the reduced sum of the product of weights and is_correct, and it increments count with the reduced sum of weights.

If weights is None, weights default to 1. Use weights of 0 to mask values.

Parameters
  • labels – The ground truth values, a Tensor whose shape matches predictions.

  • predictions – The predicted values, a Tensor of any shape.

  • weights – Optional Tensor whose rank is either 0, or the same rank as labels, and must be broadcastable to labels (i.e., all dimensions must be either 1, or the same as the corresponding labels dimension).

  • metrics_collections – An optional list of collections that accuracy should be added to.

  • updates_collections – An optional list of collections that update_op should be added to.

  • name – An optional variable_scope name.

Returns
  • accuracy – A Tensor representing the accuracy, the value of total divided by count.

  • update_op – An operation that increments the total and count variables appropriately and whose value matches accuracy.

Return type

(accuracy, update_op)

Raises
  • ValueError – If predictions and labels have mismatched shapes, or if weights is not None and its shape doesn’t match predictions, or if either metrics_collections or updates_collections are not a list or tuple.

  • RuntimeError – If eager execution is enabled.
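
For illustration, hb.metrics.accuracy can be returned from an eval_fn in the same way hb.metrics.auc is used in section 3.3; eval_labels and eval_predictions are placeholders for the user's own tensors:

import hybridbackend.tensorflow as hb

def eval_fn():
  # `eval_labels` and `eval_predictions` stand in for the user's own tensors.
  acc_and_update = hb.metrics.accuracy(
    labels=eval_labels,
    predictions=eval_predictions)
  return {'accuracy': acc_and_update}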

hybridbackend.tensorflow.metrics.auc(labels, predictions, weights=None, num_thresholds=200, metrics_collections=None, updates_collections=None, curve='ROC', name=None, summation_method='trapezoidal')

Computes the approximate AUC via a Riemann sum.

The auc function creates four local variables, true_positives, true_negatives, false_positives and false_negatives that are used to compute the AUC. To discretize the AUC curve, a linearly spaced set of thresholds is used to compute pairs of recall and precision values. The area under the ROC-curve is therefore computed using the height of the recall values by the false positive rate, while the area under the PR-curve is computed using the height of the precision values by the recall.

This value is ultimately returned as auc, an idempotent operation that computes the area under a discretized curve of precision versus recall values (computed using the aforementioned variables). The num_thresholds variable controls the degree of discretization with larger numbers of thresholds more closely approximating the true AUC. The quality of the approximation may vary dramatically depending on num_thresholds.

For best results, predictions should be distributed approximately uniformly in the range [0, 1] and not peaked around 0 or 1. The quality of the AUC approximation may be poor if this is not the case. Setting summation_method to ‘minoring’ or ‘majoring’ can help quantify the error in the approximation by providing lower or upper bound estimate of the AUC.

For estimation of the metric over a stream of data, the function creates an update_op operation that updates these variables and returns the auc.

If weights is None, weights default to 1. Use weights of 0 to mask values.

Parameters
  • labels – A Tensor whose shape matches predictions. Will be cast to bool.

  • predictions – A floating point Tensor of arbitrary shape and whose values are in the range [0, 1].

  • weights – Optional Tensor whose rank is either 0, or the same rank as labels, and must be broadcastable to labels (i.e., all dimensions must be either 1, or the same as the corresponding labels dimension).

  • num_thresholds – The number of thresholds to use when discretizing the roc curve.

  • metrics_collections – An optional list of collections that auc should be added to.

  • updates_collections – An optional list of collections that update_op should be added to.

  • curve – Specifies the name of the curve to be computed, ‘ROC’ [default] or ‘PR’ for the Precision-Recall-curve.

  • name – An optional variable_scope name.

  • summation_method – Specifies the Riemann summation method used (https://en.wikipedia.org/wiki/Riemann_sum): ‘trapezoidal’ [default] that applies the trapezoidal rule; ‘careful_interpolation’, a variant of it differing only by a more correct interpolation scheme for PR-AUC - interpolating (true/false) positives but not the ratio that is precision; ‘minoring’ that applies left summation for increasing intervals and right summation for decreasing intervals; ‘majoring’ that does the opposite. Note that ‘careful_interpolation’ is strictly preferred to ‘trapezoidal’ (to be deprecated soon) as it applies the same method for ROC, and a better one (see Davis & Goadrich 2006 for details) for the PR curve.

Returns

A tuple of a scalar Tensor representing the current area-under-curve and an operation that increments the true_positives, true_negatives, false_positives and false_negatives variables appropriately and whose value matches auc.

Return type

(auc, update_op)

Raises
  • ValueError – If predictions and labels have mismatched shapes, or if weights is not None and its shape doesn’t match predictions, or if either metrics_collections or updates_collections are not a list or tuple.

  • RuntimeError – If eager execution is enabled.

hybridbackend.tensorflow.train.EvaluationHook(fn, steps=100, every_n_iter=1000, summary_dir=None, history=None)

Hook that runs evaluation alongside training.

hybridbackend.tensorflow.train.export(export_dir_base, checkpoint_path, signature_def_fn, assets_extra=None, as_text=False, clear_devices=True, strip_default_attrs=True, mode='infer')

Build a SavedModel from variables in checkpoint.

Parameters
  • export_dir_base – A string containing a directory to write the exported graph and checkpoints.

  • checkpoint_path – A path to a checkpoint.

  • signature_def_fn – Function returns a signature_def.

  • assets_extra – A dict specifying how to populate the assets.extra directory within the exported SavedModel. Each key should give the destination path (including the filename) relative to the assets.extra directory. The corresponding value gives the full path of the source file to be copied. For example, the simple case of copying a single file without renaming it is specified as {‘my_asset_file.txt’: ‘/path/to/my_asset_file.txt’}.

  • as_text – Whether or not to write the SavedModel proto in text format.

  • clear_devices – Whether or not to clear the device field.

  • strip_default_attrs – Whether or not to remove default-valued attributes from the NodeDefs.

  • mode – PREDICT, TRAIN or TEST.

Returns

Export directory if the current worker is the chief.

3.2 Example: Sharding embedding weights within a scope

import tensorflow as tf
import hybridbackend.tensorflow as hb

def foo():
  # ...
  with hb.scope():
    with hb.embedding_scope():
      embedding_weights = tf.get_variable(
        'emb_weights', shape=[bucket_size, dim_size])
    embedding = tf.nn.embedding_lookup(embedding_weights, ids)
    # ...
    loss = tf.losses.get_total_loss()
    # predefined optimizer
    opt = tf.train.GradientDescentOptimizer(learning_rate=lr)

3.3 Example: Evaluation

import tensorflow as tf
import hybridbackend.tensorflow as hb

def eval_fn():
  # ...
  auc_and_update = hb.metrics.auc(
    labels=eval_labels,
    predictions=eval_logits)
  return {'auc': auc_and_update}

with tf.Graph().as_default():
  with hb.scope():
    batch = tf.data.make_one_shot_iterator(train_ds).get_next()
    # ...
    with hb.embedding_scope():
      embedding_weights = tf.get_variable(
        'emb_weights', shape=[bucket_size, dim_size])
    embedding = tf.nn.embedding_lookup(embedding_weights, ids)
    # ...
    hooks = []
    hooks.append(
      hb.train.EvaluationHook(
        eval_fn, every_n_iter=1000, summary_dir='/path/to/summary'))

    with tf.train.MonitoredTrainingSession('', hooks=hooks) as sess:
      while not sess.should_stop():
        sess.run(train_op)

3.4 Example: Exporting to SavedModel

import tensorflow as tf
import hybridbackend.tensorflow as hb

# Export trained models.
def _on_export():
  example_spec = {}
  for f in numeric_fields:
    example_spec[f] = tf.io.FixedLenFeature([1], dtype=tf.int64)
  for f in categorical_fields:
    example_spec[f] = tf.io.VarLenFeature(dtype=tf.int64)

  serialized_examples = tf.placeholder(
      dtype=tf.string,
      shape=[None],
      name='input')
  inputs = tf.io.parse_example(serialized_examples, example_spec)

  outputs = my_model(inputs, training=False)
  return tf.saved_model.predict_signature_def(inputs, outputs)

checkpoint_path = tf.train.latest_checkpoint(checkpoint_dir)
hb.train.export(export_dir_base, checkpoint_path, _on_export)

4. Synchronous Training with Unbalanced Data across Workers

When training data is partitioned across distributed workers, some workers may be assigned fewer batches than others and therefore run out of data earlier. HybridBackend provides two strategies for handling the remaining training data on the other workers:

  1. Set data_sync_drop_remainder=True (the default) in hb.scope():

import tensorflow as tf
import hybridbackend.tensorflow as hb

if __name__ == '__main__':
  # ...
  with hb.scope(data_sync_drop_remainder=True):
    main()

With this option, as soon as one worker finishes its assigned training data, HybridBackend drops the remaining training data on the other workers and ends the training task.

  2. Set data_sync_drop_remainder=False in hb.scope(), as sketched below. As a result, whenever a worker finishes its training data, it keeps producing empty tensors to join the synchronous training along with the other workers until all workers have finished their training data. Note that users must ensure their customized TF operators and other implementations can handle such empty tensors.
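
A minimal sketch of the second strategy, mirroring the example above (main() stands for the user's training entry point):

import tensorflow as tf
import hybridbackend.tensorflow as hb

if __name__ == '__main__':
  # Workers that run out of data keep feeding empty tensors until all workers finish.
  with hb.scope(data_sync_drop_remainder=False):
    main()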