Cassandra-unit 으로는 Astyanax 테스트가 어렵다. (테스트 중 발생한 오류는 아래 링크 참고)

http://knight76.tistory.com/entry/cassandra-transportmessagesErrorMessage-Unexpected-exception-during-request-astyanaxthriftThriftConverter-Read-a-negative-frame-size-2113929216



그래서 Astyanax 에서 제공하는 SingletonEmbeddedCassandra 를 이용하여 테스트를 진행하였다. 이를 이용하면 Cassandra 서버를 따로 설치하지 않고도 Cassandra 를 테스트할 수 있다. CQL 뿐 아니라 column family 단위로도 쉽게 테스트할 수 있다.


테스트 예제를 공개한다. 



<AstyanaxCassandraSpringTest.java>

package org.cassandraunit.test.astyanax;


import org.hamcrest.MatcherAssert;

import org.hamcrest.Matchers;

import org.junit.AfterClass;

import org.junit.Before;

import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.test.context.ContextConfiguration;

import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;


import com.google.common.collect.ImmutableMap;

import com.netflix.astyanax.AstyanaxContext;

import com.netflix.astyanax.Keyspace;

import com.netflix.astyanax.connectionpool.NodeDiscoveryType;

import com.netflix.astyanax.connectionpool.OperationResult;

import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;

import com.netflix.astyanax.connectionpool.impl.ConnectionPoolType;

import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;

import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;

import com.netflix.astyanax.model.ColumnFamily;

import com.netflix.astyanax.model.CqlResult;

import com.netflix.astyanax.serializers.LongSerializer;

import com.netflix.astyanax.serializers.StringSerializer;

import com.netflix.astyanax.thrift.ThriftFamilyFactory;

import com.netflix.astyanax.util.SingletonEmbeddedCassandra;


@RunWith(SpringJUnit4ClassRunner.class)

@ContextConfiguration(locations = { "classpath:applicationContext-cassandra-astyanax.xml" })

public class AstyanaxCassandraSpringTest {


private static Keyspace                  keyspace;

private static AstyanaxContext<Keyspace> keyspaceContext;


private static String TEST_CLUSTER_NAME  = "Test Cluster";

private static String TEST_KEYSPACE_NAME = "os";

private static final String SEEDS = "localhost:9160";


private static ColumnFamily<String, String> TABLE = 

            ColumnFamily.newColumnFamily("cpu_util_pct", StringSerializer.get(), StringSerializer.get(), LongSerializer.get());

            

    @Autowired

StorageDao storageDao;

    

    @Before

    public void before() throws Exception {

SingletonEmbeddedCassandra.getInstance();

Thread.sleep(1000 * 3);

createKeyspace();

createTable();

Thread.sleep(1000 * 3);

    }

    

private void createTable() throws Exception {

OperationResult<CqlResult<String, String>> result;

result = keyspace

   .prepareQuery(TABLE)

   .withCql("create table cpu_util_pct (guid text PRIMARY KEY, pct double) ")

   .execute();

}

@AfterClass

public static void teardown() throws Exception {

if (keyspaceContext != null)

keyspaceContext.shutdown();


Thread.sleep(1000 * 10);

}


private  void createKeyspace() throws Exception {

keyspaceContext = new AstyanaxContext.Builder()

.forCluster(TEST_CLUSTER_NAME)

.forKeyspace(TEST_KEYSPACE_NAME)

.withAstyanaxConfiguration(

new AstyanaxConfigurationImpl()

.setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE)

.setConnectionPoolType(ConnectionPoolType.TOKEN_AWARE))

.withConnectionPoolConfiguration(

new ConnectionPoolConfigurationImpl(TEST_CLUSTER_NAME

+ "_" + TEST_KEYSPACE_NAME)

.setSocketTimeout(30000)

.setMaxTimeoutWhenExhausted(2000)

.setMaxConnsPerHost(20)

.setInitConnsPerHost(10)

.setSeeds(SEEDS))

.withConnectionPoolMonitor(new CountingConnectionPoolMonitor())

.buildKeyspace(ThriftFamilyFactory.getInstance());


keyspaceContext.start();


keyspace = keyspaceContext.getEntity();


keyspace.createKeyspace(ImmutableMap.<String, Object>builder()

.put("strategy_options", ImmutableMap.<String, Object>builder()

.put("replication_factor", "1")

.build())

.put("strategy_class",     "SimpleStrategy")

.build()

);

}

  

