The DOMS (Digital Object Management System)
The role and design of the DOMS system, our metadata storage system
DOMS is based on a Fedora 3 repository system.
Integrating other storage implementations with Fcrepo
Fcrepo has a storage interface called Akubra https://github.com/akubra/akubra
Using this storage interface, we can integrate arbitrary storage solutions with Fcrepo. The interface is split into three Java types: BlobStore, BlobStoreConnection and Blob. The basic design is that the BlobStore is created as a singleton by the Fcrepo server system. To work with blobs, the BlobStore is asked to open a connection, a BlobStoreConnection. From this connection, Blobs can be read and written.
When requesting a Blob from the BlobStoreConnection, a Blob is returned, even if it does not exist. Like a File object, it has an exists() method. You can then open input and output streams on the blob.
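The "handle is returned even if nothing exists" behaviour can be illustrated with a small in-memory model. This is only a sketch: SketchBlobStore, SketchConnection, SketchBlob and BlobOutput are hypothetical stand-ins invented for this example and are not the real Akubra interfaces, whose method signatures differ.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in types; these are NOT the real org.akubraproject interfaces.
class SketchBlobStore {
    private final Map<String, byte[]> content = new ConcurrentHashMap<>();

    // "The BlobStore is asked to open a connection."
    SketchConnection openConnection() {
        return new SketchConnection();
    }

    class SketchConnection {
        // A handle is always returned, whether or not any content exists.
        SketchBlob getBlob(String id) {
            return new SketchBlob(id);
        }
    }

    class SketchBlob {
        private final String id;

        SketchBlob(String id) {
            this.id = id;
        }

        boolean exists() {
            return content.containsKey(id);
        }

        ByteArrayInputStream openInputStream() {
            byte[] bytes = content.get(id);
            if (bytes == null) {
                throw new IllegalStateException("No content for " + id);
            }
            return new ByteArrayInputStream(bytes);
        }

        // The written bytes become visible when the stream is closed.
        BlobOutput openOutputStream() {
            return new BlobOutput(id);
        }
    }

    class BlobOutput extends ByteArrayOutputStream {
        private final String id;

        BlobOutput(String id) {
            this.id = id;
        }

        @Override
        public void close() {
            content.put(id, toByteArray());
        }
    }
}
```

A caller would typically check exists() before opening an input stream, much as one would with a File.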
Tapes, the basic design
Invariant: No data will ever be overwritten. This is the fundamental invariant in the tape design. Every write creates a new instance of an object.
(If Fedora writes two changes to an object it could happen that only the latest write is "taped". More about the deferred writing later.)
The tapes are tar files. They can be thought of as forming a long chain. Each tape is named according to the time it was created. Only the newest tape can be written to. When the newest tape reaches a certain size, it is closed, and a new tape is started. This new tape is now the newest tape.
Only one thread can write to the tape system at a time.
A separate index is maintained. This index retains the mapping between an object identifier and the newest instance of the object (i.e. the tape name and the offset into the tape).
Tape tar files, locking and the like
The tapes are tar files. To understand the following, see http://en.wikipedia.org/wiki/Tar_(computing)
The tapes are read with a library called JTar, see https://github.com/blekinge/jtar
Each object instance is named "<objectId>#<currentTimeMillis>" in the tape. The exact naming is not really important, but the name should contain the objectId, so that reindexing of the tapes is possible. To help people reading the tapes with normal tar tools, the names should be unique (inside the tape, if not globally), as extracting the content becomes annoying otherwise. This system, however, would not mind duplicates. When reindexing a tape, the offset of the record in the tar file is used to determine which instance of an object is newer.
The tapes themselves are named "tape<currentTimeMillis>.tar"
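A minimal sketch of the two naming schemes follows. The class and method names are hypothetical and chosen for this example only. Note one consequence of the tape naming: plain lexicographic sorting of tape names matches creation order as long as the millisecond timestamps have the same number of digits, which holds for 13-digit values.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper; only the name formats are taken from the text.
class TapeNames {
    // "<objectId>#<currentTimeMillis>"
    static String entryName(String objectId, long millis) {
        return objectId + "#" + millis;
    }

    // "tape<currentTimeMillis>.tar"
    static String tapeName(long millis) {
        return "tape" + millis + ".tar";
    }

    // Sorting by name gives creation order for equal-length timestamps.
    static List<String> oldestFirst(List<String> tapeNames) {
        List<String> sorted = new ArrayList<>(tapeNames);
        Collections.sort(sorted);
        return sorted;
    }
}
```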
To understand how the system works, threads need to be understood. Whenever a user/client performs a request to the fedora webservice, a new thread is started to handle this request. This thread will then attempt to handle the request, and when finished send the result back to the user. The user is thus "blocked" while waiting for the thread to return. Many users can, however, work concurrently, so many threads can be executing concurrently in this system.
When an object is requested for reading, the thread goes to the XmlTapesBlobStore. XmlTapesBlobStore asks Cache, which holds recently changed objects in the cacheDir. If the object is not found there, Cache asks Taper, which holds objects about to be taped in the tapingDir. If the object is not found there, Taper asks the TapeArchive, which finds the object in the tar tapes.
When the user operation deletes an object, the thread goes to Cache. Cache immediately goes to Taper, which immediately goes to TapeArchive to delete the object.
When the user operation changes an object or creates a new one, the thread also goes to Cache. Here the new version of the object is written to the cacheDir. The thread then returns. As can be seen, changes to the objects are written asynchronously to the tapes.
As will be explained in detail later, Taper holds a timer thread which constantly archives changed objects from the cacheDir.
Each of the three storage directories (cacheDir, tapingDir, tapeFolder) is protected by a lock. In order to change the content of one of these dirs, the lock must be acquired. Only one thread can hold the lock at any one time, but that thread can acquire the lock multiple times. The lock is not released until the thread has released every hold it has on it. Acquiring the lock is a blocking operation.
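The locking behaviour described here matches that of a re-entrant lock. The sketch below uses java.util.concurrent.locks.ReentrantLock to show the hold counting; whether the tape system uses this exact class is an assumption, and the class name DirLockSketch is hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; ReentrantLock is assumed, not confirmed by the text.
class DirLockSketch {
    // Returns the hold count after two locks, one unlock, and two unlocks.
    static int[] holdCounts(ReentrantLock lock) {
        int[] counts = new int[3];
        lock.lock();                      // blocks until the lock is free
        lock.lock();                      // the same thread may acquire it again
        counts[0] = lock.getHoldCount();
        lock.unlock();                    // still held once by this thread
        counts[1] = lock.getHoldCount();
        lock.unlock();                    // now actually released
        counts[2] = lock.getHoldCount();
        return counts;
    }
}
```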
Locks are only used for writing operations, never for reading. Due to the way filesystems work, a file can be read, even if it is deleted or changed after opening.
TapeArchive is the component that handles the tar files and the index.
When an outputstream is opened to a blob, the global write lock is acquired by this thread. As Fedora does not tell the blob how much data it is going to write, the outputstream will buffer the written data until the stream is closed. When the stream is closed, the buffer will be written to the newest tape as a new tar entry. The object instance will be registered in the index. Lastly, the write lock will be released.
Each outputstream has a 1 MB buffer by default. If the system attempts to write content exceeding the remaining bytes in the buffer, the buffer is marked as finished and a new buffer of size max(1MB, sizeNeeded) is allocated. So, if you write 1 byte, 1 byte of the 1 MB buffer will be used. If you then write 1 MB in one operation, the default buffer will be finished, and a new buffer of 1 MB will be allocated. This new buffer will then be filled with the 1 MB written. So almost 1 MB is wasted here.
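The arithmetic of this example can be checked with a small model that only tracks sizes, not the data itself. OutputBufferModel is a hypothetical name; the sketch follows the allocation rule as stated above and is not the actual buffer implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Size-only model of the buffer allocation rule; no real data is stored.
class OutputBufferModel {
    static final int DEFAULT_SIZE = 1024 * 1024; // 1 MB

    final List<Integer> bufferSizes = new ArrayList<>();
    long wastedBytes = 0;       // unused space left behind in finished buffers
    private int remaining = 0;  // free space in the current buffer

    OutputBufferModel() {
        startBuffer(DEFAULT_SIZE);
    }

    private void startBuffer(int size) {
        wastedBytes += remaining; // what was left in the old buffer is never used
        bufferSizes.add(size);
        remaining = size;
    }

    void write(int length) {
        if (length > remaining) {
            // Current buffer is finished; allocate max(1MB, sizeNeeded).
            startBuffer(Math.max(DEFAULT_SIZE, length));
        }
        remaining -= length;
    }
}
```

Writing 1 byte followed by 1 MB leaves two 1 MB buffers and 1,048,575 wasted bytes in the first one.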
In principle, it is not necessary to acquire the write lock until the stream is closed, but it is acquired when the stream is opened. If the write lock were acquired on closing, the stream would need to determine which tape is the newest at that time. By acquiring the lock at "open" time, the stream can be given this information up front. Since Fcrepo seems to burst-write to the disk, neither deadlocks nor slowdowns have been seen.
Reading is done by querying the index for the tape name and offset. With this information, an inputstream can be opened to the exact entry in the relevant tape. No locking is necessary for reading. After skipping to the correct offset in the tape, we read until we reach a tar record header, which ought to be after 0 bytes. We then read the tar record header to get the size of the record, and return an inputstream that starts at this position and stops when the entire record has been read (to prevent the user from reading into the next record). At no time in this process do we examine the name of the tar record we are reading.
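To make the read path concrete, here is a sketch that works on a tape held in a byte array. It relies only on the standard tar layout: 512-byte header blocks with the record size stored as octal ASCII in header bytes 124 to 135, followed by the content padded to a multiple of 512 bytes. It does not use JTar, and TarRecordReader with its methods are hypothetical names. The record() helper writes just enough of a header for this demonstration (no checksum), so its output is not a tar file that external tools would accept.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of "seek to offset, read header, return exactly one record".
class TarRecordReader {
    static final int BLOCK = 512;

    // The size field is octal ASCII in header bytes 124..135.
    static long sizeOf(byte[] tape, int headerOffset) {
        String field = new String(tape, headerOffset + 124, 12, StandardCharsets.US_ASCII);
        String digits = field.replace('\0', ' ').trim();
        return digits.isEmpty() ? 0 : Long.parseLong(digits, 8);
    }

    // Returns only the bytes of the record at headerOffset, never the next one.
    static byte[] readRecordAt(byte[] tape, int headerOffset) {
        int size = (int) sizeOf(tape, headerOffset);
        int start = headerOffset + BLOCK;
        return Arrays.copyOfRange(tape, start, start + size);
    }

    // Demo helper: minimal header (name and size only) plus padded content.
    static byte[] record(String name, byte[] content) {
        int padded = ((content.length + BLOCK - 1) / BLOCK) * BLOCK;
        byte[] out = new byte[BLOCK + padded];
        byte[] nameBytes = name.getBytes(StandardCharsets.US_ASCII);
        System.arraycopy(nameBytes, 0, out, 0, Math.min(nameBytes.length, 100));
        byte[] size = String.format("%011o", content.length).getBytes(StandardCharsets.US_ASCII);
        System.arraycopy(size, 0, out, 124, size.length);
        System.arraycopy(content, 0, out, BLOCK, content.length);
        return out;
    }
}
```

As in the description, the record name is never consulted when reading; only the offset from the index and the size from the header are used.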
A tape is marked as indexed (in the index below) when it is closed and a new tape is started. As will be explained, tapes that are marked as indexed will not be re-read upon server start-up.
Taper is the component that handles the deferred writing. It is a singleton object which, upon creation, starts a timer task. At a short, fixed interval (the delay property, in milliseconds), it examines all changed objects, and tapes all objects whose changes are older than tapeDelay (also given in milliseconds).
The procedure is as follows:
- Acquire a write-lock on the tapingDir
  - Tape all that is in the tapingDir
  - Acquire a write-lock on the cacheDir
    - Iterate through all files in the cacheDir
      - If the file is older than tapeDelay, move it to the tapingDir
  - Release the write-lock on the cacheDir
  - Tape all that is in the tapingDir
- Release the write-lock on the tapingDir
Taping all that is in the tapingDir means:
- Acquire a write-lock on the tapingDir
- Iterate through all the files in the tapingDir
  - Forward the create/remove operation to the TapeArchive
- Release the write-lock on the tapingDir
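One pass of the timer task could be sketched as follows. This is an illustration under several assumptions: TaperPassSketch and its members are hypothetical names, re-entrant locks are assumed, the TapeArchive is replaced by a list that records file names, and files are assumed to be deleted from the tapingDir once they have been forwarded.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of one taper pass; not the actual Taper implementation.
class TaperPassSketch {
    final ReentrantLock tapingDirLock = new ReentrantLock();
    final ReentrantLock cacheDirLock = new ReentrantLock();
    final List<String> taped = new ArrayList<>(); // stand-in for the TapeArchive
    final Path cacheDir;
    final Path tapingDir;
    final long tapeDelayMillis;

    TaperPassSketch(Path cacheDir, Path tapingDir, long tapeDelayMillis) {
        this.cacheDir = cacheDir;
        this.tapingDir = tapingDir;
        this.tapeDelayMillis = tapeDelayMillis;
    }

    void runOnce(long nowMillis) {
        tapingDirLock.lock();
        try {
            tapeAll();                    // leftovers from an earlier pass
            cacheDirLock.lock();
            try {
                moveOldFiles(nowMillis);
            } finally {
                cacheDirLock.unlock();
            }
            tapeAll();                    // the files just moved
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            tapingDirLock.unlock();
        }
    }

    private List<Path> listFiles(Path dir) throws IOException {
        List<Path> result = new ArrayList<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(dir)) {
            for (Path file : files) {
                result.add(file);
            }
        }
        return result;
    }

    private void moveOldFiles(long nowMillis) throws IOException {
        for (Path file : listFiles(cacheDir)) {
            long age = nowMillis - Files.getLastModifiedTime(file).toMillis();
            if (age > tapeDelayMillis) {
                // Atomic move: both dirs must be on the same file system.
                Files.move(file, tapingDir.resolve(file.getFileName()),
                        StandardCopyOption.ATOMIC_MOVE);
            }
        }
    }

    private void tapeAll() throws IOException {
        tapingDirLock.lock();             // re-entrant: already held by runOnce
        try {
            for (Path file : listFiles(tapingDir)) {
                taped.add(file.getFileName().toString()); // forward to the archive
                Files.delete(file);       // assumption: removed once taped
            }
        } finally {
            tapingDirLock.unlock();
        }
    }
}
```

Holding the cacheDir lock only while files are moved keeps the window in which user writes are blocked short, while the slower taping step runs under the tapingDir lock alone.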
If the Fedora system requests to read an object not in the Cache, the request will be forwarded to the Taper. If the object is in the tapingDir, it will be served from that location, otherwise the request will be forwarded to the TapeArchive.
Removal operations are handled separately from this procedure, as we want the removal to happen instantly. When the Cache receives a removal request, it immediately forwards this request to the Taper. This operation then blocks until a write-lock can be acquired on both the tapingDir and the cacheDir. This, of course, makes it impossible to run concurrently with the taper timer task. As the timer task runs basically constantly, it will finish a run and release the locks. The delete thread will then hopefully acquire the locks. The timer thread will thus be unable to start until the delete thread has completed.
The cache is the place where changed objects are written. Objects are served from the cache first, so that changed objects always get served in the changed version. The cache does not cache objects retrieved from the tapes, only changed objects submitted by the user.
To write a new file to the cacheDir, the write-lock for the cacheDir must of course be acquired. This means that writing operations are blocked until the timer thread described above has finished iterating through the cacheDir. Similarly, slow uploads will block the timer thread until the upload is complete.
For the index, a separate system called Redis http://redis.io/ is used.
A client, Jedis https://github.com/xetorthio/jedis is used as the interface to the system.
The metadata repository now requires the existence of a redis instance to function.
The index implementation has to provide the following methods:
- tape,offset getLocation(objectID)
- void setLocation(objectID, tape, offset)
- iterator<objectID> list(idPrefix)
- boolean isTapeIndexed(tape)
- void setTapeIndexed(tape)
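Expressed as a Java interface, the method list could look like the sketch below. The type names (Location, TapeIndex, InMemoryTapeIndex) and parameter types are assumptions made for this example; the in-memory implementation only illustrates the contract and is not the Redis-backed index.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical value type for the "tape, offset" pair.
final class Location {
    final String tape;
    final long offset;

    Location(String tape, long offset) {
        this.tape = tape;
        this.offset = offset;
    }
}

// Hypothetical Java rendering of the five methods listed above.
interface TapeIndex {
    Location getLocation(String objectId);
    void setLocation(String objectId, String tape, long offset);
    Iterator<String> list(String idPrefix);
    boolean isTapeIndexed(String tape);
    void setTapeIndexed(String tape);
}

// In-memory illustration only; the real index lives in Redis.
class InMemoryTapeIndex implements TapeIndex {
    private final SortedMap<String, Location> locations = new TreeMap<>();
    private final Set<String> indexedTapes = new HashSet<>();

    public Location getLocation(String objectId) {
        return locations.get(objectId); // null when the object is unknown
    }

    public void setLocation(String objectId, String tape, long offset) {
        // A later call overwrites the earlier one: newest instance wins.
        locations.put(objectId, new Location(tape, offset));
    }

    public Iterator<String> list(String idPrefix) {
        List<String> matches = new ArrayList<>();
        for (String id : locations.keySet()) {
            if (id.startsWith(idPrefix)) {
                matches.add(id);
            }
        }
        return matches.iterator();
    }

    public boolean isTapeIndexed(String tape) {
        return indexedTapes.contains(tape);
    }

    public void setTapeIndexed(String tape) {
        indexedTapes.add(tape);
    }
}
```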
The Redis instance holds a number of keys and sets.
Firstly, we have the String keys. These are used to map an object id to a string of the form tape#offset. There will be one such key for each objectID. Looking up and writing these keys should be nearly O(1).
Secondly, we have the sorted set called "buckets". It holds references (names, really) to a number of other sorted sets. Each of these other sorted sets holds a number of objectIds. Each objectId that is added to the index is hashed. The first 4 characters of the hash value are then used to determine which of these buckets to add the ID to. Each bucket is named solely by the 4 characters shared by the hash values of the IDs it holds. The purpose of this complex structure is to be able to iterate through the objectIDs in DOMS, while allowing paging.
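The bucket naming can be sketched as below. The text does not say which hash function is used, so MD5 rendered as lowercase hexadecimal is purely an assumption for illustration, and Buckets.bucketFor is a hypothetical name. Under that assumption there can be at most 16^4 = 65536 buckets.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper; the hash function (MD5, hex) is an assumption.
class Buckets {
    // Returns the first 4 hex characters of the hash of the object id.
    static String bucketFor(String objectId) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(objectId.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (int i = 0; i < 2; i++) {           // 2 bytes = 4 hex characters
                hex.append(String.format("%02x", digest[i] & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }
}
```

Because the bucket depends only on the id, the same objectId always lands in the same bucket, so listing can proceed bucket by bucket and resume at a bucket boundary for paging.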
The last data structure is the set "tapes". This contains all the tapes that have been indexed so far. Looking up a key in the set and adding to the set are both fast operations.
Upon startup, the server goes through the following process.
- It lists and sorts by name all the tapes in the given tape folder. The tapes should therefore be named in a fashion that allows the system to sort by name to get a last-modified sorting.
- For each tape it checks if the index marks this tape as indexed.
- If the tape is not indexed, it is read through for indexing.
- Any further tapes in the list are also indexed the same way.
Indexing a tape
The tape is read through from the beginning. This process is fast, as we can skip over the actual record contents. The relevant pieces of information here are the objectId, the tape name and the offset of the record. For each tar record, the index is updated in the same way as it would have been when the record was written. This update will overwrite any entry that already existed in the index. As the tapes are read in order of creation time (this is encoded in the tape name) and the records in a tape are written in order, when all entries concerning a given objectId have been indexed, the index is sure to hold the information about the newest instance of the object.
When a tape has been indexed, it is marked as such in the index.
The newest tape will be indexed, but will not be marked as indexed.
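The selection of tapes at startup can be sketched as a pure function. This reads the startup steps as meaning that, once the first unindexed tape is found, it and every later tape are read through; that reading is an assumption, though it is consistent with the requirement that newer entries must overwrite older ones. StartupIndexPlan and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Hypothetical planning helper for the startup indexing pass.
class StartupIndexPlan {
    // Tapes to read through at startup, oldest first.
    static List<String> tapesToIndex(List<String> tapeNames, Set<String> indexedTapes) {
        List<String> sorted = new ArrayList<>(tapeNames);
        Collections.sort(sorted); // name order is creation order
        for (int i = 0; i < sorted.size(); i++) {
            if (!indexedTapes.contains(sorted.get(i))) {
                // This tape and all newer ones are indexed.
                return new ArrayList<>(sorted.subList(i, sorted.size()));
            }
        }
        return new ArrayList<>();
    }

    // Tapes to mark as indexed afterwards: all but the newest, which is still open.
    static List<String> tapesToMark(List<String> toIndex) {
        if (toIndex.isEmpty()) {
            return new ArrayList<>();
        }
        return new ArrayList<>(toIndex.subList(0, toIndex.size() - 1));
    }
}
```

Since the newest tape is never marked as indexed, it is always read through again on the next startup.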
Sometimes the tapes can become broken (not observed yet). This is most likely to happen when the server terminates abnormally and leaves a tape with half a record. Upon startup, if the property "fixErrors" is set, the newest tape, and only the newest tape, is read through to check for this. Any tape but the newest should not be able to contain broken records, barring a general failure of the underlying filesystem, so the older tapes are not checked.
If an IOException occurs while reading the newest tape as detailed above (i.e. on startup, if the property "fixErrors" is set), the system regards the tape as broken and attempts to fix it. The fix is rather brutal. Every record, from the beginning of the tape, is read and written to a new temp tape, until the broken tape either runs out of bytes or the IOException reoccurs. This is done in a way that ensures that the temp tape will only contain complete records. When no more can be read from the broken tape, it is deleted and replaced with the temp tape.
If the archive is set to rebuild, it will flush (as in flush the toilet) the content of the redis database upon startup. This means that no tape will be listed as indexed, so the entire tape archive will be reindexed. The redis client has some harsh requirements about timeouts on commands, and at times the flush will not return in time. In that case the server will fail to start. Simply restarting the server afterwards is the cure, as the redis instance should have finished flushing by then.
The tapes do support purging objects. When a blob is deleted, it is written as a new instance of 0 bytes to the tape. The name of the record is now "objectId#timestampInMillis#DELETED". When an object is deleted, it is removed from the index. Upon indexing the tape (see above), records of 0 bytes with names ending in "#DELETED" cause the objectId to be removed from the index.
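How a single tar record affects the index during (re)indexing, including the deletion marker, can be sketched as follows. The index is modelled as a plain map from objectId to a "tape#offset" string. TapeReplay is a hypothetical name, and the sketch assumes that object ids themselves never contain the '#' character.

```java
import java.util.Map;

// Hypothetical replay step; assumes object ids contain no '#'.
class TapeReplay {
    static final String DELETED_SUFFIX = "#DELETED";

    static void apply(Map<String, String> index, String tape,
                      String entryName, long offset, long size) {
        int hash = entryName.indexOf('#');
        String objectId = hash < 0 ? entryName : entryName.substring(0, hash);
        if (size == 0 && entryName.endsWith(DELETED_SUFFIX)) {
            index.remove(objectId);                    // tombstone: forget the object
        } else {
            index.put(objectId, tape + "#" + offset);  // newest instance wins
        }
    }
}
```

Replaying records in tape order therefore leaves the index pointing at the newest instance of each object, or at nothing if the last record for the object is a deletion marker.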
Packaging the old objects for Tapes
A traditional fcrepo backend can easily be converted to use tapes. To do so, the fedora server must be stopped (to ensure that the objects are not changed).
The script "tarExisting.sh" is included with the distribution. It takes two arguments: the folder to tape and the name of the tape. As mentioned above, the tapes should be named "tape<currentTimeMillis>.tar". For each folder containing objects for fcrepo, run the command ./tarExisting.sh <folder> "tape$(date '+%s%N').tar" and put the resulting tape in the configured tape folder for Fcrepo. Note that with GNU date, '+%s%N' produces a nanosecond timestamp; use '+%s%3N' to get milliseconds.
The system is configured from the file "akubra-llstore.xml" which is a spring config file. It is reproduced below with comments.
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd">
<beans>

  <!--Standard-->
  <bean name="org.fcrepo.server.storage.lowlevel.ILowlevelStorage"
        class="org.fcrepo.server.storage.lowlevel.akubra.AkubraLowlevelStorageModule">
    <constructor-arg index="0">
      <map/>
    </constructor-arg>
    <constructor-arg index="1" ref="org.fcrepo.server.Server"/>
    <constructor-arg index="2" type="java.lang.String"
                     value="org.fcrepo.server.storage.lowlevel.ILowlevelStorage"/>
    <property name="impl" ref="org.fcrepo.server.storage.lowlevel.akubra.AkubraLowlevelStorage"/>
  </bean>

  <bean name="org.fcrepo.server.storage.lowlevel.akubra.AkubraLowlevelStorage"
        class="org.fcrepo.server.storage.lowlevel.akubra.AkubraLowlevelStorage"
        singleton="true">
    <constructor-arg ref="tapeObjectStore">
      <description>The store of serialized Fedora objects</description>
      <!--Here we reference our tape system-->
    </constructor-arg>
    <constructor-arg ref="datastreamStore">
      <description>The store of datastream content</description>
    </constructor-arg>
    <constructor-arg value="false"><!--This is set to false, as we do not ever delete stuff-->
      <description>if true, replaceObject calls will be done in a way that ensures the old content is not deleted until the new content is safely written. If the objectStore already does this, this should be given as false</description>
    </constructor-arg>
    <constructor-arg value="true">
      <description>same as above, but for datastreamStore</description>
    </constructor-arg>
  </bean>

  <!--This is the tape store Akubra Implementation-->
  <bean name="tapeObjectStore"
        class="dk.statsbiblioteket.metadatarepository.xmltapes.XmlTapesBlobStore"
        singleton="true">
    <constructor-arg value="urn:example.org:tapeObjectStore"/>
    <!--This parameter is the name of the storage.-->
    <property name="archive" ref="cacheTapeObjectStore"/>
    <!--And this is the reference to the actual implementation-->
  </bean>

  <!--The cache tape object store holds the objects while it is being written and until the taper is ready to tape the object-->
  <bean name="cacheTapeObjectStore"
        class="dk.statsbiblioteket.metadatarepository.xmltapes.deferred2.Cache"
        singleton="true">
    <!--Where to store files until the taper is ready to tape them-->
    <constructor-arg value="/CHANGEME/cacheObjectStore" index="0"/>
    <!--Where to store files while they are still being written-->
    <constructor-arg value="/CHANGEME/cacheTempObjectStore" index="1"/>
    <!--The two addresses above must be on the same file system as we rely on atomic moves-->
    <!--The delegate for read operations-->
    <property name="delegate" ref="tapingTapeObjectStore"/>
  </bean>

  <!--The cache tape object store holds the objects while it is being written and until the taper is ready to tape the object-->
  <bean name="tapingTapeObjectStore"
        class="dk.statsbiblioteket.metadatarepository.xmltapes.deferred2.Taping"
        singleton="true">
    <!--Where to store files until the taper is ready to tape them-->
    <constructor-arg value="/CHANGEME/tapingObjectStore" index="0"/>
    <!--The allowed age of a file before it is taped, in ms-->
    <property name="tapeDelay" value="600000"/>
    <!--The delay between invocations of the taper-->
    <property name="delay" value="100"/>
    <property name="delegate" ref="tarTapeObjectStore"/>
    <!--The cache, to get the objects ready for writing-->
    <property name="parent" ref="cacheTapeObjectStore"/>
  </bean>

  <!--The guts of the tape system-->
  <bean name="tarTapeObjectStore"
        class="dk.statsbiblioteket.metadatarepository.xmltapes.TapeArchive"
        init-method="rebuild"
        singleton="true">
    <!--Change the init-method to init, if you do not want to rebuild the redis index on server startup-->
    <!--This constructor argument specifies the tape store location.-->
    <constructor-arg value="file:/CHANGEME/tapeObjectStore" type="java.net.URI"/>
    <!--This specifies the maximum length a tape can be before a new tape is started-->
    <constructor-arg value="10485760" type="long"/>
    <!--10 MB-->
    <!--This is the reference to the index-->
    <property name="index" ref="redisIndex"/>
    <property name="fixErrors" value="false"/>
  </bean>

  <!--This is our Redis index-->
  <bean name="redisIndex"
        class="dk.statsbiblioteket.metadatarepository.xmltapes.redis.RedisIndex"
        singleton="true">
    <!--The redis server-->
    <constructor-arg value="localhost"/>
    <!--The port it is running on-->
    <constructor-arg value="6379"/>
    <!--The database name. Redis databases are always identified by integers-->
    <constructor-arg value="0"/>
  </bean>

  <!--Standard storage for managed datastreams. We do not use managed datastreams-->
  <bean name="datastreamStore"
        class="org.akubraproject.map.IdMappingBlobStore"
        singleton="true">
    <constructor-arg value="urn:fedora:datastreamStore"/>
    <constructor-arg>
      <ref bean="fsDatastreamStore"/>
    </constructor-arg>
    <constructor-arg>
      <ref bean="fsDatastreamStoreMapper"/>
    </constructor-arg>
  </bean>

  <!--Standard storage for managed datastreams. We do not use managed datastreams-->
  <bean name="fsDatastreamStore"
        class="org.akubraproject.fs.FSBlobStore"
        singleton="true">
    <constructor-arg value="urn:example.org:fsDatastreamStore"/>
    <constructor-arg value="/CHANGEME/datastreamStore"/>
  </bean>

  <!--Standard storage for managed datastreams. We do not use managed datastreams-->
  <bean name="fsDatastreamStoreMapper"
        class="org.fcrepo.server.storage.lowlevel.akubra.HashPathIdMapper"
        singleton="true">
    <constructor-arg value="##"/>
  </bean>

  <bean name="fedoraStorageHintProvider"
        class="org.fcrepo.server.storage.NullStorageHintsProvider"
        singleton="true">
  </bean>

</beans>
Bit preservation for the DOMS system is an issue we need to handle. In principle, DOMS does not handle bit preservation. For this requirement, we have the excellent system called the Bit Repository.
The fundamental problem is that content in DOMS, even with the tape system detailed above, still only exists as one copy, and is thus vulnerable to a number of system failures. So, our strategy is to ensure that the content exists in more than one copy.
We have two ways to ensure this.
Fedora journaling is described here https://wiki.duraspace.org/display/FEDORA36/Journaling. It is a standard, if not widely used, component of the Fedora repository system, and one we could use without much development work.
The problem with using a journaling fedora is ensuring that the two servers are identical. While the journaling system SHOULD ensure that they are identical, actually testing it would be difficult. Due to this problem, we have decided not to use journaling as a solution to bit preservation. We have not ruled out using journaling as a means of load balancing for read operations, but that is a discussion for another day.
The tapes, detailed above, have a very nice quality. When a tape is closed for writing, it will never be updated again. This means that we can archive this tape in the bit repository. As soon as the tape is ingested into the bit repository, we know that the content is stored in multiple copies.
The tapes have been designed so that, if any content has been written to a tape, the tape will be closed after a set time. This way, we can ensure that any content added to DOMS will be taped and ingested into the bit repository within a known time frame.
To build this solution, we need to develop a simple component, detailed HERE.