Cassandra-unit 으로는 Astyanax 테스트가 어렵다. 

http://knight76.tistory.com/entry/cassandra-transportmessagesErrorMessage-Unexpected-exception-during-request-astyanaxthriftThriftConverter-Read-a-negative-frame-size-2113929216



그래서 Astyanax 에서 제공하는 SingletonEmbeddedCassandra를 이용하여 테스트를 진행하였다. 이를 이용하면 따로 Cassandra 서버를 설치하지 않고도 Cassandra 를 테스트할 수 있다. cql뿐 아니라 column family 단위로 쉽게 테스트가 가능하다. 


테스트 예제를 공개한다. 



<AstyanaxCassandraSpringTest.java>

package org.cassandraunit.test.astyanax;


import org.hamcrest.MatcherAssert;

import org.hamcrest.Matchers;

import org.junit.AfterClass;

import org.junit.Before;

import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.test.context.ContextConfiguration;

import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;


import com.google.common.collect.ImmutableMap;

import com.netflix.astyanax.AstyanaxContext;

import com.netflix.astyanax.Keyspace;

import com.netflix.astyanax.connectionpool.NodeDiscoveryType;

import com.netflix.astyanax.connectionpool.OperationResult;

import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;

import com.netflix.astyanax.connectionpool.impl.ConnectionPoolType;

import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;

import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;

import com.netflix.astyanax.model.ColumnFamily;

import com.netflix.astyanax.model.CqlResult;

import com.netflix.astyanax.serializers.LongSerializer;

import com.netflix.astyanax.serializers.StringSerializer;

import com.netflix.astyanax.thrift.ThriftFamilyFactory;

import com.netflix.astyanax.util.SingletonEmbeddedCassandra;


@RunWith(SpringJUnit4ClassRunner.class)

@ContextConfiguration(locations = { "classpath:applicationContext-cassandra-astyanax.xml" })

public class AstyanaxCassandraSpringTest {


private static Keyspace                  keyspace;

private static AstyanaxContext<Keyspace> keyspaceContext;


private static String TEST_CLUSTER_NAME  = "Test Cluster";

private static String TEST_KEYSPACE_NAME = "os";

private static final String SEEDS = "localhost:9160";


private static ColumnFamily<String, String> TABLE = 

            ColumnFamily.newColumnFamily("cpu_util_pct", StringSerializer.get(), StringSerializer.get(), LongSerializer.get());

            

    @Autowired

StorageDao storageDao;

    

    @Before

    public void before() throws Exception {

SingletonEmbeddedCassandra.getInstance();

Thread.sleep(1000 * 3);

createKeyspace();

createTable();

Thread.sleep(1000 * 3);

    }

    

private void createTable() throws Exception {

OperationResult<CqlResult<String, String>> result;

result = keyspace

   .prepareQuery(TABLE)

   .withCql("create table cpu_util_pct (guid text PRIMARY KEY, pct double) ")

   .execute();

}

@AfterClass

public static void teardown() throws Exception {

if (keyspaceContext != null)

keyspaceContext.shutdown();


Thread.sleep(1000 * 10);

}


private  void createKeyspace() throws Exception {

keyspaceContext = new AstyanaxContext.Builder()

.forCluster(TEST_CLUSTER_NAME)

.forKeyspace(TEST_KEYSPACE_NAME)

.withAstyanaxConfiguration(

new AstyanaxConfigurationImpl()

.setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE)

.setConnectionPoolType(ConnectionPoolType.TOKEN_AWARE))

.withConnectionPoolConfiguration(

new ConnectionPoolConfigurationImpl(TEST_CLUSTER_NAME

+ "_" + TEST_KEYSPACE_NAME)

.setSocketTimeout(30000)

.setMaxTimeoutWhenExhausted(2000)

.setMaxConnsPerHost(20)

.setInitConnsPerHost(10)

.setSeeds(SEEDS))

.withConnectionPoolMonitor(new CountingConnectionPoolMonitor())

.buildKeyspace(ThriftFamilyFactory.getInstance());


keyspaceContext.start();


keyspace = keyspaceContext.getEntity();


keyspace.createKeyspace(ImmutableMap.<String, Object>builder()

.put("strategy_options", ImmutableMap.<String, Object>builder()

.put("replication_factor", "1")

.build())

.put("strategy_class",     "SimpleStrategy")

.build()

);

}

  

