# Source code for flax.core.spmd

# Copyright 2024 The Flax Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import contextlib
import dataclasses
import threading

import jax
from jax.sharding import PartitionSpec, NamedSharding
from flax.core import meta
from flax.typing import (
    LogicalRules,
    Sharding,
)

def get_pspec(sharding, sharding_rules = None) -> PartitionSpec:
  """Build a `PartitionSpec` from a sharding annotation.

  Args:
    sharding: iterable of axis names, one entry per array dimension; its
      entries are unpacked as the positional arguments of `PartitionSpec`.
    sharding_rules: optional logical-axis rules that are combined with the
      rules returned by `get_logical_axis_rules()`.

  Returns:
    A `PartitionSpec`. When either the context rules or `sharding_rules` are
    non-empty, the entries of `sharding` are first translated through the
    combined rules; otherwise they are used as given.
  """
  context_rules = get_logical_axis_rules()
  if not (context_rules or sharding_rules):
    # No rules anywhere: the annotation is used verbatim.
    return PartitionSpec(*sharding)
  merged_rules = composite_rules(context_rules, sharding_rules)
  resolved_axes = from_sharding_rules(sharding, merged_rules)
  return PartitionSpec(*resolved_axes)

def _apply_sharding(value, sharding, mesh):
  """Apply `sharding` to `value` using the API that matches the mesh's axes.

  Args:
    value: the value to shard.
    sharding: the sharding to apply.
    mesh: mesh whose `are_all_axes_explicit` / `are_all_axes_auto` flags pick
      the API to call.

  Returns:
    The result of `jax.sharding.reshard` when all mesh axes are explicit, or
    of `jax.lax.with_sharding_constraint` when all mesh axes are auto.

  Raises:
    ValueError: if the mesh is neither all-explicit nor all-auto.
  """
  # The explicit check comes first, exactly as in the branch order callers
  # have always seen.
  if mesh.are_all_axes_explicit:
    return jax.sharding.reshard(value, sharding)
  if mesh.are_all_axes_auto:
    return jax.lax.with_sharding_constraint(value, sharding)
  message = (
      'Mesh must have all axes as Explicit or all axes as Auto. '
      f'Got mixed axis types: {mesh.axis_types}')
  raise ValueError(message)


def shard_value(
  value, sharding, sharding_rules, mesh: jax.sharding.AbstractMesh | jax.sharding.Mesh | None
):
  if not sharding:
    return value

  if mesh is None:
    mesh = meta.get_global_mesh()

  if mesh is None:
    raise ValueError(
      'An auto mesh context or metadata is required if creating a variable'
      f' with annotation {sharding=}. '
      'For more guidance, see https://flax.readthedocs.io/en/latest/flip/4844-var-eager-sharding.html.')
  pspec = get_pspec(sharding, sharding_rules)
  return _apply_sharding(value, NamedSharding(mesh, pspec), mesh)




# Dynamic Axis Mapping Context
# ------------------------------------------------------------------------------


@dataclasses.dataclass
class _AxisRules(threading.local):
  """Dynamic logical axis to mesh axis binding context.

  Subclasses `threading.local`, so the value of `rules` is stored separately
  for each thread.
  """

  # Logical-axis-to-mesh-axis rules; defaults to an empty tuple (no rules).
  rules: LogicalRules = ()


# Global axis binding context: the single module-level `_AxisRules` instance
# that the logical-axis-rule helpers in this module read and write.
_axis_rules = _AxisRules()


def set_logical_axis_rules(rules: LogicalRules):
  """Sets the global logical axis to mesh axis binding.

  Args:
    rules: the logical axis rules to store on the module-level axis-rules
      context; they replace whatever rules were stored there before.
  """
  _axis_rules.rules = rules


def get_logical_axis_rules() -> LogicalRules:
  """Returns the global logical axis to mesh axis binding.

  Returns:
    The rules currently stored on the module-level axis-rules context.
  """
  return _axis_rules.rules


@contextlib.contextmanager
def logical_axis_rules(rules: LogicalRules):
  """Context manager for setting the logical to mesh axis bindings.

  The rules that were active on entry are saved and put back on exit, also
  when the body of the `with` statement raises.

  Args:
    rules: the logical axis rules to use inside the `with` block.
  """
  old_rules = _axis_rules.rules
  try:
    _axis_rules.rules = rules
    yield
  finally:
    # Always restore the previous binding, even on error.
    _axis_rules.rules = old_rules


def composite_rules(rule1, rule2):
  """Combine two sets of logical axis rules into one.

  Args:
    rule1: first collection of `(alias, value)` pairs; may be empty or None.
    rule2: second collection of `(alias, value)` pairs; may be empty or None.

  Returns:
    `()` when both inputs are empty; the non-empty input, unchanged, when only
    one of them is non-empty; otherwise a tuple of `(alias, value)` pairs
    holding the entries of `rule1` followed by the new entries of `rule2`.

  Raises:
    ValueError: if an alias is mapped to different values by the two inputs.
  """
  if not rule2:
    return rule1 if rule1 else ()
  if not rule1:
    return rule2

  rules = dict(rule1)
  for alias, value in rule2:
    if alias in rules and rules[alias] != value:
      raise ValueError(
        f'Inconsistent logical axis annotations for {alias}: '
        f'{rules[alias]} vs {value}'
      )
    rules[alias] = value
  return tuple(rules.items())


def from_sharding_rules(
  sharding: Sharding, sharding_rules: LogicalRules
) -> Sharding:
  """Translate the entries of `sharding` through `sharding_rules`.

  Args:
    sharding: sequence of axis entries to translate.
    sharding_rules: collection of `(alias, on_mesh)` pairs.

  Returns:
    A tuple with one item per entry of `sharding`. A truthy entry whose
    `str()` form is an alias in `sharding_rules` is replaced by the mapped
    value; every other entry (including falsy ones such as `None`) is kept
    as is.
  """
  rules = dict(sharding_rules)
  return tuple(
    rules[str(s)] if (s and str(s) in rules) else s for s in sharding
  )