* Thoughts.

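Since June there had been news articles and presentation slides about Presto, and Facebook had said it was preparing to open-source it. On November 7 it was released as open source.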


As expected, SQL over two-dimensional tables is still the easiest model for people to work with, and it seems right that tools are built around it.

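Until now, companies have either used commercial DW products or modified open-source analytics tools (Hadoop, Impala, HAWQ and the like) for internal use (vendor customization). I'm not sure whether we will end up depending on Presto, but if it really is this much faster, there is no reason not to use it. Hive is so slow that I never really wanted to use it myself, so Presto, at 8-10x faster, makes me want to try it.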


---



* Presto

- Apache License 2.0

- Presto has grown to 850 internal users per day, running 27,000 queries over 320 TB of data. The existing data warehouse is 250 PB in size and growing rapidly: 600 TB is added to it every day.

- Runs in 3 regional clusters and has successfully scaled to 1,000 nodes.

- The data is stored in the Hadoop Distributed File System (HDFS). Some may ask why Facebook doesn't just use a SQL DB engine for its queries. The reason is that it needs as few layers of abstraction as possible between the query engine and the underlying HDFS data. For that reason, add-ons such as Presto that interface directly with HDFS perform better than layers that abstract it away.

- 4-7x more CPU-efficient and 8-10x faster than Hive

- One-time or continuous import from external systems



http://www.techsuda.com/archives/2224

https://github.com/facebook/presto

https://www.facebook.com/notes/facebook-engineering/presto-interacting-with-petabytes-of-data-at-facebook/10151786197628920



Video

https://www.facebook.com/photo.php?v=10202463462128185

Posted by 김용환 '김용환'



I created a simple table in cqlsh.

cqlsh:userkeyspace> CREATE TABLE playlists (
                  id     uuid,
                  type   text,
                  name   text,
                  size   int,
                  artist text,
                  PRIMARY KEY (id, type, name, size)
                );




You can also fetch the table definition from a script with describe table playlists. This is a simple way to check whether a Cassandra node is currently up and answering queries.



echo 'use userkeyspace; describe table playlists; select count(*) from playlists;' | ./cqlsh -f /dev/stdin


CREATE TABLE playlists (
  id uuid,
  type text,
  name text,
  size int,
  artist text,
  PRIMARY KEY (id, type, name, size)
) WITH
  bloom_filter_fp_chance=0.010000 AND
  caching='KEYS_ONLY' AND
  comment='' AND
  dclocal_read_repair_chance=0.000000 AND
  gc_grace_seconds=864000 AND
  index_interval=128 AND
  read_repair_chance=0.100000 AND
  replicate_on_write='true' AND
  populate_io_cache_on_flush='false' AND
  default_time_to_live=0 AND
  speculative_retry='NONE' AND
  memtable_flush_period_in_ms=0 AND
  compaction={'class': 'SizeTieredCompactionStrategy'} AND
  compression={'sstable_compression': 'LZ4Compressor'};

 count
-------
     0

(1 rows)
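If this check runs unattended, the script must decide whether the node actually answered. Here is a minimal sketch. It assumes cqlsh prints the count in the layout above: a `count` header, a dashed line, then the value. The hypothetical helper `parseCount` returns that number, or -1 when no count table is found (for example, when cqlsh printed a connection error instead).

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CqlshCountParser {
    // "count" header line, a dashed separator line, then the numeric value line.
    private static final Pattern COUNT_TABLE =
            Pattern.compile("(?m)^\\s*count\\s*$\\s*^-+\\s*$\\s*^\\s*(\\d+)\\s*$");

    /** Returns the count printed by cqlsh, or -1 if the output contains no count table. */
    public static long parseCount(String cqlshOutput) {
        Matcher m = COUNT_TABLE.matcher(cqlshOutput);
        return m.find() ? Long.parseLong(m.group(1)) : -1L;
    }

    public static void main(String[] args) {
        String sample = " count\n-------\n     0\n\n(1 rows)\n";
        long count = parseCount(sample);
        System.out.println(count >= 0 ? "node answered, count = " + count : "no answer");
    }
}
```

A health check could pipe the cqlsh output into `parseCount` and treat -1 as "node not healthy".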



I'm using Cassandra 2.0.2. (IN seems to have been supported since 1.2, but I'm not sure.)


I tested whether Cassandra supports IN queries, and it does. However, using IN on only the second primary key column raises an error.



cqlsh:userkeyspace> describe table cf21;

CREATE TABLE cf21 (
  key text,
  number text,
  name text,
  PRIMARY KEY (key, number)
) WITH COMPACT STORAGE AND
  bloom_filter_fp_chance=0.010000 AND
  caching='KEYS_ONLY' AND
  comment='' AND
  dclocal_read_repair_chance=0.000000 AND
  gc_grace_seconds=864000 AND
  index_interval=128 AND
  read_repair_chance=0.100000 AND
  replicate_on_write='true' AND
  populate_io_cache_on_flush='false' AND
  default_time_to_live=0 AND
  speculative_retry='NONE' AND
  memtable_flush_period_in_ms=0 AND
  compaction={'class': 'SizeTieredCompactionStrategy'} AND
  compression={'sstable_compression': 'LZ4Compressor'};


IN works when it is used on the primary key columns starting from the partition key, as below.

cqlsh:userkeyspace> select * from cf21 where key in ('row1') and number in ('1') ;

 key  | number | name
------+--------+------
 row1 |      1 |  Kim

(1 rows)


cqlsh:userkeyspace> select * from cf21 where key in ('row1')  ;

 key  | number | name
------+--------+------
 row1 |      1 |  Kim
 row1 |      2 |  Yun
 row1 |      3 | Park




However, using IN on only the second primary key column (the clustering column), without restricting the partition key, raises an error.


cqlsh:userkeyspace> select * from cf21 where number in ('1');

Bad Request: Cannot execute this query as it might involve data filtering and thus may have unpredictable performance. If you want to execute this query despite the performance unpredictability, use ALLOW FILTERING



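Appending ALLOW FILTERING to the end of the query makes it run. The warning is about performance. Without a partition key restriction, Cassandra cannot tell which nodes hold the matching rows, so the query has to go to the whole cluster and filter the data there. Careful data modeling is essential.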



cqlsh:userkeyspace> select * from cf21 where number in ('1', '2') ALLOW FILTERING;

 key  | number | name
------+--------+------
 row1 |      1 |  Kim
 row1 |      2 |  Yun
 row2 |      1 | Choi

(3 rows)
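The results above suggest a rule of thumb: a query can be routed directly only when every partition key column is restricted. The following is a deliberately simplified model of that observation, not Cassandra's actual validation code. It ignores secondary indexes, token() restrictions, and the rules on which clustering columns may be restricted. `Plan` and `plan` are hypothetical names.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class CqlRestrictionModel {
    /** How a SELECT would be served, in this simplified model. */
    public enum Plan { FULL_SCAN, PARTITION_LOOKUP, NEEDS_ALLOW_FILTERING }

    /**
     * @param partitionKey columns of the partition key (for cf21: just "key")
     * @param restricted   columns restricted in the WHERE clause with = or IN
     */
    public static Plan plan(List<String> partitionKey, Collection<String> restricted) {
        if (restricted.isEmpty()) {
            return Plan.FULL_SCAN;             // e.g. SELECT * FROM cf21: allowed, reads everything
        }
        if (restricted.containsAll(partitionKey)) {
            return Plan.PARTITION_LOOKUP;      // key value(s) tell the coordinator which replicas to ask
        }
        return Plan.NEEDS_ALLOW_FILTERING;     // no partition key: every partition must be scanned and filtered
    }

    public static void main(String[] args) {
        List<String> cf21PartitionKey = Arrays.asList("key");
        System.out.println(plan(cf21PartitionKey, Arrays.asList("key", "number"))); // PARTITION_LOOKUP
        System.out.println(plan(cf21PartitionKey, Arrays.asList("key")));           // PARTITION_LOOKUP
        System.out.println(plan(cf21PartitionKey, Arrays.asList("number")));        // NEEDS_ALLOW_FILTERING
    }
}
```

The three calls in `main` mirror the three cf21 queries in this section.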





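For reference, using IN on a column that only has a secondary index (and is not part of the primary key) raises an error, even with ALLOW FILTERING.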


cqlsh:userkeyspace> describe table cf33;

CREATE TABLE cf33 (
  key text,
  name text,
  number text,
  PRIMARY KEY (key)
) WITH
  bloom_filter_fp_chance=0.010000 AND
  caching='KEYS_ONLY' AND
  comment='' AND
  dclocal_read_repair_chance=0.000000 AND
  gc_grace_seconds=864000 AND
  index_interval=128 AND
  read_repair_chance=0.100000 AND
  replicate_on_write='true' AND
  populate_io_cache_on_flush='false' AND
  default_time_to_live=0 AND
  speculative_retry='NONE' AND
  memtable_flush_period_in_ms=0 AND
  compaction={'class': 'SizeTieredCompactionStrategy'} AND
  compression={'sstable_compression': 'LZ4Compressor'};

CREATE INDEX name ON cf33 (name);

cqlsh:userkeyspace> select * from cf33 where name in ('John') ALLOW FILTERING;
Bad Request: No indexed columns present in by-columns clause with Equal operator

cqlsh:userkeyspace> select * from cf33 where key in ('John') ALLOW FILTERING;

(0 rows)
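As the error message says, a secondary-index lookup here only accepts the equality operator. If the list of values is small, one possible client-side workaround is to run one `name = ?` query per value and merge the results. Here is a minimal sketch. `queryEach` and the in-memory `fakeIndex` are hypothetical stand-ins for executing that equality query against cf33.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class IndexedInFanOut {
    // The statement each lookup would run, e.g. as a prepared statement (assumption).
    static final String EQUALITY_CQL = "SELECT * FROM cf33 WHERE name = ?";

    /** Runs one equality lookup per distinct value and concatenates the results in order. */
    public static <R> List<R> queryEach(List<String> values, Function<String, List<R>> lookup) {
        List<R> merged = new ArrayList<>();
        for (String v : new LinkedHashSet<>(values)) {   // skip duplicate values, keep order
            merged.addAll(lookup.apply(v));
        }
        return merged;
    }

    public static void main(String[] args) {
        // Stand-in for the secondary index on cf33.name: name -> row keys.
        Map<String, List<String>> fakeIndex = new HashMap<>();
        fakeIndex.put("John", Arrays.asList("row1", "row3"));
        fakeIndex.put("Jane", Arrays.asList("row2"));

        List<String> keys = queryEach(Arrays.asList("John", "Jane", "John"),
                name -> fakeIndex.getOrDefault(name, Collections.<String>emptyList()));
        System.out.println(keys); // [row1, row3, row2]
    }
}
```

Each value costs one indexed query, so this only makes sense for short IN lists.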






DataStax, the company behind Cassandra, added an interesting feature to Cassandra 2.0.

https://issues.apache.org/jira/browse/CASSANDRA-5932


The following post is well worth reading:

http://www.datastax.com/dev/blog/rapid-read-protection-in-cassandra-2-0-2



Normally, a read assumes that the replicas it is sent to respond in time.

However, if one of those replicas dies, the read times out.

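Instead, when a replica has not answered within a threshold, the coordinator can send the read to another suitable replica and return that result. This is called rapid read protection.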

According to the post, all you need is a per-table setting:
ALTER TABLE users WITH speculative_retry = '10ms';
ALTER TABLE users WITH speculative_retry = '99percentile';
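To make '99percentile' concrete: the retry threshold is the 99th percentile of recently observed read latencies, so roughly 1 read in 100 would trigger an extra request. Cassandra derives this value from its own latency statistics. The nearest-rank computation on a plain array below is only an illustration, and `percentile` is a hypothetical helper.

```java
import java.util.Arrays;

public class PercentileThreshold {
    /** Nearest-rank percentile: the smallest sample that at least p% of samples are <= to. */
    public static long percentile(long[] samples, double p) {
        if (samples.length == 0 || p <= 0 || p > 100) {
            throw new IllegalArgumentException("need at least one sample and 0 < p <= 100");
        }
        long[] sorted = samples.clone();
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p * sorted.length / 100.0); // 1-based rank
        return sorted[rank - 1];
    }

    public static void main(String[] args) {
        // Hypothetical read latencies in microseconds: 100, 200, ..., 10000.
        long[] latencies = new long[100];
        for (int i = 0; i < latencies.length; i++) {
            latencies[i] = (i + 1) * 100L;
        }
        // A read still pending after this long would get a speculative retry.
        System.out.println("retry threshold (us): " + percentile(latencies, 99)); // 9900
    }
}
```

A fixed value such as '10ms' skips this calculation and uses a constant threshold instead.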



Reportedly, from 2.0.2 this behavior is enabled by default.



Normally, when one Cassandra node dies, latency fluctuates and timeouts become frequent. (In fact, this happens with most NoSQL stores and databases.) Rapid read protection can minimize these fluctuations.
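You can model the idea in a few lines with CompletableFuture. This is a toy model, not Cassandra's implementation. Each replica is a `Supplier`, the threshold plays the role of `speculative_retry = '10ms'`, and after the retry, whichever replica answers first wins. `SpeculativeRead.read` is a hypothetical name. For simplicity, a replica that fails fast is reported as an error rather than retried.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;

public class SpeculativeRead {
    /** Asks the primary replica; if it has not answered within retryAfterMs, also asks the backup. */
    public static <T> T read(Supplier<T> primary, Supplier<T> backup,
                             long retryAfterMs, ExecutorService pool) {
        CompletableFuture<T> first = CompletableFuture.supplyAsync(primary, pool);
        try {
            // Normal case: the first replica answers within the threshold.
            return first.get(retryAfterMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException slow) {
            // Speculative retry: send the same read to another replica, use whichever answers first.
            CompletableFuture<T> second = CompletableFuture.supplyAsync(backup, pool);
            return first.applyToEither(second, Function.identity()).join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a replica", e);
        } catch (ExecutionException e) {
            throw new CompletionException(e.getCause());
        }
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Supplier<String> deadReplica = () -> { sleep(5_000); return "replica A"; };
            Supplier<String> healthyReplica = () -> { sleep(10); return "replica B"; };
            long start = System.nanoTime();
            String value = read(deadReplica, healthyReplica, 100, pool);
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            System.out.println("answer from " + value + " after " + elapsedMs + " ms");
        } finally {
            pool.shutdownNow(); // interrupts the still-sleeping "dead" replica
        }
    }
}
```

Running `main` returns replica B's value shortly after the 100 ms threshold instead of waiting 5 seconds for the dead replica. The timeout branch also shows why heavy load is a concern: every slow read becomes two reads.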





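There are caveats, though. Rapid read protection cannot fully protect reads at ConsistencyLevel.ALL, because every replica must answer and there is no spare replica to retry against.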

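Under heavy load, latency may fluctuate even more, since the extra speculative reads add load. You need enough servers to run with headroom. As always, running it well takes operational skill.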






** reference

http://www.datastax.com/docs/1.0/references/nodetool



Upgrading (updating) Cassandra 2.0 works the same way as in earlier versions.

 (2.0.1 -> 2.0.2)


# ./nodetool -h localhost  version

ReleaseVersion: 2.0.1


Disable the Thrift port (stop accepting client requests)

# ./nodetool -h localhost disablethrift


Disable the gossip protocol

# ./nodetool -h localhost disablegossip


# ./nodetool -h localhost status

The node is shown as DN (down).


Flush all data in the memtables to SSTables. After this, writes are no longer accepted, but reads keep working.

# ./nodetool -h localhost drain

# ./nodetool -h localhost statusthrift

not running

# pkill -f 'java.*cassandra'

# ps -ef | grep java

(no Cassandra process remains)


# start the new version

# ./cassandra &


# ./nodetool -h localhost  version

ReleaseVersion: 2.0.2
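The shutdown half of this procedure can be scripted. Here is a minimal sketch, assuming `nodetool` is callable at the given path. `NodeUpgradeSteps` is a hypothetical helper, not part of Cassandra. By default it only prints the commands. When started with `--run`, it executes them in order and stops at the first failure. Stopping the old process and starting the new version are left as comments, since they depend on how Cassandra is installed.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NodeUpgradeSteps {
    /** The per-node shutdown sequence from the steps above, as argument lists for nodetool. */
    public static List<List<String>> shutdownSteps(String nodetool, String host) {
        List<List<String>> steps = new ArrayList<>();
        // Same order as above: stop client traffic (thrift), stop gossip, then flush memtables (drain).
        for (String command : new String[] {"disablethrift", "disablegossip", "drain"}) {
            steps.add(Arrays.asList(nodetool, "-h", host, command));
        }
        return steps;
    }

    /** Runs each step in order and stops at the first non-zero exit code. */
    public static void run(List<List<String>> steps) throws IOException, InterruptedException {
        for (List<String> argv : steps) {
            int exit = new ProcessBuilder(argv).inheritIO().start().waitFor();
            if (exit != 0) {
                throw new IllegalStateException("step failed with exit code " + exit + ": " + argv);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        List<List<String>> steps = shutdownSteps("./nodetool", "localhost");
        for (List<String> argv : steps) {
            System.out.println(String.join(" ", argv));
        }
        if (args.length > 0 && args[0].equals("--run")) {
            run(steps);
        }
        // Then: stop the old process (e.g. pkill -f 'java.*cassandra'), start the new
        // version, and confirm the upgrade with "nodetool -h localhost version".
    }
}
```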



The server under test was Cassandra 2.0.1.



1. DataStax CQL driver / cassandra-jdbc driver (Google Code)


pom.xml

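<dependency>
    <groupId>org.apache.cassandra</groupId>
    <artifactId>cassandra-thrift</artifactId>
    <version>2.0.1</version>
</dependency>
<dependency>
    <groupId>com.datastax.cassandra</groupId>
    <artifactId>cassandra-driver-core</artifactId>
    <version>2.0.0-beta2</version>
</dependency>
<!-- provides org.apache.cassandra.cql.jdbc.CassandraDriver, used by the code below -->
<dependency>
    <groupId>org.apache-extras.cassandra-jdbc</groupId>
    <artifactId>cassandra-jdbc</artifactId>
    <version>1.2.5</version>
</dependency>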




code

package com.google.cassandra.client;


import java.sql.DriverManager;

import java.sql.ResultSet;

import java.sql.SQLException;

import java.sql.Statement;


import org.apache.cassandra.cql.jdbc.CassandraStatementExtras;

import org.apache.cassandra.thrift.ConsistencyLevel;


public class CQLTest {

/*

CREATE TABLE cf10 (

  key text,

  number text,

  name text,

  PRIMARY KEY (key, number)

) WITH COMPACT STORAGE AND

  bloom_filter_fp_chance=0.010000 AND

  caching='KEYS_ONLY' AND

  comment='' AND

  dclocal_read_repair_chance=0.000000 AND

  gc_grace_seconds=864000 AND

  index_interval=128 AND

  read_repair_chance=0.100000 AND

  replicate_on_write='true' AND

  populate_io_cache_on_flush='false' AND

  default_time_to_live=0 AND

  speculative_retry='NONE' AND

  memtable_flush_period_in_ms=0 AND

  compaction={'class': 'SizeTieredCompactionStrategy'} AND

  compression={'sstable_compression': 'LZ4Compressor'};

  

  cqlsh:userkeyspace> select * from cf10;


 key  | number | name

------+--------+------

 row1 |      1 |  Kim

 row1 |      2 |   Yu

 row2 |      1 | Park

 row2 |      2 | Choi


(4 rows)



*/

private static java.sql.Connection con = null;

public static void main(String[] args) throws Exception {

Class.forName("org.apache.cassandra.cql.jdbc.CassandraDriver");

con = DriverManager.getConnection("jdbc:cassandra://localhost:9160/userkeyspace?consistency=ONE");

CQLTest sample = new CQLTest();

try {

sample.checkConsistency();

sample.create();

sample.insert();

sample.list();

sample.update();

sample.listUpdated();

sample.delete();

} finally {

con.close(); // release the JDBC connection even if a step fails

}

}

public static final String KEY_SPACE_NAME = "userkeyspace";

public static final String COLUMN_FAMILY_NAME = "cf10";


public void checkConsistency() throws Exception {

Statement stmt = con.createStatement();

    ConsistencyLevel cl = statementExtras(stmt).getConsistencyLevel();

    System.out.println("consistency level : " + cl );

}

    private CassandraStatementExtras statementExtras(Statement statement) throws Exception {

        Class<?> cse = Class.forName("org.apache.cassandra.cql.jdbc.CassandraStatementExtras");

        return (CassandraStatementExtras) statement.unwrap(cse);

    }

 

public void create() throws SQLException {

drop();

String data = "create table IF NOT EXISTS " + COLUMN_FAMILY_NAME

+ " (key text, number text, name text, PRIMARY KEY (key, number)) WITH COMPACT STORAGE;";

Statement st = con.createStatement();

st.execute(data);

}


public void drop() throws SQLException {

String data = "drop table  IF EXISTS " + COLUMN_FAMILY_NAME + ";";

Statement st = con.createStatement();

st.execute(data);

}

public void insert() throws SQLException {

String cql = "BEGIN BATCH \n"

+ "insert into " + COLUMN_FAMILY_NAME + " (key, number, name) values ('row1','1','Kim') \n"

+ "insert into " + COLUMN_FAMILY_NAME + " (key, number, name) values ('row1','2','Yu') \n"

+ "insert into " + COLUMN_FAMILY_NAME + " (key, number, name) values ('row2','1','Park') \n"

+ "insert into " + COLUMN_FAMILY_NAME + " (key, number, name) values ('row2','2','Choi') \n"

+ "APPLY BATCH;";


//String cql = "insert into " + COLUMN_FAMILY_NAME + " (key, number, name) values ('row1','1','Kim') ";

Statement st = con.createStatement();

st.executeUpdate(cql);

st.close();


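/*

Prepared statements did not work with cassandra-jdbc 1.2.5. Caused by: InvalidRequestException(why:Invalid amount of bind variables)

https://code.google.com/a/apache-extras.org/p/cassandra-jdbc/issues/detail?id=93&thanks=93&ts=1383294323

Note: calling ps.executeUpdate(cql2) passes the raw statement text, with unbound ?, to executeUpdate(String).
In JDBC a PreparedStatement is executed with executeUpdate() and no argument, as below; the String
variant may be what triggers the bind-variable error. (Also requires import java.sql.PreparedStatement.)

String cql2 = "insert into " + COLUMN_FAMILY_NAME + " (key, number, name) values (?, ?, ?)";

PreparedStatement ps = con.prepareStatement(cql2);

ps.setString(1, "row1");

ps.setString(2, "1");

ps.setString(3, "Kim");

ps.executeUpdate();

ps.close();

*/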

}


public void delete() throws SQLException {

String data = "truncate  " + COLUMN_FAMILY_NAME ;

Statement st = con.createStatement();

st.executeUpdate(data);

}


public void update() throws SQLException {

String t = "update " + COLUMN_FAMILY_NAME + " set name='AAAA' where key='row1' and number = '1'";

Statement st = con.createStatement();

st.executeUpdate(t);

}

public void listUpdated() throws SQLException {

String t = "select * from " + COLUMN_FAMILY_NAME + "  where key = 'row1' and number = '1'";

Statement st = con.createStatement();

ResultSet rs = st.executeQuery(t);

System.out.println("---------------");

while (rs.next()) {

System.out.println(" " + rs.getString("key"));

for (int j = 1; j < rs.getMetaData().getColumnCount() + 1; j++) {

System.out.println("   " + rs.getMetaData().getColumnName(j) + " : "

+ rs.getString(rs.getMetaData().getColumnName(j)));

}

}

}


public void list() throws SQLException {

String t = "select * from " + COLUMN_FAMILY_NAME;

Statement st = con.createStatement();

ResultSet rs = st.executeQuery(t);

System.out.println("---------------");

while (rs.next()) {

System.out.println(" " + rs.getString("key"));

for (int j = 1; j < rs.getMetaData().getColumnCount() + 1; j++) {

System.out.println("   " + rs.getMetaData().getColumnName(j) + " : "

+ rs.getString(rs.getMetaData().getColumnName(j)));

}

}

}

}


Testing the prepared statement failed with an error.



2. Thrift 


pom.xml


<dependency>

<groupId>org.apache.cassandra</groupId>

<artifactId>cassandra-thrift</artifactId>

<version>2.0.1</version>

</dependency>



code

package com.google.cassandra.client;



import java.nio.ByteBuffer;

import java.util.List;


import org.apache.cassandra.thrift.Cassandra;

import org.apache.cassandra.thrift.Column;

import org.apache.cassandra.thrift.ColumnOrSuperColumn;

import org.apache.cassandra.thrift.ColumnParent;

import org.apache.cassandra.thrift.Compression;

import org.apache.cassandra.thrift.ConsistencyLevel;

import org.apache.cassandra.thrift.KeyRange;

import org.apache.cassandra.thrift.KeySlice;

import org.apache.cassandra.thrift.SlicePredicate;

import org.apache.cassandra.thrift.SliceRange;

import org.apache.thrift.protocol.TBinaryProtocol;

import org.apache.thrift.protocol.TProtocol;

import org.apache.thrift.transport.TFramedTransport;

import org.apache.thrift.transport.TSocket;

import org.apache.thrift.transport.TTransport;


public class ThriftTest {


/*


cqlsh:userkeyspace> describe columnfamily cf11;


CREATE TABLE cf11 (

  key text,

  name text,

  number text,

  PRIMARY KEY (key)

) WITH COMPACT STORAGE AND

  bloom_filter_fp_chance=0.010000 AND

  caching='KEYS_ONLY' AND

  comment='' AND

  dclocal_read_repair_chance=0.000000 AND

  gc_grace_seconds=864000 AND

  index_interval=128 AND

  read_repair_chance=0.100000 AND

  replicate_on_write='true' AND

  populate_io_cache_on_flush='false' AND

  default_time_to_live=0 AND

  speculative_retry='NONE' AND

  memtable_flush_period_in_ms=0 AND

  compaction={'class': 'SizeTieredCompactionStrategy'} AND

  compression={'sstable_compression': 'LZ4Compressor'};

  

cqlsh:userkeyspace> select * from cf11;


 key  | name | number

------+------+--------

 row1 |    1 |      2


(1 rows)


*/

    public static void main(String[] args) throws Exception {

    ThriftTest sample = new ThriftTest();

    sample.getConfig();

    sample.create();

    sample.insert();

    sample.list();

    }

    

    public static final String KEYSPACE_NAME = "userkeyspace";

    public static final String ROW_KEY_NAME = "row1";

    public static final String COLUMN_FAMILY_NAME = "cf11";

    

Cassandra.Client client;

TTransport tr;

public void getConfig() {

  tr = new TFramedTransport(new TSocket("localhost", 9160));

           TProtocol proto = new TBinaryProtocol(tr);

           client = new Cassandra.Client(proto);

}

public void create() throws Exception {

tr.open();

String cql = "use " + KEYSPACE_NAME + ";";

client.execute_cql_query(ByteBuffer.wrap(cql.getBytes()), Compression.NONE);

cql = "drop table " + COLUMN_FAMILY_NAME + ";";

client.execute_cql_query(ByteBuffer.wrap(cql.getBytes()), Compression.NONE);

    cql = "create columnfamily " + COLUMN_FAMILY_NAME + "  (key text primary key, name text, number text);";

    client.execute_cql_query(ByteBuffer.wrap(cql.getBytes()), Compression.NONE);

tr.close();

}

public void insert() throws Exception {

        tr.open();

        client.set_keyspace(KEYSPACE_NAME);


        // insert data

        long timestamp = System.currentTimeMillis() * 1000; // microseconds, the same unit CQL uses for write timestamps

        Column nameColumn = new Column(ByteBuffer.wrap("name".getBytes()));

        nameColumn.setValue(Long.toHexString(1).getBytes());

        nameColumn.setTimestamp(timestamp);


        Column numberColumn = new Column(ByteBuffer.wrap("number".getBytes()));

        numberColumn.setValue(Long.toHexString(2).getBytes());

        numberColumn.setTimestamp(timestamp);


        ColumnParent columnParent = new ColumnParent(COLUMN_FAMILY_NAME);

        client.insert(ByteBuffer.wrap(ROW_KEY_NAME.getBytes()), columnParent,nameColumn,ConsistencyLevel.ALL) ;

        client.insert(ByteBuffer.wrap(ROW_KEY_NAME.getBytes()), columnParent,numberColumn,ConsistencyLevel.ALL);

        tr.close();

}

public void list() throws Exception {

tr.open();

client.set_keyspace(KEYSPACE_NAME);

ColumnParent columnParent = new ColumnParent(COLUMN_FAMILY_NAME);

//  make predicate

        SlicePredicate predicate = new SlicePredicate();

        predicate.setSlice_range(new SliceRange(ByteBuffer.wrap(new byte[0]), ByteBuffer.wrap(new byte[0]), false, 100));

        List<ColumnOrSuperColumn> columnsByKey = client.get_slice(ByteBuffer.wrap(ROW_KEY_NAME.getBytes()), columnParent, predicate, ConsistencyLevel.ALL);

        System.out.println(columnsByKey);

        

        // list 

        KeyRange keyRange = new KeyRange(100);

        keyRange.setStart_key(new byte[0]);

        keyRange.setEnd_key(new byte[0]);

        List<KeySlice> keySlices = client.get_range_slices(columnParent, predicate, keyRange, ConsistencyLevel.ONE);

        System.out.println("size : " + keySlices.size());

        for (KeySlice ks : keySlices) {

                System.out.println("key : " + new String(ks.getKey()));

                for (ColumnOrSuperColumn columns : ks.columns) {

                System.out.println(" column name : " + new String(columns.getColumn().getName()) + 

                ", value : " + new String(columns.getColumn().getValue()));

                }

        }

        tr.close();

}


    

}



3. Hector


<dependency>

<groupId>me.prettyprint</groupId>

<artifactId>hector-core</artifactId>

<version>1.0-2</version>

</dependency>


package com.google.cassandra.client;


import java.util.HashMap;

import java.util.Iterator;

import java.util.Map;


import me.prettyprint.cassandra.model.BasicColumnDefinition;

import me.prettyprint.cassandra.model.BasicColumnFamilyDefinition;

import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel;

import me.prettyprint.cassandra.model.CqlQuery;

import me.prettyprint.cassandra.model.CqlRows;

import me.prettyprint.cassandra.model.RowImpl;

import me.prettyprint.cassandra.serializers.StringSerializer;

import me.prettyprint.cassandra.service.CassandraHostConfigurator;

import me.prettyprint.hector.api.Cluster;

import me.prettyprint.hector.api.HConsistencyLevel;

import me.prettyprint.hector.api.Keyspace;

import me.prettyprint.hector.api.beans.ColumnSlice;

import me.prettyprint.hector.api.beans.HColumn;

import me.prettyprint.hector.api.beans.OrderedRows;

import me.prettyprint.hector.api.beans.Row;

import me.prettyprint.hector.api.ddl.ColumnIndexType;

import me.prettyprint.hector.api.ddl.ComparatorType;

import me.prettyprint.hector.api.factory.HFactory;

import me.prettyprint.hector.api.mutation.Mutator;

import me.prettyprint.hector.api.query.QueryResult;

import me.prettyprint.hector.api.query.RangeSlicesQuery;


public class HectorTest {

public static void main(String[] args) {

HectorTest sample = new HectorTest();

sample.getConfig();

sample.create();

sample.insert();

sample.read1();

sample.read2();

/*

CREATE TABLE cf6 (

  key text,

  name text,

  number text,

  PRIMARY KEY (key)

) WITH COMPACT STORAGE AND

  bloom_filter_fp_chance=0.010000 AND

  caching='KEYS_ONLY' AND

  comment='' AND

  dclocal_read_repair_chance=0.000000 AND

  gc_grace_seconds=0 AND

  index_interval=128 AND

  read_repair_chance=0.000000 AND

  replicate_on_write='false' AND

  populate_io_cache_on_flush='false' AND

  default_time_to_live=0 AND

  speculative_retry='NONE' AND

  memtable_flush_period_in_ms=0 AND

  compaction={'class': 'SizeTieredCompactionStrategy'} AND

  compression={'sstable_compression': 'LZ4Compressor'};

  

  CREATE INDEX name_idx ON cf6 (name);


CREATE INDEX number_idx ON cf6 (number);

  

  

  

  cqlsh:userkeyspace> select * from cf6;


(0 rows)

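Why 0 rows? Possibly because insert() writes columns named "1", "2" and "3", while the column metadata only declares name and number, so the CQL3 view of this column family has nothing to show.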

 */

}

Cluster cluster = null;

Keyspace keySpace = null;

BasicColumnFamilyDefinition columnFamilyDefinition = null;

public static final String KEY_SPACE_NAME = "userkeyspace";

public static final String COLUMN_FAMILY_NAME = "cf6";


public void getConfig() {

String hosts = "localhost:9160";

// connection pool : https://github.com/hector-client/hector/wiki/User-Guide 

        CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator(hosts); 

        cassandraHostConfigurator.setMaxActive(1); 

        cassandraHostConfigurator.setCassandraThriftSocketTimeout(3000); 

        cassandraHostConfigurator.setMaxWaitTimeWhenExhausted(4000); 

        cluster = HFactory.getOrCreateCluster("Test Cluster", cassandraHostConfigurator);

        

        // basic case : not connection pool 

//cluster = HFactory.getOrCreateCluster("Test Cluster", "localHost:9160");

// consistency level

ConfigurableConsistencyLevel configurableConsistencyLevel = new ConfigurableConsistencyLevel();

Map<String, HConsistencyLevel> clmap = new HashMap<String, HConsistencyLevel>();


clmap.put(COLUMN_FAMILY_NAME, HConsistencyLevel.ONE);

configurableConsistencyLevel.setReadCfConsistencyLevels(clmap);

configurableConsistencyLevel.setWriteCfConsistencyLevels(clmap);

// keyspace that actually applies the per-column-family consistency levels above
keySpace = HFactory.createKeyspace(KEY_SPACE_NAME, cluster, configurableConsistencyLevel);

}


public void create() {

cluster.dropColumnFamily(KEY_SPACE_NAME, COLUMN_FAMILY_NAME, true);

StringSerializer stringSerializer = StringSerializer.get();


// column definition

BasicColumnDefinition numberColumnDefinition = new BasicColumnDefinition();

numberColumnDefinition.setName(stringSerializer.toByteBuffer("number"));   

numberColumnDefinition.setIndexName("number_idx");

numberColumnDefinition.setIndexType(ColumnIndexType.KEYS);

numberColumnDefinition.setValidationClass(ComparatorType.UTF8TYPE.getClassName());

BasicColumnDefinition nameColumnDefinition = new BasicColumnDefinition();

nameColumnDefinition.setName(stringSerializer.toByteBuffer("name"));

nameColumnDefinition.setIndexName("name_idx");

nameColumnDefinition.setIndexType(ColumnIndexType.KEYS);

nameColumnDefinition.setValidationClass(ComparatorType.UTF8TYPE.getClassName());

// column family (table) definition

columnFamilyDefinition = new BasicColumnFamilyDefinition();

        columnFamilyDefinition.setKeyspaceName(KEY_SPACE_NAME);

        columnFamilyDefinition.setName(COLUMN_FAMILY_NAME);

        columnFamilyDefinition.setKeyValidationClass(ComparatorType.UTF8TYPE.getClassName());

        columnFamilyDefinition.setComparatorType(ComparatorType.UTF8TYPE);

        

// adding column family to column.

        columnFamilyDefinition.addColumnDefinition(numberColumnDefinition);

        columnFamilyDefinition.addColumnDefinition(nameColumnDefinition);

        

cluster.addColumnFamily(columnFamilyDefinition);

assert columnFamilyDefinition != null;

    }

public void insert() {

StringSerializer stringSerializer = StringSerializer.get();

        Mutator<String> mutator = HFactory.createMutator(keySpace, stringSerializer);

        

        // mutator insert 

        mutator.insert("row1", columnFamilyDefinition.getName(), HFactory.createStringColumn("1", "Kim"));

        mutator.insert("row1", columnFamilyDefinition.getName(), HFactory.createStringColumn("2", "Park"));

        mutator.insert("row1", columnFamilyDefinition.getName(), HFactory.createStringColumn("3", "Yun"));

        

        // mutator batch

        mutator.addInsertion("row2", columnFamilyDefinition.getName(), HFactory.createStringColumn("1", "A"))

        .addInsertion("row2", columnFamilyDefinition.getName(), HFactory.createStringColumn("2", "B"))

        .addInsertion("row2", columnFamilyDefinition.getName(), HFactory.createStringColumn("3", "C"));

        mutator.execute();

}


// RangeSliceQuery

public void read1() {

if (null == cluster || null == keySpace) {

// throw exception

return;

}

int row_count = 100;

RangeSlicesQuery<String, String, String> rangeSlicesQuery = HFactory

            .createRangeSlicesQuery(keySpace, StringSerializer.get(), StringSerializer.get(), StringSerializer.get())

            .setColumnFamily(COLUMN_FAMILY_NAME)

            .setRange(null, null, false, 10)

            .setRowCount(row_count);

String last_key = null;

while (true) {

rangeSlicesQuery.setKeys(last_key, null);

            QueryResult<OrderedRows<String, String, String>> result1 = rangeSlicesQuery.execute();

            System.out.println(  result1);

            OrderedRows<String, String, String> rows1 = result1.get();

            Iterator<Row<String, String, String>> rowsIterator = rows1.iterator();


            // we'll skip this first one, since it is the same as the last one from previous time we executed

            if (last_key != null && rowsIterator.hasNext()) {

            rowsIterator.next();   

            }


while (rowsIterator.hasNext()) {

Row<String, String, String> row = rowsIterator.next();

last_key = row.getKey();

if (row.getColumnSlice().getColumns().isEmpty()) {

continue;

}

  System.out.println(row);

  ColumnSlice<String, String> columnSlice = row.getColumnSlice();

  Iterator<HColumn<String, String>> columnSliceIterator = columnSlice.getColumns().listIterator();

  while (columnSliceIterator.hasNext()) {

  HColumn<String, String> hColumn = columnSliceIterator.next();

  System.out.println("  name : " + hColumn.getName() + ", value : " + hColumn.getValue());

  }

}

if (rows1.getCount() < row_count) {

    break;

}

}

}


// Cqlquery

public void read2() {

if (null == cluster || null == keySpace) {

// throw exception

return;

}


// CqlQuery

CqlQuery<String, String, String> cqlQuery = new CqlQuery<String, String, String>(

keySpace, StringSerializer.get(), StringSerializer.get(),

StringSerializer.get());

cqlQuery.setQuery("select * from " + COLUMN_FAMILY_NAME);

QueryResult<CqlRows<String, String, String>> result = cqlQuery.execute();

CqlRows<String, String, String> rows = result.get();

if (null != rows) {

System.out.println();

for (int i = 0; i < rows.getCount(); i++) {

RowImpl<String, String, String> row = (RowImpl<String, String, String>) rows.getList().get(i);

System.out.println("Row key = " + row.getKey());

for (HColumn<String, String> column : row.getColumnSlice().getColumns()) {

System.out.println("  Column name = " + column.getName().toString() + ", value = " + column.getValue().toString());

}

}

}

}

}
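Hector's read1() pages through rows with a range-slices query whose start key is inclusive. Each page after the first therefore begins with the last row of the previous page, which is why that row is skipped. The sketch below applies the same pattern to an in-memory sorted set. Real Cassandra returns keys in partitioner (token) order rather than string order, and `readAll` is a hypothetical helper. A page size of 1 would loop forever, because every page would contain only the repeated key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

public class InclusiveKeyPaging {
    /**
     * Reads all keys page by page, where each page is "up to pageSize keys starting AT lastKey"
     * (inclusive start, like a range-slices query). Later pages repeat the previous page's last
     * key as their first entry, so that entry is skipped.
     */
    public static List<String> readAll(NavigableSet<String> keys, int pageSize) {
        if (pageSize < 2) {
            throw new IllegalArgumentException("pageSize must be >= 2 with an inclusive start key");
        }
        List<String> out = new ArrayList<>();
        String lastKey = null;
        while (true) {
            NavigableSet<String> tail = (lastKey == null) ? keys : keys.tailSet(lastKey, true);
            List<String> page = new ArrayList<>();
            for (String k : tail) {
                if (page.size() == pageSize) {
                    break;
                }
                page.add(k);
            }
            int from = (lastKey != null && !page.isEmpty()) ? 1 : 0; // skip the repeated key
            out.addAll(page.subList(from, page.size()));
            if (!page.isEmpty()) {
                lastKey = page.get(page.size() - 1);
            }
            if (page.size() < pageSize) {
                return out;                    // short page: no more keys
            }
        }
    }

    public static void main(String[] args) {
        NavigableSet<String> keys = new TreeSet<>(Arrays.asList("row1", "row2", "row3", "row4", "row5"));
        System.out.println(readAll(keys, 2)); // [row1, row2, row3, row4, row5]
    }
}
```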




4. Astyanax


pom.xml

<dependency>

<groupId>com.netflix.astyanax</groupId>

<artifactId>astyanax-cassandra</artifactId>

<version>1.56.37</version>

</dependency>

<dependency>

<groupId>com.netflix.astyanax</groupId>

<artifactId>astyanax-thrift</artifactId>

<version>1.56.37</version>

</dependency>


code

package com.google.cassandra.client;


import com.netflix.astyanax.AstyanaxContext;

import com.netflix.astyanax.ColumnListMutation;

import com.netflix.astyanax.Keyspace;

import com.netflix.astyanax.MutationBatch;

import com.netflix.astyanax.connectionpool.NodeDiscoveryType;

import com.netflix.astyanax.connectionpool.OperationResult;

import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;

import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;

import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;

import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;

import com.netflix.astyanax.model.ColumnFamily;

import com.netflix.astyanax.model.ColumnList;

import com.netflix.astyanax.model.CqlResult;

import com.netflix.astyanax.model.Row;

import com.netflix.astyanax.serializers.StringSerializer;

import com.netflix.astyanax.thrift.ThriftFamilyFactory;


public class AstyanaxTest {

public static void main(String[] args) {

AstyanaxTest sample = new AstyanaxTest();

sample.getConfig();

sample.create();

sample.insert();

sample.list();

}

/*

cqlsh:userkeyspace> describe columnfamily cf21


CREATE TABLE cf21 (

  key text,

  number text,

  name text,

  PRIMARY KEY (key, number)

) WITH COMPACT STORAGE AND

  bloom_filter_fp_chance=0.010000 AND

  caching='KEYS_ONLY' AND

  comment='' AND

  dclocal_read_repair_chance=0.000000 AND

  gc_grace_seconds=864000 AND

  index_interval=128 AND

  read_repair_chance=0.100000 AND

  replicate_on_write='true' AND

  populate_io_cache_on_flush='false' AND

  default_time_to_live=0 AND

  speculative_retry='NONE' AND

  memtable_flush_period_in_ms=0 AND

  compaction={'class': 'SizeTieredCompactionStrategy'} AND

  compression={'sstable_compression': 'LZ4Compressor'};

  

  cqlsh:userkeyspace> select * from cf21;


 key  | number | name

------+--------+------

 row1 |      1 |  Kim

 row1 |      2 |  Yun

 row1 |      3 | Park

 row2 |      1 | Choi



*/

public static final String CLUSTER_NAME = "Test Cluster";

public static final String KEY_SPACE_NAME = "userkeyspace";

public static final String COLUMN_FAMILY_NAME = "cf21";


private AstyanaxContext<Keyspace> context;

private Keyspace keyspace;

private ColumnFamily<String, String> COLUMN_FAMILY;

public void getConfig() {

context = new AstyanaxContext.Builder()

.forCluster(CLUSTER_NAME)

.forKeyspace(KEY_SPACE_NAME)

// a single configuration object: a second withAstyanaxConfiguration() call would replace the first
.withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
        .setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE)
        .setCqlVersion("3.0.0")
        .setTargetCassandraVersion("2.0.1"))

// connection pool support

.withConnectionPoolConfiguration(new ConnectionPoolConfigurationImpl("MyConnectionPool")

.setPort(9160).setMaxConnsPerHost(1)

.setSeeds("127.0.0.1:9160"))

.withConnectionPoolMonitor(new CountingConnectionPoolMonitor())

.buildKeyspace(ThriftFamilyFactory.getInstance());


context.start();

keyspace = context.getClient();


COLUMN_FAMILY = ColumnFamily.newColumnFamily(COLUMN_FAMILY_NAME,

StringSerializer.get(), StringSerializer.get());

}


public void insert() {

try {

String statement = "insert into " + COLUMN_FAMILY_NAME + " (key, number, name) values (?, ?, ?)";

keyspace.prepareQuery(COLUMN_FAMILY)

.withCql(statement)

.asPreparedStatement()

.withStringValue("row1")

.withStringValue("1")

.withStringValue("Kim")

.execute();

keyspace.prepareQuery(COLUMN_FAMILY)

.withCql(statement)

.asPreparedStatement()

.withStringValue("row1")

.withStringValue("2")

.withStringValue("Yun")

.execute();


keyspace.prepareQuery(COLUMN_FAMILY)

.withCql(statement)

.asPreparedStatement()

.withStringValue("row1")

.withStringValue("3")

.withStringValue("Park")

.execute();

keyspace.prepareQuery(COLUMN_FAMILY)

.withCql(statement)

.asPreparedStatement()

.withStringValue("row2")

.withStringValue("1")

.withStringValue("Choi")

.execute();

} catch (ConnectionException e) {

throw new RuntimeException("failed to write data to cql", e);

}

}


public void insertBatch(String key, String[]... entries) {

MutationBatch m = keyspace.prepareMutationBatch();


ColumnListMutation<String> clm = m.withRow(COLUMN_FAMILY, key);

for (String[] kv : entries) {

clm.putColumn(kv[0], kv[1], null);

}


try {

m.execute();

} catch (ConnectionException e) {

throw new RuntimeException("failed to write data to batch insert", e);

}

}


public void create() {

drop();

String statement = "create table IF NOT EXISTS " + COLUMN_FAMILY_NAME

+ " (KEY text, number text, name text, PRIMARY KEY (KEY, number)) WITH COMPACT STORAGE;";

try {

keyspace.prepareQuery(COLUMN_FAMILY)

.withCql(statement)

.execute();

} catch (ConnectionException e) {

throw new RuntimeException("failed to create table", e);

}

}


public void list() {

String statement = "SELECT * FROM " + COLUMN_FAMILY_NAME + " WHERE key = ?";

try {

OperationResult<CqlResult<String, String>> result = keyspace.prepareQuery(COLUMN_FAMILY)

.withCql(statement)

.asPreparedStatement()

.withStringValue("row1")

.execute();

for (Row<String, String> row : result.getResult().getRows()) {

ColumnList<String> cols = row.getColumns();

System.out.println("- key : " + cols.getStringValue("key", null) + ", name : "

+ cols.getStringValue("name", null) + ", number : "

+ cols.getStringValue("number", null));

}

} catch (ConnectionException e) {

throw new RuntimeException("failed to read from cql", e);

}

}

public void drop() {

try {

keyspace.prepareQuery(COLUMN_FAMILY)

.withCql("drop table IF EXISTS " + COLUMN_FAMILY_NAME)

.execute();

} catch (ConnectionException e) {

throw new RuntimeException("failed to drop table", e);

}

}



}



5. Easy Cassandra

Easy Cassandra follows the JPA specification, much like Kundera. Kundera can be tested in a similar way to the example below.


pom.xml


<dependency>

<groupId>org.springframework</groupId>

<artifactId>spring-core</artifactId>

<version>3.2.2.RELEASE</version>

</dependency>

<dependency>

<groupId>org.springframework</groupId>

<artifactId>spring-context</artifactId>

<version>3.2.2.RELEASE</version>

</dependency>

<dependency>

<groupId>org.springframework.data</groupId>

<artifactId>spring-data-commons-core</artifactId>

<version>1.4.1.RELEASE</version>

</dependency>


<dependency>

<groupId>org.easycassandra</groupId>

<artifactId>EasyCassandra</artifactId>

<version>2.0.0-RC3</version>

</dependency>

<dependency>

<groupId>com.datastax.cassandra</groupId>

<artifactId>cassandra-driver-core</artifactId>

<version>1.0.3</version> 

</dependency>


code


package com.google.cassandra.client;


import org.springframework.context.ApplicationContext;

import org.springframework.context.support.GenericXmlApplicationContext;


import com.google.cassandra.client.spring.Person;

import com.google.cassandra.client.spring.PersonRepository;


public class EasyCassandraTest {


/*

 CREATE TABLE cf33 (

  key text,

  number text,

  name text,

  PRIMARY KEY (key, number)

 );

 */

public static void main(String[] args) {

@SuppressWarnings("resource")

ApplicationContext ctx = new GenericXmlApplicationContext("SpringConfig.xml");

PersonRepository personService = ctx.getBean(PersonRepository.class);


Person person = new Person();

person.setKey("row1");

person.setName("Kim");

person.setNumber("1");

personService.save(person);


person = new Person();

person.setKey("row1");

person.setName("John");

person.setNumber("2");

personService.save(person);

Iterable<Person> iter = personService.findAll();

for (Person p : iter) {

System.out.println("key : " + p.getKey() + ", number : " + p.getNumber() + " , name : " + p.getName());

}


Person xman = personService.findOne("row1");

System.out.println("xman : " +xman.getName());

}

}





package com.google.cassandra.client.spring;


import java.io.Serializable;


import javax.persistence.Column;

import javax.persistence.Entity;

import javax.persistence.Id;


import org.easycassandra.Index;


// Easy Cassandra entity mapped to table cf33

@Entity(name = "cf33")

public class Person implements Serializable {


private static final long serialVersionUID = 3L;


@Id

private String key;


@Index

@Column(name = "name")

private String name;


@Column(name = "number")

private String number;


public String getKey() {

return key;

}

public void setKey(String key) {

this.key = key;

}

public String getName() {

return name;

}

public void setName(String name) {

this.name = name;

}

public void setNumber(String number) {

this.number = number;

}

public String getNumber() {

return number;

}

@Override

public int hashCode() {

final int prime = 31;

int result = 1;

result = prime * result + ((key == null) ? 0 : key.hashCode());

return result;

}


@Override

public boolean equals(Object obj) {

if (this == obj)

return true;

if (obj == null)

return false;

if (getClass() != obj.getClass())

return false;

Person other = (Person) obj;

if (key == null) {

if (other.key != null)

return false;

} else if (!key.equals(other.key))

return false;

return true;

}


}
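One point worth noting about the entity above: equals()/hashCode() compare only key, while the cf33 table's primary key is (key, number). The stripped-down copy below (JPA annotations removed so it runs without Easy Cassandra; the class name PersonKeyOnly is hypothetical) is a sketch showing that the two rows saved by EasyCassandraTest ("row1"/"1" and "row1"/"2") collapse into a single element in a HashSet.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Same identity rule as Person: only `key` takes part in equals/hashCode.
class PersonKeyOnly {
    final String key;
    final String number;

    PersonKeyOnly(String key, String number) {
        this.key = key;
        this.number = number;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        return Objects.equals(key, ((PersonKeyOnly) o).key);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(key);
    }

    public static void main(String[] args) {
        Set<PersonKeyOnly> rows = new HashSet<>();
        rows.add(new PersonKeyOnly("row1", "1"));
        rows.add(new PersonKeyOnly("row1", "2"));
        // Both rows exist in cf33, but the set keeps only one of them.
        System.out.println(rows.size());
    }
}
```

If number is meant to be part of a row's identity, it would have to be added to both equals() and hashCode().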



package com.google.cassandra.client.spring;


import org.easycassandra.persistence.cassandra.spring.CassandraRepository;

import org.easycassandra.persistence.cassandra.spring.CassandraTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Repository;



@Repository("personRepository")

public class PersonRepository extends CassandraRepository<Person, String> {


@Autowired

private CassandraTemplate cassandraTemplate;


@Override

protected CassandraTemplate getCassandraTemplate() {

return cassandraTemplate;

}


}






Posted by 김용환 '김용환'

I looked for usable Cassandra Java client APIs. (This is as of November 2013, and the picks are my own opinion.)

Spring Data does not support Cassandra at the moment.



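O = supported, X = not supported, Δ = partial, ? = unknown

| name | feature | consistency level | connection pool | CQL support | Spring integration | license | active | issue | my choice |
|---|---|---|---|---|---|---|---|---|---|
| CQL | JDBC driver, DataStax | Δ (read = write) | Δ (simple) | O | ? | Apache | X | JDBC driver no longer updated | X |
| Thrift | | O | X | O | ? | Apache | O | | O |
| Hector | Thrift based | O | O | O | O | MIT | O | | O |
| Astyanax | Thrift based, Netflix | O | O | O | O | Apache | O | | O |
| Kundera | JPA 2.0, multiple NoSQL support | O | O | O | O | Apache | O | uses Pelops? | X |
| Easy Cassandra | JPA 2.0 | O | X | O | O | Apache | O | latest Cassandra version not supported, single developer | X |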


1) JDBC driver, DataStax CQL

I wanted to use CQL for convenience, but the JDBC driver hosted on Google Code is no longer being updated.

The JDBC driver supports a consistency level, but not separately for reads and writes.


2) Thrift

Using Cassandra's raw Thrift API takes a lot of manual work, but if no really good library exists, it is the option left. You have to implement the connection pool yourself.
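Since raw Thrift leaves pooling to the application, here is a minimal sketch of a fixed-size blocking pool, assuming connections are opened eagerly and handed out in FIFO order. It is generic over the connection type; in a real client the factory would open a Thrift transport and client, which is not shown here. The class names SimplePool and SimplePoolDemo are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical fixed-size pool; T stands in for a Thrift client/transport.
class SimplePool<T> {
    private final BlockingQueue<T> idle;

    SimplePool(int size, Supplier<T> factory) {
        idle = new ArrayBlockingQueue<>(size);
        // open every connection up front so borrow() never has to create one
        for (int i = 0; i < size; i++) {
            idle.add(factory.get());
        }
    }

    // Waits up to timeoutMs for a free connection; returns null on timeout.
    T borrow(long timeoutMs) {
        try {
            return idle.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    // Hands a connection back so another caller can reuse it.
    void release(T conn) {
        if (!idle.offer(conn)) {
            throw new IllegalStateException("pool is already full");
        }
    }

    int available() {
        return idle.size();
    }
}

class SimplePoolDemo {
    public static void main(String[] args) {
        int[] counter = {0};
        // Strings play the role of connections in this demo.
        SimplePool<String> pool = new SimplePool<>(2, () -> "conn-" + (counter[0]++));
        String c = pool.borrow(100);
        System.out.println(c + " borrowed, " + pool.available() + " idle");
        pool.release(c);
        System.out.println(pool.available() + " idle after release");
    }
}
```

A bounded queue keeps the number of open connections fixed; health checks, reconnects, and host failover, which Hector and Astyanax provide, are deliberately left out.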


3) Hector

Good. It has a connection pool, CQL can be used freely, and the consistency level can be set freely as well.


4) Astyanax

It is described as a refactored version of Hector, and it really is a nice API. Netflix is backing it, so it will probably keep being supported as long as Netflix keeps running on AWS.


5) Kundera

JPA 2.0 support is nice, but Pelops is the default client, which sharply lowers my confidence (Pelops has not been maintained since February 2013). It also seems too early to adopt Kundera, since there is not much reference material.


6) Easy Cassandra

The lack of connection pool support is a pity, but it does support JPA 2.0.

It also does not support Cassandra 2.0 yet (because of its library dependencies), and it is developed by a single person.


After some quick coding, going with Hector or Astyanax seems the most comfortable choice.







Posted by 김용환 '김용환'


I searched online for performance data comparing CQL and Thrift in Cassandra and found the following.

(1) NoSQL comparison by DataStax (http://www.datastax.com/dev/blog/2012-in-review-performance)


 



(2) NoSQL comparison by NBP (CUBRID) (http://www.cubrid.org/blog/dev-platform/nosql-benchmarking/)



(3) A blogger's test (http://www.nosqlbenchmarking.com/2011/02/new-results-for-cassandra-0-7-2/)


(4) A blogger's test of Cassandra GC tuning (http://blog.mikiobraun.de/2010/08/cassandra-gc-tuning.html)


(5) Netflix (http://techblog.netflix.com/2011/11/benchmarking-cassandra-scalability-on.html)


(6) Evans's comparison of CQL and Thrift RPC (http://www.acunu.com/2/post/2011/12/cql-benchmarking.html)





Evans also posted similar results to the Cassandra JIRA (https://issues.apache.org/jira/browse/CASSANDRA-3634), which contains some interesting data.




(7) Moving from MongoDB to Cassandra (http://www.planetcassandra.org/blog/post/cassandra-vs-mongodb-for-time-series-data)



(8) Java client comparison (http://theeye.pe.kr/entry/performance-comparison-with-kundera-hector-pelops)


Posted by 김용환 '김용환'


CQL through cassandra-jdbc is convenient, but a way to specify the consistency level was added only recently.


In cassandra-jdbc 1.2.1 and earlier, every request ran at consistency level ONE. To get the same behavior when working in cqlsh, the consistency level had to be set explicitly, as shown below.


cqlsh:userkeyspace> consistency one;

Consistency level set to ONE.



This was patched in cassandra-jdbc 1.2.5.

https://code.google.com/a/apache-extras.org/p/cassandra-jdbc/issues/detail?id=85

https://code.google.com/a/apache-extras.org/p/cassandra-jdbc/issues/detail?id=71


The consistency level can now be specified in the connection URL, and it can also be set and read through a Java API:


package org.apache.cassandra.cql.jdbc;


import java.sql.Statement;


import org.apache.cassandra.thrift.ConsistencyLevel;


public interface CassandraStatementExtras extends Statement

{

    public void setConsistencyLevel(ConsistencyLevel consistencyLevel);

    

    public ConsistencyLevel getConsistencyLevel();

}




I wrote an example myself.


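import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.Statement;

import org.apache.cassandra.cql.jdbc.CassandraStatementExtras;

import org.apache.cassandra.thrift.ConsistencyLevel;

...

public void printConsistencyLevel() throws Exception {

    Class.forName("org.apache.cassandra.cql.jdbc.CassandraDriver");

    // the consistency level is passed as a URL parameter (cassandra-jdbc 1.2.5+)

    try (Connection con = DriverManager.getConnection("jdbc:cassandra://localhost:9160/userkeyspace?consistency=ONE");

         Statement stmt = con.createStatement()) {

        ConsistencyLevel cl = statementExtras(stmt).getConsistencyLevel();

        System.out.println("consistency level : " + cl);

    }

}

...

    // unwrap the JDBC Statement to reach the driver-specific extension interface

    private CassandraStatementExtras statementExtras(Statement statement) throws Exception {

        return statement.unwrap(CassandraStatementExtras.class);

    }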


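The printed level correctly matches whatever is passed as consistency=<LEVEL> in the URL.

So various consistency levels can be specified with CQL as well.

However, for now it seems hard to set the consistency level per read/write as easily as with Hector or the Java Thrift API. To get similar behavior in an application, one could create two connections, one for reads and one for writes. Finer-grained control will probably take more time.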
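The two-connection workaround can be sketched as a helper that builds one JDBC URL per consistency level, assuming the ?consistency= URL parameter from cassandra-jdbc 1.2.5 shown above. The class name ReadWriteUrls and the choice of ONE for reads and QUORUM for writes are illustrative assumptions, as is the driver accepting every level name in the URL; the sketch only builds the URLs and leaves DriverManager.getConnection(...) to the caller.

```java
import java.util.Locale;

// Hypothetical helper: one JDBC URL per consistency level, so reads and writes
// can go through separate connections.
class ReadWriteUrls {
    private final String base; // e.g. jdbc:cassandra://localhost:9160/userkeyspace

    ReadWriteUrls(String host, int port, String keyspace) {
        this.base = "jdbc:cassandra://" + host + ":" + port + "/" + keyspace;
    }

    // Appends the consistency parameter (cassandra-jdbc 1.2.5+ style URL).
    String withConsistency(String level) {
        return base + "?consistency=" + level.toUpperCase(Locale.ROOT);
    }

    String readUrl() {
        return withConsistency("ONE");
    }

    String writeUrl() {
        return withConsistency("QUORUM");
    }

    public static void main(String[] args) {
        ReadWriteUrls urls = new ReadWriteUrls("localhost", 9160, "userkeyspace");
        // Each URL would be passed to its own DriverManager.getConnection(...) call.
        System.out.println("read : " + urls.readUrl());
        System.out.println("write: " + urls.writeUrl());
    }
}
```

Keeping the two connections separate means each statement inherits its level from the connection it runs on, which approximates the per-read/per-write setting Hector offers.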



* Test environment

- client

cassandra-driver-core-2.0.0-beta2

cassandra-thrift-2.0.1

libthrift-0.9.1.jar

cassandra-jdbc-1.2.5.jar


- server

cassandra 2.0.1


* Note

As of Cassandra 1.2 (CQL3), the USING CONSISTENCY clause can no longer be used:

cqlsh:db_simple> SELECT * FROM test USING CONSISTENCY ONE WHERE a=1;

Bad Request: line 1:19 missing EOF at 'USING'


Posted by 김용환 '김용환'


I am using Cassandra 2.0.1.


Running 'delete from tablename' fails with the error below. DELETE requires a WHERE clause that identifies the rows by their key (primary key).


Exception in thread "main" java.sql.SQLSyntaxErrorException: line 0:-1 mismatched input '<EOF>' expecting K_WHERE



So a CQL statement meant to delete all rows should be "truncate table_name".

DELETE is meant for a specific row (or specific columns of a row), selected with a WHERE clause on its key.


(Hector provides a truncateColumnFamily method on Cluster.)
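The rule above can be captured in a small statement builder: it refuses to emit a DELETE without a WHERE clause and uses TRUNCATE to empty a table. This is a hypothetical sketch (the class CqlDeletes and its method names are illustrative); it only builds CQL strings with ? bind markers, and the caller is assumed to pass the partition key or the full primary key columns, e.g. key and number for cf33.

```java
// Hypothetical helper: DELETE always gets a WHERE clause on key columns,
// while emptying a whole table is done with TRUNCATE.
class CqlDeletes {
    // Removes every row in the table.
    static String truncate(String table) {
        return "TRUNCATE " + table;
    }

    // Deletes rows matching the given key columns; refuses to build a WHERE-less DELETE.
    static String deleteWhere(String table, String... keyColumns) {
        if (keyColumns.length == 0) {
            throw new IllegalArgumentException("DELETE requires a WHERE clause; use truncate() to empty a table");
        }
        StringBuilder sb = new StringBuilder("DELETE FROM ").append(table).append(" WHERE ");
        for (int i = 0; i < keyColumns.length; i++) {
            if (i > 0) {
                sb.append(" AND ");
            }
            sb.append(keyColumns[i]).append(" = ?");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(truncate("cf33"));
        System.out.println(deleteWhere("cf33", "key", "number"));
    }
}
```

Failing early in the builder surfaces the mistake in application code instead of as the "expecting K_WHERE" parse error from the server.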

Posted by 김용환 '김용환'