Testing BERT

Testing bert_base inference performance under different parallelism strategies

  • I. Test data
  • II. Test procedure
    • 1. Generate the BERT config file
    • 2. Install dependencies
    • 3. DeepSpeed 4-GPU tensor parallelism
    • 4. FSDP 4-GPU parallelism
    • 5. Manually split the weights evenly across 4 GPUs, single-process multi-GPU inference
    • 6. Manually split the model into 4 stages and implement pipeline parallelism with NCCL

      This article benchmarks the inference performance of a bert_base model under several different parallelism strategies.

      Constraints

      • 1. The GPUs on this server do not support P2P, and the interconnect is only PCIe Gen1 x16 (a quick way to check this is sketched right below).
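
      As a minimal sketch of that check (assuming a multi-GPU machine and a recent PyTorch; nvidia-smi topo -m reports the same information from the shell):

      import torch

      # Query pairwise P2P accessibility between all visible GPUs.
      n = torch.cuda.device_count()
      for i in range(n):
          for j in range(n):
              if i != j:
                  ok = torch.cuda.can_device_access_peer(i, j)
                  print(f"GPU{i} -> GPU{j} peer access: {ok}")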

        Points that may be useful to reference

        • How to use DeepSpeed for inference
        • How to use FSDP for inference
        • How to split model weights across multiple GPUs
        • Custom pipeline parallelism (splitting the network and inserting custom modifications)
        • How to automatically handle PyTorch operators whose input tensors are not all on the same device

          I. Test data

          Parallelism strategy                                          | QPS    | GPU utilization (per rank)
          DeepSpeed 4-GPU tensor parallelism                            | 175.73 | rank:0 100.00, rank:1 100.00, rank:2 97.00, rank:3 97.00
          FSDP 4-GPU parallelism                                        | 137.80 | rank:0 40.00, rank:1 40.00, rank:2 39.00, rank:3 40.00
          Weights split evenly across 4 GPUs, single-process inference  | 29.34  | -
          Model split into 4 stages, NCCL-based pipeline parallelism    | 244.76 | rank:0 97.00, rank:1 40.00, rank:2 39.00, rank:3 78.00

          II. Test procedure

          1. Generate the BERT config file

          tee ./config.json <<-'EOF'
          {
            "architectures": [
              "BertForMaskedLM"
            ],
            "attention_probs_dropout_prob": 0.1,
            "directionality": "bidi",
            "hidden_act": "gelu",
            "hidden_dropout_prob": 0.1,
            "hidden_size": 768,
            "initializer_range": 0.02,
            "intermediate_size": 3072,
            "layer_norm_eps": 1e-12,
            "max_position_embeddings": 512,
            "model_type": "bert",
            "num_attention_heads": 12,
            "num_hidden_layers": 12,
            "pad_token_id": 0,
            "pooler_fc_size": 768,
            "pooler_num_attention_heads": 12,
            "pooler_num_fc_layers": 3,
            "pooler_size_per_head": 128,
            "pooler_type": "first_token_transform",
            "type_vocab_size": 2,
            "vocab_size": 21128
          }
          EOF
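
          As an optional sanity check (a sketch, assuming transformers is already installed), the config can be loaded and the resulting model size inspected before running any of the benchmarks below:

          from transformers import AutoModelForMaskedLM, BertConfig

          # Load the config written above and instantiate a randomly initialized model from it.
          config = BertConfig.from_pretrained("./config.json")
          model = AutoModelForMaskedLM.from_config(config)
          # bert_base with this vocab should come out to roughly 100M parameters.
          print(sum(p.numel() for p in model.parameters()))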
          

          2. Install dependencies

          pip install nvidia-ml-py3
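
          nvidia-ml-py3 provides the pynvml bindings used throughout the scripts below to sample GPU utilization. A minimal check that the bindings can see the GPUs (a sketch, assuming at least one NVIDIA GPU is visible):

          import pynvml

          # Initialize NVML, then report the device count and the current utilization of GPU 0.
          pynvml.nvmlInit()
          handle = pynvml.nvmlDeviceGetHandleByIndex(0)
          util = pynvml.nvmlDeviceGetUtilizationRates(handle)
          print(pynvml.nvmlDeviceGetCount(), "GPU(s), GPU0 util:", util.gpu)
          pynvml.nvmlShutdown()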
          

          3. DeepSpeed 4-GPU tensor parallelism

          tee ds_bert_infer.py <<-'EOF'
          import torch
          import deepspeed
          import os
          from deepspeed.accelerator import get_accelerator
          import time
          import torch.distributed as dist
          import pynvml
          import numpy as np
          import threading
          # Background thread that samples GPU utilization via pynvml
          class PynvmlGPUUtilizationThread(threading.Thread):
              def __init__(self,device,interval=1):
                  super().__init__()
                  self.interval = interval
                  self.running = True
                  self.device=device        
                  self.handle = pynvml.nvmlDeviceGetHandleByIndex(device)
                  self.utilizations=[]
                  
              def run(self):
                  while self.running:
                      self.get_and_print_gpu_utilization()
                      time.sleep(self.interval)
              
              def stop(self):
                  self.running = False
              
              def get_and_print_gpu_utilization(self):
                  utilization = pynvml.nvmlDeviceGetUtilizationRates(self.handle)
                  self.utilizations.append(utilization.gpu)
                  
              def data(self):
                  return np.max(self.utilizations)
              
          def inference():    
              deepspeed.init_distributed(dist_backend='nccl')
              world_size = torch.distributed.get_world_size()
              local_rank=int(os.environ['LOCAL_RANK'])
              rank=torch.distributed.get_rank()
              pynvml.nvmlInit()
              
              torch.manual_seed(1)
              from transformers import AutoModelForMaskedLM,BertConfig
              config=BertConfig.from_pretrained("./config.json")
              model = AutoModelForMaskedLM.from_config(config)
              model.eval()
              engine = deepspeed.init_inference(model,
                                                  tensor_parallel={"tp_size": world_size},
                                                  dtype=torch.float32,
                                                  replace_with_kernel_inject=True)
              device=get_accelerator().current_device_name()
              input_tokens=torch.randint(0,config.vocab_size,(1,128)).to(device)
              epoch=1024
              gpu_thread = PynvmlGPUUtilizationThread(local_rank,interval=1)
              gpu_thread.start()    
              t0=time.time()
              for i in range(epoch):
                  outputs = engine(input_tokens)
              dist.barrier()
              torch.cuda.synchronize()
              t1=time.time()
              gpu_thread.stop()
              gpu_thread.join()       
              time.sleep(0.2*rank)        
              if rank==0:
                  qps=epoch/(t1-t0)
                  print(f"default stream qps:{qps:.2f}")
              print(f"rank:{rank} util:{gpu_thread.data():.2f}")
              
              stream_nbs=[1,2,4,8]    
              for n in stream_nbs:
                  dist.barrier()
                  if rank==0:
                      print("-----------------------------------------------")
                  streams=[torch.cuda.Stream() for _ in range(n)]
                  total_samples=0        
                  gpu_thread = PynvmlGPUUtilizationThread(local_rank,interval=1)
                  gpu_thread.start()
                  t0=time.time()
                  for _ in range(epoch//n):
                      for i in range(n):
                          with torch.cuda.stream(streams[i]):
                              total_samples+=1
                              outputs = engine(input_tokens)        
                  dist.barrier()
                  torch.cuda.synchronize()
                  t1=time.time()
                  gpu_thread.stop()
                  gpu_thread.join()    
                  time.sleep(0.2*rank)        
                  if rank==0:
                      qps=total_samples/(t1-t0)
                      print(f"{n} streams qps:{qps:.2f}")                
                  print(f"rank:{rank} util:{gpu_thread.data():.2f}")
                  
          if __name__ == "__main__":
              inference()
          EOF
          deepspeed --num_gpus=4 ds_bert_infer.py
          

          Output

          ------------------------------------------------------
          default stream qps: 147.10
          rank:0 util:90.00
          rank:1 util:86.00
          rank:2 util:89.00
          rank:3 util:89.00
          -----------------------------------------------
          1 streams qps:162.62
          rank:0 util:100.00
          rank:1 util:100.00
          rank:2 util:92.00
          rank:3 util:88.00
          -----------------------------------------------
          2 streams qps:177.31
          rank:0 util:100.00
          rank:1 util:100.00
          rank:2 util:99.00
          rank:3 util:98.00
          -----------------------------------------------
          4 streams qps:176.11
          rank:0 util:100.00
          rank:1 util:100.00
          rank:2 util:98.00
          rank:3 util:97.00
          -----------------------------------------------
          8 streams qps:175.73
          rank:0 util:100.00
          rank:1 util:100.00
          rank:2 util:97.00
          rank:3 util:97.00
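
          To confirm that kernel injection and tensor parallelism actually took effect, a simple check (a sketch; it assumes the returned InferenceEngine exposes the wrapped model as engine.module, which recent DeepSpeed versions do) is to print the module tree on rank 0 right after deepspeed.init_inference and look for DeepSpeed's fused inference modules in place of the original BertLayer blocks:

          # Inside inference(), right after deepspeed.init_inference(...):
          if rank == 0:
              # With replace_with_kernel_inject=True the encoder layers should show up
              # as DeepSpeed's fused inference transformer modules rather than BertLayer.
              print(engine.module)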
          

          4. FSDP 4-GPU parallelism

          tee fsdp_bert_infer.py <<-'EOF'
          import time
          import os
          import torch
          import torch.distributed as dist
          import torch.multiprocessing as mp
          from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
          import torchvision.models as models
          import torch.nn as nn
          import torch.nn.init as init
          import time
          import pynvml
          import numpy as np
          import threading
          class PynvmlGPUUtilizationThread(threading.Thread):
              def __init__(self,device,interval=1):
                  super().__init__()
                  self.interval = interval
                  self.running = True
                  self.device=device        
                  self.handle = pynvml.nvmlDeviceGetHandleByIndex(device)
                  self.utilizations=[]
                  
              def run(self):
                  while self.running:
                      self.get_and_print_gpu_utilization()
                      time.sleep(self.interval)
              
              def stop(self):
                  self.running = False
              
              def get_and_print_gpu_utilization(self):
                  utilization = pynvml.nvmlDeviceGetUtilizationRates(self.handle)
                  self.utilizations.append(utilization.gpu)
                  
              def data(self):
                  return np.max(self.utilizations)
                  
          def cleanup():
              dist.destroy_process_group()
          def demo_fsdp(rank, world_size,multi_stream):
              
              pynvml.nvmlInit()
              device = torch.device(f"cuda:{rank}")
              torch.manual_seed(1)
              from transformers import AutoModelForMaskedLM,BertConfig
              config=BertConfig.from_pretrained("./config.json")
              model = AutoModelForMaskedLM.from_config(config)
              model.eval()
              
              fsdp_model = FSDP(model,forward_prefetch=True).to(device)
              input_tokens=torch.randint(0,config.vocab_size,(1,128)).to(device)
              epoch_sz=1024
              gpu_thread = PynvmlGPUUtilizationThread(rank,interval=1)  # single node, so rank == local_rank
              gpu_thread.start()    
              sz=8
              total_sample=0
              streams=[torch.cuda.Stream() for _ in range(sz)]
              t0=time.time()
              for epoch in range(epoch_sz):
                  with torch.no_grad():
                      outputs=[]
                      if multi_stream:
                          for i in range(sz):
                              with torch.cuda.stream(streams[i]):
                                  total_sample+=1
                                  outputs.append(fsdp_model(input_tokens))
                      else:
                          output = fsdp_model(input_tokens)
                          total_sample+=1
              torch.cuda.synchronize(rank)
              t1=time.time()
              gpu_thread.stop()
              gpu_thread.join()       
              time.sleep(0.2*rank)        
              if rank==0:
                  qps=total_sample/(t1-t0)
                  print(f"qps:{qps:.2f}")
              print(f"rank:{rank} util:{gpu_thread.data():.2f}")
              cleanup()
          if __name__ == "__main__":
              dist.init_process_group(backend='nccl')
              world_size = torch.distributed.get_world_size()
              rank=torch.distributed.get_rank()
              local_rank=int(os.environ['LOCAL_RANK'])
              torch.cuda.set_device(local_rank)
              demo_fsdp(local_rank,world_size,True)
          EOF
          torchrun -m --nnodes=1 --nproc_per_node=4 fsdp_bert_infer
          

          Output

          qps:137.80
          rank:0 util:40.00
          rank:1 util:40.00
          rank:2 util:39.00
          rank:3 util:40.00
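
          One likely reason for the low utilization is that FSDP keeps only a shard of the weights on each rank and must all-gather them for every forward pass, and here that traffic crosses a slow PCIe Gen1 link with no P2P. As a rough way to see the sharding (a sketch; the per-rank parameter count only approximates the local shard size), each rank can print how many parameter elements it actually holds inside demo_fsdp:

          # Inside demo_fsdp(), after wrapping the model with FSDP:
          local_numel = sum(p.numel() for p in fsdp_model.parameters())
          print(f"rank:{rank} holds ~{local_numel} parameter elements (sharded)")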
          

          5. Manually split the weights evenly across 4 GPUs, single-process multi-GPU inference

          tee split_bert_infer.py <<-'EOF'
          import torch
          import os
          import time
          from torch.utils._python_dispatch import TorchDispatchMode
          from dataclasses import dataclass
          from typing import Any
          @dataclass
          class _ProfilerState:
              cls: Any
              object: Any = None
          class EmptyModule(torch.nn.Module):
              def __init__(self):
                  super(EmptyModule, self).__init__()
                  pass
              def forward(self,x):
                  return x
                  
          class TorchDumpDispatchMode(TorchDispatchMode):
              def __init__(self,parent):
                  super().__init__()
                  self.parent=parent
                  self.op_index=0
                  self.cvt_count=0
              def get_max_gpu_id(self,tensors):
                  max_gpu_id = -1
                  max_index = -1
                  tensor_index=[]
                  for i, tensor in enumerate(tensors):
                      if not isinstance(tensor, torch.Tensor):
                          continue
                      tensor_index.append(i)
                      if tensor.is_cuda:
                          gpu_id = tensor.get_device()
                          if gpu_id > max_gpu_id:
                              max_gpu_id = gpu_id
                              max_index = i
                  if max_gpu_id == -1:
                      return None, None,tensor_index
                  return max_index, max_gpu_id,tensor_index
              def convert(self,op_type,tensor_list):
                  index, gpu_id,tensor_index = self.get_max_gpu_id(tensor_list)
                  if index is None:
                      return
                  keep_index=set(tensor_index)-set([index])
                  device=torch.device(f"cuda:{gpu_id}")
                  for i in keep_index:
                      if tensor_list[i].device!=device:
                          #print(f"{op_type} {i} {tensor_list[i].device} -> {device}")
                          tensor_list[i].data=tensor_list[i].data.to(device,non_blocking=True) 
                          # Inter-GPU copies are serialized, so multiple streams cannot fully improve performance
              def __torch_dispatch__(self, func, types, args=(),kwargs=None):
                  func_packet = func._overloadpacket
                  if kwargs is None:
                      kwargs = {}
                  op_type=f"{func}"
                  self.op_index+=1
                  if isinstance(args, list) or isinstance(args, tuple):
                      self.convert(op_type,args)
                  elif isinstance(args[0], list) or isinstance(args[0], tuple):
                      self.convert(op_type,args[0])
                  else:
                      print(op_type)
                  output= func(*args,**kwargs)
                  return output
          class TorchDumper:
              def __init__(self,**kwargs):
                  self.p= _ProfilerState(TorchDumpDispatchMode)
                  self.kwargs=kwargs
              def __enter__(self):
                  if self.p.object is None:
                      o = self.p.cls(self,**self.kwargs)
                      o.__enter__()
                      self.p.object = o
                  else:
                      self.p.object.step()
                  return self
              def __exit__(self, exc_type, exc_val, exc_tb):
                  TorchDumper._CURRENT_Dumper = None
                  if self.p.object is not None:
                      self.p.object.__exit__(exc_type, exc_val, exc_tb)
                      del self.p.object
                      
          torch.manual_seed(1)
          from transformers import AutoModelForMaskedLM,BertConfig
          config=BertConfig.from_pretrained("./config.json")
          model = AutoModelForMaskedLM.from_config(config)
          model.eval()
          cur_dev=0
          from collections import OrderedDict
          param_size=OrderedDict()
          total_size=0
          for name, param in model.named_parameters():
              #print(f"{name} {param.device} {param.shape}")
              sz=param.numel()*param.element_size()
              key=".".join(name.split(".")[:-1])
              if key not in param_size:
                  param_size[key]=0
              param_size[key]+=sz
              total_size+=sz
          for name, param in model.named_buffers():
              #print(name,param.device)
              sz=param.numel()*param.element_size()
              key=".".join(name.split(".")[:-1])
              if key not in param_size:
                  param_size[key]=0
              param_size[key]+=sz
              total_size+=sz
          sz_per_dev=total_size/4
          cur_size=0
          dev_map=OrderedDict()
          for k,v in param_size.items():
              sz=v
              cur_size+=sz
              if cur_size>=sz_per_dev:
                  print(cur_dev,cur_size)
                  cur_size=0
                  cur_dev+=1
              dev_map[k]=cur_dev
          for name, param in model.named_parameters():
              key=".".join(name.split(".")[:-1])
              op=dict(model.named_parameters())[name]
              device=f"cuda:{dev_map[key]}"
              op.data=op.data.to(device)
          for name, param in model.named_buffers():
              key=".".join(name.split(".")[:-1])
              op=dict(model.named_buffers())[name]
              device=f"cuda:{dev_map[key]}"
              op.data=op.data.to(device)
          with TorchDumper():
              sz=4
              input_tokens=torch.randint(0,config.vocab_size,(1,128)).to("cuda:0")
              streams=[torch.cuda.Stream() for _ in range(sz)]
              batch_size=0
              t0=time.time()
              for epoch in range(1024):
                  outputs=[]
                  for i in range(sz):
                      with torch.cuda.stream(streams[i]):
                          batch_size+=1
                          outputs.append(model(input_tokens))
              torch.cuda.synchronize()
              t1=time.time()
              print("qps:",batch_size/(t1-t0))
          EOF
          python split_bert_infer.py
          

          Output

          qps: 29.34
          

          6. Manually split the model into 4 stages and implement pipeline parallelism with NCCL

          tee pp_bert_infer.py <<-'EOF'
          import torch
          import os
          import time
          from collections import OrderedDict
          import torch.distributed as dist
          import torch.nn as nn
          import torch.nn.init as init
          import numpy as np
          import time
          import pynvml
          import numpy as np
          import threading
          class EmptyModule(torch.nn.Module):
              def __init__(self):
                  super(EmptyModule, self).__init__()
                  pass
              def forward(self,x):
                  return x[0]
          class PynvmlGPUUtilizationThread(threading.Thread):
              def __init__(self,device,interval=1):
                  super().__init__()
                  self.interval = interval
                  self.running = True
                  self.device=device        
                  self.handle = pynvml.nvmlDeviceGetHandleByIndex(device)
                  self.utilizations=[]
                  
              def run(self):
                  while self.running:
                      self.get_and_print_gpu_utilization()
                      time.sleep(self.interval)
              
              def stop(self):
                  self.running = False
              
              def get_and_print_gpu_utilization(self):
                  utilization = pynvml.nvmlDeviceGetUtilizationRates(self.handle)
                  self.utilizations.append(utilization.gpu)
                  
              def data(self):
                  return np.max(self.utilizations)
          pynvml.nvmlInit()
                  
          dist.init_process_group(backend='nccl')
          world_size = torch.distributed.get_world_size()
          rank=torch.distributed.get_rank()
          local_rank=int(os.environ['LOCAL_RANK'])
          torch.cuda.set_device(local_rank)
          torch.manual_seed(1)
          from transformers import AutoModelForMaskedLM,BertConfig
          config=BertConfig.from_pretrained("./config.json")
          model = AutoModelForMaskedLM.from_config(config)
          model.eval()
          divided=[]
          # See modeling_bert.py for the relevant submodule names
          submodules=[]
          submodules.append(("embeddings",model.bert.embeddings))        
          for i,m in enumerate(model.bert.encoder.layer[:3]):
              submodules.append((f"{i}",m))
              submodules.append((f"{i}-1",EmptyModule()))   
          divided.append(submodules)
          submodules=[]
          for i,m in enumerate(model.bert.encoder.layer[3:7]):
              submodules.append((f"{i}",m))
              submodules.append((f"{i}-1",EmptyModule()))   
          divided.append(submodules)
          submodules=[]
             
          for i,m in enumerate(model.bert.encoder.layer[7:11]):
              submodules.append((f"{i}",m))
              submodules.append((f"{i}-1",EmptyModule()))   
          divided.append(submodules)
          submodules=[]
             
          for i,m in enumerate(model.bert.encoder.layer[11:]):
              submodules.append((f"{i}",m))
              submodules.append((f"{i}-1",EmptyModule()))
          submodules.append(("cls",model.cls))        
          divided.append(submodules)
          device=f"cuda:{local_rank}"
          example_input=torch.randint(0,config.vocab_size,(1,128)).to(device)
          submodule=torch.nn.Sequential(OrderedDict(divided[local_rank])).to(device)
          sreq=None
          ts=[]
          gpu_thread = PynvmlGPUUtilizationThread(local_rank,interval=1)
          gpu_thread.start()
          t0=time.time()
          for epoch in range(1000):
              if sreq is not None and not sreq.is_completed():
                  sreq.wait()
                  sreq=None
              if local_rank!=0:
                  tensor_size = torch.empty((3,), dtype=torch.int64).to(device)
                  torch.distributed.recv(tensor_size,local_rank-1)
                  example_input = torch.empty(tensor_size.tolist()).to(device)
                  torch.distributed.recv(example_input,local_rank-1)
                  #print("recv:",local_rank-1,example_input.shape)
              else:
                  torch.manual_seed(1)
              output=submodule(example_input)
              if local_rank<world_size-1:
                  # Send the activation shape first, then the activation itself,
                  # to the next pipeline stage; the payload send is asynchronous.
                  tensor_size=torch.tensor(output.shape,dtype=torch.int64).to(device)
                  torch.distributed.send(tensor_size,local_rank+1)
                  sreq=torch.distributed.isend(output,local_rank+1)
              else:
                  ts.append(time.time())
          t1=time.time()
          gpu_thread.stop()
          gpu_thread.join()
          time.sleep(0.2*local_rank)
          print(f"rank:{local_rank} util:{gpu_thread.data():.2f}")
          if local_rank==world_size-1:
              ts=ts[len(ts)//2:]
              print("latency:",ts[1]-ts[0],"qps:",len(ts)/(ts[-1]-ts[0]),1000/(t1-t0))
          EOF
          torchrun -m --nnodes=1 --nproc_per_node=4 pp_bert_infer	
          

          Output:

          rank:1 util:40.00
          rank:0 util:97.00
          rank:2 util:39.00
          rank:3 util:78.00
          latency: 0.002396106719970703 qps: 408.6954420411698 244.76515394402227
          