@org.junit.Test

public void insertTest() {

storageDao.insertCpu("a", 0);

double d1 = storageDao.getCpuUtil("a");

MatcherAssert.assertThat(d1, Matchers.is((double) 0));

storageDao.insertCpu("a", 2);

double d2 = storageDao.getCpuUtil("a");

MatcherAssert.assertThat(d2, Matchers.is((double) 2));

storageDao.insertCpu("b", 10);

double d3 = storageDao.getCpuUtil("b");

MatcherAssert.assertThat(d3, Matchers.is((double) 10));

}

}




<AstyanaxClient.java>

package org.cassandraunit.test.astyanax;


import javax.annotation.PostConstruct;

import javax.annotation.PreDestroy;


import org.slf4j.Logger;

import org.slf4j.LoggerFactory;


import com.netflix.astyanax.AstyanaxContext;

import com.netflix.astyanax.Keyspace;

import com.netflix.astyanax.connectionpool.NodeDiscoveryType;

import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;

import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;

import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;

import com.netflix.astyanax.thrift.ThriftFamilyFactory;


/**
 * Spring-managed holder of an Astyanax {@link Keyspace} client.
 *
 * <p>All connection settings are plain bean properties. {@link #init()} builds
 * and starts the Astyanax context after the properties have been injected, and
 * {@link #destroy()} shuts it down again.
 */
public class AstyanaxClient {

    private static final Logger log = LoggerFactory.getLogger(AstyanaxClient.class);

    /** Line separator used between the entries of {@link #toString()}. */
    private static final String CRLF = "\r\n";

    private AstyanaxContext<Keyspace> context;
    private Keyspace keyspace;
    private String clusterName;
    private String keyspaceName;
    private String cqlVersion;
    private String targetCassandraVersion;
    private String connectionPoolName;
    private int port;
    private int maxConnsPerHost;
    private String seeds;
    private NodeDiscoveryType discoveryType;

    /**
     * Builds the Astyanax context from the configured properties, starts it and
     * stores the resulting {@link Keyspace} client.
     */
    @PostConstruct
    public void init() {
        context = new AstyanaxContext.Builder()
                .forCluster(clusterName)
                .forKeyspace(keyspaceName)
                .withAstyanaxConfiguration(
                        new AstyanaxConfigurationImpl()
                                .setDiscoveryType(discoveryType)
                                .setCqlVersion(cqlVersion)
                                .setTargetCassandraVersion(targetCassandraVersion))
                .withConnectionPoolConfiguration(
                        new ConnectionPoolConfigurationImpl(connectionPoolName)
                                .setPort(port)
                                .setMaxConnsPerHost(maxConnsPerHost)
                                .setSeeds(seeds)
                                .setTimeoutWindow(3000)
                                .setConnectTimeout(3000))
                .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
                .buildKeyspace(ThriftFamilyFactory.getInstance());

        context.start();
        // Report through the class logger instead of writing to stdout.
        log.info("Astyanax context started: {}", context);
        keyspace = context.getClient();
    }

    /** Shuts the Astyanax context down; does nothing if {@link #init()} never created one. */
    @PreDestroy
    public void destroy() {
        if (context != null) {
            context.shutdown();
        }
    }

