Skip to content

vllm.distributed.eplb

Expert parallelism load balancer (EPLB).

Modules:

Name Description
eplb_state

Expert parallelism load balancer (EPLB) metrics and states.

rebalance_algo

Expert parallelism load balancer (EPLB) for vLLM.

rebalance_execute

The actual execution of the rearrangement.

logger module-attribute

logger = init_logger(__name__)

EplbModelState dataclass

EPLB metrics.

Source code in vllm/distributed/eplb/eplb_state.py
@dataclass
class EplbModelState:
    """EPLB metrics."""

    physical_to_logical_map: torch.Tensor
    """
    Mapping from physical experts to logical experts.

    Shape: (num_moe_layers, num_physical_experts)

    # Example

    For a 2-layer MoE model with 6 physical experts and 4 logical experts on 3
    EP ranks, the mapping could look like this:

    ```
    [[0, 1, 2, 3, 0, 1],
     [0, 2, 0, 1, 0, 3]]
    ```
    """
    logical_to_physical_map: torch.Tensor
    """
    Mapping from logical experts to physical experts.

    This is a sparse matrix, where -1 indicates no mapping.

    Shape: (num_moe_layers, num_logical_experts, num_redundant_experts + 1)

    # Example

    For a 2-layer MoE model with 6 physical experts and 4 logical experts on 3
    EP ranks, the mapping could look like this:

    ```
    [[[0, 4, -1],
      [1, 5, -1],
      [2, -1, -1],
      [3, -1, -1]],
     [[0, 2, 4],
      [3, -1, -1],
      [1, -1, -1],
      [5, -1, -1]]]
    ```
    """
    logical_replica_count: torch.Tensor
    """
    Number of replicas for each logical expert.
    This is exactly the non-`-1` count in the `logical_to_physical_map`.

    Shape: (num_moe_layers, num_logical_experts)

    # Example
    For a 2-layer MoE model with 6 physical experts and 4 logical experts on 3
    EP ranks, the count could look like this:

    ```
    [[2, 2, 1, 1],
     [3, 1, 1, 1]]
    """

    expert_load_pass: torch.Tensor
    """
    Expert load during this forward pass. 
    We use the token count each expert processes as the load.

    Shape: (num_moe_layers, num_physical_experts)
    """
    expert_load_window: torch.Tensor
    """
    A sliding window of expert load.

    Shape: (window_size, num_moe_layers, num_physical_experts)

    NOTE: The expert_load_view now records load for all physical experts
    rather than just local experts. This ensures consistent load statistics
    across different dispatch methods (naive all-to-all, DeepEP, pplx-kernels).
    The recorded load will be multiplied by dp_size when using naive all-to-all
    due to each DP rank contributing the same token set to the calculation.
    See:
    https://github.com/vllm-project/vllm/pull/22167#pullrequestreview-3086143856
    """
    model_name: str
    model: MixtureOfExperts

expert_load_pass instance-attribute

expert_load_pass: Tensor

Expert load during this forward pass. We use the token count each expert processes as the load.

Shape: (num_moe_layers, num_physical_experts)

expert_load_window instance-attribute

expert_load_window: Tensor

A sliding window of expert load.

Shape: (window_size, num_moe_layers, num_physical_experts)

NOTE: The expert_load_view now records load for all physical experts rather than just local experts. This ensures consistent load statistics across different dispatch methods (naive all-to-all, DeepEP, pplx-kernels). The recorded load will be multiplied by dp_size when using naive all-to-all due to each DP rank contributing the same token set to the calculation. See: https://github.com/vllm-project/vllm/pull/22167#pullrequestreview-3086143856

logical_replica_count instance-attribute

logical_replica_count: Tensor

Number of replicas for each logical expert. This is exactly the non--1 count in the logical_to_physical_map.

Shape: (num_moe_layers, num_logical_experts)

Example

For a 2-layer MoE model with 6 physical experts and 4 logical experts on 3 EP ranks, the count could look like this:

