Hycz's Blog

Life is a game. Why so serious?

Cassandra 0.8.0流程分析(1)——CassandraDaemon启动流程

一、入口

入口方法是CassandraDaemon中的main方法,里面只有一个CassandraDaemon().activate()的调用。

    public static void main(String[] args)
    {
        new CassandraDaemon().activate();
    }

这里调用的是AbstractCassandraDaemon中的方法。Cassandra虽然是用java写的,但是确实如我从前看到的一篇文章所说的那样,很有C的风格。具体来说,类中的static变量比比皆是,还有就是代码中常用static{}代码块,这个就是用来在第一次调用到这个类时一定会执行到的部分,于是,这就成了C语言中的面向过程编程了。而且由于步进时是不会自动在这些地方停留的,于是,必须手动在这个代码块中设置断点,否则总是直接执行完了,对于我们这种废柴读代码者来说很是难过,不过习惯了也就好了。

二、setup

继续刚才的过程,AbstractCassandraDaemon首先启动了log4j的相关功能,设置了定期检查配置更新。然后正式进入到activate()方法中。

这个方法做了2件事,也就是2步,第一步是初始化daemon,第二步是启动daemon。

初始化的工作其实是调用AbstractCassandraDaemon中的setup()方法。这其实是个非常繁复的过程。注意下面的类的初始化基本都包括了log4j的Logger注册,和Mbean的注册,所以就不单独写了。

1、设置logger

2、检查CLibrary

3、DatabaseDescriptor初始化

全名org.apache.cassandra.config.DatabaseDescriptor。很类似,这个类的初始化也是放在了static中,说的笼统一点,这里就是把各种配置文件读取了,然后存放在相应的变量中。但是如果真的要分的话,还是有2个部分:

第1部分是读取cassadnra.yaml,将其中的配置参数导入到各个变量中,整个过程中还涉及到对参数值的合法性检查,根据设置的类名启动相应的类或建立相应的对象等等。

第2部分是建立各种system tables,这些系统表的配置是直接写在代码中的(hardcode),而不是从配置中读取。具体来说,就是建立了一个名为system的keyspace的MetaData,然后其中包括以下的columnFamily:

  • StatusCf:persistent metadata for the local node,本节点的持久化元数据
  • HintsCf:hinted handoff data,便签式提交数据
  • MigrationsCf:individual schema mutations,单个(?)schema改变
  • SchemaCf:current state of the schema,当前schema状态
  • IndexCf:indexes that have been completed,已完成索引
  • NodeIdCf:nodeId and their metadata,节点ID和相应元数据

相关的代码如下:

            // Hardcoded system tables
            KSMetaData systemMeta = new KSMetaData(Table.SYSTEM_TABLE,
                                                   LocalStrategy.class,
                                                   KSMetaData.optsWithRF(1),
                                                   CFMetaData.StatusCf,
                                                   CFMetaData.HintsCf,
                                                   CFMetaData.MigrationsCf,
                                                   CFMetaData.SchemaCf,
                                                   CFMetaData.IndexCf,
                                                   CFMetaData.NodeIdCf);
            CFMetaData.map(CFMetaData.StatusCf);
            CFMetaData.map(CFMetaData.HintsCf);
            CFMetaData.map(CFMetaData.MigrationsCf);
            CFMetaData.map(CFMetaData.SchemaCf);
            CFMetaData.map(CFMetaData.IndexCf);
            CFMetaData.map(CFMetaData.NodeIdCf);
            tables.put(Table.SYSTEM_TABLE, systemMeta);

从这里也可以看出一个keyspace的元数据需要包括哪些内容(名,副本策略,副本策略参数,ColumnFamily的元数据)。注意这里仅仅是建立了相关的元数据,并没有真正生成相应的keyspace的对象。

这个类的初始化很重要,是整个daemon启动的基础。

4、StorageService初始化

这个类的全名是org.apache.cassandra.service.StorageService,实际上是在DatabaseDescriptor初始化的过程中调用到了这个类的方法, 于是在第一次调用时初始化了。StorageService的初始化包括3个部分。

