CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutSign UpSign In
pytorch

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.

GitHub Repository: pytorch/tutorials
Path: blob/main/intermediate_source/TCPStore_libuv_backend.rst
Views: 712
Introduction to Libuv TCPStore Backend
======================================
**Authors**: `Xilun Wu <https://github.com/XilunWu>`_

.. note::
    |edit| View and edit this tutorial in `github <https://github.com/pytorch/tutorials/blob/main/intermediate_source/TCPStore_libuv_backend.rst>`__.

.. grid:: 2

   .. grid-item-card:: :octicon:`mortar-board;1em;` What you will learn
      :class-card: card-prerequisites
  
      *  What is the new TCPStore backend
      *  Compare the new libuv backend against the legacy backend
      *  How to enable to use the legacy backend


   .. grid-item-card:: :octicon:`list-unordered;1em;` Prerequisites
      :class-card: card-prerequisites

      * PyTorch 2.4 or later
      * Read about the `TCPStore API <https://pytorch.org/docs/main/distributed.html#torch.distributed.TCPStore>`__.


Introduction
------------

Recently, we have rolled out a new TCPStore server backend using `libuv <https://github.com/libuv/libuv>`__, a third-party library for asynchronous I/O. This new server backend aims to
address scalability and robustness challenges in large-scale distributed training jobs, such as those with more than 1024 ranks. We ran a series of
benchmarks to compare the libuv backend against the old one, and the experiment results demonstrated significant improvements in store initialization
time and maintained a comparable performance in store I/O operations.

As a result of these findings, the libuv backend has been set as the default TCPStore server backend in PyTorch 2.4. This change is expected to enhance
the performance and scalability of distributed training jobs.

This change introduces a slight incompatibility to store initialization. For users who wish to continue using the legacy backend, the tutorial will
provide guidance on how to specify to use the previous TCPStore server backend.


Performance Benchmark
---------------------

To better demonstrate the benefit of our new libuv TCPStore backend, we set up a benchmark over a wide range of job size, from 1024 (1K) to 98304 (96K) ranks.
We first measured the TCPStore initialization time using the code snippet below:

.. code:: python

    import logging
    import os

    from time import perf_counter

    import torch
    import torch.distributed as dist

    logger: logging.Logger = logging.getLogger(__name__)

    # Env var are preset when launching the benchmark
    env_rank = os.environ.get("RANK", 0)
    env_world_size = os.environ.get("WORLD_SIZE", 1)
    env_master_addr = os.environ.get("MASTER_ADDR", "localhost")
    env_master_port = os.environ.get("MASTER_PORT", "23456")

    start = perf_counter()
    tcp_store = dist.TCPStore(
        env_master_addr,
        int(env_master_port),
        world_size=int(env_world_size),
        is_master=(int(env_rank) == 0),
    )
    end = perf_counter()
    time_elapsed = end - start
    logger.info(
        f"Complete TCPStore init with rank={env_rank}, world_size={env_world_size} in {time_elapsed} seconds."
    )

Since the execution of the TCPStore server thread will be blocked until all clients are successfully connected, we take the time measured on rank 0 as the total
TCPStore initialization runtime. The experiment numbers are reported in the figure below:

.. figure:: /_static/img/distributed/tcpstore_init_time.png
   :width: 100%
   :align: center
   :alt: TCPStore Initialization Runtime Benchmark Result

Figure 1. shows some significant evidence that the libuv backend is superior to the legacy backend:

- TCPStore with libuv backend always has a faster initialization than the legacy backend, especially at super-large scale
- The legacy backend would timeout at server-client connecting at 96K scale (for example, over 30 minutes) while the libuv backend completed the initialization in 100 seconds.

The second benchmark we did is to measure the runtime of TCPStore ``store_based_barrier`` operation:

.. code:: python

    import logging
    import os
    import time

    from datetime import timedelta
    from time import perf_counter

    import torch
    import torch.distributed as dist

    DistStoreError = torch._C._DistStoreError
    logger: logging.Logger = logging.getLogger(__name__)

    # since dist._store_based_barrier is a private function and cannot be directly called, we need to write a function which does the same
    def store_based_barrier(
        rank,
        store,
        group_name,
        rendezvous_count,
        timeout=dist.constants.default_pg_timeout,
        logging_interval=timedelta(seconds=10),
    ):
        store_key = f"store_based_barrier_key:{group_name}"
        store.add(store_key, 1)

        world_size = rendezvous_count
        worker_count = store.add(store_key, 0)

        last_worker_key = f"{store_key}:last_worker"
        if worker_count == world_size:
            store.set(last_worker_key, "1")

        start = time.time()
        while True:
            try:
                # This will throw an exception after the logging_interval in which we print out
                # the status of the group or time out officially, throwing runtime error
                store.wait([last_worker_key], logging_interval)
                break
            except RuntimeError as e:
                worker_count = store.add(store_key, 0)
                # Print status periodically to keep track.
                logger.info(
                    "Waiting in store based barrier to initialize process group for "
                    "rank: %s, key: %s (world_size=%s, num_workers_joined=%s, timeout=%s)"
                    "error: %s",
                    rank,
                    store_key,
                    world_size,
                    worker_count,
                    timeout,
                    e,
                )

                if timedelta(seconds=(time.time() - start)) > timeout:
                    raise DistStoreError(
                        "Timed out initializing process group in store based barrier on "
                        "rank {}, for key: {} (world_size={}, num_workers_joined={}, timeout={})".format(
                            rank, store_key, world_size, worker_count, timeout
                        )
                    )

        logger.info(
            "Rank %s: Completed store-based barrier for key:%s with %s nodes.",
            rank,
            store_key,
            world_size,
        )

    # Env var are preset when launching the benchmark
    env_rank = os.environ.get("RANK", 0)
    env_world_size = os.environ.get("WORLD_SIZE", 1)
    env_master_addr = os.environ.get("MASTER_ADDR", "localhost")
    env_master_port = os.environ.get("MASTER_PORT", "23456")

    tcp_store = dist.TCPStore(
        env_master_addr,
        int(env_master_port),
        world_size=int(env_world_size),
        is_master=(int(env_rank) == 0),
    )

    # sync workers
    store_based_barrier(int(env_rank), tcp_store, "tcpstore_test", int(env_world_size))

    number_runs = 10
    start = perf_counter()
    for _ in range(number_runs):
        store_based_barrier(
            int(env_rank), tcp_store, "tcpstore_test", int(env_world_size)
        )
    end = perf_counter()
    time_elapsed = end - start
    logger.info(
        f"Complete {number_runs} TCPStore barrier runs with rank={env_rank}, world_size={env_world_size} in {time_elapsed} seconds."
    )

