Add stream buffering (WIP)

This commit is contained in:
SirLynix 2022-04-09 18:22:57 +02:00
parent 2b66ea1e90
commit 22f58fdbf5
14 changed files with 291 additions and 189 deletions

View File

@ -24,17 +24,16 @@ namespace Nz
bool EndOfStream() const override;
UInt64 GetCursorPos() const override;
UInt64 GetSize() const override;
bool SetCursorPos(UInt64 offset) override;
EmptyStream& operator=(const EmptyStream&) = default;
EmptyStream& operator=(EmptyStream&&) noexcept = default;
private:
void FlushStream() override;
std::size_t ReadBlock(void* buffer, std::size_t size) override;
bool SeekStreamCursor(UInt64 offset) override;
UInt64 TellStreamCursor() const override;
std::size_t WriteBlock(const void* buffer, std::size_t size) override;
UInt64 m_size;

View File

@ -104,15 +104,16 @@ namespace Nz
enum class OpenMode
{
NotOpen, // Use the current mod of opening
NotOpen, // Use the current mod of opening
Append, // Disable writing on existing parts and put the cursor at the end
Lock, // Disable modifying the file before it is open
MustExist, // Fail if the file doesn't exists, even if opened in write mode
ReadOnly, // Open in read only
Text, // Open in text mod
Truncate, // Create the file if it doesn't exist and empty it if it exists
WriteOnly, // Open in write only, create the file if it doesn't exist
Append, // Disable writing on existing parts and put the cursor at the end
Lock, // Disable modifying the file before it is open
MustExist, // Fail if the file doesn't exists, even if opened in write mode
ReadOnly, // Open in read only
Text, // Open in text mod
Truncate, // Create the file if it doesn't exist and empty it if it exists
Unbuffered, // Each read/write operation is equivalent to a read or write system call (slow)
WriteOnly, // Open in write only, create the file if it doesn't exist
Max = WriteOnly
};
@ -224,8 +225,9 @@ namespace Nz
Sequential,
Text,
Unbuffered,
Max = Text
Max = Unbuffered
};
template<>

View File

@ -40,7 +40,6 @@ namespace Nz
bool Exists() const;
UInt64 GetCursorPos() const override;
std::filesystem::path GetDirectory() const override;
std::filesystem::path GetFileName() const;
std::filesystem::path GetPath() const override;
@ -51,8 +50,6 @@ namespace Nz
bool Open(OpenModeFlags openMode = OpenMode::NotOpen);
bool Open(const std::filesystem::path& filePath, OpenModeFlags openMode = OpenMode::NotOpen);
bool SetCursorPos(CursorPosition pos, Int64 offset = 0);
bool SetCursorPos(UInt64 offset) override;
bool SetFile(const std::filesystem::path& filePath);
bool SetSize(UInt64 size);
@ -65,6 +62,8 @@ namespace Nz
private:
void FlushStream() override;
std::size_t ReadBlock(void* buffer, std::size_t size) override;
bool SeekStreamCursor(UInt64 offset) override;
UInt64 TellStreamCursor() const override;
std::size_t WriteBlock(const void* buffer, std::size_t size) override;
std::filesystem::path m_filePath;

View File

@ -30,11 +30,9 @@ namespace Nz
inline ByteArray& GetBuffer();
inline const ByteArray& GetBuffer() const;
UInt64 GetCursorPos() const override;
UInt64 GetSize() const override;
void SetBuffer(ByteArray* byteArray, OpenModeFlags openMode = OpenMode_ReadWrite);
bool SetCursorPos(UInt64 offset) override;
MemoryStream& operator=(const MemoryStream&) = default;
MemoryStream& operator=(MemoryStream&&) noexcept = default;
@ -42,6 +40,8 @@ namespace Nz
private:
void FlushStream() override;
std::size_t ReadBlock(void* buffer, std::size_t size) override;
bool SeekStreamCursor(UInt64 offset) override;
UInt64 TellStreamCursor() const override;
std::size_t WriteBlock(const void* buffer, std::size_t size) override;
MovablePtr<ByteArray> m_buffer;

View File

@ -23,17 +23,16 @@ namespace Nz
bool EndOfStream() const override;
UInt64 GetCursorPos() const override;
UInt64 GetSize() const override;
bool SetCursorPos(UInt64 offset) override;
MemoryView& operator=(const MemoryView&) = delete;
MemoryView& operator=(MemoryView&&) = delete; ///TODO
private:
void FlushStream() override;
std::size_t ReadBlock(void* buffer, std::size_t size) override;
bool SeekStreamCursor(UInt64 offset) override;
UInt64 TellStreamCursor() const override;
std::size_t WriteBlock(const void* buffer, std::size_t size) override;
UInt8* m_ptr;