第一个部分定义了一系列的谓词(VERBS)和谓词阶段(verbStages),如下:

/* All verb handler identifiers */
    public enum Verb
    {
        MUTATION,
        BINARY,
        READ_REPAIR,
        READ,
        REQUEST_RESPONSE, // client-initiated reads and writes
        STREAM_INITIATE, // Deprecated
        STREAM_INITIATE_DONE, // Deprecated
        STREAM_REPLY,
        STREAM_REQUEST,
        RANGE_SLICE,
        BOOTSTRAP_TOKEN,
        TREE_REQUEST,
        TREE_RESPONSE,
        JOIN, // Deprecated
        GOSSIP_DIGEST_SYN,
        GOSSIP_DIGEST_ACK,
        GOSSIP_DIGEST_ACK2,
        DEFINITIONS_ANNOUNCE,
        DEFINITIONS_UPDATE_RESPONSE,
        TRUNCATE,
        SCHEMA_CHECK,
        INDEX_SCAN,
        REPLICATION_FINISHED,
        INTERNAL_RESPONSE, // responses to internal calls
        COUNTER_MUTATION,
        // use as padding for backwards compatability where a previous version needs to validate a verb from the future.
        UNUSED_1,
        UNUSED_2,
        UNUSED_3,
        ;
        // remember to add new verbs at the end, since we serialize by ordinal
    }
    public static final Verb[] VERBS = Verb.values();

    public static final EnumMap<StorageService.Verb, Stage> verbStages = new EnumMap<StorageService.Verb, Stage>(StorageService.Verb.class)
    {{
        put(Verb.MUTATION, Stage.MUTATION);
        put(Verb.BINARY, Stage.MUTATION);
        put(Verb.READ_REPAIR, Stage.MUTATION);
        put(Verb.READ, Stage.READ);
        put(Verb.REQUEST_RESPONSE, Stage.REQUEST_RESPONSE);
        put(Verb.STREAM_REPLY, Stage.MISC); // TODO does this really belong on misc? I've just copied old behavior here
        put(Verb.STREAM_REQUEST, Stage.STREAM);
        put(Verb.RANGE_SLICE, Stage.READ);
        put(Verb.BOOTSTRAP_TOKEN, Stage.MISC);
        put(Verb.TREE_REQUEST, Stage.ANTI_ENTROPY);
        put(Verb.TREE_RESPONSE, Stage.ANTI_ENTROPY);
        put(Verb.GOSSIP_DIGEST_ACK, Stage.GOSSIP);
        put(Verb.GOSSIP_DIGEST_ACK2, Stage.GOSSIP);
        put(Verb.GOSSIP_DIGEST_SYN, Stage.GOSSIP);
        put(Verb.DEFINITIONS_ANNOUNCE, Stage.READ);
        put(Verb.DEFINITIONS_UPDATE_RESPONSE, Stage.READ);
        put(Verb.TRUNCATE, Stage.MUTATION);
        put(Verb.SCHEMA_CHECK, Stage.MIGRATION);
        put(Verb.INDEX_SCAN, Stage.READ);
        put(Verb.REPLICATION_FINISHED, Stage.MISC);
        put(Verb.INTERNAL_RESPONSE, Stage.INTERNAL_RESPONSE);
        put(Verb.COUNTER_MUTATION, Stage.MUTATION);
        put(Verb.UNUSED_1, Stage.INTERNAL_RESPONSE);
        put(Verb.UNUSED_2, Stage.INTERNAL_RESPONSE);
        put(Verb.UNUSED_3, Stage.INTERNAL_RESPONSE);
    }};

第二个部分是建立了2个线程池,一个是用来处理短任务,一个是用来处理非周期性长任务,如下:

    /**
     * This pool is used for periodic short (sub-second) tasks.
     */
     public static final RetryingScheduledThreadPoolExecutor scheduledTasks = new RetryingScheduledThreadPoolExecutor("ScheduledTasks");

    /**
     * This pool is used by tasks that can have longer execution times, and usually are non periodic.
     */
    public static final RetryingScheduledThreadPoolExecutor tasks = new RetryingScheduledThreadPoolExecutor("NonPeriodicTasks");

