Saturday, August 25, 2012

First linux test

First successful run of RavenDB on linux today.  Compiled on windows, run on linux.  Man, C# is pretty neat.


Friday, August 24, 2012

BDB queues

BDB queues are said to support high concurrency since they lock individual records rather than pages; if your records are only 64 bytes (or so) and your page size is 512, locking a page could lock a large number of records.  But I ran into a problem with them today...

We have multiple writers and multiple readers for the index stats table.  Writers update the stats from multiple indexing threads, while consumers check the status of the index using IsIndexStale.  We use snapshot isolation to prevent read locks during the IsIndexStale call, but here's the rub.

BDB queues don't support snapshot isolation.

So that's that.
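
For the record, here is roughly what the stats read path was supposed to look like under snapshot isolation.  This is only a sketch: the transaction flag is a stand-in for BDB's DB_TXN_SNAPSHOT (the exact wrapper call is an assumption), and statsTable/indexName are placeholders.

// Sketch only: read the index stats without taking read locks that would block the writers.
// TxnFlags.Snapshot is a hypothetical wrapper name for BDB's DB_TXN_SNAPSHOT flag; btree
// databases accept it, queue databases do not, which is exactly the problem above.
using (var txn = env.TxnBegin(null, TxnFlags.Snapshot))      // hypothetical call and flag
{
 var key = DbEntry.InOut(Encoding.Unicode.GetBytes(indexName));
 var data = DbEntry.Out(new byte[64]);
 statsTable.Get(txn, ref key, ref data);                     // hypothetical Get signature
 // ... decode the stats row and answer IsIndexStale from the snapshot ...
}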

Monday, August 20, 2012

Mapped results "table"

Most of the tables in RavenDB are very simple "storage dumps".  There is an ID, some row properties, and a data section.  Then there is typically at most one secondary key and one sorting key.  Take the documents table: the primary key is the internal document ID, the secondary index is the document key, and the sorting key is the etag.  These tables turn out to be easy to build in BDB since it has built-in support for secondary key lookups.
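
To make that shape concrete, here is a rough sketch of the pattern using the documents table; the field names are illustrative, not the real schema.

// Illustrative only: the common "storage dump" table shape, using the documents
// table as the example.  One primary ID, one secondary lookup key, one sort key,
// and a blob of row data.
class DocumentRow
{
 public long Id;       // primary key: internal document ID
 public string Key;    // secondary index: look a document up by its key
 public Guid Etag;     // sorting key: scan documents in etag order
 public byte[] Data;   // the document/metadata payload
}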

The mapped results table is a little different since we are looking items up by more complicated keys and sorts.  I will attempt to describe how this is done in BDB as clearly as I can, since I had quite a few misses before achieving the results I wanted.

The mapped results table

The mapped results table holds raven's mapped results (obviously).  I don't fully understand what all of the columns are used for (nor do I have to), since I'm not that familiar with the specifics of how raven pulls off map-reduce.  But here are the columns for the table:
  • primary key ID
  • view name
  • document key
  • reduce key
  • reduce key/view hash
  • document etag
  • timestamp
  • document data
Now, there are a few ways that we access the data in the table (writing all of these out will help us model our secondary indexes):
  1. get the most recent etag for a view
  2. get the document data for a view after a specified etag
  3. delete documents given a view
  4. get the document data given a view and a reduce key
  5. delete documents given a view and a document key
Once we make a slight change from the way the esent storage engine operates, most of these are easy to implement and involve a single secondary index combined with the primary row data.  The change we are going to make is to use the etag as the primary key and remove the auto-incrementing ID.

We can then add a secondary index for the view.  This will map every document etag to the view it's part of.  We sort the views in ascending order and the etags within this secondary index in reverse order.  We tell BDB that we want duplicates in the index and that we want those duplicates sorted:

// small pages for the small index entries
this.indexByViewFile = env.CreateDatabase(DbCreateFlags.None);
indexByViewFile.PageSize = 512;
// allow duplicate keys (one entry per etag under each view) and keep the duplicates sorted
indexByViewFile.SetFlags(DbFlags.Dup | DbFlags.DupSort);
// sort the duplicate etags in descending order so the newest etag comes first
indexByViewFile.DupCompareFast = GuidCompareDesc;
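
The code above references a GuidCompareDesc comparer without showing it.  A minimal sketch is below, assuming the wrapper hands the comparer the two duplicate entries as byte arrays and that our sequential uuids sort chronologically when compared byte by byte.

// Sketch only: compare two 16-byte etags so that larger (newer) etags sort first.
// Assumes the wrapper passes the raw duplicate entries as byte[] and that the
// sequential uuid generator produces etags whose byte order matches their age.
static int GuidCompareDesc(byte[] left, byte[] right)
{
 for (int i = 0; i < 16; i++)
 {
  int diff = right[i].CompareTo(left[i]);   // reversed operands => descending order
  if (diff != 0)
   return diff;
 }
 return 0;
}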

Now we use this index to solve:
  1. open a cursor on indexByView, jump to the given view, then get the first primary key for that view.  Since the duplicates are stored in reverse order, it will always be the most recent etag (see the sketch after this list)
  2. open a cursor on indexByView, jump to the given view, then keep pulling the duplicate secondary index records until we hit the stop etag.  Again, since the duplicates are stored in reverse order, all of the keys we pull from the index before hitting the stop etag will be more recent than that stop etag.
  3. open a cursor on indexByView, jump to the given view, look at all the duplicates and delete them
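
Here is a rough sketch of access pattern 1.  How the primary key is pulled off the positioned secondary cursor depends on the wrapper, so GetCurrentPrimaryKey below is a placeholder, not a real call.

// Sketch only: most recent etag for a view.  JumpTo positions the cursor on the first
// duplicate under the view, and GuidCompareDesc guarantees that duplicate is the newest
// etag.  GetCurrentPrimaryKey stands in for whatever the wrapper exposes to read the
// primary key from a secondary-index cursor.
using (var cursor = indexByView.OpenCursor(txn))
{
 string currentView;
 if (cursor.JumpTo(view, out currentView) == ReadStatus.NotFound || currentView != view)
  return Guid.Empty;                               // no mapped results for this view yet

 byte[] etagBytes = cursor.GetCurrentPrimaryKey();  // placeholder call
 return new Guid(etagBytes);
}
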
The next secondary index we need is based on the reduce key/view hash.  This secondary index will solve:
  1. open a cursor on indexByHash, jump to the given hash, then enumerate all the duplicate keys.  This gives us all the etags that match the reduce key and view.  And just like the esent engine, we also verify that the reduce key and view actually match, in case of hash clashes (sketched below).
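
The clash check itself can decode the primary row the same way the join-cursor code further down does.  The sketch below assumes the view string sits right after the 56-byte header, which is inferred from the reduce key offset used in that code.

// Sketch only: verify that a candidate row really matches the view and reduce key,
// not just their hash.  Offsets mirror the join-cursor code below; the view string
// starting at byte 56 is an assumption inferred from the reduce key offset.
static bool MatchesViewAndReduceKey(byte[] row, string view, string reduceKey)
{
 int viewLength      = BitConverter.ToInt32(row, 40);
 int keyLength       = BitConverter.ToInt32(row, 44);
 int reduceKeyLength = BitConverter.ToInt32(row, 48);

 string rowView      = Encoding.Unicode.GetString(row, 56, viewLength);
 string rowReduceKey = Encoding.Unicode.GetString(row, 56 + viewLength + keyLength, reduceKeyLength);

 return rowView == view && rowReduceKey == reduceKey;
}
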
The final access pattern is slightly more complicated since we are trying to match two separate fields.  Of course BDB supports this, it's just a little more work.  We want to match by view and document key, and we already have the secondary index on view, so we need to add another secondary index on document key.  Now we use something called a join cursor in BDB.  This lets us open two cursors and then join them against the primary table, iterating only over the rows that both cursors match.
  1. open a cursor on indexByView and jump to the given view.  Then open a cursor on indexByKey and jump to the given document key.  Now, with the primary row table, join these two cursors into another cursor.  Iterating over this cursor gives us the etags that match both the view and the document key.

using (var cursorView = indexByView.OpenCursor(txn))
using (var cursorKey = indexByKey.OpenCursor(txn))
{
 string currentView, currentKey;

 // position each cursor on its key; bail out if either the view or the document key has no entries
 if (cursorView.JumpTo(view, out currentView) == ReadStatus.NotFound)
  return reduceKeys;
 if (currentView != view)
  return reduceKeys;

 if (cursorKey.JumpTo(documentId, out currentKey) == ReadStatus.NotFound)
  return reduceKeys;
 if (currentKey != documentId)
  return reduceKeys;

 // join the two secondary cursors against the primary table; the join cursor
 // only yields rows that match both the view and the document key
 using (var cursor = primaryTable.Join(new DbFileCursor[] { cursorView, cursorKey }, Db.JoinMode.None, 0))
 {
  var vKey = DbEntry.Out(new byte[16]);
  var vData = DbEntry.Out(new byte[128]);

  while (true)
  {
   var status = cursor.Get(ref vKey, ref vData, DbJoinCursor.GetMode.None, DbJoinCursor.ReadFlags.None);
   if (status == ReadStatus.NotFound)
    break;

   // decode the reduce key out of the primary row (the string lengths live in the header)
   int viewLength = BitConverter.ToInt32(vData.Buffer, 40);
   int keyLength = BitConverter.ToInt32(vData.Buffer, 44);
   int reduceKeyLength = BitConverter.ToInt32(vData.Buffer, 48);
   string reduceKey = Encoding.Unicode.GetString(vData.Buffer, 56 + viewLength + keyLength, reduceKeyLength);

   reduceKeys.Add(reduceKey);
   deletes.Add(new Guid(vKey.Buffer));   // the join cursor's key is the primary key (the etag)
  }
 }
}

Sunday, August 19, 2012

Making progress

Just an update on the progress ("are working" == every unit test I can find for the operations passes).
  • Documents are working
  • Transactions are working
  • Queues are working
  • Mapped indexes are working
The only things that seem to remain as far as full functionality are the map->reduce working set tables, the tasks tables and the attachment tables.

There's not much else interesting to report.  All of the extra operations involved are basically just copies of the existing access methods.

Saturday, August 18, 2012

Another document stream problem

I mentioned that secondary indexes are associated with the primary table.  Whenever the primary table data changes, BDB calls the associate callback to update the secondary index based on the new primary row.  This is great, and it makes life much easier when using cursors, puts and gets from BDB.

But there is a problem with this for the documents.db database: the callback reads the entire primary table row for you.  And like I said in one of the previous posts, we don't want to read the MBs of a document unless we have to.  So we cannot store the actual document data in the primary table's row.

There is a fairly easy solution to this problem.  We separate the primary document meta-metadata (like etag, last modification, key, ID, document size, metadata size) from the document data.  That means adding more database sections (and we'll separate out the metadata json object while we are at it), leaving us with files that look like:

documents.db

VERSION=3
format=bytevalue
database=data
type=btree
db_pagesize=512
HEADER=END
DATA=END

VERSION=3
format=bytevalue
database=indexByKey
type=btree
db_pagesize=512
HEADER=END
DATA=END

VERSION=3
format=bytevalue
database=indexByEtag
type=btree
db_pagesize=512
HEADER=END
DATA=END

documents_data.db

VERSION=3
format=bytevalue
database=documents
type=btree
db_pagesize=8192
HEADER=END
DATA=END

VERSION=3
format=bytevalue
database=metadatas
type=btree
db_pagesize=8192
HEADER=END
DATA=END

We have the main rows, which are the meta-metadata; then the two indexes, key->id and etag->id; then the actual documents table, which maps id to document json; and finally the id to metadata json table.

This solves the problem since we will only associate indexByKey to primary and indexByEtag to primary, and the data rows will be very small.

It also helps fix another problem that will creep up once we get to performance testing/verification.  BDB locks access at page boundaries, so it's important to choose the page sizes so that we don't lock too many documents at once.  By separating the document json and metadata json out of the primary row data, we can optimize the page size for the index sections while keeping larger page sizes for the json object dumps.
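
As a rough sketch of that split (the CreateDatabase/PageSize calls are the ones we've already used; how each handle gets opened against its named section in documents.db and documents_data.db is wrapper-specific, so that part is left out):

// Sketch only: small pages for the hot meta-metadata/index sections, big pages
// for the json blob sections.  Binding each database to its section name in
// documents.db / documents_data.db is omitted here.
var dataTable   = env.CreateDatabase(DbCreateFlags.None); dataTable.PageSize   = 512;  // meta-metadata rows
var indexByKey  = env.CreateDatabase(DbCreateFlags.None); indexByKey.PageSize  = 512;  // key -> id
var indexByEtag = env.CreateDatabase(DbCreateFlags.None); indexByEtag.PageSize = 512;  // etag -> id
var documents   = env.CreateDatabase(DbCreateFlags.None); documents.PageSize   = 8192; // id -> document json
var metadatas   = env.CreateDatabase(DbCreateFlags.None); metadatas.PageSize   = 8192; // id -> metadata json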

I am hoping that this will also simplify some of the Stream handling since we don't have to worry about the header parts when streaming metadata json and document jsons.

And, for some future, future protection: if we ever need to change the structure of documents.db (by adding a "number of times accessed" field or something like that), then we only have to modify the primary row.  If the document json data lived in the same data section, we would have to shuffle a bunch of data around in order to insert the new field into the header.

Ok, now I just have to do it...

Streaming the data out

Oren, the main ravendb guy, mentioned on the mailing list that we should be streaming the data to the database: "We have users that uses documents that are several MB in size, and we have to stream it to disk, instead of copying the data several times."

So the simple document adder function I mentioned a couple posts ago won't cut it since using MemoryStream.ToArray is a no-no for large documents.  BDB already supports using your provided byte buffer, and it also supports partial writes.  These are the two things required for us to support the Stream interface to the database.

Partial writes

Starting from square one with no documents in the database, we now need to be able to support that same simple PUT command, but never call MemoryStream.ToArray.  The main IDocumentStorageActions.AddDocument looks basically the same, except that it now goes through the streaming interface (I know you can see a MemoryStream.ToArray in there, which I just said was a no-no, but I'm hoping I'll be allowed to get away with it since the metadata should never be very big).

Guid newEtag = uuidGenerator.CreateSequentialUuid();

// the metadata is small, so buffering it in memory should be acceptable
byte[] metadataBuffer;
using (var ms = new MemoryStream()) { metadata.WriteTo(ms); metadataBuffer = ms.ToArray(); }

// AddDocument now returns a Stream; the document json is written straight through it
using (Stream stream = new BufferedStream(database.DocumentTable.AddDocument(transaction, key, newEtag, System.DateTime.Now, metadataBuffer)))
{
 using (var finalStream = documentCodecs.Aggregate(stream, (current, codec) => codec.Encode(key, data, metadata, current)))
 {
  data.WriteTo(finalStream);
  stream.Flush();
 }
}

return newEtag;

The changes come from the DocumentTable's AddDocument function.  This function now needs to return a Stream object that can be used to partially write the document to the database.

public Stream AddDocument(Txn transaction, string key, Guid etag, DateTime dateTime, byte[] metadata)
{
 long docId, existingId;
 Guid existingEtag;

 // update or insert?
 if (!GetEtagByKey(transaction, key, out existingId, out existingEtag))
 {
  // new document: take the last id from the end of the table and add one
  long lastId = 0;
  var vlastId = DbEntry.Out(new byte[8]);
  var vlastData = DbEntry.EmptyOut();
  using (var cursor = dataTable.OpenCursor(transaction, DbFileCursor.CreateFlags.None))
  {
   if (cursor.Get(ref vlastId, ref vlastData, DbFileCursor.GetMode.Last, DbFileCursor.ReadFlags.None) != ReadStatus.NotFound)
    lastId = BitConverter.ToInt64(vlastId.Buffer, 0);
  }
  docId = lastId + 1;
 }
 else
 {
  // existing document: keep its id so the put overwrites the old row
  docId = existingId;
 }

 return new DocumentWriteStream(dataTable, transaction, docId, key,
   etag, dateTime, metadata);
}

Again, up to this point the function is basically the same; the new stuff comes in the DocumentWriteStream object, which implements the abstract Stream class.

private class DocumentWriteStream : Stream
{
 // header layout: 16 byte etag + 8 byte file time + 4 byte key length
 //              + 4 byte metadata length + 4 byte document length = 36 bytes
 private const int documentBaseLength = 36;

 private readonly DbBTree data;
 private readonly Txn transaction;
 private readonly int headerSize;
 private DbEntry idBuffer;
 private long position;

 public DocumentWriteStream(DbBTree data, Txn transaction, long docId, string key, Guid etag, DateTime dateTime, byte[] metadata)
 {
  this.data = data;
  this.transaction = transaction;
  this.idBuffer = DbEntry.InOut(BitConverter.GetBytes(docId));
  this.position = 0;

  // put out the header we do know (we need to do this first since every put will call
  // the secondary associate functions, so we need the associate keys available in the header)
  var header = new byte[documentBaseLength + key.Length * 2 + metadata.Length];
  Buffer.BlockCopy(etag.ToByteArray(), 0, header, 0, 16);
  Buffer.BlockCopy(BitConverter.GetBytes(dateTime.ToFileTime()), 0, header, 16, 8);
  Buffer.BlockCopy(BitConverter.GetBytes(key.Length * 2), 0, header, 24, 4);
  Buffer.BlockCopy(BitConverter.GetBytes(metadata.Length), 0, header, 28, 4);
  Buffer.BlockCopy(BitConverter.GetBytes(0), 0, header, 32, 4); // we don't know the document length yet
  Buffer.BlockCopy(Encoding.Unicode.GetBytes(key), 0, header, 36, key.Length * 2);
  Buffer.BlockCopy(metadata, 0, header, 36 + key.Length * 2, metadata.Length);
  this.headerSize = header.Length;

  var dvalue = DbEntry.InOut(header, 0, header.Length);
  data.Put(transaction, ref idBuffer, ref dvalue);
 }

We start up the document streamer by writing out the same header information we were writing before: the etag, modification time, sizes, key and metadata.  The difference is that we don't know what to put for the document size yet, so we just fill that in when the stream is flushed.

The important thing to note is that we have to set up the secondary key values in the header, since when we issue a BDB put, the secondary index lookup function is going to be called in order to get the secondary index key.  We need those values in place before we start streaming the data.
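
For illustration, this is roughly what those secondary lookups have to dig out of the header written above.  The offsets match the constructor, but the actual associate delegate signature depends on the BDB wrapper, so these are shown as plain helpers.

// Sketch only: what the indexByEtag and indexByKey associate callbacks need to
// extract from the primary row's header.  Offsets mirror the constructor above;
// the real callbacks have whatever signature the wrapper demands.
static byte[] ExtractEtag(byte[] row)
{
 var etag = new byte[16];
 Buffer.BlockCopy(row, 0, etag, 0, 16);           // etag occupies the first 16 bytes
 return etag;
}

static byte[] ExtractKey(byte[] row)
{
 int keyLength = BitConverter.ToInt32(row, 24);   // key length stored at offset 24
 var key = new byte[keyLength];
 Buffer.BlockCopy(row, 36, key, 0, keyLength);    // UTF-16 key bytes start at offset 36
 return key;
}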

public override bool CanRead { get { return false; } }
public override bool CanSeek { get { return false; } } 
public override bool CanWrite { get { return true; } } 

public override long Length { get { throw new NotImplementedException(); } }
public override long Position { get { return position; } set { position = value; } }
public override int Read(byte[] buffer, int offset, int count) { throw new NotImplementedException(); }
public override long Seek(long offset, SeekOrigin origin) { throw new NotImplementedException(); }
public override void SetLength(long value) { throw new NotImplementedException(); }

Now for the boilerplate write-only, non-seeking stream members.  And finally, the actual methods we care about.

public override void Flush()
{
 // partial put of 4 bytes at header offset 32: fill in the document length now that we know it
 var dvalue = DbEntry.InOut(BitConverter.GetBytes((int)position), 0, 4, 4, 32);
 data.Put(transaction, ref idBuffer, ref dvalue);
}

public override void Write(byte[] buffer, int offset, int count)
{
 // partial put: append this chunk after the header at the current stream position
 var dvalue = DbEntry.InOut(buffer, offset, count, count, headerSize + (int)position);
 data.Put(transaction, ref idBuffer, ref dvalue);
 position += count;
}

The Write method simply takes the buffer it's given and uses the current stream position to tell BDB where to add the data (BDB handles extending the size of the record as we do the partial writes).  And finally, on Flush, we know the size of the document, so we re-issue the put command to write the document size into the correct header spot.

So, all in all, doing the streaming writes is not that complicated.  It turns out that doing streaming reads is a little harder, since we have to worry about opening, managing and disposing of cursors around the streaming read accesses, but we'll leave that for next time.