    /**
     * Returns the configured settings, one {@code name : value} entry per line
     * separated by CRLF.
     */
    @Override
    public String toString() {
        return new StringBuilder()
                .append("clusterName : ").append(getClusterName()).append(CRLF)
                .append("keyspaceName : ").append(getKeyspaceName()).append(CRLF)
                .append("cqlVersion : ").append(getCqlVersion()).append(CRLF)
                .append("targetCassandraVersion : ").append(getTargetCassandraVersion()).append(CRLF)
                .append("connectionPoolName : ").append(getConnectionPoolName()).append(CRLF)
                .append("port : ").append(getPort()).append(CRLF)
                .append("maxConnsPerHost : ").append(getMaxConnsPerHost()).append(CRLF)
                .append("seeds : ").append(getSeeds()).append(CRLF)
                .append("discoveryType : ").append(getDiscoveryType())
                .toString();
    }

    /** Returns the keyspace client, or {@code null} before {@link #init()} has run. */
    public Keyspace getKeyspace() {
        return this.keyspace;
    }

    public String getClusterName() {
        return clusterName;
    }

    public void setClusterName(String clusterName) {
        this.clusterName = clusterName;
    }

    public String getKeyspaceName() {
        return keyspaceName;
    }

    public void setKeyspaceName(String keyspaceName) {
        this.keyspaceName = keyspaceName;
    }

    public NodeDiscoveryType getDiscoveryType() {
        return discoveryType;
    }

    public void setDiscoveryType(NodeDiscoveryType discoveryType) {
        this.discoveryType = discoveryType;
    }

    public String getCqlVersion() {
        return cqlVersion;
    }

    public void setCqlVersion(String cqlVersion) {
        this.cqlVersion = cqlVersion;
    }

    public String getTargetCassandraVersion() {
        return targetCassandraVersion;
    }

    public void setTargetCassandraVersion(String targetCassandraVersion) {
        this.targetCassandraVersion = targetCassandraVersion;
    }

    public String getConnectionPoolName() {
        return connectionPoolName;
    }