``` [[2, 2, 1, 1], [3, 1, 1, 1]]

logical_to_physical_map instance-attribute

logical_to_physical_map: Tensor

Mapping from logical experts to physical experts.

This is a sparse matrix, where -1 indicates no mapping.

Shape: (num_moe_layers, num_logical_experts, num_redundant_experts + 1)

Example

For a 2-layer MoE model with 6 physical experts and 4 logical experts on 3 EP ranks, the mapping could look like this:

[[[0, 4, -1],
  [1, 5, -1],
  [2, -1, -1],
  [3, -1, -1]],
 [[0, 2, 4],
  [3, -1, -1],
  [1, -1, -1],
  [5, -1, -1]]]

model instance-attribute

model_name instance-attribute

model_name: str

physical_to_logical_map instance-attribute

physical_to_logical_map: Tensor

Mapping from physical experts to logical experts.

Shape: (num_moe_layers, num_physical_experts)

Example

For a 2-layer MoE model with 6 physical experts and 4 logical experts on 3 EP ranks, the mapping could look like this:

[[0, 1, 2, 3, 0, 1],
 [0, 2, 0, 1, 0, 3]]

__init__

__init__(
    physical_to_logical_map: Tensor,
    logical_to_physical_map: Tensor,
    logical_replica_count: Tensor,
    expert_load_pass: Tensor,
    expert_load_window: Tensor,
    model_name: str,
    model: MixtureOfExperts,
) -> None

EplbState

EplbState of each expert parallel model. Key is the model config hash.

Source code in vllm/distributed/eplb/eplb_state.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
class EplbState:
    """
    EplbState of each expert parallel model. Key is the model config hash.
    """

    def __init__(self, parallel_config: ParallelConfig, device: torch.device):
        self.parallel_config = parallel_config
        self.device = device
        self.model_states: dict[str, EplbModelState] = {}
        """
        Current step in the sliding window.

        Different from `expert_rearrangement_step`, 
        each EP rank may have its own `expert_load_window_step`.
        """
        self.expert_load_window_step: int = 0
        """
        Size of the expert load sliding window.
        This is a constant and is taken from the config.
        """
        self.expert_load_window_size: int = 0
        """
        Steps after last rearrangement.
        Will trigger a rearrangement if it exceeds the threshold.

        NOTE: Keep in mind that all EP ranks need to have the same
        `expert_rearrangement_step` value to ensure synchronization.
        Otherwise, the rearrangement will hang at collective
        communication calls.
        """
        self.expert_rearrangement_step: int = 0
        """
        Interval for expert rearrangement steps.
        This is a constant and is taken from the config.
        """
        self.expert_rearrangement_step_interval: int = 0

    @staticmethod
    def build_initial_global_physical_to_logical_map(
        num_routed_experts: int,
        num_redundant_experts: int,
    ) -> Sequence[int]:
        """
        Build an initial expert arrangement using the following structure:
        [original routed experts, redundant experts]

        Returns:
            physical_to_logical_map (Sequence[int]): A list of integers,
                where each integer is the index of the logical expert
                that the corresponding physical expert maps to.
        """
        global_physical_to_logical_map = list(range(num_routed_experts))
        global_physical_to_logical_map += [
            i % num_routed_experts for i in range(num_redundant_experts)
        ]
        return global_physical_to_logical_map

    def validate_ep_configuration(self, new_model: MixtureOfExperts):
        """
        Validate that the expert parallel configuration of
        the new model is the same as the existing models.
        """
        if len(self.model_states) > 0:
            model = next(iter(self.model_states.values())).model
            if (
                model.num_routed_experts != new_model.num_routed_experts
                or model.num_redundant_experts != new_model.num_redundant_experts
                or model.num_physical_experts != new_model.num_physical_experts
                or model.num_logical_experts != new_model.num_logical_experts
                or model.num_expert_groups != new_model.num_expert_groups
            ):
                raise RuntimeError(
                    "Model: {} "
                    "with config {} "
                    "{} {} {} {} "
                    "mismatch with new model {} "
                    "with config {} "
                    "{} {} {} {}".format(
                        type(model),
                        model.num_routed_experts,
                        model.num_redundant_experts,
                        model.num_physical_experts,
                        model.num_logical_experts,
                        model.num_expert_groups,
                        type(new_model),
                        new_model.num_routed_experts,
                        new_model.num_redundant_experts,
                        new_model.num_physical_experts,
                        new_model.num_logical_experts,
                        new_model.num_expert_groups,
                    )
                )

    def add_model(
        self,
        model: MixtureOfExperts,
        model_config: ModelConfig,
        global_expert_load: torch.Tensor | None = None,
        old_global_expert_indices: torch.Tensor | None = None,
        rank_mapping: dict[int, int] | None = None,
    ):
        """
        Build the initial EPLB state.
        """
        self.validate_ep_configuration(model)
        physical_to_logical_map_list = (
            EplbState.build_initial_global_physical_to_logical_map(
                model.num_routed_experts,
                model.num_redundant_experts,
            )
        )
        physical_to_logical_map = torch.tensor(
            physical_to_logical_map_list,
            device=self.device,
        )
        # Assuming 8 GPUs per node, this supports up to
        # (1023 + 1) / 8 = 128 nodes for now.
        # TODO(rui): make this configurable
        MAX_EXPERT_REDUNDANCY = 1023
        assert model.num_redundant_experts <= MAX_EXPERT_REDUNDANCY, (
            f"num_redundant_experts {model.num_redundant_experts} "
            f"must be less than or equal to {MAX_EXPERT_REDUNDANCY}"
        )
        max_slots_per_logical_expert = MAX_EXPERT_REDUNDANCY + 1
        logical_to_physical_map = torch.full(
            (model.num_logical_experts, max_slots_per_logical_expert),
            -1,
            device=self.device,
        )
        logical_replica_count = torch.zeros(
            (model.num_logical_experts,),
            device=self.device,
            dtype=torch.long,
        )

        for i in range(model.num_physical_experts):
            logical_idx = physical_to_logical_map[i]
            logical_to_physical_map[logical_idx, logical_replica_count[logical_idx]] = i
            logical_replica_count[logical_idx] += 1

        # Duplicate initial mapping for all layers
        physical_to_logical_map = (
            physical_to_logical_map.unsqueeze(0)
            .expand(
                model.num_moe_layers,
                -1,
            )
            .contiguous()
        )
        logical_to_physical_map = (
            logical_to_physical_map.unsqueeze(0)
            .expand(
                model.num_moe_layers,
                -1,
                -1,
            )
            .contiguous()
        )
        logical_replica_count = (
            logical_replica_count.unsqueeze(0)
            .expand(
                model.num_moe_layers,
                -1,
            )
            .contiguous()
        )

        expert_load_pass = torch.zeros(
            (model.num_moe_layers, model.num_physical_experts),
            dtype=torch.int32,
            device=self.device,
        )
        self.expert_load_window_size = self.parallel_config.eplb_config.window_size
        expert_load_window = torch.zeros(
            (
                self.expert_load_window_size,
                model.num_moe_layers,
                model.num_physical_experts,
            ),
            dtype=torch.int32,
            device=self.device,
        )

        # Set the initial progress of rearrangement to 3/4
        eplb_step_interval = self.parallel_config.eplb_config.step_interval
        self.expert_rearrangement_step = max(
            0, eplb_step_interval - eplb_step_interval // 4
        )
        self.expert_rearrangement_step_interval = eplb_step_interval

        if global_expert_load is not None:
            ep_group = get_ep_group().device_group
            assert global_expert_load.shape == (
                model.num_moe_layers,
                model.num_logical_experts,
            )
            assert global_expert_load.dtype == torch.int64

            num_replicas = model.num_physical_experts
            num_groups = model.num_expert_groups
            num_nodes = get_node_count()
            num_gpus = ep_group.size()

            if num_gpus % num_nodes != 0:
                num_nodes = 1
                logger.warning_once(
                    f"num_gpus % num_nodes != 0, "
                    "not using hierarchical rearrangement algorithm.\n"
                    f"{num_gpus=}, {num_nodes=}"
                )

            # Get new expert mappings
            (
                new_physical_to_logical_map,
                new_logical_to_physical_map,
                new_logical_replica_count,
            ) = rebalance_experts(
                global_expert_load,
                num_replicas,
                num_groups,
                num_nodes,
                num_gpus,
            )

            max_physical_slots = new_logical_to_physical_map.shape[-1]
            assert max_physical_slots <= logical_to_physical_map.shape[-1]
            new_logical_to_physical_map = torch.nn.functional.pad(
                new_logical_to_physical_map,
                (0, logical_to_physical_map.shape[-1] - max_physical_slots),
                value=-1,
            )
            physical_to_logical_map = new_physical_to_logical_map.to(self.device)
            logical_to_physical_map.copy_(new_logical_to_physical_map)
            logical_replica_count.copy_(new_logical_replica_count)

        model.set_eplb_state(
            expert_load_pass,
            logical_to_physical_map,
            logical_replica_count,
        )
        if global_expert_load is not None:
            rearrange_expert_weights_inplace(
                old_global_expert_indices,
                new_physical_to_logical_map,
                model.expert_weights,
                ep_group,
                False,
                rank_mapping,
            )
            self.expert_rearrangement_step = 0

        self.model_states[model_config.compute_hash()] = EplbModelState(
            physical_to_logical_map,
            logical_to_physical_map,
            logical_replica_count,
            expert_load_pass,
            expert_load_window,
            model_config.model,
            model,
        )

    def step(
        self,
        is_dummy: bool = False,
        is_profile: bool = False,
        log_stats: bool = False,
    ) -> None:
        """
        Step the EPLB state.

        Args:
            is_dummy (bool): If `True`, this is a dummy step and the load
                metrics recorded in this forward pass will not count.
                Defaults to `False`.
            is_profile (bool): If `True`, perform a dummy rearrangement
                with maximum communication cost. This is used in
                `profile_run` to reserve enough memory
                for the communication buffer.
            log_stats (bool): If `True`, log the expert load metrics.

        # Stats
            The metrics are all summed up across layers.
            - `avg_tokens`: The average load across ranks.
            - `max_tokens`: The maximum load across ranks.
            - `balancedness`: The ratio of average load to maximum load.
        """

        if is_profile:
            self.rearrange(is_profile=True)
            return

        if is_dummy:
            # Do not record load metrics for dummy steps
            for eplb_model_state in self.model_states.values():
                eplb_model_state.expert_load_pass.zero_()

        if log_stats:
            # Sync the expert load pass for each model (main and drafter).
            # expert_load_pass: (num_moe_layers, num_physical_experts)
            expert_load_pass_list = self._sync_load_pass()
            ep_group = get_ep_group().device_group
            for expert_load_pass, eplb_model_state in zip(
                expert_load_pass_list, self.model_states.values()
            ):
                # num_tokens_per_rank: (num_moe_layers, num_ranks)
                num_tokens_per_rank = (
                    expert_load_pass.reshape(
                        expert_load_pass.shape[0], ep_group.size(), -1
                    )
                    .sum(dim=-1)
                    .float()
                )

                # Compute balancedness ratio:
                # for each layer:
                #   (mean load across ranks) / (max load across ranks)
                avg_tokens_tensor = num_tokens_per_rank.mean(dim=0).sum(dim=0)
                max_tokens_tensor = num_tokens_per_rank.max(dim=0).values.sum(dim=0)

                # Just to make type checker happy
                tokens_tensors: list[float] = torch.stack(
                    [avg_tokens_tensor, max_tokens_tensor]
                ).tolist()
                avg_tokens, max_tokens = tokens_tensors
                balancedness = avg_tokens / max_tokens if max_tokens > 0 else 0.0

                if ep_group.rank() == 0:
                    logger.info(
                        "EPLB step: %d for model %s: avg_tokens=%.2f, "
                        "max_tokens=%d, balancedness=%.4f",
                        self.expert_rearrangement_step,
                        eplb_model_state.model_name,
                        avg_tokens,
                        max_tokens,
                        balancedness,
                    )

        # Update the expert load sliding window
        if not is_dummy:
            for eplb_model_state in self.model_states.values():
                eplb_model_state.expert_load_window[self.expert_load_window_step] = (
                    eplb_model_state.expert_load_pass.clone()
                )
                eplb_model_state.expert_load_pass.zero_()

            self.expert_load_window_step += 1
            if self.expert_load_window_step >= self.expert_load_window_size:
                self.expert_load_window_step = 0

        # Step the expert rearrangement step
        # Note that even if this is a dummy step, we still increment the
        # rearrangement step and perform rearrangement to ensure all ranks are
        # performing collective communication.
        self.expert_rearrangement_step += 1
        if self.expert_rearrangement_step >= self.expert_rearrangement_step_interval:
            self.expert_rearrangement_step = 0
            self.rearrange()

    def rearrange(
        self,
        is_profile: bool = False,
        execute_shuffle: bool = True,
        global_expert_loads: list[torch.Tensor] | None = None,
        rank_mapping: dict[int, int] | None = None,
    ) -> torch.Tensor | None:
        """
        Rearrange the experts according to the current load.

        Args:
            is_profile (bool): If `True`, perform a dummy rearrangement.
                This is used in `profile_run` to reserve enough memory,
                no memory movement will be performed. Default is False.
            execute_shuffle (bool): If `True`, execute the shuffle
                in elastic expert parallel (EEP). Default is True.
            global_expert_loads (list[torch.Tensor] | None): The global expert
                loads when scaling is done in EEP.
                List of expert loads for the main and drafter
                (when spec decode is used) models.
            rank_mapping (dict[int, int] | None): The rank mapping
                when scaling is done in EEP.
        """

        ep_group = get_ep_group().device_group
        ep_rank = ep_group.rank()

        time_start = None
        is_main_rank = ep_rank == 0
        if is_main_rank:
            torch.cuda.synchronize()
            time_start = time.perf_counter()
            logger.info("Rearranging experts %s...", "(profile)" if is_profile else "")

        if global_expert_loads is None:
            # Map the physical expert load to global logical experts
            global_expert_load_windows = []
            if not execute_shuffle:
                num_models = torch.tensor(
                    [len(self.model_states)], dtype=torch.int32, device="cpu"
                )
                torch.distributed.broadcast(
                    num_models, group=get_ep_group().cpu_group, group_src=0
                )

            for eplb_model_state in self.model_states.values():
                logical_expert_load_window = torch.zeros(
                    self.expert_load_window_size,
                    eplb_model_state.model.num_moe_layers,
                    eplb_model_state.model.num_logical_experts,
                    dtype=eplb_model_state.expert_load_window.dtype,
                    device=eplb_model_state.expert_load_window.device,
                )
                logical_expert_load_window.scatter_add_(
                    dim=-1,
                    index=eplb_model_state.physical_to_logical_map.unsqueeze(0)
                    .expand_as(eplb_model_state.expert_load_window)
                    .long(),
                    src=eplb_model_state.expert_load_window,
                )

                if not execute_shuffle:
                    metadata = torch.tensor(
                        [
                            eplb_model_state.model.num_moe_layers,
                            eplb_model_state.model.num_logical_experts,
                            eplb_model_state.physical_to_logical_map.shape[1],
                        ],
                        dtype=torch.int32,
                        device="cpu",
                    )
                    torch.distributed.broadcast(
                        metadata, group=get_ep_group().cpu_group, group_src=0
                    )

                global_expert_load_window = logical_expert_load_window.sum(dim=0)
                global_expert_load_windows.append(global_expert_load_window)
            # Perform all-reduce to get the expert load across all ranks for each model
            global_expert_load_windows = self._allreduce_list(
                global_expert_load_windows
            )
            if not execute_shuffle:
                for eplb_model_state, global_expert_load_window in zip(
                    self.model_states.values(), global_expert_load_windows
                ):
                    # (num_moe_layers, old_num_physical_experts)
                    old_global_expert_indices = eplb_model_state.physical_to_logical_map
                    torch.distributed.broadcast(
                        old_global_expert_indices, group=ep_group, group_src=0
                    )
            if not execute_shuffle:
                return global_expert_load_windows
        else:
            assert execute_shuffle
            global_expert_load_windows = global_expert_loads

        # TODO(bowen): Treat differently for prefill and decode nodes
        eplb_model_state = next(iter(self.model_states.values()))
        model = eplb_model_state.model
        num_replicas = model.num_physical_experts
        num_groups = model.num_expert_groups
        if rank_mapping is not None and len(rank_mapping) == ep_group.size():
            # NOTE(yongji): scale down, we need to rebalance the experts on
            # remaining GPUs, transfer the experts while we haven't shutdown
            # the GPUs to be released.
            cpu_group = get_ep_group().cpu_group
            num_nodes = _node_count_with_rank_mapping(cpu_group, rank_mapping)
            num_gpus = sum(new_rank != -1 for new_rank in rank_mapping.values())
            num_replicas = (
                num_replicas // ep_group.size() * num_gpus
            )  # handle num replicas change
        else:
            num_nodes = get_node_count()
            num_gpus = ep_group.size()

        if num_gpus % num_nodes != 0:
            self.num_nodes = 1
            logger.warning_once(
                f"num_gpus % num_nodes != 0, "
                "not using hierarchical rearrangement algorithm.\n"
                f"{num_gpus=}, {num_nodes=}"
            )

        for eplb_model_state, global_expert_load_window in zip(
            self.model_states.values(), global_expert_load_windows
        ):
            # Get new expert mappings for the model
            (
                new_physical_to_logical_map,
                new_logical_to_physical_map,
                new_logical_replica_count,
            ) = rebalance_experts(
                global_expert_load_window,
                num_replicas,
                num_groups,
                num_nodes,
                num_gpus,
            )

            # Update expert weights
            rearrange_expert_weights_inplace(
                eplb_model_state.physical_to_logical_map,
                new_physical_to_logical_map,
                eplb_model_state.model.expert_weights,
                ep_group,
                is_profile,
                rank_mapping,
            )

            if not is_profile:
                if (
                    eplb_model_state.physical_to_logical_map.shape[1]
                    != new_physical_to_logical_map.shape[1]
                ):
                    eplb_model_state.physical_to_logical_map = (
                        new_physical_to_logical_map.to(
                            eplb_model_state.physical_to_logical_map.device
                        )
                    )
                else:
                    eplb_model_state.physical_to_logical_map.copy_(
                        new_physical_to_logical_map
                    )
                max_physical_slots = new_logical_to_physical_map.shape[-1]
                assert (
                    max_physical_slots
                    <= eplb_model_state.logical_to_physical_map.shape[-1]
                )
                new_logical_to_physical_map = torch.nn.functional.pad(
                    new_logical_to_physical_map,
                    (
                        0,
                        eplb_model_state.logical_to_physical_map.shape[-1]
                        - max_physical_slots,
                    ),
                    value=-1,
                )
                eplb_model_state.logical_to_physical_map.copy_(
                    new_logical_to_physical_map
                )
                eplb_model_state.logical_replica_count.copy_(new_logical_replica_count)

        if is_main_rank:
            assert time_start is not None
            torch.cuda.synchronize()
            time_end = time.perf_counter()
            logger.info(
                "Rearranged experts%sin %.2f seconds.",
                " (profile) " if is_profile else " ",
                time_end - time_start,
            )
        return None

    @staticmethod
    def recv_state() -> tuple[list[torch.Tensor], list[torch.Tensor]]:
        """
        Receive the expert load and old placement from the master rank.
        """
        ep_group = get_ep_group()
        num_models = torch.empty(1, dtype=torch.int32, device="cpu")
        torch.distributed.broadcast(num_models, group=ep_group.cpu_group, group_src=0)
        num_models = num_models.item()
        global_expert_loads = []
        old_global_expert_indices_per_model = []
        for _ in range(num_models):
            metadata = torch.empty(3, dtype=torch.int32, device="cpu")
            torch.distributed.broadcast(metadata, group=ep_group.cpu_group, group_src=0)
            num_moe_layers, num_logical_experts, num_old_physical_experts = (
                metadata.tolist()
            )
            global_expert_load = torch.zeros(
                (num_moe_layers, num_logical_experts),
                dtype=torch.int64,
                device=ep_group.device,
            )
            all_reduce(global_expert_load, group=ep_group.device_group)
            old_global_expert_indices = torch.empty(
                (num_moe_layers, num_old_physical_experts),
                dtype=torch.int64,
                device=ep_group.device,
            )
            torch.distributed.broadcast(
                old_global_expert_indices,
                group=ep_group.device_group,
                group_src=0,
            )
            global_expert_loads.append(global_expert_load)
            old_global_expert_indices_per_model.append(old_global_expert_indices)
        return global_expert_loads, old_global_expert_indices_per_model

    @classmethod
    def get_eep_state(
        cls, parallel_config: ParallelConfig
    ) -> tuple[
        list[torch.Tensor] | None,
        list[torch.Tensor] | None,
        dict[int, int] | None,
    ]:
        num_local_physical_experts = torch.empty(1, dtype=torch.int32, device="cpu")
        torch.distributed.broadcast(
            num_local_physical_experts,
            group=get_ep_group().cpu_group,
            group_src=0,
        )
        num_local_physical_experts = int(num_local_physical_experts.item())
        new_ep_size = get_ep_group().world_size
        global_expert_loads, old_global_expert_indices_per_model = (
            EplbState.recv_state()
        )

        # EP configuration for all models has to be the same so as eplb config
        num_logical_experts = global_expert_loads[0].shape[1]
        parallel_config.eplb_config.num_redundant_experts = (
            num_local_physical_experts * new_ep_size - num_logical_experts
        )
        assert (
            old_global_expert_indices_per_model[0].shape[1] % num_local_physical_experts
            == 0
        )
        old_ep_size = (
            old_global_expert_indices_per_model[0].shape[1]
            // num_local_physical_experts
        )
        rank_mapping = {old_ep_rank: old_ep_rank for old_ep_rank in range(old_ep_size)}
        return (
            global_expert_loads,
            old_global_expert_indices_per_model,
            rank_mapping,
        )

    def _allreduce_list(self, tensor_list: list[torch.Tensor]) -> list[torch.Tensor]:
        """
        All-reduce a list of tensors.
        """
        if len(tensor_list) == 1:
            all_reduce(tensor_list[0], group=get_ep_group().device_group)
            return tensor_list
        assert all(t.dim() == 2 for t in tensor_list), "All tensors must be 2D."
        assert all(t.shape[1] == tensor_list[0].shape[1] for t in tensor_list), (
            "All tensors must have the same shape[1]."
        )
        # Concatenate, all_reduce, then unpack to original shapes.
        # We assume all tensors are 2D and shape[1] (num_physical_experts)
        # is the same across all models.
        shapes = [t.shape for t in tensor_list]
        concat_tensor = torch.cat(tensor_list, dim=0)

        ep_group = get_ep_group().device_group
        all_reduce(concat_tensor, group=ep_group)

        all_reduce_list = []
        offset = 0
        for shape in shapes:
            all_reduce_list.append(concat_tensor[offset : offset + shape[0], :])
            offset += shape[0]
        return all_reduce_list

    def _sync_load_pass(self) -> list[torch.Tensor]:
        """
        Sync the expert load pass across all ranks for log stats.
        Doesn't update the expert load pass in eplb_model_state.
        """
        load_pass_list = []
        for eplb_model_state in self.model_states.values():
            load_pass_list.append(eplb_model_state.expert_load_pass.clone())
        return self._allreduce_list(load_pass_list)

device instance-attribute

device = device

expert_load_window_size instance-attribute

expert_load_window_size: int = 0

Steps after last rearrangement. Will trigger a rearrangement if it exceeds the threshold.

NOTE: Keep in mind that all EP ranks need to have the same expert_rearrangement_step value to ensure synchronization. Otherwise, the rearrangement will hang at collective communication calls.

expert_load_window_step instance-attribute

expert_load_window_step: int = 0

Size of the expert load sliding window. This is a constant and is taken from the config.

expert_rearrangement_step instance-attribute

expert_rearrangement_step: int = 0

Interval for expert rearrangement steps. This is a constant and is taken from the config.

expert_rearrangement_step_interval instance-attribute

expert_rearrangement_step_interval: int = 0

model_states instance-attribute

model_states: dict[str, EplbModelState] = {}

Current step in the sliding window.

Different from expert_rearrangement_step, each EP rank may have its own expert_load_window_step.

parallel_config instance-attribute

parallel_config = parallel_config

__init__

__init__(parallel_config: ParallelConfig, device: device)
Source code in vllm/distributed/eplb/eplb_state.py
def __init__(self, parallel_config: ParallelConfig, device: torch.device):
    self.parallel_config = parallel_config
    self.device = device
    self.model_states: dict[str, EplbModelState] = {}
    """
    Current step in the sliding window.

    Different from `expert_rearrangement_step`, 
    each EP rank may have its own `expert_load_window_step`.
    """
    self.expert_load_window_step: int = 0
    """
    Size of the expert load sliding window.
    This is a constant and is taken from the config.
    """
    self.expert_load_window_size: int = 0
    """
    Steps after last rearrangement.
    Will trigger a rearrangement if it exceeds the threshold.

    NOTE: Keep in mind that all EP ranks need to have the same
    `expert_rearrangement_step` value to ensure synchronization.
    Otherwise, the rearrangement will hang at collective
    communication calls.
    """
    self.expert_rearrangement_step: int = 0
    """
    Interval for expert rearrangement steps.
    This is a constant and is taken from the config.
    """
    self.expert_rearrangement_step_interval: int = 0

_allreduce_list

_allreduce_list(tensor_list: list[Tensor]) -> list[Tensor]

All-reduce a list of tensors.

Source code in vllm/distributed/eplb/eplb_state.py
def _allreduce_list(self, tensor_list: list[torch.Tensor]) -> list[torch.Tensor]:
    """
    All-reduce a list of tensors.
    """
    if len(tensor_list) == 1:
        all_reduce(tensor_list[0], group=get_ep_group().device_group)
        return tensor_list
    assert all(t.dim() == 2 for t in tensor_list), "All tensors must be 2D."
    assert all(t.shape[1] == tensor_list[0].shape[1] for t in tensor_list), (
        "All tensors must have the same shape[1]."
    )
    # Concatenate, all_reduce, then unpack to original shapes.
    # We assume all tensors are 2D and shape[1] (num_physical_experts)
    # is the same across all models.
    shapes = [t.shape for t in tensor_list]
    concat_tensor = torch.cat(tensor_list, dim=0)

    ep_group = get_ep_group().device_group
    all_reduce(concat_tensor, group=ep_group)

    all_reduce_list = []
    offset = 0
    for shape in shapes:
        all_reduce_list.append(concat_tensor[offset : offset + shape[0], :])
        offset += shape[0]
    return all_reduce_list

_sync_load_pass

_sync_load_pass() -> list[Tensor]

Sync the expert load pass across all ranks for log stats. Doesn't update the expert load pass in eplb_model_state.

Source code in vllm/distributed/eplb/eplb_state.py
def _sync_load_pass(self) -> list[torch.Tensor]:
    """
    Sync the expert load pass across all ranks for log stats.
    Doesn't update the expert load pass in eplb_model_state.
    """
    load_pass_list = []
    for eplb_model_state in self.model_states.values():
        load_pass_list.append(eplb_model_state.expert_load_pass.clone())
    return self._allreduce_list(load_pass_list)

add_model

add_model(
    model: MixtureOfExperts,
    model_config: ModelConfig,
    global_expert_load: Tensor | None = None,
    old_global_expert_indices: Tensor | None = None,
    rank_mapping: dict[int, int] | None = None,
)

Build the initial EPLB state.

Source code in vllm/distributed/eplb/eplb_state.py
def add_model(
    self,
    model: MixtureOfExperts,
    model_config: ModelConfig,
    global_expert_load: torch.Tensor | None = None,
    old_global_expert_indices: torch.Tensor | None = None,
    rank_mapping: dict[int, int] | None = None,
):
    """
    Build the initial EPLB state.
    """
    self.validate_ep_configuration(model)
    physical_to_logical_map_list = (
        EplbState.build_initial_global_physical_to_logical_map(
            model.num_routed_experts,
            model.num_redundant_experts,
        )
    )
    physical_to_logical_map = torch.tensor(
        physical_to_logical_map_list,
        device=self.device,
    )
    # Assuming 8 GPUs per node, this supports up to
    # (1023 + 1) / 8 = 128 nodes for now.
    # TODO(rui): make this configurable
    MAX_EXPERT_REDUNDANCY = 1023
    assert model.num_redundant_experts <= MAX_EXPERT_REDUNDANCY, (
        f"num_redundant_experts {model.num_redundant_experts} "
        f"must be less than or equal to {MAX_EXPERT_REDUNDANCY}"
    )
    max_slots_per_logical_expert = MAX_EXPERT_REDUNDANCY + 1
    logical_to_physical_map = torch.full(
        (model.num_logical_experts, max_slots_per_logical_expert),
        -1,
        device=self.device,
    )
    logical_replica_count = torch.zeros(
        (model.num_logical_experts,),
        device=self.device,
        dtype=torch.long,
    )

    for i in range(model.num_physical_experts):
        logical_idx = physical_to_logical_map[i]
        logical_to_physical_map[logical_idx, logical_replica_count[logical_idx]] = i
        logical_replica_count[logical_idx] += 1

    # Duplicate initial mapping for all layers
    physical_to_logical_map = (
        physical_to_logical_map.unsqueeze(0)
        .expand(
            model.num_moe_layers,
            -1,
        )
        .contiguous()
    )
    logical_to_physical_map = (
        logical_to_physical_map.unsqueeze(0)
        .expand(
            model.num_moe_layers,
            -1,
            -1,
        )
        .contiguous()
    )
    logical_replica_count = (
        logical_replica_count.unsqueeze(0)
        .expand(
            model.num_moe_layers,
            -1,
        )
        .contiguous()
    )

    expert_load_pass = torch.zeros(
        (model.num_moe_layers, model.num_physical_experts),
        dtype=torch.int32,
        device=self.device,
    )
    self.expert_load_window_size = self.parallel_config.eplb_config.window_size
    expert_load_window = torch.zeros(
        (
            self.expert_load_window_size,
            model.num_moe_layers,
            model.num_physical_experts,
        ),
        dtype=torch.int32,
        device=self.device,
    )

    # Set the initial progress of rearrangement to 3/4
    eplb_step_interval = self.parallel_config.eplb_config.step_interval
    self.expert_rearrangement_step = max(
        0, eplb_step_interval - eplb_step_interval // 4
    )
    self.expert_rearrangement_step_interval = eplb_step_interval

    if global_expert_load is not None:
        ep_group = get_ep_group().device_group
        assert global_expert_load.shape == (
            model.num_moe_layers,
            model.num_logical_experts,
        )
        assert global_expert_load.dtype == torch.int64

        num_replicas = model.num_physical_experts
        num_groups = model.num_expert_groups
        num_nodes = get_node_count()
        num_gpus = ep_group.size()

        if num_gpus % num_nodes != 0:
            num_nodes = 1
            logger.warning_once(
                f"num_gpus % num_nodes != 0, "
                "not using hierarchical rearrangement algorithm.\n"
                f"{num_gpus=}, {num_nodes=}"
            )

        # Get new expert mappings
        (
            new_physical_to_logical_map,
            new_logical_to_physical_map,
            new_logical_replica_count,
        ) = rebalance_experts(
            global_expert_load,
            num_replicas,
            num_groups,
            num_nodes,
            num_gpus,
        )

        max_physical_slots = new_logical_to_physical_map.shape[-1]
        assert max_physical_slots <= logical_to_physical_map.shape[-1]
        new_logical_to_physical_map = torch.nn.functional.pad(
            new_logical_to_physical_map,
            (0, logical_to_physical_map.shape[-1] - max_physical_slots),
            value=-1,
        )
        physical_to_logical_map = new_physical_to_logical_map.to(self.device)
        logical_to_physical_map.copy_(new_logical_to_physical_map)
        logical_replica_count.copy_(new_logical_replica_count)

    model.set_eplb_state(
        expert_load_pass,
        logical_to_physical_map,
        logical_replica_count,
    )
    if global_expert_load is not None:
        rearrange_expert_weights_inplace(
            old_global_expert_indices,
            new_physical_to_logical_map,
            model.expert_weights,
            ep_group,
            False,
            rank_mapping,
        )
        self.expert_rearrangement_step = 0

    self.model_states[model_config.compute_hash()] = EplbModelState(
        physical_to_logical_map,
        logical_to_physical_map,
        logical_replica_count,
        expert_load_pass,
        expert_load_window,
        model_config.model,
        model,
    )

build_initial_global_physical_to_logical_map staticmethod

build_initial_global_physical_to_logical_map(
    num_routed_experts: int, num_redundant_experts: int
) -> Sequence[int]

Build an initial expert arrangement using the following structure: [original routed experts, redundant experts]

Returns:

Name Type Description
physical_to_logical_map Sequence[int]

A list of integers, where each integer is the index of the logical expert that the corresponding physical expert maps to.

Source code in vllm/distributed/eplb/eplb_state.py
@staticmethod
def build_initial_global_physical_to_logical_map(
    num_routed_experts: int,
    num_redundant_experts: int,
) -> Sequence[int]:
    """
    Build an initial expert arrangement using the following structure:
    [original routed experts, redundant experts]

    Returns:
        physical_to_logical_map (Sequence[int]): A list of integers,
            where each integer is the index of the logical expert
            that the corresponding physical expert maps to.
    """
    global_physical_to_logical_map = list(range(num_routed_experts))
    global_physical_to_logical_map += [
        i % num_routed_experts for i in range(num_redundant_experts)
    ]
    return global_physical_to_logical_map

get_eep_state classmethod

get_eep_state(
    parallel_config: ParallelConfig,
) -> tuple[
    list[Tensor] | None,
    list[Tensor] | None,
    dict[int, int] | None,
]
Source code in vllm/distributed/eplb/eplb_state.py
@classmethod
def get_eep_state(
    cls, parallel_config: ParallelConfig
) -> tuple[
    list[torch.Tensor] | None,
    list[torch.Tensor] | None,
    dict[int, int] | None,
]:
    num_local_physical_experts = torch.empty(1, dtype=torch.int32, device="cpu")
    torch.distributed.broadcast(
        num_local_physical_experts,
        group=get_ep_group().cpu_group,
        group_src=0,
    )
    num_local_physical_experts = int(num_local_physical_experts.item())
    new_ep_size = get_ep_group().world_size
    global_expert_loads, old_global_expert_indices_per_model = (
        EplbState.recv_state()
    )

    # EP configuration for all models has to be the same so as eplb config
    num_logical_experts = global_expert_loads[0].shape[1]
    parallel_config.eplb_config.num_redundant_experts = (
        num_local_physical_experts * new_ep_size - num_logical_experts
    )
    assert (
        old_global_expert_indices_per_model[0].shape[1] % num_local_physical_experts
        == 0
    )
    old_ep_size = (
        old_global_expert_indices_per_model[0].shape[1]
        // num_local_physical_experts
    )
    rank_mapping = {old_ep_rank: old_ep_rank for old_ep_rank in range(old_ep_size)}
    return (
        global_expert_loads,
        old_global_expert_indices_per_model,
        rank_mapping,
    )

rearrange

rearrange(
    is_profile: bool = False,
    execute_shuffle: bool = True,
    global_expert_loads: list[Tensor] | None = None,
    rank_mapping: dict[int, int] | None = None,
) -> Tensor | None

Rearrange the experts according to the current load.

Parameters:

Name Type Description Default
is_profile bool

If True, perform a dummy rearrangement. This is used in profile_run to reserve enough memory, no memory movement will be performed. Default is False.

False
execute_shuffle bool

If True, execute the shuffle in elastic expert parallel (EEP). Default is True.

True
global_expert_loads list[Tensor] | None

The global expert loads when scaling is done in EEP. List of expert loads for the main and drafter (when spec decode is used) models.

None
rank_mapping dict[int, int] | None

The rank mapping when scaling is done in EEP.

None
Source code in vllm/distributed/eplb/eplb_state.py
def rearrange(
    self,
    is_profile: bool = False,
    execute_shuffle: bool = True,
    global_expert_loads: list[torch.Tensor] | None = None,
    rank_mapping: dict[int, int] | None = None,
) -> torch.Tensor | None:
    """
    Rearrange the experts according to the current load.

    Args:
        is_profile (bool): If `True`, perform a dummy rearrangement.
            This is used in `profile_run` to reserve enough memory,
            no memory movement will be performed. Default is False.
        execute_shuffle (bool): If `True`, execute the shuffle
            in elastic expert parallel (EEP). Default is True.
        global_expert_loads (list[torch.Tensor] | None): The global expert
            loads when scaling is done in EEP.
            List of expert loads for the main and drafter
            (when spec decode is used) models.
        rank_mapping (dict[int, int] | None): The rank mapping
            when scaling is done in EEP.
    """

    ep_group = get_ep_group().device_group
    ep_rank = ep_group.rank()

    time_start = None
    is_main_rank = ep_rank == 0
    if is_main_rank:
        torch.cuda.synchronize()
        time_start = time.perf_counter()
        logger.info("Rearranging experts %s...", "(profile)" if is_profile else "")

    if global_expert_loads is None:
        # Map the physical expert load to global logical experts
        global_expert_load_windows = []
        if not execute_shuffle:
            num_models = torch.tensor(
                [len(self.model_states)], dtype=torch.int32, device="cpu"
            )
            torch.distributed.broadcast(
                num_models, group=get_ep_group().cpu_group, group_src=0
            )

        for eplb_model_state in self.model_states.values():
            logical_expert_load_window = torch.zeros(
                self.expert_load_window_size,
                eplb_model_state.model.num_moe_layers,
                eplb_model_state.model.num_logical_experts,
                dtype=eplb_model_state.expert_load_window.dtype,
                device=eplb_model_state.expert_load_window.device,
            )
            logical_expert_load_window.scatter_add_(
                dim=-1,
                index=eplb_model_state.physical_to_logical_map.unsqueeze(0)
                .expand_as(eplb_model_state.expert_load_window)
                .long(),
                src=eplb_model_state.expert_load_window,
            )

            if not execute_shuffle:
                metadata = torch.tensor(
                    [
                        eplb_model_state.model.num_moe_layers,
                        eplb_model_state.model.num_logical_experts,
                        eplb_model_state.physical_to_logical_map.shape[1],
                    ],
                    dtype=torch.int32,
                    device="cpu",
                )
                torch.distributed.broadcast(
                    metadata, group=get_ep_group().cpu_group, group_src=0
                )

            global_expert_load_window = logical_expert_load_window.sum(dim=0)
            global_expert_load_windows.append(global_expert_load_window)
        # Perform all-reduce to get the expert load across all ranks for each model
        global_expert_load_windows = self._allreduce_list(
            global_expert_load_windows
        )
        if not execute_shuffle:
            for eplb_model_state, global_expert_load_window in zip(
                self.model_states.values(), global_expert_load_windows
            ):
                # (num_moe_layers, old_num_physical_experts)
                old_global_expert_indices = eplb_model_state.physical_to_logical_map
                torch.distributed.broadcast(
                    old_global_expert_indices, group=ep_group, group_src=0
                )
        if not execute_shuffle:
            return global_expert_load_windows
    else:
        assert execute_shuffle
        global_expert_load_windows = global_expert_loads

    # TODO(bowen): Treat differently for prefill and decode nodes
    eplb_model_state = next(iter(self.model_states.values()))
    model = eplb_model_state.model
    num_replicas = model.num_physical_experts
    num_groups = model.num_expert_groups
    if rank_mapping is not None and len(rank_mapping) == ep_group.size():
        # NOTE(yongji): scale down, we need to rebalance the experts on
        # remaining GPUs, transfer the experts while we haven't shutdown
        # the GPUs to be released.
        cpu_group = get_ep_group().cpu_group
        num_nodes = _node_count_with_rank_mapping(cpu_group, rank_mapping)
        num_gpus = sum(new_rank != -1 for new_rank in rank_mapping.values())
        num_replicas = (
            num_replicas // ep_group.size() * num_gpus
        )  # handle num replicas change
    else:
        num_nodes = get_node_count()
        num_gpus = ep_group.size()

    if num_gpus % num_nodes != 0:
        self.num_nodes = 1
        logger.warning_once(
            f"num_gpus % num_nodes != 0, "
            "not using hierarchical rearrangement algorithm.\n"
            f"{num_gpus=}, {num_nodes=}"
        )

    for eplb_model_state, global_expert_load_window in zip(
        self.model_states.values(), global_expert_load_windows
    ):
        # Get new expert mappings for the model
        (
            new_physical_to_logical_map,
            new_logical_to_physical_map,
            new_logical_replica_count,
        ) = rebalance_experts(
            global_expert_load_window,
            num_replicas,
            num_groups,
            num_nodes,
            num_gpus,
        )

        # Update expert weights
        rearrange_expert_weights_inplace(
            eplb_model_state.physical_to_logical_map,
            new_physical_to_logical_map,
            eplb_model_state.model.expert_weights,
            ep_group,
            is_profile,
            rank_mapping,
        )

        if not is_profile:
            if (
                eplb_model_state.physical_to_logical_map.shape[1]
                != new_physical_to_logical_map.shape[1]
            ):
                eplb_model_state.physical_to_logical_map = (
                    new_physical_to_logical_map.to(
                        eplb_model_state.physical_to_logical_map.device
                    )
                )
            else:
                eplb_model_state.physical_to_logical_map.copy_(
                    new_physical_to_logical_map
                )
            max_physical_slots = new_logical_to_physical_map.shape[-1]
            assert (
                max_physical_slots
                <= eplb_model_state.logical_to_physical_map.shape[-1]
            )
            new_logical_to_physical_map = torch.nn.functional.pad(
                new_logical_to_physical_map,
                (
                    0,
                    eplb_model_state.logical_to_physical_map.shape[-1]
                    - max_physical_slots,
                ),
                value=-1,
            )
            eplb_model_state.logical_to_physical_map.copy_(
                new_logical_to_physical_map
            )
            eplb_model_state.logical_replica_count.copy_(new_logical_replica_count)

    if is_main_rank:
        assert time_start is not None
        torch.cuda.synchronize()
        time_end = time.perf_counter()
        logger.info(
            "Rearranged experts%sin %.2f seconds.",
            " (profile) " if is_profile else " ",
            time_end - time_start,
        )
    return None

recv_state staticmethod

recv_state() -> tuple[list[Tensor], list[Tensor]]

Receive the expert load and old placement from the master rank.

Source code in vllm/distributed/eplb/eplb_state.py
@staticmethod
def recv_state() -> tuple[list[torch.Tensor], list[torch.Tensor]]:
    """
    Receive the expert load and old placement from the master rank.
    """
    ep_group = get_ep_group()
    num_models = torch.empty(1, dtype=torch.int32, device="cpu")
    torch.distributed.broadcast(num_models, group=ep_group.cpu_group, group_src=0)
    num_models = num_models.item()
    global_expert_loads = []
    old_global_expert_indices_per_model = []
    for _ in range(num_models):
        metadata = torch.empty(3, dtype=torch.int32, device="cpu")
        torch.distributed.broadcast(metadata, group=ep_group.cpu_group, group_src=0)
        num_moe_layers, num_logical_experts, num_old_physical_experts = (
            metadata.tolist()
        )
        global_expert_load = torch.zeros(
            (num_moe_layers, num_logical_experts),
            dtype=torch.int64,
            device=ep_group.device,
        )
        all_reduce(global_expert_load, group=ep_group.device_group)
        old_global_expert_indices = torch.empty(
            (num_moe_layers, num_old_physical_experts),
            dtype=torch.int64,
            device=ep_group.device,
        )
        torch.distributed.broadcast(
            old_global_expert_indices,
            group=ep_group.device_group,
            group_src=0,
        )
        global_expert_loads.append(global_expert_load)
        old_global_expert_indices_per_model.append(old_global_expert_indices)
    return global_expert_loads, old_global_expert_indices_per_model

step

step(
    is_dummy: bool = False,
    is_profile: bool = False,
    log_stats: bool = False,
) -> None

Step the EPLB state.

Parameters:

Name Type Description Default
is_dummy bool

If True, this is a dummy step and the load metrics recorded in this forward pass will not count. Defaults to False.

False
is_profile bool

If True, perform a dummy rearrangement with maximum communication cost. This is used in profile_run to reserve enough memory for the communication buffer.

False
log_stats bool

If True, log the expert load metrics.

False

Stats

The metrics are all summed up across layers.
- `avg_tokens`: The average load across ranks.
- `max_tokens`: The maximum load across ranks.
- `balancedness`: The ratio of average load to maximum load.
Source code in vllm/distributed/eplb/eplb_state.py
def step(
    self,
    is_dummy: bool = False,
    is_profile: bool = False,
    log_stats: bool = False,
) -> None:
    """
    Step the EPLB state.

    Args:
        is_dummy (bool): If `True`, this is a dummy step and the load
            metrics recorded in this forward pass will not count.
            Defaults to `False`.
        is_profile (bool): If `True`, perform a dummy rearrangement
            with maximum communication cost. This is used in
            `profile_run` to reserve enough memory
            for the communication buffer.
        log_stats (bool): If `True`, log the expert load metrics.

    # Stats
        The metrics are all summed up across layers.
        - `avg_tokens`: The average load across ranks.
        - `max_tokens`: The maximum load across ranks.
        - `balancedness`: The ratio of average load to maximum load.
    """

    if is_profile:
        self.rearrange(is_profile=True)
        return

    if is_dummy:
        # Do not record load metrics for dummy steps
        for eplb_model_state in self.model_states.values():
            eplb_model_state.expert_load_pass.zero_()

    if log_stats:
        # Sync the expert load pass for each model (main and drafter).
        # expert_load_pass: (num_moe_layers, num_physical_experts)
        expert_load_pass_list = self._sync_load_pass()
        ep_group = get_ep_group().device_group
        for expert_load_pass, eplb_model_state in zip(
            expert_load_pass_list, self.model_states.values()
        ):
            # num_tokens_per_rank: (num_moe_layers, num_ranks)
            num_tokens_per_rank = (
                expert_load_pass.reshape(
                    expert_load_pass.shape[0], ep_group.size(), -1
                )
                .sum(dim=-1)
                .float()
            )

            # Compute balancedness ratio:
            # for each layer:
            #   (mean load across ranks) / (max load across ranks)
            avg_tokens_tensor = num_tokens_per_rank.mean(dim=0).sum(dim=0)
            max_tokens_tensor = num_tokens_per_rank.max(dim=0).values.sum(dim=0)

            # Just to make type checker happy
            tokens_tensors: list[float] = torch.stack(
                [avg_tokens_tensor, max_tokens_tensor]
            ).tolist()
            avg_tokens, max_tokens = tokens_tensors
            balancedness = avg_tokens / max_tokens if max_tokens > 0 else 0.0

            if ep_group.rank() == 0:
                logger.info(
                    "EPLB step: %d for model %s: avg_tokens=%.2f, "
                    "max_tokens=%d, balancedness=%.4f",
                    self.expert_rearrangement_step,
                    eplb_model_state.model_name,
                    avg_tokens,
                    max_tokens,
                    balancedness,
                )

    # Update the expert load sliding window
    if not is_dummy:
        for eplb_model_state in self.model_states.values():
            eplb_model_state.expert_load_window[self.expert_load_window_step] = (
                eplb_model_state.expert_load_pass.clone()
            )
            eplb_model_state.expert_load_pass.zero_()

        self.expert_load_window_step += 1
        if self.expert_load_window_step >= self.expert_load_window_size:
            self.expert_load_window_step = 0

    # Step the expert rearrangement step
    # Note that even if this is a dummy step, we still increment the
    # rearrangement step and perform rearrangement to ensure all ranks are
    # performing collective communication.
    self.expert_rearrangement_step += 1
    if self.expert_rearrangement_step >= self.expert_rearrangement_step_interval:
        self.expert_rearrangement_step = 0
        self.rearrange()

validate_ep_configuration

validate_ep_configuration(new_model: MixtureOfExperts)

Validate that the expert parallel configuration of the new model is the same as the existing models.

Source code in vllm/distributed/eplb/eplb_state.py
def validate_ep_configuration(self, new_model: MixtureOfExperts):
    """
    Validate that the expert parallel configuration of
    the new model is the same as the existing models.
    """
    if len(self.model_states) > 0:
        model = next(iter(self.model_states.values())).model
        if (
            model.num_routed_experts != new_model.num_routed_experts
            or model.num_redundant_experts != new_model.num_redundant_experts
            or model.num_physical_experts != new_model.num_physical_experts
            or model.num_logical_experts != new_model.num_logical_experts
            or model.num_expert_groups != new_model.num_expert_groups
        ):
            raise RuntimeError(
                "Model: {} "
                "with config {} "
                "{} {} {} {} "
                "mismatch with new model {} "
                "with config {} "
                "{} {} {} {}".format(
                    type(model),
                    model.num_routed_experts,
                    model.num_redundant_experts,
                    model.num_physical_experts,
                    model.num_logical_experts,
                    model.num_expert_groups,
                    type(new_model),
                    new_model.num_routed_experts,
                    new_model.num_redundant_experts,
                    new_model.num_physical_experts,
                    new_model.num_logical_experts,
                    new_model.num_expert_groups,
                )
            )

MixtureOfExperts

Bases: Protocol

Check if the model is a mixture of experts (MoE) model.

Source code in vllm/model_executor/models/interfaces.py
@runtime_checkable
class MixtureOfExperts(Protocol):
    """
    Check if the model is a mixture of experts (MoE) model.
    """

    expert_weights: MutableSequence[Iterable[Tensor]]
    """
    Expert weights saved in this rank.

    The first dimension is the layer, and the second dimension is different
    parameters in the layer, e.g. up/down projection weights.
    """

    num_moe_layers: int
    """Number of MoE layers in this model."""

    num_expert_groups: int
    """Number of expert groups in this model."""

    num_logical_experts: int
    """Number of logical experts in this model."""

    num_physical_experts: int
    """Number of physical experts in this model."""

    num_local_physical_experts: int
    """Number of local physical experts in this model."""

    num_routed_experts: int
    """Number of routed experts in this model."""

    num_shared_experts: int
    """Number of shared experts in this model."""

    num_redundant_experts: int
    """Number of redundant experts in this model."""

    moe_layers: Iterable[nn.Module]
    """List of MoE layers in this model."""

    def set_eplb_state(
        self,
        expert_load_view: Tensor,
        logical_to_physical_map: Tensor,
        logical_replica_count: Tensor,
    ) -> None:
        """
        Register the EPLB state in the MoE model.

        Since these are views of the actual EPLB state, any changes made by
        the EPLB algorithm are automatically reflected in the model's behavior
        without requiring additional method calls to set new states.

        You should also collect model's `expert_weights` here instead of in
        the weight loader, since after initial weight loading, further
        processing like quantization may be applied to the weights.

        Args:
            expert_load_view: A view of the expert load metrics tensor.
            logical_to_physical_map: Mapping from logical to physical experts.
            logical_replica_count: Count of replicas for each logical expert.
        """
        for layer_idx, layer in enumerate(self.moe_layers):
            # Register the expert weights.
            self.expert_weights.append(layer.get_expert_weights())
            layer.set_eplb_state(
                moe_layer_idx=layer_idx,
                expert_load_view=expert_load_view,
                logical_to_physical_map=logical_to_physical_map,
                logical_replica_count=logical_replica_count,
            )

    def update_physical_experts_metadata(
        self,
        num_physical_experts: int,
        num_local_physical_experts: int,
    ) -> None: ...

expert_weights instance-attribute

expert_weights: MutableSequence[Iterable[Tensor]]

Expert weights saved in this rank.

The first dimension is the layer, and the second dimension is different parameters in the layer, e.g. up/down projection weights.

moe_layers instance-attribute

moe_layers: Iterable[Module]

List of MoE layers in this model.

num_expert_groups instance-attribute

num_expert_groups: int

Number of expert groups in this model.

num_local_physical_experts instance-attribute

num_local_physical_experts: int

Number of local physical experts in this model.

num_logical_experts instance-attribute

num_logical_experts: int

Number of logical experts in this model.

num_moe_layers instance-attribute

num_moe_layers: int

Number of MoE layers in this model.

num_physical_experts instance-attribute

num_physical_experts: int

Number of physical experts in this model.

num_redundant_experts instance-attribute

num_redundant_experts: int

Number of redundant experts in this model.

num_routed_experts instance-attribute

num_routed_experts: int

Number of routed experts in this model.

num_shared_experts instance-attribute

num_shared_experts: int

Number of shared experts in this model.

set_eplb_state

set_eplb_state(
    expert_load_view: Tensor,
    logical_to_physical_map: Tensor,
    logical_replica_count: Tensor,
) -> None

Register the EPLB state in the MoE model.

Since these are views of the actual EPLB state, any changes made by the EPLB algorithm are automatically reflected in the model's behavior without requiring additional method calls to set new states.

You should also collect model's expert_weights here instead of in the weight loader, since after initial weight loading, further processing like quantization may be applied to the weights.

Parameters:

Name Type Description Default
expert_load_view Tensor

A view of the expert load metrics tensor.

required
logical_to_physical_map Tensor

Mapping from logical to physical experts.

required
logical_replica_count Tensor

Count of replicas for each logical expert.

required
Source code in vllm/model_executor/models/interfaces.py
def set_eplb_state(
    self,
    expert_load_view: Tensor,
    logical_to_physical_map: Tensor,
    logical_replica_count: Tensor,
) -> None:
    """
    Register the EPLB state in the MoE model.

    Since these are views of the actual EPLB state, any changes made by
    the EPLB algorithm are automatically reflected in the model's behavior
    without requiring additional method calls to set new states.

    You should also collect model's `expert_weights` here instead of in
    the weight loader, since after initial weight loading, further
    processing like quantization may be applied to the weights.

    Args:
        expert_load_view: A view of the expert load metrics tensor.
        logical_to_physical_map: Mapping from logical to physical experts.
        logical_replica_count: Count of replicas for each logical expert.
    """
    for layer_idx, layer in enumerate(self.moe_layers):
        # Register the expert weights.
        self.expert_weights.append(layer.get_expert_weights())
        layer.set_eplb_state(
            moe_layer_idx=layer_idx,
            expert_load_view=expert_load_view,
            logical_to_physical_map=logical_to_physical_map,
            logical_replica_count=logical_replica_count,
        )

update_physical_experts_metadata

update_physical_experts_metadata(
    num_physical_experts: int,
    num_local_physical_experts: int,
) -> None
Source code in vllm/model_executor/models/interfaces.py
def update_physical_experts_metadata(
    self,
    num_physical_experts: int,
    num_local_physical_experts: int,
) -> None: ...

ModelConfig

Configuration for the model.

Source code in vllm/config/model.py
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
@config
@dataclass(config=ConfigDict(arbitrary_types_allowed=True))
class ModelConfig:
    """Configuration for the model."""

    model: str = "Qwen/Qwen3-0.6B"
    """Name or path of the Hugging Face model to use. It is also used as the
    content for `model_name` tag in metrics output when `served_model_name` is
    not specified."""
    runner: RunnerOption = "auto"
    """The type of model runner to use. Each vLLM instance only supports one
    model runner, even if the same model can be used for multiple types."""
    convert: ConvertOption = "auto"
    """Convert the model using adapters defined in
    [vllm.model_executor.models.adapters][]. The most common use case is to
    adapt a text generation model to be used for pooling tasks."""
    task: TaskOption | None = None
    """[DEPRECATED] The task to use the model for. If the model supports more
    than one model runner, this is used to select which model runner to run.

    Note that the model may support other tasks using the same model runner.
    """
    tokenizer: SkipValidation[str] = None  # type: ignore
    """Name or path of the Hugging Face tokenizer to use. If unspecified, model
    name or path will be used."""
    tokenizer_mode: TokenizerMode = "auto"
    """Tokenizer mode:\n
    - "auto" will use the fast tokenizer if available.\n
    - "slow" will always use the slow tokenizer.\n
    - "mistral" will always use the tokenizer from `mistral_common`.\n
    - "custom" will use --tokenizer to select the preregistered tokenizer."""
    trust_remote_code: bool = False
    """Trust remote code (e.g., from HuggingFace) when downloading the model
    and tokenizer."""
    dtype: ModelDType | torch.dtype = "auto"
    """Data type for model weights and activations:\n
    - "auto" will use FP16 precision for FP32 and FP16 models, and BF16
    precision for BF16 models.\n
    - "half" for FP16. Recommended for AWQ quantization.\n
    - "float16" is the same as "half".\n
    - "bfloat16" for a balance between precision and range.\n
    - "float" is shorthand for FP32 precision.\n
    - "float32" for FP32 precision."""
    seed: int | None = None
    """Random seed for reproducibility. Initialized to None in V0, but
    initialized to 0 in V1."""
    hf_config: PretrainedConfig = field(init=False)
    """The Hugging Face config of the model."""
    hf_text_config: PretrainedConfig = field(init=False)
    """The Hugging Face config of the text model (same as hf_config for text models)."""
    hf_config_path: str | None = None
    """Name or path of the Hugging Face config to use. If unspecified, model
    name or path will be used."""
    allowed_local_media_path: str = ""
    """Allowing API requests to read local images or videos from directories
    specified by the server file system. This is a security risk. Should only
    be enabled in trusted environments."""
    allowed_media_domains: list[str] | None = None
    """If set, only media URLs that belong to this domain can be used for
    multi-modal inputs. """
    revision: str | None = None
    """The specific model version to use. It can be a branch name, a tag name,
    or a commit id. If unspecified, will use the default version."""
    code_revision: str | None = None
    """The specific revision to use for the model code on the Hugging Face Hub.
    It can be a branch name, a tag name, or a commit id. If unspecified, will
    use the default version."""
    tokenizer_revision: str | None = None
    """The specific revision to use for the tokenizer on the Hugging Face Hub.
    It can be a branch name, a tag name, or a commit id. If unspecified, will
    use the default version."""
    max_model_len: SkipValidation[int] = None  # type: ignore
    """Model context length (prompt and output). If unspecified, will be
    automatically derived from the model config.

    When passing via `--max-model-len`, supports k/m/g/K/M/G in human-readable
    format. Examples:\n
    - 1k -> 1000\n
    - 1K -> 1024\n
    - 25.6k -> 25,600"""
    spec_target_max_model_len: int | None = None
    """Specify the maximum length for spec decoding draft models."""
    quantization: SkipValidation[QuantizationMethods | None] = None
    """Method used to quantize the weights. If `None`, we first check the
    `quantization_config` attribute in the model config file. If that is
    `None`, we assume the model weights are not quantized and use `dtype` to
    determine the data type of the weights."""
    enforce_eager: bool = False
    """Whether to always use eager-mode PyTorch. If True, we will disable CUDA
    graph and always execute the model in eager mode. If False, we will use
    CUDA graph and eager execution in hybrid for maximal performance and
    flexibility."""
    max_logprobs: int = 20
    """Maximum number of log probabilities to return when `logprobs` is
    specified in `SamplingParams`. The default value comes the default for the
    OpenAI Chat Completions API. -1 means no cap, i.e. all (output_length *
    vocab_size) logprobs are allowed to be returned and it may cause OOM."""
    logprobs_mode: LogprobsMode = "raw_logprobs"
    """Indicates the content returned in the logprobs and prompt_logprobs.
    Supported mode:
    1) raw_logprobs, 2) processed_logprobs, 3) raw_logits, 4) processed_logits.
    Raw means the values before applying any logit processors, like bad words.
    Processed means the values after applying all processors, including
    temperature and top_k/top_p.
    """
    disable_sliding_window: bool = False
    """Whether to disable sliding window. If True, we will disable the sliding
    window functionality of the model, capping to sliding window size. If the
    model does not support sliding window, this argument is ignored."""
    disable_cascade_attn: bool = False
    """Disable cascade attention for V1. While cascade attention does not
    change the mathematical correctness, disabling it could be useful for
    preventing potential numerical issues. Note that even if this is set to
    False, cascade attention will be only used when the heuristic tells that
    it's beneficial."""
    skip_tokenizer_init: bool = False
    """Skip initialization of tokenizer and detokenizer. Expects valid
    `prompt_token_ids` and `None` for prompt from the input. The generated
    output will contain token ids."""
    enable_prompt_embeds: bool = False
    """If `True`, enables passing text embeddings as inputs via the
    `prompt_embeds` key.

    WARNING: The vLLM engine may crash if incorrect shape of embeddings is passed.
    Only enable this flag for trusted users!"""
    served_model_name: str | list[str] | None = None
    """The model name(s) used in the API. If multiple names are provided, the
    server will respond to any of the provided names. The model name in the
    model field of a response will be the first name in this list. If not
    specified, the model name will be the same as the `--model` argument. Noted
    that this name(s) will also be used in `model_name` tag content of
    prometheus metrics, if multiple names provided, metrics tag will take the
    first one."""
    config_format: str | ConfigFormat = "auto"
    """The format of the model config to load:\n
    - "auto" will try to load the config in hf format if available else it
    will try to load in mistral format.\n
    - "hf" will load the config in hf format.\n
    - "mistral" will load the config in mistral format."""
    hf_token: bool | str | None = None
    """The token to use as HTTP bearer authorization for remote files . If
    `True`, will use the token generated when running `huggingface-cli login`
    (stored in `~/.huggingface`)."""
    hf_overrides: HfOverrides = field(default_factory=dict)
    """If a dictionary, contains arguments to be forwarded to the Hugging Face
    config. If a callable, it is called to update the HuggingFace config."""
    logits_processor_pattern: str | None = None
    """Optional regex pattern specifying valid logits processor qualified names
    that can be passed with the `logits_processors` extra completion argument.
    Defaults to `None`, which allows no processors."""
    generation_config: str = "auto"
    """The folder path to the generation config. Defaults to `"auto"`, the
    generation config will be loaded from model path. If set to `"vllm"`, no
    generation config is loaded, vLLM defaults will be used. If set to a folder
    path, the generation config will be loaded from the specified folder path.
    If `max_new_tokens` is specified in generation config, then it sets a
    server-wide limit on the number of output tokens for all requests."""
    override_generation_config: dict[str, Any] = field(default_factory=dict)
    """Overrides or sets generation config. e.g. `{"temperature": 0.5}`. If
    used with `--generation-config auto`, the override parameters will be
    merged with the default config from the model. If used with
    `--generation-config vllm`, only the override parameters are used."""
    enable_sleep_mode: bool = False
    """Enable sleep mode for the engine (only cuda platform is supported)."""
    model_impl: str | ModelImpl = "auto"
    """Which implementation of the model to use:\n
    - "auto" will try to use the vLLM implementation, if it exists, and fall
    back to the Transformers implementation if no vLLM implementation is
    available.\n
    - "vllm" will use the vLLM model implementation.\n
    - "transformers" will use the Transformers model implementation.\n
    - "terratorch" will use the TerraTorch model implementation.
    """
    override_attention_dtype: str | None = None
    """Override dtype for attention"""
    logits_processors: list[str | type[LogitsProcessor]] | None = None
    """One or more logits processors' fully-qualified class names or class
    definitions"""
    io_processor_plugin: str | None = None
    """IOProcessor plugin name to load at model startup"""

    # Pooler config
    pooler_config: PoolerConfig | None = None
    """Pooler config which controls the behaviour of output pooling in pooling
    models."""
    override_pooler_config: dict | PoolerConfig | None = None
    """[DEPRECATED] Use `pooler_config` instead. This field will be removed in
    v0.12.0 or v1.0.0, whichever is sooner."""

    # Multimodal config and init vars
    multimodal_config: MultiModalConfig | None = None
    """Configuration for multimodal model. If `None`, this will be inferred
    from the architecture of `self.model`."""
    limit_mm_per_prompt: InitVar[dict[str, int | dict[str, int]] | None] = None
    enable_mm_embeds: InitVar[bool | None] = None
    media_io_kwargs: InitVar[dict[str, dict[str, Any]] | None] = None
    mm_processor_kwargs: InitVar[dict[str, Any] | None] = None
    mm_processor_cache_gb: InitVar[float | None] = None
    mm_processor_cache_type: InitVar[MMCacheType | None] = None
    mm_shm_cache_max_object_size_mb: InitVar[int | None] = None
    mm_encoder_tp_mode: InitVar[MMEncoderTPMode | None] = None
    mm_encoder_attn_backend: InitVar[_Backend | str | None] = None
    interleave_mm_strings: InitVar[bool | None] = None
    skip_mm_profiling: InitVar[bool | None] = None
    video_pruning_rate: InitVar[float | None] = None

    def compute_hash(self) -> str:
        """
        WARNING: Whenever a new field is added to this config,
        ensure that it is included in the factors list if
        it affects the computation graph.

        Provide a hash that uniquely identifies all the configs
        that affect the structure of the computation
        graph from input ids/embeddings to the final hidden states,
        excluding anything before input ids/embeddings and after
        the final hidden states.
        """
        factors: list[Any] = []
        factors.append(self.model)
        factors.append(self.dtype)
        factors.append(self.quantization)
        factors.append(self.revision)
        factors.append(self.code_revision)
        factors.append(self.max_model_len)
        factors.append(self.max_logprobs)
        factors.append(self.disable_sliding_window)
        factors.append(self.trust_remote_code)
        factors.append(self.generation_config)
        factors.append(self.model_impl)
        factors.append(self.override_generation_config)
        factors.append(self.video_pruning_rate)
        factors.append(self.enable_prompt_embeds)

        # hf_config can control how the model looks!
        try:
            hf_config_json = self.hf_config.to_json_string(use_diff=False)
        except TypeError:
            from transformers import PretrainedConfig

            from vllm.utils.jsontree import json_map_leaves

            # Handle nested HF configs with unserializable values gracefully
            hf_config_json = (
                json.dumps(
                    json_map_leaves(
                        lambda v: v.to_dict()
                        if isinstance(v, PretrainedConfig)
                        else str(v),
                        self.hf_config.to_dict(),
                    ),
                    indent=2,
                    sort_keys=True,
                )
                + "\n"
            )

        factors.append(hf_config_json)

        str_factors = str(factors)
        assert_hashable(str_factors)
        return hashlib.sha256(str(factors).encode()).hexdigest()

    def _update_nested(
        self,
        target: PretrainedConfig | dict[str, Any],
        updates: dict[str, Any],
    ) -> None:
        """Recursively updates a config or dict with nested updates."""
        for key, value in updates.items():
            if isinstance(value, dict):
                # Get the nested target
                if isinstance(target, dict):
                    nested_target = target.get(key)
                else:
                    nested_target = getattr(target, key, None)

                # If nested target exists and can be updated recursively
                if nested_target is not None and (
                    isinstance(nested_target, dict)
                    or hasattr(nested_target, "__dict__")
                ):
                    self._update_nested(nested_target, value)
                    continue

            # Set the value (base case)
            if isinstance(target, dict):
                target[key] = value
            else:
                setattr(target, key, value)

    def _apply_dict_overrides(
        self,
        config: PretrainedConfig,
        overrides: dict[str, Any],
    ) -> None:
        """Apply dict overrides, handling both nested configs and dict values."""
        from transformers import PretrainedConfig

        for key, value in overrides.items():
            attr = getattr(config, key, None)
            if attr is not None and isinstance(attr, PretrainedConfig):
                # It's a nested config - recursively update it
                self._update_nested(attr, value)
            else:
                # It's a dict-valued parameter - set it directly
                setattr(config, key, value)

    def __post_init__(
        self,
        # Multimodal config init vars
        limit_mm_per_prompt: dict[str, int] | None,
        enable_mm_embeds: bool | None,
        media_io_kwargs: dict[str, dict[str, Any]] | None,
        mm_processor_kwargs: dict[str, Any] | None,
        mm_processor_cache_gb: float | None,
        mm_processor_cache_type: MMCacheType | None,
        mm_shm_cache_max_object_size_mb: int | None,
        mm_encoder_tp_mode: MMEncoderTPMode | None,
        mm_encoder_attn_backend: _Backend | str | None,
        interleave_mm_strings: bool | None,
        skip_mm_profiling: bool | None,
        video_pruning_rate: float | None,
    ) -> None:
        # Set the default seed to 0 in V1.
        # NOTE(woosuk): In V1, we use separate processes for workers (unless
        # VLLM_ENABLE_V1_MULTIPROCESSING=0), so setting a seed here
        # doesn't affect the user process. However, without a consistent seed,
        # different tensor parallel workers would sample different tokens,
        # leading to inconsistent results.
        if self.seed is None:
            self.seed = 0
            if not envs.VLLM_ENABLE_V1_MULTIPROCESSING:
                logger.warning(
                    "The global random seed is set to %d. Since "
                    "VLLM_ENABLE_V1_MULTIPROCESSING is set to False, this may "
                    "affect the random state of the Python process that "
                    "launched vLLM.",
                    self.seed,
                )

        # Keep set served_model_name before maybe_model_redirect(self.model)
        self.served_model_name = get_served_model_name(
            self.model, self.served_model_name
        )
        self.model = maybe_model_redirect(self.model)
        # The tokenizer is consistent with the model by default.
        if self.tokenizer is None:
            self.tokenizer = self.model
        if self.tokenizer_revision is None:
            self.tokenizer_revision = self.revision
        self.tokenizer = maybe_model_redirect(self.tokenizer)

        if isinstance(self.hf_config_path, str):
            self.hf_config_path = maybe_model_redirect(self.hf_config_path)

        if callable(self.hf_overrides):
            hf_overrides_kw = {}
            hf_overrides_fn = self.hf_overrides
            dict_overrides: dict[str, Any] = {}
        else:
            # Separate dict overrides from flat ones
            # We'll determine how to apply dict overrides after loading the config
            hf_overrides_kw = {}
            dict_overrides = {}
            for key, value in self.hf_overrides.items():
                if isinstance(value, dict):
                    dict_overrides[key] = value
                else:
                    hf_overrides_kw[key] = value
            hf_overrides_fn = None

        self.maybe_pull_model_tokenizer_for_runai(self.model, self.tokenizer)

        if (
            (backend := envs.VLLM_ATTENTION_BACKEND)
            and backend == "FLASHINFER"
            and find_spec("flashinfer") is None
        ):
            raise ValueError(
                "VLLM_ATTENTION_BACKEND is set to FLASHINFER, but flashinfer "
                "module was not found. See "
                "https://github.com/vllm-project/vllm/blob/main/docker/Dockerfile "  # noqa: E501
                "for instructions on how to install it."
            )

        from vllm.platforms import current_platform

        if self.override_attention_dtype is not None and not current_platform.is_rocm():
            warnings.warn(
                "override-attention-dtype is set but not using ROCm platform",
                stacklevel=2,
            )

        if self.enable_sleep_mode and not current_platform.is_sleep_mode_available():
            raise ValueError("Sleep mode is not supported on current platform.")

        hf_config = get_config(
            self.hf_config_path or self.model,
            self.trust_remote_code,
            self.revision,
            self.code_revision,
            self.config_format,
            hf_overrides_kw=hf_overrides_kw,
            hf_overrides_fn=hf_overrides_fn,
        )

        self.hf_config = hf_config
        if dict_overrides:
            self._apply_dict_overrides(hf_config, dict_overrides)
        self.hf_text_config = get_hf_text_config(self.hf_config)
        self.attention_chunk_size = getattr(
            self.hf_text_config, "attention_chunk_size", None
        )
        self.encoder_config = self._get_encoder_config()
        self.hf_image_processor_config = get_hf_image_processor_config(
            self.model, hf_token=self.hf_token, revision=self.revision
        )

        architectures = self.architectures
        registry = self.registry
        is_generative_model = registry.is_text_generation_model(architectures, self)
        is_pooling_model = registry.is_pooling_model(architectures, self)

        def _task_to_convert(task: TaskOption) -> ConvertType:
            if task == "embedding" or task == "embed":
                return "embed"
            if task == "classify":
                return "classify"
            if task == "reward":
                return "reward"
            if task == "score":
                new_task = self._get_default_pooling_task(architectures)
                return "classify" if new_task == "classify" else "embed"

            return "none"

        if self.task is not None:
            runner: RunnerOption = "auto"
            convert: ConvertOption = "auto"
            msg_prefix = (
                "The 'task' option has been deprecated and will be "
                "removed in v0.13.0 or v1.0, whichever comes first."
            )
            msg_hint = "Please remove this option."

            is_generative_task = self.task in _RUNNER_TASKS["generate"]
            is_pooling_task = self.task in _RUNNER_TASKS["pooling"]

            if is_generative_model and is_pooling_model:
                if is_generative_task:
                    runner = "generate"
                    convert = "auto"
                    msg_hint = (
                        "Please replace this option with `--runner "
                        "generate` to continue using this model "
                        "as a generative model."
                    )
                elif is_pooling_task:
                    runner = "pooling"
                    convert = "auto"
                    msg_hint = (
                        "Please replace this option with `--runner "
                        "pooling` to continue using this model "
                        "as a pooling model."
                    )
                else:  # task == "auto"
                    pass
            elif is_generative_model or is_pooling_model:
                if is_generative_task:
                    runner = "generate"
                    convert = "auto"
                    msg_hint = "Please remove this option"
                elif is_pooling_task:
                    runner = "pooling"
                    convert = _task_to_convert(self.task)
                    msg_hint = (
                        "Please replace this option with `--convert "
                        f"{convert}` to continue using this model "
                        "as a pooling model."
                    )
                else:  # task == "auto"
                    pass
            else:
                debug_info = {
                    "architectures": architectures,
                    "is_generative_model": is_generative_model,
                    "is_pooling_model": is_pooling_model,
                }
                raise AssertionError(
                    "The model should be a generative or "
                    "pooling model when task is set to "
                    f"{self.task!r}. Found: {debug_info}"
                )

            self.runner = runner
            self.convert = convert

            msg = f"{msg_prefix} {msg_hint}"
            warnings.warn(msg, DeprecationWarning, stacklevel=2)

        self.runner_type = self._get_runner_type(architectures, self.runner)
        self.convert_type = self._get_convert_type(
            architectures, self.runner_type, self.convert
        )

        if self.runner_type == "generate" and not is_generative_model:
            generate_converts = _RUNNER_CONVERTS["generate"]
            if self.convert_type not in generate_converts:
                # Currently we don't have any converters for generative models
                raise ValueError("This model does not support `--runner generate`.")
        if self.runner_type == "pooling" and not is_pooling_model:
            pooling_converts = _RUNNER_CONVERTS["pooling"]
            if self.convert_type not in pooling_converts:
                convert_option = "<" + "|".join(pooling_converts) + ">"
                raise ValueError(
                    "This model does not support `--runner pooling`. "
                    f"You can pass `--convert {convert_option} to adapt "
                    "it into a pooling model."
                )

        # Note: Initialize these attributes early because transformers fallback
        # may fail to load dynamic modules in child processes
        model_info, arch = registry.inspect_model_cls(architectures, self)
        self._model_info = model_info
        self._architecture = arch
        logger.info("Resolved architecture: %s", arch)

        # Init pooler config if needed
        if self.runner_type == "pooling":
            if self.override_pooler_config is not None:
                logger.warning_once(
                    "`override_pooler_config` is deprecated and will be "
                    "removed in v0.12.0 or v1.0.0, whichever is sooner. "
                    "Please use `pooler_config` instead."
                )

                if isinstance(self.override_pooler_config, dict):
                    self.pooler_config = PoolerConfig(**self.override_pooler_config)
                else:
                    self.pooler_config = self.override_pooler_config

            if self.pooler_config is None:
                self.pooler_config = PoolerConfig()

            base_config = get_pooling_config(self.model, self.revision)
            if base_config is not None:
                # Only set values that are not overridden by the user
                for k, v in base_config.items():
                    if getattr(self.pooler_config, k) is None:
                        setattr(self.pooler_config, k, v)

            default_pooling_type = self._model_info.default_pooling_type
            if self.pooler_config.pooling_type is None:
                self.pooler_config.pooling_type = default_pooling_type

        self.dtype: torch.dtype = _get_and_verify_dtype(
            self.model,
            self.hf_config,
            self.dtype,
            is_pooling_model=self.runner_type == "pooling",
            revision=self.revision,
        )

        self.original_max_model_len = self.max_model_len
        self.max_model_len = self.get_and_verify_max_len(self.max_model_len)
        # Init multimodal config if needed
        if self._model_info.supports_multimodal:
            if (
                mm_encoder_tp_mode == "data"
                and not self._model_info.supports_multimodal_encoder_tp_data
            ):
                logger.warning_once(
                    "This model does not support `--mm-encoder-tp-mode data`. "
                    "Falling back to `--mm-encoder-tp-mode weights`."
                )
                mm_encoder_tp_mode = "weights"

            mm_config_kwargs = dict(
                limit_per_prompt=limit_mm_per_prompt,
                enable_mm_embeds=enable_mm_embeds,
                media_io_kwargs=media_io_kwargs,
                mm_processor_kwargs=mm_processor_kwargs,
                mm_processor_cache_gb=mm_processor_cache_gb,
                mm_processor_cache_type=mm_processor_cache_type,
                mm_shm_cache_max_object_size_mb=mm_shm_cache_max_object_size_mb,
                mm_encoder_tp_mode=mm_encoder_tp_mode,
                mm_encoder_attn_backend=mm_encoder_attn_backend,
                interleave_mm_strings=interleave_mm_strings,
                skip_mm_profiling=skip_mm_profiling,
                video_pruning_rate=video_pruning_rate,
            )

            mm_config_kwargs = {
                k: v for k, v in mm_config_kwargs.items() if v is not None
            }

            self.multimodal_config = MultiModalConfig(**mm_config_kwargs)

        if self.disable_sliding_window:
            # Set after get_and_verify_max_len to ensure that max_model_len
            # can be correctly capped to sliding window size
            self.hf_text_config.sliding_window = None

        if not self.skip_tokenizer_init:
            self._verify_tokenizer_mode()

        # Avoid running try_verify_and_update_config multiple times
        self.config_updated = False

        self._verify_quantization()
        self._verify_cuda_graph()
        self._verify_bnb_config()

    @field_validator("quantization", mode="before")
    @classmethod
    def validate_quantization_before(cls, value: Any) -> Any:
        if isinstance(value, str):
            return value.lower()
        return value

    @model_validator(mode="after")
    def validate_model_config_after(self: "ModelConfig") -> "ModelConfig":
        if not isinstance(self.tokenizer, str):
            raise ValueError("tokenizer must be a string after __post_init__.")
        if not isinstance(self.max_model_len, int):
            raise ValueError("max_model_len must be an integer after __post_init__.")
        return self

    def _get_transformers_backend_cls(self) -> str:
        """Determine which Transformers backend class will be used if
        `model_impl` is set to `transformers` or `auto`."""
        cls = "Transformers"
        # If 'hf_config != hf_text_config' it's a nested config, i.e. multimodal
        cls += "MultiModal" if self.hf_config != self.hf_text_config else ""
        cls += "MoE" if self.get_num_experts() > 1 else ""
        # Check if the architecture we're wrapping has defaults
        runner = None
        task = None
        if defaults := try_match_architecture_defaults(self.architectures[0]):
            _, (runner, task) = defaults
        # User specified value take precedence
        if self.runner != "auto":
            runner = self.runner
        # Only consider Transformers backend pooling classes if we're wrapping an
        # architecture that defaults to pooling. Otherwise, we return the LM class
        # and use adapters.
        if runner == "pooling" and task in {"embed", "classify"}:
            if task == "embed":
                cls += "EmbeddingModel"
            elif task == "classify":
                cls += "ForSequenceClassification"
        else:
            cls += "ForCausalLM"
        return cls

    def using_transformers_backend(self) -> bool:
        """Check if the model is using the Transformers backend class."""
        used_cls = self._model_info.architecture
        transformers_backend_cls = self._get_transformers_backend_cls()
        return used_cls == transformers_backend_cls

    @property
    def registry(self):
        return me_models.ModelRegistry

    @property
    def architectures(self) -> list[str]:
        return getattr(self.hf_config, "architectures", [])

    @property
    def architecture(self) -> str:
        """The architecture vllm actually used."""
        return self._architecture

    def maybe_pull_model_tokenizer_for_runai(self, model: str, tokenizer: str) -> None:
        """Pull model/tokenizer from Object Storage to temporary
        directory when needed.

        Args:
            model: Model name or path
            tokenizer: Tokenizer name or path
        """

        if not (is_runai_obj_uri(model) or is_runai_obj_uri(tokenizer)):
            return

        if is_runai_obj_uri(model):
            object_storage_model = ObjectStorageModel(url=model)
            object_storage_model.pull_files(
                model, allow_pattern=["*.model", "*.py", "*.json"]
            )
            self.model_weights = model
            self.model = object_storage_model.dir

            # If tokenizer is same as model, download to same directory
            if model == tokenizer:
                object_storage_model.pull_files(
                    model,
                    ignore_pattern=[
                        "*.pt",
                        "*.safetensors",
                        "*.bin",
                        "*.tensors",
                        "*.pth",
                    ],
                )
                self.tokenizer = object_storage_model.dir
                return

        # Only download tokenizer if needed and not already handled
        if is_runai_obj_uri(tokenizer):
            object_storage_tokenizer = ObjectStorageModel(url=tokenizer)
            object_storage_tokenizer.pull_files(
                model,
                ignore_pattern=["*.pt", "*.safetensors", "*.bin", "*.tensors", "*.pth"],
            )
            self.tokenizer = object_storage_tokenizer.dir

    def _get_encoder_config(self):
        return get_sentence_transformer_tokenizer_config(self.model, self.revision)

    def _verify_tokenizer_mode(self) -> None:
        tokenizer_mode = cast(TokenizerMode, self.tokenizer_mode.lower())
        if tokenizer_mode not in get_args(TokenizerMode):
            raise ValueError(
                f"Unknown tokenizer mode: {self.tokenizer_mode}. Must be "
                f"one of {get_args(TokenizerMode)}."
            )
        self.tokenizer_mode = tokenizer_mode

    def _get_default_runner_type(
        self,
        architectures: list[str],
    ) -> RunnerType:
        registry = self.registry

        # Some Sentence Transformers models use *ForCausalLM archs
        if get_pooling_config(self.model, self.revision):
            return "pooling"

        for arch in architectures:
            if arch in registry.get_supported_archs():
                if registry.is_pooling_model(architectures, self):
                    return "pooling"
                if registry.is_text_generation_model(architectures, self):
                    return "generate"

            match = try_match_architecture_defaults(arch)
            if match:
                _, (runner_type, _) = match
                return runner_type

        return "generate"

    def _get_runner_type(
        self,
        architectures: list[str],
        runner: RunnerOption,
    ) -> RunnerType:
        if runner != "auto":
            return runner

        runner_type = self._get_default_runner_type(architectures)

        # Don't log the most common case
        if runner_type != "generate":
            logger.info(
                "Resolved `--runner auto` to `--runner %s`. "
                "Pass the value explicitly to silence this message.",
                runner_type,
            )

        return runner_type

    def _get_default_convert_type(
        self,
        architectures: list[str],
        runner_type: RunnerType,
    ) -> ConvertType:
        registry = self.registry

        for arch in architectures:
            if arch in registry.get_supported_archs():
                if runner_type == "generate" and registry.is_text_generation_model(
                    architectures, self
                ):
                    return "none"
                if runner_type == "pooling" and registry.is_pooling_model(
                    architectures, self
                ):
                    return "none"

            match = try_match_architecture_defaults(arch, runner_type=runner_type)
            if match:
                _, (_, convert_type) = match
                return convert_type

        # This is to handle Sentence Transformers models that use *ForCausalLM
        # and also multi-modal pooling models which are not defined as
        # Sentence Transformers models
        if runner_type == "pooling":
            return "embed"

        return "none"

    def _get_convert_type(
        self,
        architectures: list[str],
        runner_type: RunnerType,
        convert: ConvertOption,
    ) -> ConvertType:
        if convert != "auto":
            return convert

        convert_type = self._get_default_convert_type(architectures, runner_type)

        # Don't log the most common case
        if convert_type != "none":
            logger.info(
                "Resolved `--convert auto` to `--convert %s`. "
                "Pass the value explicitly to silence this message.",
                convert_type,
            )

        return convert_type

    def _get_default_pooling_task(
        self,
        architectures: list[str],
    ) -> Literal["embed", "classify", "reward"]:
        if self.registry.is_cross_encoder_model(architectures, self):
            return "classify"

        for arch in architectures:
            match = try_match_architecture_defaults(arch, runner_type="pooling")
            if match:
                _, (_, convert_type) = match
                assert convert_type != "none"
                return convert_type

        return "embed"

    def _parse_quant_hf_config(self, hf_config: PretrainedConfig):
        quant_cfg = getattr(hf_config, "quantization_config", None)
        if quant_cfg is None:
            # compressed-tensors uses a "compression_config" key
            quant_cfg = getattr(hf_config, "compression_config", None)

        else:
            # Set quant_method for ModelOpt models.
            producer_name = quant_cfg.get("producer", {}).get("name")
            if producer_name == "modelopt":
                quant_algo = quant_cfg.get("quantization", {}).get("quant_algo")
                if quant_algo == "FP8":
                    quant_cfg["quant_method"] = "modelopt"
                elif quant_algo == "NVFP4":
                    quant_cfg["quant_method"] = "modelopt_fp4"
                elif quant_algo is not None:
                    raise ValueError(f"Unknown ModelOpt quant algo: {quant_algo}")

        return quant_cfg

    def _verify_quantization(self) -> None:
        supported_quantization = me_quant.QUANTIZATION_METHODS
        if self.quantization is not None:
            self.quantization = cast(me_quant.QuantizationMethods, self.quantization)

        # Parse quantization method from the HF model config, if available.
        quant_cfg = self._parse_quant_hf_config(self.hf_config)
        if quant_cfg is None and (
            text_config := getattr(self.hf_config, "text_config", None)
        ):
            # Check the text config as well for multi-modal models.
            quant_cfg = self._parse_quant_hf_config(text_config)

        if quant_cfg is not None:
            # Use the community standard 'quant_method'
            quant_method = quant_cfg.get("quant_method", "").lower()

            # Normalize library names
            quant_method = quant_method.replace(
                "compressed_tensors", "compressed-tensors"
            )

            quant_cfg["quant_method"] = quant_method

            # Quantization methods which are overrides (i.e. they have a
            # `override_quantization_method` method) must be checked in order
            # of preference (this is particularly important for GPTQ).
            overrides = [
                "bitblas",
                "gptq_marlin_24",
                "gptq_marlin",
                "gptq_bitblas",
                "awq_marlin",
                "ipex",
                "moe_wna16",
                "modelopt",
                "modelopt_fp4",
                "petit_nvfp4",
                # Ensure heavy backends are probed last to avoid unnecessary
                # imports during override detection (e.g., MXFP4 imports Triton)
                "mxfp4",
            ]
            quantization_methods = [
                q for q in supported_quantization if q not in overrides
            ]
            # Any custom overrides will be in quantization_methods so we place
            # them at the start of the list so custom overrides have preference
            # over the built-in ones.
            quantization_methods = quantization_methods + overrides

            # Detect which checkpoint is it
            for name in quantization_methods:
                method = me_quant.get_quantization_config(name)
                quantization_override = method.override_quantization_method(
                    quant_cfg, self.quantization
                )
                if quantization_override is not None:
                    # Raise error if the override is not custom (custom would
                    # be in QUANTIZATION_METHODS but not QuantizationMethods)
                    # and hasn't been added to the overrides list.
                    if (
                        name in get_args(me_quant.QuantizationMethods)
                        and name not in overrides
                    ):
                        raise ValueError(
                            f"Quantization method {name} is an override but "
                            "is has not been added to the `overrides` list "
                            "above. This is necessary to ensure that the "
                            "overrides are checked in order of preference."
                        )
                    quant_method = quantization_override
                    self.quantization = quantization_override
                    break

            quant_method = quant_method if quant_method != "" else None
            # Verify quantization configurations.
            if self.quantization is None:
                self.quantization = quant_method
            elif self.quantization != quant_method:
                raise ValueError(
                    "Quantization method specified in the model config "
                    f"({quant_method}) does not match the quantization "
                    f"method specified in the `quantization` argument "
                    f"({self.quantization})."
                )

        if self.quantization is not None:
            if self.quantization not in supported_quantization:
                raise ValueError(
                    f"Unknown quantization method: {self.quantization}. Must "
                    f"be one of {supported_quantization}."
                )
            from vllm.platforms import current_platform

            current_platform.verify_quantization(self.quantization)

    def _verify_cuda_graph(self) -> None:
        # CUDAGraph capture not supported for encoder-decoder models on ROCm
        unsupported_rocm = self.is_encoder_decoder
        if unsupported_rocm and not self.enforce_eager and current_platform.is_rocm():
            logger.warning(
                "CUDA graph is not supported for %s on ROCm yet, fallback "
                "to eager mode.",
                self.hf_config.model_type,
            )
            self.enforce_eager = True

    def _verify_bnb_config(self) -> None:
        """
        The current version of bitsandbytes (0.46.1) with 8-bit models does not
        yet support CUDA graph.
        # TODO Remove this when bitsandbytes supports.
        """
        is_bitsandbytes = self.quantization == "bitsandbytes"
        has_quantization_config = (
            getattr(self.hf_config, "quantization_config", None) is not None
        )
        is_8bit = (
            self.hf_config.quantization_config.get("load_in_8bit", False)
            if has_quantization_config
            else False
        )
        if all(
            [
                is_bitsandbytes,
                has_quantization_config,
                is_8bit,
                not self.enforce_eager,
            ]
        ):
            logger.warning(
                "CUDA graph is not supported on BitsAndBytes 8bit yet, "
                "fallback to the eager mode."
            )

            self.enforce_eager = True

    def _verify_with_expert_parallelism(self) -> None:
        num_experts = self.get_num_experts()
        if num_experts < 1:
            raise ValueError(
                "Number of experts in the model must be greater than 0 "
                "when expert parallelism is enabled."
            )

    def verify_dual_chunk_attention_config(
        self,
        load_config: LoadConfig,
    ) -> None:
        if hasattr(self.hf_config, "dual_chunk_attention_config"):
            # Try loading the sparse attention config
            from vllm.model_executor.model_loader.weight_utils import (
                get_sparse_attention_config,
            )

            sparse_attn_config = get_sparse_attention_config(self, load_config)
            if sparse_attn_config:
                self.hf_config.dual_chunk_attention_config[
                    "sparse_attention_config"
                ] = sparse_attn_config
                if (
                    "sparse_attention_enabled"
                    not in self.hf_config.dual_chunk_attention_config
                ):
                    self.hf_config.dual_chunk_attention_config[
                        "sparse_attention_enabled"
                    ] = True

    def verify_with_parallel_config(
        self,
        parallel_config: ParallelConfig,
    ) -> None:
        if parallel_config.distributed_executor_backend == "external_launcher":
            assert self.seed is not None, (
                "Seed must be set when using external launcher backend to "
                "make sure sampling results are the same across workers."
            )

        total_num_attention_heads = getattr(
            self.hf_text_config, "num_attention_heads", 0
        )
        tensor_parallel_size = parallel_config.tensor_parallel_size
        if total_num_attention_heads % tensor_parallel_size != 0:
            raise ValueError(
                f"Total number of attention heads ({total_num_attention_heads})"
                " must be divisible by tensor parallel size "
                f"({tensor_parallel_size})."
            )

        if parallel_config.enable_expert_parallel:
            self._verify_with_expert_parallelism()

        pipeline_parallel_size = parallel_config.pipeline_parallel_size
        if pipeline_parallel_size > 1 and not self.registry.is_pp_supported_model(
            self.architectures, self
        ):
            raise NotImplementedError(
                "Pipeline parallelism is not supported for this model. "
                "Supported models implement the `SupportsPP` interface."
            )

        decode_context_parallel_size = parallel_config.decode_context_parallel_size
        if decode_context_parallel_size > 1 and not self.use_mla:
            total_num_kv_heads = self.get_total_num_kv_heads()
            assert tensor_parallel_size > total_num_kv_heads, (
                f"tensor parallel size {tensor_parallel_size} must be greater "
                f"than total num kv heads {total_num_kv_heads} when enable "
                f"decode context parallel for GQA/MQA"
            )

            max_dcp_size = tensor_parallel_size // total_num_kv_heads
            assert decode_context_parallel_size <= max_dcp_size, (
                f"decode context parallel size must less than or equal to "
                f"(tensor parallel size {tensor_parallel_size} // total "
                f"num kv heads {total_num_kv_heads}) = {max_dcp_size}, "
                f"but got {decode_context_parallel_size}"
            )

    def get_sliding_window(self) -> int | None:
        """Get the sliding window size from the HF text config if present."""
        return getattr(self.hf_text_config, "sliding_window", None)

    def get_vocab_size(self) -> int:
        return getattr(self.hf_text_config, "vocab_size", 0)

    def get_hidden_size(self) -> int:
        return getattr(self.hf_text_config, "hidden_size", 0)

    @property
    def is_deepseek_mla(self) -> bool:
        if not hasattr(self.hf_text_config, "model_type"):
            return False
        elif self.hf_text_config.model_type in (
            "deepseek_v2",
            "deepseek_v3",
            "deepseek_v32",
            "deepseek_mtp",
            "kimi_k2",
            "kimi_linear",
            "longcat_flash",
            "pangu_ultra_moe",
            "pangu_ultra_moe_mtp",
        ):
            return self.hf_text_config.kv_lora_rank is not None
        elif self.hf_text_config.model_type == "eagle":
            # if the model is an EAGLE module, check for the
            # underlying architecture
            return (
                self.hf_text_config.model.model_type
                in ("deepseek_v2", "deepseek_v3", "deepseek_v32")
                and self.hf_text_config.kv_lora_rank is not None
            )
        return False

    def get_head_size(self) -> int:
        # TODO remove hard code
        if self.is_deepseek_mla:
            qk_rope_head_dim = getattr(self.hf_text_config, "qk_rope_head_dim", 0)
            if self.use_mla:
                return self.hf_text_config.kv_lora_rank + qk_rope_head_dim
            else:
                qk_nope_head_dim = getattr(self.hf_text_config, "qk_nope_head_dim", 0)
                if qk_rope_head_dim and qk_nope_head_dim:
                    return qk_rope_head_dim + qk_nope_head_dim

        if hasattr(self.hf_text_config, "model_type") and (
            self.hf_text_config.model_type == "zamba2"
        ):
            return self.hf_text_config.attention_head_dim

        if self.is_attention_free:
            return 0

        # NOTE: Some configs may set head_dim=None in the config
        if getattr(self.hf_text_config, "head_dim", None) is not None:
            return self.hf_text_config.head_dim

        # NOTE: Some models (such as PLaMo2.1) use `hidden_size_per_head`
        if getattr(self.hf_text_config, "hidden_size_per_head", None) is not None:
            return self.hf_text_config.hidden_size_per_head

        # FIXME(woosuk): This may not be true for all models.
        return (
            self.hf_text_config.hidden_size // self.hf_text_config.num_attention_heads
        )

    def get_total_num_kv_heads(self) -> int:
        """Returns the total number of KV heads."""
        # For GPTBigCode & Falcon:
        # NOTE: for falcon, when new_decoder_architecture is True, the
        # multi_query flag is ignored and we use n_head_kv for the number of
        # KV heads.
        falcon_model_types = ["falcon", "RefinedWeb", "RefinedWebModel"]
        new_decoder_arch_falcon = (
            self.hf_config.model_type in falcon_model_types
            and getattr(self.hf_config, "new_decoder_architecture", False)
        )
        if not new_decoder_arch_falcon and getattr(
            self.hf_text_config, "multi_query", False
        ):
            # Multi-query attention, only one KV head.
            # Currently, tensor parallelism is not supported in this case.
            return 1

        # For DBRX and MPT
        if self.hf_config.model_type == "mpt":
            if "kv_n_heads" in self.hf_config.attn_config:
                return self.hf_config.attn_config["kv_n_heads"]
            return self.hf_config.num_attention_heads
        if self.hf_config.model_type == "dbrx":
            return getattr(
                self.hf_config.attn_config,
                "kv_n_heads",
                self.hf_config.num_attention_heads,
            )

        if self.hf_config.model_type == "nemotron-nas":
            for block in self.hf_config.block_configs:
                if not block.attention.no_op:
                    return (
                        self.hf_config.num_attention_heads
                        // block.attention.n_heads_in_group
                    )

            raise RuntimeError("Couldn't determine number of kv heads")

        if self.is_attention_free:
            return 0

        attributes = [
            # For Falcon:
            "n_head_kv",
            "num_kv_heads",
            # For LLaMA-2:
            "num_key_value_heads",
            # For ChatGLM:
            "multi_query_group_num",
        ]
        for attr in attributes:
            num_kv_heads = getattr(self.hf_text_config, attr, None)
            if num_kv_heads is not None:
                return num_kv_heads

        # For non-grouped-query attention models, the number of KV heads is
        # equal to the number of attention heads.
        return self.hf_text_config.num_attention_heads

    def get_num_kv_heads(self, parallel_config: ParallelConfig) -> int:
        """Returns the number of KV heads per GPU."""
        if self.use_mla:
            # When using MLA during decode it becomes MQA
            return 1

        total_num_kv_heads = self.get_total_num_kv_heads()
        # If tensor parallelism is used, we divide the number of KV heads by
        # the tensor parallel size. We will replicate the KV heads in the
        # case where the number of KV heads is smaller than the tensor
        # parallel size so each GPU has at least one KV head.
        return max(1, total_num_kv_heads // parallel_config.tensor_parallel_size)

    def get_num_attention_heads(self, parallel_config: ParallelConfig) -> int:
        num_heads = getattr(self.hf_text_config, "num_attention_heads", 0)
        return num_heads // parallel_config.tensor_parallel_size

    def get_num_experts(self) -> int:
        """Returns the number of experts in the model."""
        num_expert_names = [
            "num_experts",  # Jamba
            "moe_num_experts",  # Dbrx
            "n_routed_experts",  # DeepSeek
            "num_local_experts",  # Mixtral
        ]
        num_experts = getattr_iter(self.hf_text_config, num_expert_names, 0)
        if isinstance(num_experts, list):
            # Ernie VL's remote code uses list[int]...
            # The values are always the same so we just take the first one.
            return num_experts[0]
        return num_experts

    def get_layers_start_end_indices(
        self, parallel_config: ParallelConfig
    ) -> tuple[int, int]:
        from vllm.distributed.utils import get_pp_indices

        if (
            self.hf_text_config.model_type == "deepseek_mtp"
            or self.hf_config.model_type == "mimo_mtp"
            or self.hf_config.model_type == "glm4_moe_mtp"
            or self.hf_config.model_type == "ernie_mtp"
            or self.hf_config.model_type == "qwen3_next_mtp"
            or self.hf_config.model_type == "pangu_ultra_moe_mtp"
        ):
            total_num_hidden_layers = getattr(
                self.hf_text_config, "num_nextn_predict_layers", 0
            )
        elif self.hf_config.model_type == "longcat_flash_mtp":
            total_num_hidden_layers = getattr(
                self.hf_text_config, "num_nextn_predict_layers", 1
            )
        else:
            total_num_hidden_layers = getattr(
                self.hf_text_config, "num_hidden_layers", 0
            )
        # the layout order is: DP x PP x TP
        pp_rank = (
            parallel_config.rank // parallel_config.tensor_parallel_size
        ) % parallel_config.pipeline_parallel_size
        pp_size = parallel_config.pipeline_parallel_size
        start, end = get_pp_indices(total_num_hidden_layers, pp_rank, pp_size)
        return start, end

    def get_num_layers(self, parallel_config: ParallelConfig) -> int:
        start, end = self.get_layers_start_end_indices(parallel_config)
        return end - start

    def get_num_layers_by_block_type(
        self,
        parallel_config: ParallelConfig,
        block_type: LayerBlockType = "attention",
    ) -> int:
        # This function relies on 'layers_block_type' in hf_config,
        # for w/o this attribute, we will need to have workarounds like so
        attn_block_type = block_type == "attention"
        is_transformer = (
            not self.is_hybrid and not self.has_noops and not self.is_attention_free
        )
        start, end = self.get_layers_start_end_indices(parallel_config)

        if is_transformer:
            # Handle the basic case first
            return end - start if attn_block_type else 0
        elif self.is_attention_free:
            # Attention free
            # Note that this code assumes there
            # is only one type of attention-free block type.
            return 0 if attn_block_type else end - start
        elif self.has_noops:
            block_configs = self.hf_config.block_configs
            return sum(not bc.attention.no_op for bc in block_configs[start:end])
        else:
            # Hybrid model Jamba
            layers_block_type_value = getattr(
                self.hf_text_config, "layers_block_type", None
            )
            if layers_block_type_value is not None:
                if hasattr(self.hf_text_config, "model_type") and (
                    self.hf_text_config.model_type == "zamba2"
                ):
                    if attn_block_type:
                        return sum(
                            t == "hybrid" for t in layers_block_type_value[start:end]
                        )
                    else:
                        return self.get_num_layers(parallel_config)
                return sum(t == block_type for t in layers_block_type_value[start:end])

            # Hybrid model Minimax
            attn_type_list = getattr(self.hf_config, "attn_type_list", None)
            if attn_type_list:
                return sum(t == 1 for t in attn_type_list[start:end])

            # Hybrid model Qwen3Next
            layer_types_value = getattr(self.hf_config, "layer_types", None)
            if layer_types_value is not None:
                if block_type == "attention":
                    return sum(
                        t == "full_attention" for t in layer_types_value[start:end]
                    )
                elif block_type == "linear_attention":
                    return sum(
                        t == "linear_attention" for t in layer_types_value[start:end]
                    )
                else:
                    return sum(t == block_type for t in layer_types_value[start:end])

            if (
                layers_block_type_value is None
                and attn_type_list is None
                and layer_types_value is None
            ):
                raise ValueError(
                    "The model is an hybrid without a layers_block_type or an "
                    "attn_type_list, or a layer_types in the hf_config, "
                    f"cannot determine the num of {block_type} layers"
                )

    def get_mamba_chunk_size(self) -> int | None:
        """
        Returns the mamba chunk size if it exists
        """
        # used by e.g. Bamba, FalconH1, Granite, PLaMo2
        chunk_size = getattr(self.hf_text_config, "mamba_chunk_size", None)
        if chunk_size is None:
            # used by e.g. Mamba2, NemotronH, Zamba
            chunk_size = getattr(self.hf_text_config, "chunk_size", None)

        # Since Mamba1 does not have a chunk notion
        # we use a default chunk size of 1024.
        if chunk_size is None:
            chunk_size = 2048

        return chunk_size

    def get_multimodal_config(self) -> MultiModalConfig:
        """
        Get the multimodal configuration of the model.

        Raises:
            ValueError: If the model is not multimodal.
        """
        if self.multimodal_config is None:
            raise ValueError("The model is not multimodal.")

        return self.multimodal_config

    def try_get_generation_config(self) -> dict[str, Any]:
        """
        This method attempts to retrieve the non-default values of the
        generation config for this model.

        The generation config can contain information about special tokens, as
        well as sampling parameters. Which is why this method exists separately
        to `get_diff_sampling_param`.

        Returns:
            A dictionary containing the non-default generation config.
        """
        if self.generation_config in {"auto", "vllm"}:
            config = try_get_generation_config(
                self.hf_config_path or self.model,
                trust_remote_code=self.trust_remote_code,
                revision=self.revision,
                config_format=self.config_format,
            )
        else:
            config = try_get_generation_config(
                self.generation_config,
                trust_remote_code=self.trust_remote_code,
                config_format=self.config_format,
            )

        if config is None:
            return {}

        return config.to_diff_dict()

    def get_diff_sampling_param(self) -> dict[str, Any]:
        """
        This method returns a dictionary containing the non-default sampling
        parameters with `override_generation_config` applied.

        The default sampling parameters are:

        - vLLM's neutral defaults if `self.generation_config="vllm"`
        - the model's defaults if `self.generation_config="auto"`
        - as defined in `generation_config.json` if
            `self.generation_config="path/to/generation_config/dir"`

        Returns:
            A dictionary containing the non-default sampling parameters.
        """
        if self.generation_config == "vllm":
            config = {}
        else:
            config = self.try_get_generation_config()

        # Overriding with given generation config
        config.update(self.override_generation_config)

        available_params = [
            "repetition_penalty",
            "temperature",
            "top_k",
            "top_p",
            "min_p",
            "max_new_tokens",
        ]
        if any(p in config for p in available_params):
            diff_sampling_param = {
                p: config.get(p) for p in available_params if config.get(p) is not None
            }
            # Huggingface definition of max_new_tokens is equivalent
            # to vLLM's max_tokens
            if "max_new_tokens" in diff_sampling_param:
                diff_sampling_param["max_tokens"] = diff_sampling_param.pop(
                    "max_new_tokens"
                )
        else:
            diff_sampling_param = {}

        if diff_sampling_param:
            logger.warning_once(
                "Default sampling parameters have been overridden by the "
                "model's Hugging Face generation config recommended from the "
                "model creator. If this is not intended, please relaunch "
                "vLLM instance with `--generation-config vllm`."
            )
        return diff_sampling_param

    @property
    def is_encoder_decoder(self) -> bool:
        """Extract the HF encoder/decoder model flag."""
        return is_encoder_decoder(self.hf_config)

    @property
    def uses_alibi(self) -> bool:
        cfg = self.hf_text_config

        return (
            getattr(cfg, "alibi", False)  # Falcon
            or "BloomForCausalLM" in self.architectures  # Bloom
            or getattr(cfg, "position_encoding_type", "") == "alibi"  # codellm_1b_alibi
            or (
                hasattr(cfg, "attn_config")  # MPT
                and (
                    (
                        isinstance(cfg.attn_config, dict)
                        and cfg.attn_config.get("alibi", False)
                    )
                    or (
                        not isinstance(cfg.attn_config, dict)
                        and getattr(cfg.attn_config, "alibi", False)
                    )
                )
            )
        )

    @property
    def uses_mrope(self) -> bool:
        return uses_mrope(self.hf_config)

    @property
    def is_multimodal_model(self) -> bool:
        return self.multimodal_config is not None

    @property
    def is_multimodal_raw_input_only_model(self) -> bool:
        return self._model_info.supports_multimodal_raw_input_only

    @property
    def is_cross_encoder(self) -> bool:
        return (
            self._model_info.supports_cross_encoding or self.convert_type == "classify"
        )

    @property
    def is_pp_supported(self) -> bool:
        return self._model_info.supports_pp

    @property
    def is_attention_free(self) -> bool:
        return self._model_info.is_attention_free

    @property
    def is_hybrid(self) -> bool:
        return self._model_info.is_hybrid

    @property
    def has_noops(self) -> bool:
        return self._model_info.has_noops

    @property
    def has_inner_state(self):
        return self._model_info.has_inner_state

    @property
    def supports_mamba_prefix_caching(self) -> bool:
        return self._model_info.supports_mamba_prefix_caching

    @property
    def use_mla(self) -> bool:
        return self.is_deepseek_mla and not envs.VLLM_MLA_DISABLE

    @property
    def is_matryoshka(self) -> bool:
        return bool(getattr(self.hf_config, "matryoshka_dimensions", None)) or getattr(
            self.hf_config, "is_matryoshka", False
        )

    @property
    def matryoshka_dimensions(self):
        return getattr(self.hf_config, "matryoshka_dimensions", None)

    @property
    def use_pad_token(self) -> bool:
        # cross_encoder models defaults to using pad_token.
        # `llm as reranker` models defaults to not using pad_token.
        return getattr(self.hf_config, "use_pad_token", True)

    @property
    def head_dtype(self) -> torch.dtype:
        """
        "head" refers to the last Linear layer(s) of an LLM,
        such as the lm_head in a generation model,
        or the score or classifier in a classification model.

        `head_dtype` currently only supports pooling models.\n
        - The pooling model defaults to using fp32 head,
        you can use --hf-overrides '{"head_dtype": "model"}' to disable it.
        """

        head_dtype = _get_head_dtype(
            config=self.hf_config, dtype=self.dtype, runner_type=self.runner_type
        )

        if self.runner_type != "pooling" and head_dtype != self.dtype:
            logger.warning_once(
                "`head_dtype` currently only supports pooling models."
                "fallback to model dtype [%s].",
                self.dtype,
            )
            return self.dtype

        if head_dtype not in current_platform.supported_dtypes:
            logger.warning_once(
                "The current platform does not support [%s] head dtype, "
                "fallback to model dtype [%s].",
                head_dtype,
                self.dtype,
            )
            return self.dtype

        logger.debug_once("head dtype: %s", head_dtype)
        return head_dtype

    @property
    def hidden_size(self):
        if hasattr(self.hf_config, "hidden_size"):
            return self.hf_config.hidden_size
        text_config = self.hf_config.get_text_config()
        return text_config.hidden_size

    @property
    def embedding_size(self):
        dense_modules = try_get_dense_modules(self.model, revision=self.revision)
        if dense_modules is not None:
            return dense_modules[-1]["out_features"]
        return self.hidden_size

    def get_and_verify_max_len(self, max_model_len: int):
        # Consider max_model_len in tokenizer_config only when
        # pooling models use absolute position_embedding.
        tokenizer_config = None
        if (
            self.runner_type == "pooling"
            and getattr(self.hf_config, "position_embedding_type", "") == "absolute"
        ):
            tokenizer_config = try_get_tokenizer_config(
                self.tokenizer,
                trust_remote_code=self.trust_remote_code,
                revision=self.tokenizer_revision,
            )
        max_model_len = _get_and_verify_max_len(
            hf_config=self.hf_text_config,
            tokenizer_config=tokenizer_config,
            max_model_len=max_model_len,
            disable_sliding_window=self.disable_sliding_window,
            sliding_window=self.get_sliding_window(),
            spec_target_max_model_len=self.spec_target_max_model_len,
            encoder_config=self.encoder_config,
        )
        logger.info("Using max model len %s", max_model_len)
        return max_model_len

allowed_local_media_path class-attribute instance-attribute

allowed_local_media_path: str = ''

Allowing API requests to read local images or videos from directories specified by the server file system. This is a security risk. Should only be enabled in trusted environments.

allowed_media_domains class-attribute instance-attribute

allowed_media_domains: list[str] | None = None

If set, only media URLs that belong to this domain can be used for multi-modal inputs.

architecture property

architecture: str

The architecture vllm actually used.

architectures property

architectures: list[str]

code_revision class-attribute instance-attribute

code_revision: str | None = None

The specific revision to use for the model code on the Hugging Face Hub. It can be a branch name, a tag name, or a commit id. If unspecified, will use the default version.

config_format class-attribute instance-attribute

config_format: str | ConfigFormat = 'auto'

The format of the model config to load:

  • "auto" will try to load the config in hf format if available else it will try to load in mistral format.

  • "hf" will load the config in hf format.

  • "mistral" will load the config in mistral format.

convert class-attribute instance-attribute

convert: ConvertOption = 'auto'

Convert the model using adapters defined in vllm.model_executor.models.adapters. The most common use case is to adapt a text generation model to be used for pooling tasks.

disable_cascade_attn class-attribute instance-attribute

disable_cascade_attn: bool = False

Disable cascade attention for V1. While cascade attention does not change the mathematical correctness, disabling it could be useful for preventing potential numerical issues. Note that even if this is set to False, cascade attention will be only used when the heuristic tells that it's beneficial.

disable_sliding_window class-attribute instance-attribute

disable_sliding_window: bool = False

Whether to disable sliding window. If True, we will disable the sliding window functionality of the model, capping to sliding window size. If the model does not support sliding window, this argument is ignored.

dtype class-attribute instance-attribute

dtype: ModelDType | dtype = 'auto'

Data type for model weights and activations:

  • "auto" will use FP16 precision for FP32 and FP16 models, and BF16 precision for BF16 models.

  • "half" for FP16. Recommended for AWQ quantization.

  • "float16" is the same as "half".

  • "bfloat16" for a balance between precision and range.

  • "float" is shorthand for FP32 precision.

  • "float32" for FP32 precision.

embedding_size property

embedding_size

enable_prompt_embeds class-attribute instance-attribute

enable_prompt_embeds: bool = False

If True, enables passing text embeddings as inputs via the prompt_embeds key.

WARNING: The vLLM engine may crash if incorrect shape of embeddings is passed. Only enable this flag for trusted users!

enable_sleep_mode class-attribute instance-attribute

enable_sleep_mode: bool = False

Enable sleep mode for the engine (only cuda platform is supported).

enforce_eager class-attribute instance-attribute

enforce_eager: bool = False

Whether to always use eager-mode PyTorch. If True, we will disable CUDA graph and always execute the model in eager mode. If False, we will use CUDA graph and eager execution in hybrid for maximal performance and flexibility.

generation_config class-attribute instance-attribute

generation_config: str = 'auto'

The folder path to the generation config. Defaults to "auto", the generation config will be loaded from model path. If set to "vllm", no generation config is loaded, vLLM defaults will be used. If set to a folder path, the generation config will be loaded from the specified folder path. If max_new_tokens is specified in generation config, then it sets a server-wide limit on the number of output tokens for all requests.

has_inner_state property

has_inner_state

has_noops property

has_noops: bool

head_dtype property

head_dtype: dtype

"head" refers to the last Linear layer(s) of an LLM, such as the lm_head in a generation model, or the score or classifier in a classification model.

head_dtype currently only supports pooling models.

  • The pooling model defaults to using fp32 head, you can use --hf-overrides '{"head_dtype": "model"}' to disable it.

hf_config class-attribute instance-attribute

hf_config: PretrainedConfig = field(init=False)

The Hugging Face config of the model.

hf_config_path class-attribute instance-attribute

hf_config_path: str | None = None

Name or path of the Hugging Face config to use. If unspecified, model name or path will be used.

hf_overrides class-attribute instance-attribute

hf_overrides: HfOverrides = field(default_factory=dict)

If a dictionary, contains arguments to be forwarded to the Hugging Face config. If a callable, it is called to update the HuggingFace config.

hf_text_config class-attribute instance-attribute

hf_text_config: PretrainedConfig = field(init=False)

The Hugging Face config of the text model (same as hf_config for text models).

hf_token class-attribute instance-attribute

hf_token: bool | str | None = None

The token to use as HTTP bearer authorization for remote files . If True, will use the token generated when running huggingface-cli login (stored in ~/.huggingface).

hidden_size property

hidden_size

io_processor_plugin class-attribute instance-attribute

io_processor_plugin: str | None = None

IOProcessor plugin name to load at model startup

is_attention_free property

is_attention_free: bool

is_cross_encoder property

is_cross_encoder: bool

is_deepseek_mla property

is_deepseek_mla: bool

is_encoder_decoder property

is_encoder_decoder: bool

Extract the HF encoder/decoder model flag.

is_hybrid property

is_hybrid: bool

is_matryoshka property

is_matryoshka: bool

is_multimodal_model property

is_multimodal_model: bool

is_multimodal_raw_input_only_model property

is_multimodal_raw_input_only_model: bool

is_pp_supported property

is_pp_supported: bool

logits_processor_pattern class-attribute instance-attribute

logits_processor_pattern: str | None = None

Optional regex pattern specifying valid logits processor qualified names that can be passed with the logits_processors extra completion argument. Defaults to None, which allows no processors.

logits_processors class-attribute instance-attribute

logits_processors: (
    list[str | type[LogitsProcessor]] | None
) = None

One or more logits processors' fully-qualified class names or class definitions

logprobs_mode class-attribute instance-attribute

logprobs_mode: LogprobsMode = 'raw_logprobs'

Indicates the content returned in the logprobs and prompt_logprobs. Supported mode: 1) raw_logprobs, 2) processed_logprobs, 3) raw_logits, 4) processed_logits. Raw means the values before applying any logit processors, like bad words. Processed means the values after applying all processors, including temperature and top_k/top_p.