第三个部分是实例化了一个标记为public static final的StorageService对象。在实例化的过程中,首先注册了谓词处理(register the verb handlers),然后又初始化了一个org.apache.cassandra.streaming.StreamingService(是的,层层调用。。。),不过这个类的初始化仅仅是做了实例化,实例化的过程中除了注册Mbean什么都没做。。。

5、获取网络地址和端口(address and port)

6、ColumnFamilyStore初始化

同样的,这个类的初始化是在第一次调用到这个类时完成的,过程是完成所有static的定义和static块中代码的执行。

带有static的定义有3个,分别是

  • flushSorter :一个用来执行flush任务中的排序阶段的线程池,由于是CPU密集型(CPU-bound),所以会根据处理器数量的线程。
  • flushWriter :在排序之后,由此线程池进行写磁盘,由于是磁盘密集型(disk-bound),所以可以和flushSourter同时运作。注意,flushSorter和flushWriter处理的是Memtable和BinaryMemtable的flush,对于BinaryMemtable的flush,这两个线程池已经足够了,这两个线程池都是private的
  • postFlushExecutor : 不同于上面两个线程池,这个是用来处理live Memtable的flush。live Memtable的flush更复杂一些,需要switchMemtable做额外的2件事(这里只会由switchMemtable去调用submitFlush):第一,将这个Memtable放到memtablesPendingFlush中,直到flush完成,并且它已经被转为SSTableReader,加到了ssTable_中;第二,等到flush完成后,在commitLogUpdater中加入一项标记,markCompacted,调用onMemtableFlush。这允许在多核系统中多个flush同时进行,并且以正确的顺序调用onMemtableFlush,正确的顺序对于replay很重要,否就就要restart,因为当onMemtableFlush被调用时,是假设在给定位置之上的内容都已经被固化到SSTable中了。注意到这个线程池是public的,所以是会被别的类调用的,在这个类中有2处调用,一处是添加已flush的标记到commit log的header中,一处是添加已compact的标记到commitlogUpdater中。

static块中,则从StorageService.tasks线程池中建立一个线程,设置延迟1秒开始,两次执行间隔为1秒,执行的内容是一个org.apache.cassandra.db.MeteredFlusher对象。

7、MeteredFlusher线程的定期运作

首先,先获取非活跃Memtable的大小。

然后就是flush的操作,flush的过程分2部分,设m是管线pipeline中能够存在的最大Memtable数量,第一部分则是将所有已使用内存超过已分配内存的1/m的ColumnFamily进行flush,第二部分则是对剩下的Memtable根据大小进行排序,对超过阀值的进行flush

8、清洗系统表的目录

这一步是对系统表所在的目录做清理,保证作为系统基石的系统表不会出错,可以理解,系统有可能在任何时候down机,于是固化到硬盘上的数据文件也就可能处于各种各样的脏数据状态,这里就是为了纠正这些可能的错误,虽然手段很粗暴。具体来说,就是对每个系统表调用ColumnFamilyStore.scrubDataDirectories方法,代码如下:

        // check the system table to keep user from shooting self in foot by changing partitioner, cluster name, etc.
        // we do a one-off scrub of the system table first; we can't load the list of the rest of the tables,
        // until system table is opened.
        for (CFMetaData cfm : DatabaseDescriptor.getTableMetaData(Table.SYSTEM_TABLE).values())
            ColumnFamilyStore.scrubDataDirectories(Table.SYSTEM_TABLE, cfm.cfName);

在方法的执行过程中,所做的工作是移除相应的ColumnFamily所在目录下的不必要的文件,所谓不必要的文件,包括以下这些:临时文件(temp files),孤儿(orphans,指缺少data file的),零长度文件(zero-length files),已合并sstable(compacted sstables)。至于无法辨认的文件,将会被略过。经过这个方法之后,一系列满足上述描述的Descriptor将会被移除。

