Commit progression

This commit is contained in:
Lynix 2017-01-27 14:48:31 +01:00
parent 12b4073033
commit 8a59dc88b8
11 changed files with 865 additions and 811 deletions

View File

@ -1,4 +1,4 @@
// Copyright (C) 2017 Jérôme Leclercq
// Copyright (C) 2017 Jérôme Leclercq
// This file is part of the "Nazara Engine - Network module"
// For conditions of distribution and use, see copyright notice in Config.hpp
@ -14,6 +14,7 @@
#include <Nazara/Network/ENetHost.hpp>
#include <Nazara/Network/ENetProtocol.hpp>
#include <Nazara/Network/IpAddress.hpp>
#include <Nazara/Network/NetBuffer.hpp>
#include <Nazara/Network/NetPacket.hpp>
#include <Nazara/Network/SocketPoller.hpp>
#include <Nazara/Network/UdpSocket.hpp>
@ -40,8 +41,10 @@ namespace Nz
void Broadcast(UInt8 channelId, ENetPacketFlags flags, NetPacket&& packet);
bool Connect(const IpAddress& remoteAddress, std::size_t channelCount = 0, UInt32 data = 0);
bool Connect(const String& hostName, NetProtocol protocol = NetProtocol_Any, const String& service = "http", ResolveError* error = nullptr, std::size_t channelCount = 0, UInt32 data = 0);
bool CheckEvents(ENetEvent* event);
ENetPeer* Connect(const IpAddress& remoteAddress, std::size_t channelCount = 0, UInt32 data = 0);
ENetPeer* Connect(const String& hostName, NetProtocol protocol = NetProtocol_Any, const String& service = "http", ResolveError* error = nullptr, std::size_t channelCount = 0, UInt32 data = 0);
inline bool Create(NetProtocol protocol, UInt16 port, std::size_t peerCount, std::size_t channelCount = 0);
bool Create(const IpAddress& address, std::size_t peerCount, std::size_t channelCount = 0);
@ -58,8 +61,10 @@ namespace Nz
private:
bool InitSocket(const IpAddress& address);
inline void AddToDispatchQueue(ENetPeer* peer);
inline void RemoveFromDispatchQueue(ENetPeer* peer);
void AddToDispatchQueue(ENetPeer* peer);
void RemoveFromDispatchQueue(ENetPeer* peer);
bool CheckTimeouts(ENetPeer* peer, ENetEvent* event);
bool DispatchIncomingCommands(ENetEvent* event);
@ -69,7 +74,11 @@ namespace Nz
bool HandleDisconnect(ENetPeer* peer, const ENetProtocol* command);
bool HandleIncomingCommands(ENetEvent* event);
bool HandlePing(ENetPeer* peer, const ENetProtocol* command);
bool HandleSendFragment(ENetPeer* peer, const ENetProtocol* command, UInt8** currentData);
bool HandleSendReliable(ENetPeer* peer, const ENetProtocol* command, UInt8** currentData);
bool HandleSendUnreliable(ENetPeer* peer, const ENetProtocol* command, UInt8** currentData);
bool HandleSendUnreliableFragment(ENetPeer* peer, const ENetProtocol* command, UInt8** currentData);
bool HandleSendUnsequenced(ENetPeer* peer, const ENetProtocol* command, UInt8** currentData);
bool HandleThrottleConfigure(ENetPeer* peer, const ENetProtocol* command);
bool HandleVerifyConnect(ENetEvent* event, ENetPeer* peer, ENetProtocol* command);
@ -78,12 +87,19 @@ namespace Nz
void NotifyConnect(ENetPeer* peer, ENetEvent* event);
void NotifyDisconnect(ENetPeer*, ENetEvent* event);
void SendAcknowledgements(ENetPeer* peer);
bool SendReliableOutgoingCommands(ENetPeer* peer);
int SendOutgoingCommands(ENetEvent* event, bool checkForTimeouts);
void SendUnreliableOutgoingCommands(ENetPeer* peer);
void ThrottleBandwidth();
static std::size_t GetCommandSize(UInt8 commandNumber);
static bool Initialize();
static void Uninitialize();
std::array<ENetProtocol, ENetConstants::ENetProtocol_MaximumPacketCommands> m_commands;
std::array<NetBuffer, ENetConstants::ENetProtocol_MaximumPacketCommands * 2 + 1> m_buffers;
std::array<UInt8, ENetConstants::ENetProtocol_MaximumMTU> m_packetData[2];
std::bernoulli_distribution m_packetLossProbability;
std::size_t m_bandwidthLimitedPeers;
@ -93,6 +109,8 @@ namespace Nz
std::size_t m_duplicatePeers;
std::size_t m_maximumPacketSize;
std::size_t m_maximumWaitingData;
std::size_t m_packetSize;
std::size_t m_peerCount;
std::size_t m_receivedDataLength;
std::vector<ENetPeer> m_peers;
Bitset<UInt64> m_dispatchQueue;
@ -101,6 +119,7 @@ namespace Nz
IpAddress m_receivedAddress;
SocketPoller m_poller;
UdpSocket m_socket;
UInt16 m_headerFlags;
UInt32 m_bandwidthThrottleEpoch;
UInt32 m_connectedPeers;
UInt32 m_mtu;
@ -113,6 +132,7 @@ namespace Nz
UInt32 m_totalReceivedData;
UInt32 m_totalReceivedPackets;
UInt8* m_receivedData;
bool m_continueSending;
bool m_isSimulationEnabled;
bool m_shouldAcceptConnections;
bool m_recalculateBandwidthLimits;

View File

@ -1,4 +1,4 @@
// Copyright (C) 2017 Jérôme Leclercq
// Copyright (C) 2017 Jérôme Leclercq
// This file is part of the "Nazara Engine - Network module"
// For conditions of distribution and use, see copyright notice in Config.hpp
@ -50,16 +50,6 @@ namespace Nz
m_peers.clear();
m_socket.Close();
}
inline void ENetHost::AddToDispatchQueue(ENetPeer* peer)
{
m_dispatchQueue.UnboundedSet(peer->m_incomingPeerID);
}
inline void ENetHost::RemoveFromDispatchQueue(ENetPeer* peer)
{
m_dispatchQueue.UnboundedReset(peer->m_incomingPeerID);
}
}
#include <Nazara/Network/DebugOff.hpp>

View File

