Cassandra에서 테이블의 row 개수를 구하려면 다음과 같은 CQL을 사용할 수 있다. 


select count(*) from table



하지만 데이터가 대용량이면 timeout이 발생할 수 있다.


이를 피하기 위해 timeout 값을 늘릴 수 있지만, 성능 이슈가 발생할 수 있으니 주의해야 한다.


예를 들어 cqlsh --request-timeout="60"처럼 지정하면 요청 timeout을 60초로 늘릴 수 있다.



https://docs.datastax.com/en/cql/3.3/cql/cql_reference/cqlsh.html


--request-timeout="timeout" CQL request timeout in seconds; default: 10



하지만 timeout을 늘리는 것은 근본적인 해결책이 아니다. 파티션 키 조건 없는 count(*)는 테이블 전체를 스캔하므로, 데이터가 많을수록 클러스터에 큰 부하를 준다.


(서비스에서 카운트를 매번 조회해야 한다면 Cassandra counter 컬럼을 이용해 카운트를 별도로 관리하는 것이 좋다. HBase나 Cassandra에서 전체 카운트 계산을 매번 호출하는 것은 위험한 작업이다!)



대용량 데이터의 row 개수를 구할 수 있는 또 다른 방법은 nodetool을 이용하는 것이다.

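nodetool cfstats(최신 버전에서는 nodetool tablestats)를 사용해서 테이블의 Number of keys (estimate) 값을 확인하면 대략적인 개수를 알 수 있다. 단, 이 값은 CQL row 개수가 아니라 파티션(partition key) 개수의 추정치이며, 명령을 실행한 노드의 로컬 값이다.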



$ ./nodetool cfstats

Table (index): table
Number of keys (estimate): 184251
..


Table: table

Number of keys (estimate): 538971 




추정치가 어떻게 계산되는지는 아래 Cassandra 코드를 따라가 보면 확인할 수 있다. 


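이 추정치는 각 SSTable의 Statistics.db에 저장된 HyperLogLog(HyperLogLogPlus) cardinality 추정기들을 병합한 값에 memtable의 파티션 개수를 더해 계산한다(cardinality 정보를 읽지 못하면 index summary 기반 추정치로 대체한다). 따라서 정확한 값은 아니지만 대략적인 규모를 파악하는 용도로는 충분히 신뢰할 만하다.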





https://github.com/apache/cassandra/blob/42e0fc5ee221950875d93b4cd007d4f5bcaa4244/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java


                Object estimatedPartitionCount = probe.getColumnFamilyMetric(keyspaceName, tableName, "EstimatedPartitionCount");

                if (Long.valueOf(-1L).equals(estimatedPartitionCount))

                {

                    estimatedPartitionCount = 0L;

                }

                statsTable.numberOfKeysEstimate = estimatedPartitionCount;








https://github.com/apache/cassandra/blob/979af884ee4ecef78a21c4bd58992d053256f8f0/src/java/org/apache/cassandra/tools/NodeProbe.java


    /**

     * Retrieve ColumnFamily metrics

     * @param ks Keyspace for which stats are to be displayed or null for the global value

     * @param cf ColumnFamily for which stats are to be displayed or null for the keyspace value (if ks supplied)

     * @param metricName View {@link TableMetrics}.

     */

    public Object getColumnFamilyMetric(String ks, String cf, String metricName)

    {

        try

        {

            ObjectName oName = null;

            if (!Strings.isNullOrEmpty(ks) && !Strings.isNullOrEmpty(cf))

            {

                String type = cf.contains(".") ? "IndexTable" : "Table";

                oName = new ObjectName(String.format("org.apache.cassandra.metrics:type=%s,keyspace=%s,scope=%s,name=%s", type, ks, cf, metricName));

            }

            else if (!Strings.isNullOrEmpty(ks))

            {

                oName = new ObjectName(String.format("org.apache.cassandra.metrics:type=Keyspace,keyspace=%s,name=%s", ks, metricName));

            }

            else

            {

                oName = new ObjectName(String.format("org.apache.cassandra.metrics:type=Table,name=%s", metricName));

            }

            switch(metricName)

            {

                case "BloomFilterDiskSpaceUsed":

                case "BloomFilterFalsePositives":

                case "BloomFilterFalseRatio":

                case "BloomFilterOffHeapMemoryUsed":

                case "IndexSummaryOffHeapMemoryUsed":

                case "CompressionMetadataOffHeapMemoryUsed":

                case "CompressionRatio":

                case "EstimatedColumnCountHistogram":

                case "EstimatedPartitionSizeHistogram":

                case "EstimatedPartitionCount":

                case "KeyCacheHitRate":

                case "LiveSSTableCount":

                case "MaxPartitionSize":

                case "MeanPartitionSize":

                case "MemtableColumnsCount":

                case "MemtableLiveDataSize":

                case "MemtableOffHeapSize":

                case "MinPartitionSize":

                case "PercentRepaired":

                case "RecentBloomFilterFalsePositives":

                case "RecentBloomFilterFalseRatio":

                case "SnapshotsSize":

                    return JMX.newMBeanProxy(mbeanServerConn, oName, CassandraMetricsRegistry.JmxGaugeMBean.class).getValue();







https://github.com/apache/cassandra/blob/81f6c784ce967fadb6ed7f58de1328e713eaf53c/src/java/org/apache/cassandra/metrics/TableMetrics.java



public class TableMetrics

{



    /** Approximate number of keys in table. */

    public final Gauge<Long> estimatedPartitionCount;



        estimatedPartitionCount = Metrics.register(factory.createMetricName("EstimatedPartitionCount"),

                                                   aliasFactory.createMetricName("EstimatedRowCount"),

                                                   new Gauge<Long>()

                                                   {

                                                       public Long getValue()

                                                       {

                                                           long memtablePartitions = 0;

                                                           for (Memtable memtable : cfs.getTracker().getView().getAllMemtables())

                                                               memtablePartitions += memtable.partitionCount();

                                                           return SSTableReader.getApproximateKeyCount(cfs.getSSTables(SSTableSet.CANONICAL)) + memtablePartitions;

                                                       }

                                                   });






https://github.com/apache/cassandra/blob/4a2464192e9e69457f5a5ecf26c094f9298bf069/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java


    /**

     * Calculate approximate key count.

     * If cardinality estimator is available on all given sstables, then this method use them to estimate

     * key count.

     * If not, then this uses index summaries.

     *

     * @param sstables SSTables to calculate key count

     * @return estimated key count

     */

    public static long getApproximateKeyCount(Iterable<SSTableReader> sstables)

    {

        long count = -1;


        if (Iterables.isEmpty(sstables))

            return count;


        boolean failed = false;

        ICardinality cardinality = null;

        for (SSTableReader sstable : sstables)

        {

            if (sstable.openReason == OpenReason.EARLY)

                continue;


            try

            {

                CompactionMetadata metadata = (CompactionMetadata) sstable.descriptor.getMetadataSerializer().deserialize(sstable.descriptor, MetadataType.COMPACTION);

                // If we can't load the CompactionMetadata, we are forced to estimate the keys using the index

                // summary. (CASSANDRA-10676)

                if (metadata == null)

                {

                    logger.warn("Reading cardinality from Statistics.db failed for {}", sstable.getFilename());

                    failed = true;

                    break;

                }


                if (cardinality == null)

                    cardinality = metadata.cardinalityEstimator;

                else

                    cardinality = cardinality.merge(metadata.cardinalityEstimator);

            }

            catch (IOException e)

            {

                logger.warn("Reading cardinality from Statistics.db failed.", e);

                failed = true;

                break;

            }

            catch (CardinalityMergeException e)

            {

                logger.warn("Cardinality merge failed.", e);

                failed = true;

                break;

            }

        }

        if (cardinality != null && !failed)

            count = cardinality.cardinality();


        // if something went wrong above or cardinality is not available, calculate using index summary

        if (count < 0)

        {

            for (SSTableReader sstable : sstables)

                count += sstable.estimatedKeys();

        }

        return count;

    }





https://github.com/apache/cassandra/blob/4a2464192e9e69457f5a5ecf26c094f9298bf069/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java



/**

 * Compaction related SSTable metadata.

 *

 * Only loaded for <b>compacting</b> SSTables at the time of compaction.

 */

public class CompactionMetadata extends MetadataComponent

{

    public static final IMetadataComponentSerializer serializer = new CompactionMetadataSerializer();


    public final ICardinality cardinalityEstimator;


    public CompactionMetadata(ICardinality cardinalityEstimator)

    {

        this.cardinalityEstimator = cardinalityEstimator;

    }


...

   public static class CompactionMetadataSerializer implements IMetadataComponentSerializer<CompactionMetadata>

    {

        public int serializedSize(Version version, CompactionMetadata component) throws IOException

        {

            int sz = 0;

            byte[] serializedCardinality = component.cardinalityEstimator.getBytes();

            return TypeSizes.sizeof(serializedCardinality.length) + serializedCardinality.length + sz;

        }


        public void serialize(Version version, CompactionMetadata component, DataOutputPlus out) throws IOException

        {

            ByteBufferUtil.writeWithLength(component.cardinalityEstimator.getBytes(), out);

        }


        public CompactionMetadata deserialize(Version version, DataInputPlus in) throws IOException

        {

            ICardinality cardinality = HyperLogLogPlus.Builder.build(ByteBufferUtil.readBytes(in, in.readInt()));

            return new CompactionMetadata(cardinality);

        }

    }






Posted by '김용환'
,