View File

@ -11,6 +11,7 @@
#include <Nazara/Core/Endianness.hpp>
#include <Nazara/Core/Enums.hpp>
#include <filesystem>
#include <memory>
#include <string>
namespace Nz
@ -26,27 +27,29 @@ namespace Nz
virtual bool EndOfStream() const = 0;
inline void EnableBuffering(bool buffering, std::size_t bufferSize = DefaultBufferSize);
inline void EnableTextMode(bool textMode);
inline void Flush();
virtual UInt64 GetCursorPos() const = 0;
virtual std::filesystem::path GetDirectory() const;
virtual std::filesystem::path GetPath() const;
inline OpenModeFlags GetOpenMode() const;
inline StreamOptionFlags GetStreamOptions() const;
UInt64 GetCursorPos() const;
virtual UInt64 GetSize() const = 0;
inline std::size_t Read(void* buffer, std::size_t size);
std::size_t Read(void* buffer, std::size_t size);
virtual std::string ReadLine(unsigned int lineSize = 0);
inline bool IsBufferingEnabled() const;
inline bool IsReadable() const;
inline bool IsSequential() const;
inline bool IsTextModeEnabled() const;
inline bool IsWritable() const;
virtual bool SetCursorPos(UInt64 offset) = 0;
bool SetCursorPos(UInt64 offset);
bool Write(const ByteArray& byteArray);
bool Write(const std::string_view& string);
@ -55,15 +58,24 @@ namespace Nz
Stream& operator=(const Stream&) = default;
Stream& operator=(Stream&&) noexcept = default;
static constexpr std::size_t DefaultBufferSize = 0xFFFF;
protected:
inline Stream(StreamOptionFlags streamOptions = StreamOption::None, OpenModeFlags openMode = OpenMode::NotOpen);
virtual void FlushStream() = 0;
virtual std::size_t ReadBlock(void* buffer, std::size_t size) = 0;
virtual bool SeekStreamCursor(UInt64 offset) = 0;
virtual UInt64 TellStreamCursor() const = 0;
virtual std::size_t WriteBlock(const void* buffer, std::size_t size) = 0;
std::size_t m_bufferCapacity;
std::size_t m_bufferOffset;
std::size_t m_bufferSize;
std::unique_ptr<UInt8[]> m_buffer;
OpenModeFlags m_openMode;
StreamOptionFlags m_streamOptions;
UInt64 m_bufferCursor;
};
}

View File

@ -17,8 +17,12 @@ namespace Nz
*/
inline Stream::Stream(StreamOptionFlags streamOptions, OpenModeFlags openMode) :
m_bufferCapacity(0),
m_bufferOffset(0),
m_bufferSize(0),
m_openMode(openMode),
m_streamOptions(streamOptions)
m_streamOptions(streamOptions),
m_bufferCursor(0)
{
}
@ -28,6 +32,28 @@ namespace Nz
* \param textMode Enables the mode or disables
*/
inline void Stream::EnableBuffering(bool buffering, std::size_t bufferSize)
{
if (buffering)
{
m_streamOptions &= ~StreamOption::Unbuffered;
if (m_bufferCapacity != bufferSize)
{
m_buffer = std::make_unique<UInt8[]>(bufferSize);
m_bufferCursor = TellStreamCursor();
m_bufferOffset = 0;
m_bufferSize = 0;
m_bufferCapacity = bufferSize;
}
}
else
{
m_streamOptions |= StreamOption::Unbuffered;
m_buffer.reset();
m_bufferCapacity = 0;
}
}
inline void Stream::EnableTextMode(bool textMode)
{
if (textMode)
@ -74,6 +100,11 @@ namespace Nz
* \return true if it is the case
*/
inline bool Stream::IsBufferingEnabled() const
{
return (m_streamOptions & StreamOption::Unbuffered) == 0;
}
inline bool Stream::IsReadable() const
{
return (m_openMode & OpenMode::ReadOnly) != 0;
@ -109,22 +140,27 @@ namespace Nz
return (m_openMode & OpenMode::WriteOnly) != 0;
}
/*!
* \brief Reads the stream and puts the result in a buffer
* \return Size of the read
*
* \param buffer Buffer to stock data
* \param size Size meant to be read
*
* \remark Produces a NazaraAssert if stream is not readable
* \remark If preallocated space of buffer is less than the size, the behavior is undefined
*/
inline std::size_t Stream::Read(void* buffer, std::size_t size)
inline bool Stream::SetCursorPos(UInt64 offset)
{
NazaraAssert(IsReadable(), "Stream is not readable");
if (m_bufferCapacity == 0)
return SeekStreamCursor(offset);
else
{
if (offset >= m_bufferCursor && offset - m_bufferCursor < m_bufferSize)
m_bufferOffset += offset - m_bufferCursor;
else
{
// Out of buffer
if (!SeekStreamCursor(offset))
return false;
return ReadBlock(buffer, size);
m_bufferCursor = offset;
m_bufferOffset = 0;
m_bufferSize = 0;
}
return true;
}
}
/*!
@ -137,12 +173,17 @@ namespace Nz
* \remark Produces a NazaraAssert if stream is not writable
* \remark If preallocated space of buffer is less than the size, the behavior is undefined
*/
inline std::size_t Stream::Write(const void* buffer, std::size_t size)
{
NazaraAssert(IsWritable(), "Stream is not writable");
return WriteBlock(buffer, size);
std::size_t writeSize = WriteBlock(buffer, size);
// For now, don't buffer writes
m_bufferCursor += writeSize;
m_bufferSize = 0;
return writeSize;
}
}