matryoshka_dimensions property

matryoshka_dimensions

max_logprobs class-attribute instance-attribute

max_logprobs: int = 20

Maximum number of log probabilities to return when logprobs is specified in SamplingParams. The default value comes the default for the OpenAI Chat Completions API. -1 means no cap, i.e. all (output_length * vocab_size) logprobs are allowed to be returned and it may cause OOM.

max_model_len class-attribute instance-attribute

max_model_len: SkipValidation[int] = None

Model context length (prompt and output). If unspecified, will be automatically derived from the model config.

When passing via --max-model-len, supports k/m/g/K/M/G in human-readable format. Examples:

  • 1k -> 1000

  • 1K -> 1024

  • 25.6k -> 25,600

model class-attribute instance-attribute

model: str = 'Qwen/Qwen3-0.6B'

Name or path of the Hugging Face model to use. It is also used as the content for model_name tag in metrics output when served_model_name is not specified.

model_impl class-attribute instance-attribute

model_impl: str | ModelImpl = 'auto'

Which implementation of the model to use:

  • "auto" will try to use the vLLM implementation, if it exists, and fall back to the Transformers implementation if no vLLM implementation is available.

  • "vllm" will use the vLLM model implementation.

  • "transformers" will use the Transformers model implementation.

  • "terratorch" will use the TerraTorch model implementation.