    public void setConnectionPoolName(String connectionPoolName) {
        this.connectionPoolName = connectionPoolName;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public int getMaxConnsPerHost() {
        return maxConnsPerHost;
    }

    public void setMaxConnsPerHost(int maxConnsPerHost) {
        this.maxConnsPerHost = maxConnsPerHost;
    }

    public String getSeeds() {
        return seeds;
    }

    public void setSeeds(String seeds) {
        this.seeds = seeds;
    }
}




<StorageDao.java>

package org.cassandraunit.test.astyanax;


import java.util.Comparator;


import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.stereotype.Component;


import com.netflix.astyanax.Keyspace;

import com.netflix.astyanax.connectionpool.OperationResult;

import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;

import com.netflix.astyanax.model.ColumnFamily;

import com.netflix.astyanax.model.ColumnList;

import com.netflix.astyanax.model.CqlResult;

import com.netflix.astyanax.model.Row;

import com.netflix.astyanax.serializers.IntegerSerializer;

import com.netflix.astyanax.serializers.StringSerializer;


@Component

public class StorageDao {

private static final Logger log = LoggerFactory.getLogger(StorageDao.class);

@Autowired

@Qualifier("osClient")

private AstyanaxClient osClient;

public void insertCpu(String guid, double pct) {

Keyspace keyspace = osClient.getKeyspace();

ColumnFamily<String, String> COLUMN_FAMILY = 

ColumnFamily.newColumnFamily("cpu_util_pct", StringSerializer.get(), StringSerializer.get());

try {

String statement = String.format("insert into cpu_util_pct (guid, pct) values (?, ?);");

log.debug(statement);

keyspace.prepareQuery(COLUMN_FAMILY)

.withCql(statement)

.asPreparedStatement()

.withStringValue(guid)

.withDoubleValue(pct)

.execute();

} catch (Exception e) {

e.printStackTrace();

throw new RuntimeException("failed to insert.", e);

}

}


public double getCpuUtil(String guid) {

Keyspace keyspace = osClient.getKeyspace();

ColumnFamily<Integer, String> COLUMN_FAMILY = ColumnFamily.newColumnFamily("cpu_util_pct", IntegerSerializer.get(), StringSerializer.get());


double cpuPct = 0;

try {

OperationResult<CqlResult<Integer, String>> result = keyspace.prepareQuery(COLUMN_FAMILY)

.withCql("SELECT pct FROM cpu_util_pct WHERE guid = '" + guid + "'")

.execute();

for (Row<Integer, String> row : result.getResult().getRows()) {

ColumnList<String> cols = row.getColumns();

cpuPct = cols.getDoubleValue("pct", 0.0D);

}

} catch (ConnectionException e) {

e.printStackTrace();

throw new RuntimeException("failed to read from cql", e);

}

return cpuPct;

}

class IntTreeMapComparator implements Comparator<Long> {

@Override

public int compare(Long o1, Long o2) {

       return  o1.compareTo(o2);

   } 

}


}



<applicationContext-cassandra-astyanax.xml>

<?xml version="1.0" encoding="UTF-8"?>

<!-- Spring context loaded by AstyanaxCassandraSpringTest: wires StorageDao and the Astyanax client it uses. -->

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"

xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

<!-- Scans the package that contains StorageDao (annotated with @Component) and AstyanaxClient. -->

<context:component-scan base-package="org.cassandraunit.test.astyanax"/>


<!-- properties -->

<!-- Resolves the ${system.cassandra.*} placeholders below from cassandra.properties. -->

<context:property-placeholder order="1"

ignore-unresolvable="true" location="classpath:cassandra.properties" />

<!-- Explicit StorageDao definition; StorageDao is also annotated with @Component in the scanned package. -->

<bean id="storageDao" class="org.cassandraunit.test.astyanax.StorageDao"/>

<!-- Astyanax client that StorageDao injects via @Qualifier("osClient"). -->

<bean id="osClient" class="org.cassandraunit.test.astyanax.AstyanaxClient">

<property name="clusterName" value="${system.cassandra.cluster.name}" />

<!-- Bound to the *.discovery.type.token key, i.e. TOKEN_AWARE in cassandra.properties. -->

<property name="discoveryType" value="${system.cassandra.discovery.type.token}" />

<property name="cqlVersion" value="${system.cassandra.cql.version}" />

<property name="targetCassandraVersion" value="${system.cassandra.target.version}" />

<property name="port" value="${system.cassandra.port}" />

<property name="maxConnsPerHost" value="${system.cassandra.max.host}" />

<property name="seeds" value="${system.cassandra.seeds}" />

<property name="connectionPoolName" value="${system.cassandra.connection.pool.name}" />

<property name="keyspaceName" value="${system.cassandra.os.keyspace.name}" />

</bean>



</beans>


클라이언트 설정 정보 - Astyanax 에서 이용

<cassandra.properties>

# system cassandra DB Info

# Client-side settings injected into the "osClient" AstyanaxClient bean
# by applicationContext-cassandra-astyanax.xml.

system.cassandra.connection.pool.name=cassandraPool

# Port and seed list handed to the Astyanax connection pool configuration.

system.cassandra.port=9160

# Passed to AstyanaxClient.setMaxConnsPerHost (maximum connections per host).

system.cassandra.max.host=1

system.cassandra.seeds=127.0.0.1:9160

system.cassandra.cql.version=3.0.0

system.cassandra.target.version=2.0.2

# Same cluster name the test class uses for its own schema-setup context.

system.cassandra.cluster.name=Test Cluster


# Node discovery type names. Only the ".token" entry is referenced by the
# Spring context; the other three are not used by the files shown here.

system.cassandra.discovery.type.ring=RING_DESCRIBE

system.cassandra.discovery.type.discovery=DISCOVERY_SERVICE

system.cassandra.discovery.type.token=TOKEN_AWARE

system.cassandra.discovery.type.none=NONE


# Keyspace used by StorageDao; the test creates a keyspace with the same name.

system.cassandra.os.keyspace.name=os

 

서버 설정 정보 - SingletonEmbeddedCassandra 에서 사용

<cassandra-template.yaml>

# Configuration template for the embedded Cassandra server.
# $CLUSTER$, $DIR$, $STORAGE_PORT$ and $PORT$ are placeholders; presumably
# SingletonEmbeddedCassandra substitutes them at startup (TODO confirm).

cluster_name: '$CLUSTER$'

initial_token: 0

auto_bootstrap: false

hinted_handoff_enabled: false

max_hint_window_in_ms: 3600000 # one hour

authenticator: org.apache.cassandra.auth.AllowAllAuthenticator

#authority: org.apache.cassandra.auth.AllowAllAuthority

partitioner: org.apache.cassandra.dht.RandomPartitioner

#partitioner: org.apache.cassandra.dht.Murmur3Partitioner

# All on-disk state lives under the substituted $DIR$ directory.

data_file_directories:

