diff --git a/com.unity.netcode.gameobjects/Runtime/Core/NetworkManager.cs b/com.unity.netcode.gameobjects/Runtime/Core/NetworkManager.cs index 1acc9df0d1..fbcea60294 100644 --- a/com.unity.netcode.gameobjects/Runtime/Core/NetworkManager.cs +++ b/com.unity.netcode.gameobjects/Runtime/Core/NetworkManager.cs @@ -471,9 +471,15 @@ public void NetworkUpdate(NetworkUpdateStage updateStage) // This should be invoked just prior to the MessageManager processes its outbound queue. SceneManager.CheckForAndSendNetworkObjectSceneChanged(); - - // Process outbound messages - MessageManager.ProcessSendQueues(); +#if UNIFIED_NETCODE + if (!NetworkConfig.Prefabs.HasGhostPrefabs) + { +#endif + // Process outbound messages + MessageManager.ProcessSendQueues(); +#if UNIFIED_NETCODE + } +#endif // Metrics update needs to be driven by NetworkConnectionManager's update to assure metrics are dispatched after the send queue is processed. MetricsManager.UpdateMetrics(); @@ -1354,7 +1360,6 @@ private bool CanStart(StartType type) /// The world instance assigned to this NetworkManager instance. /// public NetcodeWorld NetcodeWorld { get; internal set; } - internal void InitializeNetcodeWorld() { if (NetcodeWorld != null) diff --git a/com.unity.netcode.gameobjects/Runtime/Transports/Unified/UnifiedNetcodeTransport.cs b/com.unity.netcode.gameobjects/Runtime/Transports/Unified/UnifiedNetcodeTransport.cs index c7c813b872..d6dec10042 100644 --- a/com.unity.netcode.gameobjects/Runtime/Transports/Unified/UnifiedNetcodeTransport.cs +++ b/com.unity.netcode.gameobjects/Runtime/Transports/Unified/UnifiedNetcodeTransport.cs @@ -43,37 +43,41 @@ public static NativeArray ToNativeArray(in FixedBytes1280 data) } } + internal struct TransportRpcData : IBufferElementData + { + public FixedBytes1280 Buffer; + } + [BurstCompile] internal struct TransportRpc : IOutOfBandRpcCommand, IRpcCommandSerializer { - public FixedBytes1280 Buffer; - public ulong Order; + public TransportRpcData Value; public unsafe void Serialize(ref DataStreamWriter writer, in RpcSerializerState state, in TransportRpc data) { - writer.WriteULong(data.Order); - writer.WriteInt(data.Buffer.Length); - var span = new Span(FixedBytes1280.GetUnsafePtr(data.Buffer), data.Buffer.Length); + writer.WriteInt(data.Value.Buffer.Length); + var span = new Span(FixedBytes1280.GetUnsafePtr(data.Value.Buffer), data.Value.Buffer.Length); writer.WriteBytes(span); } public unsafe void Deserialize(ref DataStreamReader reader, in RpcDeserializerState state, ref TransportRpc data) { - data.Order = reader.ReadULong(); var length = reader.ReadInt(); - data.Buffer = new FixedBytes1280 + data.Value.Buffer = new FixedBytes1280 { Length = length }; - var span = new Span(FixedBytes1280.GetUnsafePtr(data.Buffer), length); + var span = new Span(FixedBytes1280.GetUnsafePtr(data.Value.Buffer), length); reader.ReadBytes(span); } [BurstCompile(DisableDirectCall = true)] private static void InvokeExecute(ref RpcExecutor.Parameters parameters) { - RpcExecutor.ExecuteCreateRequestComponent(ref parameters); + var element = new TransportRpc(); + element.Deserialize(ref parameters.Reader, parameters.DeserializerState, ref element); + parameters.CommandBuffer.AppendToBuffer(parameters.JobIndex, parameters.Connection, element.Value); } private static readonly PortableFunctionPointer k_InvokeExecuteFunctionPointer = new PortableFunctionPointer(InvokeExecute); @@ -115,9 +119,19 @@ public void OnUpdate(ref SystemState state) } } + [WorldSystemFilter(WorldSystemFilterFlags.ServerSimulation | WorldSystemFilterFlags.ClientSimulation | WorldSystemFilterFlags.ThinClientSimulation)] + [UpdateInGroup(typeof(SimulationSystemGroup), OrderLast = true)] + [UpdateBefore(typeof(RpcSystem))] internal partial class UnifiedNetcodeUpdateSystem : SystemBase { + public void OnCreate(ref SystemState state) + { + state.RequireForUpdate(); + state.RequireForUpdate(); + } + public UnifiedNetcodeTransport Transport; + public NetworkManager NetworkManager; public List DisconnectQueue = new List(); @@ -125,23 +139,37 @@ public void Disconnect(Connection connection) { DisconnectQueue.Add(connection); } + + public void SendRpc(TransportRpc rpc, Entity connectionEntity) + { + var rpcQueue = SystemAPI.GetSingleton().GetRpcQueue(); + var ghostInstance = GetComponentLookup(); + var rpcDataStreamBuffer = EntityManager.GetBuffer(connectionEntity); + rpcQueue.Schedule(rpcDataStreamBuffer, ghostInstance, rpc); + } protected override void OnUpdate() { + NetworkManager.MessageManager.ProcessSendQueues(); + using var commandBuffer = new EntityCommandBuffer(Allocator.Temp); - foreach (var (request, rpc, entity) in SystemAPI.Query, RefRO>().WithEntityAccess()) + foreach(var (networkId, _, entity) in SystemAPI.Query, RefRO>().WithEntityAccess()) { - var connectionId = SystemAPI.GetComponent(request.ValueRO.SourceConnection).Value; - - var buffer = rpc.ValueRO.Buffer; - try - { - Transport.DispatchMessage(connectionId, buffer, rpc.ValueRO.Order); - } - finally + var connectionId = networkId.ValueRO.Value; + DynamicBuffer rpcs = EntityManager.GetBuffer(entity); + foreach (var rpc in rpcs) { - commandBuffer.DestroyEntity(entity); + var buffer = rpc.Buffer; + try + { + Transport.DispatchMessage(connectionId, buffer); + } + catch(Exception e) + { + Debug.LogException(e); + } } + rpcs.Clear(); } foreach (var connection in DisconnectQueue) @@ -171,34 +199,15 @@ private class ConnectionInfo public BatchedSendQueue SendQueue; public BatchedReceiveQueue ReceiveQueue; public Connection Connection; - public ulong LastSent; - public ulong LastReceived; public Dictionary DeferredMessages; } private Dictionary m_Connections; - internal void DispatchMessage(int connectionId, in FixedBytes1280 buffer, ulong order) + internal void DispatchMessage(int connectionId, in FixedBytes1280 buffer) { var connectionInfo = m_Connections[connectionId]; - if (order <= connectionInfo.LastReceived) - { - Debug.LogWarning("Received duplicate message, ignoring."); - return; - } - - if (order != connectionInfo.LastReceived + 1) - { - if (connectionInfo.DeferredMessages == null) - { - connectionInfo.DeferredMessages = new Dictionary(); - } - - connectionInfo.DeferredMessages[order] = buffer; - return; - } - using var arr = FixedBytes1280.ToNativeArray(buffer); var reader = new DataStreamReader(arr); if (connectionInfo.ReceiveQueue == null) @@ -209,20 +218,7 @@ internal void DispatchMessage(int connectionId, in FixedBytes1280 buffer, ulong { connectionInfo.ReceiveQueue.PushReader(reader); } - - connectionInfo.LastReceived = order; - if (connectionInfo.DeferredMessages != null) - { - var next = order + 1; - while (connectionInfo.DeferredMessages.Remove(next, out var nextBuffer)) - { - reader = new DataStreamReader(FixedBytes1280.ToNativeArray(nextBuffer)); - connectionInfo.ReceiveQueue.PushReader(reader); - connectionInfo.LastReceived = next; - ++next; - } - } - + var message = connectionInfo.ReceiveQueue.PopMessage(); while (message.Count != 0) { @@ -243,20 +239,15 @@ public override unsafe void Send(ulong clientId, ArraySegment payload, Net while (!connectionInfo.SendQueue.IsEmpty) { - var rpc = new TransportRpc - { - Buffer = new FixedBytes1280(), - }; + var rpc = new TransportRpc(); - var writer = new DataStreamWriter(FixedBytes1280.GetUnsafePtr(rpc.Buffer), k_MaxPacketSize); + var writer = new DataStreamWriter(FixedBytes1280.GetUnsafePtr(rpc.Value.Buffer), k_MaxPacketSize); var amount = connectionInfo.SendQueue.FillWriterWithBytes(ref writer, k_MaxPacketSize); - rpc.Buffer.Length = amount; - rpc.Order = ++connectionInfo.LastSent; - - var req = m_NetworkManager.NetcodeWorld.EntityManager.CreateEntity(ComponentType.ReadWrite(), ComponentType.ReadWrite()); - m_NetworkManager.NetcodeWorld.EntityManager.SetComponentData(req, new SendRpcCommandRequest{TargetConnection = connectionInfo.Connection.ConnectionEntity}); - m_NetworkManager.NetcodeWorld.EntityManager.SetComponentData(req, rpc); + rpc.Value.Buffer.Length = amount; + + var updateSystem = m_NetworkManager.NetcodeWorld.GetExistingSystemManaged(); + updateSystem.SendRpc(rpc, connectionInfo.Connection.ConnectionEntity); connectionInfo.SendQueue.Consume(amount); } @@ -280,6 +271,8 @@ private void OnClientConnectedToServer(Connection connection, NetCodeConnectionE }; m_ServerClientId = connection.NetworkId.Value; InvokeOnTransportEvent(NetworkEvent.Connect, (ulong)connection.NetworkId.Value, default, m_RealTimeProvider.RealTimeSinceStartup); + var updateSystem = m_NetworkManager.NetcodeWorld.GetExistingSystemManaged(); + updateSystem.EntityManager.AddBuffer(connection.ConnectionEntity); } private void OnServerNewClientConnection(Connection connection, NetCodeConnectionEvent connectionEvent) @@ -291,6 +284,8 @@ private void OnServerNewClientConnection(Connection connection, NetCodeConnectio Connection = connection }; ; InvokeOnTransportEvent(NetworkEvent.Connect, (ulong)connection.NetworkId.Value, default, m_RealTimeProvider.RealTimeSinceStartup); + var updateSystem = m_NetworkManager.NetcodeWorld.GetExistingSystemManaged(); + updateSystem.EntityManager.AddBuffer(connection.ConnectionEntity); } private const string k_InvalidRpcMessage = "An invalid RPC was received"; @@ -403,6 +398,7 @@ public override bool StartClient() m_NetworkManager.NetcodeWorld.OnConnectionEvent += OnClientConnectionEvent; var updateSystem = m_NetworkManager.NetcodeWorld.GetExistingSystemManaged(); updateSystem.Transport = this; + updateSystem.NetworkManager = m_NetworkManager; return true; } @@ -416,6 +412,7 @@ public override bool StartServer() m_NetworkManager.NetcodeWorld.OnConnectionEvent += OnServerConnectionEvent; var updateSystem = m_NetworkManager.NetcodeWorld.GetExistingSystemManaged(); updateSystem.Transport = this; + updateSystem.NetworkManager = m_NetworkManager; return true; }