View File

@ -37,7 +37,6 @@ namespace Nz
bool EndOfStream() const override;
UInt64 GetCursorPos() const override;
inline UInt64 GetKeepAliveInterval() const;
inline UInt64 GetKeepAliveTime() const;
inline IpAddress GetRemoteAddress() const;
@ -55,8 +54,6 @@ namespace Nz
bool SendMultiple(const NetBuffer* buffers, std::size_t bufferCount, std::size_t* sent);
bool SendPacket(const NetPacket& packet);
bool SetCursorPos(UInt64 offset) override;
SocketState WaitForConnected(UInt64 msTimeout = 3000);
inline TcpClient& operator=(TcpClient&& tcpClient) = default;
@ -69,6 +66,8 @@ namespace Nz
std::size_t ReadBlock(void* buffer, std::size_t size) override;
void Reset(SocketHandle handle, const IpAddress& peerAddress);
bool SeekStreamCursor(UInt64 offset) override;
UInt64 TellStreamCursor() const override;
std::size_t WriteBlock(const void* buffer, std::size_t size) override;
struct PendingPacket

View File

@ -32,15 +32,6 @@ namespace Nz
return false;
}
/*!
* \brief Gets the position of the cursor (which is always zero)
* \return Always zero
*/
UInt64 EmptyStream::GetCursorPos() const
{
return 0;
}
/*!
* \brief Gets the size of the raw memory (how many bytes would have been written on a regular stream)
* \return Size occupied until now
@ -50,17 +41,6 @@ namespace Nz
return m_size;
}
/*!
* \brief Does nothing
* \return true
*
* \param offset Offset according to the beginning of the stream
*/
bool EmptyStream::SetCursorPos(UInt64 /*offset*/)
{
return true;
}
/*!
* \brief Flushes the stream (does nothing)
*/
@ -83,6 +63,26 @@ namespace Nz
return 0;
}
/*!
* \brief Does nothing
* \return true
*
* \param offset Offset according to the beginning of the stream
*/
bool EmptyStream::SeekStreamCursor(UInt64 /*offset*/)
{
return true;
}
/*!
* \brief Gets the position of the cursor (which is always zero)
* \return Always zero
*/
UInt64 EmptyStream::TellStreamCursor() const
{
return 0;
}
/*!
* \brief Writes data
* \return size

View File

@ -154,20 +154,6 @@ namespace Nz
return std::filesystem::exists(m_filePath);
}
/*!
* \brief Gets the position of the cursor in the file
* \return Position of the cursor
*
* \remark Produces a NazaraAssert if file is not open
*/
UInt64 File::GetCursorPos() const
{
NazaraAssert(IsOpen(), "File is not open");
return m_impl->GetCursorPos();
}
/*!
* \brief Gets the directory of the file
* \return Directory of the file
@ -240,6 +226,8 @@ namespace Nz
m_openMode = openMode;
m_impl = std::move(impl);
EnableBuffering(!m_openMode.Test(OpenMode::Unbuffered));
if (m_openMode & OpenMode::Text)
m_streamOptions |= StreamOption::Text;
else
@ -266,39 +254,6 @@ namespace Nz
return Open(openMode);
}
/*!
* \brief Sets the position of the cursor
* \return true if cursor is successfully positioned
*
* \param pos Position of the cursor
* \param offset Offset according to the cursor position
*
* \remark Produces a NazaraAssert if file is not open
*/
bool File::SetCursorPos(CursorPosition pos, Int64 offset)
{
NazaraAssert(IsOpen(), "File is not open");
return m_impl->SetCursorPos(pos, offset);
}
/*!
* \brief Sets the position of the cursor
* \return true if cursor is successfully positioned
*
* \param offset Offset according to the cursor begin position
*
* \remark Produces a NazaraAssert if file is not open
*/
bool File::SetCursorPos(UInt64 offset)
{
NazaraAssert(IsOpen(), "File is not open");
return m_impl->SetCursorPos(CursorPosition::AtBegin, offset);
}
/*!
* \brief Sets the file path
* \return true if file opening is successful
@ -389,6 +344,35 @@ namespace Nz
}
}
/*!
* \brief Sets the position of the cursor
* \return true if cursor is successfully positioned
*
* \param offset Offset according to the cursor begin position
*
* \remark Produces a NazaraAssert if file is not open
*/
bool File::SeekStreamCursor(UInt64 offset)
{
NazaraAssert(IsOpen(), "File is not open");
return m_impl->SetCursorPos(CursorPosition::AtBegin, offset);
}
/*!
* \brief Gets the position of the cursor in the file
* \return Position of the cursor
*
* \remark Produces a NazaraAssert if file is not open
*/
UInt64 File::TellStreamCursor() const
{
NazaraAssert(IsOpen(), "File is not open");
return m_impl->GetCursorPos();
}
/*!
* \brief Writes blocks
* \return Number of blocks written

View File

@ -36,16 +36,6 @@ namespace Nz
return m_pos >= m_buffer->size();
}
/*!
* \brief Gets the position of the cursor
* \return Position of the cursor
*/
UInt64 MemoryStream::GetCursorPos() const
{
return m_pos;
}
/*!
* \brief Gets the size of the raw memory
* \return Size of the memory
@ -73,20 +63,6 @@ namespace Nz
m_openMode = openMode;
}
/*!
* \brief Sets the position of the cursor
* \return true
*
* \param offset Offset according to the beginning of the stream
*/
bool MemoryStream::SetCursorPos(UInt64 offset)
{
m_pos = offset;
return true;
}
/*!
* \brief Flushes the stream
*/
@ -118,6 +94,28 @@ namespace Nz
return readSize;
}
/*!
* \brief Sets the position of the cursor
* \return true
*
* \param offset Offset according to the beginning of the stream
*/
bool MemoryStream::SeekStreamCursor(UInt64 offset)
{
m_pos = offset;
return true;
}
/*!
* \brief Gets the position of the cursor
* \return Position of the cursor
*/
UInt64 MemoryStream::TellStreamCursor() const
{
return m_pos;
}
/*!
* \brief Writes blocks
* \return Number of blocks written
@ -127,7 +125,6 @@ namespace Nz
*
* \remark Produces a NazaraAssert if buffer is nullptr
*/
std::size_t MemoryStream::WriteBlock(const void* buffer, std::size_t size)
{
if (size > 0)

View File

@ -59,16 +59,6 @@ namespace Nz
return m_pos >= m_size;
}
/*!
* \brief Gets the position of the cursor
* \return Position of the cursor
*/
UInt64 MemoryView::GetCursorPos() const
{
return m_pos;
}
/*!
* \brief Gets the size of the raw memory
* \return Size of the memory
@ -79,20 +69,6 @@ namespace Nz
return m_size;
}
/*!
* \brief Sets the position of the cursor
* \return true
*
* \param offset Offset according to the beginning of the stream
*/
bool MemoryView::SetCursorPos(UInt64 offset)
{
m_pos = std::min(offset, m_size);
return true;
}
/*!
* \brief Flushes the stream
*/
@ -121,6 +97,28 @@ namespace Nz
return readSize;
}
/*!
* \brief Sets the position of the cursor
* \return true
*
* \param offset Offset according to the beginning of the stream
*/
bool MemoryView::SeekStreamCursor(UInt64 offset)
{
m_pos = std::min(offset, m_size);
return true;
}
/*!
* \brief Gets the position of the cursor
* \return Position of the cursor
*/
UInt64 MemoryView::TellStreamCursor() const
{
return m_pos;
}
/*!
* \brief Writes blocks
* \return Number of blocks written

View File

@ -30,7 +30,7 @@ namespace Nz
std::filesystem::path Stream::GetDirectory() const
{
return std::filesystem::path();
return {};
}
/*!
@ -40,7 +40,81 @@ namespace Nz
std::filesystem::path Stream::GetPath() const
{
return std::filesystem::path();
return {};
}
UInt64 Stream::GetCursorPos() const
{
if (m_bufferCapacity == 0)
return TellStreamCursor();
else
{
assert(m_bufferCursor >= m_bufferSize);
return m_bufferCursor - m_bufferSize + m_bufferOffset;
}
}
/*!
* \brief Reads the stream and puts the result in a buffer
* \return Size of the read
*
* \param buffer Buffer to stock data
* \param size Size meant to be read
*
* \remark Produces a NazaraAssert if stream is not readable
* \remark If preallocated space of buffer is less than the size, the behavior is undefined
*/
std::size_t Stream::Read(void* buffer, std::size_t size)
{
NazaraAssert(IsReadable(), "Stream is not readable");
if (m_bufferCapacity == 0)
return ReadBlock(buffer, size);
UInt8* ptr = static_cast<UInt8*>(buffer);
std::size_t readSize = 0;
if (m_bufferOffset < m_bufferSize)
{
std::size_t availableSize = std::min(size, m_bufferSize - m_bufferOffset);
if (ptr)
{
std::memcpy(ptr, &m_buffer[m_bufferOffset], availableSize);
ptr += availableSize;
}
m_bufferOffset += availableSize;
readSize += availableSize;
size -= availableSize;
}
if (size > m_bufferCapacity)
{
// Unbuffered read
m_bufferSize = 0;
std::size_t blockSize = ReadBlock(ptr, size);
m_bufferCursor += blockSize;
readSize += blockSize;
}
else if (size > 0)
{
m_bufferOffset = 0;
m_bufferSize = ReadBlock(&m_buffer[0], m_bufferCapacity);
m_bufferCursor += m_bufferSize;
std::size_t remainingSize = std::min(m_bufferSize, size);
if (ptr)
{
std::memcpy(ptr, &m_buffer[m_bufferOffset], remainingSize);
ptr += remainingSize;
}
m_bufferOffset += remainingSize;
readSize += remainingSize;
size -= remainingSize;
}
return readSize;
}
/*!

View File

@ -147,19 +147,6 @@ namespace Nz
return QueryAvailableBytes() == 0;
}
/*!
* \brief Gets the position of the cursor
* \return 0
*
* \remark Produces a NazaraError because it is a special stream
*/
UInt64 TcpClient::GetCursorPos() const
{
NazaraError("GetCursorPos() cannot be used on sequential streams");
return 0;
}
/*!
* \brief Gets the size of the raw memory available
* \return Size of the memory available
@ -456,21 +443,6 @@ namespace Nz
return Send(ptr, size, nullptr);
}
/*!
* \brief Sets the position of the cursor
* \return false
*
* \param offset Offset according to the beginning of the stream
*
* \remark Produces a NazaraError because it is a special stream
*/
bool TcpClient::SetCursorPos(UInt64 /*offset*/)
{
NazaraError("SetCursorPos() cannot be used on sequential streams");
return false;
}
/*!
* \brief Waits for being connected before time out
* \return The new socket state, either Connected if connection did succeed or NotConnected if an error occurred
@ -605,6 +577,32 @@ namespace Nz
UpdateState(SocketState::Connected);
}
/*!
* \brief Sets the position of the cursor
* \return false
*
* \param offset Offset according to the beginning of the stream
*
* \remark Produces a NazaraError because it is a special stream
*/
bool TcpClient::SeekStreamCursor(UInt64 /*offset*/)
{
NazaraError("SeekStreamCursor() cannot be used on sequential streams");
return false;
}
/*!
* \brief Gets the position of the cursor
* \return 0
*
* \remark Produces a NazaraError because it is a special stream
*/
UInt64 TcpClient::TellStreamCursor() const
{
NazaraError("TellStreamCursor() cannot be used on sequential streams");
return 0;
}
/*!
* \brief Writes blocks
* \return Number of blocks written