multimodal_config class-attribute instance-attribute

multimodal_config: MultiModalConfig | None = None

Configuration for multimodal model. If None, this will be inferred from the architecture of self.model.

override_attention_dtype class-attribute instance-attribute

override_attention_dtype: str | None = None

Override dtype for attention

override_generation_config class-attribute instance-attribute

override_generation_config: dict[str, Any] = field(
    default_factory=dict
)

Overrides or sets generation config. e.g. {"temperature": 0.5}. If used with --generation-config auto, the override parameters will be merged with the default config from the model. If used with --generation-config vllm, only the override parameters are used.

override_pooler_config class-attribute instance-attribute

override_pooler_config: dict | PoolerConfig | None = None

[DEPRECATED] Use pooler_config instead. This field will be removed in v0.12.0 or v1.0.0, whichever is sooner.

pooler_config class-attribute instance-attribute

pooler_config: PoolerConfig | None = None

Pooler config which controls the behaviour of output pooling in pooling models.

quantization class-attribute instance-attribute

quantization: SkipValidation[QuantizationMethods | None] = (
    None
)

Method used to quantize the weights. If None, we first check the quantization_config attribute in the model config file. If that is None, we assume the model weights are not quantized and use dtype to determine the data type of the weights.

registry property

registry

revision class-attribute instance-attribute

revision: str | None = None

The specific model version to use. It can be a branch name, a tag name, or a commit id. If unspecified, will use the default version.

runner class-attribute instance-attribute

runner: RunnerOption = 'auto'

The type of model runner to use. Each vLLM instance only supports one model runner, even if the same model can be used for multiple types.

seed class-attribute instance-attribute

seed: int | None = None

Random seed for reproducibility. Initialized to None in V0, but initialized to 0 in V1.

served_model_name class-attribute instance-attribute

served_model_name: str | list[str] | None = None

The model name(s) used in the API. If multiple names are provided, the server will respond to any of the provided names. The model name in the model field of a response will be the first name in this list. If not specified, the model name will be the same as the --model argument. Noted that this name(s) will also be used in model_name tag content of prometheus metrics, if multiple names provided, metrics tag will take the first one.

skip_tokenizer_init class-attribute instance-attribute

skip_tokenizer_init: bool = False

Skip initialization of tokenizer and detokenizer. Expects valid prompt_token_ids and None for prompt from the input. The generated output will contain token ids.

spec_target_max_model_len class-attribute instance-attribute

spec_target_max_model_len: int | None = None

Specify the maximum length for spec decoding draft models.

supports_mamba_prefix_caching property

supports_mamba_prefix_caching: bool

task class-attribute instance-attribute