@org.junit.Test

public void insertTest() {

storageDao.insertCpu("a", 0);

double d1 = storageDao.getCpuUtil("a");

MatcherAssert.assertThat(d1, Matchers.is((double) 0));

storageDao.insertCpu("a", 2);

double d2 = storageDao.getCpuUtil("a");

MatcherAssert.assertThat(d2, Matchers.is((double) 2));

storageDao.insertCpu("b", 10);

double d3 = storageDao.getCpuUtil("b");

MatcherAssert.assertThat(d3, Matchers.is((double) 10));

}

}




<AstyanaxClient.java>

package org.cassandraunit.test.astyanax;


import javax.annotation.PostConstruct;

import javax.annotation.PreDestroy;


import org.slf4j.Logger;

import org.slf4j.LoggerFactory;


import com.netflix.astyanax.AstyanaxContext;

import com.netflix.astyanax.Keyspace;

import com.netflix.astyanax.connectionpool.NodeDiscoveryType;

import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;

import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;

import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;

import com.netflix.astyanax.thrift.ThriftFamilyFactory;


/**
 * Spring-configured wrapper around an Astyanax {@link Keyspace}. All connection
 * settings are injected via setters (see applicationContext-cassandra-astyanax.xml);
 * the connection pool is built in {@link #init()} after property injection and
 * torn down in {@link #destroy()}.
 */
public class AstyanaxClient {

    private static final Logger log = LoggerFactory.getLogger(AstyanaxClient.class);

    private AstyanaxContext<Keyspace> context;
    private Keyspace keyspace;

    // Injected configuration — see the Spring bean definition for sources.
    private String clusterName;
    private String keyspaceName;
    private String cqlVersion;
    private String targetCassandraVersion;
    private String connectionPoolName;
    private int port;
    private int maxConnsPerHost;
    private String seeds;
    private NodeDiscoveryType discoveryType;

    /** Builds and starts the Astyanax context once Spring has set all properties. */
    @PostConstruct
    public void init() {
        context = new AstyanaxContext.Builder()
                .forCluster(clusterName)
                .forKeyspace(keyspaceName)
                .withAstyanaxConfiguration(
                        new AstyanaxConfigurationImpl()
                                .setDiscoveryType(discoveryType)
                                .setCqlVersion(cqlVersion)
                                .setTargetCassandraVersion(targetCassandraVersion))
                .withConnectionPoolConfiguration(
                        new ConnectionPoolConfigurationImpl(connectionPoolName)
                                .setPort(port)
                                .setMaxConnsPerHost(maxConnsPerHost)
                                .setSeeds(seeds)
                                .setTimeoutWindow(3000)
                                .setConnectTimeout(3000))
                .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
                .buildKeyspace(ThriftFamilyFactory.getInstance());

        context.start();
        // Use the declared logger rather than System.out for startup diagnostics.
        log.info("started Astyanax context: {}", context);
        keyspace = context.getClient();
    }

    /** Shuts the connection pool down when the Spring context closes. */
    @PreDestroy
    public void destroy() {
        context.shutdown();
    }

    /** Dumps every injected setting, one per CRLF-terminated line. */
    @Override
    public String toString() {
        return new StringBuilder().append("clusterName : ")
                .append(getClusterName()).append("\r\n")
                .append("keyspaceName : ").append(getKeyspaceName())
                .append("\r\n").append("cqlVersion : ").append(getCqlVersion())
                .append("\r\n").append("targetCassandraVersion : ")
                .append(getTargetCassandraVersion()).append("\r\n")
                .append("connectionPoolName : ")
                .append(getConnectionPoolName()).append("\r\n")
                .append("port : ").append(getPort()).append("\r\n")
                .append("maxConnsPerHost : ").append(getMaxConnsPerHost())
                .append("\r\n").append("seeds : ").append(getSeeds())
                .append("\r\n").append("discoveryType : ")
                .append(getDiscoveryType()).toString();
    }

    /** Keyspace handle; only valid after {@link #init()} has run. */
    public Keyspace getKeyspace() {
        return this.keyspace;
    }

    public String getClusterName() {
        return clusterName;
    }

    public void setClusterName(String clusterName) {
        this.clusterName = clusterName;
    }

