Cassandra-unit 으로는 Astyanax 테스트가 어렵다.
그래서 Astyanax 에서 제공하는 SingletonEmbeddedCassandra를 이용하여 테스트를 진행하였다. 이를 이용하면 따로 Cassandra 서버를 설치하지 않고도 Cassandra 를 테스트할 수 있다. cql뿐 아니라 column family 단위로 쉽게 테스트가 가능하다.
테스트 예제를 공개한다.
<AstyanaxCassandraSpringTest.java>
package org.cassandraunit.test.astyanax;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.google.common.collect.ImmutableMap;
import com.netflix.astyanax.AstyanaxContext;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
import com.netflix.astyanax.connectionpool.OperationResult;
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolType;
import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.CqlResult;
import com.netflix.astyanax.serializers.LongSerializer;
import com.netflix.astyanax.serializers.StringSerializer;
import com.netflix.astyanax.thrift.ThriftFamilyFactory;
import com.netflix.astyanax.util.SingletonEmbeddedCassandra;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:applicationContext-cassandra-astyanax.xml" })
public class AstyanaxCassandraSpringTest {
private static Keyspace keyspace;
private static AstyanaxContext<Keyspace> keyspaceContext;
private static String TEST_CLUSTER_NAME = "Test Cluster";
private static String TEST_KEYSPACE_NAME = "os";
private static final String SEEDS = "localhost:9160";
private static ColumnFamily<String, String> TABLE =
ColumnFamily.newColumnFamily("cpu_util_pct", StringSerializer.get(), StringSerializer.get(), LongSerializer.get());
@Autowired
StorageDao storageDao;
@Before
public void before() throws Exception {
SingletonEmbeddedCassandra.getInstance();
Thread.sleep(1000 * 3);
createKeyspace();
createTable();
Thread.sleep(1000 * 3);
}
private void createTable() throws Exception {
OperationResult<CqlResult<String, String>> result;
result = keyspace
.prepareQuery(TABLE)
.withCql("create table cpu_util_pct (guid text PRIMARY KEY, pct double) ")
.execute();
}
@AfterClass
public static void teardown() throws Exception {
if (keyspaceContext != null)
keyspaceContext.shutdown();
Thread.sleep(1000 * 10);
}
private void createKeyspace() throws Exception {
keyspaceContext = new AstyanaxContext.Builder()
.forCluster(TEST_CLUSTER_NAME)
.forKeyspace(TEST_KEYSPACE_NAME)
.withAstyanaxConfiguration(
new AstyanaxConfigurationImpl()
.setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE)
.setConnectionPoolType(ConnectionPoolType.TOKEN_AWARE))
.withConnectionPoolConfiguration(
new ConnectionPoolConfigurationImpl(TEST_CLUSTER_NAME
+ "_" + TEST_KEYSPACE_NAME)
.setSocketTimeout(30000)
.setMaxTimeoutWhenExhausted(2000)
.setMaxConnsPerHost(20)
.setInitConnsPerHost(10)
.setSeeds(SEEDS))
.withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
.buildKeyspace(ThriftFamilyFactory.getInstance());
keyspaceContext.start();
keyspace = keyspaceContext.getEntity();
keyspace.createKeyspace(ImmutableMap.<String, Object>builder()
.put("strategy_options", ImmutableMap.<String, Object>builder()
.put("replication_factor", "1")
.build())
.put("strategy_class", "SimpleStrategy")
.build()
);
}
@org.junit.Test
public void insertTest() {
storageDao.insertCpu("a", 0);
double d1 = storageDao.getCpuUtil("a");
MatcherAssert.assertThat(d1, Matchers.is((double) 0));
storageDao.insertCpu("a", 2);
double d2 = storageDao.getCpuUtil("a");
MatcherAssert.assertThat(d2, Matchers.is((double) 2));
storageDao.insertCpu("b", 10);
double d3 = storageDao.getCpuUtil("b");
MatcherAssert.assertThat(d3, Matchers.is((double) 10));
}
}
<AstyanaxClient.java>
package org.cassandraunit.test.astyanax;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.netflix.astyanax.AstyanaxContext;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
import com.netflix.astyanax.thrift.ThriftFamilyFactory;
public class AstyanaxClient {
private static final Logger log = LoggerFactory.getLogger(AstyanaxClient.class);
private AstyanaxContext<Keyspace> context;
private Keyspace keyspace;
private String clusterName;
private String keyspaceName;
private String cqlVersion;
private String targetCassandraVersion;
private String connectionPoolName;
private int port;
private int maxConnsPerHost;
private String seeds;
private NodeDiscoveryType discoveryType;
@PostConstruct
public void init() {
context = new AstyanaxContext.Builder()
.forCluster(clusterName)
.forKeyspace(keyspaceName)
.withAstyanaxConfiguration(
new AstyanaxConfigurationImpl()
.setDiscoveryType(discoveryType)
.setCqlVersion(cqlVersion)
.setTargetCassandraVersion(
targetCassandraVersion))
.withConnectionPoolConfiguration(
new ConnectionPoolConfigurationImpl(connectionPoolName)
.setPort(port)
.setMaxConnsPerHost(maxConnsPerHost)
.setSeeds(seeds)
.setTimeoutWindow(3000)
.setConnectTimeout(3000)
)
.withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
.buildKeyspace(ThriftFamilyFactory.getInstance());
context.start();
System.out.println(context.toString());
keyspace = context.getClient();
}
@PreDestroy
public void destroy() {
context.shutdown();
}
public String toString() {
return new StringBuffer().append("clusterName : ")
.append(getClusterName()).append("\r\n")
.append("keyspaceName : ").append(getKeyspaceName())
.append("\r\n").append("cqlVersion : ").append(getCqlVersion())
.append("\r\n").append("targetCassandraVersion : ")
.append(getTargetCassandraVersion()).append("\r\n")
.append("connectionPoolName : ")
.append(getConnectionPoolName()).append("\r\n")
.append("port : ").append(getPort()).append("\r\n")
.append("maxConnsPerHost : ").append(getMaxConnsPerHost())
.append("\r\n").append("seeds : ").append(getSeeds())
.append("\r\n").append("discoveryType : ")
.append(getDiscoveryType()).toString();
}
public Keyspace getKeyspace() {
return this.keyspace;
}
public String getClusterName() {
return clusterName;
}
public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}
public String getKeyspaceName() {
return keyspaceName;
}
public void setKeyspaceName(String keyspaceName) {
this.keyspaceName = keyspaceName;
}
public NodeDiscoveryType getDiscoveryType() {
return discoveryType;
}
public void setDiscoveryType(NodeDiscoveryType discoveryType) {
this.discoveryType = discoveryType;
}
public String getCqlVersion() {
return cqlVersion;
}
public void setCqlVersion(String cqlVersion) {
this.cqlVersion = cqlVersion;
}
public String getTargetCassandraVersion() {
return targetCassandraVersion;
}
public void setTargetCassandraVersion(String targetCassandraVersion) {
this.targetCassandraVersion = targetCassandraVersion;
}
public String getConnectionPoolName() {
return connectionPoolName;
}
public void setConnectionPoolName(String connectionPoolName) {
this.connectionPoolName = connectionPoolName;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public int getMaxConnsPerHost() {
return maxConnsPerHost;
}
public void setMaxConnsPerHost(int maxConnsPerHost) {
this.maxConnsPerHost = maxConnsPerHost;
}
public String getSeeds() {
return seeds;
}
public void setSeeds(String seeds) {
this.seeds = seeds;
}
}
<StorageDao.java>
package org.cassandraunit.test.astyanax;
import java.util.Comparator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.connectionpool.OperationResult;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.ColumnList;
import com.netflix.astyanax.model.CqlResult;
import com.netflix.astyanax.model.Row;
import com.netflix.astyanax.serializers.IntegerSerializer;
import com.netflix.astyanax.serializers.StringSerializer;
@Component
public class StorageDao {
private static final Logger log = LoggerFactory.getLogger(StorageDao.class);
@Autowired
@Qualifier("osClient")
private AstyanaxClient osClient;
public void insertCpu(String guid, double pct) {
Keyspace keyspace = osClient.getKeyspace();
ColumnFamily<String, String> COLUMN_FAMILY =
ColumnFamily.newColumnFamily("cpu_util_pct", StringSerializer.get(), StringSerializer.get());
try {
String statement = String.format("insert into cpu_util_pct (guid, pct) values (?, ?);");
log.debug(statement);
keyspace.prepareQuery(COLUMN_FAMILY)
.withCql(statement)
.asPreparedStatement()
.withStringValue(guid)
.withDoubleValue(pct)
.execute();
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("failed to insert.", e);
}
}
public double getCpuUtil(String guid) {
Keyspace keyspace = osClient.getKeyspace();
ColumnFamily<Integer, String> COLUMN_FAMILY = ColumnFamily.newColumnFamily("cpu_util_pct", IntegerSerializer.get(), StringSerializer.get());
double cpuPct = 0;
try {
OperationResult<CqlResult<Integer, String>> result = keyspace.prepareQuery(COLUMN_FAMILY)
.withCql("SELECT pct FROM cpu_util_pct WHERE guid = '" + guid + "'")
.execute();
for (Row<Integer, String> row : result.getResult().getRows()) {
ColumnList<String> cols = row.getColumns();
cpuPct = cols.getDoubleValue("pct", 0.0D);
}
} catch (ConnectionException e) {
e.printStackTrace();
throw new RuntimeException("failed to read from cql", e);
}
return cpuPct;
}
class IntTreeMapComparator implements Comparator<Long> {
@Override
public int compare(Long o1, Long o2) {
return o1.compareTo(o2);
}
}
}
<applicationContext-cassandra-astyanax.xml>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<context:component-scan base-package="org.cassandraunit.test.astyanax"/>
<!-- properties -->
<context:property-placeholder order="1"
ignore-unresolvable="true" location="classpath:cassandra.properties" />
<bean id="storageDao" class="org.cassandraunit.test.astyanax.StorageDao"/>
<bean id="osClient" class="org.cassandraunit.test.astyanax.AstyanaxClient">
<property name="clusterName" value="${system.cassandra.cluster.name}" />
<property name="discoveryType" value="${system.cassandra.discovery.type.token}" />
<property name="cqlVersion" value="${system.cassandra.cql.version}" />
<property name="targetCassandraVersion" value="${system.cassandra.target.version}" />
<property name="port" value="${system.cassandra.port}" />
<property name="maxConnsPerHost" value="${system.cassandra.max.host}" />
<property name="seeds" value="${system.cassandra.seeds}" />
<property name="connectionPoolName" value="${system.cassandra.connection.pool.name}" />
<property name="keyspaceName" value="${system.cassandra.os.keyspace.name}" />
</bean>
</beans>
클라이언트 설정 정보 - Astyanax 에서 이용
<cassandra.properties>
# system cassandra DB Info
system.cassandra.connection.pool.name=cassandraPool
system.cassandra.port=9160
system.cassandra.max.host=1
system.cassandra.seeds=127.0.0.1:9160
system.cassandra.cql.version=3.0.0
system.cassandra.target.version=2.0.2
system.cassandra.cluster.name=Test Cluster
system.cassandra.discovery.type.ring=RING_DESCRIBE
system.cassandra.discovery.type.discovery=DISCOVERY_SERVICE
system.cassandra.discovery.type.token=TOKEN_AWARE
system.cassandra.discovery.type.none=NONE
system.cassandra.os.keyspace.name=os
서버 설정 정보.- SingletonEmbeddedCassandra 에서 사용
<cassandra-template.yaml>
cluster_name: '$CLUSTER$'
initial_token: 0
auto_bootstrap: false
hinted_handoff_enabled: false
max_hint_window_in_ms: 3600000 # one hour
authenticator: org.apache.cassandra.auth.AllowAllAuthenticator
#authority: org.apache.cassandra.auth.AllowAllAuthority
partitioner: org.apache.cassandra.dht.RandomPartitioner
#partitioner: org.apache.cassandra.dht.Murmur3Partitioner
data_file_directories:
- $DIR$/data
commitlog_directory: $DIR$/commitlog
saved_caches_directory: $DIR$/saved_caches
commitlog_sync: periodic
commitlog_sync_period_in_ms: 10000
seed_provider:
# Addresses of hosts that are deemed contact points.
# Cassandra nodes use this list of hosts to find each other and learn
# the topology of the ring. You must change this if you are running
# multiple nodes!
- class_name: org.apache.cassandra.locator.SimpleSeedProvider
parameters:
# seeds is actually a comma-delimited list of addresses.
- seeds: "127.0.0.1"
#flush_largest_memtables_at: 0.75
#reduce_cache_sizes_at: 0.85
#reduce_cache_capacity_to: 0.6
concurrent_reads: 32
concurrent_writes: 32
memtable_flush_queue_size: 4
storage_port: $STORAGE_PORT$
listen_address: 127.0.0.1
rpc_address: 127.0.0.1
rpc_port: $PORT$
rpc_keepalive: true
thrift_framed_transport_size_in_mb: 15
thrift_max_message_length_in_mb: 16
incremental_backups: false
snapshot_before_compaction: false
column_index_size_in_kb: 64
in_memory_compaction_limit_in_mb: 64
compaction_throughput_mb_per_sec: 16
compaction_preheat_key_cache: true
endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch
dynamic_snitch: true
dynamic_snitch_update_interval_in_ms: 100
dynamic_snitch_reset_interval_in_ms: 600000
dynamic_snitch_badness_threshold: 0.0
request_scheduler: org.apache.cassandra.scheduler.NoScheduler
index_interval: 128
<pom.xml>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.cassandraunit</groupId>
<artifactId>cassandra-unit-spring</artifactId>
<version>2.0.2.2-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.netflix.astyanax</groupId>
<artifactId>astyanax</artifactId>
<version>1.56.44</version>
<exclusions>
<exclusion>
<artifactId>snappy-java</artifactId>
<groupId>org.xerial.snappy</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
<version>2.0.7</version>
<exclusions>
<exclusion>
<artifactId>snappy-java</artifactId>
<groupId>org.xerial.snappy</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>1.0.5-M4</version>
</dependency>
<dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
<version>3.1.0.GA</version>
</dependency>