Saturday, August 18, 2012

Streaming the data out

Oren, the main RavenDB guy, mentioned on the mailing list that we should be streaming the data to the database: "We have users that uses documents that are several MB in size, and we have to stream it to disk, instead of copying the data several times."

So the simple document-adding function I mentioned a couple of posts ago won't cut it, since calling MemoryStream.ToArray is a no-no for large documents.  BDB already supports writing from a caller-provided byte buffer, and it also supports partial writes.  These are the two things required for us to expose a Stream interface to the database.
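To give a feel for what a partial write looks like with the wrapper used in this series, here is a minimal sketch (the names dataTable, idBuffer, buffer and recordOffset are placeholders); the idea is that a single put can target just a slice of an existing record:

//illustrative only: a partial put writes this buffer at recordOffset inside an existing record,
//and BDB extends the record if the write runs past its current end
var chunk = DbEntry.InOut(buffer, 0, buffer.Length, buffer.Length, recordOffset);
dataTable.Put(transaction, ref idBuffer, ref chunk);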

Partial writes

Starting from square one with no documents in the database, we now need to be able to support that same simple PUT command, but never call MemoryStream.ToArray.  The main IDocumentStorageActions.AddDocument looks basically the same except that it now goes through the streaming interface (I know you can see a MemoryStream.ToArray in there, which I just said was a no-no, but I'm hoping I'll be allowed to get away with it since the metadata should never be very big).

Guid newEtag = uuidGenerator.CreateSequentialUuid();

//the metadata is small, so materializing it is acceptable here
byte[] metadataBuffer;
using (var ms = new MemoryStream()) { metadata.WriteTo(ms); metadataBuffer = ms.ToArray(); }

//AddDocument now hands back a write-only stream over the document record
using (Stream stream = new BufferedStream(database.DocumentTable.AddDocument(transaction, key, newEtag, System.DateTime.Now, metadataBuffer)))
{
 using (var finalStream = documentCodecs.Aggregate(stream, (current, codec) => codec.Encode(key, data, metadata, current)))
 {
  data.WriteTo(finalStream);
  stream.Flush();
 }
}

return newEtag;

The changes come from the DocumentTable's AddDocument function.  This function now needs to return a Stream object that can be used to partially write the document into the database.

public Stream AddDocument(Txn transaction, string key, Guid etag, DateTime dateTime, byte[] metadata)
{
 long docId, existingId;
 Guid existingEtag;

 //update or insert?
 if(!GetEtagByKey(transaction, key, out existingId, out existingEtag))
 {
  //new document: take the id after the last record in the table
  long lastId = 0;
  var vlastId = DbEntry.Out(new byte[8]);
  var vlastData = DbEntry.EmptyOut();
  using (var cursor = dataTable.OpenCursor(transaction, DbFileCursor.CreateFlags.None))
  {
   if (cursor.Get(ref vlastId, ref vlastData, DbFileCursor.GetMode.Last, DbFileCursor.ReadFlags.None) != ReadStatus.NotFound)
    lastId = BitConverter.ToInt64(vlastId.Buffer, 0);
  }
  docId = lastId + 1;
 }
 else
 {
  //existing document: reuse its id so the record gets overwritten in place
  docId = existingId;
 }

 return new DocumentWriteStream(dataTable, transaction, docId, key, etag, dateTime, metadata);
}

Again, up to this point the function is basically the same; the new stuff comes in the DocumentWriteStream object, which implements the abstract Stream class.

private class DocumentWriteStream : Stream
{
 private readonly DbBTree data;
 private readonly Txn transaction;
 private readonly int headerSize;
 private DbEntry idBuffer;
 private long position;

 public DocumentWriteStream(DbBTree data, Txn transaction, long docId, string key, Guid etag, DateTime dateTime, byte[] metadata)
 {
  this.data = data;
  this.transaction = transaction;
  this.idBuffer = DbEntry.InOut(BitConverter.GetBytes(docId));
  this.position = 0;

  //put out the header fields we do know (we need to do this first since every put will call the secondary associate functions,
  // so we need the associate keys available in the header)
  var header = new byte[documentBaseLength + key.Length * 2 + metadata.Length];
  Buffer.BlockCopy(etag.ToByteArray(), 0, header, 0, 16);
  Buffer.BlockCopy(BitConverter.GetBytes(dateTime.ToFileTime()), 0, header, 16, 8);
  Buffer.BlockCopy(BitConverter.GetBytes(key.Length * 2), 0, header, 24, 4);
  Buffer.BlockCopy(BitConverter.GetBytes(metadata.Length), 0, header, 28, 4);
  Buffer.BlockCopy(BitConverter.GetBytes(0), 0, header, 32, 4); //we don't know the document length yet
  Buffer.BlockCopy(Encoding.Unicode.GetBytes(key), 0, header, 36, key.Length * 2);
  Buffer.BlockCopy(metadata, 0, header, 36 + key.Length * 2, metadata.Length);
  this.headerSize = header.Length;

  var dvalue = DbEntry.InOut(header, 0, header.Length);
  data.Put(transaction, ref idBuffer, ref dvalue);
 }

We start up the document streamer by writing out the same header information we were writing before: the etag, modification time, sizes, key and metadata.  The difference is that we don't yet know what to put for the document size, so we fill that slot in when the stream is flushed.
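Just to make the layout easy to see, here is the same header spelled out as offsets (the constant names are mine, purely for illustration; the actual code above uses the literal values):

//header layout written by the constructor above, offsets in bytes (illustrative names only)
static class DocumentHeader
{
 public const int EtagOffset = 0;            //16 bytes: the document etag
 public const int LastModifiedOffset = 16;   // 8 bytes: DateTime.ToFileTime()
 public const int KeyLengthOffset = 24;      // 4 bytes: key length in bytes (key.Length * 2, UTF-16)
 public const int MetadataLengthOffset = 28; // 4 bytes: metadata length in bytes
 public const int DocumentLengthOffset = 32; // 4 bytes: document length, filled in by Flush()
 public const int KeyOffset = 36;            //key bytes, then metadata, then the document data itself
}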

The important thing to note is that we have to set up the secondary key values in the header up front, because when we issue a BDB put, the secondary index callback is going to be called to extract the secondary index key.  We need this in place before we start streaming the data.
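I won't reproduce the actual associate callback here, but conceptually all it has to do is read the key back out of that fixed-position header, something like this hypothetical helper (not the wrapper's real delegate signature):

//hypothetical sketch of the secondary-key extraction: pull the document key out of the record header
static string ExtractKey(byte[] record)
{
 int keyLengthInBytes = BitConverter.ToInt32(record, 24);         //key length lives at offset 24
 return Encoding.Unicode.GetString(record, 36, keyLengthInBytes); //key bytes start at offset 36
}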

public override bool CanRead { get { return false; } }
public override bool CanSeek { get { return false; } } 
public override bool CanWrite { get { return true; } } 

public override long Length { get { throw new NotSupportedException(); } }
public override long Position { get { return position; } set { position = value; } }
public override int Read(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }
public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
public override void SetLength(long value) { throw new NotSupportedException(); }

Now for the boilerplate write-only, non-seeking stream members. And finally, the actual methods we care about:

public override void Flush()
{
 //partial put: write the now-known document length into its slot at offset 32 in the header
 var dvalue = DbEntry.InOut(BitConverter.GetBytes((int)position), 0, 4, 4, 32);
 data.Put(transaction, ref idBuffer, ref dvalue);
}

public override void Write(byte[] buffer, int offset, int count)
{
 //partial put: append this chunk right after the header, at the current stream position
 var dvalue = DbEntry.InOut(buffer, offset, count, count, headerSize + (int)position);
 data.Put(transaction, ref idBuffer, ref dvalue);
 position += count;
}

The Write method simply takes the buffer it's given and uses the current stream position to tell BDB where to append the data (BDB handles extending the record as we do the partial writes).  And finally, on Flush, we know the size of the document, so we can issue one more put to write the document size into the correct header slot.
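To see how this plugs together, here is a minimal usage sketch (sourceStream, documentTable, transaction and the other names are placeholders): a large document gets copied into the database in fixed-size chunks, so it is never materialized in memory, and the final Flush fills in the length.

//copy a large document into the database in 64KB chunks; placeholder names throughout
var buffer = new byte[64 * 1024];
using (Stream writeStream = documentTable.AddDocument(transaction, key, newEtag, DateTime.Now, metadataBuffer))
{
 int read;
 while ((read = sourceStream.Read(buffer, 0, buffer.Length)) > 0)
  writeStream.Write(buffer, 0, read);
 writeStream.Flush(); //writes the final document length into the header slot
}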

So, all in all, doing the streaming writes is not that complicated.  It turns out that doing streaming reads is a little harder, since we have to worry about opening, managing and disposing of cursors around the streaming read accesses, but we'll leave that for next time.

1 comment:

Metadata - yes, it is _very rarely_ big, if ever. We assume we can always materialize it.