카산드라는 복제본을 동기화 상태로 유지하기 위해 여러 메커니즘을 사용한다. 데이터 복구 작업은 일반적으로 세 가지 범주로 나뉜다.


* Synchronous read repair : 데이터를 읽으면서 여러 개의 복제본을 비교할 때, 카산드라는 다른 노드에 체크섬(checksum)을 요청한다. 체크섬이 일치하지 않으면 전체 복제본에 요청을 보내 로컬 버전과 비교한다. 최신 타임 스탬프의 데이터가 반환되고 모든 복제된에 해당 데이터를 복제되도록 전달된다. 요청을 받을 때 오래된 데이터가 복구된다.(일명 read repair라 불린다)


* Asynchronous read repair : 카산드라는 읽기 작업 중에 나머지 복제본도 복구할 수 있게 한다. 카산드라의 각 테이블에는 read_repair_chance (기본 값은 0.1, 즉 10%) 설정이 있다. 설정은 데이터를 읽을 때 비교되지 않는 복제본을 얼마나 처리할지를 결정한다. 


consistency level이 one이면 비교할 대상이 없기 때문에 read repair가 발생되지 않는다. 그리고 quorum이면 모든 노드가 아니라 질의를 받은 노드만 복구된다.


* nodetool repair 직접 수행 : 전체적으로 복구를 수행한다. 최소한 테이블 스키마에 설정된 gc_grace_seconds(기본 10일)안에 한 번 실행해야 하지만, 운영 노하우가 필요하다. 


카산드라의 모든 수정(변경 내용)은 immutable인데 반해 delete는 클라이언트에 주지 말라는 마커(tombstone)만 있다. 따라서 compaction이 호출될 때에만 실제 삭제된다. 그리고 gc가 발생한다.





<운영 노하우>


데이터 consistency가 낮아 있는 cassandra에 node repair를 하다가 gc가 늘어나 서버가 hang이 되지 않도록 운영 작업을 고민했다.


cassandra에 read/write를 모두 quorum(java driver에서 consistency level의 기본값은 one이다)으로 바꿔 consistency를 높였고(이전에는 되어 있지 않았다) 자연스럽게 read operation 시 asynchronous read repair가 발생되도록 해놨다.


기본 consistency level를 결정한다. 


     QueryOptions queryOptions = new QueryOptions();

queryOptions.setFetchSize(QueryOptions.DEFAULT_FETCH_SIZE);

queryOptions.setConsistencyLevel(ConsistencyLevel.QUORUM);


private Cluster cluster;

..

cluster = Cluster.builder().addContactPoints(contactPoints).

withQueryOptions(queryOptions).

withPoolingOptions(poolingOptions).

withSocketOptions(socketOptions).build();





테이블의 성격에 따라 consistency level을 정할 수 있다. 이것은 테이블의 성격이 가장 우선시 된다. 테이블 성능과 cql 쿼리, compaction간의 고민이 필요하다. 


Statement statement = QueryBuilder.select().from(CASSANDRA_KEY_SPACE, CASSANDRA_TABLE_ACTIVITY_ACTION_COUNT)

.where(QueryBuilder.eq("profile_id", actorId)).and(QueryBuilder.eq("guid", id))

.setConsistencyLevel(ConsistencyLevel.ONE);





특정 테이블에서는 consisteny level을 quorum으로 했더니 timeout이 발생했다.


데이터가 크고 secondary index도 쓰고.. timeout이 발생하기 때문에 consisteny level을 one으로 수정했다(성능 이슈)


 com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed



테이블의 성격에 따라 다르게 node repair를 실행하는 것이 좋다.  



Posted by '김용환'
,



참고 

http://knight76.tistory.com/entry/cassandra-cassandra-%EC%84%9C%EB%B2%84%EB%A5%BC-%EC%95%88%EC%A0%84%ED%95%98%EA%B2%8C-%EC%9E%AC%EC%8B%9C%EC%9E%91%ED%95%98%EA%B8%B0



카산드라가 노드 장애시에도 fault tolerance을 유지하는 주요 방법 중 하나는 hinted handoff 메커니즘이다. cassandra.yaml에서 hinted_handoff_enabled이 true로 설정되어 있으면(기본 설정) 복제 노드 중 하나에 도달 할 수 없을 때 카산드라는 coordinator 노드에 hint를 저장한다.


해당 힌트는 클러스터에 속한 위치에 대한 정보를 포함한다. coordinator가 복제 노드가 다시 정상화되는 것을 알아차리게 되면 hint 정보를 복제 노드에서 replay시킨다. 


