Source code for bronx.system.numa

# -*- coding: utf-8 -*-

"""
This module provides informations on the NUMA partitioning of the CPUs.

On Belenos (2x AMD Rome socket with 64 cores each)::

    >>> numa_i = numa_nodes_info()
    >>> print(numa_i)
    There are 8 nodes.
    Node 0 description:
    - cpus: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
    - totalsize: 34234114048 bytes
    Node 1 description:
    - cpus: 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
    - totalsize: 34358689792 bytes
    Node 2 description:
    - cpus: 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
    - totalsize: 34358689792 bytes
    Node 3 description:
    - cpus: 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
    - totalsize: 34346106880 bytes
    Node 4 description:
    - cpus: 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
    - totalsize: 34358689792 bytes
    Node 5 description:
    - cpus: 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
    - totalsize: 34358689792 bytes
    Node 6 description:
    - cpus: 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
    - totalsize: 34358689792 bytes
    Node 7 description:
    - cpus: 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
    - totalsize: 34358689792 bytes
    Distances matrix:
              0    1    2    3    4    5    6    7
       0:    10   12   12   12   32   32   32   32
       1:    12   10   12   12   32   32   32   32
       2:    12   12   10   12   32   32   32   32
       3:    12   12   12   10   32   32   32   32
       4:    32   32   32   32   10   12   12   12
       5:    32   32   32   32   12   10   12   12
       6:    32   32   32   32   12   12   10   12
       7:    32   32   32   32   12   12   12   10
    >>> import pprint
    # To obtain a partitioning in blocks of 4 cores that takes NUMA nodes
    # into account
    >>> pprint.pprint(numa_i.numapacked_cpulist(blocksize=4))
    [[0, 1, 2, 3],
     [64, 65, 66, 67],
     [16, 17, 18, 19],
     [80, 81, 82, 83],
     [32, 33, 34, 35],
     [96, 97, 98, 99],
     [48, 49, 50, 51],
     [112, 113, 114, 115],
     [4, 5, 6, 7],
     [68, 69, 70, 71],
     [20, 21, 22, 23],
     [84, 85, 86, 87],
     [36, 37, 38, 39],
     [100, 101, 102, 103],
     [52, 53, 54, 55],
     [116, 117, 118, 119],
     [8, 9, 10, 11],
     [72, 73, 74, 75],
     [24, 25, 26, 27],
     [88, 89, 90, 91],
     [40, 41, 42, 43],
     [104, 105, 106, 107],
     [56, 57, 58, 59],
     [120, 121, 122, 123],
     [12, 13, 14, 15],
     [76, 77, 78, 79],
     [28, 29, 30, 31],
     [92, 93, 94, 95],
     [44, 45, 46, 47],
     [108, 109, 110, 111],
     [60, 61, 62, 63],
     [124, 125, 126, 127]]

"""

from __future__ import print_function, absolute_import, unicode_literals, division

import six

import abc
from collections import defaultdict, deque
import copy
from ctypes import CDLL, byref, c_longlong
from ctypes.util import find_library
from itertools import chain, cycle, combinations

from bronx.compat.moves import collections_abc
from bronx.patterns import Singleton


