/* Copyright 2010-2014 MongoDB Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
namespace MongoDB.Bson.IO
{
/// <summary>
/// An IByteBuffer that stores its bytes in multiple equally sized chunks.
/// </summary>
/// <remarks>
/// A buffer created from a chunk pool starts empty and acquires additional chunks on demand.
/// A buffer created from existing chunks has no chunk pool and therefore cannot be expanded.
/// Logical position 0 of the buffer is located <c>_sliceOffset</c> bytes into the first chunk.
/// </remarks>
public class MultiChunkBuffer : IByteBuffer
{
    // private fields
    private readonly BsonChunkPool _chunkPool; // null when the buffer was created from existing chunks
    private readonly int _chunkSize;
    private readonly int _sliceOffset; // offset of logical position 0, measured from the start of the first chunk
    private int _capacity;
    private List<BsonChunk> _chunks;
    private bool _disposed;
    private bool _isReadOnly;
    private int _length;
    private int _position;

    // constructors
    /// <summary>
    /// Initializes a new instance of the <see cref="MultiChunkBuffer"/> class that is initially empty
    /// and acquires chunks from a chunk pool as space is needed.
    /// </summary>
    /// <param name="chunkPool">The chunk pool.</param>
    /// <exception cref="System.ArgumentNullException">chunkPool</exception>
    public MultiChunkBuffer(BsonChunkPool chunkPool)
    {
        if (chunkPool == null)
        {
            throw new ArgumentNullException("chunkPool");
        }

        _chunkPool = chunkPool;
        _chunks = new List<BsonChunk>();
        _chunkSize = chunkPool.ChunkSize;
        _sliceOffset = 0;
        _capacity = 0; // EnsureSpaceAvailable will add capacity as needed
        _length = 0;
        _position = 0;
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="MultiChunkBuffer"/> class over existing chunks.
    /// The reference count of every chunk is incremented, and the resulting buffer cannot be expanded.
    /// </summary>
    /// <param name="chunks">The chunks (must all be the same size).</param>
    /// <param name="sliceOffset">The slice offset (measured from the start of the first chunk).</param>
    /// <param name="length">The length.</param>
    /// <param name="isReadOnly">Whether the buffer is read only.</param>
    /// <exception cref="System.ArgumentNullException">chunks</exception>
    /// <exception cref="System.ArgumentException">chunks is empty or the chunks are not all the same size.</exception>
    /// <exception cref="System.ArgumentOutOfRangeException">sliceOffset or length</exception>
    internal MultiChunkBuffer(IEnumerable<BsonChunk> chunks, int sliceOffset, int length, bool isReadOnly)
    {
        if (chunks == null)
        {
            throw new ArgumentNullException("chunks");
        }

        _chunks = new List<BsonChunk>(chunks);
        if (_chunks.Count == 0)
        {
            throw new ArgumentException("No chunks where provided.", "chunks");
        }

        _chunkSize = _chunks[0].Bytes.Length;
        foreach (var chunk in _chunks)
        {
            if (chunk.Bytes.Length != _chunkSize)
            {
                throw new ArgumentException("The chunks are not all the same size.", "chunks");
            }
        }

        // checked: a chunk list too large for an int must fail loudly instead of wrapping around
        var totalSize = checked(_chunks.Count * _chunkSize);
        if (sliceOffset < 0 || sliceOffset > totalSize)
        {
            throw new ArgumentOutOfRangeException("sliceOffset");
        }
        _sliceOffset = sliceOffset;

        var maxCapacity = totalSize - _sliceOffset;
        if (length < 0 || length > maxCapacity)
        {
            throw new ArgumentOutOfRangeException("length");
        }

        _capacity = isReadOnly ? length : maxCapacity; // the capacity is fixed
        _length = length;
        _chunkPool = null;
        _isReadOnly = isReadOnly;
        _position = 0;

        // only take references once all the arguments have been validated so that nothing leaks on failure
        foreach (var chunk in _chunks)
        {
            chunk.IncrementReferenceCount();
        }
    }

    // public properties
    /// <summary>
    /// Gets or sets the capacity.
    /// </summary>
    /// <remarks>
    /// Capacity changes in whole chunks: expanding acquires chunks from the chunk pool until the capacity is
    /// at least the requested value, and shrinking releases only trailing chunks that are entirely beyond the
    /// requested value. When shrinking, Length and Position are clamped to the requested value.
    /// </remarks>
    /// <value>
    /// The capacity.
    /// </value>
    /// <exception cref="System.ObjectDisposedException">MultiChunkBuffer</exception>
    /// <exception cref="System.ArgumentOutOfRangeException">Capacity</exception>
    /// <exception cref="System.InvalidOperationException">
    /// The MultiChunkBuffer is read only, or the capacity would have to be expanded but the buffer was
    /// created without a chunk pool.
    /// </exception>
    public int Capacity
    {
        get
        {
            ThrowIfDisposed();
            return _capacity;
        }
        set
        {
            ThrowIfDisposed();
            EnsureIsWritable();
            if (value < 0)
            {
                throw new ArgumentOutOfRangeException("Capacity");
            }

            if (value < _capacity)
            {
                ShrinkCapacity(value);
            }
            else if (value > _capacity)
            {
                ExpandCapacity(value);
            }
        }
    }

    /// <summary>
    /// Gets a value indicating whether this instance is read only.
    /// </summary>
    /// <value>
    ///   <c>true</c> if this instance is read only; otherwise, <c>false</c>.
    /// </value>
    /// <exception cref="System.ObjectDisposedException">MultiChunkBuffer</exception>
    public bool IsReadOnly
    {
        get
        {
            ThrowIfDisposed();
            return _isReadOnly;
        }
    }

    /// <summary>
    /// Gets or sets the length. The length cannot be set beyond the current capacity. If the new length
    /// is smaller than the current position the position is moved back to the new length.
    /// </summary>
    /// <value>
    /// The length.
    /// </value>
    /// <exception cref="System.ObjectDisposedException">MultiChunkBuffer</exception>
    /// <exception cref="System.ArgumentOutOfRangeException">Length</exception>
    /// <exception cref="System.InvalidOperationException">The length of a read only buffer cannot be changed.</exception>
    public int Length
    {
        get
        {
            ThrowIfDisposed();
            return _length;
        }
        set
        {
            ThrowIfDisposed();
            if (value < 0 || value > _capacity)
            {
                throw new ArgumentOutOfRangeException("Length");
            }
            EnsureIsWritable();

            // no expansion is needed here: the range check above guarantees value <= _capacity
            _length = value;
            if (_position > _length)
            {
                _position = _length;
            }
        }
    }

    /// <summary>
    /// Gets or sets the position. For a writable buffer the position can be set anywhere up to the current
    /// capacity, and the length is extended when the position is moved beyond it. For a read only buffer the
    /// position cannot be moved beyond the length.
    /// </summary>
    /// <value>
    /// The position.
    /// </value>
    /// <exception cref="System.ObjectDisposedException">MultiChunkBuffer</exception>
    /// <exception cref="System.ArgumentOutOfRangeException">Position</exception>
    public int Position
    {
        get
        {
            ThrowIfDisposed();
            return _position;
        }
        set
        {
            ThrowIfDisposed();

            // moving the position past the length extends the length, which a read only buffer must not allow
            var maxPosition = _isReadOnly ? _length : _capacity;
            if (value < 0 || value > maxPosition)
            {
                throw new ArgumentOutOfRangeException("Position");
            }

            _position = value;
            if (_length < _position)
            {
                _length = _position;
            }
        }
    }

    // public methods
    /// <summary>
    /// Clears this instance by shrinking the capacity towards zero, which releases chunks and resets
    /// Length and Position to zero.
    /// </summary>
    /// <exception cref="System.ObjectDisposedException">MultiChunkBuffer</exception>
    /// <exception cref="System.InvalidOperationException">The MultiChunkBuffer is read only.</exception>
    public void Clear()
    {
        ThrowIfDisposed();
        EnsureIsWritable();
        ShrinkCapacity(0);
    }

    /// <summary>
    /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Finds the next null byte at or after the current position and before Length.
    /// The position is not advanced.
    /// </summary>
    /// <returns>
    /// The position of the next null byte, or -1 if there is none before the end of the buffer.
    /// </returns>
    /// <exception cref="System.ObjectDisposedException">MultiChunkBuffer</exception>
    public int FindNullByte()
    {
        ThrowIfDisposed();

        var absolutePosition = _sliceOffset + _position;
        var chunkIndex = absolutePosition / _chunkSize;
        var chunkOffset = absolutePosition % _chunkSize;
        var remaining = _length - _position;

        while (remaining > 0)
        {
            // never search beyond Length: bytes past the end of the data are stale and must not match
            var searchCount = Math.Min(_chunkSize - chunkOffset, remaining);

            // the explicit type argument makes the literal 0 a byte (otherwise a boxed int would never match)
            var index = Array.IndexOf<byte>(_chunks[chunkIndex].Bytes, 0, chunkOffset, searchCount);
            if (index != -1)
            {
                return (chunkIndex * _chunkSize + index) - _sliceOffset;
            }

            chunkIndex += 1;
            chunkOffset = 0;
            remaining -= searchCount;
        }

        return -1;
    }

    /// <summary>
    /// Gets a slice of this buffer. The slice shares the underlying chunks with this buffer.
    /// </summary>
    /// <param name="position">The position of the start of the slice.</param>
    /// <param name="length">The length of the slice (must be greater than zero).</param>
    /// <returns>
    /// A read only slice of this buffer (a SingleChunkBuffer when the slice fits in one chunk).
    /// </returns>
    /// <exception cref="System.ObjectDisposedException">MultiChunkBuffer</exception>
    /// <exception cref="System.InvalidOperationException">GetSlice can only be called for read only buffers.</exception>
    /// <exception cref="System.ArgumentOutOfRangeException">
    /// position
    /// or
    /// length
    /// </exception>
    public IByteBuffer GetSlice(int position, int length)
    {
        ThrowIfDisposed();
        EnsureIsReadOnly();
        if (position < 0 || position >= _length)
        {
            throw new ArgumentOutOfRangeException("position");
        }
        if (length <= 0 || length > _length - position)
        {
            throw new ArgumentOutOfRangeException("length");
        }

        var absoluteStart = _sliceOffset + position;
        var firstChunk = absoluteStart / _chunkSize;
        var lastChunk = (absoluteStart + length - 1) / _chunkSize;
        var sliceOffset = absoluteStart - (firstChunk * _chunkSize); // offset within the first chunk of the slice

        if (firstChunk == lastChunk)
        {
            return new SingleChunkBuffer(_chunks[firstChunk], sliceOffset, length, true);
        }

        var chunks = _chunks.GetRange(firstChunk, lastChunk - firstChunk + 1);
        return new MultiChunkBuffer(chunks, sliceOffset, length, true);
    }

    /// <summary>
    /// Loads the buffer from a stream. The bytes are stored starting at the current position, the position
    /// is not advanced, and the length is extended if the loaded bytes reach beyond it.
    /// </summary>
    /// <param name="stream">The stream.</param>
    /// <param name="count">The number of bytes to load.</param>
    /// <exception cref="System.ObjectDisposedException">MultiChunkBuffer</exception>
    /// <exception cref="System.InvalidOperationException">The MultiChunkBuffer is read only.</exception>
    /// <exception cref="System.ArgumentNullException">stream</exception>
    /// <exception cref="System.ArgumentOutOfRangeException">count</exception>
    /// <exception cref="System.IO.EndOfStreamException">The stream ended before count bytes were read.</exception>
    public void LoadFrom(Stream stream, int count)
    {
        ThrowIfDisposed();
        EnsureIsWritable();
        if (stream == null)
        {
            throw new ArgumentNullException("stream");
        }
        if (count < 0)
        {
            throw new ArgumentOutOfRangeException("count");
        }

        EnsureSpaceAvailable(count);
        var position = _position; // don't advance position
        while (count > 0)
        {
            // recomputed on every pass because Read may return fewer bytes than requested
            var absolutePosition = _sliceOffset + position;
            var chunkIndex = absolutePosition / _chunkSize;
            var chunkOffset = absolutePosition % _chunkSize;
            var bytesToRead = Math.Min(count, _chunkSize - chunkOffset);

            var bytesRead = stream.Read(_chunks[chunkIndex].Bytes, chunkOffset, bytesToRead);
            if (bytesRead == 0)
            {
                throw new EndOfStreamException();
            }

            position += bytesRead;
            count -= bytesRead;
        }

        if (_length < position)
        {
            _length = position;
        }
    }

    /// <summary>
    /// Makes this buffer read only.
    /// </summary>
    /// <exception cref="System.ObjectDisposedException">MultiChunkBuffer</exception>
    public void MakeReadOnly()
    {
        ThrowIfDisposed();
        _isReadOnly = true;
    }

    /// <summary>
    /// Read directly from the backing bytes. The returned ArraySegment points directly to the backing bytes for
    /// the current position and you can read the bytes directly from there. If the backing bytes happen to span
    /// a chunk boundary shortly after the current position there might not be enough bytes left in the current
    /// chunk in which case the returned ArraySegment will have a Count of zero and you should call ReadBytes instead.
    ///
    /// When ReadBackingBytes returns the position will have been advanced by count bytes *if and only if* there
    /// were count bytes left in the current chunk.
    /// </summary>
    /// <param name="count">The number of bytes you need to read.</param>
    /// <returns>
    /// An ArraySegment pointing directly to the backing bytes for the current position.
    /// </returns>
    /// <exception cref="System.ObjectDisposedException">MultiChunkBuffer</exception>
    /// <exception cref="System.ArgumentOutOfRangeException">count</exception>
    /// <exception cref="System.IO.EndOfStreamException">Fewer than count bytes are available.</exception>
    public ArraySegment<byte> ReadBackingBytes(int count)
    {
        ThrowIfDisposed();
        if (count < 0)
        {
            // rejected up front: a negative count would otherwise move the position backwards
            throw new ArgumentOutOfRangeException("count");
        }
        EnsureDataAvailable(count);

        var absolutePosition = _sliceOffset + _position;
        var chunkIndex = absolutePosition / _chunkSize;
        var chunkOffset = absolutePosition % _chunkSize;
        var chunkRemaining = _chunkSize - chunkOffset;

        // chunkIndex can only equal _chunks.Count for a zero count at the very end of the last chunk
        if (count <= chunkRemaining && chunkIndex < _chunks.Count)
        {
            var segment = new ArraySegment<byte>(_chunks[chunkIndex].Bytes, chunkOffset, count);
            _position += count;
            return segment;
        }

        return new ArraySegment<byte>();
    }

    /// <summary>
    /// Reads a byte and advances the position by one.
    /// </summary>
    /// <returns>
    /// A byte.
    /// </returns>
    /// <exception cref="System.ObjectDisposedException">MultiChunkBuffer</exception>
    /// <exception cref="System.IO.EndOfStreamException">No more bytes are available.</exception>
    public byte ReadByte()
    {
        ThrowIfDisposed();
        EnsureDataAvailable(1);

        var absolutePosition = _sliceOffset + _position;
        var value = _chunks[absolutePosition / _chunkSize].Bytes[absolutePosition % _chunkSize];
        _position += 1;
        return value;
    }

    /// <summary>
    /// Reads bytes into a destination array and advances the position by count.
    /// </summary>
    /// <param name="destination">The destination.</param>
    /// <param name="destinationOffset">The destination offset.</param>
    /// <param name="count">The count.</param>
    /// <exception cref="System.ObjectDisposedException">MultiChunkBuffer</exception>
    /// <exception cref="System.ArgumentNullException">destination</exception>
    /// <exception cref="System.ArgumentOutOfRangeException">destinationOffset or count</exception>
    /// <exception cref="System.IO.EndOfStreamException">Fewer than count bytes are available.</exception>
    public void ReadBytes(byte[] destination, int destinationOffset, int count)
    {
        ThrowIfDisposed();
        if (destination == null)
        {
            throw new ArgumentNullException("destination");
        }
        if (destinationOffset < 0 || destinationOffset > destination.Length)
        {
            throw new ArgumentOutOfRangeException("destinationOffset");
        }
        if (count < 0 || count > destination.Length - destinationOffset)
        {
            // validated before copying so that the position is never left partially advanced
            throw new ArgumentOutOfRangeException("count");
        }
        EnsureDataAvailable(count);

        var absolutePosition = _sliceOffset + _position;
        var chunkIndex = absolutePosition / _chunkSize;
        var chunkOffset = absolutePosition % _chunkSize;

        while (count > 0)
        {
            var bytesToCopy = Math.Min(count, _chunkSize - chunkOffset);
            Buffer.BlockCopy(_chunks[chunkIndex].Bytes, chunkOffset, destination, destinationOffset, bytesToCopy);

            chunkIndex += 1;
            chunkOffset = 0;
            count -= bytesToCopy;
            destinationOffset += bytesToCopy;
            _position += bytesToCopy;
        }
    }

    /// <summary>
    /// Reads bytes into a new array and advances the position by count.
    /// </summary>
    /// <param name="count">The count.</param>
    /// <returns>
    /// The bytes.
    /// </returns>
    /// <exception cref="System.ObjectDisposedException">MultiChunkBuffer</exception>
    /// <exception cref="System.ArgumentOutOfRangeException">count</exception>
    /// <exception cref="System.IO.EndOfStreamException">Fewer than count bytes are available.</exception>
    public byte[] ReadBytes(int count)
    {
        ThrowIfDisposed();
        if (count < 0)
        {
            throw new ArgumentOutOfRangeException("count");
        }

        // check before allocating so that a bogus (huge) count cannot trigger a huge allocation
        EnsureDataAvailable(count);

        var destination = new byte[count];
        ReadBytes(destination, 0, count);
        return destination;
    }

    /// <summary>
    /// Write directly to the backing bytes. The returned ArraySegment points directly to the backing bytes for
    /// the current position and you can write the bytes directly to there. If the backing bytes happen to span
    /// a chunk boundary shortly after the current position there might not be enough bytes left in the current
    /// chunk in which case the returned ArraySegment will have a Count of zero and you should call WriteBytes instead.
    ///
    /// When WriteBackingBytes returns the position has not been advanced. After you have written up to count
    /// bytes directly to the backing bytes advance the position by the number of bytes actually written.
    /// </summary>
    /// <param name="count">The count.</param>
    /// <returns>
    /// An ArraySegment pointing directly to the backing bytes for the current position.
    /// </returns>
    /// <exception cref="System.ObjectDisposedException">MultiChunkBuffer</exception>
    /// <exception cref="System.InvalidOperationException">The MultiChunkBuffer is read only.</exception>
    /// <exception cref="System.ArgumentOutOfRangeException">count</exception>
    public ArraySegment<byte> WriteBackingBytes(int count)
    {
        ThrowIfDisposed();
        EnsureIsWritable(); // like every other write method: never hand out writable bytes of a read only buffer
        if (count < 0)
        {
            throw new ArgumentOutOfRangeException("count");
        }
        EnsureSpaceAvailable(count);

        var absolutePosition = _sliceOffset + _position;
        var chunkIndex = absolutePosition / _chunkSize;
        var chunkOffset = absolutePosition % _chunkSize;
        var chunkRemaining = _chunkSize - chunkOffset;

        // chunkIndex can only equal _chunks.Count for a zero count at the very end of the last chunk
        if (count <= chunkRemaining && chunkIndex < _chunks.Count)
        {
            return new ArraySegment<byte>(_chunks[chunkIndex].Bytes, chunkOffset, count);
        }

        return new ArraySegment<byte>();
    }

    /// <summary>
    /// Writes a byte at the current position and advances the position by one.
    /// </summary>
    /// <param name="source">The byte.</param>
    /// <exception cref="System.ObjectDisposedException">MultiChunkBuffer</exception>
    /// <exception cref="System.InvalidOperationException">The MultiChunkBuffer is read only.</exception>
    public void WriteByte(byte source)
    {
        ThrowIfDisposed();
        EnsureIsWritable();
        EnsureSpaceAvailable(1);

        var absolutePosition = _sliceOffset + _position;
        _chunks[absolutePosition / _chunkSize].Bytes[absolutePosition % _chunkSize] = source;
        _position += 1;
        if (_length < _position)
        {
            _length = _position;
        }
    }

    /// <summary>
    /// Writes bytes at the current position and advances the position by the number of bytes written.
    /// </summary>
    /// <param name="source">The bytes (in the form of a byte array).</param>
    /// <exception cref="System.ObjectDisposedException">MultiChunkBuffer</exception>
    /// <exception cref="System.InvalidOperationException">The MultiChunkBuffer is read only.</exception>
    /// <exception cref="System.ArgumentNullException">source</exception>
    public void WriteBytes(byte[] source)
    {
        ThrowIfDisposed();
        EnsureIsWritable();
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }
        EnsureSpaceAvailable(source.Length);

        var absolutePosition = _sliceOffset + _position;
        var chunkIndex = absolutePosition / _chunkSize;
        var chunkOffset = absolutePosition % _chunkSize;
        var remaining = source.Length;
        var sourceOffset = 0;

        while (remaining > 0)
        {
            var bytesToCopy = Math.Min(remaining, _chunkSize - chunkOffset);
            Buffer.BlockCopy(source, sourceOffset, _chunks[chunkIndex].Bytes, chunkOffset, bytesToCopy);

            chunkIndex += 1;
            chunkOffset = 0;
            remaining -= bytesToCopy;
            sourceOffset += bytesToCopy;
            _position += bytesToCopy;
        }

        if (_length < _position)
        {
            _length = _position;
        }
    }

    /// <summary>
    /// Writes all the bytes of another buffer (from its position 0 up to its Length) at the current position
    /// and advances the position by the number of bytes written. The position of the source buffer is restored
    /// before this method returns.
    /// </summary>
    /// <param name="source">The bytes (in the form of an IByteBuffer).</param>
    /// <exception cref="System.ObjectDisposedException">MultiChunkBuffer</exception>
    /// <exception cref="System.InvalidOperationException">The MultiChunkBuffer is read only.</exception>
    /// <exception cref="System.ArgumentNullException">source</exception>
    /// <exception cref="System.ArgumentException">source is this buffer.</exception>
    public void WriteBytes(IByteBuffer source)
    {
        ThrowIfDisposed();
        EnsureIsWritable();
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }
        if (ReferenceEquals(source, this))
        {
            // reading from and writing to the same position state would corrupt the buffer
            throw new ArgumentException("A buffer cannot be written to itself.", "source");
        }

        var remaining = source.Length;
        EnsureSpaceAvailable(remaining);

        var savedPosition = source.Position;
        source.Position = 0;
        try
        {
            var absolutePosition = _sliceOffset + _position;
            var chunkIndex = absolutePosition / _chunkSize;
            var chunkOffset = absolutePosition % _chunkSize;

            while (remaining > 0)
            {
                var bytesToCopy = Math.Min(remaining, _chunkSize - chunkOffset);
                source.ReadBytes(_chunks[chunkIndex].Bytes, chunkOffset, bytesToCopy);

                chunkIndex += 1;
                chunkOffset = 0;
                remaining -= bytesToCopy;
                _position += bytesToCopy;
            }
        }
        finally
        {
            // keep Length >= Position and restore the caller's source position even if the copy failed part way
            if (_length < _position)
            {
                _length = _position;
            }
            source.Position = savedPosition;
        }
    }

    /// <summary>
    /// Writes Length bytes from this buffer starting at Position 0 to a stream.
    /// The position of this buffer is not changed.
    /// </summary>
    /// <param name="stream">The stream.</param>
    /// <exception cref="System.ObjectDisposedException">MultiChunkBuffer</exception>
    /// <exception cref="System.ArgumentNullException">stream</exception>
    public void WriteTo(Stream stream)
    {
        ThrowIfDisposed();
        if (stream == null)
        {
            throw new ArgumentNullException("stream");
        }

        var chunkIndex = _sliceOffset / _chunkSize;
        var chunkOffset = _sliceOffset % _chunkSize;
        var remaining = _length;

        while (remaining > 0)
        {
            var bytesToWrite = Math.Min(remaining, _chunkSize - chunkOffset);
            stream.Write(_chunks[chunkIndex].Bytes, chunkOffset, bytesToWrite);

            chunkIndex += 1;
            chunkOffset = 0;
            remaining -= bytesToWrite;
        }
    }

    // protected methods
    /// <summary>
    /// Releases the resources used by this buffer. When disposing, the reference count of every chunk
    /// held by this buffer is decremented. Calling this method more than once has no further effect.
    /// </summary>
    /// <param name="disposing"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param>
    protected virtual void Dispose(bool disposing)
    {
        if (_disposed)
        {
            return;
        }

        if (disposing && _chunks != null)
        {
            foreach (var chunk in _chunks)
            {
                chunk.DecrementReferenceCount();
            }
            _chunks = null;
        }

        _disposed = true;
    }

    /// <summary>
    /// Throws if disposed.
    /// </summary>
    /// <exception cref="System.ObjectDisposedException"></exception>
    protected void ThrowIfDisposed()
    {
        if (_disposed)
        {
            throw new ObjectDisposedException(GetType().Name);
        }
    }

    // private methods
    // Throws EndOfStreamException unless at least 'needed' bytes lie between Position and Length.
    private void EnsureDataAvailable(int needed)
    {
        var available = _length - _position;
        if (needed > available)
        {
            var message = string.Format(
                "Not enough input bytes available. Needed {0}, but only {1} are available (at position {2}).",
                needed, available, _position);
            throw new EndOfStreamException(message);
        }
    }

    // Throws InvalidOperationException unless the buffer has been made read only.
    private void EnsureIsReadOnly()
    {
        if (!_isReadOnly)
        {
            throw new InvalidOperationException("MultiChunkBuffer is not read only.");
        }
    }

    // Throws InvalidOperationException if the buffer has been made read only.
    private void EnsureIsWritable()
    {
        if (_isReadOnly)
        {
            throw new InvalidOperationException("MultiChunkBuffer is not writable.");
        }
    }

    // Expands the capacity (if necessary) so that 'needed' bytes fit between Position and Capacity.
    private void EnsureSpaceAvailable(int needed)
    {
        if (needed > _capacity - _position)
        {
            // checked: an overflowing sum would wrap to a negative target and silently skip the expansion
            var targetCapacity = checked(_position + needed);
            ExpandCapacity(targetCapacity);
        }
    }

    // Acquires chunks from the chunk pool until the capacity is at least targetCapacity.
    private void ExpandCapacity(int targetCapacity)
    {
        if (_chunkPool == null)
        {
            throw new InvalidOperationException("Capacity cannot be expanded because this buffer was created without specifying a chunk pool.");
        }

        while (_capacity < targetCapacity)
        {
            // computed (checked) before acquiring the chunk so that an overflow cannot loop forever or leak a chunk
            var expandedCapacity = checked(_capacity + _chunkSize);

            var chunk = _chunkPool.AcquireChunk();
            chunk.IncrementReferenceCount();
            _chunks.Add(chunk);
            _capacity = expandedCapacity;
        }
    }

    // Releases trailing chunks that lie entirely beyond targetCapacity and clamps Length and Position to targetCapacity.
    private void ShrinkCapacity(int targetCapacity)
    {
        // only whole chunks can be released, so the capacity may remain above targetCapacity
        while (_capacity - targetCapacity >= _chunkSize)
        {
            var lastIndex = _chunks.Count - 1;
            _chunks[lastIndex].DecrementReferenceCount();
            _chunks.RemoveAt(lastIndex);
            _capacity -= _chunkSize;
        }

        if (_length > targetCapacity)
        {
            _length = targetCapacity;
        }
        if (_position > targetCapacity)
        {
            _position = targetCapacity;
        }
    }
}
}