기본적으로 Cassandra는 hint 대기열이 너무 길어지는 것을 방지하기 위해 최대 3시간 동안 hint를 저장한다. cassandra.yaml의 max_hint_window_in_ms 속성(기본 3시간)이 해당 hint가 저장되는 시간을 의미한다. 


max_hin_window_in_ms 시간이 지나면 일관성을 복원하기 위해 node repair를 실행해야 한다. 

Posted by '김용환'
,



1. where 절의 in

in을 사용할 때는 많은 키의 정보를 한 번에 읽어야 할 때, 최대한 작게 사용하는 것이 좋다. 

read를 QUORUM으로 설정했다면 최대 2개 노드에 요청하기 때문에, 100개의 키를 사용한다면, 최대 200번을 호출해서 결과를 얻어올 것이다.



2. Paging (limit)

데이터가 어느 정도 많아지면 limit을 써서 pagination을 하는 것이 성능에 문제 없다.  

데이터가 많아지면 timeout이 발생할 수 있다. 개인적으로는 백 만개의 row도 20개 씩 읽으면 문제가 없다. 



3. secondary index

secondary index는 데이터가 클러스터내에서 동기화되지 않고 노드에만 저장되어 있다. 따라서 만약 secondary index만 가지고 select문을 사용한다면 해당 노드에 요청해 scan all을 하게 된다. 따라서 반드시 써야 한다면 값의 범위가 광범위하지 않는 것이 좋다(빨리 검색될 수 있는 컬럼이어야 한다)

데이터가 적을 때나 쓸만한 index이다. 대용량에서는 쓰면 안된다. 


secondary index가 없으면 allow filtering를 사용해야 하는데..(사실 이런 쿼리를 만들었다면 다시 만들어야 한다. 잘못 만든 모델링이다) 성능 이슈가 발생한다. 



이런 문제로 cassandra 3.4부터 SSTable Attached Secondary Index (SASI)가 추가되었다. 인덱스가 별도 테이블이 아니라 SSTable에 존재하도록 하여 조금 좋아지게 했다고 한다. 

https://docs.datastax.com/en/cql/3.3/cql/cql_using/useSASIIndex.html


CREATE INDEX d_index ON crizin(d) USING 'org.apache.cassandra.index.sasi.SASIIndex';



4. 잦은 삭제

작은 삭제는 tombstone과 gc를 유발시킬 수 있다. 

데이터가 삭제되면 gc_grace_seconds(기본 값: 864000)로 되어 있어서 10일간은 그대로 남아 있고, tombstone이라고 하지만 사실 데이터가 있다. 따라서, 검색(select)할 때 tombstone도 스캔한다. 따라서 작은 삭제가 일어나는 테이블은 gc_grace_seconds를 수정할 필요가 있을 수도 있다. 



5. strong consistency 욕심

http://docs.datastax.com/en/archived/cassandra/2.0/cassandra/dml/dml_config_consistency_c.html


strong consistency를 주기 위해 write할 때 consistency를 all로 주고 read할 때 consistency를 one으로 줄 수 있다.

단점은 노드가 하나라도 문제가 있어서 데몬이 내려가 있다(node failure)면, UnavailableException 예외가 발생할 것이다. QUORUM 또는 LOCAL_QUORUM이 나을 수 있다.


strong consistency를 높이려면 replication factor보다는 read consistency level과 write consistency level을 잘 잡는게 중요하다(말은 이렇게 하지만, 실제 상용 서비스에 적용할 때는 가장 어려운 것 같다..트레이드 오프와 node repair에 대한 고민은 늘 존재한다)



6. 데이터 복구 (data repairing)

데이터가 별로 사용하지 않는 곳, 서비스에 영향을 주지 않는 곳에서는 nodetool repair 를 사용해도 큰 이슈는 없지만, 대용량에서는 어마어마한 일이 벌어질 수 있다. 공부를 많이 하고 효과적인 전략을 세워야 한다. 

(cassandra summit 슬라이드 보면 관련 내용이 많다. )


(블로그에 내용을 따로 작성하고 있다. 즉, 운영을 어느 정도 해보고 자신있을 때 진행하는 게 좋다. 잘못하면 서비스 장애 가 발생할 수 있다.)



7. 여러 클러스터로 사용

하나의 클러스터에 이런 저런 용도의 테이블을 저장하기 보다는 여러 클러스터로 나누어 테이블을 저장하는 것이 좋다. compaction은 side effect가 존재할 수 있다. Hbase처럼 최대한 클러스터를 분리시키는 것이 훨씬 낫다.

Posted by '김용환'
,


카산드라에는 여러 key 개념이 있다.



CQL을 이용할 때 일반 Database를 사용하 듯 table을 생성할 때 다음처럼 키를 생성할 수 있다. 


primary key(col1, col2, col3)