在具体的实现中,首先处理掉的是各个temp files和compacted sstables,因为有标记,所以很好清除;其次,是orphans,需要检查data file的标记;再其次,是未完成的cache文件,需要对文件名进行匹配,然后删掉;最后是清理这个ColumnFamily的一级索引,具体的操作是对一个ColumnFamily中的各个Column的Index做一次scrubDataDirectories递归调用,这里有趣的是,在原本该填入column family name的地方,填入的是针对某个column的indexName,虽然我还没有看到那部分的代码,但是从这里可以推测,Cassandra中主索引的实现,是以特殊的ColumnFamily的方式实现的。

9、Table的初始化

全名是org.apache.cassandra.db.Table。这个类的初始化自然也是在第一次调用到它的时候才会发生,有趣的是,这个类的第一次调用发生的位置是有可能不确定的。

第一处出现的地方是在SystemTable的checkHealth()方法中:

            table = Table.open(Table.SYSTEM_TABLE);

checkHealth()会在AbstractCassandraDaemon的setup()方法中被调用,这是真正的起始调用处。所以调用过程其实是这样:AbstractCassandraDaemon.setup()->SystemTable.checkHealth()->Table.open(Table.SYSTEM_TABLE)

第二处出现的地方是在ColumnFamilyStore的all()方法中:

        for (Table table : Table.all())

真正的起始调用处是在MeteredFlusher中的run()方法,调用过程是这样:MeteredFlusher.run()->MeteredFlusher.countFlushingBytes()->ColumnFamilyStore.all()->Table.all()->Table.open(tableName)

由于这两个调用的地方处于不同的线程之中,所以谁先被执行到是不确定的,然而,重要的地方是Table这个类的很多方法是需要在不同线程中进行同步的。也就是说,这些方法同一时间只能有一个线程去访问。open()方法就是其中一个。

有点扯远了,不过,明白Table的用处对于理解它的初始化是有帮助的。这里的static块中的代码没有什么特点,仅仅是检查相关的keyspace的目录有没有建立好,没建立好就建一下。更加重要的是Table中各种变量的存在意义。

如同我之前的文章所说,keyspace其实就是table,在代码中也得到了体现。首先,是2个static的变量,一个是重用读写锁:

    /**
     * accesses to CFS.memtable should acquire this for thread safety.
     * Table.maybeSwitchMemtable should aquire the writeLock; see that method for the full explanation.
     *
     * (Enabling fairness in the RRWL is observed to decrease throughput, so we leave it off.)
     */
    static final ReentrantReadWriteLock switchLock = new ReentrantReadWriteLock();

一个就是用来储存所有Table的实例映射的变量:

    /** Table objects, one per keyspace.  only one instance should ever exist for any given keyspace. */
    private static final Map<String, Table> instances = new NonBlockingHashMap<String, Table>();

其他的非static变量则是一个table应该拥有的属性:name,columnFamilyStores(每个column family都有一个columnFamilyStore),indexLocks,flushTask,replicationStrategy。

无论如何,Table.open()方法都是至关重要的一个入口。其实这个方法做的事情很简单,就是从instances中取得相应名字的table,如果找不到的话,就新建一个。需要注意的是必须保证每个keyspace只有一个table对象,所以在新建table的代码块外面加上了synchronized标记,用来在多线程中同步。

Table的构造器则做是在读取配置之后,建立各个实例属性(也就是上面提到的那些,name,columnFamilyStores,indexLocks,flushTask,replicationStrategy)。

10、系统表健康检查

说是健康检查,实际上也就是打开系统表的一个Column Family,比了比里面的值。

第一步打开系统表:

            table = Table.open(Table.SYSTEM_TABLE);

这一步的结果有两种:1)成功,继续下面的操作;2)发生异常,注释上说的是当更改了partitioner(从OPP改成RP)的时候会发生,系统表的文件还在,却无法读取。

第二步是读取系统表中的一部分(一行两列),如下面的子表(粗体是具体值,带引号的是字符串,不带的是变量,灰色是变量名,通常都是常量,蓝色是说明,下同):