[docs]def numa_nodes_info(): """Pick a subclass of :class:`NumaNodesInfo` and instantiate it. Depending on the operating system, the approriate class will be picked. If no :class:`NumaNodesInfo` is available, the :class:`NotImplementedError` exception will be raised. """ numalib_ok = False try: numalib_ok = bool(LibnumaGateway().lib) except OSError: # OSError: The library is not instaled or cannot be loaded # NotImplementedError: The libnuma is here but is not functional pass if numalib_ok: return LibNumaNodesInfo() else: raise NotImplementedError('No suitable NumaNodesInfo implementation could be found')
[docs]class NumaNodeInfo(object): """Hold information on a single Numa node.""" def __init__(self, cpus, distances, totalsize): """ :param set cpus: The CPU numbers associated with this NUMA node :param dict distances: The distance to other NUMA nodes :param totalsize: The NUMA node memory size (in bytes) """ self._cpus = cpus self._distances = distances self._totalsize = totalsize @property def cpus(self): """The set of CPU numbers associated with this NUMA node.""" return self._cpus @property def distances(self): """The distance to other NUMA nodes.""" return self._distances @property def totalsize(self): """The NUMA node memory size (in bytes).""" return self._totalsize def __str__(self): s_out = '- cpus: {:s}\n'.format(' '.join(['{:d}'.format(c) for c in sorted(self.cpus)])) s_out += '- totalsize: {:d} bytes'.format(self._totalsize) return s_out
@six.add_metaclass(abc.ABCMeta) class _NumaAbstractCpuIdDispencer(object): """Dispenser object return blocks of CPUs of a given size. It can be used as an iterator (in this case, the block's size needs to be fixed and specified at creation time using the *default_bsize* attribute) or by simply calling the object (:meth:`__call__`) with a single argument that is the desired blokc size. """ def __init__(self, default_bsize=None): """ :param int default_bsize: The block size used when iterating """ self._default_bsize = default_bsize self._iterlock = False def __iter__(self): if not self._default_bsize: raise ValueError('Cannot iterate because "default_bsize" was not given when building the object') if self._iterlock: raise RuntimeError('You can only iterate once...') self._iterlock = True return self def __next__(self): return self(self._default_bsize) if six.PY2: next = __next__ @abc.abstractmethod def __call__(self, blocksize): pass class _NumaPackedCpuIdDispenser(_NumaAbstractCpuIdDispencer): """ Return blocks of CPUs that are chosen in such a way that the NUMA distance between CPUs is minimun. """ def __init__(self, total_avcpus, xnodesclust, default_bsize=None): super(_NumaPackedCpuIdDispenser, self).__init__(default_bsize=default_bsize) self._total_avcpus = total_avcpus self._last_blocksize = None self._xnodesclust = xnodesclust self._xnodesclust_its_cache = dict() def _xnodesclust_its(self, maxdist, blocksize): """For a given *maxdist*, return the iterators through clusters and NUMA nodes.""" if ((maxdist not in self._xnodesclust_its_cache) or (self._last_blocksize and self._last_blocksize != blocksize)): xnodeclust = self._xnodesclust[maxdist] xnodeclust_list = list() for cluster in xnodeclust: # Always start with the NUMA node that has the buggest number of # available CPUs nodes_av_cpus = [len(node) for node in cluster] nodes_maxidx_av_cpus = nodes_av_cpus.index(max(nodes_av_cpus)) nodes_its = cycle(cluster) for _ in range(nodes_maxidx_av_cpus): next(nodes_its) xnodeclust_list.append((cluster, nodes_its)) # Always start with the cluster that has the buggest number of available CPUs clust_av_cpus = [sum([len(d) for d in clust]) for clust in xnodeclust] clust_maxidx_av_cpus = clust_av_cpus.index(max(clust_av_cpus)) xnodeclust_its = cycle(xnodeclust_list) for _ in range(clust_maxidx_av_cpus): next(xnodeclust_its) self._xnodesclust_its_cache[maxdist] = xnodeclust_its return self._xnodesclust_its_cache[maxdist] def __call__(self, blocksize): if blocksize > self._total_avcpus: # If the number of available CPUs is to small, stop right away raise StopIteration() # Start to distibute the block within cluster if NUMA nodes that have # the smallest NUMA distance... for maxdist in sorted(self._xnodesclust.keys()): # Go through the various clusters xnodeclust_its = self._xnodesclust_its(maxdist, blocksize) for _ in range(len(self._xnodesclust[maxdist])): # We use persistent iterator (from one call to another) in order # To evenly distribute the CPUs cluster, nodes_it = next(xnodeclust_its) # Try to create block of blocksize size in a round-robin manner if sum([len(nodes) for nodes in cluster]) >= blocksize: block = list() while len(block) < blocksize: nodes = next(nodes_it) if nodes: block.append(nodes.popleft()) self._total_avcpus -= blocksize # Great we found a match ! return sorted(block) raise RuntimeError('We should never end up here...') class _NumaBalancedCpuIdDispenser(_NumaAbstractCpuIdDispencer): """ Return blocks of CPUs that are chosen in such a way that they spread over all available NUMA zones (regardless of the NUMA distance between CPUs. """ def __init__(self, cpuiterator, default_bsize=None): super(_NumaBalancedCpuIdDispenser, self).__init__(default_bsize=default_bsize) self._cpuiter = cpuiterator def __call__(self, blocksize): res = [] while len(res) < blocksize: # Stop Iteration may be raised here... that's fine with us ! res.append(next(self._cpuiter)[0]) return sorted(res) class _MetaCpuIdDispenser(object): """Group several :class:`_NumaAbstractCpuIdDispencer` into one.""" def __init__(self, *dispensers): """ :param dispensers: The list of dispensers that will be grouped by this object. """ self._dispensers = deque(dispensers) def __iter__(self): return chain(* self._dispensers) def __call__(self, blocksize): while self._dispensers: try: return self._dispensers[0](blocksize) except StopIteration: self._dispensers.popleft() raise StopIteration()
[docs]@six.add_metaclass(abc.ABCMeta) class NumaNodesInfo(collections_abc.Mapping): """Hold information on the system's NUMA nodes. Abstract class. """ def __init__(self): self._nodes = dict() self._clustering_cache = None self._cyclic_cpus_cache = dict() self._fill_nodes() @abc.abstractmethod def _fill_nodes(self): """Fill the self._nodes dictionary with :class:`NumaNodeInfo` objects.""" pass @abc.abstractmethod def _get_freesize(self, nodeid): """Return the amount of free memory for the **nodeid** NUMA node.""" pass def __getitem__(self, nodeid): """Return the :class:`NumaNodeInfo` corresponding to the **nodeid** NUMA node.""" return self._nodes[nodeid] def __len__(self): """Return the number of NUMA nodes on this system.""" return len(self._nodes) def __iter__(self): """Iterate through NUMA nodes identifiers.""" return iter(self._nodes)
[docs] def freesize(self, nodeid): """Return the amount of free memory for the **nodeid** NUMA node.""" if nodeid not in self: raise KeyError("The requested node does not exists") return self._get_freesize(nodeid)
def __str__(self): s_out = 'There are {:d} nodes.\n'.format(len(self)) for n, ni in self.items(): s_out += 'Node {:d} description:\n{!s}\n'.format(n, ni) s_out += 'Distances matrix:\n' s_out += ' {:s}\n'.format(' '.join(['{:4d}'.format(i) for i in sorted(self)])) s_out += '\n'.join(['{:-4d}: {:s}' .format(i, ' '.join(['{:4d}'.format(d) for _, d in sorted(node.distances.items())])) for i, node in sorted(self.items())]) return s_out
[docs] def numapacked_cpu_dispenser(self, smtlayout=None, default_bsize=None): """Return a new :class:`_NumaPackedCpuIdDispenser` object. :param smtlayout: A dictionary that associates a physical CPU ids to its virtual CPUs :param default_bsize: The default blocksize """ total_ncpus = sum([len(zi.cpus) for zi in self.values()]) smt_replication = smtlayout and len(smtlayout) < total_ncpus if smt_replication: # SMT layout provided: Consider only the physical CPUs when # calculating the cpculist (SMT threads are added at the end...) nsmt_threads = total_ncpus // len(smtlayout) if total_ncpus % len(smtlayout) != 0: raise ValueError('Strange number of SMT threads !') physicalcpus = set(smtlayout.keys()) numa_nodes_iterator = dict() for nnode, ninfo in self.items(): linfo = NumaNodeInfo(set([c for c in ninfo.cpus if c in physicalcpus]), ninfo.distances, ninfo.totalsize) numa_nodes_iterator[nnode] = linfo else: # The SMT is not activated or the requested blocksize is too big numa_nodes_iterator = self # Obtain a hierarchical clustering of the nodes if len(self) == 1: nodesclust = dict() elif len(self) == 2: # Easy... nodesclust = {self[0].distances[1]: [set(self.keys()), ]} else: nodesclust = copy.copy(self.nodes_clustering) nodesclust[0] = [set([n, ]) for n in sorted(self.keys())] # Re-order things in order to spread the blocks all over the NUMA nodes distances = sorted(nodesclust.keys()) for i, d in enumerate(distances[:-1]): mynodesclust = list(nodesclust[d]) nextnodesclust = nodesclust[distances[i + 1]] mynewnodesclust = list() nextclusters_it = cycle(nextnodesclust) while mynodesclust: parentclust = next(nextclusters_it) itodo = None for iclust, c in enumerate(mynodesclust): if c <= parentclust: itodo = iclust break if itodo is not None: mynewnodesclust.append(mynodesclust[itodo]) del mynodesclust[itodo] nodesclust[d] = mynewnodesclust # A dictionary of available CPUS... nodes_avcpus = dict() for n, ninfo in numa_nodes_iterator.items(): nodes_avcpus[n] = deque(sorted(ninfo.cpus)) total_avcpus = sum([len(avcpus) for avcpus in nodes_avcpus.values()]) xnodesclust = {d: [[nodes_avcpus[n] for n in c] for c in clusters] for d, clusters in nodesclust.items()} dispensers = [_NumaPackedCpuIdDispenser(total_avcpus, xnodesclust, default_bsize=default_bsize), ] if smt_replication: # Add extra entries for SMT threads (if needed) for ismt in range(0, nsmt_threads - 1): nodes_avcpus = dict() for n, ninfo in numa_nodes_iterator.items(): nodes_avcpus[n] = deque([smtlayout[c][ismt] for c in sorted(ninfo.cpus)]) xnodesclust = {d: [[nodes_avcpus[n] for n in c] for c in clusters] for d, clusters in nodesclust.items()} dispensers.append(_NumaPackedCpuIdDispenser(total_avcpus, xnodesclust, default_bsize=default_bsize)) return _MetaCpuIdDispenser(* dispensers) else: return dispensers[0]
[docs] def numabalanced_cpu_dispenser(self, smtlayout=None, default_bsize=None): """Return a new :class:`_NumaBalancedCpuIdDispenser` object. :param smtlayout: A dictionary that associates a physical CPU ids to its virtual CPUs :param default_bsize: The default blocksize """ return _NumaBalancedCpuIdDispenser( iter(self.numapacked_cpu_dispenser(smtlayout, default_bsize=1)), default_bsize=default_bsize )
[docs] def numapacked_cpulist(self, blocksize, smtlayout=None): """Return a list of consecutive **blocksize** CPU ids. :param blocksize: The size of each block of CPUs :param smtlayout: A dictionary that associates a physical CPU ids to its virtual CPUs Such CPU ids should be evenly distributed across NUMA zones while trying to minimise the NUMA distance between CPUs. """ total_ncpus = sum([len(zi.cpus) for zi in self.values()]) if not (0 < blocksize <= total_ncpus): raise ValueError('blocksize must be between 1 and the number of CPUs.') # Check for a pre-computed result cache_key = (blocksize, bool(smtlayout)) if cache_key not in self._cyclic_cpus_cache: dispenser = self.numapacked_cpu_dispenser(smtlayout=smtlayout, default_bsize=blocksize) self._cyclic_cpus_cache[cache_key] = [block for block in dispenser] return self._cyclic_cpus_cache[cache_key]
[docs] def numabalanced_cpulist(self, blocksize, smtlayout=None): """Return a list of consecutive **blocksize** CPU ids. :param blocksize: The size of each block of CPUs :param smtlayout: A dictionary that associates a physical CPU ids to its virtual CPUs Such CPU ids should be evenly distributed across NUMA zones in order to spread as much as possible (regardless of the NUMA distance between CPUs). """ cyclic_singleblocks = self.numapacked_cpulist(1, smtlayout=smtlayout) return [[c[0] for c in cyclic_singleblocks[i * blocksize:(i + 1) * blocksize]] for i in range(len(cyclic_singleblocks) // blocksize)]
@property def nodes_clustering(self): """ Perform a hierarchical clustering over the NUMA nodes based on the inter-node distance (using a 'Farthest Point Algorithm' method) An harcoded version of the clustering algorithm is available, but a version based on scipy is also included (depending on the number of nodes on or another is chosen). """ if self._clustering_cache is None: if len(self) <= 16: # If the number of nodes is small enough our hard-coded clustering # seems to be fast enough... self._clustering_cache = self._inhouse_nodes_clustering() else: try: self._clustering_cache = self._scipy_nodes_clustering() except ImportError: self._clustering_cache = self._inhouse_nodes_clustering() return self._clustering_cache def _scipy_nodes_clustering(self): import numpy as np import scipy.cluster.hierarchy as hie nz = len(self) allnodes = sorted(self.keys()) # Compute the flat distance matrix flatmat = np.empty((nz * (nz - 1) // 2, ), dtype=np.int64) i = 0 for iz in allnodes: for jz in allnodes[iz + 1:]: flatmat[i] = self[iz].distances[jz] i += 1 # Compute the linkage matrix lm = hie.linkage(flatmat, method='complete') # generate clusters mdistances = np.unique(hie.maxdists(lm)) raw_clust_results = {d: hie.fcluster(lm, t=d, criterion='distance') for d in mdistances} clust_results = dict() nodes_mapping = {i: n for i, n in enumerate(allnodes)} for d, res in raw_clust_results.items(): dset = defaultdict(set) for i, c in enumerate(res): dset[c].add(nodes_mapping[i]) clust_results[d] = [v for v in sorted(dset.values(), key=lambda x: min(x))] return clust_results def _inhouse_nodes_clustering(self): clust_results = dict() latest_clust_list = [[n, ] for n in sorted(self.keys())] for _ in range(len(self) - 1): d_ijs = defaultdict(list) for combi in combinations(range(len(latest_clust_list)), 2): distances = list() for i0 in latest_clust_list[combi[0]]: for i1 in latest_clust_list[combi[1]]: distances.append(self[i0].distances[i1]) d_ijs[max(distances)].append(combi) best_d_ijs = min(d_ijs.keys()) best_ij = d_ijs[best_d_ijs][0] combi = latest_clust_list[best_ij[0]] + latest_clust_list[best_ij[1]] latest_clust_list[best_ij[0]] = combi del latest_clust_list[best_ij[1]] clust_results[best_d_ijs] = copy.copy(latest_clust_list) clust_results = {float(d): [set(c) for c in sorted(clusters, key=lambda x: min(x))] for d, clusters in clust_results.items()} return clust_results
[docs]class LibnumaGateway(Singleton): """Interface to the libnuma DLL.""" def __init__(self): """ The DLL is initialised on the first call. """ self._lib = None @property def lib(self): """Return the libnuma DLL (provided by the ctypes package).""" if self._lib is None: libname = find_library("numa") if libname: self._lib = CDLL(libname) # May raise OSError if the library is missing else: raise OSError('The libnuma library is not available on this system.') self._lib.numa_node_size64.restype = c_longlong if self._lib.numa_available() == -1: raise NotImplementedError('The NUMA library is instaled but cannot be used.') return self._lib
[docs] def numa_node_size64(self, node): """Return the memory associated with a given **node**. :rtype: tuple :return: (total_memory, free_memory) """ free = c_longlong() total = self.lib.numa_node_size64(node, byref(free)) return (total, free.value)
def __getattr__(self, name): """Re-route all the calls to the libnuma's DLL.""" return getattr(self.lib, name)
[docs]class LibNumaNodesInfo(NumaNodesInfo): """Hold information on the system's NUMA nodes. The information about NUMA nodes is retrieved using the standard libnuma. """ _gateway_class = LibnumaGateway def __init__(self, **kwargs): self._gateway = self._gateway_class(** kwargs) super(LibNumaNodesInfo, self).__init__() def _fill_nodes(self): """Fill the self._nodes dictionary with :class:`NumaNodeInfo` objects.""" nodes_cpus = defaultdict(set) for c in range(0, self._gateway.numa_num_configured_cpus()): nodes_cpus[self._gateway.numa_node_of_cpu(c)].add(c) for i in range(0, self._gateway.numa_max_node() + 1): totalsize, _ = self._gateway.numa_node_size64(i) self._nodes[i] = NumaNodeInfo(nodes_cpus[i], {n: self._gateway.numa_distance(i, n) for n in range(0, self._gateway.numa_max_node() + 1)}, totalsize) def _get_freesize(self, node): _, freesize = self._gateway.numa_node_size64(node) """Return the amount of free memory for the **nodeid** NUMA node.""" return freesize