    public String getKeyspaceName() {
        return keyspaceName;
    }

    public void setKeyspaceName(String keyspaceName) {
        this.keyspaceName = keyspaceName;
    }

    public NodeDiscoveryType getDiscoveryType() {
        return discoveryType;
    }

    public void setDiscoveryType(NodeDiscoveryType discoveryType) {
        this.discoveryType = discoveryType;
    }

    public String getCqlVersion() {
        return cqlVersion;
    }

    public void setCqlVersion(String cqlVersion) {
        this.cqlVersion = cqlVersion;
    }

    public String getTargetCassandraVersion() {
        return targetCassandraVersion;
    }

    public void setTargetCassandraVersion(String targetCassandraVersion) {
        this.targetCassandraVersion = targetCassandraVersion;
    }

    public String getConnectionPoolName() {
        return connectionPoolName;
    }

    public void setConnectionPoolName(String connectionPoolName) {
        this.connectionPoolName = connectionPoolName;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public int getMaxConnsPerHost() {
        return maxConnsPerHost;
    }

    public void setMaxConnsPerHost(int maxConnsPerHost) {
        this.maxConnsPerHost = maxConnsPerHost;
    }

    public String getSeeds() {
        return seeds;
    }

    public void setSeeds(String seeds) {
        this.seeds = seeds;
    }
}




<StorageDao.java>

package org.cassandraunit.test.astyanax;


import java.util.Comparator;


import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.stereotype.Component;


import com.netflix.astyanax.Keyspace;

import com.netflix.astyanax.connectionpool.OperationResult;

import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;

import com.netflix.astyanax.model.ColumnFamily;

import com.netflix.astyanax.model.ColumnList;

import com.netflix.astyanax.model.CqlResult;

import com.netflix.astyanax.model.Row;

import com.netflix.astyanax.serializers.IntegerSerializer;

import com.netflix.astyanax.serializers.StringSerializer;


@Component

public class StorageDao {

private static final Logger log = LoggerFactory.getLogger(StorageDao.class);

@Autowired

@Qualifier("osClient")

private AstyanaxClient osClient;

public void insertCpu(String guid, double pct) {

Keyspace keyspace = osClient.getKeyspace();

ColumnFamily<String, String> COLUMN_FAMILY = 

ColumnFamily.newColumnFamily("cpu_util_pct", StringSerializer.get(), StringSerializer.get());

try {

String statement = String.format("insert into cpu_util_pct (guid, pct) values (?, ?);");

log.debug(statement);

keyspace.prepareQuery(COLUMN_FAMILY)

.withCql(statement)

.asPreparedStatement()

.withStringValue(guid)

.withDoubleValue(pct)

.execute();

} catch (Exception e) {

e.printStackTrace();

throw new RuntimeException("failed to insert.", e);

}

}


public double getCpuUtil(String guid) {

Keyspace keyspace = osClient.getKeyspace();

ColumnFamily<Integer, String> COLUMN_FAMILY = ColumnFamily.newColumnFamily("cpu_util_pct", IntegerSerializer.get(), StringSerializer.get());


double cpuPct = 0;

try {

OperationResult<CqlResult<Integer, String>> result = keyspace.prepareQuery(COLUMN_FAMILY)

.withCql("SELECT pct FROM cpu_util_pct WHERE guid = '" + guid + "'")

.execute();

for (Row<Integer, String> row : result.getResult().getRows()) {

ColumnList<String> cols = row.getColumns();

cpuPct = cols.getDoubleValue("pct", 0.0D);

}

} catch (ConnectionException e) {

e.printStackTrace();

throw new RuntimeException("failed to read from cql", e);

}

return cpuPct;

}

class IntTreeMapComparator implements Comparator<Long> {

@Override

public int compare(Long o1, Long o2) {

       return  o1.compareTo(o2);

   } 

}


}



<applicationContext-cassandra-astyanax.xml>

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"

xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

<context:component-scan base-package="org.cassandraunit.test.astyanax"/>


<!-- properties -->

<context:property-placeholder order="1"

ignore-unresolvable="true" location="classpath:cassandra.properties" />

<bean id="storageDao" class="org.cassandraunit.test.astyanax.StorageDao"/>

<bean id="osClient" class="org.cassandraunit.test.astyanax.AstyanaxClient">

<property name="clusterName" value="${system.cassandra.cluster.name}" />

<property name="discoveryType" value="${system.cassandra.discovery.type.token}" />

<property name="cqlVersion" value="${system.cassandra.cql.version}" />

<property name="targetCassandraVersion" value="${system.cassandra.target.version}" />

<property name="port" value="${system.cassandra.port}" />

<property name="maxConnsPerHost" value="${system.cassandra.max.host}" />

<property name="seeds" value="${system.cassandra.seeds}" />

<property name="connectionPoolName" value="${system.cassandra.connection.pool.name}" />

<property name="keyspaceName" value="${system.cassandra.os.keyspace.name}" />

</bean>



</beans>


클라이언트 설정 정보 - Astyanax 에서 이용

<cassandra.properties>

# system cassandra DB Info

system.cassandra.connection.pool.name=cassandraPool

system.cassandra.port=9160

system.cassandra.max.host=1

system.cassandra.seeds=127.0.0.1:9160

system.cassandra.cql.version=3.0.0

system.cassandra.target.version=2.0.2

system.cassandra.cluster.name=Test Cluster


system.cassandra.discovery.type.ring=RING_DESCRIBE

system.cassandra.discovery.type.discovery=DISCOVERY_SERVICE

system.cassandra.discovery.type.token=TOKEN_AWARE

system.cassandra.discovery.type.none=NONE


system.cassandra.os.keyspace.name=os

 

서버 설정 정보.- SingletonEmbeddedCassandra 에서 사용

<cassandra-template.yaml>

cluster_name: '$CLUSTER$'

initial_token: 0

auto_bootstrap: false

hinted_handoff_enabled: false

max_hint_window_in_ms: 3600000 # one hour

authenticator: org.apache.cassandra.auth.AllowAllAuthenticator

#authority: org.apache.cassandra.auth.AllowAllAuthority

partitioner: org.apache.cassandra.dht.RandomPartitioner

#partitioner: org.apache.cassandra.dht.Murmur3Partitioner

data_file_directories:

    - $DIR$/data

commitlog_directory: $DIR$/commitlog

saved_caches_directory: $DIR$/saved_caches

commitlog_sync: periodic

commitlog_sync_period_in_ms: 10000

seed_provider:

    # Addresses of hosts that are deemed contact points. 

    # Cassandra nodes use this list of hosts to find each other and learn

    # the topology of the ring.  You must change this if you are running

    # multiple nodes!

    - class_name: org.apache.cassandra.locator.SimpleSeedProvider

      parameters:

          # seeds is actually a comma-delimited list of addresses.

          - seeds: "127.0.0.1"

#flush_largest_memtables_at: 0.75

#reduce_cache_sizes_at: 0.85

#reduce_cache_capacity_to: 0.6

concurrent_reads: 32

concurrent_writes: 32

memtable_flush_queue_size: 4

storage_port: $STORAGE_PORT$

listen_address: 127.0.0.1

rpc_address: 127.0.0.1

rpc_port: $PORT$

rpc_keepalive: true

thrift_framed_transport_size_in_mb: 15

thrift_max_message_length_in_mb: 16

incremental_backups: false

snapshot_before_compaction: false

column_index_size_in_kb: 64

in_memory_compaction_limit_in_mb: 64

compaction_throughput_mb_per_sec: 16

compaction_preheat_key_cache: true

endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch

dynamic_snitch: true

dynamic_snitch_update_interval_in_ms: 100 

dynamic_snitch_reset_interval_in_ms: 600000

dynamic_snitch_badness_threshold: 0.0

request_scheduler: org.apache.cassandra.scheduler.NoScheduler

index_interval: 128

 

 

<pom.xml>

<dependencies>

<dependency>

<groupId>junit</groupId>

<artifactId>junit</artifactId>

<version>4.8.2</version>

<scope>test</scope>

</dependency>

<dependency>

<groupId>org.cassandraunit</groupId>

<artifactId>cassandra-unit-spring</artifactId>

<version>2.0.2.2-SNAPSHOT</version>

<scope>test</scope>

</dependency>

<dependency>

<groupId>com.netflix.astyanax</groupId>

<artifactId>astyanax</artifactId>

<version>1.56.44</version>

<exclusions>

<exclusion>

<artifactId>snappy-java</artifactId>

<groupId>org.xerial.snappy</groupId>

</exclusion>

</exclusions>


</dependency>

<dependency>

<groupId>org.apache.cassandra</groupId>

<artifactId>cassandra-all</artifactId>

<version>2.0.7</version>

<exclusions>

<exclusion>

<artifactId>snappy-java</artifactId>

<groupId>org.xerial.snappy</groupId>

</exclusion>

</exclusions>


</dependency>

<dependency>

<groupId>org.xerial.snappy</groupId>

<artifactId>snappy-java</artifactId>

<version>1.0.5-M4</version>

</dependency>

<dependency>

<groupId>org.jboss.netty</groupId>

<artifactId>netty</artifactId>

<version>3.1.0.GA</version>

</dependency>

            


Posted by '김용환'
,

Netflix의 Astyanax cassandra API Test Case를  cassandra-unit(https://github.com/jsevellec/cassandra-unit)을 이용해서 진행하던 차에 byte 단위 통신에서 실패가 발생한다. 그러나 Cassandra 서버와의 통신은 잘 된다.... (이건 머지? Cassandra Unit의 구현체?)


2일 동안 고생한 결과, 궁합이 맞지 않는 것으로 생각된다. Astyanax 의 com.netflix.astyanax.util.SingletonEmbeddedCassandra, com.netflix.astyanax.util.EmbeddedCassandra 를 써서 유닛테스트 코드를 개발해야 한다.


아래와 같은 Exception 발생시 고민해봐야할 것이다. 


1)

 [Native-Transport-Requests:102] ERROR transport.messages.ErrorMessage - Unexpected exception during request

java.lang.ArrayIndexOutOfBoundsException: 68

at org.apache.cassandra.transport.Message$Type.fromOpcode(Message.java:106)

at org.apache.cassandra.transport.Frame$Decoder.decode(Frame.java:168)

at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:425)

at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)

at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)

at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)

at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)

at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)

at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)

at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)

at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)

at java.lang.Thread.run(Thread.java:722)


2014-05-14 18:14:30,141 [main] DEBUG astyanax.thrift.ThriftConverter - Read a negative frame size (-2113929216)!


2) 

 [Native-Transport-Requests:95] ERROR transport.messages.ErrorMessage - Unexpected exception during request

java.lang.ArrayIndexOutOfBoundsException: 55

at org.apache.cassandra.transport.Message$Type.fromOpcode(Message.java:106)

at org.apache.cassandra.transport.Frame$Decoder.decode(Frame.java:168)

at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:425)

at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)

at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)

at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)

at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)

at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)

at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)

at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)

at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)

at java.lang.Thread.run(Thread.java:722)



Posted by '김용환'
,


Spring 기반으로 Cassandra를 테스트할 때, @EmbeddedCassandra 를 이용하여 자동으로 Cassandra 서버를 실행시킬 수 있다. 


// Example: cassandra-unit's @EmbeddedCassandra starts an in-process Cassandra
// automatically; @CassandraDataSet loads simple.cql into it before the tests run.
@RunWith(SpringJUnit4ClassRunner.class)

@TestExecutionListeners({ CassandraUnitTestExecutionListener.class })

@CassandraDataSet(value = { "simple.cql" })

@EmbeddedCassandra

public class KnightSpringCQLScriptLoadTest {

// Driver session opened in before(); reused by each test method.
Session session ;

    @Before

    // Connects to the embedded server on cassandra-unit's default CQL port (9142)
    // and to the keyspace that the loaded dataset created.
    public void should_have_started_and_execute_cql_script() throws Exception {

        Cluster cluster = Cluster.builder()

                .addContactPoints("127.0.0.1")

                .withPort(9142)

                .build();

        session = cluster.connect("cassandra_unit_keyspace");

    }


    @Test

    // Verifies the row inserted by simple.cql is readable through plain CQL.
    public void testSelect() {

        ResultSet result = session.execute("select * from mytable WHERE id='myKey01'");

        assertThat(result.iterator().next().getString("value"), Matchers.is("myValue01"));

    }

}



Cassandra의 디폴트 설정은 127.0.0.1:9142 이고 Test Cluster 라는 cluster 이름을 가지고 있다. 


https://github.com/jsevellec/cassandra-unit/blob/master/cassandra-unit-spring/src/main/java/org/cassandraunit/spring/EmbeddedCassandra.java


@Retention(RetentionPolicy.RUNTIME)

@Target(ElementType.TYPE)
@Inherited
@Documented
public @interface EmbeddedCassandra {
  // Cassandra configuration (YAML) file used to start the embedded server
  String configuration() default EmbeddedCassandraServerHelper.DEFAULT_CASSANDRA_YML_FILE;
  // the following settings are needed to load the dataset; they must match the values found in the configuration file
  String clusterName() default "Test Cluster";
  String host() default "127.0.0.1";
  // CQL port by default; use 9171 for Thrift
  int port() default 9142;
}


Posted by '김용환'
,


Cassandra 2.0.3 / 1.2.12 를 실행시킬 때, 아래와 같은 에러를 볼 수 있다. 


[main] ERROR cassandra.cql3.QueryProcessor - Unable to initialize MemoryMeter (jamm not specified as javaagent).  This means Cassandra will be unable to measure object sizes accurately and may consequently OOM.



에러까지는 아니고 MemoryMeter 클래스를 포함한 jamm java agent를 java runtime때 포함시키지 않아서 난 것이다. 이것때문에 CI나 Local 환경에서는 이슈가 없다. 


그러나 상용(Production) 서버에서는 OOM을 발생하지 않도록 조심해야 하기 때문이다. 






2.0.2, 1.2.11 버전에 EmbeddedCassandraService 클래스를 사용시 아래와 같은 에러가 발생했다. (개인적으로 이 부분 패치를 원했는데.. 잘 되었다~^^;;)


 ERROR Native-Transport-Requests:109 org.apache.cassandra.transport.messages.ErrorMessage - Unexpected exception during request
java.lang.IllegalStateException: Instrumentation is not set; Jamm must be set as -javaagent





https://issues.apache.org/jira/browse/CASSANDRA-6293

CASSANDRA-6293에 의거.. QueryProcessor에 MemoryMeter 초기화 여부에 따른 일부 코드가 추가되었다. 만약 초기화되지 않았으면, 'ERROR cassandra.cql3.QueryProcessor - Unable to initialize MemoryMeter (jamm not specified as javaagent).  This means Cassandra will be unable to measure object sizes accurately and may consequently OOM.' 라는 로그가 뜨도록 되어 있다. 


 


 // Static initializer (quoted from org.apache.cassandra.cql3.QueryProcessor,
 // CASSANDRA-6293): chooses how the prepared-statement caches are bounded.
 static

    {

        // If the jamm MemoryMeter agent was loaded (-javaagent), bound the
        // caches by estimated memory footprint via a custom weigher.
        if (MemoryMeter.isInitialized())

        {

            preparedStatements = new ConcurrentLinkedHashMap.Builder<MD5Digest, CQLStatement>()

                                 .maximumWeightedCapacity(MAX_CACHE_PREPARED_MEMORY)

                                 .weigher(cqlMemoryUsageWeigher)

                                 .build();

            thriftPreparedStatements = new ConcurrentLinkedHashMap.Builder<Integer, CQLStatement>()

                                       .maximumWeightedCapacity(MAX_CACHE_PREPARED_MEMORY)

                                       .weigher(thriftMemoryUsageWeigher)

                                       .build();

        }

        else

        {

            // Without jamm, per-entry sizes cannot be measured: warn and fall
            // back to a plain entry-count cap, which may allow OOM.
            logger.error("Unable to initialize MemoryMeter (jamm not specified as javaagent).  This means "

                         + "Cassandra will be unable to measure object sizes accurately and may consequently OOM.");

            preparedStatements = new ConcurrentLinkedHashMap.Builder<MD5Digest, CQLStatement>()

                                 .maximumWeightedCapacity(MAX_CACHE_PREPARED_COUNT)

                                 .build();

            thriftPreparedStatements = new ConcurrentLinkedHashMap.Builder<Integer, CQLStatement>()

                                       .maximumWeightedCapacity(MAX_CACHE_PREPARED_COUNT)

                                       .build();

        }

    }



MemoryMeter가 초기화되었으면, weigher 인스턴스를 생성하는데. capacity 개수를 결정할 수 있도록 한다. 

즉, QueryProcessor에 key, value로 된 pair의 개수를 제한할 수 있다. 