column family: SCHEMA_CF=”Schema
columns: PARTITIONER=”Partioner CLUSTERNAME=”ClusterName
Key: LOCATION_KEY=”L  value  value

具体代码如下:

        SortedSet<ByteBuffer> cols = new TreeSet<ByteBuffer>(BytesType.instance);
        cols.add(PARTITIONER);
        cols.add(CLUSTERNAME);
        QueryFilter filter = QueryFilter.getNamesFilter(decorate(LOCATION_KEY), new QueryPath(STATUS_CF), cols);
        ColumnFamily cf = table.getColumnFamilyStore(STATUS_CF).getColumnFamily(filter);

此处的情况就略为复杂一些,可能发生的结果有三种:

  • 1)cf不为空,读取成功,great;
  • 2)cf为空,于是再次尝试取得名为 STATUS_CF 的系统表中的ColumnFamily,检查其中的SSTable部分:
    • 2.1)如果SSTable部分不为空,那就抛出异常,因为这意味着这个CF的文件存在,但是却读不出来,这种情况是在更改了partitioner(从RP改成了OPP)的时候会发生。如果为空,则进入2.2;
    • 2.2)如果SSTable部分为空,意味着没有找到系统表的这部分文件,没关系,这会被认为是新的节点,于是进入处理新节点的流程;

此外,这里的QueryFilter对象的建立,ColumnFamily对象的获得,都是一系列比较复杂的过程,这也是后话。

第三步是检查读取到的partitioner和clustername是否与从配置文件中读取的相同,我们需要他们相同!

11、载入Schema

第一步是尝试获得最新的MigrationId。获取的位置依然是System Table,存入UUID类型的变量中,这里假设这个变量是version吧,version下面还会用到,讲起来比较方便。具体位置如下表:

column family: SCHEMA_CF=”Schema
columns: LAST_MIGRATION_KEY=”Last Migration
Key: LAST_MIGRATION_KEY=”Last Migration  value

第二步,判断这个值是否找到,这时,会发生2种情况:1,找不到,表现为version == null。然后此时再查看是否有数据文件存在,如果数据文件还在,那么就是系统无法读取schema,需要用户用CLI重新定义schema,如果没有数据文件存在,那么很好,说明是一个什么表都没建立的空节点;2,找到了这个版本号,那么就会调用org.apache.cassandra.db.DefsTable.loadFromStorage(UUID version)方法。

第三步,接上一步的最后一个分支,是从存储中载入某个版本的keyspace。具体来说,是从System Table的下面DEFINITION_SCHEMA_COLUMN_NAME列中读出一个json串

column family: SCHEMA_CF=”Schema
columns: DEFINITION_SCHEMA_COLUMN_NAME=”Avro/Schema 每个keyspace一个column,name这里是keyspace的name,直接是文本
Key: version  value是Schema的JSON 每个keyspace的定义, 根据左边的schema编码之后的数据,读出后需要用相应的schema解码