task: TaskOption | None = None

[DEPRECATED] The task to use the model for. If the model supports more than one model runner, this is used to select which model runner to run.

Note that the model may support other tasks using the same model runner.

tokenizer class-attribute instance-attribute

tokenizer: SkipValidation[str] = None

Name or path of the Hugging Face tokenizer to use. If unspecified, model name or path will be used.

tokenizer_mode class-attribute instance-attribute

tokenizer_mode: TokenizerMode = 'auto'

Tokenizer mode:

  • "auto" will use the fast tokenizer if available.

  • "slow" will always use the slow tokenizer.

  • "mistral" will always use the tokenizer from mistral_common.

  • "custom" will use --tokenizer to select the preregistered tokenizer.

tokenizer_revision class-attribute instance-attribute

tokenizer_revision: str | None = None

The specific revision to use for the tokenizer on the Hugging Face Hub. It can be a branch name, a tag name, or a commit id. If unspecified, will use the default version.

trust_remote_code class-attribute instance-attribute

trust_remote_code: bool = False

Trust remote code (e.g., from HuggingFace) when downloading the model and tokenizer.

use_mla property

use_mla: bool

use_pad_token property

use_pad_token: bool

uses_alibi property

uses_alibi: bool

uses_mrope property

uses_mrope: bool

__post_init__

__post_init__(
    limit_mm_per_prompt: dict[str, int] | None,
    enable_mm_embeds: bool | None,
    media_io_kwargs: dict[str, dict[str, Any]] | None,
    mm_processor_kwargs: dict[str, Any] | None,
    mm_processor_cache_gb: float | None,
    mm_processor_cache_type: MMCacheType | None,
    mm_shm_cache_max_object_size_mb: int | None,
    mm_encoder_tp_mode: MMEncoderTPMode | None,
    mm_encoder_attn_backend: _Backend | str | None,
    interleave_mm_strings: bool | None,
    skip_mm_profiling: bool | None,
    video_pruning_rate: float | None,
) -> None
Source code in vllm/config/model.py
def __post_init__(
    self,
    # Multimodal config init vars
    limit_mm_per_prompt: dict[str, int] | None,
    enable_mm_embeds: bool | None,
    media_io_kwargs: dict[str, dict[str, Any]] | None,
    mm_processor_kwargs: dict[str, Any] | None,
    mm_processor_cache_gb: float | None,
    mm_processor_cache_type: MMCacheType | None,
    mm_shm_cache_max_object_size_mb: int | None,
    mm_encoder_tp_mode: MMEncoderTPMode | None,
    mm_encoder_attn_backend: _Backend | str | None,
    interleave_mm_strings: bool | None,
    skip_mm_profiling: bool | None,
    video_pruning_rate: float | None,
) -> None:
    # Set the default seed to 0 in V1.
    # NOTE(woosuk): In V1, we use separate processes for workers (unless
    # VLLM_ENABLE_V1_MULTIPROCESSING=0), so setting a seed here
    # doesn't affect the user process. However, without a consistent seed,
    # different tensor parallel workers would sample different tokens,
    # leading to inconsistent results.
    if self.seed is None:
        self.seed = 0
        if not envs.VLLM_ENABLE_V1_MULTIPROCESSING:
            logger.warning(
                "The global random seed is set to %d. Since "
                "VLLM_ENABLE_V1_MULTIPROCESSING is set to False, this may "
                "affect the random state of the Python process that "
                "launched vLLM.",
                self.seed,
            )

    # Keep set served_model_name before maybe_model_redirect(self.model)
    self.served_model_name = get_served_model_name(
        self.model, self.served_model_name
    )
    self.model = maybe_model_redirect(self.model)
    # The tokenizer is consistent with the model by default.
    if self.tokenizer is None:
        self.tokenizer = self.model
    if self.tokenizer_revision is None:
        self.tokenizer_revision = self.revision
    self.tokenizer = maybe_model_redirect(self.tokenizer)

    if isinstance(self.hf_config_path, str):
        self.hf_config_path = maybe_model_redirect(self.hf_config_path)

    if callable(self.hf_overrides):
        hf_overrides_kw = {}
        hf_overrides_fn = self.hf_overrides
        dict_overrides: dict[str, Any] = {}
    else:
        # Separate dict overrides from flat ones
        # We'll determine how to apply dict overrides after loading the config
        hf_overrides_kw = {}
        dict_overrides = {}
        for key, value in self.hf_overrides.items():
            if isinstance(value, dict):
                dict_overrides[key] = value
            else:
                hf_overrides_kw[key] = value
        hf_overrides_fn = None

    self.maybe_pull_model_tokenizer_for_runai(self.model, self.tokenizer)

    if (
        (backend := envs.VLLM_ATTENTION_BACKEND)
        and backend == "FLASHINFER"
        and find_spec("flashinfer") is None
    ):
        raise ValueError(
            "VLLM_ATTENTION_BACKEND is set to FLASHINFER, but flashinfer "
            "module was not found. See "
            "https://github.com/vllm-project/vllm/blob/main/docker/Dockerfile "  # noqa: E501
            "for instructions on how to install it."
        )

    from vllm.platforms import current_platform

    if self.override_attention_dtype is not None and not current_platform.is_rocm():
        warnings.warn(
            "override-attention-dtype is set but not using ROCm platform",
            stacklevel=2,
        )

    if self.enable_sleep_mode and not current_platform.is_sleep_mode_available():
        raise ValueError("Sleep mode is not supported on current platform.")

    hf_config = get_config(
        self.hf_config_path or self.model,
        self.trust_remote_code,
        self.revision,
        self.code_revision,
        self.config_format,
        hf_overrides_kw=hf_overrides_kw,
        hf_overrides_fn=hf_overrides_fn,
    )

    self.hf_config = hf_config
    if dict_overrides:
        self._apply_dict_overrides(hf_config, dict_overrides)
    self.hf_text_config = get_hf_text_config(self.hf_config)
    self.attention_chunk_size = getattr(
        self.hf_text_config, "attention_chunk_size", None
    )
    self.encoder_config = self._get_encoder_config()
    self.hf_image_processor_config = get_hf_image_processor_config(
        self.model, hf_token=self.hf_token, revision=self.revision
    )

    architectures = self.architectures
    registry = self.registry
    is_generative_model = registry.is_text_generation_model(architectures, self)
    is_pooling_model = registry.is_pooling_model(architectures, self)

    def _task_to_convert(task: TaskOption) -> ConvertType:
        if task == "embedding" or task == "embed":
            return "embed"
        if task == "classify":
            return "classify"
        if task == "reward":
            return "reward"
        if task == "score":
            new_task = self._get_default_pooling_task(architectures)
            return "classify" if new_task == "classify" else "embed"

        return "none"

    if self.task is not None:
        runner: RunnerOption = "auto"
        convert: ConvertOption = "auto"
        msg_prefix = (
            "The 'task' option has been deprecated and will be "
            "removed in v0.13.0 or v1.0, whichever comes first."
        )
        msg_hint = "Please remove this option."

        is_generative_task = self.task in _RUNNER_TASKS["generate"]
        is_pooling_task = self.task in _RUNNER_TASKS["pooling"]

        if is_generative_model and is_pooling_model:
            if is_generative_task:
                runner = "generate"
                convert = "auto"
                msg_hint = (
                    "Please replace this option with `--runner "
                    "generate` to continue using this model "
                    "as a generative model."
                )
            elif is_pooling_task:
                runner = "pooling"
                convert = "auto"
                msg_hint = (
                    "Please replace this option with `--runner "
                    "pooling` to continue using this model "
                    "as a pooling model."
                )
            else:  # task == "auto"
                pass
        elif is_generative_model or is_pooling_model:
            if is_generative_task:
                runner = "generate"
                convert = "auto"
                msg_hint = "Please remove this option"
            elif is_pooling_task:
                runner = "pooling"
                convert = _task_to_convert(self.task)
                msg_hint = (
                    "Please replace this option with `--convert "
                    f"{convert}` to continue using this model "
                    "as a pooling model."
                )
            else:  # task == "auto"
                pass
        else:
            debug_info = {
                "architectures": architectures,
                "is_generative_model": is_generative_model,
                "is_pooling_model": is_pooling_model,
            }
            raise AssertionError(
                "The model should be a generative or "
                "pooling model when task is set to "
                f"{self.task!r}. Found: {debug_info}"
            )

        self.runner = runner
        self.convert = convert

        msg = f"{msg_prefix} {msg_hint}"
        warnings.warn(msg, DeprecationWarning, stacklevel=2)

    self.runner_type = self._get_runner_type(architectures, self.runner)
    self.convert_type = self._get_convert_type(
        architectures, self.runner_type, self.convert
    )

    if self.runner_type == "generate" and not is_generative_model:
        generate_converts = _RUNNER_CONVERTS["generate"]
        if self.convert_type not in generate_converts:
            # Currently we don't have any converters for generative models
            raise ValueError("This model does not support `--runner generate`.")
    if self.runner_type == "pooling" and not is_pooling_model:
        pooling_converts = _RUNNER_CONVERTS["pooling"]
        if self.convert_type not in pooling_converts:
            convert_option = "<" + "|".join(pooling_converts) + ">"
            raise ValueError(
                "This model does not support `--runner pooling`. "
                f"You can pass `--convert {convert_option} to adapt "
                "it into a pooling model."
            )

    # Note: Initialize these attributes early because transformers fallback
    # may fail to load dynamic modules in child processes
    model_info, arch = registry.inspect_model_cls(architectures, self)
    self._model_info = model_info
    self._architecture = arch
    logger.info("Resolved architecture: %s", arch)

    # Init pooler config if needed
    if self.runner_type == "pooling":
        if self.override_pooler_config is not None:
            logger.warning_once(
                "`override_pooler_config` is deprecated and will be "
                "removed in v0.12.0 or v1.0.0, whichever is sooner. "
                "Please use `pooler_config` instead."
            )

            if isinstance(self.override_pooler_config, dict):
                self.pooler_config = PoolerConfig(**self.override_pooler_config)
            else:
                self.pooler_config = self.override_pooler_config

        if self.pooler_config is None:
            self.pooler_config = PoolerConfig()

        base_config = get_pooling_config(self.model, self.revision)
        if base_config is not None:
            # Only set values that are not overridden by the user
            for k, v in base_config.items():
                if getattr(self.pooler_config, k) is None:
                    setattr(self.pooler_config, k, v)

        default_pooling_type = self._model_info.default_pooling_type
        if self.pooler_config.pooling_type is None:
            self.pooler_config.pooling_type = default_pooling_type

    self.dtype: torch.dtype = _get_and_verify_dtype(
        self.model,
        self.hf_config,
        self.dtype,
        is_pooling_model=self.runner_type == "pooling",
        revision=self.revision,
    )

    self.original_max_model_len = self.max_model_len
    self.max_model_len = self.get_and_verify_max_len(self.max_model_len)
    # Init multimodal config if needed
    if self._model_info.supports_multimodal:
        if (
            mm_encoder_tp_mode == "data"
            and not self._model_info.supports_multimodal_encoder_tp_data
        ):
            logger.warning_once(
                "This model does not support `--mm-encoder-tp-mode data`. "
                "Falling back to `--mm-encoder-tp-mode weights`."
            )
            mm_encoder_tp_mode = "weights"

        mm_config_kwargs = dict(
            limit_per_prompt=limit_mm_per_prompt,
            enable_mm_embeds=enable_mm_embeds,
            media_io_kwargs=media_io_kwargs,
            mm_processor_kwargs=mm_processor_kwargs,
            mm_processor_cache_gb=mm_processor_cache_gb,
            mm_processor_cache_type=mm_processor_cache_type,
            mm_shm_cache_max_object_size_mb=mm_shm_cache_max_object_size_mb,
            mm_encoder_tp_mode=mm_encoder_tp_mode,
            mm_encoder_attn_backend=mm_encoder_attn_backend,
            interleave_mm_strings=interleave_mm_strings,
            skip_mm_profiling=skip_mm_profiling,
            video_pruning_rate=video_pruning_rate,
        )

        mm_config_kwargs = {
            k: v for k, v in mm_config_kwargs.items() if v is not None
        }

        self.multimodal_config = MultiModalConfig(**mm_config_kwargs)

    if self.disable_sliding_window:
        # Set after get_and_verify_max_len to ensure that max_model_len
        # can be correctly capped to sliding window size
        self.hf_text_config.sliding_window = None

    if not self.skip_tokenizer_init:
        self._verify_tokenizer_mode()

    # Avoid running try_verify_and_update_config multiple times
    self.config_updated = False

    self._verify_quantization()
    self._verify_cuda_graph()
    self._verify_bnb_config()

_apply_dict_overrides

_apply_dict_overrides(
    config: PretrainedConfig, overrides: dict[str, Any]
) -> None

Apply dict overrides, handling both nested configs and dict values.

Source code in vllm/config/model.py
def _apply_dict_overrides(
    self,
    config: PretrainedConfig,
    overrides: dict[str, Any],
) -> None:
    """Apply dict overrides, handling both nested configs and dict values."""
    from transformers import PretrainedConfig

    for key, value in overrides.items():
        attr = getattr(config, key, None)
        if attr is not None and isinstance(attr, PretrainedConfig):
            # It's a nested config - recursively update it
            self._update_nested(attr, value)
        else:
            # It's a dict-valued parameter - set it directly
            setattr(config, key, value)

_get_convert_type

_get_convert_type(
    architectures: list[str],
    runner_type: RunnerType,
    convert: ConvertOption,
) -> ConvertType
Source code in vllm/config/model.py
def _get_convert_type(
    self,
    architectures: list[str],
    runner_type: RunnerType,
    convert: ConvertOption,
) -> ConvertType:
    if convert != "auto":
        return convert

    convert_type = self._get_default_convert_type(architectures, runner_type)

    # Don't log the most common case
    if convert_type != "none":
        logger.info(
            "Resolved `--convert auto` to `--convert %s`. "
            "Pass the value explicitly to silence this message.",
            convert_type,
        )

    return convert_type

_get_default_convert_type

_get_default_convert_type(
    architectures: list[str], runner_type: RunnerType
) -> ConvertType
Source code in vllm/config/model.py
def _get_default_convert_type(
    self,
    architectures: list[str],
    runner_type: RunnerType,
) -> ConvertType:
    registry = self.registry

    for arch in architectures:
        if arch in registry.get_supported_archs():
            if runner_type == "generate" and registry.is_text_generation_model(
                architectures, self
            ):
                return "none"
            if runner_type == "pooling" and registry.is_pooling_model(
                architectures, self
            ):
                return "none"

        match = try_match_architecture_defaults(arch, runner_type=runner_type)
        if match:
            _, (_, convert_type) = match
            return convert_type

    # This is to handle Sentence Transformers models that use *ForCausalLM
    # and also multi-modal pooling models which are not defined as
    # Sentence Transformers models
    if runner_type == "pooling":
        return "embed"

    return "none"

_get_default_pooling_task

_get_default_pooling_task(
    architectures: list[str],
) -> Literal["embed", "classify", "reward"]
Source code in vllm/config/model.py
def _get_default_pooling_task(
    self,
    architectures: list[str],
) -> Literal["embed", "classify", "reward"]:
    if self.registry.is_cross_encoder_model(architectures, self):
        return "classify"

    for arch in architectures:
        match = try_match_architecture_defaults(arch, runner_type="pooling")
        if match:
            _, (_, convert_type) = match
            assert convert_type != "none"
            return convert_type

    return "embed"

_get_default_runner_type

_get_default_runner_type(
    architectures: list[str],
) -> RunnerType
Source code in vllm/config/model.py
def _get_default_runner_type(
    self,
    architectures: list[str],
) -> RunnerType:
    registry = self.registry

    # Some Sentence Transformers models use *ForCausalLM archs
    if get_pooling_config(self.model, self.revision):
        return "pooling"

    for arch in architectures:
        if arch in registry.get_supported_archs():
            if registry.is_pooling_model(architectures, self):
                return "pooling"
            if registry.is_text_generation_model(architectures, self):
                return "generate"

        match = try_match_architecture_defaults(arch)
        if match:
            _, (runner_type, _) = match
            return runner_type

    return "generate"

_get_encoder_config

_get_encoder_config()
Source code in vllm/config/model.py
def _get_encoder_config(self):
    return get_sentence_transformer_tokenizer_config(self.model, self.revision)

_get_runner_type

_get_runner_type(
    architectures: list[str], runner: RunnerOption
) -> RunnerType
Source code in vllm/config/model.py
def _get_runner_type(
    self,
    architectures: list[str],
    runner: RunnerOption,
) -> RunnerType:
    if runner != "auto":
        return runner

    runner_type = self._get_default_runner_type(architectures)

    # Don't log the most common case
    if runner_type != "generate":
        logger.info(
            "Resolved `--runner auto` to `--runner %s`. "
            "Pass the value explicitly to silence this message.",
            runner_type,
        )

    return runner_type

_get_transformers_backend_cls

_get_transformers_backend_cls() -> str

Determine which Transformers backend class will be used if model_impl is set to transformers or auto.

Source code in vllm/config/model.py
def _get_transformers_backend_cls(self) -> str:
    """Determine which Transformers backend class will be used if
    `model_impl` is set to `transformers` or `auto`."""
    cls = "Transformers"
    # If 'hf_config != hf_text_config' it's a nested config, i.e. multimodal
    cls += "MultiModal" if self.hf_config != self.hf_text_config else ""
    cls += "MoE" if self.get_num_experts() > 1 else ""
    # Check if the architecture we're wrapping has defaults
    runner = None
    task = None
    if defaults := try_match_architecture_defaults(self.architectures[0]):
        _, (runner, task) = defaults
    # User specified value take precedence
    if self.runner != "auto":
        runner = self.runner
    # Only consider Transformers backend pooling classes if we're wrapping an
    # architecture that defaults to pooling. Otherwise, we return the LM class
    # and use adapters.
    if runner == "pooling" and task in {"embed", "classify"}:
        if task == "embed":
            cls += "EmbeddingModel"
        elif task == "classify":
            cls += "ForSequenceClassification"
    else:
        cls += "ForCausalLM"
    return cls

_parse_quant_hf_config

_parse_quant_hf_config(hf_config: PretrainedConfig)
Source code in vllm/config/model.py
def _parse_quant_hf_config(self, hf_config: PretrainedConfig):
    quant_cfg = getattr(hf_config, "quantization_config", None)
    if quant_cfg is None:
        # compressed-tensors uses a "compression_config" key
        quant_cfg = getattr(hf_config, "compression_config", None)

    else:
        # Set quant_method for ModelOpt models.
        producer_name = quant_cfg.get("producer", {}).get("name")
        if producer_name == "modelopt":
            quant_algo = quant_cfg.get("quantization", {}).get("quant_algo")
            if quant_algo == "FP8":
                quant_cfg["quant_method"] = "modelopt"
            elif quant_algo == "NVFP4":
                quant_cfg["quant_method"] = "modelopt_fp4"
            elif quant_algo is not None:
                raise ValueError(f"Unknown ModelOpt quant algo: {quant_algo}")

    return quant_cfg

_update_nested

_update_nested(
    target: PretrainedConfig | dict[str, Any],
    updates: dict[str, Any],
) -> None

Recursively updates a config or dict with nested updates.

Source code in vllm/config/model.py
def _update_nested(
    self,
    target: PretrainedConfig | dict[str, Any],
    updates: dict[str, Any],
) -> None:
    """Recursively updates a config or dict with nested updates."""
    for key, value in updates.items():
        if isinstance(value, dict):
            # Get the nested target
            if isinstance(target, dict):
                nested_target = target.get(key)
            else:
                nested_target = getattr(target, key, None)

            # If nested target exists and can be updated recursively
            if nested_target is not None and (
                isinstance(nested_target, dict)
                or hasattr(nested_target, "__dict__")
            ):
                self._update_nested(nested_target, value)
                continue

        # Set the value (base case)
        if isinstance(target, dict):
            target[key] = value
        else:
            setattr(target, key, value)

_verify_bnb_config

_verify_bnb_config() -> None

The current version of bitsandbytes (0.46.1) with 8-bit models does not yet support CUDA graph.

TODO Remove this when bitsandbytes supports.

Source code in vllm/config/model.py
def _verify_bnb_config(self) -> None:
    """
    The current version of bitsandbytes (0.46.1) with 8-bit models does not
    yet support CUDA graph.
    # TODO Remove this when bitsandbytes supports.
    """
    is_bitsandbytes = self.quantization == "bitsandbytes"
    has_quantization_config = (
        getattr(self.hf_config, "quantization_config", None) is not None
    )
    is_8bit = (
        self.hf_config.quantization_config.get("load_in_8bit", False)
        if has_quantization_config
        else False
    )
    if all(
        [
            is_bitsandbytes,
            has_quantization_config,
            is_8bit,
            not self.enforce_eager,
        ]
    ):
        logger.warning(
            "CUDA graph is not supported on BitsAndBytes 8bit yet, "
            "fallback to the eager mode."
        )

        self.enforce_eager = True

_verify_cuda_graph

_verify_cuda_graph() -> None
Source code in vllm/config/model.py
def _verify_cuda_graph(self) -> None:
    # CUDAGraph capture not supported for encoder-decoder models on ROCm
    unsupported_rocm = self.is_encoder_decoder
    if unsupported_rocm and not self.enforce_eager and current_platform.is_rocm():
        logger.warning(
            "CUDA graph is not supported for %s on ROCm yet, fallback "
            "to eager mode.",
            self.hf_config.model_type,
        )
        self.enforce_eager = True

_verify_quantization

_verify_quantization() -> None
Source code in vllm/config/model.py
def _verify_quantization(self) -> None:
    supported_quantization = me_quant.QUANTIZATION_METHODS
    if self.quantization is not None:
        self.quantization = cast(me_quant.QuantizationMethods, self.quantization)

    # Parse quantization method from the HF model config, if available.
    quant_cfg = self._parse_quant_hf_config(self.hf_config)
    if quant_cfg is None and (
        text_config := getattr(self.hf_config, "text_config", None)
    ):
        # Check the text config as well for multi-modal models.
        quant_cfg = self._parse_quant_hf_config(text_config)

    if quant_cfg is not None:
        # Use the community standard 'quant_method'
        quant_method = quant_cfg.get("quant_method", "").lower()

        # Normalize library names
        quant_method = quant_method.replace(
            "compressed_tensors", "compressed-tensors"
        )

        quant_cfg["quant_method"] = quant_method

        # Quantization methods which are overrides (i.e. they have a
        # `override_quantization_method` method) must be checked in order
        # of preference (this is particularly important for GPTQ).
        overrides = [
            "bitblas",
            "gptq_marlin_24",
            "gptq_marlin",
            "gptq_bitblas",
            "awq_marlin",
            "ipex",
            "moe_wna16",
            "modelopt",
            "modelopt_fp4",
            "petit_nvfp4",
            # Ensure heavy backends are probed last to avoid unnecessary
            # imports during override detection (e.g., MXFP4 imports Triton)
            "mxfp4",
        ]
        quantization_methods = [
            q for q in supported_quantization if q not in overrides
        ]
        # Any custom overrides will be in quantization_methods so we place
        # them at the start of the list so custom overrides have preference
        # over the built-in ones.
        quantization_methods = quantization_methods + overrides

        # Detect which checkpoint is it
        for name in quantization_methods:
            method = me_quant.get_quantization_config(name)
            quantization_override = method.override_quantization_method(
                quant_cfg, self.quantization
            )
            if quantization_override is not None:
                # Raise error if the override is not custom (custom would
                # be in QUANTIZATION_METHODS but not QuantizationMethods)
                # and hasn't been added to the overrides list.
                if (
                    name in get_args(me_quant.QuantizationMethods)
                    and name not in overrides
                ):
                    raise ValueError(
                        f"Quantization method {name} is an override but "
                        "is has not been added to the `overrides` list "
                        "above. This is necessary to ensure that the "
                        "overrides are checked in order of preference."
                    )
                quant_method = quantization_override
                self.quantization = quantization_override
                break

        quant_method = quant_method if quant_method != "" else None
        # Verify quantization configurations.
        if self.quantization is None:
            self.quantization = quant_method
        elif self.quantization != quant_method:
            raise ValueError(
                "Quantization method specified in the model config "
                f"({quant_method}) does not match the quantization "
                f"method specified in the `quantization` argument "
                f"({self.quantization})."
            )

    if self.quantization is not None:
        if self.quantization not in supported_quantization:
            raise ValueError(
                f"Unknown quantization method: {self.quantization}. Must "
                f"be one of {supported_quantization}."
            )
        from vllm.platforms import current_platform

        current_platform.verify_quantization(self.quantization)

_verify_tokenizer_mode

_verify_tokenizer_mode() -> None
Source code in vllm/config/model.py
def _verify_tokenizer_mode(self) -> None:
    tokenizer_mode = cast(TokenizerMode, self.tokenizer_mode.lower())
    if tokenizer_mode not in get_args(TokenizerMode):
        raise ValueError(
            f"Unknown tokenizer mode: {self.tokenizer_mode}. Must be "
            f"one of {get_args(TokenizerMode)}."
        )
    self.tokenizer_mode = tokenizer_mode

_verify_with_expert_parallelism

_verify_with_expert_parallelism() -> None
Source code in vllm/config/model.py
def _verify_with_expert_parallelism(self) -> None:
    num_experts = self.get_num_experts()
    if num_experts < 1:
        raise ValueError(
            "Number of experts in the model must be greater than 0 "
            "when expert parallelism is enabled."
        )

compute_hash

compute_hash() -> str

WARNING: Whenever a new field is added to this config, ensure that it is included in the factors list if it affects the computation graph.

Provide a hash that uniquely identifies all the configs that affect the structure of the computation graph from input ids/embeddings to the final hidden states, excluding anything before input ids/embeddings and after the final hidden states.

Source code in vllm/config/model.py
def compute_hash(self) -> str:
    """
    WARNING: Whenever a new field is added to this config,
    ensure that it is included in the factors list if
    it affects the computation graph.

    Provide a hash that uniquely identifies all the configs
    that affect the structure of the computation
    graph from input ids/embeddings to the final hidden states,
    excluding anything before input ids/embeddings and after
    the final hidden states.
    """
    factors: list[Any] = []
    factors.append(self.model)
    factors.append(self.dtype)
    factors.append(self.quantization)
    factors.append(self.revision)
    factors.append(self.code_revision)
    factors.append(self.max_model_len)
    factors.append(self.max_logprobs)
    factors.append(self.disable_sliding_window)
    factors.append(self.trust_remote_code)
    factors.append(self.generation_config)
    factors.append(self.model_impl)
    factors.append(self.override_generation_config)
    factors.append(self.video_pruning_rate)
    factors.append(self.enable_prompt_embeds)

    # hf_config can control how the model looks!
    try:
        hf_config_json = self.hf_config.to_json_string(use_diff=False)
    except TypeError:
        from transformers import PretrainedConfig

        from vllm.utils.jsontree import json_map_leaves

        # Handle nested HF configs with unserializable values gracefully
        hf_config_json = (
            json.dumps(
                json_map_leaves(
                    lambda v: v.to_dict()
                    if isinstance(v, PretrainedConfig)
                    else str(v),
                    self.hf_config.to_dict(),
                ),
                indent=2,
                sort_keys=True,
            )
            + "\n"
        )

    factors.append(hf_config_json)

    str_factors = str(factors)
    assert_hashable(str_factors)
    return hashlib.sha256(str(factors).encode()).hexdigest()

get_and_verify_max_len

get_and_verify_max_len(max_model_len: int)
Source code in vllm/config/model.py
def get_and_verify_max_len(self, max_model_len: int):
    # Consider max_model_len in tokenizer_config only when
    # pooling models use absolute position_embedding.
    tokenizer_config = None
    if (
        self.runner_type == "pooling"
        and getattr(self.hf_config, "position_embedding_type", "") == "absolute"
    ):
        tokenizer_config = try_get_tokenizer_config(
            self.tokenizer,
            trust_remote_code=self.trust_remote_code,
            revision=self.tokenizer_revision,
        )
    max_model_len = _get_and_verify_max_len(
        hf_config=self.hf_text_config,
        tokenizer_config=tokenizer_config,
        max_model_len=max_model_len,
        disable_sliding_window=self.disable_sliding_window,
        sliding_window=self.get_sliding_window(),
        spec_target_max_model_len=self.spec_target_max_model_len,
        encoder_config=self.encoder_config,
    )
    logger.info("Using max model len %s", max_model_len)
    return max_model_len

get_diff_sampling_param

get_diff_sampling_param() -> dict[str, Any]

This method returns a dictionary containing the non-default sampling parameters with override_generation_config applied.

The default sampling parameters are:

  • vLLM's neutral defaults if self.generation_config="vllm"
  • the model's defaults if self.generation_config="auto"
  • as defined in generation_config.json if self.generation_config="path/to/generation_config/dir"

Returns:

Type Description
dict[str, Any]

A dictionary containing the non-default sampling parameters.

Source code in vllm/config/model.py
def get_diff_sampling_param(self) -> dict[str, Any]:
    """
    This method returns a dictionary containing the non-default sampling
    parameters with `override_generation_config` applied.

    The default sampling parameters are:

    - vLLM's neutral defaults if `self.generation_config="vllm"`
    - the model's defaults if `self.generation_config="auto"`
    - as defined in `generation_config.json` if
        `self.generation_config="path/to/generation_config/dir"`

    Returns:
        A dictionary containing the non-default sampling parameters.
    """
    if self.generation_config == "vllm":
        config = {}
    else:
        config = self.try_get_generation_config()

    # Overriding with given generation config
    config.update(self.override_generation_config)

    available_params = [
        "repetition_penalty",
        "temperature",
        "top_k",
        "top_p",
        "min_p",
        "max_new_tokens",
    ]
    if any(p in config for p in available_params):
        diff_sampling_param = {
            p: config.get(p) for p in available_params if config.get(p) is not None
        }
        # Huggingface definition of max_new_tokens is equivalent
        # to vLLM's max_tokens
        if "max_new_tokens" in diff_sampling_param:
            diff_sampling_param["max_tokens"] = diff_sampling_param.pop(
                "max_new_tokens"
            )
    else:
        diff_sampling_param = {}

    if diff_sampling_param:
        logger.warning_once(
            "Default sampling parameters have been overridden by the "
            "model's Hugging Face generation config recommended from the "
            "model creator. If this is not intended, please relaunch "
            "vLLM instance with `--generation-config vllm`."
        )
    return diff_sampling_param

get_head_size

