Thursday, July 15, 2010

Cassandra file storage

Cassandra doesn't support big blobs; there is no ready-made equivalent of GridFS (MongoDB). It's important to keep in mind that Cassandra is written in Java, so a large blob has to fit comfortably in the JVM heap while it is being handled.
http://wiki.apache.org/cassandra/FAQ#large_file_and_blob_storage

We have to split big files into multiple chunks.


(file_name =>(size) )
(chunk_id =>( data) )
(file_name => (chunk_id, chunk_id, ...))

Example of storing a file in Python (lazyboy):

# -*- coding: utf-8 -*-
# <Keyspaces>
#     <Keyspace Name=\"BigStorage\">
#        <ColumnFamily CompareWith=\"BytesType\" Name=\"Files\"/>
#        <ColumnFamily CompareWith=\"BytesType\" Name=\"Chunks\"/>
#        <ColumnFamily CompareWith=\"TimeUUIDType\" Name=\"FilesChunks\"/>
#     </Keyspace>
# </Keyspaces>
#

import sys
import uuid

from lazyboy import *
from lazyboy.key import Key

# Define your cluster(s)
connection.add_pool('BigStorage', ['10.10.2.29:9160'])

# Size of each file chunk in bytes (512 KiB) — keeps every column's value
# well below Cassandra's practical blob-size limits.
CHUNK_SIZE = 1024 * 512


class FileKey(Key):
    # Row key into the 'Files' column family of the BigStorage keyspace.
    def __init__(self, key=None):
        Key.__init__(self, "BigStorage", "Files", key)

class File(record.Record):
    """Per-file metadata record (total size in bytes)."""

    # Anything listed here must be set before the record can be saved.
    _required = ('size',)

    def __init__(self, *args, **kwargs):
        record.Record.__init__(self, *args, **kwargs)
        self.key = FileKey()

class ChunkKey(Key):
    # Row key into the 'Chunks' column family of the BigStorage keyspace.
    def __init__(self, key=None):
        Key.__init__(self, "BigStorage", "Chunks", key)

class Chunk(record.Record):
    """One chunk of file data, at most CHUNK_SIZE bytes."""

    # Anything listed here must be set before the record can be saved.
    _required = ('data',)

    def __init__(self, *args, **kwargs):
        record.Record.__init__(self, *args, **kwargs)
        self.key = ChunkKey()

class FileChunkKey(Key):
    # Row key into the 'FilesChunks' column family of the BigStorage keyspace.
    def __init__(self, key=None):
        Key.__init__(self, "BigStorage", "FilesChunks", key)

class FileChunk(record.Record):
    """Maps a file to its chunk keys.

    The FilesChunks column family compares columns with TimeUUIDType, so
    one TimeUUID column per chunk key keeps the chunks in write order.
    """

    def __init__(self, *args, **kwargs):
        """Initialize the record, along with a new key."""
        record.Record.__init__(self, *args, **kwargs)
        self.key = FileChunkKey()


def store_file(file_name, file_object):
    """Split ``file_object`` into CHUNK_SIZE pieces and store it in Cassandra.

    Writes:
      * a File record keyed by ``file_name`` holding total size and a
        ``stored`` completion flag,
      * one Chunk record (keyed by a fresh UUID) per piece of data,
      * FileChunk columns mapping ``file_name`` to the chunk keys, under
        TimeUUID column names so they sort in chunk order.

    :param file_name: row key for the file's metadata and chunk index.
    :param file_object: any object with a ``read(size)`` method.
    """
    chunk_keys = []
    file_size = 0

    # Create the metadata row up front with stored=0 so readers can tell
    # the upload is still in progress.
    new_file = File()
    new_file.key = FileKey(file_name)
    new_file.update({'size': 0, 'stored': 0})
    new_file.save()

    while True:
        data = file_object.read(CHUNK_SIZE)
        if not data:
            break

        file_size += len(data)
        chunk = Chunk({'data': data})
        key = str(uuid.uuid1())
        chunk.key = ChunkKey(key)
        chunk_keys.append(key)
        chunk.save()
        print(key)

    # Index the chunks under the file's row: TimeUUID column names are
    # generated in loop order, which preserves the chunks' write order.
    for chunk_key in chunk_keys:
        file_chunk = FileChunk()
        file_chunk.update({uuid.uuid1().bytes: chunk_key})
        file_chunk.key = FileChunkKey(file_name)
        file_chunk.save()

    # Only after every chunk is persisted do we record the real size and
    # flip the completion flag.
    new_file.update({'size': file_size, 'stored': 1})
    new_file.save()

package eu.iddqd.casstorage;

import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.softao.jassandra.ByteArray;
import org.softao.jassandra.ConsistencyLevel;
import org.softao.jassandra.DriverManager;
import org.softao.jassandra.IColumn;
import org.softao.jassandra.IColumnFamily;
import org.softao.jassandra.IConnection;
import org.softao.jassandra.ICriteria;
import org.softao.jassandra.IKeySpace;
import org.softao.jassandra.JassandraException;

public class CasStorage {

 /**
  * @param args
  */
 public static void main(String[] args) {
  // TODO Auto-generated method stub
  Properties info = new Properties();
  info.put(DriverManager.CONSISTENCY_LEVEL,
    ConsistencyLevel.ONE.toString());

  try {
   IConnection connection = DriverManager.getConnection(
     \\&quot;thrift://127.0.0.1:9160\\&quot;, info);
   IKeySpace keySpace = connection.getKeySpace(\\&quot;BigStorage\\&quot;);
   IColumnFamily cfFilesChunks = keySpace.getColumnFamily(\\&quot;FilesChunks\\&quot;);
   IColumnFamily cfChunks = keySpace.getColumnFamily(\\&quot;Chunks\\&quot;);
   

   ICriteria criteria = cfFilesChunks.createCriteria();
   ICriteria chunksCriteria = cfChunks.createCriteria();
   
   String fileName = args[1];  
   criteria.keyList(fileName).columnRange(ByteArray.EMPTY, ByteArray.EMPTY, Integer.MAX_VALUE);
   
   Map&lt;String, List&lt;IColumn&gt;&gt; map = criteria.select();
   List&lt;IColumn&gt; list = map.get(fileName);
   FileOutputStream out = new FileOutputStream(args[2]);      
   
   for (int i=0; i&lt;list.size(); i++){
    String chunkKey = new String(list.get(i).getValue().toByteArray());
    chunksCriteria.keyList(chunkKey).
     columnRange(ByteArray.EMPTY, ByteArray.EMPTY, Integer.MAX_VALUE);
    Map&lt;String, List&lt;IColumn&gt;&gt; chunkMap = chunksCriteria.select();
    out.write(chunkMap.get(chunkKey).get(0).getValue().toByteArray());    
   }
   
   out.close();
   
  } catch (JassandraException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  } catch (IOException ioe) {
   ioe.printStackTrace();
  }
  

 }
}