然后依次经过几个类型的容器,IColumn=>ByteBuffer=>String=>Schema,获得一个Schema对象。最终通过反序列化得到所有keyspace的元数据的集合Collection<KSMetaData>

  • IColumn=>ByteBuffer:直接取IColumn的value
  • ByteBuffer=>String:调用ByteBufferUtil.string(value),获得一个json字符串
  • String=>Schema:调用Schema.parse(s),这个方法来自Avro项目,具体的过程不在次讨论,简单介绍一下Avro中的Schema支持,以下翻译自(http://www.cloudera.com/blog/2009/11/avro-a-new-format-for-data-interchange/):“

    Avro使用JSON来定义一个数据结构的schema。举例来说,一个二维的点可以定义成以下的Avro记录:

    {“type”: “record”, “name”: “Point”,
    “fields”: [
    {“name”: “x”, “type”: “int”},
    {“name”: “y”, “type”: “int”},
    ]
    }

    这个记录的每个实例都被序列化成简单的两个整数,没有额外的每记录(per-record)或每域(per-field)的注解。整数使用可变长的zig-zag编码写下。因此,较小坐标值的点就能仅用两个字节来写下:100个点会需要大概200字节。

    在记录(records)类型和数值(numeric)类型之外,Avro还包括了对数组(arrays),映射(maps),枚举(enums),变量(variable),定长二进制数据(fixed-length binary data)以及字符串 (strings)的支持。Avro还定义了一个容器文件格式(container file format),以提供良好的支持给MapReduce以及其他分析框架。细节见Avro specification.

  • Collection<KSMetaData>:使用第三步开始得到的Schema对象依次反序列化这个ColumnFamily中的其他Column的值,每次生成一个KsDef类的对象,合起来就是所有的keyspace的元数据集合。这个Schema相当于是keyspace元数据的元数据。

第四步,对所有的keyspace:创建keyspace名到columnfamily名的映射<cfm.ksName, cfm.cfName>,然后添加到CFMetaData类的静态变量cfIdMap中;然后将这个keyspace的元数据,连同版本号一起添加到DatabaseDescriptor的相应静态变量中。

12、清洗所有表的目录

类似上述第8步,只是清洗的对象是所有的表,因为他们的元数据已经由上一步获得了,存在了DatabaseDescriptor.tables里。

13、打开所有表

对所有的表执行Table.open(table)方法。代码如下:

        // initialize keyspaces
        for (String table : DatabaseDescriptor.getTables())
        {
            if (logger.isDebugEnabled())
                logger.debug("opening keyspace " + table);
            Table.open(table);
        }

14、启动垃圾收集检查器的定期运作

这里的垃圾收集检查器是指GCInspector类,这个类也用到了Singleton Pattern,只被实例化一次。但是注意这一点:这个类的作用是定期对sun的类进行垃圾收集。实例化时,会在MBeanServer中注册。然后会启动一个线程定期运作。定期运作时,如果当完成一次完整的垃圾收集后仍然使用很多的内存,则会根据需要进行1)降低缓存大小;2)flush最大的Memtable。

代码就一句:

        try
        {
            GCInspector.instance.start();
        }
        catch (Throwable t)
        {
            logger.warn("Unable to start GCInspector (currently only supported on the Sun JVM)");
        }

15、CommitLog恢复

CommitLog的恢复是在server端启动时完成的,作用是处理未完成的操作。

第一步,从DatabaseDescriptor中获得commitlog的位置,然后检查大小,看是否需要恢复(空文件不需要恢复)。如果需要,那么对需要恢复的commitlog进行排序(会有多个commitlog文件),排序依据是修改时间

第二步,使用这些commitlog文件进行恢复。这一步是CommitLog恢复的主要操作所在,过程还是比较复杂,而且似乎不同版本间的这个方法是不一样的,这里的仅对0.8.0。

  1. 获得所有sstable文件的ReplayPosition。
    先来说说ReplayPosition,这个类有用的是两个属性:segment和position。segment在程序里表现为时间点的值,用这个语句生成:System.currentTimeMillis(),segment会在2个地方出现(至少到目前为止,我只看到两个地方),一个是commitlog的文件名中,另一个是sstable4元组(一个SSTable包括Data.db,Filter.db,Index.db和Statistics.db)中的Statistics.db。而position则是文件中的文件指针的位置,用来标记从文件的何处开始读。于是,这里要做的就是从每个ColumnFamily的各个SSTable的Statistics.db文件中读出每个SSTable的ReplayPosition,然后取每个ColumnFamily的各个SSTable的ReplayPosition中的最大值,存入cfposition变量中。具体的文件格式在以后的文章中再说。
  2. 获取cfposition中最小的那个ReplayPosition,存入globalPosition。这里的最小指的是segment最小,也就是时间最早,代表着上一次系统结束前最早操作的那个column family的时间点。这个时间点后面会用到。
  3. 获取commitlog中最小的那个的segment。正如上面所说,commitlog的文件名中就包含了这个segment,所以直接解析即可。
  4. 对每个commitlog,获得执行恢复操作所需要的commitlog文件的位置。判断的依据是比较globalPosition.segment和commitlog.segment(代码中不是这样写的,但是这里这样写方便叙述,包括下面提到的replayPosition为了方便叙述也写成commitlog.replayPosition)的值,会发生三种情况:
    1)globalPosition.segment<commitlog.segment:说明这个commitlog所做的操作还没有执行,所以把commitlog.replayPosition值设为0
    2)globalPosition.segment==commitlog.segment:说明这个commitlog所做的操作正杂执行,所以把commitlog.replayPosition值设为globalPosition.position
    3)globalPosition.segment>commitlog.segment:说明这个commitlog所做的操作已经执行过了,所以把commitlog.replayPosition值设为reader.length(),也就是这个commitlog的末尾。
  5. 如果要恢复,从replayPosition每次读取一项(entry),然后将读取到的每一项反序列化成RowMutation对象
  6. 将得到的RowMutation对象添加到SEDA中MUTATION阶段的线程池中,执行提交。
    futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
  7. 最后等所有的更改结束之后flush涉及到的table。