    - $DIR$/data

commitlog_directory: $DIR$/commitlog

saved_caches_directory: $DIR$/saved_caches

commitlog_sync: periodic

commitlog_sync_period_in_ms: 10000

seed_provider:

    # Addresses of hosts that are deemed contact points. 

    # Cassandra nodes use this list of hosts to find each other and learn

    # the topology of the ring.  You must change this if you are running

    # multiple nodes!

    - class_name: org.apache.cassandra.locator.SimpleSeedProvider

      parameters:

          # seeds is actually a comma-delimited list of addresses.

          - seeds: "127.0.0.1"

#flush_largest_memtables_at: 0.75

#reduce_cache_sizes_at: 0.85

#reduce_cache_capacity_to: 0.6

concurrent_reads: 32

concurrent_writes: 32

memtable_flush_queue_size: 4

storage_port: $STORAGE_PORT$

listen_address: 127.0.0.1

rpc_address: 127.0.0.1

# NOTE(review): the client settings in cassandra.properties connect to port 9160,
# so $PORT$ is expected to resolve to that value - confirm.

rpc_port: $PORT$

rpc_keepalive: true

thrift_framed_transport_size_in_mb: 15

thrift_max_message_length_in_mb: 16

incremental_backups: false

snapshot_before_compaction: false

column_index_size_in_kb: 64

in_memory_compaction_limit_in_mb: 64

compaction_throughput_mb_per_sec: 16

compaction_preheat_key_cache: true

endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch

dynamic_snitch: true

dynamic_snitch_update_interval_in_ms: 100 

dynamic_snitch_reset_interval_in_ms: 600000

dynamic_snitch_badness_threshold: 0.0

request_scheduler: org.apache.cassandra.scheduler.NoScheduler

index_interval: 128

 

 

<pom.xml>

<dependencies>

<dependency>

<groupId>junit</groupId>

<artifactId>junit</artifactId>

<version>4.8.2</version>

<scope>test</scope>

</dependency>

<dependency>

<groupId>org.cassandraunit</groupId>

<artifactId>cassandra-unit-spring</artifactId>

<version>2.0.2.2-SNAPSHOT</version>

<scope>test</scope>

</dependency>

<dependency>

<groupId>com.netflix.astyanax</groupId>

<artifactId>astyanax</artifactId>

<version>1.56.44</version>

<exclusions>

<exclusion>

<artifactId>snappy-java</artifactId>

<groupId>org.xerial.snappy</groupId>

</exclusion>

</exclusions>


</dependency>

<dependency>

<groupId>org.apache.cassandra</groupId>

<artifactId>cassandra-all</artifactId>

<version>2.0.7</version>

<exclusions>

<exclusion>

<artifactId>snappy-java</artifactId>

<groupId>org.xerial.snappy</groupId>

</exclusion>

</exclusions>


</dependency>

<dependency>

<groupId>org.xerial.snappy</groupId>

<artifactId>snappy-java</artifactId>

<version>1.0.5-M4</version>

</dependency>

<dependency>

<groupId>org.jboss.netty</groupId>

<artifactId>netty</artifactId>

<version>3.1.0.GA</version>

</dependency>

</dependencies>

            


Posted by 김용환 '김용환'