@ -1,4 +1,4 @@
// Copyright (C) 2017 Jérôme Leclercq
// Copyright (C) 2017 Jérôme Leclercq
// This file is part of the "Nazara Engine - Network module"
// For conditions of distribution and use, see copyright notice in Config.hpp
@ -40,7 +40,7 @@ namespace Nz
std::size_t referenceCount = 0;
};
struct ENetPacketRef
struct NAZARA_NETWORK_API ENetPacketRef
{
ENetPacketRef() = default;
@ -49,6 +49,18 @@ namespace Nz
Reset(packet);
}
ENetPacketRef(const ENetPacketRef& packet) :
ENetPacketRef()
{
Reset(packet);
}
ENetPacketRef(ENetPacketRef&& packet) :
m_packet(packet.m_packet)
{
packet.m_packet = nullptr;
}
~ENetPacketRef()
{
Reset();
@ -69,6 +81,23 @@ namespace Nz
ENetPacketRef& operator=(ENetPacket* packet)
{
Reset(packet);
return *this;
}
ENetPacketRef& operator=(const ENetPacketRef& packet)
{
Reset(packet);
return *this;
}
ENetPacketRef& operator=(ENetPacketRef&& packet)
{
m_packet = packet.m_packet;
packet.m_packet = nullptr;
return *this;
}
ENetPacket* m_packet = nullptr;

View File

@ -1,4 +1,4 @@
// Copyright (C) 2017 Jérôme Leclercq
// Copyright (C) 2017 Jérôme Leclercq
// This file is part of the "Nazara Engine - Network module"
// For conditions of distribution and use, see copyright notice in Config.hpp
@ -32,6 +32,7 @@ namespace Nz
friend struct PacketRef;
public:
ENetPeer(ENetHost* host, UInt16 peerId);
ENetPeer(const ENetPeer&) = delete;
ENetPeer(ENetPeer&&) = default;
~ENetPeer() = default;
@ -40,6 +41,8 @@ namespace Nz
void DisconnectLater(UInt32 data);
void DisconnectNow(UInt32 data);
inline const IpAddress& GetAddress() const;
void Ping();
bool Receive(ENetPacketRef* packet, UInt8* channelId);
@ -54,8 +57,6 @@ namespace Nz
ENetPeer& operator=(ENetPeer&&) = default;
private:
ENetPeer(ENetHost* host, UInt16 peerId);
void InitIncoming(std::size_t channelCount, const IpAddress& address, ENetProtocolConnect& incomingCommand);
void InitOutgoing(std::size_t channelCount, const IpAddress& address, UInt32 connectId, UInt32 windowSize);
@ -66,7 +67,7 @@ namespace Nz
// Protocol functions
inline void ChangeState(ENetPeerState state);
inline void DispatchState(ENetPeerState state);
void DispatchState(ENetPeerState state);
void DispatchIncomingReliableCommands(Channel& channel);
void DispatchIncomingUnreliableCommands(Channel& channel);
@ -80,7 +81,7 @@ namespace Nz
void ResetQueues();
bool QueueAcknowledgement(ENetProtocol* command, UInt16 sentTime);
IncomingCommmand* QueueIncomingCommand(ENetProtocol& command, const void* data, std::size_t dataLength, UInt32 flags, UInt32 fragmentCount);
IncomingCommmand* QueueIncomingCommand(const ENetProtocol& command, const void* data, std::size_t dataLength, UInt32 flags, UInt32 fragmentCount);
void QueueOutgoingCommand(ENetProtocol& command, ENetPacketRef packet, UInt32 offset, UInt16 length);
void SetupOutgoingCommand(OutgoingCommand& outgoingCommand);
@ -139,64 +140,66 @@ namespace Nz
UInt32 sentTime;
};
ENetHost* m_host;
IpAddress m_address; /**< Internet address of the peer */
std::vector<Channel> m_channels;
std::list<Acknowledgement> m_acknowledgements;
std::list<IncomingCommmand> m_dispatchedCommands;
std::list<OutgoingCommand> m_outgoingReliableCommands;
std::list<OutgoingCommand> m_outgoingUnreliableCommands;
std::list<OutgoingCommand> m_sentReliableCommands;
std::list<OutgoingCommand> m_sentUnreliableCommands;
MemoryPool m_packetPool;
//ENetListNode m_dispatchList;
ENetPeerState m_state;
UInt8 m_incomingSessionID;
UInt8 m_outgoingSessionID;
UInt16 m_incomingPeerID;
UInt16 m_incomingUnsequencedGroup;
UInt16 m_outgoingPeerID;
UInt16 m_outgoingReliableSequenceNumber;
UInt16 m_outgoingUnsequencedGroup;
UInt32 m_connectID;
UInt32 m_earliestTimeout;
UInt32 m_eventData;
UInt32 m_highestRoundTripTimeVariance;
UInt32 m_incomingBandwidth; /**< Downstream bandwidth of the client in bytes/second */
UInt32 m_incomingBandwidthThrottleEpoch;
UInt32 m_incomingDataTotal;
UInt32 m_lastReceiveTime;
UInt32 m_lastRoundTripTime;
UInt32 m_lastRoundTripTimeVariance;
UInt32 m_lastSendTime;
UInt32 m_lowestRoundTripTime;
UInt32 m_mtu;
UInt32 m_nextTimeout;
UInt32 m_outgoingBandwidth; /**< Upstream bandwidth of the client in bytes/second */
UInt32 m_outgoingBandwidthThrottleEpoch;
UInt32 m_outgoingDataTotal;
UInt32 m_packetLoss; /**< mean packet loss of reliable packets as a ratio with respect to the constant ENET_PEER_PACKET_LOSS_SCALE */
UInt32 m_packetLossEpoch;
UInt32 m_packetLossVariance;
UInt32 m_packetThrottle;
UInt32 m_packetThrottleAcceleration;
UInt32 m_packetThrottleCounter;
UInt32 m_packetThrottleDeceleration;
UInt32 m_packetThrottleEpoch;
UInt32 m_packetThrottleInterval;
UInt32 m_packetThrottleLimit;
UInt32 m_packetsLost;
UInt32 m_packetsSent;
UInt32 m_pingInterval;
UInt32 m_reliableDataInTransit;
UInt32 m_roundTripTime; /**< mean round trip time (RTT), in milliseconds, between sending a reliable packet and receiving its acknowledgment */
UInt32 m_roundTripTimeVariance;
UInt32 m_timeoutLimit;
UInt32 m_timeoutMaximum;
UInt32 m_timeoutMinimum;
UInt32 m_unsequencedWindow[ENetPeer_ReliableWindowSize / 32];
UInt32 m_windowSize;
std::size_t m_totalWaitingData;
static constexpr std::size_t unsequencedWindow = ENetPeer_ReliableWindowSize / 32;
ENetHost* m_host;
IpAddress m_address; /**< Internet address of the peer */
std::array<UInt32, unsequencedWindow> m_unsequencedWindow;
std::list<Acknowledgement> m_acknowledgements;
std::list<IncomingCommmand> m_dispatchedCommands;
std::list<OutgoingCommand> m_outgoingReliableCommands;
std::list<OutgoingCommand> m_outgoingUnreliableCommands;
std::list<OutgoingCommand> m_sentReliableCommands;
std::list<OutgoingCommand> m_sentUnreliableCommands;
std::size_t m_totalWaitingData;
std::vector<Channel> m_channels;
MemoryPool m_packetPool;
//ENetListNode m_dispatchList;
ENetPeerState m_state;
UInt8 m_incomingSessionID;
UInt8 m_outgoingSessionID;
UInt16 m_incomingPeerID;
UInt16 m_incomingUnsequencedGroup;
UInt16 m_outgoingPeerID;
UInt16 m_outgoingReliableSequenceNumber;
UInt16 m_outgoingUnsequencedGroup;
UInt32 m_connectID;
UInt32 m_earliestTimeout;
UInt32 m_eventData;
UInt32 m_highestRoundTripTimeVariance;
UInt32 m_incomingBandwidth; /**< Downstream bandwidth of the client in bytes/second */
UInt32 m_incomingBandwidthThrottleEpoch;
UInt32 m_incomingDataTotal;
UInt32 m_lastReceiveTime;
UInt32 m_lastRoundTripTime;
UInt32 m_lastRoundTripTimeVariance;
UInt32 m_lastSendTime;
UInt32 m_lowestRoundTripTime;
UInt32 m_mtu;
UInt32 m_nextTimeout;
UInt32 m_outgoingBandwidth; /**< Upstream bandwidth of the client in bytes/second */
UInt32 m_outgoingBandwidthThrottleEpoch;
UInt32 m_outgoingDataTotal;
UInt32 m_packetLoss; /**< mean packet loss of reliable packets as a ratio with respect to the constant ENET_PEER_PACKET_LOSS_SCALE */
UInt32 m_packetLossEpoch;
UInt32 m_packetLossVariance;
UInt32 m_packetThrottle;
UInt32 m_packetThrottleAcceleration;
UInt32 m_packetThrottleCounter;
UInt32 m_packetThrottleDeceleration;
UInt32 m_packetThrottleEpoch;
UInt32 m_packetThrottleInterval;
UInt32 m_packetThrottleLimit;
UInt32 m_packetsLost;
UInt32 m_packetsSent;
UInt32 m_pingInterval;
UInt32 m_reliableDataInTransit;
UInt32 m_roundTripTime; /**< mean round trip time (RTT), in milliseconds, between sending a reliable packet and receiving its acknowledgment */
UInt32 m_roundTripTimeVariance;
UInt32 m_timeoutLimit;
UInt32 m_timeoutMaximum;
UInt32 m_timeoutMinimum;
UInt32 m_windowSize;
};
}

View File

@ -1,4 +1,4 @@
// Copyright (C) 2017 Jérôme Leclercq
// Copyright (C) 2017 Jérôme Leclercq
// This file is part of the "Nazara Engine - Network module"
// For conditions of distribution and use, see copyright notice in Config.hpp
@ -8,6 +8,11 @@
namespace Nz
{
inline const IpAddress& ENetPeer::GetAddress() const
{
return m_address;
}
inline void ENetPeer::ChangeState(ENetPeerState state)
{
if (state == ENetPeerState::Connected || state == ENetPeerState::DisconnectLater)
@ -17,13 +22,6 @@ namespace Nz
m_state = state;
}
inline void ENetPeer::DispatchState(ENetPeerState state)
{
ChangeState(state);
m_host->AddToDispatchQueue(this);
}
}
#include <Nazara/Network/DebugOff.hpp>

View File

@ -1,4 +1,4 @@
// Copyright (C) 2017 Jérôme Leclercq
// Copyright (C) 2017 Jérôme Leclercq
// This file is part of the "Nazara Engine - Network module"
// For conditions of distribution and use, see copyright notice in Config.hpp
@ -13,6 +13,8 @@
namespace Nz
{
class ENetPeer;
// Constants for the ENet implementation and protocol
enum ENetConstants
{
@ -140,27 +142,38 @@ namespace Nz
ENetPacketRef packet;
};
struct ENetProtocolHeader
#ifdef _MSC_VER
#pragma pack(push, 1)
#define NAZARA_PACKED
#elif defined(__GNUC__) || defined(__clang__)
#define NAZARA_PACKED __attribute__ ((packed))
#else
#define NAZARA_PACKED
#endif
struct NAZARA_PACKED ENetProtocolHeader
{
UInt16 peerID;
UInt16 sentTime;
};
struct ENetProtocolCommandHeader
struct NAZARA_PACKED ENetProtocolCommandHeader
{
UInt8 command;
UInt8 channelID;
UInt16 reliableSequenceNumber;
};
struct ENetProtocolAcknowledge
struct NAZARA_PACKED ENetProtocolAcknowledge
{
ENetProtocolCommandHeader header;
UInt16 receivedReliableSequenceNumber;
UInt16 receivedSentTime;
};
struct ENetProtocolConnect
struct NAZARA_PACKED ENetProtocolConnect
{
ENetProtocolCommandHeader header;
UInt16 outgoingPeerID;
@ -178,25 +191,25 @@ namespace Nz
UInt32 data;
};
struct ENetProtocolBandwidthLimit
struct NAZARA_PACKED ENetProtocolBandwidthLimit
{
ENetProtocolCommandHeader header;
UInt32 incomingBandwidth;
UInt32 outgoingBandwidth;
};
struct ENetProtocolDisconnect
struct NAZARA_PACKED ENetProtocolDisconnect
{
ENetProtocolCommandHeader header;
UInt32 data;
};
struct ENetProtocolPing
struct NAZARA_PACKED ENetProtocolPing
{
ENetProtocolCommandHeader header;
};
struct ENetProtocolSendFragment
struct NAZARA_PACKED ENetProtocolSendFragment
{
ENetProtocolCommandHeader header;
UInt16 startSequenceNumber;
@ -207,27 +220,27 @@ namespace Nz
UInt32 fragmentOffset;
};
struct ENetProtocolSendReliable
struct NAZARA_PACKED ENetProtocolSendReliable
{
ENetProtocolCommandHeader header;
UInt16 dataLength;
};
struct ENetProtocolSendUnreliable
struct NAZARA_PACKED ENetProtocolSendUnreliable
{
ENetProtocolCommandHeader header;
UInt16 unreliableSequenceNumber;
UInt16 dataLength;
};
struct ENetProtocolSendUnsequenced
struct NAZARA_PACKED ENetProtocolSendUnsequenced
{
ENetProtocolCommandHeader header;
UInt16 unsequencedGroup;
UInt16 dataLength;
};
struct ENetProtocolThrottleConfigure
struct NAZARA_PACKED ENetProtocolThrottleConfigure
{
ENetProtocolCommandHeader header;
UInt32 packetThrottleInterval;
@ -235,7 +248,7 @@ namespace Nz
UInt32 packetThrottleDeceleration;
};
struct ENetProtocolVerifyConnect
struct NAZARA_PACKED ENetProtocolVerifyConnect
{
ENetProtocolCommandHeader header;
UInt16 outgoingPeerID;
@ -252,7 +265,7 @@ namespace Nz
UInt32 connectID;
};
union ENetProtocol
union NAZARA_PACKED ENetProtocol
{
ENetProtocolCommandHeader header;
ENetProtocolAcknowledge acknowledge;
@ -267,6 +280,10 @@ namespace Nz
ENetProtocolThrottleConfigure throttleConfigure;
ENetProtocolVerifyConnect verifyConnect;
};
#ifdef _MSC_VER
#pragma pack(pop)
#endif
}
#endif // NAZARA_ENETPROTOCOL_HPP

View File

@ -7,6 +7,8 @@
#ifndef NAZARA_ENUMS_NETWORK_HPP
#define NAZARA_ENUMS_NETWORK_HPP
#include <Nazara/Prerequesites.hpp>
namespace Nz
{
enum NetCode : UInt16

View File

@ -1,9 +1,10 @@
#include <Nazara/Network/ENetHost.hpp>
#include <Nazara/Network/ENetHost.hpp>
#include <Nazara/Core/Clock.hpp>
#include <Nazara/Core/Endianness.hpp>
#include <Nazara/Core/OffsetOf.hpp>
#include <Nazara/Network/ENetPeer.hpp>
#include <Nazara/Network/NetPacket.hpp>
#include <iostream>
#include <Nazara/Network/Debug.hpp>
#define ENET_TIME_OVERFLOW 86400000
@ -57,11 +58,6 @@ namespace Nz
sizeof(ENetProtocolThrottleConfigure),
sizeof(ENetProtocolSendFragment)
};
std::size_t enet_protocol_command_size(UInt8 commandNumber)
{
return s_commandSizes[commandNumber & ENetProtocolCommand_Mask];
}
}
@ -81,7 +77,19 @@ namespace Nz
}
}
bool ENetHost::Connect(const IpAddress& remoteAddress, std::size_t channelCount, UInt32 data)
bool ENetHost::CheckEvents(ENetEvent* event)
{
if (!event)
return false;
event->type = ENetEventType::None;
event->peer = nullptr;
event->packet.Reset();
return DispatchIncomingCommands(event);
}
ENetPeer* ENetHost::Connect(const IpAddress& remoteAddress, std::size_t channelCount, UInt32 data)
{
NazaraAssert(remoteAddress.IsValid(), "Invalid remote address");
NazaraAssert(remoteAddress.GetPort() != 0, "Remote address has no port");
@ -96,7 +104,7 @@ namespace Nz
if (peerId >= m_peers.size())
{
NazaraError("Insufficient peers");
return false;
return nullptr;
}
m_channelLimit = Clamp<std::size_t>(channelCount, ENetConstants::ENetProtocol_MinimumChannelCount, ENetConstants::ENetProtocol_MaximumChannelCount);
@ -130,10 +138,10 @@ namespace Nz
peer.QueueOutgoingCommand(command, nullptr, 0, 0);
return true;
return &peer;
}
bool ENetHost::Connect(const String& hostName, NetProtocol protocol, const String& service, ResolveError* error, std::size_t channelCount, UInt32 data)
ENetPeer* ENetHost::Connect(const String& hostName, NetProtocol protocol, const String& service, ResolveError* error, std::size_t channelCount, UInt32 data)
{
std::vector<HostnameInfo> results = IpAddress::ResolveHostname(protocol, hostName, service, error);
if (results.empty())
@ -173,7 +181,9 @@ namespace Nz
if (!InitSocket(address))
return false;
m_peers.resize(peerCount);
m_peers.reserve(peerCount);
for (std::size_t i = 0; i < peerCount; ++i)
m_peers.emplace_back(this, UInt16(i));
m_address = address;
m_randomSeed = *reinterpret_cast<UInt32*>(this);
@ -187,6 +197,7 @@ namespace Nz
m_mtu = ENetConstants::ENetHost_DefaultMTU;
m_commandCount = 0;
m_bufferCount = 0;
m_peerCount = peerCount;
m_receivedAddress = IpAddress::AnyIpV4;
m_receivedData = nullptr;
m_receivedDataLength = 0;
@ -214,8 +225,6 @@ namespace Nz
int ENetHost::Service(ENetEvent* event, UInt32 timeout)
{
UInt32 waitCondition;
if (event)
{
event->type = ENetEventType::None;
@ -265,7 +274,7 @@ namespace Nz
break;
}
switch (SendOutgoingCommands(event, 1))
switch (SendOutgoingCommands(event, true))
{
case 1:
return 1;
@ -283,21 +292,8 @@ namespace Nz
if (event)
{
switch (DispatchIncomingCommands(event))
{
case 1:
return 1;
case -1:
#ifdef ENET_DEBUG
perror("Error dispatching incoming packets");
#endif
return -1;
default:
break;
}
if (DispatchIncomingCommands(event))
return 1;
}
if (ENET_TIME_GREATER_EQUAL(m_serviceTime, timeout))
@ -310,12 +306,8 @@ namespace Nz
if (ENET_TIME_GREATER_EQUAL(m_serviceTime, timeout))
return 0;
SocketError error;
if (m_poller.Wait(ENET_TIME_DIFFERENCE(timeout, m_serviceTime), &error))
if (m_poller.Wait(ENET_TIME_DIFFERENCE(timeout, m_serviceTime)))
break;
if (error != SocketError_NoError)
return -1;
}
m_serviceTime = GetElapsedMilliseconds();
@ -349,6 +341,62 @@ namespace Nz
return true;
}
void ENetHost::AddToDispatchQueue(ENetPeer* peer)
{
m_dispatchQueue.UnboundedSet(peer->m_incomingPeerID);
}
void ENetHost::RemoveFromDispatchQueue(ENetPeer* peer)
{
m_dispatchQueue.UnboundedReset(peer->m_incomingPeerID);
}
bool ENetHost::CheckTimeouts(ENetPeer* peer, ENetEvent* event)
{
auto currentCommand = peer->m_sentReliableCommands.begin();
while (currentCommand != peer->m_sentReliableCommands.end())
{
auto outgoingCommand = currentCommand;
++currentCommand;
if (ENET_TIME_DIFFERENCE(m_serviceTime, outgoingCommand->sentTime) < outgoingCommand->roundTripTimeout)
continue;
if (peer->m_earliestTimeout == 0 || ENET_TIME_LESS(outgoingCommand->sentTime, peer->m_earliestTimeout))
peer->m_earliestTimeout = outgoingCommand->sentTime;
if (peer->m_earliestTimeout != 0 && (ENET_TIME_DIFFERENCE(m_serviceTime, peer->m_earliestTimeout) >= peer->m_timeoutMaximum ||
(outgoingCommand->roundTripTimeout >= outgoingCommand->roundTripTimeoutLimit && ENET_TIME_DIFFERENCE(m_serviceTime, peer->m_earliestTimeout) >= peer->m_timeoutMinimum)))
{
NotifyDisconnect(peer, event);
return true;
}
if (outgoingCommand->packet)
peer->m_reliableDataInTransit -= outgoingCommand->fragmentLength;
++peer->m_packetsLost;
outgoingCommand->roundTripTimeout *= 2;
peer->m_outgoingReliableCommands.emplace_front(std::move(*outgoingCommand));
peer->m_sentReliableCommands.erase(outgoingCommand);
// Okay this should just never procs, I don't see how it would be possible
/*if (currentCommand == enet_list_begin(&peer->sentReliableCommands) &&
!enet_list_empty(&peer->sentReliableCommands))
{
outgoingCommand = (ENetOutgoingCommand *) currentCommand;
peer->nextTimeout = outgoingCommand->sentTime + outgoingCommand->roundTripTimeout;
}*/
}
return false;
}
bool ENetHost::DispatchIncomingCommands(ENetEvent* event)
{
for (std::size_t bit = m_dispatchQueue.FindFirst(); bit != m_dispatchQueue.npos; bit = m_dispatchQueue.FindNext(bit))
@ -391,6 +439,9 @@ namespace Nz
AddToDispatchQueue(&peer);
return true;
default:
break;
}
}
@ -436,7 +487,7 @@ namespace Nz
if (peer->m_roundTripTimeVariance > peer->m_highestRoundTripTimeVariance)
peer->m_highestRoundTripTimeVariance = peer->m_roundTripTimeVariance;
if (peer->m_packetThrottleEpoch == 0 || ENET_TIME_DIFFERENCE(m_serviceTime, peer->m_packetThrottleEpoch) >= peer->packetThrottleInterval)
if (peer->m_packetThrottleEpoch == 0 || ENET_TIME_DIFFERENCE(m_serviceTime, peer->m_packetThrottleEpoch) >= peer->m_packetThrottleInterval)
{
peer->m_lastRoundTripTime = peer->m_lowestRoundTripTime;
peer->m_lastRoundTripTimeVariance = peer->m_highestRoundTripTimeVariance;
@ -770,7 +821,7 @@ namespace Nz
return commandError();
}
if (peer && (command->header.command & ENetProtocolCommand_Acknowledge) != 0)
if (peer && (command->header.command & ENetProtocolFlag_Acknowledge) != 0)
{
UInt16 sentTime;
@ -810,6 +861,90 @@ namespace Nz
return true;
}
bool ENetHost::HandleSendFragment(ENetPeer* peer, const ENetProtocol* command, UInt8** currentData)
{
if (command->header.channelID >= peer->m_channels.size() || (peer->m_state != ENetPeerState::Connected && peer->m_state != ENetPeerState::DisconnectLater))
return false;
UInt16 fragmentLength = NetToHost(command->sendFragment.dataLength);
*currentData += fragmentLength;
if (fragmentLength >= m_maximumPacketSize || *currentData < m_receivedData || *currentData > &m_receivedData[m_receivedDataLength])
return false;
ENetPeer::Channel& channel = peer->m_channels[command->header.channelID];
UInt32 startSequenceNumber = NetToHost(command->sendFragment.startSequenceNumber);
UInt16 startWindow = startSequenceNumber / ENetConstants::ENetPeer_ReliableWindowSize;
UInt16 currentWindow = channel.incomingReliableSequenceNumber / ENetConstants::ENetPeer_ReliableWindowSize;
if (startSequenceNumber < channel.incomingReliableSequenceNumber)
startWindow += ENetConstants::ENetPeer_ReliableWindows;
if (startWindow < currentWindow || startWindow >= currentWindow + ENetConstants::ENetPeer_FreeReliableWindows - 1)
return true;
UInt32 fragmentNumber = NetToHost(command->sendFragment.fragmentNumber);
UInt32 fragmentCount = NetToHost(command->sendFragment.fragmentCount);
UInt32 fragmentOffset = NetToHost(command->sendFragment.fragmentOffset);
UInt32 totalLength = NetToHost(command->sendFragment.totalLength);
if (fragmentCount > ENetConstants::ENetProtocol_MaximumFragmentCount || fragmentNumber >= fragmentCount || totalLength > m_maximumPacketSize ||
fragmentOffset >= totalLength || fragmentLength > totalLength - fragmentOffset)
return false;
ENetPeer::IncomingCommmand* startCommand = nullptr;
for (auto currentCommand = channel.incomingReliableCommands.rbegin(); currentCommand != channel.incomingReliableCommands.rend(); ++currentCommand)
{
ENetPeer::IncomingCommmand& incomingCommand = *currentCommand;
if (startSequenceNumber >= channel.incomingReliableSequenceNumber)
{
if (incomingCommand.reliableSequenceNumber < channel.incomingReliableSequenceNumber)
continue;
}
else if (incomingCommand.reliableSequenceNumber >= channel.incomingReliableSequenceNumber)
break;
if (incomingCommand.reliableSequenceNumber <= startSequenceNumber)
{
if (incomingCommand.reliableSequenceNumber < startSequenceNumber)
break;
if ((incomingCommand.command.header.command & ENetProtocolCommand_Mask) != ENetProtocolCommand_SendFragment ||
totalLength != incomingCommand.packet->data.GetDataSize() || fragmentCount != incomingCommand.fragments.size())
return false;
startCommand = &incomingCommand;
break;
}
}
if (startCommand)
{
ENetProtocol hostCommand = *command;
hostCommand.header.reliableSequenceNumber = startSequenceNumber;
if (!peer->QueueIncomingCommand(hostCommand, nullptr, totalLength, ENetPacketFlag_Reliable, fragmentCount))
return false;
}
if ((startCommand->fragments[fragmentNumber / 32] & (1 << (fragmentNumber % 32))) == 0)
{
--startCommand->fragmentsRemaining;
startCommand->fragments[fragmentNumber / 32] |= (1 << (fragmentNumber % 32));
if (fragmentOffset + fragmentLength > startCommand->packet->data.GetDataSize())
fragmentLength = startCommand->packet->data.GetDataSize() - fragmentOffset;
std::memcpy(startCommand->packet->data.GetData() + Nz::NetPacket::HeaderSize + fragmentOffset, reinterpret_cast<const UInt8*>(command) + sizeof(ENetProtocolSendFragment), fragmentLength);
if (startCommand->fragmentsRemaining <= 0)
peer->DispatchIncomingReliableCommands(channel);
}
return false;
}
bool ENetHost::HandleSendReliable(ENetPeer* peer, const ENetProtocol* command, UInt8** currentData)
{
if (command->header.channelID >= peer->m_channels.size() || (peer->m_state != ENetPeerState::Connected && peer->m_state != ENetPeerState::DisconnectLater))
@ -826,6 +961,152 @@ namespace Nz
return true;
}
bool ENetHost::HandleSendUnreliable(ENetPeer * peer, const ENetProtocol * command, UInt8 ** currentData)
{
if (command->header.channelID >= peer->m_channels.size() || (peer->m_state != ENetPeerState::Connected && peer->m_state != ENetPeerState::DisconnectLater))
return false;
UInt16 dataLength = NetToHost(command->sendUnreliable.dataLength);
*currentData += dataLength;
if (dataLength >= m_maximumPacketSize || *currentData < m_receivedData || *currentData > &m_receivedData[m_receivedDataLength])
return false;
if (!peer->QueueIncomingCommand(*command, reinterpret_cast<const UInt8*>(command) + sizeof(ENetProtocolSendUnreliable), dataLength, 0, 0))
return false;
return true;
}
bool ENetHost::HandleSendUnreliableFragment(ENetPeer* peer, const ENetProtocol* command, UInt8** currentData)
{
if (command->header.channelID >= peer->m_channels.size() || (peer->m_state != ENetPeerState::Connected && peer->m_state != ENetPeerState::DisconnectLater))
return false;
UInt16 fragmentLength = NetToHost(command->sendFragment.dataLength);
*currentData += fragmentLength;
if (fragmentLength >= m_maximumPacketSize || *currentData < m_receivedData || *currentData > &m_receivedData[m_receivedDataLength])
return false;
ENetPeer::Channel& channel = peer->m_channels[command->header.channelID];
UInt32 reliableSequenceNumber = command->header.reliableSequenceNumber;
UInt32 startSequenceNumber = NetToHost(command->sendFragment.startSequenceNumber);
UInt16 reliableWindow = reliableSequenceNumber / ENetConstants::ENetPeer_ReliableWindowSize;
UInt16 currentWindow = channel.incomingReliableSequenceNumber / ENetConstants::ENetPeer_ReliableWindowSize;
if (startSequenceNumber < channel.incomingReliableSequenceNumber)
reliableWindow += ENetConstants::ENetPeer_ReliableWindows;
if (reliableWindow < currentWindow || reliableWindow >= currentWindow + ENetConstants::ENetPeer_FreeReliableWindows - 1)
return true;
if (reliableSequenceNumber == channel.incomingReliableSequenceNumber && startSequenceNumber <= channel.incomingUnreliableSequenceNumber)
return true;
UInt32 fragmentNumber = NetToHost(command->sendFragment.fragmentNumber);
UInt32 fragmentCount = NetToHost(command->sendFragment.fragmentCount);
UInt32 fragmentOffset = NetToHost(command->sendFragment.fragmentOffset);
UInt32 totalLength = NetToHost(command->sendFragment.totalLength);
if (fragmentCount > ENetConstants::ENetProtocol_MaximumFragmentCount || fragmentNumber >= fragmentCount || totalLength > m_maximumPacketSize ||
fragmentOffset >= totalLength || fragmentLength > totalLength - fragmentOffset)
return false;
ENetPeer::IncomingCommmand* startCommand = nullptr;
for (auto currentCommand = channel.incomingUnreliableCommands.rbegin(); currentCommand != channel.incomingUnreliableCommands.rend(); ++currentCommand)
{
ENetPeer::IncomingCommmand& incomingCommand = *currentCommand;
if (startSequenceNumber >= channel.incomingReliableSequenceNumber)
{
if (incomingCommand.reliableSequenceNumber < channel.incomingReliableSequenceNumber)
continue;
}
else if (incomingCommand.reliableSequenceNumber >= channel.incomingReliableSequenceNumber)
break;
if (incomingCommand.reliableSequenceNumber < reliableSequenceNumber)
break;
if (incomingCommand.reliableSequenceNumber > reliableSequenceNumber)
continue;
if (incomingCommand.unreliableSequenceNumber <= startSequenceNumber)
{
if (incomingCommand.unreliableSequenceNumber < startSequenceNumber)
break;
if ((incomingCommand.command.header.command & ENetProtocolCommand_Mask) != ENetProtocolCommand_SendUnreliableFragment ||
totalLength != incomingCommand.packet->data.GetDataSize() || fragmentCount != incomingCommand.fragments.size())
return false;
startCommand = &incomingCommand;
break;
}
}
if (startCommand)
{
if (!peer->QueueIncomingCommand(*command, nullptr, totalLength, ENetPacketFlag_UnreliableFragment, fragmentCount))
return false;
}
if ((startCommand->fragments[fragmentNumber / 32] & (1 << (fragmentNumber % 32))) == 0)
{
--startCommand->fragmentsRemaining;
startCommand->fragments[fragmentNumber / 32] |= (1 << (fragmentNumber % 32));
if (fragmentOffset + fragmentLength > startCommand->packet->data.GetDataSize())
fragmentLength = startCommand->packet->data.GetDataSize() - fragmentOffset;
std::memcpy(startCommand->packet->data.GetData() + Nz::NetPacket::HeaderSize + fragmentOffset, reinterpret_cast<const UInt8*>(command) + sizeof(ENetProtocolSendFragment), fragmentLength);
if (startCommand->fragmentsRemaining <= 0)
peer->DispatchIncomingUnreliableCommands(channel);
}
return true;
}
bool ENetHost::HandleSendUnsequenced(ENetPeer* peer, const ENetProtocol* command, UInt8** currentData)
{
if (command->header.channelID >= peer->m_channels.size() || peer->m_state != ENetPeerState::Connected && peer->m_state != ENetPeerState::DisconnectLater)
return false;
std::size_t dataLength = NetToHost(command->sendUnsequenced.dataLength);
*currentData += dataLength;
if (dataLength >= m_maximumPacketSize || *currentData < m_receivedData || *currentData > &m_receivedData[m_receivedDataLength])
return false;
UInt32 unsequencedGroup = NetToHost(command->sendUnsequenced.unsequencedGroup);
UInt32 index = unsequencedGroup % ENetConstants::ENetPeer_UnsequencedWindowSize;
if (unsequencedGroup < peer->m_incomingUnsequencedGroup)
unsequencedGroup += 0x10000;
if (unsequencedGroup >= static_cast<UInt32>(peer->m_incomingUnsequencedGroup) + ENetConstants::ENetPeer_UnsequencedWindows * ENetConstants::ENetPeer_UnsequencedWindowSize)
return true;
unsequencedGroup &= 0xFFFF;
if (unsequencedGroup - index != peer->m_incomingUnsequencedGroup)
{
peer->m_incomingUnsequencedGroup = unsequencedGroup - index;
peer->m_unsequencedWindow.fill(0);
}
else if (peer->m_unsequencedWindow[index / 32] & (1 << (index % 32)))
return true;
if (!peer->QueueIncomingCommand(*command, reinterpret_cast<const UInt8*>(command) + sizeof(ENetProtocolSendUnsequenced), dataLength, ENetPacketFlag_Unsequenced, 0))
return false;
peer->m_unsequencedWindow[index / 32] |= 1 << (index % 32);
return true;
}
bool ENetHost::HandleThrottleConfigure(ENetPeer* peer, const ENetProtocol* command)
{
if (peer->m_state != ENetPeerState::Connected && peer->m_state != ENetPeerState::DisconnectLater)
@ -901,17 +1182,8 @@ namespace Nz
// Intercept
switch (HandleIncomingCommands(event))
{
case 1:
return 1;
case -1:
return -1;
default:
break;
}
if (HandleIncomingCommands(event))
return 1;
}
return -1;
@ -956,6 +1228,346 @@ namespace Nz
}
}
void ENetHost::SendAcknowledgements(ENetPeer* peer)
{
std::cout << "SendAcknowledgements " << peer->m_acknowledgements.size() << std::endl;
auto currentAcknowledgement = peer->m_acknowledgements.begin();
while (currentAcknowledgement != peer->m_acknowledgements.end())
{
if (m_commandCount >= m_commands.size() || m_bufferCount >= m_buffers.size() || peer->m_mtu - m_packetSize < sizeof(ENetProtocolAcknowledge))
{
m_continueSending = true;
break;
}
ENetPeer::Acknowledgement& acknowledgement = *currentAcknowledgement;
ENetProtocol& command = m_commands[m_commandCount];
NetBuffer& buffer = m_buffers[m_bufferCount];
buffer.data = &command;
buffer.dataLength = sizeof(ENetProtocolAcknowledge);
m_packetSize += buffer.dataLength;
UInt16 reliableSequenceNumber = HostToNet(acknowledgement.command.header.reliableSequenceNumber);
command.header.command = ENetProtocolCommand_Acknowledge;
command.header.channelID = acknowledgement.command.header.channelID;
command.header.reliableSequenceNumber = reliableSequenceNumber;
command.acknowledge.receivedReliableSequenceNumber = reliableSequenceNumber;
command.acknowledge.receivedSentTime = HostToNet(acknowledgement.sentTime);
if ((acknowledgement.command.header.command & ENetProtocolCommand_Mask) == ENetProtocolCommand_Disconnect)
peer->DispatchState(ENetPeerState::Zombie);
currentAcknowledgement = peer->m_acknowledgements.erase(currentAcknowledgement);
++m_bufferCount;
++m_commandCount;
}
}
bool ENetHost::SendReliableOutgoingCommands(ENetPeer* peer)
{
bool canPing = true;
bool windowExceeded = false;
bool windowWrap = false;
auto currentCommand = peer->m_outgoingReliableCommands.begin();
while (currentCommand != peer->m_outgoingReliableCommands.end())
{
auto outgoingCommand = currentCommand;
UInt16 reliableWindow = outgoingCommand->reliableSequenceNumber / ENetConstants::ENetPeer_ReliableWindowSize;
ENetPeer::Channel* channel = (outgoingCommand->command.header.channelID < peer->m_channels.size()) ? &peer->m_channels[outgoingCommand->command.header.channelID] : nullptr;
if (channel)
{
if (!windowWrap && outgoingCommand->sendAttempts < 1 && !(outgoingCommand->reliableSequenceNumber % ENetPeer_ReliableWindowSize) &&
((channel->reliableWindows[(reliableWindow + ENetPeer_ReliableWindows - 1) % ENetPeer_ReliableWindows] >= ENetPeer_ReliableWindowSize) ||
channel->usedReliableWindows & ((((1 << ENetPeer_ReliableWindows) - 1) << reliableWindow) |
(((1 << ENetPeer_FreeReliableWindows) - 1) >> (ENetPeer_ReliableWindows - reliableWindow)))))
windowWrap = true;
if (windowWrap)
{
++currentCommand;
continue;
}
}
if (outgoingCommand->packet)
{
if (!windowExceeded)
{
UInt32 windowSize = (peer->m_packetThrottle * peer->m_windowSize) / ENetPeer_PacketThrottleScale;
if (peer->m_reliableDataInTransit + outgoingCommand->fragmentLength > std::max(windowSize, peer->m_mtu))
windowExceeded = true;
}
if (windowExceeded)
{
++currentCommand;
continue;
}
}
canPing = false;
std::size_t commandSize = s_commandSizes[outgoingCommand->command.header.command & ENetProtocolCommand_Mask];
if (m_commandCount >= m_commands.size() || m_bufferCount + 1 >= m_buffers.size() || peer->m_mtu - m_packetSize < commandSize ||
(outgoingCommand->packet && UInt16(peer->m_mtu - m_packetSize) < UInt16(commandSize + outgoingCommand->fragmentLength)))
{
m_continueSending = true;
break;
}
++currentCommand;
if (channel && outgoingCommand->sendAttempts < 1)
{
channel->usedReliableWindows |= 1 << reliableWindow;
++channel->reliableWindows[reliableWindow];
}
++outgoingCommand->sendAttempts;
if (outgoingCommand->roundTripTimeout == 0)
{
outgoingCommand->roundTripTimeout = peer->m_roundTripTime + 4 * peer->m_roundTripTimeVariance;
outgoingCommand->roundTripTimeoutLimit = peer->m_timeoutLimit * outgoingCommand->roundTripTimeout;
}
if (peer->m_sentReliableCommands.empty())
peer->m_nextTimeout = m_serviceTime + outgoingCommand->roundTripTimeout;
peer->m_sentReliableCommands.emplace_back(std::move(*outgoingCommand));
peer->m_outgoingReliableCommands.erase(outgoingCommand);
outgoingCommand = peer->m_sentReliableCommands.end();
--outgoingCommand;
outgoingCommand->sentTime = m_serviceTime;
ENetProtocol& command = m_commands[m_commandCount];
NetBuffer& buffer = m_buffers[m_bufferCount];
buffer.data = &command;
buffer.dataLength = commandSize;
m_packetSize += buffer.dataLength;
m_headerFlags |= ENetProtocolHeaderFlag_SentTime;
command = outgoingCommand->command;
if (outgoingCommand->packet)
{
++m_bufferCount;
NetBuffer& packetBuffer = m_buffers[m_bufferCount];
packetBuffer.data = outgoingCommand->packet->data.GetData() + Nz::NetPacket::HeaderSize + outgoingCommand->fragmentOffset;
packetBuffer.dataLength = outgoingCommand->fragmentLength;
m_packetSize += packetBuffer.dataLength;
peer->m_reliableDataInTransit += outgoingCommand->fragmentLength;
}
++peer->m_packetsSent;
++m_bufferCount;
++m_commandCount;
}
return canPing;
}
int ENetHost::SendOutgoingCommands(ENetEvent* event, bool checkForTimeouts)
{
std::array<UInt8, sizeof(ENetProtocolHeader) + sizeof(UInt32)> headerData;
ENetProtocolHeader* header = reinterpret_cast<ENetProtocolHeader*>(headerData.data());
m_continueSending = true;
while (m_continueSending)
{
m_continueSending = false;
for (std::size_t peer = 0; peer < m_peerCount; ++peer)
{
ENetPeer* currentPeer = &m_peers[peer];
if (currentPeer->m_state == ENetPeerState::Disconnected || currentPeer->m_state == ENetPeerState::Zombie)
continue;
m_headerFlags = 0;
m_commandCount = 0;
m_bufferCount = 1;
m_packetSize = sizeof(ENetProtocolHeader);
if (!currentPeer->m_acknowledgements.empty())
SendAcknowledgements(currentPeer);
if (checkForTimeouts && !currentPeer->m_sentReliableCommands.empty() && ENET_TIME_GREATER_EQUAL(m_serviceTime, currentPeer->m_nextTimeout) &&
CheckTimeouts(currentPeer, event))
{
if (event && event->type != ENetEventType::None)
return 1;
else
continue;
}
if ((currentPeer->m_outgoingReliableCommands.empty() || SendReliableOutgoingCommands(currentPeer)) && currentPeer->m_sentReliableCommands.empty() &&
ENET_TIME_DIFFERENCE(m_serviceTime, currentPeer->m_lastReceiveTime) >= currentPeer->m_pingInterval && currentPeer->m_mtu - m_packetSize >= sizeof(ENetProtocolPing))
{
currentPeer->Ping();
SendReliableOutgoingCommands(currentPeer);
}
if (!currentPeer->m_outgoingUnreliableCommands.empty())
SendUnreliableOutgoingCommands(currentPeer);
if (m_commandCount == 0)
continue;
if (currentPeer->m_packetLossEpoch == 0)
currentPeer->m_packetLossEpoch = m_serviceTime;
else if (ENET_TIME_DIFFERENCE(m_serviceTime, currentPeer->m_packetLossEpoch) >= ENetPeer_PacketLossInterval && currentPeer->m_packetsSent > 0)
{
UInt32 packetLoss = currentPeer->m_packetsLost * ENetPeer_PacketLossScale / currentPeer->m_packetsSent;
#ifdef ENET_DEBUG
printf("peer %u: %f%%+-%f%% packet loss, %u+-%u ms round trip time, %f%% throttle, %u/%u outgoing, %u/%u incoming\n", currentPeer->incomingPeerID, currentPeer->packetLoss / (float) ENET_PEER_PACKET_LOSS_SCALE, currentPeer->packetLossVariance / (float) ENET_PEER_PACKET_LOSS_SCALE, currentPeer->roundTripTime, currentPeer->roundTripTimeVariance, currentPeer->packetThrottle / (float) ENET_PEER_PACKET_THROTTLE_SCALE, enet_list_size(&currentPeer->outgoingReliableCommands), enet_list_size(&currentPeer->outgoingUnreliableCommands), currentPeer->channels != NULL ? enet_list_size(&currentPeer->channels->incomingReliableCommands) : 0, currentPeer->channels != NULL ? enet_list_size(&currentPeer->channels->incomingUnreliableCommands) : 0);
#endif
currentPeer->m_packetLossVariance -= currentPeer->m_packetLossVariance / 4;
if (packetLoss >= currentPeer->m_packetLoss)
{
currentPeer->m_packetLoss += (packetLoss - currentPeer->m_packetLoss) / 8;
currentPeer->m_packetLossVariance += (packetLoss - currentPeer->m_packetLoss) / 4;
}
else
{
currentPeer->m_packetLoss -= (currentPeer->m_packetLoss - packetLoss) / 8;
currentPeer->m_packetLossVariance += (currentPeer->m_packetLoss - packetLoss) / 4;
}
currentPeer->m_packetLossEpoch = m_serviceTime;
currentPeer->m_packetsSent = 0;
currentPeer->m_packetsLost = 0;
}
m_buffers[0].data = headerData.data();
if (m_headerFlags & ENetProtocolHeaderFlag_SentTime)
{
header->sentTime = HostToNet(static_cast<UInt16>(m_serviceTime));
m_buffers[0].dataLength = sizeof(ENetProtocolHeader);
}
else
m_buffers[0].dataLength = NazaraOffsetOf(ENetProtocolHeader, sentTime);
if (currentPeer->m_outgoingPeerID < ENetConstants::ENetProtocol_MaximumPeerId)
m_headerFlags |= currentPeer->m_outgoingSessionID << ENetProtocolHeaderSessionShift;
header->peerID = HostToNet(static_cast<UInt16>(currentPeer->m_outgoingPeerID | m_headerFlags));
currentPeer->m_lastSendTime = m_serviceTime;
std::size_t sentLength;
if (!m_socket.SendMultiple(currentPeer->m_address, m_buffers.data(), m_bufferCount, &sentLength))
return -1;
currentPeer->RemoveSentUnreliableCommands();
m_totalSentData += sentLength;
m_totalSentPackets++;
}
}
return 0;
}
void ENetHost::SendUnreliableOutgoingCommands(ENetPeer* peer)
{
auto currentCommand = peer->m_outgoingUnreliableCommands.begin();
while (currentCommand != peer->m_outgoingUnreliableCommands.end())
{
auto outgoingCommand = currentCommand;
std::size_t commandSize = s_commandSizes[outgoingCommand->command.header.command & ENetProtocolCommand_Mask];
if (m_commandCount >= m_commands.size() || m_bufferCount + 1 >= m_buffers.size() || peer->m_mtu - m_packetSize < commandSize ||
(outgoingCommand->packet && peer->m_mtu - m_packetSize < commandSize + outgoingCommand->fragmentLength))
{
m_continueSending = true;
break;
}
++currentCommand;
if (outgoingCommand->packet && outgoingCommand->fragmentOffset == 0)
{
peer->m_packetThrottleCounter += ENetConstants::ENetPeer_PacketThrottleCounter;
peer->m_packetThrottleCounter %= ENetConstants::ENetPeer_PacketThrottleScale;
if (peer->m_packetThrottleCounter > peer->m_packetThrottle)
{
UInt16 reliableSequenceNumber = outgoingCommand->reliableSequenceNumber;
UInt16 unreliableSequenceNumber = outgoingCommand->unreliableSequenceNumber;
for (;;)
{
peer->m_outgoingUnreliableCommands.erase(outgoingCommand);
if (currentCommand == peer->m_outgoingUnreliableCommands.end())
break;
outgoingCommand = currentCommand;
if (outgoingCommand->reliableSequenceNumber != reliableSequenceNumber || outgoingCommand->unreliableSequenceNumber != unreliableSequenceNumber)
break;
++currentCommand;
}
continue;
}
}
ENetProtocol& command = m_commands[m_commandCount];
NetBuffer& buffer = m_buffers[m_bufferCount];
buffer.data = &command;
buffer.dataLength = commandSize;
command = outgoingCommand->command;
if (outgoingCommand->packet)
{
++m_bufferCount;
NetBuffer& packetBuffer = m_buffers[m_bufferCount];
packetBuffer.data = outgoingCommand->packet->data.GetData() + Nz::NetPacket::HeaderSize + outgoingCommand->fragmentOffset;
packetBuffer.dataLength = outgoingCommand->fragmentLength;
m_packetSize += packetBuffer.dataLength;
peer->m_sentUnreliableCommands.emplace_back(std::move(*outgoingCommand));
}
peer->m_outgoingUnreliableCommands.erase(outgoingCommand);
++m_bufferCount;
++m_commandCount;
}
if (peer->m_state == ENetPeerState::DisconnectLater && peer->m_outgoingReliableCommands.empty() &&
peer->m_outgoingUnreliableCommands.empty() && peer->m_sentReliableCommands.empty())
peer->Disconnect(peer->m_eventData);
}
void ENetHost::ThrottleBandwidth()
{
UInt32 currentTime = GetElapsedMilliseconds();
@ -1108,6 +1720,11 @@ namespace Nz
}
}
std::size_t ENetHost::GetCommandSize(UInt8 commandNumber)
{
return s_commandSizes[commandNumber & ENetProtocolCommand_Mask];
}
bool ENetHost::Initialize()
{
std::random_device device;

View File

@ -1,11 +1,11 @@
#include <Nazara/Network/ENetPacket.hpp>
#include <Nazara/Network/ENetPacket.hpp>
#include <Nazara/Core/MemoryPool.hpp>
#include <Nazara/Network/Debug.hpp>
namespace Nz
{
/// Temporary
void ENetPacketRef::Reset(ENetPacket* packet = nullptr)
void ENetPacketRef::Reset(ENetPacket* packet)
{
if (m_packet)
{
@ -20,4 +20,4 @@ namespace Nz
if (m_packet)
m_packet->referenceCount++;
}
}
}

View File

@ -1,7 +1,8 @@
#include <Nazara/Network/ENetPeer.hpp>
#include <Nazara/Network/ENetPeer.hpp>
#include <Nazara/Core/Endianness.hpp>
#include <Nazara/Network/ENetHost.hpp>
#include <Nazara/Network/NetPacket.hpp>
#include <iostream>
#include <Nazara/Network/Debug.hpp>
namespace Nz
@ -129,7 +130,7 @@ namespace Nz
IncomingCommmand& incomingCommand = m_dispatchedCommands.front();
m_totalWaitingData -= incomingCommand.packet->data.GetSize();
m_totalWaitingData -= incomingCommand.packet->data.GetDataSize();
if (packet)
*packet = std::move(incomingCommand.packet);
@ -192,7 +193,7 @@ namespace Nz
m_eventData = 0;
m_totalWaitingData = 0;
std::memset(m_unsequencedWindow, 0, sizeof(m_unsequencedWindow));
m_unsequencedWindow.fill(0);
ResetQueues();
}
@ -209,7 +210,7 @@ namespace Nz
bool ENetPeer::Send(UInt8 channelId, ENetPacketRef packetRef)
{
if (m_state != ENetPeerState::Connected || channelId >= m_channels.size() || packetRef->data.GetSize() > m_host->m_maximumPacketSize)
if (m_state != ENetPeerState::Connected || channelId >= m_channels.size() || packetRef->data.GetDataSize() > m_host->m_maximumPacketSize)
return false;
Channel& channel = m_channels[channelId];
@ -218,7 +219,7 @@ namespace Nz
//if (m_host->m_checksum != nullptr)
// fragmentLength -= sizeof(UInt32);
UInt32 packetSize = static_cast<UInt32>(packetRef->data.GetSize());
UInt32 packetSize = static_cast<UInt32>(packetRef->data.GetDataSize());
if (packetSize > fragmentLength)
{
UInt32 fragmentCount = (packetSize + fragmentLength - 1) / fragmentLength;
@ -254,7 +255,7 @@ namespace Nz
OutgoingCommand outgoingCommand;
outgoingCommand.fragmentOffset = fragmentOffset;
outgoingCommand.fragmentLength = fragmentLength;
outgoingCommand.fragmentLength = static_cast<UInt16>(fragmentLength);
outgoingCommand.packet = packetRef;
outgoingCommand.command.header.command = commandNumber;
outgoingCommand.command.header.channelID = channelId;
@ -275,11 +276,20 @@ namespace Nz
command.header.channelID = channelId;
if ((packetRef->flags & (ENetPacketFlag_Reliable | ENetPacketFlag_Unsequenced)) == ENetPacketFlag_Unsequenced)
{
command.header.command = ENetProtocolCommand_SendUnsequenced | ENetProtocolFlag_Unsequenced;
command.sendUnsequenced.dataLength = HostToNet(UInt16(packetRef->data.GetDataSize()));
}
else if (packetRef->flags & ENetPacketFlag_Reliable || channel.outgoingUnreliableSequenceNumber >= 0xFFFF)
{
command.header.command = ENetProtocolCommand_SendReliable | ENetProtocolFlag_Acknowledge;
command.sendReliable.dataLength = HostToNet(UInt16(packetRef->data.GetDataSize()));
}
else
{
command.header.command = ENetProtocolCommand_SendUnreliable;
command.sendUnreliable.dataLength = HostToNet(UInt16(packetRef->data.GetDataSize()));
}
QueueOutgoingCommand(command, packetRef, 0, packetSize);
@ -354,6 +364,13 @@ namespace Nz
m_windowSize = Clamp<UInt32>(windowSize, ENetConstants::ENetProtocol_MinimumWindowSize, ENetConstants::ENetProtocol_MaximumWindowSize);
}
void ENetPeer::DispatchState(ENetPeerState state)
{
ChangeState(state);
m_host->AddToDispatchQueue(this);
}
void ENetPeer::DispatchIncomingReliableCommands(Channel& channel)
{
auto currentCommand = channel.incomingReliableCommands.begin();
@ -375,7 +392,7 @@ namespace Nz
channel.incomingUnreliableSequenceNumber = 0;
m_dispatchedCommands.splice(m_dispatchedCommands.end(), m_dispatchedCommands, channel.incomingReliableCommands.begin(), currentCommand);
m_dispatchedCommands.splice(m_dispatchedCommands.end(), channel.incomingReliableCommands, channel.incomingReliableCommands.begin(), currentCommand);
m_host->AddToDispatchQueue(this);
@ -408,7 +425,7 @@ namespace Nz
if (startCommand != currentCommand)
{
m_dispatchedCommands.splice(m_dispatchedCommands.end(), m_dispatchedCommands, startCommand, currentCommand);
m_dispatchedCommands.splice(m_dispatchedCommands.end(), channel.incomingUnreliableCommands, startCommand, currentCommand);
m_host->AddToDispatchQueue(this);
@ -436,7 +453,7 @@ namespace Nz
if (startCommand != currentCommand)
{
m_dispatchedCommands.splice(m_dispatchedCommands.end(), m_dispatchedCommands, startCommand, currentCommand);
m_dispatchedCommands.splice(m_dispatchedCommands.end(), channel.incomingUnreliableCommands, startCommand, currentCommand);
m_host->AddToDispatchQueue(this);
}
@ -448,7 +465,7 @@ namespace Nz
if (startCommand != currentCommand)
{
m_dispatchedCommands.splice(m_dispatchedCommands.end(), m_dispatchedCommands, startCommand, currentCommand);
m_dispatchedCommands.splice(m_dispatchedCommands.end(), channel.incomingUnreliableCommands, startCommand, currentCommand);
m_host->AddToDispatchQueue(this);
@ -587,7 +604,7 @@ namespace Nz
}
Acknowledgement acknowledgment;
acknowledgment.command = command;
acknowledgment.command = *command;
acknowledgment.sentTime = sentTime;
m_outgoingDataTotal += sizeof(Acknowledgement);
@ -597,7 +614,7 @@ namespace Nz
return true;
}
ENetPeer::IncomingCommmand* ENetPeer::QueueIncomingCommand(ENetProtocol& command, const void* data, std::size_t dataLength, UInt32 flags, UInt32 fragmentCount)
ENetPeer::IncomingCommmand* ENetPeer::QueueIncomingCommand(const ENetProtocol& command, const void* data, std::size_t dataLength, UInt32 flags, UInt32 fragmentCount)
{
static IncomingCommmand dummyCommand;
@ -740,6 +757,9 @@ namespace Nz
incomingCommand.fragments.resize(fragmentCount, 0);
incomingCommand.fragmentsRemaining = fragmentCount;
if (packet)
m_totalWaitingData += packet->data.GetDataSize();
auto it = commandList->insert(currentCommand.base(), incomingCommand);
switch (command.header.command & ENetProtocolCommand_Mask)
@ -762,7 +782,7 @@ namespace Nz
OutgoingCommand outgoingCommand;
outgoingCommand.command = command;
outgoingCommand.fragmentLength = length;
outgoingCommand.fragmentOffset = length;
outgoingCommand.fragmentOffset = offset;
outgoingCommand.packet = packet;
SetupOutgoingCommand(outgoingCommand);
@ -770,7 +790,7 @@ namespace Nz
void ENetPeer::SetupOutgoingCommand(OutgoingCommand& outgoingCommand)
{
m_outgoingDataTotal += enet_protocol_command_size(outgoingCommand.command.header.command) + outgoingCommand.fragmentLength;
m_outgoingDataTotal += ENetHost::GetCommandSize(outgoingCommand.command.header.command) + outgoingCommand.fragmentLength;
if (outgoingCommand.command.header.channelID == 0xFF)
{
@ -827,7 +847,7 @@ namespace Nz
break;
}
if (outgoingCommand.command.header.command & ENetProtocolCommand_Acknowledge)
if (outgoingCommand.command.header.command & ENetProtocolFlag_Acknowledge)
m_outgoingReliableCommands.emplace_back(outgoingCommand);
else
m_outgoingUnreliableCommands.emplace_back(outgoingCommand);

View File

@ -1,642 +0,0 @@
// Copyright (C) 2017 Jérôme Leclercq
// This file is part of the "Nazara Engine - Utility module"
// For conditions of distribution and use, see copyright notice in Config.hpp
#include <Nazara/Network/ENetHost.hpp>
#include <Nazara/Core/CallOnExit.hpp>
#include <Nazara/Core/Log.hpp>
#include <Nazara/Network/NetPacket.hpp>
#include <Nazara/Network/Debug.hpp>
namespace Nz
{
/*!
* \ingroup network
* \class Nz::ENetConnection
* \brief Network class that represents a reliable UDP connection, based on ENet library
*/
/*!
* \brief Constructs a RUdpConnection object by default
*/
ENetSocket::ENetSocket() :
m_bandwidthThrottleEpoch(0),
m_mtu(ENetConstants::DefaultMTU),
m_isSimulationEnabled(false),
m_shouldAcceptConnections(true)
{
m_randomSeed = *reinterpret_cast<UInt32*>(this);
m_randomSeed += s_randomGenerator();
m_randomSeed = (m_randomSeed << 16) | (m_randomSeed >> 16);
}
/*!
* \brief Connects to the IpAddress
* \return true
*
* \param remoteAddress Address to connect to
*
* \remark Produces a NazaraAssert if socket is not bound
* \remark Produces a NazaraAssert if remote is invalid
* \remark Produces a NazaraAssert if port is not specified
*/
bool RUdpConnection::Connect(const IpAddress& remoteAddress)
{
NazaraAssert(m_socket.GetState() == SocketState_Bound, "Socket must be bound first");
NazaraAssert(remoteAddress.IsValid(), "Invalid remote address");
NazaraAssert(remoteAddress.GetPort() != 0, "Remote address has no port");
PeerData& client = RegisterPeer(remoteAddress, PeerState_Connecting);
client.stateData1 = s_randomGenerator();
NetPacket connectionRequestPacket(NetCode_RequestConnection);
connectionRequestPacket << client.stateData1;
EnqueuePacket(client, PacketPriority_Immediate, PacketReliability_Reliable, connectionRequestPacket);
return true;
}
/*!
* \brief Connects to the hostname
* \return true If successful
*
* \param hostName Hostname of the remote
* \param protocol Net protocol to use
* \param service Specify the protocol used
* \param error Optional argument to get the error
*/
bool RUdpConnection::Connect(const String& hostName, NetProtocol protocol, const String& service, ResolveError* error)
{
std::vector<HostnameInfo> results = IpAddress::ResolveHostname(protocol, hostName, service, error);
if (results.empty())
{
m_lastError = SocketError_ResolveError;
return false;
}
IpAddress hostnameAddress;
for (const HostnameInfo& result : results)
{
if (!result.address)
continue;
if (result.socketType != SocketType_UDP)
continue;
hostnameAddress = result.address;
break; //< Take first valid address
}
return Connect(hostnameAddress);
}
/*!
* \brief Listens to a socket
* \return true If successfully bound
*
* \param remoteAddress Address to listen to
*/
bool RUdpConnection::Listen(const IpAddress& address)
{
if (!InitSocket(address.GetProtocol()))
return false;
return m_socket.Bind(address) == SocketState_Bound;
}
/*!
* \brief Polls the message
* \return true If there is a message
*
* \param message Message to poll
*
* \remark Produces a NazaraAssert if message is invalid
*/
bool RUdpConnection::PollMessage(RUdpMessage* message)
{
NazaraAssert(message, "Invalid message");
if (m_receivedMessages.empty())
return false;
*message = std::move(m_receivedMessages.front());
m_receivedMessages.pop();
return true;
}
/*!
* \brief Sends the packet to a peer
* \return true If peer exists (false may result from disconnected client)
*
* \param peerIp IpAddress of the peer
* \param priority Priority of the packet
* \param reliability Policy of reliability of the packet
* \param packet Packet to send
*/
bool RUdpConnection::Send(const IpAddress& peerIp, PacketPriority priority, PacketReliability reliability, const NetPacket& packet)
{
auto it = m_peerByIP.find(peerIp);
if (it == m_peerByIP.end())
return false; /// Silently fail (probably a disconnected client)
EnqueuePacket(m_peers[it->second], priority, reliability, packet);
return true;
}
/*!
* \brief Updates the reliable connection
*/
void RUdpConnection::Update()
{
m_currentTime = m_clock.GetMicroseconds();
NetPacket receivedPacket;
IpAddress senderIp;
while (m_socket.ReceivePacket(&receivedPacket, &senderIp))
OnPacketReceived(senderIp, std::move(receivedPacket));
//for (unsigned int i = m_activeClients.FindFirst(); i != m_activeClients.npos; i = m_activeClients.FindNext(i))
//{
// PeerData& clientData = m_peers[i];
CallOnExit resetIterator([this] () { m_peerIterator = m_peers.size(); });
for (m_peerIterator = 0; m_peerIterator < m_peers.size(); ++m_peerIterator)
{
PeerData& peer = m_peers[m_peerIterator];
UInt32 timeSinceLastPacket = static_cast<UInt32>(m_currentTime - peer.lastPacketTime);
if (timeSinceLastPacket > m_timeBeforeTimeOut)
{
DisconnectPeer(peer.index);
continue;
}
else if (timeSinceLastPacket > m_timeBeforePing)
{
if (m_currentTime - peer.lastPingTime > m_pingInterval)
{
NetPacket pingPacket(NetCode_Ping);
EnqueuePacket(peer, PacketPriority_Low, PacketReliability_Unreliable, pingPacket);
}
}
if (peer.state == PeerState_WillAck && m_currentTime - peer.stateData1 > m_forceAckSendTime)
{
NetPacket acknowledgePacket(NetCode_Acknowledge);
EnqueuePacket(peer, PacketPriority_Low, PacketReliability_Reliable, acknowledgePacket);
}
for (unsigned int priority = PacketPriority_Highest; priority <= PacketPriority_Lowest; ++priority)
{
std::vector<PendingPacket>& pendingPackets = peer.pendingPackets[priority];
for (PendingPacket& packetData : pendingPackets)
SendPacket(peer, std::move(packetData));
pendingPackets.clear();
}
auto it = peer.pendingAckQueue.begin();
while (it != peer.pendingAckQueue.end())
{
if (m_currentTime - it->timeSent > 2 * peer.roundTripTime)
{
OnPacketLost(peer, std::move(*it));
it = peer.pendingAckQueue.erase(it);
}
else
++it;
}
}
//m_activeClients.Reset();
}
/*!
* \brief Disconnects a peer
*
* \param peerIndex Index of the peer
*
* \remark Produces a NazaraNotice
*/
void RUdpConnection::DisconnectPeer(std::size_t peerIndex)
{
PeerData& peer = m_peers[peerIndex];
NazaraNotice(m_socket.GetBoundAddress().ToString() + ": " + peer.address.ToString() + " has been disconnected due to time-out");
OnPeerDisconnected(this, peer.address);
// Remove from IP lookup table
m_peerByIP.erase(peer.address);
// Can we safely "remove" this slot?
if (m_peerIterator >= m_peers.size() - 1 || peerIndex > m_peerIterator)
{
// Yes we can
PeerData& newSlot = m_peers[peerIndex];
newSlot = std::move(m_peers.back());
newSlot.index = peerIndex; //< Update the moved slot index before resizing (in case it's the last one)
}
else
{
// Nope, let's be tricky
PeerData& current = m_peers[m_peerIterator];
PeerData& newSlot = m_peers[peerIndex];
newSlot = std::move(current);
newSlot.index = peerIndex; //< Update the moved slot index
current = std::move(m_peers.back());
current.index = m_peerIterator; //< Update the moved slot index
--m_peerIterator;
}
// Pop the last entry (from where we moved our slot)
m_peers.pop_back();
}
/*!
* \brief Enqueues a packet in the sending list
*
* \param peer Data relative to the peer
* \param priority Priority of the packet
* \param reliability Policy of reliability of the packet
* \param packet Packet to send
*/
void RUdpConnection::EnqueuePacket(PeerData& peer, PacketPriority priority, PacketReliability reliability, const NetPacket& packet)
{
UInt16 protocolBegin = static_cast<UInt16>(m_protocol & 0xFFFF);
UInt16 protocolEnd = static_cast<UInt16>((m_protocol & 0xFFFF0000) >> 16);
NetPacket data(packet.GetNetCode(), MessageHeader + packet.GetDataSize() + MessageFooter);
data << protocolBegin;
data.GetStream()->SetCursorPos(NetPacket::HeaderSize + MessageHeader);
data.Write(packet.GetConstData() + NetPacket::HeaderSize, packet.GetDataSize());
data << protocolEnd;
EnqueuePacketInternal(peer, priority, reliability, std::move(data));
}
/*!
* \brief Enqueues internally a packet in the sending list
*
* \param peer Data relative to the peer
* \param priority Priority of the packet
* \param reliability Policy of reliability of the packet
* \param packet Packet to send
*/
void RUdpConnection::EnqueuePacketInternal(PeerData& peer, PacketPriority priority, PacketReliability reliability, NetPacket&& data)
{
PendingPacket pendingPacket;
pendingPacket.data = std::move(data);
pendingPacket.priority = priority;
pendingPacket.reliability = reliability;
peer.pendingPackets[priority].emplace_back(std::move(pendingPacket));
m_activeClients.UnboundedSet(peer.index);
}
/*!
* \brief Inits the internal socket
* \return true If successful
*
* \param protocol Net protocol to use
*/
bool ENetSocket::InitSocket(NetProtocol protocol)
{
CallOnExit updateLastError([this]
{
m_lastError = m_socket.GetLastError();
});
if (!m_socket.Create(protocol))
return false;
m_socket.EnableBlocking(false);
m_socket.SetReceiveBufferSize(ENetConstants::ReceiveBufferSize);
m_socket.SetSendBufferSize(ENetConstants::SendBufferSize);
return true;
}
/*!
* \brief Processes the acks
*
* \param peer Data relative to the peer
* \param lastAck Last index of the ack
* \param ackBits Bits for acking
*/
void RUdpConnection::ProcessAcks(PeerData& peer, SequenceIndex lastAck, UInt32 ackBits)
{
auto it = peer.pendingAckQueue.begin();
while (it != peer.pendingAckQueue.end())
{
bool acked = false;
if (lastAck == it->sequenceId)
acked = true;
else if (!IsAckMoreRecent(it->sequenceId, lastAck))
{
unsigned int difference = ComputeSequenceDifference(lastAck, it->sequenceId);
if (difference <= 32)
acked = (ackBits >> (difference - 1)) & 1;
}
if (acked)
{
it = peer.pendingAckQueue.erase(it);
}
else
++it;
}
}
/*!
* \brief Registers a peer
* \return Data relative to the peer
*
* \param address Address of the peer
* \param state Status of the peer
*/
RUdpConnection::PeerData& RUdpConnection::RegisterPeer(const IpAddress& address, PeerState state)
{
PeerData data;
data.address = address;
data.localSequence = 0;
data.remoteSequence = 0;
data.index = m_peers.size();
data.lastPacketTime = m_currentTime;
data.lastPingTime = m_currentTime;
data.roundTripTime = 1'000'000; ///< Okay that's quite a lot
data.state = state;
m_activeClients.UnboundedSet(data.index);
m_peerByIP[address] = data.index;
m_peers.emplace_back(std::move(data));
return m_peers.back();
}
/*!
* \brief Operation to do when client requests a connection
*
* \param address Address of the peer
* \param sequenceId Sequence index for the ack
* \param token Token for connection
*/
void RUdpConnection::OnClientRequestingConnection(const IpAddress& address, SequenceIndex sequenceId, UInt64 token)
{
// Call hook to check if client should be accepted or not
OnPeerConnection(this, address);
PeerData& client = RegisterPeer(address, PeerState_Aknowledged);
client.remoteSequence = sequenceId;
/// Acknowledge connection
NetPacket connectionAcceptedPacket(NetCode_AcknowledgeConnection);
//connectionAcceptedPacket << address;
connectionAcceptedPacket << ~token;
EnqueuePacket(client, PacketPriority_Immediate, PacketReliability_Reliable, connectionAcceptedPacket);
}
/*!
* \brief Operation to do when a packet is lost
*
* \param peer Data relative to the peer
* \param packet Pending packet
*/
void RUdpConnection::OnPacketLost(PeerData& peer, PendingAckPacket&& packet)
{
//NazaraNotice(m_socket.GetBoundAddress().ToString() + ": Lost packet " + String::Number(packet.sequenceId));
if (IsReliable(packet.reliability))
EnqueuePacketInternal(peer, packet.priority, packet.reliability, std::move(packet.data));
}
/*!
* \brief Operation to do when receiving a packet
*
* \param peerIndex Index of the peer
*
* \remark Produces a NazaraNotice
*/
void RUdpConnection::OnPacketReceived(const IpAddress& peerIp, NetPacket&& packet)
{
UInt16 protocolBegin;
UInt16 protocolEnd;
SequenceIndex sequenceId;
SequenceIndex lastAck;
UInt32 ackBits;
packet.GetStream()->SetCursorPos(packet.GetSize() - MessageFooter);
packet >> protocolEnd;
packet.GetStream()->SetCursorPos(NetPacket::HeaderSize);
packet >> protocolBegin;
UInt32 protocolId = static_cast<UInt32>(protocolEnd) << 16 | protocolBegin;
if (protocolId != m_protocol)
return; ///< Ignore
packet >> sequenceId >> lastAck >> ackBits;
auto it = m_peerByIP.find(peerIp);
if (it == m_peerByIP.end())
{
switch (packet.GetNetCode())
{
case NetCode_RequestConnection:
{
UInt64 token;
packet >> token;
NazaraNotice(m_socket.GetBoundAddress().ToString() + ": Received NetCode_RequestConnection from " + peerIp.ToString() + ": " + String::Number(token));
if (!m_shouldAcceptConnections)
return; //< Ignore
OnClientRequestingConnection(peerIp, sequenceId, token);
break;
}
default:
return; //< Ignore
}
}
else
{
PeerData& peer = m_peers[it->second];
peer.lastPacketTime = m_currentTime;
if (peer.receivedQueue.find(sequenceId) != peer.receivedQueue.end())
return; //< Ignore
if (m_isSimulationEnabled && m_packetLossProbability(s_randomGenerator))
{
NazaraNotice(m_socket.GetBoundAddress().ToString() + ": Lost packet " + String::Number(sequenceId) + " from " + peerIp.ToString() + " for simulation purpose");
return;
}
///< Receiving a packet from an acknowledged client means the connection works in both ways
if (peer.state == PeerState_Aknowledged && packet.GetNetCode() != NetCode_RequestConnection)
{
peer.state = PeerState_Connected;
OnPeerAcknowledged(this, peerIp);
}
if (IsAckMoreRecent(sequenceId, peer.remoteSequence))
peer.remoteSequence = sequenceId;
ProcessAcks(peer, lastAck, ackBits);
peer.receivedQueue.insert(sequenceId);
switch (packet.GetNetCode())
{
case NetCode_Acknowledge:
return; //< Do not switch to will ack mode (to prevent infinite replies, just let's ping/pong do that)
case NetCode_AcknowledgeConnection:
{
if (peer.state == PeerState_Connected)
break;
IpAddress externalAddress;
UInt64 token;
packet /*>> externalAddress*/ >> token;
NazaraNotice(m_socket.GetBoundAddress().ToString() + ": Received NetCode_AcknowledgeConnection from " + peerIp.ToString() + ": " + String::Number(token));
if (token == ~peer.stateData1)
{
peer.state = PeerState_Connected;
OnConnectedToPeer(this);
}
else
{
NazaraNotice("Received wrong token (" + String::Number(token) + " instead of " + String::Number(~peer.stateData1) + ") from client " + peer.address);
return; //< Ignore
}
break;
}
case NetCode_RequestConnection:
NazaraNotice(m_socket.GetBoundAddress().ToString() + ": Received NetCode_RequestConnection from " + peerIp.ToString());
return; //< Ignore
case NetCode_Ping:
{
NazaraNotice(m_socket.GetBoundAddress().ToString() + ": Received NetCode_Ping from " + peerIp.ToString());
NetPacket pongPacket(NetCode_Pong);
EnqueuePacket(peer, PacketPriority_Low, PacketReliability_Unreliable, pongPacket);
break;
}
case NetCode_Pong:
NazaraNotice(m_socket.GetBoundAddress().ToString() + ": Received NetCode_Pong from " + peerIp.ToString());
break;
default:
{
NazaraNotice(m_socket.GetBoundAddress().ToString() + ": Received 0x" + String::Number(packet.GetNetCode(), 16) + " from " + peerIp.ToString());
RUdpMessage receivedMessage;
receivedMessage.from = peerIp;
receivedMessage.data = std::move(packet);
m_receivedMessages.emplace(std::move(receivedMessage));
break;
}
}
if (!HasPendingPackets(peer))
{
peer.state = PeerState_WillAck;
peer.stateData1 = m_currentTime;
}
}
}
/*!
* \brief Sends a packet to a peer
*
* \param peer Data relative to the peer
* \param packet Pending packet
*/
void RUdpConnection::SendPacket(PeerData& peer, PendingPacket&& packet)
{
if (peer.state == PeerState_WillAck)
peer.state = PeerState_Connected;
SequenceIndex remoteSequence = peer.remoteSequence;
UInt32 previousAcks = 0;
for (SequenceIndex ack : peer.receivedQueue)
{
if (ack == remoteSequence)
continue;
unsigned int difference = ComputeSequenceDifference(remoteSequence, ack);
if (difference <= 32U)
previousAcks |= (1U << (difference - 1));
}
SequenceIndex sequenceId = ++peer.localSequence;
packet.data.GetStream()->SetCursorPos(NetPacket::HeaderSize + sizeof(UInt16)); ///< Protocol begin has already been filled
packet.data << sequenceId;
packet.data << remoteSequence;
packet.data << previousAcks;
m_socket.SendPacket(peer.address, packet.data);
PendingAckPacket pendingAckPacket;
pendingAckPacket.data = std::move(packet.data);
pendingAckPacket.priority = packet.priority;
pendingAckPacket.reliability = packet.reliability;
pendingAckPacket.sequenceId = sequenceId;
pendingAckPacket.timeSent = m_currentTime;
peer.pendingAckQueue.emplace_back(std::move(pendingAckPacket));
}
/*!
* \brief Initializes the RUdpConnection class
* \return true
*/
bool RUdpConnection::Initialize()
{
std::random_device device;
s_randomGenerator.seed(device());
return true;
}
/*!
* \brief Uninitializes the RUdpConnection class
*/
void RUdpConnection::Uninitialize()
{
}
std::mt19937_64 RUdpConnection::s_randomGenerator;
}