第三步,删除所有的commitlog。

16、启动服务器

第一步,获得各节点的Token,即一系列<token,endpoint>,获取地址在系统表中,具体如下:

column family: STATUS_CF=”LocationInfo
columns: column name是各token值
Key: RING_KEY=”Ring column value是相应的endpoint值

第二步,更新系统中使用到token的地方,一处时tokenMetadata_变量,一处是Gossiper。

第三步,定义了一个ShutdownHook,添加到了Runtime中。

第四步,加入到Token Ring中。从这里开始是真正开始把服务器当做网络中的一个节点,所以各种设置监听。

首先就是启动Gossiper。Gossiper作为Cassandra中很有特色的一个东西,还是比较重要的,其作用是维护集群的状态,通过gossip,每个节点都能知道集群中包含哪些节点,以及这些节点的状态,这使得Cassandra集群中的任何一个节点都可以完成任意key的路由,任意一个节点不可用都不会造成灾难性的后果。Gossiper的详细介绍以后再说,这里仅给出涉及到的部分。

  1. Gossiper采用 Singleton Pattern,仅有一个实例,在实例化的时候,做了两件事:设置了两个时间长度,用作gossip过程中的时间上限,然后将这个Gossiper实例注册为FailureDetector的一个监听器。Gossiper和FailureDetector的羁绊在于IFailureDetectionEventListener接口,其中只有一个方法 convict(InetAddress ep),即标记一个节点死了,FailureDetector实现了”The Phi Accrual Failure Detector”,做出判决,然后由各监听器来执行。
  2. 在启动Gossiper之前,注册了2个预订用户(subscribers):StorageService实例和MigrationManager实例,他们的共同特点是要实现IEndpointStateChangeSubscriber接口,这个接口定义了5个方法,用于某节点a通知它感兴趣的那部分(?interested parties,不知道怎么翻。。)关于任意endpoint的状态改变。这个5个方法分别是:onJoin,onChange,onAlive,onDead,onRemove。
            // have to start the gossip service before we can see any info on other nodes.  this is necessary
            // for bootstrap to get the load info it needs.
            // (we won't be part of the storage ring though until we add a nodeId to our state, below.)
            Gossiper.instance.register(this);
            Gossiper.instance.register(migrationManager);
            Gossiper.instance.start(SystemTable.incrementAndGetGeneration()); // needed for node-ring gathering.

    然后,Gossiper启动,参数是新的版本号。在启动过程中,首先获得所有的seed节点。然后启动本机的心跳状态(HeartBeatState),将本机状态设为alive,并将<netAddress, EndpointState>映射存入endpointStateMap。接下来通知snitches,告诉他们gossip要开始了。最后启动定期运作的线程,定期GossipTask类中的run方法,运作的内容如下(摘自袁大星的文档):

    Cassandra内部有一个Gossiper,每隔一秒运行一次(在Gossiper.java的start方法中),按照以下规则向其他节点发 送同步消息:

    1) 随机取一个当前活着的节点,并向它发送同步请求

    2) 向随机一台不可达的机器发送同步请求

    3) 如果第一步中所选择的节点不是seed,或者当前活着的节点数少于seed数,则向随意一台seed发送同步请求

    第一和第二步好理解,通过第一步可以和当前活着的节点同步状态,以更新本地的状态,通过第二步可以尽早发现不可用的节点重新可用了。

    第三步中的第一个条件,如果第一步中的节点不是seed,则向随意一台seed发送同步请求也比较好理解,因为seed理论上总是有较多的节点状态 信息。

    第三步中第二个条件则有点难理解,当活着的节点数少于seed时,也需要向随机的seed发送同步消息。其实这里是为了避免出现seed孤岛。

    如果没有这个判断,考虑这样一种场景,有4台机器,{A, B, C, D},并且配置了它们都是seed,如果它们同时启动,可能会出现这样的情形:

    A节点起来,发现没有活着的节点,走到第三步,和任意一个种子同步,假设选择了B

    B节点和A完成同步,则认为A活着,它将和A同步,由于A是种子,B将不再和其他种子同步

    C节点起来,发现没有活着的节点,同样走到第三步,和任意一个种子同步,假设这次选择了D

    C节点和D完成同步,认为D活着,则它将和D同步,由于D也是种子,所以C也不再和其他种子同步

    这时就形成了两个孤岛,A和B互相同步,C和D之间互相同步,但是{A,B}和{C,D}之间将不再互相同步,它们也就不知道对方的存在了。

    加入第二个判断后,A和B同步完,发现只有一个节点活着,但是seed有4个,这时会再和任意一个seed通信,从而打破这个孤岛。

  3. MessagingService开始监听本地地址。这是用于消息传递的类。
  4. StorageLoadBalancer开始广播。这个类是这篇文章《Scalable range query processing for large-scale distributed database applications》的实现。用于监视负载信息,必要时会进行负载平衡,运行间隔是5分钟。
  5. 向所有seed节点声明自己的版本号。先用消息进行积极(actively)的声明,最后使用Gossip进行消极(passively)的声明。
  6. 在Gossiper的本地映射中添加应用状态。
            MessagingService.instance().listen(FBUtilities.getLocalAddress());
            StorageLoadBalancer.instance.startBroadcasting();
            MigrationManager.announce(DatabaseDescriptor.getDefsVersion(), DatabaseDescriptor.getSeeds());
            Gossiper.instance.addLocalApplicationState(ApplicationState.RELEASE_VERSION, valueFactory.releaseVersion());
  7. HintedHandOffManager注册Mbean。
  8. 判断是不是AutoBootstrap,然后分别有不同的处理流程。如果不是AutoBootstrap的话,那么就设置一下token就好了。如果是AutoBootstrap的话,事情就大条了,先要用StorageLoadBalancer获得负载信息,然后看看这个节点是不是已经是token ring中一部分,是的话就异常了,有幸进行下去的话,用获得的负载信息来决定本机的token值,然后再用这个token启动。

17、载入mx4j

这是为了能够使用JMX。实现的代码里充斥着reflection。。。

三、start

启动RPCServer。最终是调用了这个方法:

    protected void startServer()
    {
        if (server == null)
        {
            server = new ThriftServer(listenAddr, listenPort);
            server.start();
        }
    }

四、后记

在拖延症的影响下,这篇日志写了2个多月,真是汗颜啊。。。当然,也有第一次看这个代码的原因,往往一个方法要搞懂得看好久,不过,完成了这篇日志,还是很有好处的,了解了许多实现内部的细节,以后应该就能看的快些了。最后一些部分有些流水账,写的太急。还有一些方法的细节部分没有写,准备以后专门用别的日志来写,这篇已经太长太长了。。。

Advertisements

One response to “Cassandra 0.8.0流程分析(1)——CassandraDaemon启动流程

  1. Pingback: Year 13 Week 42 | Collecting The Pieces

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: