From 71557a5f7c221b63759a0d87c0b175b1bab243e6 Mon Sep 17 00:00:00 2001
From: Wentao Ye <44945378+yewentao256@users.noreply.github.com>
Date: Wed, 15 Oct 2025 04:23:33 -0400
Subject: [PATCH] [CI] Fix mypy for `vllm/executor` (#26845)

Signed-off-by: yewentao256
---
 tools/pre_commit/mypy.py                  |  2 +-
 vllm/executor/executor_base.py            |  7 ++++---
 vllm/executor/ray_distributed_executor.py | 18 ++++++++++++------
 vllm/executor/ray_utils.py                |  7 ++++++-
 4 files changed, 23 insertions(+), 11 deletions(-)

diff --git a/tools/pre_commit/mypy.py b/tools/pre_commit/mypy.py
index 7fdfdb37a..a3aa54634 100755
--- a/tools/pre_commit/mypy.py
+++ b/tools/pre_commit/mypy.py
@@ -28,6 +28,7 @@ FILES = [
     "vllm/assets",
     "vllm/distributed",
     "vllm/entrypoints",
+    "vllm/executor",
     "vllm/inputs",
     "vllm/logging_utils",
     "vllm/multimodal",
@@ -44,7 +45,6 @@ SEPARATE_GROUPS = [
     "vllm/attention",
     "vllm/compilation",
     "vllm/engine",
-    "vllm/executor",
     "vllm/inputs",
     "vllm/lora",
     "vllm/model_executor",
diff --git a/vllm/executor/executor_base.py b/vllm/executor/executor_base.py
index 2c44422ba..a5f83f904 100644
--- a/vllm/executor/executor_base.py
+++ b/vllm/executor/executor_base.py
@@ -18,7 +18,7 @@ from vllm.lora.request import LoRARequest
 from vllm.sequence import ExecuteModelRequest
 from vllm.tasks import SupportedTask
 from vllm.utils import make_async
-from vllm.v1.outputs import PoolerOutput, SamplerOutput
+from vllm.v1.outputs import SamplerOutput
 from vllm.v1.worker.worker_base import WorkerBase
 
 logger = init_logger(__name__)
@@ -54,7 +54,7 @@ class ExecutorBase(ABC):
         self._init_executor()
         self.is_sleeping = False
         self.sleeping_tags: set[str] = set()
-        self.kv_output_aggregator = None
+        self.kv_output_aggregator: KVOutputAggregator | None = None
 
     @abstractmethod
     def _init_executor(self) -> None:
@@ -143,8 +143,9 @@ class ExecutorBase(ABC):
 
     def execute_model(
         self, execute_model_req: ExecuteModelRequest
-    ) -> list[SamplerOutput | PoolerOutput] | None:
+    ) -> list[SamplerOutput]:
         output = self.collective_rpc("execute_model", args=(execute_model_req,))
+        assert output[0] is not None
         return output[0]
 
     def stop_remote_worker_execution_loop(self) -> None:
diff --git a/vllm/executor/ray_distributed_executor.py b/vllm/executor/ray_distributed_executor.py
index 943c6a27f..59e282ac9 100644
--- a/vllm/executor/ray_distributed_executor.py
+++ b/vllm/executor/ray_distributed_executor.py
@@ -217,7 +217,9 @@ class RayDistributedExecutor(DistributedExecutorBase):
                     num_gpus=num_gpus,
                     scheduling_strategy=scheduling_strategy,
                     **ray_remote_kwargs,
-                )(RayWorkerWrapper).remote(vllm_config=self.vllm_config, rpc_rank=rank)
+                )(RayWorkerWrapper).remote(  # type: ignore[attr-defined]
+                    vllm_config=self.vllm_config, rpc_rank=rank
+                )
             else:
                 worker = ray.remote(
                     num_cpus=0,
@@ -225,7 +227,9 @@ class RayDistributedExecutor(DistributedExecutorBase):
                     resources={current_platform.ray_device_key: num_gpus},
                     scheduling_strategy=scheduling_strategy,
                     **ray_remote_kwargs,
-                )(RayWorkerWrapper).remote(vllm_config=self.vllm_config, rpc_rank=rank)
+                )(RayWorkerWrapper).remote(  # type: ignore[attr-defined]
+                    vllm_config=self.vllm_config, rpc_rank=rank
+                )
             worker_metadata.append(RayWorkerMetaData(worker=worker, created_rank=rank))
 
         worker_ips = ray.get(
@@ -303,7 +307,7 @@ class RayDistributedExecutor(DistributedExecutorBase):
                 continue
             worker_node_and_gpu_ids.append(
                 ray.get(worker.get_node_and_gpu_ids.remote())
-            )  # type: ignore
+            )  # type: ignore[attr-defined]
 
         node_workers = defaultdict(list)  # node id -> list of worker ranks
         node_gpus = defaultdict(list)  # node id -> list of gpu ids
@@ -495,7 +499,9 @@ class RayDistributedExecutor(DistributedExecutorBase):
         if async_run_tensor_parallel_workers_only:
             ray_workers = self.non_driver_workers
         ray_worker_outputs = [
-            worker.execute_method.remote(sent_method, *args, **kwargs)
+            worker.execute_method.remote(  # type: ignore[attr-defined]
+                sent_method, *args, **kwargs
+            )
             for worker in ray_workers
         ]
 
@@ -715,7 +721,7 @@ class RayDistributedExecutor(DistributedExecutorBase):
             tasks.append(
                 asyncio.create_task(
                     _run_task_with_lock(
-                        driver_worker.execute_method.remote,
+                        driver_worker.execute_method.remote,  # type: ignore[attr-defined]
                         self.pp_locks[pp_rank],
                         "execute_model",
                         execute_model_req,
@@ -733,7 +739,7 @@ class RayDistributedExecutor(DistributedExecutorBase):
             "worker loop is disabled for VLLM_USE_RAY_SPMD_WORKER=1"
         )
         coros = [
-            worker.execute_method.remote("start_worker_execution_loop")
+            worker.execute_method.remote("start_worker_execution_loop")  # type: ignore[attr-defined]
             for worker in self.non_driver_workers
         ]
         return await asyncio.gather(*coros)
diff --git a/vllm/executor/ray_utils.py b/vllm/executor/ray_utils.py
index d12151bb9..ef5a99659 100644
--- a/vllm/executor/ray_utils.py
+++ b/vllm/executor/ray_utils.py
@@ -90,14 +90,17 @@ try:
 
             execute_model_req = self.input_decoder.decode(serialized_req)
 
+            assert self.worker is not None, "Worker is not initialized"
+
             # TODO(swang): This is needed right now because Ray Compiled Graph
             # executes on a background thread, so we need to reset torch's
             # current device.
             if not self.compiled_dag_cuda_device_set:
+                assert self.worker.device is not None
                 current_platform.set_device(self.worker.device)
                 self.compiled_dag_cuda_device_set = True
 
-            output = self.worker._execute_model_spmd(
+            output = self.worker._execute_model_spmd(  # type: ignore[attr-defined]
                 execute_model_req, intermediate_tensors
             )
             # Pipeline model request and output to the next pipeline stage.
@@ -119,6 +122,7 @@ try:
                 # Not needed
                 pass
             else:
+                assert self.worker.device is not None
                 current_platform.set_device(self.worker.device)
                 self.compiled_dag_cuda_device_set = True
 
@@ -139,6 +143,7 @@ try:
                 scheduler_output, intermediate_tensors = scheduler_output
             else:
                 scheduler_output, intermediate_tensors = scheduler_output, None
+            assert self.worker.model_runner is not None
             output = self.worker.model_runner.execute_model(
                 scheduler_output, intermediate_tensors
             )
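Note: the hunks above all lean on the same mypy idiom: give an attribute that starts out unset an explicit `X | None` annotation, narrow it with `assert ... is not None` right before use, and scope any remaining suppressions to a concrete error code such as `# type: ignore[attr-defined]` instead of a bare ignore. A minimal, self-contained sketch of that idiom follows; the `Aggregator`/`Executor` names are illustrative only, not vLLM's API.

    from __future__ import annotations


    class Aggregator:
        def combine(self, outputs: list[int]) -> int:
            return sum(outputs)


    class Executor:
        def __init__(self) -> None:
            # An explicit "Aggregator | None" annotation keeps the attribute
            # checkable; a bare "= None" would make mypy infer its type as None
            # and flag every later use.
            self.aggregator: Aggregator | None = None

        def run(self, outputs: list[int]) -> int:
            # The assert narrows "Aggregator | None" to "Aggregator" for mypy,
            # and fails loudly if run() is called before initialization.
            assert self.aggregator is not None, "Aggregator is not initialized"
            return self.aggregator.combine(outputs)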