get_head_size() -> int
Source code in vllm/config/model.py
def get_head_size(self) -> int:
    # TODO remove hard code
    if self.is_deepseek_mla:
        qk_rope_head_dim = getattr(self.hf_text_config, "qk_rope_head_dim", 0)
        if self.use_mla:
            return self.hf_text_config.kv_lora_rank + qk_rope_head_dim
        else:
            qk_nope_head_dim = getattr(self.hf_text_config, "qk_nope_head_dim", 0)
            if qk_rope_head_dim and qk_nope_head_dim:
                return qk_rope_head_dim + qk_nope_head_dim

    if hasattr(self.hf_text_config, "model_type") and (
        self.hf_text_config.model_type == "zamba2"
    ):
        return self.hf_text_config.attention_head_dim

    if self.is_attention_free:
        return 0

    # NOTE: Some configs may set head_dim=None in the config
    if getattr(self.hf_text_config, "head_dim", None) is not None:
        return self.hf_text_config.head_dim

    # NOTE: Some models (such as PLaMo2.1) use `hidden_size_per_head`
    if getattr(self.hf_text_config, "hidden_size_per_head", None) is not None:
        return self.hf_text_config.hidden_size_per_head

    # FIXME(woosuk): This may not be true for all models.
    return (
        self.hf_text_config.hidden_size // self.hf_text_config.num_attention_heads
    )

get_hidden_size

get_hidden_size() -> int
Source code in vllm/config/model.py
def get_hidden_size(self) -> int:
    return getattr(self.hf_text_config, "hidden_size", 0)

get_layers_start_end_indices

get_layers_start_end_indices(
    parallel_config: ParallelConfig,
) -> tuple[int, int]
Source code in vllm/config/model.py
def get_layers_start_end_indices(
    self, parallel_config: ParallelConfig
) -> tuple[int, int]:
    from vllm.distributed.utils import get_pp_indices

    if (
        self.hf_text_config.model_type == "deepseek_mtp"
        or self.hf_config.model_type == "mimo_mtp"
        or self.hf_config.model_type == "glm4_moe_mtp"
        or self.hf_config.model_type == "ernie_mtp"
        or self.hf_config.model_type == "qwen3_next_mtp"
        or self.hf_config.model_type == "pangu_ultra_moe_mtp"
    ):
        total_num_hidden_layers = getattr(
            self.hf_text_config, "num_nextn_predict_layers", 0
        )
    elif self.hf_config.model_type == "longcat_flash_mtp":
        total_num_hidden_layers = getattr(
            self.hf_text_config, "num_nextn_predict_layers", 1
        )
    else:
        total_num_hidden_layers = getattr(
            self.hf_text_config, "num_hidden_layers", 0
        )
    # the layout order is: DP x PP x TP
    pp_rank = (
        parallel_config.rank // parallel_config.tensor_parallel_size
    ) % parallel_config.pipeline_parallel_size
    pp_size = parallel_config.pipeline_parallel_size
    start, end = get_pp_indices(total_num_hidden_layers, pp_rank, pp_size)
    return start, end

get_mamba_chunk_size

get_mamba_chunk_size() -> int | None

Returns the mamba chunk size if it exists

Source code in vllm/config/model.py
def get_mamba_chunk_size(self) -> int | None:
    """
    Returns the mamba chunk size if it exists
    """
    # used by e.g. Bamba, FalconH1, Granite, PLaMo2
    chunk_size = getattr(self.hf_text_config, "mamba_chunk_size", None)
    if chunk_size is None:
        # used by e.g. Mamba2, NemotronH, Zamba
        chunk_size = getattr(self.hf_text_config, "chunk_size", None)

    # Since Mamba1 does not have a chunk notion
    # we use a default chunk size of 1024.
    if chunk_size is None:
        chunk_size = 2048

    return chunk_size

get_multimodal_config

get_multimodal_config() -> MultiModalConfig

Get the multimodal configuration of the model.

Raises:

Type Description
ValueError

If the model is not multimodal.

Source code in vllm/config/model.py
def get_multimodal_config(self) -> MultiModalConfig:
    """
    Get the multimodal configuration of the model.

    Raises:
        ValueError: If the model is not multimodal.
    """
    if self.multimodal_config is None:
        raise ValueError("The model is not multimodal.")

    return self.multimodal_config

get_num_attention_heads

get_num_attention_heads(
    parallel_config: ParallelConfig,
) -> int
Source code in vllm/config/model.py
def get_num_attention_heads(self, parallel_config: ParallelConfig) -> int:
    num_heads = getattr(self.hf_text_config, "num_attention_heads", 0)
    return num_heads // parallel_config.tensor_parallel_size

get_num_experts

get_num_experts() -> int

Returns the number of experts in the model.

Source code in vllm/config/model.py
def get_num_experts(self) -> int:
    """Returns the number of experts in the model."""
    num_expert_names = [
        "num_experts",  # Jamba
        "moe_num_experts",  # Dbrx
        "n_routed_experts",  # DeepSeek
        "num_local_experts",  # Mixtral
    ]
    num_experts = getattr_iter(self.hf_text_config, num_expert_names, 0)
    if isinstance(num_experts, list):
        # Ernie VL's remote code uses list[int]...
        # The values are always the same so we just take the first one.
        return num_experts[0]
    return num_experts

get_num_kv_heads

get_num_kv_heads(parallel_config: ParallelConfig) -> int

Returns the number of KV heads per GPU.

Source code in vllm/config/model.py
def get_num_kv_heads(self, parallel_config: ParallelConfig) -> int:
    """Returns the number of KV heads per GPU."""
    if self.use_mla:
        # When using MLA during decode it becomes MQA
        return 1

    total_num_kv_heads = self.get_total_num_kv_heads()
    # If tensor parallelism is used, we divide the number of KV heads by
    # the tensor parallel size. We will replicate the KV heads in the
    # case where the number of KV heads is smaller than the tensor
    # parallel size so each GPU has at least one KV head.
    return max(1, total_num_kv_heads // parallel_config.tensor_parallel_size)

get_num_layers

get_num_layers(parallel_config: ParallelConfig) -> int
Source code in vllm/config/model.py
def get_num_layers(self, parallel_config: ParallelConfig) -> int:
    start, end = self.get_layers_start_end_indices(parallel_config)
    return end - start

get_num_layers_by_block_type

get_num_layers_by_block_type(
    parallel_config: ParallelConfig,
    block_type: LayerBlockType = "attention",
) -> int
Source code in vllm/config/model.py
def get_num_layers_by_block_type(
    self,
    parallel_config: ParallelConfig,
    block_type: LayerBlockType = "attention",
) -> int:
    # This function relies on 'layers_block_type' in hf_config,
    # for w/o this attribute, we will need to have workarounds like so
    attn_block_type = block_type == "attention"
    is_transformer = (
        not self.is_hybrid and not self.has_noops and not self.is_attention_free
    )
    start, end = self.get_layers_start_end_indices(parallel_config)

    if is_transformer:
        # Handle the basic case first
        return end - start if attn_block_type else 0
    elif self.is_attention_free:
        # Attention free
        # Note that this code assumes there
        # is only one type of attention-free block type.
        return 0 if attn_block_type else end - start
    elif self.has_noops:
        block_configs = self.hf_config.block_configs
        return sum(not bc.attention.no_op for bc in block_configs[start:end])
    else:
        # Hybrid model Jamba
        layers_block_type_value = getattr(
            self.hf_text_config, "layers_block_type", None
        )
        if layers_block_type_value is not None:
            if hasattr(self.hf_text_config, "model_type") and (
                self.hf_text_config.model_type == "zamba2"
            ):
                if attn_block_type:
                    return sum(
                        t == "hybrid" for t in layers_block_type_value[start:end]
                    )
                else:
                    return self.get_num_layers(parallel_config)
            return sum(t == block_type for t in layers_block_type_value[start:end])

        # Hybrid model Minimax
        attn_type_list = getattr(self.hf_config, "attn_type_list", None)
        if attn_type_list:
            return sum(t == 1 for t in attn_type_list[start:end])

        # Hybrid model Qwen3Next
        layer_types_value = getattr(self.hf_config, "layer_types", None)
        if layer_types_value is not None:
            if block_type == "attention":
                return sum(
                    t == "full_attention" for t in layer_types_value[start:end]
                )
            elif block_type == "linear_attention":
                return sum(
                    t == "linear_attention" for t in layer_types_value[start:end]
                )
            else:
                return sum(t == block_type for t in layer_types_value[start:end])

        if (
            layers_block_type_value is None
            and attn_type_list is None
            and layer_types_value is None
        ):
            raise ValueError(
                "The model is an hybrid without a layers_block_type or an "
                "attn_type_list, or a layer_types in the hf_config, "
                f"cannot determine the num of {block_type} layers"
            )

get_sliding_window

get_sliding_window() -> int | None

Get the sliding window size from the HF text config if present.

Source code in vllm/config/model.py
def get_sliding_window(self) -> int | None:
    """Get the sliding window size from the HF text config if present."""
    return getattr(self.hf_text_config, "sliding_window", None)

get_total_num_kv_heads

get_total_num_kv_heads() -> int

Returns the total number of KV heads.

Source code in vllm/config/model.py
def get_total_num_kv_heads(self) -> int:
    """Returns the total number of KV heads."""
    # For GPTBigCode & Falcon:
    # NOTE: for falcon, when new_decoder_architecture is True, the
    # multi_query flag is ignored and we use n_head_kv for the number of
    # KV heads.
    falcon_model_types = ["falcon", "RefinedWeb", "RefinedWebModel"]
    new_decoder_arch_falcon = (
        self.hf_config.model_type in falcon_model_types
        and getattr(self.hf_config, "new_decoder_architecture", False)
    )
    if not new_decoder_arch_falcon and getattr(
        self.hf_text_config, "multi_query", False
    ):
        # Multi-query attention, only one KV head.
        # Currently, tensor parallelism is not supported in this case.
        return 1

    # For DBRX and MPT
    if self.hf_config.model_type == "mpt":
        if "kv_n_heads" in self.hf_config.attn_config:
            return self.hf_config.attn_config["kv_n_heads"]
        return self.hf_config.num_attention_heads
    if self.hf_config.model_type == "dbrx":
        return getattr(
            self.hf_config.attn_config,
            "kv_n_heads",
            self.hf_config.num_attention_heads,
        )

    if self.hf_config.model_type == "nemotron-nas":
        for block in self.hf_config.block_configs:
            if not block.attention.no_op:
                return (
                    self.hf_config.num_attention_heads
                    // block.attention.n_heads_in_group
                )

        raise RuntimeError("Couldn't determine number of kv heads")

    if self.is_attention_free:
        return 0

    attributes = [
        # For Falcon:
        "n_head_kv",
        "num_kv_heads",
        # For LLaMA-2:
        "num_key_value_heads",
        # For ChatGLM:
        "multi_query_group_num",
    ]
    for attr in attributes:
        num_kv_heads = getattr(self.hf_text_config, attr, None)
        if num_kv_heads is not None:
            return num_kv_heads

    # For non-grouped-query attention models, the number of KV heads is
    # equal to the number of attention heads.
    return self.hf_text_config.num_attention_heads

get_vocab_size

get_vocab_size() -> int
Source code in vllm/config/model.py
def get_vocab_size(self) -> int:
    return getattr(self.hf_text_config, "vocab_size", 0)

maybe_pull_model_tokenizer_for_runai

maybe_pull_model_tokenizer_for_runai(
    model: str, tokenizer: str
) -> None

Pull model/tokenizer from Object Storage to temporary directory when needed.

Parameters:

Name Type Description Default
model str

Model name or path

required
tokenizer str

Tokenizer name or path

required
Source code in vllm/config/model.py
def maybe_pull_model_tokenizer_for_runai(self, model: str, tokenizer: str) -> None:
    """Pull model/tokenizer from Object Storage to temporary
    directory when needed.

    Args:
        model: Model name or path
        tokenizer: Tokenizer name or path
    """

    if not (is_runai_obj_uri(model) or is_runai_obj_uri(tokenizer)):
        return

    if is_runai_obj_uri(model):
        object_storage_model = ObjectStorageModel(url=model)
        object_storage_model.pull_files(
            model, allow_pattern=["*.model", "*.py", "*.json"]
        )
        self.model_weights = model
        self.model = object_storage_model.dir

        # If tokenizer is same as model, download to same directory
        if model == tokenizer:
            object_storage_model.pull_files(
                model,
                ignore_pattern=[
                    "*.pt",
                    "*.safetensors",
                    "*.bin",
                    "*.tensors",
                    "*.pth",
                ],
            )
            self.tokenizer = object_storage_model.dir
            return

    # Only download tokenizer if needed and not already handled
    if is_runai_obj_uri(tokenizer):
        object_storage_tokenizer = ObjectStorageModel(url=tokenizer)
        object_storage_tokenizer.pull_files(
            model,
            ignore_pattern=["*.pt", "*.safetensors", "*.bin", "*.tensors", "*.pth"],
        )
        self.tokenizer = object_storage_tokenizer.dir

try_get_generation_config

try_get_generation_config() -> dict[str, Any]

This method attempts to retrieve the non-default values of the generation config for this model.

The generation config can contain information about special tokens, as well as sampling parameters. Which is why this method exists separately to get_diff_sampling_param.

Returns:

Type Description
dict[str, Any]

A dictionary containing the non-default generation config.

Source code in vllm/config/model.py
def try_get_generation_config(self) -> dict[str, Any]:
    """
    This method attempts to retrieve the non-default values of the
    generation config for this model.

    The generation config can contain information about special tokens, as
    well as sampling parameters. Which is why this method exists separately
    to `get_diff_sampling_param`.

    Returns:
        A dictionary containing the non-default generation config.
    """
    if self.generation_config in {"auto", "vllm"}:
        config = try_get_generation_config(
            self.hf_config_path or self.model,
            trust_remote_code=self.trust_remote_code,
            revision=self.revision,
            config_format=self.config_format,
        )
    else:
        config = try_get_generation_config(
            self.generation_config,
            trust_remote_code=self.trust_remote_code,
            config_format=self.config_format,
        )

    if config is None:
        return {}

    return config.to_diff_dict()

using_transformers_backend

using_transformers_backend() -> bool

Check if the model is using the Transformers backend class.

Source code in vllm/config/model.py
def using_transformers_backend(self) -> bool:
    """Check if the model is using the Transformers backend class."""
    used_cls = self._model_info.architecture
    transformers_backend_cls = self._get_transformers_backend_cls()
    return used_cls == transformers_backend_cls

validate_model_config_after

validate_model_config_after() -> ModelConfig
Source code in vllm/config/model.py
@model_validator(mode="after")
def validate_model_config_after(self: "ModelConfig") -> "ModelConfig":
    if not isinstance(self.tokenizer, str):
        raise ValueError("tokenizer must be a string after __post_init__.")
    if not isinstance(self.max_model_len, int):
        raise ValueError("max_model_len must be an integer after __post_init__.")
    return self

validate_quantization_before classmethod

validate_quantization_before(value: Any) -> Any
Source code in vllm/config/model.py
@field_validator("quantization", mode="before")
@classmethod
def validate_quantization_before(cls, value: Any) -> Any:
    if isinstance(value, str):
        return value.lower()
    return value

verify_dual_chunk_attention_config

verify_dual_chunk_attention_config(
    load_config: LoadConfig,
) -> None
Source code in vllm/config/model.py
def verify_dual_chunk_attention_config(
    self,
    load_config: LoadConfig,
) -> None:
    if hasattr(self.hf_config, "dual_chunk_attention_config"):
        # Try loading the sparse attention config
        from vllm.model_executor.model_loader.weight_utils import (
            get_sparse_attention_config,
        )

        sparse_attn_config = get_sparse_attention_config(self, load_config)
        if sparse_attn_config:
            self.hf_config.dual_chunk_attention_config[
                "sparse_attention_config"
            ] = sparse_attn_config
            if (
                "sparse_attention_enabled"
                not in self.hf_config.dual_chunk_attention_config
            ):
                self.hf_config.dual_chunk_attention_config[
                    "sparse_attention_enabled"
                ] = True

verify_with_parallel_config

verify_with_parallel_config(
    parallel_config: ParallelConfig,
) -> None
Source code in vllm/config/model.py
def verify_with_parallel_config(
    self,
    parallel_config: ParallelConfig,
) -> None:
    if parallel_config.distributed_executor_backend == "external_launcher":
        assert self.seed is not None, (
            "Seed must be set when using external launcher backend to "
            "make sure sampling results are the same across workers."
        )

    total_num_attention_heads = getattr(
        self.hf_text_config, "num_attention_heads", 0
    )
    tensor_parallel_size = parallel_config.tensor_parallel_size
    if total_num_attention_heads % tensor_parallel_size != 0:
        raise ValueError(
            f"Total number of attention heads ({total_num_attention_heads})"
            " must be divisible by tensor parallel size "
            f"({tensor_parallel_size})."
        )

    if parallel_config.enable_expert_parallel:
        self._verify_with_expert_parallelism()

    pipeline_parallel_size = parallel_config.pipeline_parallel_size
    if pipeline_parallel_size > 1 and not self.registry.is_pp_supported_model(
        self.architectures, self
    ):
        raise NotImplementedError(
            "Pipeline parallelism is not supported for this model. "
            "Supported models implement the `SupportsPP` interface."
        )

    decode_context_parallel_size = parallel_config.decode_context_parallel_size
    if decode_context_parallel_size > 1 and not self.use_mla:
        total_num_kv_heads = self.get_total_num_kv_heads()
        assert tensor_parallel_size > total_num_kv_heads, (
            f"tensor parallel size {tensor_parallel_size} must be greater "
            f"than total num kv heads {total_num_kv_heads} when enable "
            f"decode context parallel for GQA/MQA"
        )

        max_dcp_size = tensor_parallel_size // total_num_kv_heads
        assert decode_context_parallel_size <= max_dcp_size, (
            f"decode context parallel size must less than or equal to "
            f"(tensor parallel size {tensor_parallel_size} // total "
            f"num kv heads {total_num_kv_heads}) = {max_dcp_size}, "
            f"but got {decode_context_parallel_size}"
        )

ParallelConfig

Configuration for the distributed execution.

Source code in vllm/config/parallel.py
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
@config
@dataclass
class ParallelConfig:
    """Configuration for the distributed execution."""

    pipeline_parallel_size: int = 1
    """Number of pipeline parallel groups."""
    tensor_parallel_size: int = 1
    """Number of tensor parallel groups."""
    data_parallel_size: int = 1
    """Number of data parallel groups. MoE layers will be sharded according to
    the product of the tensor parallel size and data parallel size."""
    data_parallel_size_local: int = 1
    """Number of local data parallel groups."""
    data_parallel_rank: int = 0
    """Rank of the data parallel group."""
    data_parallel_rank_local: int | None = None
    """Local rank of the data parallel group,
    set only in SPMD mode."""
    data_parallel_master_ip: str = "127.0.0.1"
    """IP of the data parallel master."""
    data_parallel_rpc_port: int = 29550
    """Port for data parallel messaging."""
    data_parallel_master_port: int = 29500
    """Port of the data parallel master."""
    data_parallel_backend: DataParallelBackend = "mp"
    """Backend to use for data parallel, either "mp" or "ray"."""
    data_parallel_external_lb: bool = False
    """Whether to use "external" DP LB mode. Applies only to online serving
    and when data_parallel_size > 0. This is useful for a "one-pod-per-rank"
    wide-EP setup in Kubernetes. Set implicitly when --data-parallel-rank
    is provided explicitly to vllm serve."""
    data_parallel_hybrid_lb: bool = False
    """Whether to use "hybrid" DP LB mode. Applies only to online serving
    and when data_parallel_size > 0. Enables running an AsyncLLM
    and API server on a "per-node" basis where vLLM load balances
    between local data parallel ranks, but an external LB balances
    between vLLM nodes/replicas. Set explicitly in conjunction with
    --data-parallel-start-rank."""
    enable_expert_parallel: bool = False
    """Use expert parallelism instead of tensor parallelism for MoE layers."""
    enable_eplb: bool = False
    """Enable expert parallelism load balancing for MoE layers."""
    eplb_config: EPLBConfig = Field(default_factory=EPLBConfig)
    """Expert parallelism configuration."""
    expert_placement_strategy: ExpertPlacementStrategy = "linear"
    """The expert placement strategy for MoE layers:\n
    - "linear": Experts are placed in a contiguous manner. For example, with 4
      experts and 2 ranks, rank 0 will have experts [0, 1] and rank 1 will have
      experts [2, 3].\n
    - "round_robin": Experts are placed in a round-robin manner. For example,
      with 4 experts and 2 ranks, rank 0 will have experts [0, 2] and rank 1
      will have experts [1, 3]. This strategy can help improve load balancing
      for grouped expert models with no redundant experts."""
    all2all_backend: (
        Literal[
            "naive",
            "pplx",
            "deepep_high_throughput",
            "deepep_low_latency",
            "allgather_reducescatter",
            "flashinfer_all2allv",
        ]
        | None
    ) = None
    """All2All backend for MoE expert parallel communication. If not set, uses
    the value from VLLM_ALL2ALL_BACKEND environment variable. Available options:
    - "naive": Naive all2all implementation using broadcasts
    - "allgather_reducescatter": All2all based on allgather and reducescatter
    - "pplx": Use pplx kernels
    - "deepep_high_throughput": Use deepep high-throughput kernels
    - "deepep_low_latency": Use deepep low-latency kernels
    - "flashinfer_all2allv": Use flashinfer alltoallv kernels for mnnvl"""
    num_redundant_experts: int | None = None
    """`num_redundant_experts` is deprecated and has been replaced with
    `eplb_config.num_redundant_experts`. This will be removed in v0.12.0.
    Please use `eplb_config.num_redundant_experts` instead."""
    eplb_window_size: int | None = None
    """`eplb_window_size` is deprecated and has been replaced with
    `eplb_config.window_size`. This will be removed in v0.12.0.
    Please use `eplb_config.window_size` instead."""
    eplb_step_interval: int | None = None
    """`eplb_step_interval` is deprecated and has been replaced with
    `eplb_config.step_interval`. This will be removed in v0.12.0.
    Please use `eplb_config.step_interval` instead."""
    eplb_log_balancedness: bool | None = None
    """`eplb_log_balancedness` is deprecated and has been replaced with
    `eplb_config.log_balancedness`. This will be removed in v0.12.0.
    Please use `eplb_config.log_balancedness` instead."""

    max_parallel_loading_workers: int | None = None
    """Maximum number of parallel loading workers when loading model
    sequentially in multiple batches. To avoid RAM OOM when using tensor
    parallel and large models."""

    disable_custom_all_reduce: bool = False
    """Disable the custom all-reduce kernel and fall back to NCCL."""

    enable_dbo: bool = False
    """Enable dual batch overlap for the model executor."""

    dbo_decode_token_threshold: int = 32
    """The threshold for dual batch overlap for batches only containing decodes.
    If the number of tokens in the request is greater than this threshold,
    microbatching will be used. Otherwise, the request will be processed in a
    single batch."""
    dbo_prefill_token_threshold: int = 512  # TODO(lucas): tune
    """The threshold for dual batch overlap for batches that contain one or more
    prefills. If the number of tokens in the request is greater than this
    threshold, microbatching will be used. Otherwise, the request will be
    processed in a single batch."""

    disable_nccl_for_dp_synchronization: bool = False
    """Forces the dp synchronization logic in vllm/v1/worker/dp_utils.py 
    to use Gloo instead of NCCL for its all reduce"""

    ray_workers_use_nsight: bool = False
    """Whether to profile Ray workers with nsight, see https://docs.ray.io/en/latest/ray-observability/user-guides/profiling.html#profiling-nsight-profiler."""

    ray_runtime_env: RuntimeEnv | None = None
    """Ray runtime environment to pass to distributed workers."""

    placement_group: PlacementGroup | None = None
    """ray distributed model workers placement group."""

    distributed_executor_backend: (
        str | DistributedExecutorBackend | type[Executor] | None
    ) = None
    """Backend to use for distributed model
    workers, either "ray" or "mp" (multiprocessing). If the product
    of pipeline_parallel_size and tensor_parallel_size is less than
    or equal to the number of GPUs available, "mp" will be used to
    keep processing on a single host. Otherwise, this will default
    to "ray" if Ray is installed and fail otherwise. Note that tpu
    only support Ray for distributed inference."""

    worker_cls: str = "auto"
    """The full name of the worker class to use. If "auto", the worker class
    will be determined based on the platform."""
    sd_worker_cls: str = "auto"
    """The full name of the worker class to use for speculative decoding.
    If "auto", the worker class will be determined based on the platform."""
    worker_extension_cls: str = ""
    """The full name of the worker extension class to use. The worker extension
    class is dynamically inherited by the worker class. This is used to inject
    new attributes and methods to the worker class for use in collective_rpc
    calls."""

    world_size: int = Field(init=False)
    """world_size is TPxPP, it affects the number of workers we create."""

    rank: int = 0
    """Global rank in distributed setup."""

    _data_parallel_master_port_list: list[int] = Field(default_factory=list)
    """List of open port auto-queried for data parallel messaging.
    Set to be private as it's not intended to be configured by users.
    """

    decode_context_parallel_size: int = 1
    """Number of decode context parallel groups, because the world size does
    not change by dcp, it simply reuse the GPUs of TP group, and tp_size
    needs to be divisible by dcp_size."""

    _api_process_count: int = Field(default=1, gt=0)
    """
    The number of API processes initialized.

    Note:
        This is an internal config that is only valid for and
        should only be set by API server scale-out.
    """

    _api_process_rank: int = Field(default=0, ge=-1)
    """
    The rank of this API process, or `-1` for engine core processes
    under API server scale-out.

    Note:
        This is an internal config that is only valid for and
        should only be set by API server scale-out.
    """

    @model_validator(mode="after")
    def _validate_parallel_config(self) -> Self:
        if self._api_process_rank >= self._api_process_count:
            raise ValueError(
                "Invalid value of `_api_process_rank`. "
                f"Expected to be `-1` or `[0, {self._api_process_count})`, "
                f"but found: {self._api_process_rank}"
            )

        if self.data_parallel_size_local > self.data_parallel_size:
            raise ValueError(
                f"data_parallel_size_local ({self.data_parallel_size_local}) "
                f"must be <= data_parallel_size ({self.data_parallel_size})"
            )

        if self.data_parallel_size <= 1 and self.data_parallel_external_lb:
            raise ValueError(
                "data_parallel_external_lb can only be set when data_parallel_size > 1"
            )

        if self.enable_eplb:
            if not current_platform.is_cuda():
                raise ValueError(
                    "Expert parallelism load balancing is only supported on "
                    "CUDA devices now."
                )
            if not self.enable_expert_parallel:
                raise ValueError("enable_expert_parallel must be True to use EPLB.")
            if self.tensor_parallel_size * self.data_parallel_size <= 1:
                raise ValueError(
                    "EPLB requires tensor_parallel_size or data_parallel_size "
                    f"to be greater than 1, but got "
                    f"TP={self.tensor_parallel_size},DP={self.data_parallel_size}."
                )
        else:
            if self.eplb_config.num_redundant_experts != 0:
                raise ValueError(
                    "num_redundant_experts is set to "
                    f"{self.eplb_config.num_redundant_experts} but EPLB is not "
                    "enabled. Either enable EPLB or unset "
                    "num_redundant_experts."
                )

        return self

    @property
    def world_size_across_dp(self) -> int:
        """world_size_across_dp is TPxPPxDP, it is the size of the world
        including data parallelism."""
        return self.world_size * self.data_parallel_size

    def get_next_dp_init_port(self) -> int:
        """
        We might need to initialize process groups in multiple
        processes that is related to data parallelism,
        e.g. both in the worker and in the engine, which
        can live in different processes. To avoid port conflicts, we
        pop a new port from the prepared port list each time we need to
        initialize a new process group related to data parallelism.
        """
        if self._data_parallel_master_port_list:
            answer = self._data_parallel_master_port_list.pop()
        else:
            answer = self.data_parallel_master_port
            self.data_parallel_master_port += 1

        return answer

    def stateless_init_dp_group(self) -> ProcessGroup:
        # NOTE: In high-concurrency scenarios multiple processes
        # can pick the same (currently free) port through a race
        # condition when calling `get_open_port()`. When the first
        # process binds the port the others will subsequently fail
        # with `torch.distributed.DistNetworkError: EADDRINUSE`.
        # To make the initialization more robust we retry a few times
        # with a fresh port whenever this specific error is observed.
        from torch.distributed import DistNetworkError

        from vllm.distributed.utils import (
            stateless_init_torch_distributed_process_group,
        )

        max_retries = 5
        last_exc: Exception | None = None
        for _ in range(max_retries):
            try:
                # use gloo since the engine process might not have cuda device
                return stateless_init_torch_distributed_process_group(
                    self.data_parallel_master_ip,
                    self.get_next_dp_init_port(),
                    self.data_parallel_rank,
                    self.data_parallel_size,
                    backend=current_platform.dist_backend,
                )
            except DistNetworkError as e:
                # We only want to retry when the root cause is EADDRINUSE.
                if "EADDRINUSE" in str(e):
                    logger.warning("Address already in use. Retrying with a new port.")
                    last_exc = e
                    continue  # try again with a new port
                raise e

        # If we get here all retries have failed.
        assert last_exc is not None
        raise last_exc

    # The all_reduce at the end of attention (during o_proj) means that
    # inputs are replicated across each rank of the tensor parallel group.
    # If using expert-parallelism with DeepEP All2All ops, replicated
    # tokens results in useless duplicate computation and communication.
    #
    # In this case, ensure the input to the experts is sequence parallel
    # to avoid the excess work.
    #
    # Not needed for pplx-kernels as it can handle duplicate input tokens.
    @property
    def use_sequence_parallel_moe(self) -> bool:
        return (
            self.all2all_backend
            in (
                "allgather_reducescatter",
                "naive",
                "deepep_high_throughput",
                "deepep_low_latency",
            )
            and self.enable_expert_parallel
            and self.tensor_parallel_size > 1
            and self.data_parallel_size > 1
        )

    @staticmethod
    def has_unfinished_dp(dp_group: ProcessGroup, has_unfinished: bool) -> bool:
        tensor = torch.tensor([has_unfinished], dtype=torch.int32, device="cpu")
        # dp rank 0: has_unfinished_seqs=True
        # dp rank 1: has_unfinished_seqs=False
        # aggregated: has_unfinished_seqs=True
        # so this is an OR operation, i.e. MAX in integers
        torch.distributed.all_reduce(tensor, op=ReduceOp.MAX, group=dp_group)
        aggregated_has_unfinished = bool(tensor.item())
        return aggregated_has_unfinished

    @staticmethod
    def sync_kv_cache_memory_size(dp_group: ProcessGroup, kv_cache_memory: int) -> int:
        if kv_cache_memory == -1:
            kv_cache_memory = torch.iinfo(torch.int64).max
        tensor = torch.tensor([kv_cache_memory], dtype=torch.int64, device="cpu")
        # we cannot use broadcast for stateless dp group since it depends
        # on global rank
        torch.distributed.all_reduce(tensor, op=ReduceOp.MIN, group=dp_group)
        return tensor.item()

    def compute_hash(self):
        """
        Provide a hash that uniquely identifies all the configs
        that affect the structure of the computation
        graph from input ids/embeddings to the final hidden states,
        excluding anything before input ids/embeddings and after
        the final hidden states.

        This hash is also used for DP worker configuration validation
        to prevent hangs from mismatched collective communication patterns.
        """
        factors: list[Any] = []
        factors.append(self.pipeline_parallel_size)
        factors.append(self.tensor_parallel_size)
        factors.append(self.enable_expert_parallel)
        factors.append(self.data_parallel_size)
        factors.append(self.all2all_backend)
        factors.append(self.enable_eplb)
        if self.enable_eplb:
            factors.append(self.eplb_config.log_balancedness)
            factors.append(self.eplb_config.window_size)
            factors.append(self.eplb_config.step_interval)
            factors.append(self.eplb_config.num_redundant_experts)
        return hashlib.sha256(str(factors).encode()).hexdigest()

    def __post_init__(self) -> None:
        # Set all2all_backend from env var if not specified, with deprecation warning
        if self.all2all_backend is None:
            self.all2all_backend = envs.VLLM_ALL2ALL_BACKEND
            if envs.is_set("VLLM_ALL2ALL_BACKEND"):
                logger.warning_once(
                    "VLLM_ALL2ALL_BACKEND environment variable is deprecated and "
                    "will be removed in a future release. Please use the "
                    "--all2all-backend command-line argument instead."
                )

        # Forward deprecated fields to their new location
        if self.num_redundant_experts is not None:
            self.eplb_config.num_redundant_experts = self.num_redundant_experts
            logger.warning_once(
                "num_redundant_experts is deprecated and has been replaced "
                "with eplb_config.num_redundant_experts. This will be removed "
                "in v0.12.0. Changing this field after initialization will "
                "have no effect."
            )
        if self.eplb_window_size is not None:
            self.eplb_config.window_size = self.eplb_window_size
            logger.warning_once(
                "eplb_window_size is deprecated and has been replaced "
                "with eplb_config.window_size. This will be removed "
                "in v0.12.0. Changing this field after initialization will "
                "have no effect."
            )
        if self.eplb_step_interval is not None:
            self.eplb_config.step_interval = self.eplb_step_interval
            logger.warning_once(
                "eplb_step_interval is deprecated and has been replaced "
                "with eplb_config.step_interval. This will be removed "
                "in v0.12.0. Changing this field after initialization will "
                "have no effect."
            )
        if self.eplb_log_balancedness is not None:
            self.eplb_config.log_balancedness = self.eplb_log_balancedness
            logger.warning_once(
                "eplb_log_balancedness is deprecated and has been replaced "
                "with eplb_config.log_balancedness. This will be removed "
                "in v0.12.0. Changing this field after initialization will "
                "have no effect."
            )

        # Continue with the rest of the initialization
        self.world_size = self.pipeline_parallel_size * self.tensor_parallel_size

        if self.distributed_executor_backend == "external_launcher":
            logger.info("Using external launcher for distributed inference.")
            self.world_size *= self.data_parallel_size

        if self.data_parallel_size > 1 or self.data_parallel_size_local == 0:
            # Data parallel was specified in the engine args.
            if self.distributed_executor_backend == "external_launcher":
                # For external launcher,
                # we need to set the data parallel rank automatically
                self.data_parallel_rank = int(os.environ["RANK"]) // (
                    self.world_size // self.data_parallel_size
                )
                logger.info(
                    "Set data_parallel_rank to %d automatically.",
                    self.data_parallel_rank,
                )
            if not self._data_parallel_master_port_list:
                self._data_parallel_master_port_list = get_open_ports_list(5)
            self.data_parallel_master_port = self._data_parallel_master_port_list.pop()

            if not (0 <= self.data_parallel_rank < self.data_parallel_size):
                raise ValueError(
                    f"data_parallel_rank ({self.data_parallel_rank})"
                    f" must be in the range [0, {self.data_parallel_size})"
                )
        else:
            # Otherwise fall back to env vars (e.g. for offline SPMD case).
            self.data_parallel_size = envs.VLLM_DP_SIZE
            self.data_parallel_rank = envs.VLLM_DP_RANK
            self.data_parallel_rank_local = envs.VLLM_DP_RANK_LOCAL
            self.data_parallel_master_ip = envs.VLLM_DP_MASTER_IP
            self.data_parallel_master_port = envs.VLLM_DP_MASTER_PORT

        if self.distributed_executor_backend == "external_launcher":
            os.environ["VLLM_ENABLE_V1_MULTIPROCESSING"] = "0"
            logger.info("Disabling V1 multiprocessing for external launcher.")

        if self.distributed_executor_backend is None and self.world_size > 1:
            # We use multiprocessing by default if world_size fits on the
            # current node and we aren't in a ray placement group.

            from vllm.v1.executor import ray_utils

            backend: DistributedExecutorBackend = "mp"
            ray_found = ray_utils.ray_is_available()
            if current_platform.is_tpu() and envs.VLLM_XLA_USE_SPMD:
                backend = "uni"
            elif (
                current_platform.is_cuda()
                and cuda_device_count_stateless() < self.world_size
            ):
                gpu_count = cuda_device_count_stateless()
                raise ValueError(
                    f"Tensor parallel size ({self.world_size}) cannot be "
                    f"larger than the number of available GPUs ({gpu_count})."
                )
            elif self.data_parallel_backend == "ray":
                logger.info(
                    "Using ray distributed inference because "
                    "data_parallel_backend is ray"
                )
                backend = "ray"
            elif ray_found:
                if self.placement_group:
                    backend = "ray"
                else:
                    from ray import is_initialized as ray_is_initialized

                    if ray_is_initialized():
                        from ray.util import get_current_placement_group

                        if get_current_placement_group():
                            backend = "ray"
            self.distributed_executor_backend = backend
            logger.debug("Defaulting to use %s for distributed inference", backend)

        if self.distributed_executor_backend is None and self.world_size == 1:
            self.distributed_executor_backend = "uni"

        if self.max_parallel_loading_workers is not None:
            logger.warning(
                "max_parallel_loading_workers is currently "
                "not supported and will be ignored."
            )

    @property
    def use_ray(self) -> bool:
        return self.distributed_executor_backend == "ray" or (
            isinstance(self.distributed_executor_backend, type)
            and getattr(self.distributed_executor_backend, "uses_ray", False)
        )

    @model_validator(mode="after")
    def _verify_args(self) -> Self:
        # Lazy import to avoid circular import
        from vllm.v1.executor import Executor

        # Enable batch invariance settings if requested
        if vllm_is_batch_invariant():
            self.disable_custom_all_reduce = True

        if (
            self.distributed_executor_backend is not None
            and not isinstance(self.distributed_executor_backend, str)
            and not (
                isinstance(self.distributed_executor_backend, type)
                and issubclass(self.distributed_executor_backend, Executor)
            )
        ):
            raise ValueError(
                "Unrecognized distributed executor backend "
                f"{self.distributed_executor_backend}. Supported "
                "values are 'ray', 'mp' 'uni', 'external_launcher', "
                " custom Executor subclass or its import path."
            )
        if self.use_ray:
            from vllm.v1.executor import ray_utils

            ray_utils.assert_ray_available()

        if not current_platform.use_custom_allreduce():
            self.disable_custom_all_reduce = True
            logger.debug(
                "Disabled the custom all-reduce kernel because it is not "
                "supported on current platform."
            )
        if self.ray_workers_use_nsight and not self.use_ray:
            raise ValueError(
                "Unable to use nsight profiling unless workers run with Ray."
            )

        return self

_api_process_count class-attribute instance-attribute

_api_process_count: int = Field(default=1, gt=0)

The number of API processes initialized.

Note

This is an internal config that is only valid for and should only be set by API server scale-out.

_api_process_rank class-attribute instance-attribute

_api_process_rank: int = Field(default=0, ge=-1)

The rank of this API process, or -1 for engine core processes under API server scale-out.

Note

This is an internal config that is only valid for and should only be set by API server scale-out.

_data_parallel_master_port_list class-attribute instance-attribute

_data_parallel_master_port_list: list[int] = Field(
    default_factory=list
)

List of open port auto-queried for data parallel messaging. Set to be private as it's not intended to be configured by users.

all2all_backend class-attribute instance-attribute

all2all_backend: (
    Literal[
        "naive",
        "pplx",
        "deepep_high_throughput",
        "deepep_low_latency",
        "allgather_reducescatter",
        "flashinfer_all2allv",
    ]
    | None
) = None

All2All backend for MoE expert parallel communication. If not set, uses the value from VLLM_ALL2ALL_BACKEND environment variable. Available options: - "naive": Naive all2all implementation using broadcasts - "allgather_reducescatter": All2all based on allgather and reducescatter - "pplx": Use pplx kernels - "deepep_high_throughput": Use deepep high-throughput kernels - "deepep_low_latency": Use deepep low-latency kernels - "flashinfer_all2allv": Use flashinfer alltoallv kernels for mnnvl

data_parallel_backend class-attribute instance-attribute

data_parallel_backend: DataParallelBackend = 'mp'

Backend to use for data parallel, either "mp" or "ray".

data_parallel_external_lb class-attribute instance-attribute

data_parallel_external_lb: bool = False

Whether to use "external" DP LB mode. Applies only to online serving and when data_parallel_size > 0. This is useful for a "one-pod-per-rank" wide-EP setup in Kubernetes. Set implicitly when --data-parallel-rank is provided explicitly to vllm serve.

data_parallel_hybrid_lb class-attribute instance-attribute

data_parallel_hybrid_lb: bool = False

Whether to use "hybrid" DP LB mode. Applies only to online serving and when data_parallel_size > 0. Enables running an AsyncLLM and API server on a "per-node" basis where vLLM load balances between local data parallel ranks, but an external LB balances between vLLM nodes/replicas. Set explicitly in conjunction with --data-parallel-start-rank.

data_parallel_master_ip class-attribute instance-attribute

data_parallel_master_ip: str = '127.0.0.1'

IP of the data parallel master.

data_parallel_master_port class-attribute instance-attribute

data_parallel_master_port: int = 29500

Port of the data parallel master.

data_parallel_rank class-attribute instance-attribute

data_parallel_rank: int = 0

Rank of the data parallel group.

data_parallel_rank_local class-attribute instance-attribute

data_parallel_rank_local: int | None = None

Local rank of the data parallel group, set only in SPMD mode.

data_parallel_rpc_port class-attribute instance-attribute

data_parallel_rpc_port: int = 29550

Port for data parallel messaging.

data_parallel_size class-attribute instance-attribute

data_parallel_size: int = 1

Number of data parallel groups. MoE layers will be sharded according to the product of the tensor parallel size and data parallel size.

data_parallel_size_local class-attribute instance-attribute

data_parallel_size_local: int = 1

Number of local data parallel groups.

dbo_decode_token_threshold class-attribute instance-attribute

dbo_decode_token_threshold: int = 32

The threshold for dual batch overlap for batches only containing decodes. If the number of tokens in the request is greater than this threshold, microbatching will be used. Otherwise, the request will be processed in a single batch.

dbo_prefill_token_threshold class-attribute instance-attribute

dbo_prefill_token_threshold: int = 512

The threshold for dual batch overlap for batches that contain one or more prefills. If the number of tokens in the request is greater than this threshold, microbatching will be used. Otherwise, the request will be processed in a single batch.

decode_context_parallel_size class-attribute instance-attribute

decode_context_parallel_size: int = 1

Number of decode context parallel groups, because the world size does not change by dcp, it simply reuse the GPUs of TP group, and tp_size needs to be divisible by dcp_size.

disable_custom_all_reduce class-attribute instance-attribute

disable_custom_all_reduce: bool = False

Disable the custom all-reduce kernel and fall back to NCCL.

disable_nccl_for_dp_synchronization class-attribute instance-attribute

disable_nccl_for_dp_synchronization: bool = False

Forces the dp synchronization logic in vllm/v1/worker/dp_utils.py to use Gloo instead of NCCL for its all reduce

distributed_executor_backend class-attribute instance-attribute

distributed_executor_backend: (
    str | DistributedExecutorBackend | type[Executor] | None
) = None

Backend to use for distributed model workers, either "ray" or "mp" (multiprocessing). If the product of pipeline_parallel_size and tensor_parallel_size is less than or equal to the number of GPUs available, "mp" will be used to keep processing on a single host. Otherwise, this will default to "ray" if Ray is installed and fail otherwise. Note that tpu only support Ray for distributed inference.

enable_dbo class-attribute instance-attribute

enable_dbo: bool = False

Enable dual batch overlap for the model executor.

enable_eplb class-attribute instance-attribute

enable_eplb: bool = False

Enable expert parallelism load balancing for MoE layers.

enable_expert_parallel class-attribute instance-attribute

enable_expert_parallel: bool = False

Use expert parallelism instead of tensor parallelism for MoE layers.

eplb_config class-attribute instance-attribute

eplb_config: EPLBConfig = Field(default_factory=EPLBConfig)

Expert parallelism configuration.

eplb_log_balancedness class-attribute instance-attribute

eplb_log_balancedness: bool | None = None

eplb_log_balancedness is deprecated and has been replaced with eplb_config.log_balancedness. This will be removed in v0.12.0. Please use eplb_config.log_balancedness instead.

eplb_step_interval class-attribute instance-attribute

eplb_step_interval: int | None = None

eplb_step_interval is deprecated and has been replaced with eplb_config.step_interval. This will be removed in v0.12.0. Please use eplb_config.step_interval instead.

eplb_window_size class-attribute instance-attribute

eplb_window_size: int | None = None

eplb_window_size is deprecated and has been replaced with eplb_config.window_size. This will be removed in v0.12.0. Please use eplb_config.window_size instead.

expert_placement_strategy class-attribute instance-attribute

expert_placement_strategy: ExpertPlacementStrategy = (
    "linear"
)

The expert placement strategy for MoE layers:

  • "linear": Experts are placed in a contiguous manner. For example, with 4 experts and 2 ranks, rank 0 will have experts [0, 1] and rank 1 will have experts [2, 3].

  • "round_robin": Experts are placed in a round-robin manner. For example, with 4 experts and 2 ranks, rank 0 will have experts [0, 2] and rank 1 will have experts [1, 3]. This strategy can help improve load balancing for grouped expert models with no redundant experts.

max_parallel_loading_workers class-attribute instance-attribute

max_parallel_loading_workers: int | None = None

Maximum number of parallel loading workers when loading model sequentially in multiple batches. To avoid RAM OOM when using tensor parallel and large models.

num_redundant_experts class-attribute instance-attribute

num_redundant_experts: int | None = None

num_redundant_experts is deprecated and has been replaced with eplb_config.num_redundant_experts. This will be removed in v0.12.0. Please use eplb_config.num_redundant_experts instead.

pipeline_parallel_size class-attribute instance-attribute

pipeline_parallel_size: int = 1

Number of pipeline parallel groups.

placement_group class-attribute instance-attribute

placement_group: PlacementGroup | None = None

ray distributed model workers placement group.

rank class-attribute instance-attribute

rank: int = 0

Global rank in distributed setup.

ray_runtime_env class-attribute instance-attribute

ray_runtime_env: RuntimeEnv | None = None

Ray runtime environment to pass to distributed workers.

ray_workers_use_nsight class-attribute instance-attribute

ray_workers_use_nsight: bool = False

Whether to profile Ray workers with nsight, see https://docs.ray.io/en/latest/ray-observability/user-guides/profiling.html#profiling-nsight-profiler.

sd_worker_cls class-attribute instance-attribute

sd_worker_cls: str = 'auto'

The full name of the worker class to use for speculative decoding. If "auto", the worker class will be determined based on the platform.

tensor_parallel_size class-attribute instance-attribute

tensor_parallel_size: int = 1

Number of tensor parallel groups.

use_ray property

use_ray: bool

use_sequence_parallel_moe property

use_sequence_parallel_moe: bool

worker_cls class-attribute instance-attribute

worker_cls: str = 'auto'

The full name of the worker class to use. If "auto", the worker class will be determined based on the platform.

worker_extension_cls class-attribute instance-attribute

worker_extension_cls: str = ''

The full name of the worker extension class to use. The worker extension class is dynamically inherited by the worker class. This is used to inject new attributes and methods to the worker class for use in collective_rpc calls.

world_size class-attribute instance-attribute

world_size: int = Field(init=False)

world_size is TPxPP, it affects the number of workers we create.

world_size_across_dp property

world_size_across_dp: int

world_size_across_dp is TPxPPxDP, it is the size of the world including data parallelism.

__post_init__

__post_init__() -> None
Source code in vllm/config/parallel.py
def __post_init__(self) -> None:
    # Set all2all_backend from env var if not specified, with deprecation warning
    if self.all2all_backend is None:
        self.all2all_backend = envs.VLLM_ALL2ALL_BACKEND
        if envs.is_set("VLLM_ALL2ALL_BACKEND"):
            logger.warning_once(
                "VLLM_ALL2ALL_BACKEND environment variable is deprecated and "
                "will be removed in a future release. Please use the "
                "--all2all-backend command-line argument instead."
            )

    # Forward deprecated fields to their new location
    if self.num_redundant_experts is not None:
        self.eplb_config.num_redundant_experts = self.num_redundant_experts
        logger.warning_once(
            "num_redundant_experts is deprecated and has been replaced "
            "with eplb_config.num_redundant_experts. This will be removed "
            "in v0.12.0. Changing this field after initialization will "
            "have no effect."
        )
    if self.eplb_window_size is not None:
        self.eplb_config.window_size = self.eplb_window_size
        logger.warning_once(
            "eplb_window_size is deprecated and has been replaced "
            "with eplb_config.window_size. This will be removed "
            "in v0.12.0. Changing this field after initialization will "
            "have no effect."
        )
    if self.eplb_step_interval is not None:
        self.eplb_config.step_interval = self.eplb_step_interval
        logger.warning_once(
            "eplb_step_interval is deprecated and has been replaced "
            "with eplb_config.step_interval. This will be removed "
            "in v0.12.0. Changing this field after initialization will "
            "have no effect."
        )
    if self.eplb_log_balancedness is not None:
        self.eplb_config.log_balancedness = self.eplb_log_balancedness
        logger.warning_once(
            "eplb_log_balancedness is deprecated and has been replaced "
            "with eplb_config.log_balancedness. This will be removed "
            "in v0.12.0. Changing this field after initialization will "
            "have no effect."
        )

    # Continue with the rest of the initialization
    self.world_size = self.pipeline_parallel_size * self.tensor_parallel_size

    if self.distributed_executor_backend == "external_launcher":
        logger.info("Using external launcher for distributed inference.")
        self.world_size *= self.data_parallel_size

    if self.data_parallel_size > 1 or self.data_parallel_size_local == 0:
        # Data parallel was specified in the engine args.
        if self.distributed_executor_backend == "external_launcher":
            # For external launcher,
            # we need to set the data parallel rank automatically
            self.data_parallel_rank = int(os.environ["RANK"]) // (
                self.world_size // self.data_parallel_size
            )
            logger.info(
                "Set data_parallel_rank to %d automatically.",
                self.data_parallel_rank,
            )
        if not self._data_parallel_master_port_list:
            self._data_parallel_master_port_list = get_open_ports_list(5)
        self.data_parallel_master_port = self._data_parallel_master_port_list.pop()

        if not (0 <= self.data_parallel_rank < self.data_parallel_size):
            raise ValueError(
                f"data_parallel_rank ({self.data_parallel_rank})"
                f" must be in the range [0, {self.data_parallel_size})"
            )
    else:
        # Otherwise fall back to env vars (e.g. for offline SPMD case).
        self.data_parallel_size = envs.VLLM_DP_SIZE
        self.data_parallel_rank = envs.VLLM_DP_RANK
        self.data_parallel_rank_local = envs.VLLM_DP_RANK_LOCAL
        self.data_parallel_master_ip = envs.VLLM_DP_MASTER_IP
        self.data_parallel_master_port = envs.VLLM_DP_MASTER_PORT

    if self.distributed_executor_backend == "external_launcher":
        os.environ["VLLM_ENABLE_V1_MULTIPROCESSING"] = "0"
        logger.info("Disabling V1 multiprocessing for external launcher.")

    if self.distributed_executor_backend is None and self.world_size > 1:
        # We use multiprocessing by default if world_size fits on the
        # current node and we aren't in a ray placement group.

        from vllm.v1.executor import ray_utils

        backend: DistributedExecutorBackend = "mp"
        ray_found = ray_utils.ray_is_available()
        if current_platform.is_tpu() and envs.VLLM_XLA_USE_SPMD:
            backend = "uni"
        elif (
            current_platform.is_cuda()
            and cuda_device_count_stateless() < self.world_size
        ):
            gpu_count = cuda_device_count_stateless()
            raise ValueError(
                f"Tensor parallel size ({self.world_size}) cannot be "
                f"larger than the number of available GPUs ({gpu_count})."
            )
        elif self.data_parallel_backend == "ray":
            logger.info(
                "Using ray distributed inference because "
                "data_parallel_backend is ray"
            )
            backend = "ray"
        elif ray_found:
            if self.placement_group:
                backend = "ray"
            else:
                from ray import is_initialized as ray_is_initialized

                if ray_is_initialized():
                    from ray.util import get_current_placement_group

                    if get_current_placement_group():
                        backend = "ray"
        self.distributed_executor_backend = backend
        logger.debug("Defaulting to use %s for distributed inference", backend)

    if self.distributed_executor_backend is None and self.world_size == 1:
        self.distributed_executor_backend = "uni"

    if self.max_parallel_loading_workers is not None:
        logger.warning(
            "max_parallel_loading_workers is currently "
            "not supported and will be ignored."
        )

_validate_parallel_config

_validate_parallel_config() -> Self
Source code in vllm/config/parallel.py
@model_validator(mode="after")
def _validate_parallel_config(self) -> Self:
    if self._api_process_rank >= self._api_process_count:
        raise ValueError(
            "Invalid value of `_api_process_rank`. "
            f"Expected to be `-1` or `[0, {self._api_process_count})`, "
            f"but found: {self._api_process_rank}"
        )

    if self.data_parallel_size_local > self.data_parallel_size:
        raise ValueError(
            f"data_parallel_size_local ({self.data_parallel_size_local}) "
            f"must be <= data_parallel_size ({self.data_parallel_size})"
        )

    if self.data_parallel_size <= 1 and self.data_parallel_external_lb:
        raise ValueError(
            "data_parallel_external_lb can only be set when data_parallel_size > 1"
        )

    if self.enable_eplb:
        if not current_platform.is_cuda():
            raise ValueError(
                "Expert parallelism load balancing is only supported on "
                "CUDA devices now."
            )
        if not self.enable_expert_parallel:
            raise ValueError("enable_expert_parallel must be True to use EPLB.")
        if self.tensor_parallel_size * self.data_parallel_size <= 1:
            raise ValueError(
                "EPLB requires tensor_parallel_size or data_parallel_size "
                f"to be greater than 1, but got "
                f"TP={self.tensor_parallel_size},DP={self.data_parallel_size}."
            )
    else:
        if self.eplb_config.num_redundant_experts != 0:
            raise ValueError(
                "num_redundant_experts is set to "
                f"{self.eplb_config.num_redundant_experts} but EPLB is not "
                "enabled. Either enable EPLB or unset "
                "num_redundant_experts."
            )

    return self

_verify_args

_verify_args() -> Self
Source code in vllm/config/parallel.py
@model_validator(mode="after")
def _verify_args(self) -> Self:
    # Lazy import to avoid circular import
    from vllm.v1.executor import Executor

    # Enable batch invariance settings if requested
    if vllm_is_batch_invariant():
        self.disable_custom_all_reduce = True

    if (
        self.distributed_executor_backend is not None
        and not isinstance(self.distributed_executor_backend, str)
        and not (
            isinstance(self.distributed_executor_backend, type)
            and issubclass(self.distributed_executor_backend, Executor)
        )
    ):
        raise ValueError(
            "Unrecognized distributed executor backend "
            f"{self.distributed_executor_backend}. Supported "
            "values are 'ray', 'mp' 'uni', 'external_launcher', "
            " custom Executor subclass or its import path."
        )
    if self.use_ray:
        from vllm.v1.executor import ray_utils

        ray_utils.assert_ray_available()

    if not current_platform.use_custom_allreduce():
        self.disable_custom_all_reduce = True
        logger.debug(
            "Disabled the custom all-reduce kernel because it is not "
            "supported on current platform."
        )
    if self.ray_workers_use_nsight and not self.use_ray:
        raise ValueError(
            "Unable to use nsight profiling unless workers run with Ray."
        )

    return self

compute_hash

compute_hash()

Provide a hash that uniquely identifies all the configs that affect the structure of the computation graph from input ids/embeddings to the final hidden states, excluding anything before input ids/embeddings and after the final hidden states.

This hash is also used for DP worker configuration validation to prevent hangs from mismatched collective communication patterns.

Source code in vllm/config/parallel.py
def compute_hash(self):
    """
    Provide a hash that uniquely identifies all the configs
    that affect the structure of the computation
    graph from input ids/embeddings to the final hidden states,
    excluding anything before input ids/embeddings and after
    the final hidden states.

    This hash is also used for DP worker configuration validation
    to prevent hangs from mismatched collective communication patterns.
    """
    factors: list[Any] = []
    factors.append(self.pipeline_parallel_size)
    factors.append(self.tensor_parallel_size)
    factors.append(self.enable_expert_parallel)
    factors.append(self.data_parallel_size)
    factors.append(self.all2all_backend)
    factors.append(self.enable_eplb)
    if self.enable_eplb:
        factors.append(self.eplb_config.log_balancedness)
        factors.append(self.eplb_config.window_size)
        factors.append(self.eplb_config.step_interval)
        factors.append(self.eplb_config.num_redundant_experts)
    return hashlib.sha256(str(factors).encode()).hexdigest()

get_next_dp_init_port

get_next_dp_init_port() -> int

We might need to initialize process groups in multiple processes that is related to data parallelism, e.g. both in the worker and in the engine, which can live in different processes. To avoid port conflicts, we pop a new port from the prepared port list each time we need to initialize a new process group related to data parallelism.

Source code in vllm/config/parallel.py
def get_next_dp_init_port(self) -> int:
    """
    We might need to initialize process groups in multiple
    processes that is related to data parallelism,
    e.g. both in the worker and in the engine, which
    can live in different processes. To avoid port conflicts, we
    pop a new port from the prepared port list each time we need to
    initialize a new process group related to data parallelism.
    """
    if self._data_parallel_master_port_list:
        answer = self._data_parallel_master_port_list.pop()
    else:
        answer = self.data_parallel_master_port
        self.data_parallel_master_port += 1

    return answer

has_unfinished_dp staticmethod

has_unfinished_dp(
    dp_group: ProcessGroup, has_unfinished: bool
) -> bool
Source code in vllm/config/parallel.py
@staticmethod
def has_unfinished_dp(dp_group: ProcessGroup, has_unfinished: bool) -> bool:
    tensor = torch.tensor([has_unfinished], dtype=torch.int32, device="cpu")
    # dp rank 0: has_unfinished_seqs=True
    # dp rank 1: has_unfinished_seqs=False
    # aggregated: has_unfinished_seqs=True
    # so this is an OR operation, i.e. MAX in integers
    torch.distributed.all_reduce(tensor, op=ReduceOp.MAX, group=dp_group)
    aggregated_has_unfinished = bool(tensor.item())
    return aggregated_has_unfinished

stateless_init_dp_group

stateless_init_dp_group() -> ProcessGroup
Source code in vllm/config/parallel.py
def stateless_init_dp_group(self) -> ProcessGroup:
    # NOTE: In high-concurrency scenarios multiple processes
    # can pick the same (currently free) port through a race
    # condition when calling `get_open_port()`. When the first
    # process binds the port the others will subsequently fail
    # with `torch.distributed.DistNetworkError: EADDRINUSE`.
    # To make the initialization more robust we retry a few times
    # with a fresh port whenever this specific error is observed.
    from torch.distributed import DistNetworkError

    from vllm.distributed.utils import (
        stateless_init_torch_distributed_process_group,
    )

    max_retries = 5
    last_exc: Exception | None = None
    for _ in range(max_retries):
        try:
            # use gloo since the engine process might not have cuda device
            return stateless_init_torch_distributed_process_group(
                self.data_parallel_master_ip,
                self.get_next_dp_init_port(),
                self.data_parallel_rank,
                self.data_parallel_size,
                backend=current_platform.dist_backend,
            )
        except DistNetworkError as e:
            # We only want to retry when the root cause is EADDRINUSE.
            if "EADDRINUSE" in str(e):
                logger.warning("Address already in use. Retrying with a new port.")
                last_exc = e
                continue  # try again with a new port
            raise e

    # If we get here all retries have failed.
    assert last_exc is not None
    raise last_exc

sync_kv_cache_memory_size staticmethod

sync_kv_cache_memory_size(
    dp_group: ProcessGroup, kv_cache_memory: int
) -> int
Source code in vllm/config/parallel.py
@staticmethod
def sync_kv_cache_memory_size(dp_group: ProcessGroup, kv_cache_memory: int) -> int:
    if kv_cache_memory == -1:
        kv_cache_memory = torch.iinfo(torch.int64).max
    tensor = torch.tensor([kv_cache_memory], dtype=torch.int64, device="cpu")
    # we cannot use broadcast for stateless dp group since it depends
    # on global rank
    torch.distributed.all_reduce(tensor, op=ReduceOp.MIN, group=dp_group)
    return tensor.item()

StatelessProcessGroup dataclass

A dataclass to hold a metadata store, and the rank, world_size of the group. Only use it to communicate metadata between processes. For data-plane communication, create NCCL-related objects.