We compute the average by dividing the runtime measured on rank 0 by ``number_runs`` and report it in the figure below:

.. figure:: /_static/img/distributed/tcpstore_barrier_time.png
   :width: 100%
   :align: center
   :alt: TCPStore Barrier Runtime Benchmark Result

Figure 2. shows that the I/O performance of libuv backend is comparable to the legacy backend:

- The libuv backend has a comparable performance over the whole spectrum in terms of the number of ranks
- The libuv backend runtime is more stable than the legacy backend as the number of ranks grows


Impact
------

One incompatibility that users may need to pay attention is, TCPStore currently does not support initialization with a ``listen_fd`` when using libuv backend.
If the user wants to keep using this initialization method, the user can simply pass ``use_libuv=False`` to stay with the old TCPStore backend.

.. code:: python

    import socket

    import torch
    import torch.distributed as dist

    listen_sock: socket.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listen_sock.bind(("localhost", 0))
    addr, port, *_ = listen_sock.getsockname()
    listen_fd = listen_sock.detach()

    tcpstore = dist.TCPStore(addr, port, 1, True, master_listen_fd=listen_fd)  # expect NotImplementedError
    tcpstore = dist.TCPStore(addr, port, 1, True, master_listen_fd=listen_fd, use_libuv=False)  # OK. Use legacy backend


Exit Route 1: Pass ``use_libuv=False`` to TCPStore Initialization
-----------------------------------------------------------------

As the above code snippet shows, if user calls TCPStore init method to create a store, simply passing ``use_libuv=False`` allows user to remain using the old
TCPStore backend. This override has the highest priority over other approaches determining which backend the TCPStore server should choose.


Exit Route 2: Add ``use_libuv=0`` to ``init_method`` at ProcessGroup Initialization
-----------------------------------------------------------------------------------

``ProcessGroup`` creates a TCPStore if user does not explicitly pass one to its initialization. User can add the query option ``use_libuv=0`` to ``init_method`` when
initializing the ``ProcessGroup``. This approach has lower priority than Exit Route 1.

.. code:: python

    import torch
    import torch.distributed as dist

    addr = "localhost"
    port = 23456
    dist.init_process_group(
        backend="cpu:gloo,cuda:nccl",
        rank=0,
        world_size=1,
        init_method=f"tcp://{addr}:{port}?use_libuv=0",
    )
    dist.destroy_process_group()


Exit Route 3: Set Environment Variable ``USE_LIBUV`` to ``0``
-------------------------------------------------------------

When ProcessGroup creates a TCPStore, it also checks the environment vairable ``USE_LIBUV`` to determine which TCPStore backend to use. User can set the environment
variable ``"USE_LIBUV"`` to ``"0"`` to specify the use of old TCPStore backend. This approach has lower priority than Exit Route 2, for example, if the user sets environment
variable ``USE_LIBUV`` to ``1`` and also passes ``use_libuv=0`` in ``init_method``, then the old store backend will be chosen.

.. code:: python

    import os

    import torch
    import torch.distributed as dist

    addr = "localhost"
    port = 23456
    os.environ["USE_LIBUV"] = "0"
    dist.init_process_group(
        backend="cpu:gloo,cuda:nccl",
        rank=0,
        world_size=1,
        init_method=f"tcp://{addr}:{port}",
    )
    dist.destroy_process_group()


Conclusion
----------
In PyTorch 2.4, we made the new libuv TCPStore backend the default. Although the new backend has incompatibility with initialization from a ``listen_fd``, it
shows significant performance improvement on store initialization at large-scale and compatible performance on store I/O at small/medium/large scales, which
brings a major benefit to Distributed Training's control plane. This tutorial explains our motivation, goes through the performance benchmark, notifies users
of the potential impact, and introduces three exit routes to remain using the legacy backend. In the long term, we aim to eventually deprecate the legacy backend.