MemoryMeter가 초기화되면 MAX_CACHE_PREPARED_MEMORY로 capacity를 설정하고 key-value pairing을 지정할 수 있도록 하는 반면.. MemoryMeter가 초기화되지 않으면 capacity는 MAX_CACHE_PREPARED_COUNT로만 지정할 수 있다. 대신 count만 가지고 제한하는 경우는 역시 메모리 크기가 이슈가 될 수 있으니 OOM (Out Of Memory)를 마음으로 준비하라는 뜻이다. 


final 변수는 다음과 같다. 

  1. private static final long MAX_CACHE_PREPARED_MEMORY = Runtime.getRuntime().maxMemory() / 256;
  2. private static final int MAX_CACHE_PREPARED_COUNT = 10000;


DB의 prepared statement size를 결정하는 것으로 보이는 부분이라 할 수 있을 것 같다. 이를 통해서 DB의 jdbc 와 비슷하게 cassandra 내부에서 결정할 수 있게 한다. 이를 추적할 수 있는 부분을 제공한 것이다. 



출처 : http://concurrentlinkedhashmap.googlecode.com/svn/wiki/release-1.1-LRU/com/googlecode/concurrentlinkedhashmap/ConcurrentLinkedHashMap.html


 A Weigher instance determines how many units of capacity that a value consumes. The default weigher assigns each value a weight of 1 to bound the map by the total number of key-value pairs.



MemoryMeter에 대한 코드는 아래 jamm github에서 확인해볼 수 있다. 

https://github.com/jbellis/jamm


Posted by '김용환'
,


datastax에서 2014년 5월 1일 좋은 튜토리얼을 공개했다.

cassandra를 이용해서 cql 개발시 많은 도움이 될 듯 하다. 


http://www.datastax.com/documentation/cql/3.1/pdf/cql31.pdf


CQL 스펙은 여기서 볼 수 있다. 

http://cassandra.apache.org/doc/cql3/CQL.html

Posted by '김용환'
,


현재 (2014.4.28) cassandra 2.0.7 이 최근 버전인데... (2.1.0-beta중.)


https://github.com/apache/cassandra/blob/trunk/CHANGES.txt


변경사항을 보니 3.0 출시 준비중인 듯 하다. 아직 trunk 버전이긴 한데.. 왠지.. 기분이 좋아진다. 빨리 움직이는 cassandra가 좋아진다. 


java8 컴파일을 지원한다. CQL2가 사라진다. CQL3만 실제 사용가능하게 된다. 먼가 새롭게 바뀔 것이 있을란가? 자세한 내용은 아직 검색하지 못했지만, 기대하고 있다. 



3.0
 * Move sstable RandomAccessReader to nio2, which allows using the
   FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
 * Remove CQL2 (CASSANDRA-5918)
 * Add Thrift get_multi_slice call (CASSANDRA-6757)
 * Optimize fetching multiple cells by name (CASSANDRA-6933)
 * Allow compilation in java 8 (CASSANDRA-7208)



Posted by '김용환'
,

 

Hadoop Summit 2014 (암스테르담)에서 받은 DataStax 부스에서 받은 USB를 꽂아보니.

Cassandra 전환사례를 http 링크를 웹 브라우져를 통해서 화면에 보여주도록 동작한다.

 

아래 링크를 통해서 인터뷰자료를 바탕으로 다양한 Storage에서 Cassandra로 migration한 다양한 회사의 사례들을 확인할 수 있다.


 

 

첫페이지는 아래 싸이트. 오라클에서 Cassandra로 이동한 사례

 

http://planetcassandra.org/oracle-to-cassandra-migration/

Netflix와 영국 Sky 방송국에서 Cassandra 전환 사례 (회사를 클릭하면 인터뷰한 사례들이 나온다. 볼만하다.)

 

 

두번째는 Mysql 에서 Cassandra로 이동한 사례

http://planetcassandra.org/mysql-to-cassandra-migration/

AOL 도 옮겼구나..

 

 

세번째는 Mongodb에서 Cassandra로 이동한 사례

http://planetcassandra.org/mongodb-to-cassandra-migration/

다 잘 모르는 회사들

 

 

네번째는 Hbase에서 Cassandra 로 이동한 사례

