[CI] Fix mypy for vllm/executor (#26845)
Signed-off-by: yewentao256 <zhyanwentao@126.com>
@@ -28,6 +28,7 @@ FILES = [
     "vllm/assets",
     "vllm/distributed",
     "vllm/entrypoints",
+    "vllm/executor",
     "vllm/inputs",
     "vllm/logging_utils",
     "vllm/multimodal",
@@ -44,7 +45,6 @@ SEPARATE_GROUPS = [
     "vllm/attention",
     "vllm/compilation",
     "vllm/engine",
-    "vllm/executor",
     "vllm/inputs",
     "vllm/lora",
     "vllm/model_executor",
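Note: moving "vllm/executor" out of SEPARATE_GROUPS and into FILES means the package is now checked as part of the main mypy pass rather than in its own isolated run. A minimal sketch of how such a grouping scheme can drive mypy, with hypothetical helper names rather than vllm's actual pre-commit script:

# Hypothetical sketch only: one mypy pass for the main FILES group, plus one
# isolated pass per entry in SEPARATE_GROUPS.
import subprocess
import sys

FILES = ["vllm/assets", "vllm/executor", "vllm/inputs"]      # checked together
SEPARATE_GROUPS = ["vllm/attention", "vllm/model_executor"]  # each checked alone

def run_mypy(targets: list[str]) -> int:
    # --follow-imports=skip keeps errors scoped to the listed targets
    # (an assumption about how this kind of script isolates groups).
    return subprocess.run(["mypy", "--follow-imports=skip", *targets]).returncode

code = run_mypy(FILES)
for group in SEPARATE_GROUPS:
    code |= run_mypy([group])
sys.exit(code)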
@@ -18,7 +18,7 @@ from vllm.lora.request import LoRARequest
 from vllm.sequence import ExecuteModelRequest
 from vllm.tasks import SupportedTask
 from vllm.utils import make_async
-from vllm.v1.outputs import PoolerOutput, SamplerOutput
+from vllm.v1.outputs import SamplerOutput
 from vllm.v1.worker.worker_base import WorkerBase
 
 logger = init_logger(__name__)
@@ -54,7 +54,7 @@ class ExecutorBase(ABC):
         self._init_executor()
         self.is_sleeping = False
         self.sleeping_tags: set[str] = set()
-        self.kv_output_aggregator = None
+        self.kv_output_aggregator: KVOutputAggregator | None = None
 
     @abstractmethod
     def _init_executor(self) -> None:
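The kv_output_aggregator change is the standard fix for attributes initialized to None: without an explicit "X | None" annotation, mypy infers the attribute type from the initializer and rejects later assignments of the real object. A minimal, self-contained illustration with generic names, not vllm code:

class Aggregator:
    """Stand-in for KVOutputAggregator."""

class Executor:
    def __init__(self) -> None:
        # Annotated as Optional, so assigning a real Aggregator later type-checks.
        self.kv_output_aggregator: Aggregator | None = None

    def enable_aggregation(self) -> None:
        self.kv_output_aggregator = Aggregator()  # OK: matches Aggregator | None

ex = Executor()
ex.enable_aggregation()
print(type(ex.kv_output_aggregator).__name__)  # Aggregator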
@@ -143,8 +143,9 @@ class ExecutorBase(ABC):
 
     def execute_model(
         self, execute_model_req: ExecuteModelRequest
-    ) -> list[SamplerOutput | PoolerOutput] | None:
+    ) -> list[SamplerOutput]:
         output = self.collective_rpc("execute_model", args=(execute_model_req,))
+        assert output[0] is not None
         return output[0]
 
     def stop_remote_worker_execution_loop(self) -> None:
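The execute_model change narrows the declared return type by asserting that the first RPC result is not None; mypy treats the assert as a type guard. A tiny, generic example of the same pattern, not using vllm's types:

def first_result(outputs: list[list[int] | None]) -> list[int]:
    result = outputs[0]
    # The assert narrows "list[int] | None" down to "list[int]" for mypy.
    assert result is not None
    return result

print(first_result([[1, 2, 3], None]))  # [1, 2, 3]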
@@ -217,7 +217,9 @@ class RayDistributedExecutor(DistributedExecutorBase):
                     num_gpus=num_gpus,
                     scheduling_strategy=scheduling_strategy,
                     **ray_remote_kwargs,
-                )(RayWorkerWrapper).remote(vllm_config=self.vllm_config, rpc_rank=rank)
+                )(RayWorkerWrapper).remote(  # type: ignore[attr-defined]
+                    vllm_config=self.vllm_config, rpc_rank=rank
+                )
             else:
                 worker = ray.remote(
                     num_cpus=0,
@@ -225,7 +227,9 @@ class RayDistributedExecutor(DistributedExecutorBase):
                     resources={current_platform.ray_device_key: num_gpus},
                     scheduling_strategy=scheduling_strategy,
                     **ray_remote_kwargs,
-                )(RayWorkerWrapper).remote(vllm_config=self.vllm_config, rpc_rank=rank)
+                )(RayWorkerWrapper).remote(  # type: ignore[attr-defined]
+                    vllm_config=self.vllm_config, rpc_rank=rank
+                )
             worker_metadata.append(RayWorkerMetaData(worker=worker, created_rank=rank))
 
         worker_ips = ray.get(
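The `# type: ignore[attr-defined]` comments on the `.remote(...)` calls in this file all address the same mypy complaint: Ray attaches `.remote` handles to actors and their methods dynamically at runtime, so mypy cannot see them on the statically declared classes. A small stand-alone illustration of that error class, with no Ray dependency and the dynamic attribute simulated via setattr:

class Wrapper:
    """Statically, this class has no 'remote' attribute."""

w = Wrapper()
# A framework (Ray, in the real code) injects the attribute at runtime.
setattr(w, "remote", lambda: "launched")

# mypy reports [attr-defined] here because it only sees the class body above;
# the targeted ignore silences just that error code on this line.
print(w.remote())  # type: ignore[attr-defined]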
@@ -303,7 +307,7 @@ class RayDistributedExecutor(DistributedExecutorBase):
                 continue
             worker_node_and_gpu_ids.append(
                 ray.get(worker.get_node_and_gpu_ids.remote())
-            )  # type: ignore
+            )  # type: ignore[attr-defined]
 
         node_workers = defaultdict(list)  # node id -> list of worker ranks
         node_gpus = defaultdict(list)  # node id -> list of gpu ids
@@ -495,7 +499,9 @@ class RayDistributedExecutor(DistributedExecutorBase):
         if async_run_tensor_parallel_workers_only:
             ray_workers = self.non_driver_workers
         ray_worker_outputs = [
-            worker.execute_method.remote(sent_method, *args, **kwargs)
+            worker.execute_method.remote(  # type: ignore[attr-defined]
+                sent_method, *args, **kwargs
+            )
             for worker in ray_workers
         ]
 
@@ -715,7 +721,7 @@ class RayDistributedExecutor(DistributedExecutorBase):
             tasks.append(
                 asyncio.create_task(
                     _run_task_with_lock(
-                        driver_worker.execute_method.remote,
+                        driver_worker.execute_method.remote,  # type: ignore[attr-defined]
                         self.pp_locks[pp_rank],
                         "execute_model",
                         execute_model_req,
@@ -733,7 +739,7 @@ class RayDistributedExecutor(DistributedExecutorBase):
             "worker loop is disabled for VLLM_USE_RAY_SPMD_WORKER=1"
         )
         coros = [
-            worker.execute_method.remote("start_worker_execution_loop")  # type: ignore[attr-defined]
+            worker.execute_method.remote("start_worker_execution_loop")  # type: ignore[attr-defined]
             for worker in self.non_driver_workers
         ]
         return await asyncio.gather(*coros)
@@ -90,14 +90,17 @@ try:
 
             execute_model_req = self.input_decoder.decode(serialized_req)
 
+            assert self.worker is not None, "Worker is not initialized"
+
             # TODO(swang): This is needed right now because Ray Compiled Graph
             # executes on a background thread, so we need to reset torch's
             # current device.
             if not self.compiled_dag_cuda_device_set:
+                assert self.worker.device is not None
                 current_platform.set_device(self.worker.device)
                 self.compiled_dag_cuda_device_set = True
 
-            output = self.worker._execute_model_spmd(
+            output = self.worker._execute_model_spmd(  # type: ignore[attr-defined]
                 execute_model_req, intermediate_tensors
             )
             # Pipeline model request and output to the next pipeline stage.
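The asserts added in this hunk follow the same Optional-attribute pattern: the wrapped worker (and its device) are only populated after initialization, so they are typed as Optional and must be narrowed before use. A generic sketch of the pattern, with illustrative names rather than the RayWorkerWrapper API:

class Worker:
    def __init__(self) -> None:
        self.device: str | None = "cuda:0"

    def run(self) -> str:
        return "ok"

class WrapperSketch:
    def __init__(self) -> None:
        self.worker: Worker | None = None  # created later, hence Optional

    def init_worker(self) -> None:
        self.worker = Worker()

    def execute(self) -> str:
        assert self.worker is not None, "Worker is not initialized"
        assert self.worker.device is not None
        # Both attributes are narrowed, so the calls below type-check cleanly.
        return f"{self.worker.run()} on {self.worker.device}"

w = WrapperSketch()
w.init_worker()
print(w.execute())  # ok on cuda:0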
@@ -119,6 +122,7 @@ try:
                     # Not needed
                     pass
                 else:
+                    assert self.worker.device is not None
                     current_platform.set_device(self.worker.device)
 
                 self.compiled_dag_cuda_device_set = True
@@ -139,6 +143,7 @@ try:
                 scheduler_output, intermediate_tensors = scheduler_output
             else:
                 scheduler_output, intermediate_tensors = scheduler_output, None
+            assert self.worker.model_runner is not None
             output = self.worker.model_runner.execute_model(
                 scheduler_output, intermediate_tensors
             )