Unity Protobuf+RPC+UniTask

Unity Protobuf+RPC+UniTask

码农世界 2024-06-14 后端 129 次浏览 0个评论

远程过程调用(RPC)协议详解

  • 什么是RPC协议
    • RPC的基本原理
    • RPC的关键组件
    • RPC的优缺点
    • Protobuf
    • 函数绑定
    • Call
    • Encode
    • Recv
    • Decode
    • Socket.Send和Recv
    • 项目地址

      什么是RPC协议

      远程过程调用(Remote Procedure Call,简称RPC)是一种网络通信协议,允许程序在不同的地址空间(通常在不同的物理计算机上)中调用彼此的方法,好像它们是在本地执行的一样。RPC隐藏了底层的网络通信细节,使开发人员能够像调用本地函数一样简单地调用远程服务。

      RPC的基本原理

      RPC的工作原理基于客户端-服务器模型,主要包括以下步骤:

      1.客户端调用:客户端程序发起对某个远程过程的调用请求。

      2.请求打包:调用参数被打包成消息,发送到服务器。

      3.服务器解包和执行:服务器接收到消息,解包获取调用参数,执行相应的远程过程。

      4.结果打包和返回:执行结果被打包成消息,发送回客户端。

      5.客户端接收结果:客户端解包消息,获取调用结果

      RPC的关键组件

      1.客户端代理:负责将本地调用请求转换为远程调用请求,打包参数,并通过网络发送给服务器。

      2.服务器代理:负责接收客户端的请求,解包参数,调用相应的服务方法,并将结果打包返回给客户端。

      3.通信协议:定义客户端和服务器之间如何通信,常见的协议有HTTP、TCP等。

      4.编解码器:负责参数和结果的序列化和反序列化,常见的格式有JSON、XML、Protobuf等。

      RPC的优缺点

      优点

      1.简化远程调用:使得远程调用像本地调用一样简单,开发人员无需关心底层的网络通信细节。

      2.语言无关:大多数RPC框架支持多种编程语言,方便不同语言的系统互操作。

      缺点

      1.调试困难:由于涉及网络通信,调试远程调用的问题比本地调用更加复杂。

      2.可靠性要求高:需要处理网络延迟、丢包、超时等问题,增加了系统的复杂性。

      3.耦合性:客户端和服务器需要共同遵循同一套接口定义,一旦接口发生变化,可能需要同时更新多个系统。

      Protobuf

      protoc.exe 生成C#文件

      Unity Protobuf+RPC+UniTask

      Gen.bat

      @echo off
      rem 设置路径变量
      set PROTOC_PATH="protoc.exe"
      set PROTO_DIR="Protos"
      set OUTPUT_DIR="ProtocolCodes"
      rem 创建日志头
      echo .......................proto2C#.......................
      echo.
      rem 检查目录是否存在
      if not exist %PROTO_DIR% (
          echo Error: Protocols directory does not exist.
          echo Please create the Protocols directory and place your .proto files in it.
          echo.
          pause
          exit /b
      )
      rem 创建输出目录
      if not exist %OUTPUT_DIR% mkdir %OUTPUT_DIR%
      rem 批量处理 .proto 文件
      for %%f in (%PROTO_DIR%\*.proto) do (
          echo %%f complete
          %PROTOC_PATH% --proto_path=%PROTO_DIR% --csharp_out=%OUTPUT_DIR% %%f
      )
      echo code generation complete. Press any key to close.
      pause > nul
      

      函数绑定

      1.使用反射自动获取所有RPC函数, 对其进行Hash绑定

      函数的定义 RPCMsgHandles.cs

      public sealed class RPCMsgHandles
      {
          private static void ReqMove(int unitId, Move move)
          {
          }
          private static void RecvAttack(int skillid, Attack attack, ItemList itemList)
          {
              LogHelper.Log($"Recv: skillid = {attack.Id}, targetId = {attack.TargetId}, itemList.Count = {itemList.Items.Count}");
          }
          private static void RecvDelete(int msg)
          {
              LogHelper.Log($"Recv: state = {msg}");
          }
          private static void RecvReflectMove(Move move)
          {
              LogHelper.Log($"move reflect sync: x:{move.X}, y:{move.Y}, speed:{move.Speed}, dir:{move.Dir}");
          }
      }
      

      使用反射进行函数绑定 RPCMoudle.cs

      public sealed class RPCMoudle
      {
          private static Dictionary _msg = new Dictionary();
          public static void Init()
          {
              System.Type type = typeof(RPCMsgHandles);
              MethodInfo[] methods = type.GetMethods(BindingFlags.Static | BindingFlags.NonPublic);
              foreach (MethodInfo methodInfo in methods)
              {
                  RPC method = new RPC(methodInfo);
                  int index = 0;
                  ParameterInfo[] infos = methodInfo.GetParameters();
                  foreach (var info in infos)
                  {
                      if (typeof(IMessage).IsAssignableFrom(info.ParameterType))
                      {
                          IMessage message = Activator.CreateInstance(info.ParameterType) as IMessage;
                          method.AddParamType(DateType.Message);
                          method.AddParam(index, message);
                      }
                      else
                      {
                          DateType dateType = GetDateType(info.ParameterType);
                          method.AddParamType(dateType);
                      }
                      index++;
                  }
                  int hash = Globals.Hash(methodInfo.Name);
                  if (_msg.ContainsKey(hash))
                      throw new Exception("AddParamType rpc _method hash conflict: " + methodInfo.Name);
                  _msg.Add(hash, method);
              }
          }
      }
      

      2.使用泛型手动进行RPC函数绑定

      泛型类进行函数绑定 RPCMoudle.cs

      public static void Register(string methodName, Action action) where T : class, IMessage, new()
      {
          int id = Globals.Hash(methodName);
          RPCStatic method = new RPCStatic();
          method.Register(action, new T());
          if (_msg.ContainsKey(id))
          {
              LogHelper.LogError($"repeat id, id = {id}");
          }
          _msg[id] = method;
      }
      public static void Unregister(string methodName)
      {
          int id = Globals.Hash(methodName);
          if (_msg.ContainsKey(id))
          {
              _msg.Remove(id);
          }
          else
          {
              LogHelper.LogError($"no find method, id = {id}");
          }
      }
      

      Call

      Call的实现, encode数据到byte[],第一个参数必须为远程函数名字, 用于将函数名字的hashid写入数据头中, 这样远程服务器在解析数据的时候会先解析4字节的数据头表示函数的hashid

      Call中的Send函数 是Socket发送协议, Send函数中会在数据头中写入数据的长度, 在接收方根据数据的长度接收完整数据 防止粘包

      Call函数有多个方法重载, 根据业务需求使用

      1.public static void Call(string methodName, IMessage message) 类型安全, 类型固定

      2.public static void Call(string id, params object[] args) 类型不安全, 可以传入任何参数, 使用更加方便快捷

      具体调用例子
      Move move = new Move();
      move.X = 10;
      move.Y = 20;
      move.Speed = 100;
      move.Dir = 20;
      这里使用的是object[] args 类型不安全, 也会有装箱拆箱的开销, 使用这用方式需要前后端统一类型
      使用起来简单方便, 业务逻辑开发上使用较为方便
      比如请求领取奖励 RPCMoudle.Call("ReqAward", 传入表奖励id);
      比如请求保存勾选 RPCMoudle.Call("Save", true);
      RPCMoudle.Call("ReqMove", 10016, move);
      这样是类型安全的, 也不会存在装箱拆箱的开销
      更加高效, 战斗场景较为适合
      RPCMoudle.Call("ReqMove", move);
      

      Call的实现, 将数据进行Encode转换成二进制

      public static void Call(string methodName, IMessage message)
      {
          if (message == null) 
              return;
          try
          {
              int id = Globals.Hash(methodName);
              int offset = 0;
              BuffMessage msg = GameFrame.message.GetBuffMessage();
              BitConverter.TryWriteBytes(msg.bytes.AsSpan(offset), id);
              offset += sizeof(int);
              BitConverterHelper.WriteString(msg.bytes, ref offset, GameFrame.myRole.Id);
              BitConverterHelper.WriteMessage(msg.bytes, ref offset, message);
              msg.length = offset;
              Main.Instance.Send(msg);
          }
          catch(Exception ex)
          {
              LogHelper.LogError(ex.ToString());
          }
      }
      public static void Call(string id, params object[] args)
      {
          try
          {
              Profiler.BeginSample("rpc call");
              int hash = Globals.Hash(id);
              BuffMessage msg = Encode(hash, args);
              Main.Instance.Send(msg);
              Profiler.EndSample();
          }
          catch(Exception ex)
          {
              LogHelper.LogError(ex.ToString());
          }
      }
      

      Encode

      Encode函数的实现

      private static BuffMessage Encode(int id, params object[] args)
      {
        int offset = 0;
        BuffMessage msg = GameFrame.message.GetBuffMessage();
        BitConverter.TryWriteBytes(msg.bytes.AsSpan(offset), id);
        offset += sizeof(int);
        BitConverterHelper.WriteString(msg.bytes, ref offset, GameFrame.myRole.Id);
        foreach (object arg in args)
        {
            try
            {
                System.Type type = arg.GetType();
                switch (arg)
                {
                    case IMessage:
                        BitConverterHelper.WriteMessage(msg.bytes, ref offset, (IMessage)arg);
                        break;
                    case Int16:
                        BitConverterHelper.WriteInt16(msg.bytes, ref offset, (Int16)arg);
                        break;
                    case Int32:
                        BitConverterHelper.WriteInt32(msg.bytes, ref offset, (Int32)arg);
                        break;
                    case Int64:
                        BitConverterHelper.WriteInt64(msg.bytes, ref offset, (Int64)arg);
                        break;
                    case UInt16:
                        BitConverterHelper.WriteUInt16(msg.bytes, ref offset, (UInt16)arg);
                        break;
                    case UInt32:
                        BitConverterHelper.WriteUInt32(msg.bytes, ref offset, (UInt32)arg);
                        break;
                    case UInt64:
                        BitConverterHelper.WriteUInt64(msg.bytes, ref offset, (UInt64)arg);
                        break;
                    case bool:
                        BitConverterHelper.WriteBool(msg.bytes, ref offset, (bool)arg);
                        break;
                    case Byte:
                        BitConverterHelper.WriteByte(msg.bytes, ref offset, (byte)arg);
                        break;
                    case SByte:
                        BitConverterHelper.WriteByte(msg.bytes, ref offset, (byte)arg);
                        break;
                    case Char:
                        BitConverterHelper.WriteChar(msg.bytes, ref offset, (Char)arg);
                        break;
                    case Single:
                        BitConverterHelper.WriteSingle(msg.bytes, ref offset, (Single)arg);
                        break;
                    case Double:
                        BitConverterHelper.WriteDouble(msg.bytes, ref offset, (Double)arg);
                        break;
                    case string:
                        BitConverterHelper.WriteString(msg.bytes, ref offset, (string)arg);
                        break;
                }
            }
            catch(Exception ex)
            {
                LogHelper.LogError($"id: {id}, " + ex.ToString());
                msg.Dispose();
                return msg;
            }
        }
        msg.length = offset;
        return msg;
      }
      

      0GC的TryWriteBytes方案

      namespace Game
      {
          public static class BitConverterHelper
          {
              private static readonly int BUFFER_SIZE = 1024 * 1024;
              private static readonly byte[] buffer = new byte[BUFFER_SIZE];
              private static CodedOutputStream _stream;
              private static Stopwatch _watch;
              public static void Init()
              {
                  CreateStream();
                  _watch = new Stopwatch();
                  _watch.Start();
              }
              private static void CreateStream()
              {
                  if (_stream != null)
                      _stream.Dispose();
                  if (_watch != null)
                  {
                      _watch.Stop();
                      LogHelper.LogWarning($"create stream interval time: {_watch.ElapsedMilliseconds / 1000.0f} s");
                      _watch.Restart();
                  }
                  _stream = new CodedOutputStream(buffer);
              }
              private static Span ToByteArray(IMessage message)
              {
                  if (message == null)
                      return new byte[0];
                  int length = message.CalculateSize();
                  if (length == 0)
                      return new byte[0];
                  if (length >= BUFFER_SIZE)
                  {
                      throw new Exception($"overflow: message length >= {BUFFER_SIZE}");
                  }
                  if (_stream.Position + length >= BUFFER_SIZE)
                      CreateStream();
                  int position = (int)_stream.Position;
                  message.WriteTo(_stream);
                  return buffer.AsSpan(position, length);
              }
              public static void WriteInt16(byte[] buffer, ref int offset, Int16 arg)
              {
                  Check(buffer, offset + 1);
                  buffer[offset++] = (byte)DateType.Int16;
                  Check(buffer, offset + sizeof(Int16));
                  BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
                  offset += sizeof(Int16);
              }
              public static void WriteInt32(byte[] buffer, ref int offset, Int32 arg)
              {
                  Check(buffer, offset + 1);
                  buffer[offset++] = (byte)DateType.Int32;
                  Check(buffer, offset + sizeof(Int32));
                  BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
                  offset += sizeof(Int32);
              }
              public static void WriteInt64(byte[] buffer, ref int offset, Int64 arg)
              {
                  Check(buffer, offset + 1);
                  buffer[offset++] = (byte)DateType.Int64;
                  Check(buffer, offset + sizeof(Int64));
                  BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
                  offset += sizeof(Int64);
              }
              public static void WriteUInt16(byte[] buffer, ref int offset, UInt16 arg)
              {
                  Check(buffer, offset + 1);
                  buffer[offset++] = (byte)DateType.UInt16;
                  Check(buffer, offset + sizeof(UInt16));
                  BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
                  offset += sizeof(UInt16);
              }
              public static void WriteUInt32(byte[] buffer, ref int offset, UInt32 arg)
              {
                  Check(buffer, offset + 1);
                  buffer[offset++] = (byte)DateType.UInt32;
                  Check(buffer, offset + sizeof(UInt32));
                  BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
                  offset += sizeof(UInt32);
              }
              public static void WriteUInt64(byte[] buffer, ref int offset, UInt64 arg)
              {
                  Check(buffer, offset + 1);
                  buffer[offset++] = (byte)DateType.UInt64;
                  Check(buffer, offset + sizeof(UInt64));
                  BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
                  offset += sizeof(UInt64);
              }
              public static void WriteBool(byte[] buffer, ref int offset, bool arg)
              {
                  Check(buffer, offset + 1);
                  buffer[offset++] = (byte)DateType.Boolean;
                  Check(buffer, offset + sizeof(bool));
                  BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
                  offset += sizeof(bool);
              }
              public static void WriteByte(byte[] buffer, ref int offset, byte arg)
              {
                  Check(buffer, offset + 1);
                  buffer[offset++] = (byte)DateType.Byte;
                  Check(buffer, offset + 1);
                  buffer[offset++] = arg;
              }
              public static void WriteChar(byte[] buffer, ref int offset, Char arg)
              {
                  Check(buffer, offset + 1);
                  buffer[offset++] = (byte)DateType.Char;
                  Check(buffer, offset + sizeof(Char));
                  BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
                  offset += sizeof(Char);
              }
              public static void WriteSingle(byte[] buffer, ref int offset, Single arg)
              {
                  Check(buffer, offset + 1);
                  buffer[offset++] = (byte)DateType.Single;
                  Check(buffer, offset + sizeof(Single));
                  BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
                  offset += sizeof(Single);
              }
              public static void WriteDouble(byte[] buffer, ref int offset, Double arg)
              {
                  Check(buffer, offset + 1);
                  buffer[offset++] = (byte)DateType.Double;
                  Check(buffer, offset + sizeof(Double));
                  BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
                  offset += sizeof(Double);
              }
              public static void WriteString(byte[] buffer, ref int offset, string arg)
              {
                  Check(buffer, offset + 1);
                  buffer[offset++] = (byte)DateType.String;
                  byte[] bytes = Encoding.UTF8.GetBytes(arg);
                  Check(buffer, offset + bytes.Length);
                  BitConverter.TryWriteBytes(buffer.AsSpan(offset), bytes.Length);
                  offset += sizeof(int);
                  Span target = new Span(buffer, offset, buffer.Length - offset);
                  bytes.CopyTo(target);
                  offset += bytes.Length;
              }
              public static void WriteMessage(byte[] buffer, ref int offset, IMessage arg)
              {
                  IMessage message = arg;
                  Span bytes = ToByteArray(message);
                  Check(buffer, offset + 1);
                  buffer[offset++] = (byte)DateType.Message;
                  Check(buffer, offset + bytes.Length);
                  BitConverter.TryWriteBytes(buffer.AsSpan(offset), bytes.Length);
                  offset += sizeof(int);
                  Span target = new Span(buffer, offset, bytes.Length);
                  bytes.CopyTo(target);
                  offset += bytes.Length;
              }
              private static void Check(byte[] buffer, int offset)
              {
                  if (offset >= buffer.Length)
                      throw new Exception($"date length: {offset} > {Globals.DATA_SZIE}, Invalid data!!");
              }
              public static void Dispose()
              {
                  _stream?.Dispose();
                  _stream = null;
              }
          }
      }
      

      Recv

      Decode 数据解析,调用本地方法

      public static void OnRPC(BuffMessage msg)
      {
          if(msg == null)
          {
              LogHelper.LogError("socket recv error, msg == null");
              return;
          }
          Decode(msg.bytes);
      }
      

      Decode

      0GC的Decode方案

      private static void Decode(byte[] buffer)
      {
          if (buffer == null || buffer.Length < sizeof(int))
          {
              LogHelper.LogError("Invalid buffer received");
              return;
          }
          int protoId = BitConverter.ToInt32(buffer, 0);
          if (!_msg.TryGetValue(protoId, out IRPC method))
          {
              LogHelper.LogError($"Method not found for protoId: {protoId}");
              return;
          }
          BuffMessage buffMessage = GameFrame.message.GetBuffMessage();
          try
          {
              Array.Copy(buffer, sizeof(int), buffMessage.bytes, 0, buffer.Length - sizeof(int));
              method.Decode(buffMessage.bytes);
          }
          catch (Exception ex)
          {
              LogHelper.LogError($"Error invoking method for protoId {protoId}: {ex.Message}");
          }
          finally
          {
              GameFrame.message.PutBuffMessage(buffMessage);
          }
      }
      
      namespace Game
      {
          public interface IRPC : IDisposable
          {
              public void Decode(byte[] buffer);
          }
          public abstract class RPCBase : IRPC
          {
              protected byte[] buffer;
              public abstract void Decode(byte[] buffer);
              protected ReadOnlySpan ReadData(DateType type, ref int offset)
              {
                  ReadOnlySpan data = null;
                  int length = GetLength(type);
                  if (length > 0)
                  {
                      data = new ReadOnlySpan(buffer, offset, length);
                      offset += length;
                  }
                  return data;
              }
              protected bool ToBoolean(ref int offset)
              {
                  ReadOnlySpan data = ReadData(DateType.Boolean, ref offset);
                  return BitConverter.ToBoolean(data);
              }
              protected Byte ToByte(ref int offset)
              {
                  ReadOnlySpan data = ReadData(DateType.Char, ref offset);
                  return data[0];
              }
              protected char ToChar(ref int offset)
              {
                  ReadOnlySpan data = ReadData(DateType.Char, ref offset);
                  return BitConverter.ToChar(data);
              }
              protected Int16 ToInt16(ref int offset)
              {
                  ReadOnlySpan data = ReadData(DateType.Int16, ref offset);
                  return BitConverter.ToInt16(data);
              }
              protected UInt16 ToUInt16(ref int offset)
              {
                  ReadOnlySpan data = ReadData(DateType.UInt16, ref offset);
                  return BitConverter.ToUInt16(data);
              }
              protected Int32 ToInt32(ref int offset)
              {
                  ReadOnlySpan data = ReadData(DateType.Int32, ref offset);
                  return BitConverter.ToInt32(data);
              }
              protected UInt32 ToUInt32(ref int offset)
              {
                  ReadOnlySpan data = ReadData(DateType.UInt32, ref offset);
                  return BitConverter.ToUInt32(data);
              }
              protected Int64 ToInt64(ref int offset)
              {
                  ReadOnlySpan data = ReadData(DateType.Int64, ref offset);
                  return BitConverter.ToInt64(data);
              }
              protected UInt64 ToUInt64(ref int offset)
              {
                  ReadOnlySpan data = ReadData(DateType.UInt64, ref offset);
                  return BitConverter.ToUInt64(data);
              }
              protected Single ToSingle(ref int offset)
              {
                  ReadOnlySpan data = ReadData(DateType.Single, ref offset);
                  return BitConverter.ToSingle(data);
              }
              protected Double ToDouble(ref int offset)
              {
                  ReadOnlySpan data = ReadData(DateType.Double, ref offset);
                  return BitConverter.ToDouble(data);
              }
              protected string ToString(ref int offset)
              {
                  ReadOnlySpan data = ReadData(DateType.String, ref offset);
                  return DecodeString(ref offset);
              }
              protected IMessage ToMessage(ref int offset, IMessage message)
              {
                  ReadOnlySpan data = ReadData(DateType.Message, ref offset);
                  return DecodeMessage(ref offset, message);
              }
              private IMessage DecodeMessage(ref int offset, IMessage message)
              {
                  int length = BitConverter.ToInt32(buffer, offset);
                  offset += sizeof(int);
                  ReadOnlySpan messageData = new ReadOnlySpan(buffer, offset, length);
                  offset += length;
                  return message.Descriptor.Parser.ParseFrom(messageData)!;
              }
              private string DecodeString(ref int offset)
              {
                  int length = BitConverter.ToInt32(buffer, offset);
                  offset += sizeof(int);
                  ReadOnlySpan messageData = new ReadOnlySpan(buffer, offset, length);
                  offset += length;
                  return Encoding.UTF8.GetString(messageData);
              }
              private static int GetLength(DateType type)
              {
                  switch (type)
                  {
                      case DateType.Boolean:
                          return sizeof(bool);
                      case DateType.Char:
                          return sizeof(char);
                      case DateType.SByte:
                      case DateType.Byte:
                          return sizeof(byte);
                      case DateType.Int16:
                          return sizeof(Int16);
                      case DateType.UInt16:
                          return sizeof(UInt16);
                      case DateType.Int32:
                          return sizeof(Int32);
                      case DateType.UInt32:
                          return sizeof(UInt32);
                      case DateType.Int64:
                          return sizeof(Int64);
                      case DateType.UInt64:
                          return sizeof(UInt64);
                      case DateType.Single:
                          return sizeof(Single);
                      case DateType.Double:
                          return sizeof(double);
                  }
                  return -1;
              }
              public virtual void Dispose()
              {
                  buffer = null;
              }
          }
      }
      

      Decode数据到对象列表, 然后Invoke

      namespace Game
      {
          public class RPC : RPCBase
          {
              private MethodInfo _method;
              private List _types;
              private List _params;
              private Dictionary _param;
              private int _paramIndex;
              public RPC(MethodInfo method)
              {
                  this._method = method;
                  _types = new List();
                  _params = new List();
                  _param = new Dictionary();
              }
              public void AddParamType(DateType type)
              {
                  _types?.Add(type!);
              }
              public void AddParam(int index, IMessage message)
              {
                  _param[index] = message;
              }
              public override void Decode(byte[] buffer)
              {
                  base.buffer = buffer;
                  _paramIndex = 0;
                  int offset = 0;
                  _params.Clear();
                  foreach (DateType type in _types)
                  {
                      DateType dateType = (DateType)buffer[offset++];
                      if (dateType != type)
                      {
                          LogHelper.LogError($"dateType bo equals, recv: {Enum.GetName(typeof(DateType), type)} != local: {Enum.GetName(typeof(DateType), dateType)}");
                      }
                      object obj = ToObject(dateType, ref offset);
                      _params.Add(obj!);
                      _paramIndex++;
                  }
                  _method?.Invoke(null, _params.ToArray());
              }
              private object ToObject(DateType type, ref int offset)
              {
                  switch (type)
                  {
                      case DateType.Message:
                          IMessage message = null;
                          if (!_param!.TryGetValue(_paramIndex, out message))
                          {
                              LogHelper.LogError("no find message");
                              return null;
                          }
                              
                          return ToMessage(ref offset, message);
                      case DateType.Boolean:
                          return ToBoolean(ref offset);
                      case DateType.Char:
                          return ToChar(ref offset);
                      case DateType.SByte:
                      case DateType.Byte:
                          return ToByte(ref offset);
                      case DateType.Int16:
                          return ToInt16(ref offset);
                      case DateType.UInt16:
                          return ToUInt16(ref offset);
                      case DateType.Int32:
                          return ToInt32(ref offset);
                      case DateType.UInt32:
                          return ToUInt32(ref offset);
                      case DateType.Int64:
                          return ToInt64(ref offset);
                      case DateType.UInt64:
                          return ToUInt64(ref offset);
                      case DateType.Single:
                          return ToSingle(ref offset);
                      case DateType.Double:
                          return ToDouble(ref offset);
                      case DateType.String:
                          return ToString(ref offset);
                      default:
                          LogHelper.LogError("no find dateType: " + type);
                          break;
                  }
                  return null;
              }
              public override void Dispose()
              {
                  base.Dispose();
                  _method = null;
                  _types = null;
              }
          }
      }
       
      

      泛型Decode,然后Invokde

      namespace Game
      {
          public class RPCStatic : RPCBase
          {
              private Action _action;
              private IMessage _message;
              public RPCStatic()
              {
              }
              public virtual void Register(Action action, IMessage message)
              {
                  this._message = message;
                  this._action = action;
              }
              public override void Decode(byte[] buffer)
              {
                  base.buffer = buffer;
                  int offset = 0;
                  DateType dateType = (DateType)buffer[offset++];
                  try
                  {
                      if (dateType == DateType.Message)
                      {
                          IMessage arg = ToMessage(ref offset, _message);
                          _action?.Invoke((T)arg);
                      }
                      else
                      {
                          LogHelper.LogError($"invoke error, type != DateType.Message, type = {dateType}");
                      }
                  }
                  catch (Exception ex)
                  {
                      LogHelper.LogError(ex.ToString());
                  }
              }
              public override void Dispose()
              {
              }
          }
      }
      

      Socket.Send和Recv

      使用UniTask实现的多线程异步收发消息,处理了超时重发和异常处理,接收消息时的粘包处理

      namespace Game
      {
          public enum SocketState
          {
              None = 0,
              Connected = 1,
              Disconnected = 2,
              Connecting = 3,
              ConnectFailed = 4,
              Close = 5,
              Dispose = 6,
          }
          public class Tcp
          {
              private ConcurrentQueue _sendMsgs;
              private ConcurrentQueue _receiveMsgs;
              private TcpClient _tcpClient;
              private SocketState _socketState;
              private byte[] _recvBuff;
              private int _recvOffset;
              private int _delay = 10;
              private CancellationTokenSource _recvCancelToken;
              private CancellationTokenSource _sendCancelToken;
              public SocketState State { get { return _socketState; } }
              public string IP { get; set; }
              public int Port { get; set; }
              public NetworkStream Stream
              {
                  get { return _tcpClient.GetStream(); }
              }
              public Tcp()
              {
                  _sendMsgs = new ConcurrentQueue();
                  _receiveMsgs = new ConcurrentQueue();
                  _recvBuff = new byte[Globals.BUFFER_SIZE];
              }
              private void InitTcpClient()
              {
                  _tcpClient = new TcpClient();
                  _recvCancelToken = new CancellationTokenSource();
                  _sendCancelToken = new CancellationTokenSource();
              }
              public void Update()
              {
                  Profiler.BeginSample("on tcp rpc");
                  if (_receiveMsgs.TryDequeue(out BuffMessage msg))
                  {
                      RPCMoudle.OnRPC(msg);
                      GameFrame.message.PutBuffMessage(msg);
                  }
                  Profiler.EndSample();
              }
              public void Connect(string ip, int port)
              {
                  IP = ip;
                  Port = port;
                  Connect();
              }
              public async void Connect()
              {
                  try
                  {
                      Close();
                      InitTcpClient();
                      SetSocketState(SocketState.Connecting);
                      await _tcpClient.ConnectAsync(IP, Port);
                      OnConnect();
                  }
                  catch (Exception ex)
                  {
                      LogHelper.LogError(ex.ToString());
                  }
              }
              private void OnConnect()
              {
                  try
                  {
                      if (_tcpClient.Connected)
                      {
                          LogHelper.Log("connected...");
                          SetSocketState(SocketState.Connected);
                          StartAsyncTasks();
                      }
                      else
                      {
                          SetSocketState(SocketState.ConnectFailed);
                      }
                  }
                  catch (Exception ex)
                  {
                      LogHelper.LogError("连接或通信发生错误:{0}" + ex.Message);
                      SetSocketState(SocketState.ConnectFailed);
                  }
              }
              private void StartAsyncTasks()
              {
                  UniTask send = UniTask.Create(SendThread);
                  UniTask recv = UniTask.Create(RecvThread);
              }
              private async UniTask SendThread()
              {
                  await UniTask.SwitchToThreadPool();
                  while (_socketState == SocketState.Connected)
                  {
                      while (true)
                      {
                          if (!_sendMsgs.TryDequeue(out BuffMessage msg))
                              break;
                          var timeoutToken = new CancellationTokenSource();
                          timeoutToken.CancelAfterSlim(TimeSpan.FromMilliseconds(msg.TimeoutMillisecond));
                          var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_sendCancelToken.Token, timeoutToken.Token);
                          try
                          {
                              if(_sendCancelToken.IsCancellationRequested)
                                  break;
                              await Stream.WriteAsync(msg.bytes, 0, msg.length, linkedCts.Token);
                              LogHelper.Log($"发送完成: {msg.length} byte");
                              GameFrame.message.PutBuffMessage(msg);
                          }
                          catch (OperationCanceledException ex)
                          {
                              if (timeoutToken.IsCancellationRequested)
                              {
                                  _sendMsgs.Enqueue(msg);
                                  LogHelper.LogWarning("消息发送超时, 添加到队列末尾, 等待发送...");
                                  await UniTask.Delay(10);
                                  continue;
                              }
                              LogHelper.LogWarning("发送操作被终止..." + ex.Message);
                              break;
                          }
                          catch (IOException ex) when (ex.InnerException is SocketException socketEx && socketEx.SocketErrorCode == SocketError.ConnectionAborted)
                          {
                              LogHelper.Log("发送操作被终止...");
                              break;
                          }
                          catch (Exception ex)
                          {
                              LogHelper.LogError("发送错误: " + ex.Message);
                              break;
                          }
                      }
                      await UniTask.Delay(_delay);
                  }
              }
              private async UniTask RecvThread()
              {
                  await UniTask.SwitchToThreadPool();
                  while (_socketState == SocketState.Connected)
                  {
                      try
                      {
                          if (_recvCancelToken.IsCancellationRequested) 
                              break;
                          int length = await Stream.ReadAsync(_recvBuff, _recvOffset, _recvBuff.Length - _recvOffset, _recvCancelToken.Token);
                          if (length == 0)
                          {
                              LogHelper.Log("connect failed...");
                              break;
                          }
                          _recvOffset += length;
                          int offset = 0;
                          while (true)
                          {
                              if (_recvOffset - offset < sizeof(int))
                                  // 没有足够的数据读取下一个消息的长度
                                  break;
                              int dataLength = BitConverter.ToInt32(_recvBuff, offset);
                              if (_recvOffset - offset < dataLength + sizeof(int))
                                  // 没有足够的数据读取完整的消息
                                  break;
                              // 读取完整消息
                              BuffMessage msg = GameFrame.message.GetBuffMessage();
                              Buffer.BlockCopy(_recvBuff, offset + sizeof(int), msg.bytes, 0, dataLength);
                              _receiveMsgs.Enqueue(msg);
                              // 移动偏移量到下一个消息
                              offset += sizeof(int) + dataLength;
                          }
                          // 将未处理的数据移到缓冲区开头
                          if (_recvOffset - offset > 0)
                              Buffer.BlockCopy(_recvBuff, offset, _recvBuff, 0, _recvOffset - offset);
                          _recvOffset -= offset;
                      }
                      catch(OperationCanceledException ex)
                      {
                          LogHelper.Log("读取操作被终止: " + ex.Message);
                          break;
                      }
                      catch (IOException ex) when (ex.InnerException is SocketException socketEx && socketEx.SocketErrorCode == SocketError.OperationAborted)
                      {
                          LogHelper.Log("读取操作被终止...");
                          break;
                      }
                      catch (Exception ex)
                      {
                          LogHelper.LogError("读取错误: " + ex.ToString());
                          break;
                      }
                      await UniTask.Delay(_delay);
                  }
              }
              private void SetSocketState(SocketState state)
              {
                  _socketState = state;
              }
              public void Send(BuffMessage message)
              {
                  if (message.length > 0)
                  {
                      int headLength = sizeof(int);
                      Buffer.BlockCopy(message.bytes, 0, message.bytes, headLength, message.length);
                      BitConverter.TryWriteBytes(message.bytes.AsSpan(0), message.length);
                      message.length += headLength;
                      _sendMsgs.Enqueue(message);
                  }
                  else
                  {
                      GameFrame.message.PutBuffMessage(message);
                  }
              }
              public void Close()
              {
                  if (_tcpClient == null)
                      return;
                  try
                  {
                      if (_tcpClient.Connected)
                      {
                          SetSocketState(SocketState.Close);
                          _recvCancelToken.Dispose();
                          _sendCancelToken.Dispose();
                          _tcpClient.Close();
                      }
                  }
                  catch (Exception ex)
                  {
                      LogHelper.LogError(ex.ToString());
                  }
              }
              public void Dispose()
              {
                  Close();
              
                  if (_tcpClient != null)
                  {
                      _tcpClient.Dispose();
                      _tcpClient = null;
                  }
                  if (_sendMsgs != null)
                  {
                      _sendMsgs.Clear();
                      _sendMsgs = null;
                  }
                  if (_receiveMsgs != null)
                  {
                      _receiveMsgs.Clear();
                      _receiveMsgs = null;
                  }
                  SetSocketState(SocketState.Dispose);
              }
          }
      }
      

      项目地址

      SimpleRPC

      转载请注明来自码农世界,本文标题:《Unity Protobuf+RPC+UniTask》

      百度分享代码,如果开启HTTPS请参考李洋个人博客
      每一天,每一秒,你所做的决定都会改变你的人生!

      发表评论

      快捷回复:

      评论列表 (暂无评论,129人围观)参与讨论

      还没有评论,来说两句吧...

      Top