http://planetcassandra.org/hbase-to-cassandra-migration/

다 잘 모르는 회사들

 

 

다섯번째는 Redis에서 Cassandra 로 이동한 사례

http://planetcassandra.org/redis-to-cassandra-migration/

Instagram이 눈에 띈다.

 

 

 

 

Posted by '김용환'
,


팁 or 주의사항 


1. camel case 주의 

(netflix의 astyanax를 이용한 케이스임)

create table 시 camel case로 column를 정의하면, describe table로 확인하면 모두 소문자로 변환되어 있다. 따라서 기대치는 camel case로 알고 있어 select 이용시 아무것도 없다고 나올 수 있다. insert시는 camel case와 상관없이 잘 작동된다. 


즉, create, insert시 대문자든, 소문자든 의도대로 사용할 수 있으나,  select시 camel case는 따진다.



2. composite key 적용

cassandra최근판이 가장 좋아진 것은 composite key . mysql로 따지자면 multiple key를 쓸 수 있다는 점. 

cassandra에서는 where절 검색이 쉬워진다는 점이 있다. 

이슈는 다양한 where절 검색을 위해서 모든 필드가 primary key가 될 수 있다. 따라서 쉽게 검색이 된다는 점인데, 주로 timestamp나 timeuuid를 primary key 중의 하나가 되면 좋은 듯 했다.


3. 동작 여부 확인

netflix의 astyanax를 사용하면서 가장 불편한 점은 실행은 되는데, 때때로 잘못된 cql 에 대해서 exception없이 그냥 void return하는 경우가 있다. 이 때는 눈을 의심하고, 스스로 문제를 해결해야 한다.

db에서의 친절한 exception이 없다


만약 에러가 나타난다면 간단한 syntax 체크 또는 thrift binary 통신 이슈 외에는 없는 듯 했다. 



4. order by 된 채로 저장

처음부터 index(order by 된)채로 create table을 생성할 수 있다. 


  create table Bite (
partkey varchar,
score bigint,
id varchar,
data varchar,
PRIMARY KEY (partkey, score, id)
) with clustering order by (score desc);


5. 코드가 cassandra dependent client library에 의존적이 된다.  (little bit)

Spring 개발자라면 nosql 사용시  XXTemplate 이런것을 기대하는데, hector정도만 있는 것 같다. 

나는 netflix의 astyanax를 사용했기 때문에 spring integration이 고급스럽지 않았다. 대신 통신 코딩하는 느낌으로 dao를 구현해야 했다. (사실 가장 깔끔한 코드가 astyanax이기 때문이다.)


그리고 spring 쪽에서는 cassandra 쪽은 표준화하지 못했다는 점이 있다. 

JPA도 아직 완벽한게 아니니. 충분히 테스트하는 것도 좋을 것 같다. 





Posted by '김용환'
,


참조 자료. 


http://www.datastax.com/docs/1.1/cluster_architecture/hadoop_integration


http://www.datastax.com/dev/blog/bulk-loading

Posted by '김용환'
,

cassandra 운영 툴 중에 ops center, dev center가 있다.


dev center는 cql query tool이다. sqlyog 같은 류이다.  ops center는 모니터링 및 매니징 tool이다. 

이 툴을 이용하면 로컬에서 모니터링 및 개발이 쉽게 도와주는 tool이라 할 수 있다. 




관련 license는 다음과 같다. 



http://www.datastax.com/download/clientdrivers


By downloading and using DevCenter, you agree that your access to and use of DevCenter is governed by the terms applicable to Licensed Software under a “No-Fee” or “Trial” license under the DataStax End User General Terms provided, however that you may use the Licensed Software in production, not just for non-production evaluation purposes.



http://www.datastax.com/terms

2. Trial License

Subject to this Agreement, and as long as you have no Licensed Software deployed in production environments, DataStax grants you a royalty-free license to use the Licensed Software for an unlimited number of Nodes, only for internal, non-production, evaluation purposes (a “Trial License”). DataStax reserves the right, in its sole discretion, to terminate any Trial License. All Trial Licenses terminate without notice when you move any Licensed Software into production. Please note that under a Trial License DataStax provides the Licensed Software to you free of charge, and on that basis, to the fullest extent permitted by law, DataStax provides it “as-is,” and without any warranty or support.

Posted by '김용환'
,