* primary key 

DB의 pk와 비슷하다. row를 유일무이하게 해주는 key를 의미한다. 1개 이상의 키가 필요하다. 


* composite(compound) key

primary key가 2개 이상이면 composite key라 부른다.


* partition key

partition key는 primary key의 1번째 key(예시에서는 col1)를 의미한다. 저장소 row key로 직접 변환하고 해시 알고리즘에 따라 클러스터에 저장(분배)된다. 대부분의 질의는 partition key를 제공해서 카산드라는 요청된 데이터가 어느 노드에 있는지 알게 된다. 


* clustering key

primary key의 1번째 key외 나머지 key를 clustering key(또는 clustering column)라 한다. 해당 key는 디스크에 데이터 순서를 안다. 하지만 어느 노드에 저장될지는 결정하지 않는다. 

순서 관련해서 오름차순, 내림차순으로 변경할 수 있다. 


with clustering order by (col2 desc) 




실제로는 '값:값'이 아닌 'col1의 값:col2의컬럼 이름'의 조합으로 되어 있다. 따라서 2.x까지는 중복이 발생할 수 있지만, 3.x부터는 문제가 없도록 되어 있다고 한다. 


실제 예시를 소개한다.


$cqlsh 

cqlsh:google> 

CREATE TABLE story.crizin (

    a int,

    b int,

    c int,

    d int,

    type int,

    PRIMARY KEY (a, b, c)


cqlsh:google> insert into google.crizin(a,b,c,d,type) values(1,1,1,1,1);

cqlsh:google> insert into google.crizin(a,b,c,d,type) values(2,2,2,2,2);



$ cassandra-cli -h IP -port 9160

[default@google] list crizin

... ;

Using default limit of 100

Using default cell limit of 100

-------------------

RowKey: 1

=> (name=1:1:, value=, timestamp=1484644014043867)

=> (name=1:1:d, value=00000001, timestamp=1484644014043867)

=> (name=1:1:type, value=00000001, timestamp=1484644014043867)

-------------------

RowKey: 2

=> (name=2:2:, value=, timestamp=1484644020194477)

=> (name=2:2:d, value=00000002, timestamp=1484644020194477)

=> (name=2:2:type, value=00000002, timestamp=1484644020194477)


2 Rows Returned.


cassandra data layer에서는 콜론 단위로 저장됨을 확인할 수 있다. 




지금까지 하나의 partition key에 여러 clustering 컬럼 조합을 살펴봤다. partition key를 multi로 둘 수 있다. 


primary key((col1, col2), col3)



* composite partition key

composite partition key는 다수의 컬럼을 partition key로 둔다. 


data layer에서는 row key를 'col1의 값:col2의 값'으로 변경한다. 


$cqlsh 

cqlsh:google> 


CREATE TABLE story.crizin1 (

    a int,

    b int,

    c int,

    d int,

    type int,

    PRIMARY KEY ((a, b), c)


cqlsh> insert into story.crizin1(a,b,c,d,type) values(2,2,2,2,2);

cqlsh> insert into story.crizin1(a,b,c,d,type) values(1,1,1,1,1);




$ cassandra-cli -h IP -port 9160

[default@story] use story;

Authenticated to keyspace: story

[default@story] list story.crizin1;

 list crizin1;

Using default limit of 100

Using default cell limit of 100

-------------------

RowKey: 2:2

=> (name=2:, value=, timestamp=1484645817202697)

=> (name=2:d, value=00000002, timestamp=1484645817202697)

=> (name=2:type, value=00000002, timestamp=1484645817202697)

-------------------

RowKey: 1:1

=> (name=1:, value=, timestamp=1484645825186628)

=> (name=1:d, value=00000001, timestamp=1484645825186628)

=> (name=1:type, value=00000001, timestamp=1484645825186628)


2 Rows Returned.

Elapsed time: 58 msec(s).



key 말고 secondary index가 존재한다. index를 구축할 수 있지만, 


CREATE INDEX d_index ON crizin(d);


성능 이슈가 존재한다는 커다란 단점이 있다. (자세한 내용은 다음 블로그에서 설명한다)







Posted by '김용환'
,


카산드라 장비가 클러스터링이 되어 있다면, nodetool status로 서로 연동되어 있음을 확인할 수 있다. 


$ nodetool status

Datacenter: datacenter1

=======================

Status=Up/Down

|/ State=Normal/Leaving/Joining/Moving

--  Address       Load       Tokens  Owns    Host ID                               Rack

UN  1.1.1.1  77.1 GB    256     ?       e9fdd401-e83e-428e-b848-5f51602145e6  rack1

UN  1.1.1.2  75.07 GB   256     ?       ca478c2c-5fec-4ee8-9f3f-37c33cc761d0  rack1

UN  1.1.1.3  73.75 GB   256     ?       b3d60757-ef63-4847-b6c5-4ee4289e9a27  rack1




데이터는 어떻게 저장되어 있을까?


카산드라에서 nodetool ring 커맨드를 실행하면, 카산드라의 consistent hashing을 통해 여러 노드 address에 저장될 수 있도록 구성되어 있음을 확인할 수 있다. 



$ nodetool ring 

Datacenter: datacenter

==========

Address       Rack        Status State   Load            Owns                Token

                                                                                           9220917267562992196

1.1.1.1  rack1       Up     Normal  75.52 GB        ?                   -9221498249641296905

1.1.1.2  rack1       Up     Normal  77.02 GB        ?                   -9200410371552807557

1.1.1.3  rack1       Up     Normal  77.17 GB        ?                   -9177126049054932799

...

1.1.1.1  rack1       Up     Normal  77.17 GB        ?                   9220917267562992196


커맨드 결과를 살펴보면 토큰이 점점 작은 값(큰 음수)에서 큰 값(큰 양수)로 이어지는지 볼 수 있다. 


같은 클러스터의 다른 장비에서 nodetool ring 커맨드를 사용하면 동일한 결과가 보인다. 클러스터 내의 데이터가 저장되는 가상 단위라 말할 수 있다.



내용을 자세히 설명한다.


consistent hashing을 소개하면, 버켓(bucket)은 미리 정해진 범위(range)로 되어 있다. 카산드라 노드는 범위를 할당하고 다음처럼 계산된다. 

범위(range)의 시작을 토큰(token)이라 부른다. 카산드라 1.2까지는 토큰이라 불렀는데, 2.0부터는 vnode라고 한다. 


그리고 CQL을 통해 데이터를 저장할 때, row key는 token을 기반으로 각 노드에 분산 저장된다. 



<그림 참조 : https://academy.datastax.com >


범위는 토큰 값의 구분 값이 들어가 있다. 

range start : 토큰 값

range end : 다음 토큰 값 -1




카산드라에서 consistent hashing에 쓰이는 기본 알고리즘은 Murmur3Patitioner이다. 

https://docs.datastax.com/en/cassandra/3.0/cassandra/architecture/archPartitionerM3P.html

https://docs.datastax.com/en/cassandra/2.1/cassandra/configuration/configCassandra_yaml_r.html


Murmur3Partitioner(m3p)는 RandomPartitioner보다 빠른 해싱과 성능이 좋다. 

(Paritioner가 여러 존재하지만, ByteOrderedPartitioner를 사용할 때는 hot spot 이슈가 생길 수 있다. 많은 이들은 



또한 cql에서 paging을 사용할 수 있다. (setFetchSize와 setPagingState를 java driver에서 지원한다)

https://docs.datastax.com/en/cql/3.1/cql/cql_reference/paging.html

http://docs.datastax.com/en/developer/java-driver/3.1/manual/paging/ 




카산드라는 키를 읽고 저장할 때 어느 범위에 있는지 알기 위해 모든 클러스터의 카산드라는 동일한 해시 함수를 사용한다. 즉 모든 노드는 클러스내의 모든 범위를 알아야 하고 자기 노드의 범위 뿐 아니라 다른 노드의 범위를 알고 있다. 


따라서, 요청을 받은 노드를 coordinator라 한다. 카산드라 키가 요청받은 노드(coordinator)에 속하지 않으면 범위에 맞는 노드로 요청을 보낸다. 



vnode는 토큰을 상위 개념이지만, 토큰을 아예 안쓰는 것은 아니다. 최신 버전(2.0 이상)은 특별히 토큰 관리를 하지 않지만, cassandra.yml에 initial_token이라는 옵션을 사용해 각 카산드라에서 나름 토큰의 범위를 지정할 수 있었다. 반대로 생각해보면 cassandra 관리를 각 카산드라 서버별로 작업을 진행해야했기에 관리가 좀 불편했다.. 


cassandra문서에 따르면 현재 initial_token은 disable이다. initial_token이 true이고 하나의 token_num이 1개이면 완전 single node가 된다. 하나의 개발 장비로 구성할 때는 이 방법이 좋을 수 있을 것이다. 

https://docs.datastax.com/en/cassandra/2.1/cassandra/configuration/configCassandra_yaml_r.html





각 노드에 하나의 토큰만 할당하기 보다 num_tokens(기본 값 256)을 사용해 늘릴 수 있다. 바로 vnode이다. vnode는 데이터를 분산시키기 위해 존재한다. 





만약 10대의 장비가 있고 num_tokens가 256이면 전체 vnode 개수는 2560이 될 것이다. 실제 개수는 정확히 나온다. 


$ nodetool ring | grep Normal | wc -l

2560


num_tokens를 256으로 사용한다면, increment repair하는데 도움이 된다고 한다(직접 해본 적은 없다)


vnode는 카산드라의 rebuild작업을 수동으로 할때 편리하다. 부하가 더 많은 노드에 분산되기 때문에 재시작에 유리할 수 있다. 게다가 rebuild할 때 하나의 복제본을 사용할 수 있다. 따라서 안정성이 더 확보된다. 






Posted by '김용환'
,




cassandra 로그를 설정할 때 두 가지 방법이 있다.



1) logback.xml 설정 변경


/etc/cassandra/conf/logback.xml 파일에서 특정 패키지의 로그 수준을 변경한 후, cassandra를 시작시킨다. 



<configuration scan="true">

  <jmxConfigurator />

  <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">

    <file>${cassandra.logdir}/system.log</file>

    <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">

      <fileNamePattern>${cassandra.logdir}/system.log.%i.zip</fileNamePattern>

      <minIndex>1</minIndex>

      <maxIndex>20</maxIndex>

    </rollingPolicy>


    <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">

      <maxFileSize>20MB</maxFileSize>

    </triggeringPolicy>

    <encoder>

      <pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern>

      <!-- old-style log format

      <pattern>%5level [%thread] %date{ISO8601} %F (line %L) %msg%n</pattern>

      -->

    </encoder>

  </appender>


  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">

    <encoder>

      <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern>

    </encoder>

  </appender>


  <root level="INFO">

    <appender-ref ref="FILE" />

    <appender-ref ref="STDOUT" />

  </root>


  <logger name="com.thinkaurelius.thrift" level="ERROR"/>

</configuration>




2) 동적 로그 수준 변경


logback 파일로 진행하면 다음과 같이  nodetool을 실행하면 동적으로 변경할 수 있다. 


$ nodetool setlogginglevel org.apache.cassandra.db DEBUG


Posted by '김용환'
,


nodetool에 disablebinary라는 것이 존재한다. 


이는 native binary 프로토콜을 사용 중이라고 하고 opscenter, agent와 같은 probe의 연결을 해지한다. 


아래 cassandra 코드를 보면 jmx 연결을 끊는 것으로서, 애플리케이션과 카산드라간의 영향은 없는 듯 하다. 


<참고 코드>


https://github.com/apache/cassandra/blob/cassandra-3.11/src/java/org/apache/cassandra/tools/nodetool/DisableBinary.java



https://github.com/apache/cassandra/blob/cassandra-3.11/src/java/org/apache/cassandra/tools/NodeProbe.java




Posted by '김용환'
,



cassandra 장비를 재시작할 때 다음 커맨드를 실행한다.




cassandra의 내부 통신은 gossip 프로토콜(p2p)을 사용한다. 해당 프로토콜을 이용해 서로의 상태를 확인하고 노드를 알수 있게 된다. disablegossip을 하면 클러스터에서 끊어지는 효과를 얻게 된다. 마치 그 장비는 없는 셈이 된다. (재시작된 서버가 클러스터에 빠져서 클러스터로 돌아왔을 때, consistency를 맞추기 위한 시간을 줄이기 위한 추가적인 기능인 Hinted handoff도 disable되지 않는다.) 


opscenter로 확인하면 클러스터에서 disablegossip 커맨드를 실행한 서버가 클러스터에서 빠져 있는 것을 볼 수 있다.


$ nodetool disablegossip



클러스터 상황이 내부적으로 정리되면, 이번에는 클러스터에 붙는 애플리케이션의 연결을 사용하지 않도록 한다. disablethrift는 cassandra data-stax client에서 해당 클러스터의 연결이 진행되지 못하도록 한다.


$ nodetool disablethrift


리눅스 내부적으로 tcpdump/netstat으로 확인해보면, disablethift를 실행할 때 기존의 connection은 끊기고 새로운 connection이 맺어지고 더 이상 데이터는 전달되지 않는 가짜 connection이 생긴다. (ping과 fin/ack 정도만 있다)



필요에 따라서는 opscenter와 agent와 같은 일부 probe툴과의 통신을 끊을 수 있다.


$ nodetool disablebinary



다음에는 drain 커맨드를 실행시켜 완전한 connection을 중단한다. 

애플리케이션과 중지된 cassandra간의 connection은 모두 끊어진다. 


$ nodetool drain



이제 cassandra 데몬을 종료한다.


$ kill -9 $cassandra_pid




cassandra 재시작 또는


$ reboot


작업을 진행한다...





cassandra 데몬을 실행한다. 



$ CASSANDRA_CONF=/etc/cassandra/conf cassandra


제대로 실행 중인지 로그를 확인한다. 참고로 부하와 cpu가 일시적으로 올라갈 수 있다. 


$ tail -f /var/log/cassandra/system.log




opscenter를 위한 모니터링 툴을 실행한다.


sudo service datastax-agent start



그리고, ntp 데몬이 정상적인지 확인한다. 동기(sync) 시간이 카산드라 클ㄹ스터에 중요한 부분을 차지하기 때문에 ntp 데몬이 정상인지 확인해야 한다. 그리고 불의의 사고에서도 동작되도록 chkconfig를 확인한다. 



$ ntpq -p

     remote           refid      st t when poll reach   delay   offset  jitter

==============================================================================

..



$  sudo chkconfig --list | grep ntpd

ntpd           0:해제 1:해제 2:활성 3:활성 4:활성 5:활성 6:해제





이제 opscenter에서 클러스터에 제대로 붙어 있는지 확인할 수 있다. 


그리고, tcpdump를 사용해서 특정 소켓에 대해서 정상적으로 패킷이 왔다갔다 하는지 확인한다. 





* 참고 


재시작을 할 상황에서는 복구를 구성할 수 있는 정보가 3시간(디폴트)만 저장된다. 따라서 3시간이 넘어가면 복구할 수 없게 된다. 즉 복구 관련된 힌트 정보를 모두 삭제한다. 따라서 이를 위해 nodetool repaire를 실행해 클러스터의 정보를 복제하도록 해야 한다. 관련 복제 관련된 정보가 바로 max_hint_window_in_ms이다. 



http://docs.datastax.com/en/archived/cassandra/2.0/cassandra/configuration/configCassandra_yaml_r.html



max_hint_window_in_ms
(Default: 10800000 - 3 hours) Defines how long in milliseconds to generate and save hints for an unresponsive node. After this interval, new hints are no longer generated until the node is back up and responsive. If the node goes down again, a new interval begins. This setting can prevent a sudden demand for resources when a node is brought back online and the rest of the cluster attempts to replay a large volume of hinted writes.




'cassandra' 카테고리의 다른 글

[cassandra] 로그 설정 팁  (0) 2017.01.11
[cassandra] nodetool disablebinary  (0) 2017.01.11
[cassadra] compaction 전략  (0) 2016.12.09
[cassandra] select count(*) 구하기  (0) 2016.12.07
[cassandra] read repair  (0) 2016.11.23
Posted by '김용환'
,



cassandra에서 alter table로 compaction 전략을 변경할 수 있다. 



ALTER TABLE table WITH compaction = { 'class' :  'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'  }


사실 처음부터 compaction을 설정하려면 ALTER TABLE 없이 CREATE TABLE에 with compaction을 두어도 된다.

CREATE TABLE story.test1 (

  block_id uuid,

  species text,

  alias text,

  population varint,

  PRIMARY KEY (block_id)

) WITH compression =

    { 'sstable_compression' : 'DeflateCompressor', 'chunk_length_kb' : 64 }

  AND compaction =

    { 'class' : 'LeveledCompactionStrategy'};




카산드라 2.0.11 이상의 버전을 사용할 때 compaction 정책을 정할 때, 다음 규칙을 이용하는 것이 좋다. 


- 쓰기가 많은 작업에는 SizeTiredCompationStrategy을 사용한다.

- 읽기가 많은 작업에는 LeveledCompactionStrategy를 사용한다.

- 시계열과 데이터 만료 작업에는 DateTieredCompactionStrategy를 사용한다. 


https://docs.datastax.com/en/cassandra/2.0/cassandra/dml/dml_write_path_c.html#concept_ds_wt3_32w_zj__logging-writes-and-memtable-storage



You can configure these types of compaction to run periodically: SizeTieredCompactionStrategy, DateTieredCompactionStrategy (Cassandra 2.0.11), and LeveledCompactionStrategy.


SizeTieredCompactionStrategy is designed for write-intensive workloads, DateTieredCompactionStrategy for time-series and expiring data, and LeveledCompactionStrategy for read-intensive workloads. You can manually start compaction using the nodetool compact command.





compaction 전략에 대한 설명은 다음을 참고한다. 


http://www.datastax.com/dev/blog/when-to-use-leveled-compaction



https://docs.datastax.com/en/cassandra/2.1/cassandra/operations/ops_configure_compaction_t.html



http://www.datastax.com/dev/blog/when-to-use-leveled-compaction



http://www.datastax.com/dev/blog/datetieredcompactionstrategy



https://labs.spotify.com/2014/12/18/date-tiered-compaction/







cassandra의 compaction설정은 테이블 단위라면, hbase의 compaction 설정은 클러스터 단위이다.


http://knight76.tistory.com/entry/%EA%B3%B5%EB%B6%80-Hbase-compaction





'cassandra' 카테고리의 다른 글

[cassandra] nodetool disablebinary  (0) 2017.01.11
[cassandra] cassandra 서버를 안전하게 재시작하기  (0) 2017.01.09
[cassandra] select count(*) 구하기  (0) 2016.12.07
[cassandra] read repair  (0) 2016.11.23
[cassandra] cqlsh 팁  (0) 2016.11.21
Posted by '김용환'
,


cassandra에서 테이블의 row 개수를 구하려면, 다음과 같은 cql을 사용할 수 있다. 


select count(*) from table



하지만, 대용량 데이터가 존재한다면, timeout이 발생한다.


이를 위해 timeout를 설정할 수 있지만, 성능 이슈가 발생할 수 있으니..


cqlsh --request-timeout="60"이라고 지정할 수 있다.



https://docs.datastax.com/en/cql/3.3/cql/cql_reference/cqlsh.html


--request-timeout="timeout" CQL request timeout in seconds; default: 10



하지만, 문제는 성능 이슈이다..


(서비스에서 매번 카운트를 불러 읽는 경우라면 따로 cassandra counter를 이용해 구현하는 것이 좋다. hbase나 cassandra에 카운트 계산을 매번 호출하는 것은 위험한 작업이다!!)



대용량 데이터의 row 개수를 구할 수 있는 또 다른 방법은 nodetool을 이용하는 것이다.

nodetool cfstat를 사용해서 테이블의 Number of keys(estimate)를 확인하면 대략적인 내용을 확인할 수 있다. 



$./nodetool cfstat

Table (index): table
Number of keys (estimate): 184251
..


Table: table

Number of keys (estimate): 538971 




추정치 값은 아래 cassandra 코드를 따라 들어가 확인할 수 있다. 


데이터 스트림 크기를 기반으로 hyperloglog 계산을 이용한 추정치이기 때문에 신뢰할만하다.





https://github.com/apache/cassandra/blob/42e0fc5ee221950875d93b4cd007d4f5bcaa4244/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java


                Object estimatedPartitionCount = probe.getColumnFamilyMetric(keyspaceName, tableName, "EstimatedPartitionCount");

                if (Long.valueOf(-1L).equals(estimatedPartitionCount))

                {

                    estimatedPartitionCount = 0L;

                }

                statsTable.numberOfKeysEstimate = estimatedPartitionCount;








https://github.com/apache/cassandra/blob/979af884ee4ecef78a21c4bd58992d053256f8f0/src/java/org/apache/cassandra/tools/NodeProbe.java


    /**

     * Retrieve ColumnFamily metrics

     * @param ks Keyspace for which stats are to be displayed or null for the global value

     * @param cf ColumnFamily for which stats are to be displayed or null for the keyspace value (if ks supplied)

     * @param metricName View {@link TableMetrics}.

     */

    public Object getColumnFamilyMetric(String ks, String cf, String metricName)

    {

        try

        {

            ObjectName oName = null;

            if (!Strings.isNullOrEmpty(ks) && !Strings.isNullOrEmpty(cf))

            {

                String type = cf.contains(".") ? "IndexTable" : "Table";

                oName = new ObjectName(String.format("org.apache.cassandra.metrics:type=%s,keyspace=%s,scope=%s,name=%s", type, ks, cf, metricName));

            }

            else if (!Strings.isNullOrEmpty(ks))

            {

                oName = new ObjectName(String.format("org.apache.cassandra.metrics:type=Keyspace,keyspace=%s,name=%s", ks, metricName));

            }

            else

            {

                oName = new ObjectName(String.format("org.apache.cassandra.metrics:type=Table,name=%s", metricName));

            }

            switch(metricName)

            {

                case "BloomFilterDiskSpaceUsed":

                case "BloomFilterFalsePositives":

                case "BloomFilterFalseRatio":

                case "BloomFilterOffHeapMemoryUsed":

                case "IndexSummaryOffHeapMemoryUsed":

                case "CompressionMetadataOffHeapMemoryUsed":

                case "CompressionRatio":

                case "EstimatedColumnCountHistogram":

                case "EstimatedPartitionSizeHistogram":

                case "EstimatedPartitionCount":

                case "KeyCacheHitRate":

                case "LiveSSTableCount":

                case "MaxPartitionSize":

                case "MeanPartitionSize":

                case "MemtableColumnsCount":

                case "MemtableLiveDataSize":

                case "MemtableOffHeapSize":

                case "MinPartitionSize":

                case "PercentRepaired":

                case "RecentBloomFilterFalsePositives":

                case "RecentBloomFilterFalseRatio":

                case "SnapshotsSize":

                    return JMX.newMBeanProxy(mbeanServerConn, oName, CassandraMetricsRegistry.JmxGaugeMBean.class).getValue();







https://github.com/apache/cassandra/blob/81f6c784ce967fadb6ed7f58de1328e713eaf53c/src/java/org/apache/cassandra/metrics/TableMetrics.java



public class TableMetrics

{



    /** Approximate number of keys in table. */

    public final Gauge<Long> estimatedPartitionCount;



        estimatedPartitionCount = Metrics.register(factory.createMetricName("EstimatedPartitionCount"),

                                                   aliasFactory.createMetricName("EstimatedRowCount"),

                                                   new Gauge<Long>()

                                                   {

                                                       public Long getValue()

                                                       {

                                                           long memtablePartitions = 0;

                                                           for (Memtable memtable : cfs.getTracker().getView().getAllMemtables())

                                                               memtablePartitions += memtable.partitionCount();

                                                           return SSTableReader.getApproximateKeyCount(cfs.getSSTables(SSTableSet.CANONICAL)) + memtablePartitions;

                                                       }

                                                   });






https://github.com/apache/cassandra/blob/4a2464192e9e69457f5a5ecf26c094f9298bf069/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java


    /**

     * Calculate approximate key count.

     * If cardinality estimator is available on all given sstables, then this method use them to estimate

     * key count.

     * If not, then this uses index summaries.

     *

     * @param sstables SSTables to calculate key count

     * @return estimated key count

     */

    public static long getApproximateKeyCount(Iterable<SSTableReader> sstables)

    {

        long count = -1;


        if (Iterables.isEmpty(sstables))

            return count;


        boolean failed = false;

        ICardinality cardinality = null;

        for (SSTableReader sstable : sstables)

        {

            if (sstable.openReason == OpenReason.EARLY)

                continue;


            try

            {

                CompactionMetadata metadata = (CompactionMetadata) sstable.descriptor.getMetadataSerializer().deserialize(sstable.descriptor, MetadataType.COMPACTION);

                // If we can't load the CompactionMetadata, we are forced to estimate the keys using the index

                // summary. (CASSANDRA-10676)

                if (metadata == null)

                {

                    logger.warn("Reading cardinality from Statistics.db failed for {}", sstable.getFilename());

                    failed = true;

                    break;

                }


                if (cardinality == null)

                    cardinality = metadata.cardinalityEstimator;

                else

                    cardinality = cardinality.merge(metadata.cardinalityEstimator);

            }

            catch (IOException e)

            {

                logger.warn("Reading cardinality from Statistics.db failed.", e);

                failed = true;

                break;

            }

            catch (CardinalityMergeException e)

            {

                logger.warn("Cardinality merge failed.", e);

                failed = true;

                break;

            }

        }

        if (cardinality != null && !failed)

            count = cardinality.cardinality();


        // if something went wrong above or cardinality is not available, calculate using index summary

        if (count < 0)

        {

            for (SSTableReader sstable : sstables)

                count += sstable.estimatedKeys();

        }

        return count;

    }





https://github.com/apache/cassandra/blob/4a2464192e9e69457f5a5ecf26c094f9298bf069/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java



/**

 * Compaction related SSTable metadata.

 *

 * Only loaded for <b>compacting</b> SSTables at the time of compaction.

 */

public class CompactionMetadata extends MetadataComponent

{

    public static final IMetadataComponentSerializer serializer = new CompactionMetadataSerializer();


    public final ICardinality cardinalityEstimator;


    public CompactionMetadata(ICardinality cardinalityEstimator)

    {

        this.cardinalityEstimator = cardinalityEstimator;

    }


.,...

   public static class CompactionMetadataSerializer implements IMetadataComponentSerializer<CompactionMetadata>

    {

        public int serializedSize(Version version, CompactionMetadata component) throws IOException

        {

            int sz = 0;

            byte[] serializedCardinality = component.cardinalityEstimator.getBytes();

            return TypeSizes.sizeof(serializedCardinality.length) + serializedCardinality.length + sz;

        }


        public void serialize(Version version, CompactionMetadata component, DataOutputPlus out) throws IOException

        {

            ByteBufferUtil.writeWithLength(component.cardinalityEstimator.getBytes(), out);

        }


        public CompactionMetadata deserialize(Version version, DataInputPlus in) throws IOException

        {

            ICardinality cardinality = HyperLogLogPlus.Builder.build(ByteBufferUtil.readBytes(in, in.readInt()));

            return new CompactionMetadata(cardinality);

        }

    }






Posted by '김용환'
,