Source code in vllm/distributed/utils.py
@dataclasses.dataclass
class StatelessProcessGroup:
    """A dataclass to hold a metadata store, and the rank, world_size of the
    group. Only use it to communicate metadata between processes.
    For data-plane communication, create NCCL-related objects.
    """

    rank: int
    world_size: int
    store: torch._C._distributed_c10d.Store

    # stores a reference to the socket so that the file descriptor stays alive
    socket: socket.socket | None

    data_expiration_seconds: int = 3600  # 1 hour

    # dst rank -> counter
    send_dst_counter: dict[int, int] = dataclasses.field(default_factory=dict)
    # src rank -> counter
    recv_src_counter: dict[int, int] = dataclasses.field(default_factory=dict)
    broadcast_send_counter: int = 0
    broadcast_recv_src_counter: dict[int, int] = dataclasses.field(default_factory=dict)

    # A deque to store the data entries, with key and timestamp.
    entries: deque[tuple[str, float]] = dataclasses.field(default_factory=deque)

    def __post_init__(self):
        assert self.rank < self.world_size
        self.send_dst_counter = {i: 0 for i in range(self.world_size)}
        self.recv_src_counter = {i: 0 for i in range(self.world_size)}
        self.broadcast_recv_src_counter = {i: 0 for i in range(self.world_size)}

    def send_obj(self, obj: Any, dst: int):
        """Send an object to a destination rank."""
        self.expire_data()
        key = f"send_to/{dst}/{self.send_dst_counter[dst]}"
        self.store.set(key, pickle.dumps(obj))
        self.send_dst_counter[dst] += 1
        self.entries.append((key, time.time()))

    def expire_data(self):
        """Expire data that is older than `data_expiration_seconds` seconds."""
        while self.entries:
            # check the oldest entry
            key, timestamp = self.entries[0]
            if time.time() - timestamp > self.data_expiration_seconds:
                self.store.delete_key(key)
                self.entries.popleft()
            else:
                break

    def recv_obj(self, src: int) -> Any:
        """Receive an object from a source rank."""
        obj = pickle.loads(
            self.store.get(f"send_to/{self.rank}/{self.recv_src_counter[src]}")
        )
        self.recv_src_counter[src] += 1
        return obj

    def broadcast_obj(self, obj: Any | None, src: int) -> Any:
        """Broadcast an object from a source rank to all other ranks.
        It does not clean up after all ranks have received the object.
        Use it for limited times, e.g., for initialization.
        """
        if self.rank == src:
            self.expire_data()
            key = f"broadcast_from/{src}/{self.broadcast_send_counter}"
            self.store.set(key, pickle.dumps(obj))
            self.broadcast_send_counter += 1
            self.entries.append((key, time.time()))
            return obj
        else:
            key = f"broadcast_from/{src}/{self.broadcast_recv_src_counter[src]}"
            recv_obj = pickle.loads(self.store.get(key))
            self.broadcast_recv_src_counter[src] += 1
            return recv_obj

    def all_gather_obj(self, obj: Any) -> list[Any]:
        """All gather an object from all ranks."""
        gathered_objs = []
        for i in range(self.world_size):
            if i == self.rank:
                gathered_objs.append(obj)
                self.broadcast_obj(obj, src=self.rank)
            else:
                recv_obj = self.broadcast_obj(None, src=i)
                gathered_objs.append(recv_obj)
        return gathered_objs

    def barrier(self, timeout: float = 30.0):
        """A robust barrier to synchronize all ranks.


        Uses a multi-phase approach to ensure all processes reach the barrier
        before proceeding:

        1. Each process signals it has reached the barrier

        2. Each process signals that it has confirmed the arrival of all other
        ranks.

        3. Rank 0 waits for all other ranks to signal their departure to ensure
        that all ranks have departed the barrier first.

        Args:
            timeout: Maximum time in seconds to wait for each phase (in seconds)


        Raises:
            RuntimeError: If coordination fails or times out
        """
        # Generate a barrier ID that is globally unique
        try:
            if self.rank == 0:
                barrier_id = f"barrier_{uuid.uuid4()}"
                self.broadcast_obj(barrier_id, src=0)
            else:
                barrier_id = self.broadcast_obj(None, src=0)
        except Exception as e:
            raise RuntimeError("Failed to broadcast barrier_id") from e

        # Phase 1: Signal arrival at barrier
        # Wait for all processes to arrive
        # We need all ranks to confirm the arrival of all other ranks.
        # This is the key synchronization point.
        arrival_key = f"arrival_{barrier_id}_{self.rank}"
        try:
            self.store.set(arrival_key, b"1")
        except Exception as e:
            raise RuntimeError("Failed to signal barrier arrival") from e

        start_time = time.time()
        processes_arrived: set[int] = set()

        while len(processes_arrived) < self.world_size:
            # Check for timeout
            cur_time = time.time()
            if cur_time - start_time > timeout:
                raise RuntimeError(f"Barrier timed out after {timeout:.2f} seconds")

            # Check for each process
            for i in range(self.world_size):
                if i in processes_arrived:
                    continue

                key = f"arrival_{barrier_id}_{i}"
                try:
                    # Try to get the key - if it exists, we'll get a value
                    # If it doesn't exist, it will throw an exception
                    self.store.get(key)
                    processes_arrived.add(i)
                except KeyError:
                    # Key doesn't exist yet
                    pass
                except Exception as check_e:
                    logger.debug("Error checking key existence: %s", check_e)
                    sched_yield()

            # Short sleep to avoid tight polling
            if len(processes_arrived) < self.world_size:
                sched_yield()

        # Phase 2: Signal departure from barrier
        # We only care to block at this stage in rank 0, which runs the
        # server side of the TCPStore. We want to make sure that all
        # clients have departed the barrier before rank 0 in case the
        # next thing after the barrier is a shutdown, including tearing
        # down the TCPStore. Other ranks can exit the barrier immediately
        # after signaling their departure.
        departure_key = f"departure_{barrier_id}_{self.rank}"
        try:
            self.store.set(departure_key, b"1")
        except Exception as e:
            raise RuntimeError("Failed to signal barrier departure") from e

        if self.rank != 0:
            return

        # Make rank 0 wait for all processes to signal departure
        start_time = time.time()
        processes_departed: set[int] = set()

        while len(processes_departed) < self.world_size:
            # Check for timeout
            if time.time() - start_time > timeout:
                raise RuntimeError(
                    f"Barrier departure timed out after {timeout:.2f} seconds"
                )

            # Check for each process
            for i in range(self.world_size):
                if i in processes_departed:
                    continue

                key = f"departure_{barrier_id}_{i}"
                try:
                    # Try to get the key - if it exists, we'll get a value
                    # If it doesn't exist, it will throw an exception
                    self.store.get(key)
                    processes_departed.add(i)
                except KeyError:
                    # Key doesn't exist yet
                    pass
                except Exception as check_e:
                    logger.debug("Error checking key existence: %s", check_e)
                    sched_yield()

            # Short sleep to avoid tight polling
            if len(processes_departed) < self.world_size:
                sched_yield()

        # Clean up keys to avoid leaking memory in the store
        for i in range(self.world_size):
            try:
                self.store.delete_key(f"arrival_{barrier_id}_{i}")
            except Exception:
                logger.debug("Error deleting key: %s", f"arrival_{barrier_id}_{i}")

            try:
                self.store.delete_key(f"departure_{barrier_id}_{i}")
            except Exception:
                logger.debug("Error deleting key: %s", f"departure_{barrier_id}_{i}")

    @staticmethod
    def create(
        host: str,
        port: int,
        rank: int,
        world_size: int,
        data_expiration_seconds: int = 3600,
        store_timeout: int = 300,
    ) -> "StatelessProcessGroup":
        """A replacement for `torch.distributed.init_process_group` that does not
        pollute the global state.

        If we have process A and process B called `torch.distributed.init_process_group`
        to form a group, and then we want to form another group with process A, B, C,
        D, it is not possible in PyTorch, because process A and process B have already
        formed a group, and process C and process D cannot join that group. This
        function is a workaround for this issue.

        `torch.distributed.init_process_group` is a global call, while this function
        is a stateless call. It will return a `StatelessProcessGroup` object that can be
        used for exchanging metadata. With this function, process A and process B
        can call `StatelessProcessGroup.create` to form a group, and then process A, B,
        C, and D can call `StatelessProcessGroup.create` to form another group.
        """  # noqa
        launch_server = rank == 0
        if launch_server:
            # listen on the specified interface (instead of 0.0.0.0)
            listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            listen_socket.bind((host, port))
            listen_socket.listen()
            listen_fd = listen_socket.fileno()
        else:
            listen_socket = None
            listen_fd = None

        store = TCPStore(
            host_name=host,
            port=port,
            world_size=world_size,
            is_master=launch_server,
            timeout=timedelta(seconds=store_timeout),
            use_libuv=False,  # for now: github.com/pytorch/pytorch/pull/150215
            master_listen_fd=listen_fd,
        )

        return StatelessProcessGroup(
            rank=rank,
            world_size=world_size,
            store=store,
            socket=listen_socket,
            data_expiration_seconds=data_expiration_seconds,
        )

broadcast_recv_src_counter class-attribute instance-attribute

broadcast_recv_src_counter: dict[int, int] = field(
    default_factory=dict
)

broadcast_send_counter class-attribute instance-attribute

broadcast_send_counter: int = 0

data_expiration_seconds class-attribute instance-attribute

data_expiration_seconds: int = 3600

entries class-attribute instance-attribute

entries: deque[tuple[str, float]] = field(
    default_factory=deque
)

rank instance-attribute

rank: int

recv_src_counter class-attribute instance-attribute

recv_src_counter: dict[int, int] = field(
    default_factory=dict
)

send_dst_counter class-attribute instance-attribute

send_dst_counter: dict[int, int] = field(
    default_factory=dict
)

socket instance-attribute

socket: socket | None

store instance-attribute

store: Store

world_size instance-attribute

world_size: int

__init__

__init__(
    rank: int,
    world_size: int,
    store: Store,
    socket: socket | None,
    data_expiration_seconds: int = 3600,
    send_dst_counter: dict[int, int] = dict(),
    recv_src_counter: dict[int, int] = dict(),
    broadcast_send_counter: int = 0,
    broadcast_recv_src_counter: dict[int, int] = dict(),
    entries: deque[tuple[str, float]] = deque(),
) -> None

__post_init__

__post_init__()
Source code in vllm/distributed/utils.py
def __post_init__(self):
    assert self.rank < self.world_size
    self.send_dst_counter = {i: 0 for i in range(self.world_size)}
    self.recv_src_counter = {i: 0 for i in range(self.world_size)}
    self.broadcast_recv_src_counter = {i: 0 for i in range(self.world_size)}

all_gather_obj

all_gather_obj(obj: Any) -> list[Any]

All gather an object from all ranks.

Source code in vllm/distributed/utils.py
def all_gather_obj(self, obj: Any) -> list[Any]:
    """All gather an object from all ranks."""
    gathered_objs = []
    for i in range(self.world_size):
        if i == self.rank:
            gathered_objs.append(obj)
            self.broadcast_obj(obj, src=self.rank)
        else:
            recv_obj = self.broadcast_obj(None, src=i)
            gathered_objs.append(recv_obj)
    return gathered_objs

barrier

barrier(timeout: float = 30.0)

A robust barrier to synchronize all ranks.

Uses a multi-phase approach to ensure all processes reach the barrier before proceeding:

  1. Each process signals it has reached the barrier

  2. Each process signals that it has confirmed the arrival of all other ranks.

  3. Rank 0 waits for all other ranks to signal their departure to ensure that all ranks have departed the barrier first.

Parameters:

Name Type Description Default
timeout float

Maximum time in seconds to wait for each phase (in seconds)

30.0

Raises:

Type Description
RuntimeError

If coordination fails or times out

Source code in vllm/distributed/utils.py
def barrier(self, timeout: float = 30.0):
    """A robust barrier to synchronize all ranks.


    Uses a multi-phase approach to ensure all processes reach the barrier
    before proceeding:

    1. Each process signals it has reached the barrier

    2. Each process signals that it has confirmed the arrival of all other
    ranks.

    3. Rank 0 waits for all other ranks to signal their departure to ensure
    that all ranks have departed the barrier first.

    Args:
        timeout: Maximum time in seconds to wait for each phase (in seconds)


    Raises:
        RuntimeError: If coordination fails or times out
    """
    # Generate a barrier ID that is globally unique
    try:
        if self.rank == 0:
            barrier_id = f"barrier_{uuid.uuid4()}"
            self.broadcast_obj(barrier_id, src=0)
        else:
            barrier_id = self.broadcast_obj(None, src=0)
    except Exception as e:
        raise RuntimeError("Failed to broadcast barrier_id") from e

    # Phase 1: Signal arrival at barrier
    # Wait for all processes to arrive
    # We need all ranks to confirm the arrival of all other ranks.
    # This is the key synchronization point.
    arrival_key = f"arrival_{barrier_id}_{self.rank}"
    try:
        self.store.set(arrival_key, b"1")
    except Exception as e:
        raise RuntimeError("Failed to signal barrier arrival") from e

    start_time = time.time()
    processes_arrived: set[int] = set()

    while len(processes_arrived) < self.world_size:
        # Check for timeout
        cur_time = time.time()
        if cur_time - start_time > timeout:
            raise RuntimeError(f"Barrier timed out after {timeout:.2f} seconds")

        # Check for each process
        for i in range(self.world_size):
            if i in processes_arrived:
                continue

            key = f"arrival_{barrier_id}_{i}"
            try:
                # Try to get the key - if it exists, we'll get a value
                # If it doesn't exist, it will throw an exception
                self.store.get(key)
                processes_arrived.add(i)
            except KeyError:
                # Key doesn't exist yet
                pass
            except Exception as check_e:
                logger.debug("Error checking key existence: %s", check_e)
                sched_yield()

        # Short sleep to avoid tight polling
        if len(processes_arrived) < self.world_size:
            sched_yield()

    # Phase 2: Signal departure from barrier
    # We only care to block at this stage in rank 0, which runs the
    # server side of the TCPStore. We want to make sure that all
    # clients have departed the barrier before rank 0 in case the
    # next thing after the barrier is a shutdown, including tearing
    # down the TCPStore. Other ranks can exit the barrier immediately
    # after signaling their departure.
    departure_key = f"departure_{barrier_id}_{self.rank}"
    try:
        self.store.set(departure_key, b"1")
    except Exception as e:
        raise RuntimeError("Failed to signal barrier departure") from e

    if self.rank != 0:
        return

    # Make rank 0 wait for all processes to signal departure
    start_time = time.time()
    processes_departed: set[int] = set()

    while len(processes_departed) < self.world_size:
        # Check for timeout
        if time.time() - start_time > timeout:
            raise RuntimeError(
                f"Barrier departure timed out after {timeout:.2f} seconds"
            )

        # Check for each process
        for i in range(self.world_size):
            if i in processes_departed:
                continue

            key = f"departure_{barrier_id}_{i}"
            try:
                # Try to get the key - if it exists, we'll get a value
                # If it doesn't exist, it will throw an exception
                self.store.get(key)
                processes_departed.add(i)
            except KeyError:
                # Key doesn't exist yet
                pass
            except Exception as check_e:
                logger.debug("Error checking key existence: %s", check_e)
                sched_yield()

        # Short sleep to avoid tight polling
        if len(processes_departed) < self.world_size:
            sched_yield()

    # Clean up keys to avoid leaking memory in the store
    for i in range(self.world_size):
        try:
            self.store.delete_key(f"arrival_{barrier_id}_{i}")
        except Exception:
            logger.debug("Error deleting key: %s", f"arrival_{barrier_id}_{i}")

        try:
            self.store.delete_key(f"departure_{barrier_id}_{i}")
        except Exception:
            logger.debug("Error deleting key: %s", f"departure_{barrier_id}_{i}")

broadcast_obj

broadcast_obj(obj: Any | None, src: int) -> Any

Broadcast an object from a source rank to all other ranks. It does not clean up after all ranks have received the object. Use it for limited times, e.g., for initialization.

Source code in vllm/distributed/utils.py
def broadcast_obj(self, obj: Any | None, src: int) -> Any:
    """Broadcast an object from a source rank to all other ranks.
    It does not clean up after all ranks have received the object.
    Use it for limited times, e.g., for initialization.
    """
    if self.rank == src:
        self.expire_data()
        key = f"broadcast_from/{src}/{self.broadcast_send_counter}"
        self.store.set(key, pickle.dumps(obj))
        self.broadcast_send_counter += 1
        self.entries.append((key, time.time()))
        return obj
    else:
        key = f"broadcast_from/{src}/{self.broadcast_recv_src_counter[src]}"
        recv_obj = pickle.loads(self.store.get(key))
        self.broadcast_recv_src_counter[src] += 1
        return recv_obj

create staticmethod

create(
    host: str,
    port: int,
    rank: int,
    world_size: int,
    data_expiration_seconds: int = 3600,
    store_timeout: int = 300,
) -> StatelessProcessGroup

A replacement for torch.distributed.init_process_group that does not pollute the global state.

If we have process A and process B called torch.distributed.init_process_group to form a group, and then we want to form another group with process A, B, C, D, it is not possible in PyTorch, because process A and process B have already formed a group, and process C and process D cannot join that group. This function is a workaround for this issue.

torch.distributed.init_process_group is a global call, while this function is a stateless call. It will return a StatelessProcessGroup object that can be used for exchanging metadata. With this function, process A and process B can call StatelessProcessGroup.create to form a group, and then process A, B, C, and D can call StatelessProcessGroup.create to form another group.

Source code in vllm/distributed/utils.py
@staticmethod
def create(
    host: str,
    port: int,
    rank: int,
    world_size: int,
    data_expiration_seconds: int = 3600,
    store_timeout: int = 300,
) -> "StatelessProcessGroup":
    """A replacement for `torch.distributed.init_process_group` that does not
    pollute the global state.

    If we have process A and process B called `torch.distributed.init_process_group`
    to form a group, and then we want to form another group with process A, B, C,
    D, it is not possible in PyTorch, because process A and process B have already
    formed a group, and process C and process D cannot join that group. This
    function is a workaround for this issue.

    `torch.distributed.init_process_group` is a global call, while this function
    is a stateless call. It will return a `StatelessProcessGroup` object that can be
    used for exchanging metadata. With this function, process A and process B
    can call `StatelessProcessGroup.create` to form a group, and then process A, B,
    C, and D can call `StatelessProcessGroup.create` to form another group.
    """  # noqa
    launch_server = rank == 0
    if launch_server:
        # listen on the specified interface (instead of 0.0.0.0)
        listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listen_socket.bind((host, port))
        listen_socket.listen()
        listen_fd = listen_socket.fileno()
    else:
        listen_socket = None
        listen_fd = None

    store = TCPStore(
        host_name=host,
        port=port,
        world_size=world_size,
        is_master=launch_server,
        timeout=timedelta(seconds=store_timeout),
        use_libuv=False,  # for now: github.com/pytorch/pytorch/pull/150215
        master_listen_fd=listen_fd,
    )

    return StatelessProcessGroup(
        rank=rank,
        world_size=world_size,
        store=store,
        socket=listen_socket,
        data_expiration_seconds=data_expiration_seconds,
    )

expire_data

expire_data()

Expire data that is older than data_expiration_seconds seconds.

Source code in vllm/distributed/utils.py
def expire_data(self):
    """Expire data that is older than `data_expiration_seconds` seconds."""
    while self.entries:
        # check the oldest entry
        key, timestamp = self.entries[0]
        if time.time() - timestamp > self.data_expiration_seconds:
            self.store.delete_key(key)
            self.entries.popleft()
        else:
            break

recv_obj

recv_obj(src: int) -> Any

Receive an object from a source rank.

Source code in vllm/distributed/utils.py
def recv_obj(self, src: int) -> Any:
    """Receive an object from a source rank."""
    obj = pickle.loads(
        self.store.get(f"send_to/{self.rank}/{self.recv_src_counter[src]}")
    )
    self.recv_src_counter[src] += 1
    return obj

send_obj

send_obj(obj: Any, dst: int)

Send an object to a destination rank.

Source code in vllm/distributed/utils.py
def send_obj(self, obj: Any, dst: int):
    """Send an object to a destination rank."""
    self.expire_data()
    key = f"send_to/{dst}/{self.send_dst_counter[dst]}"
    self.store.set(key, pickle.dumps(obj))
    self.send_dst_counter[dst] += 1
    self.entries.append((key, time.time()))

get_ep_group

get_ep_group() -> GroupCoordinator
Source code in vllm/distributed/parallel_state.py
def get_ep_group() -> GroupCoordinator:
    assert _EP is not None, "expert parallel group is not initialized"
    return _EP

get_node_count

get_node_count() -> int

Return the total number of nodes in the distributed environment.

Source code in vllm/distributed/parallel_state.py
def get_node_count() -> int:
    """Return the total number of nodes in the distributed environment."""
    assert _NODE_COUNT is not None, "distributed environment is not initialized"
    return _NODE_COUNT

in_the_same_node_as

in_the_same_node_as(
    pg: ProcessGroup | StatelessProcessGroup,
    source_rank: int = 0,
) -> list[bool]

This is a collective operation that returns if each rank is in the same node as the source rank. It tests if processes are attached to the same memory system (shared access to shared memory).

Source code in vllm/distributed/parallel_state.py
def in_the_same_node_as(
    pg: ProcessGroup | StatelessProcessGroup, source_rank: int = 0
) -> list[bool]:
    """
    This is a collective operation that returns if each rank is in the same node
    as the source rank. It tests if processes are attached to the same
    memory system (shared access to shared memory).
    """
    if isinstance(pg, ProcessGroup):
        assert torch.distributed.get_backend(pg) != torch.distributed.Backend.NCCL, (
            "in_the_same_node_as should be tested with a non-NCCL group."
        )
        # local rank inside the group
        rank = torch.distributed.get_rank(group=pg)
        world_size = torch.distributed.get_world_size(group=pg)

        # global ranks of the processes in the group
        ranks = torch.distributed.get_process_group_ranks(pg)
    else:
        rank = pg.rank
        world_size = pg.world_size
        ranks = list(range(world_size))

    # local tensor in each process to store the result
    is_in_the_same_node = torch.tensor(
        [0] * world_size, dtype=torch.int32, device="cpu"
    )

    magic_message = b"magic_message"
    shm = None

    try:
        with contextlib.suppress(OSError):
            if rank == source_rank:
                # create a shared memory segment
                shm = shared_memory.SharedMemory(create=True, size=128)
                shm.buf[: len(magic_message)] = magic_message
                if isinstance(pg, ProcessGroup):
                    torch.distributed.broadcast_object_list(
                        [shm.name], src=ranks[source_rank], group=pg
                    )
                else:
                    pg.broadcast_obj(shm.name, src=source_rank)
                is_in_the_same_node[rank] = 1
            else:
                # try to open the shared memory segment
                if isinstance(pg, ProcessGroup):
                    recv = [None]
                    torch.distributed.broadcast_object_list(
                        recv, src=ranks[source_rank], group=pg
                    )
                    name = recv[0]
                else:
                    name = pg.broadcast_obj(None, src=source_rank)
                # fix to https://stackoverflow.com/q/62748654/9191338
                # Python incorrectly tracks shared memory even if it is not
                # created by the process. The following patch is a workaround.
                with patch(
                    "multiprocessing.resource_tracker.register",
                    lambda *args, **kwargs: None,
                ):
                    shm = shared_memory.SharedMemory(name=name)
                if shm.buf[: len(magic_message)] == magic_message:
                    is_in_the_same_node[rank] = 1
    except Exception as e:
        logger.error("Error ignored in is_in_the_same_node: %s", e)
    finally:
        if shm:
            shm.close()

    if isinstance(pg, ProcessGroup):
        torch.distributed.barrier(group=pg)
    else:
        pg.barrier()

    # clean up the shared memory segment
    with contextlib.suppress(OSError):
        if rank == source_rank and shm:
            shm.unlink()

    if isinstance(pg, ProcessGroup):
        torch.distributed.all_reduce(is_in_the_same_node, group=pg)
        aggregated_data = is_in_the_same_node
    else:
        aggregated_data = torch.zeros_like(is_in_the_same_node)
        for i in range(world_size):
            rank_data = pg.broadcast_obj(is_in_the_same_node, src=i)
            aggregated_data += rank_data

    return [x == 1 for x in aggregated_data.tolist()]

init_logger

init_logger(name: str) -> _VllmLogger

The main purpose of this function is to ensure that loggers are retrieved in such a way that we can be sure the root vllm logger has already been configured.

Source code in vllm/logger.py
def init_logger(name: str) -> _VllmLogger:
    """The main purpose of this function is to ensure that loggers are
    retrieved in such a way that we can be sure the root vllm logger has
    already been configured."""

    logger = logging.getLogger(name)

    for method_name, method in _METHODS_TO_PATCH.items():
        setattr(logger, method_name, MethodType(method, logger))

    return cast(_VllmLogger, logger)

rearrange_expert_weights_inplace

rearrange_expert_weights_inplace(
    old_global_expert_indices: Tensor,
    new_global_expert_indices: Tensor,
    expert_weights: Sequence[Iterable[Tensor]],
    ep_group: ProcessGroup,
    is_profile: bool = False,
    rank_mapping: dict[int, int] | None = None,
) -> None

Rearranges the expert weights in place according to the new expert indices.

The value of the indices arguments are logical indices of the experts, while keys are physical.

Parameters:

Name Type Description Default
old_global_expert_indices Tensor

Shape (num_moe_layers, num_physical_experts).

required
new_global_expert_indices Tensor

Shape (num_moe_layers, num_physical_experts).

required
expert_weights Sequence[Iterable[Tensor]]

A sequence of shape (num_moe_layers)(weight_count) of tensors of shape (num_local_physical_experts, hidden_size_i). For example, a linear layer may have up and down projection, so weight_count = 2. Each weight's hidden size can be different.

required
ep_group ProcessGroup

The device process group for expert parallelism.

required
is_profile bool

If True, do not perform any actual weight copy. This is used during profile run, where we only perform dummy communications to reserve enough memory for the buffers.

False
rank_mapping dict[int, int] | None

A dictionary mapping old rank to new rank.

None
Source code in vllm/distributed/eplb/rebalance_execute.py
def rearrange_expert_weights_inplace(
    old_global_expert_indices: torch.Tensor,
    new_global_expert_indices: torch.Tensor,
    expert_weights: Sequence[Iterable[torch.Tensor]],
    ep_group: ProcessGroup,
    is_profile: bool = False,
    rank_mapping: dict[int, int] | None = None,
) -> None:
    """
    Rearranges the expert weights in place according to the new expert indices.

    The value of the indices arguments are logical indices of the experts,
    while keys are physical.

    Args:
        old_global_expert_indices: Shape (num_moe_layers, num_physical_experts).
        new_global_expert_indices: Shape (num_moe_layers, num_physical_experts).
        expert_weights: A sequence of shape (num_moe_layers)(weight_count)
            of tensors of shape (num_local_physical_experts, hidden_size_i).
            For example, a linear layer may have up and down projection,
            so weight_count = 2. Each weight's hidden size can be different.
        ep_group: The device process group for expert parallelism.
        is_profile (bool): If `True`, do not perform any actual weight copy.
            This is used during profile run, where we only perform dummy
            communications to reserve enough memory for the buffers.
        rank_mapping: A dictionary mapping old rank to new rank.
    """
    if rank_mapping is not None:
        if len(rank_mapping) == ep_group.size():
            # scale down
            new_global_expert_indices = _map_new_expert_indices_with_rank_mapping(
                new_global_expert_indices,
                rank_mapping,
            )
        else:
            # scale up
            old_global_expert_indices = _map_old_expert_indices_with_rank_mapping(
                old_global_expert_indices,
                rank_mapping,
                ep_group.size(),
            )

    assert old_global_expert_indices.shape[1] == new_global_expert_indices.shape[1]

    num_moe_layers, num_physical_experts = old_global_expert_indices.shape
    assert len(expert_weights) == num_moe_layers

    num_local_physical_experts = next(iter(expert_weights[0])).shape[0]
    assert new_global_expert_indices.shape == (num_moe_layers, num_physical_experts)

    ep_rank = ep_group.rank()
    ep_size = ep_group.size()
    assert num_physical_experts == ep_size * num_local_physical_experts

    # A buffer to hold the expert weights in one layer during the exchange.
    # NOTE: Currently we assume the same weights across different layers
    # have the same shape.
    expert_weights_buffer = [torch.empty_like(w) for w in expert_weights[0]]

    if is_profile:
        # Maximum send size is to send all local experts to all ranks,
        # So we use a dummy `all_gather` to reserve enough communication buffer
        for weight, buffer in zip(expert_weights[0], expert_weights_buffer):
            # A `/dev/null`-like buffer to avoid real memory allocation
            dummy_recv_buffer = [buffer for _ in range(ep_size)]
            # NOTE(bowen): Needed this barrier to avoid OOM during actual
            # execution. I'm not very sure why this is needed
            torch.distributed.barrier()
            all_gather(
                dummy_recv_buffer,
                weight,
                group=ep_group,
            )
        return

    for layer in range(num_moe_layers):
        # NOTE(bowen): We need this synchronize to run, but I don't know why.
        # If you figure out the reason, please let me know -- thank you!
        torch.cuda.synchronize()
        shuffle_layer(
            num_local_physical_experts,
            ep_rank,
            old_global_expert_indices[layer].tolist(),
            new_global_expert_indices[layer].tolist(),
            expert_weights[layer],
            expert_weights_buffer,
            ep_group,
        )

rebalance_experts

rebalance_experts(
    weight: Tensor,
    num_replicas: int,
    num_groups: int,
    num_nodes: int,
    num_gpus: int,
) -> tuple[Tensor, Tensor, Tensor]

Entry point for expert-parallelism load balancer.

Parameters:

Name Type Description Default
weight Tensor

[layers, num_logical_experts], the load statistics for all logical experts

required
num_replicas int

number of physical experts, must be a multiple of num_gpus

required
num_groups int

number of expert groups

required
num_nodes int

number of server nodes, where the intra-node network (e.g, NVLink) is faster

required
num_gpus int

number of GPUs, must be a multiple of num_nodes

required

Returns:

Name Type Description
physical_to_logical_map Tensor

[layers, num_replicas], the expert index of each replica

logical_to_physical_map Tensor

[layers, num_logical_experts, X], the replica indices for each expert

expert_count Tensor

[layers, num_logical_experts], number of physical replicas for each logical expert

Source code in vllm/distributed/eplb/rebalance_algo.py
def rebalance_experts(
    weight: torch.Tensor,
    num_replicas: int,
    num_groups: int,
    num_nodes: int,
    num_gpus: int,
) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
    """
    Entry point for expert-parallelism load balancer.

    Parameters:
        weight: [layers, num_logical_experts], the load statistics for all
            logical experts
        num_replicas: number of physical experts, must be a multiple of
            `num_gpus`
        num_groups: number of expert groups
        num_nodes: number of server nodes, where the intra-node network
            (e.g, NVLink) is faster
        num_gpus: number of GPUs, must be a multiple of `num_nodes`

    Returns:
        physical_to_logical_map:
            [layers, num_replicas], the expert index of each replica
        logical_to_physical_map:
            [layers, num_logical_experts, X], the replica indices for each
            expert
        expert_count:
            [layers, num_logical_experts], number of physical
            replicas for each logical expert
    """
    num_layers, num_logical_experts = weight.shape
    weight = weight.float().cpu()
    if num_groups % num_nodes == 0:
        # use hierarchical load-balance policy
        phy2log, phyrank, logcnt = rebalance_experts_hierarchical(
            weight, num_replicas, num_groups, num_nodes, num_gpus
        )
    else:
        # use global load-balance policy
        phy2log, phyrank, logcnt = rebalance_experts_hierarchical(
            weight, num_replicas, 1, 1, num_gpus
        )
    num_redundant_experts = num_replicas - num_logical_experts
    maxlogcnt = num_redundant_experts + 1
    log2phy: torch.Tensor = torch.full(
        (num_layers, num_logical_experts, maxlogcnt),
        -1,
        dtype=torch.int64,
        device=logcnt.device,
    )
    log2phy.view(num_layers, -1).scatter_(
        -1,
        phy2log * maxlogcnt + phyrank,
        torch.arange(num_replicas, dtype=torch.int64, device=log2phy.device).expand(
            num_layers, -1
        ),
    )
    return phy2log, log2phy, logcnt