Hycz's Blog

Life is a game. Why so serious?

Tag Archives: Apache Cassandra

Cassandra 0.8.0 源码分析——Thrift相关

一、引子

这两天在写一个简单的cassandra测试程序,中间参考了test/distributed下的代码,不过其中用到的whirr实在蛋疼,首先这里用到的是whirr0.4版本,跟最新的0.7版本差别太大,其次是没有详细的文档说明如何在服务器端配置能够使用whirr进行集群管理,所以我就自己写了个简单的,把whirr的部分都砍掉了。

写这个简单的测试程序的过程中,出了些开始觉得诡异,后来觉得愚蠢的错误,不过也因此给我理由步进进去看了看源码,比如libthrift-0.6里的源码,还有自动生成的代码。

二、Introduction

首先推荐一个不错的thrift介绍,http://kingxss.iteye.com/blog/1413696

其他介绍的话就不说了,那篇博客里有写,这里就说下自己的感受。Thrift就是定义了一套自己的语法、传输协议,以此提供了与具体语言无关的一系列定义,在需要的时候翻译成相应的语言,同时提供了服务器客户端之间的底层通讯功能,于是可以让使用者专注于实现自己的功能

具体的表现,就是我接触到的Cassandra中thrift的使用,在cassandra.thrift文件中定义好数据结构struct,服务功能service,以及枚举enum、异常exception等部分后,使用thrift根据这个文件中的定义,自动生成大量的代码,在Cassandra中,就是interface/thrift/gen-java下的代码。

三、真的是生成代码?

开始我一直奇怪的就是,为何.thrift文件只有650+行,却生成了Cassandra.java这个有30000+行的庞然大物,而且.thrift文件中没有业务逻辑,仅仅是一些功能的名称。所以我下了个thrift的可执行程序(http://www.apache.org/dyn/closer.cgi?path=/thrift/0.8.0/thrift-0.8.0.exe)自己生成了一次。

首先我写了个简单的定义Hello.thrift,代码如下:

namespace java service.demo   
service Hello{   
 string helloString(1:string para)   
 i32 helloInt(1:i32 para)   
 bool helloBoolean(1:bool para)   
 void helloVoid()   
 string helloNull()   
}

执行命令thrift-0.8.0.exe -gen java Hello.thrift后,查看了一下生成代码的大小,真是吓人,仅仅几行的定义就生成了几千行的代码。

再看看cassandra.thrift,本身就有650+行,执行命令thrift-0.8.0.exe -gen java cassandra.thrift之后,确实生成了在原本项目中interface/thrift/gen-java目录下的代码。

四、生成了什么代码

刚才提到.thrift文件被叫做接口定义(interface definition),使用Thrift IDL(Thrift interface description language)(http://thrift.apache.org/docs/idl/)写成,这种文件一般包括了很多thrift types 和 services。services也是thrift types的一种,不过这里定义的services还需要由服务端实现,由客户端调用

所以,重要的就是理解.thrift文件中的各种项。官网上有2处提到了这些类型,一个是Thrift IDL(http://thrift.apache.org/docs/idl/),一个是Thrift Types(http://thrift.apache.org/docs/types/),我认为前者是完整的描述,后者则是其中常用的一些类型。

先说说下面列举一些常用的Type:

  • Base Types
    就是下面的7种基本类型,不用太多解释:

    • bool: A boolean value (true or false)
    • byte: An 8-bit signed integer
    • i16: A 16-bit signed integer
    • i32: A 32-bit signed integer
    • i64: A 64-bit signed integer
    • double: A 64-bit floating point number
    • string: A text string encoded using UTF-8 encoding
  • Special Types
    只有一种binary类型,当前只是string类型的一种特殊形式。

    • binary: a sequence of unencoded bytes
  • Structs
    定义了一个通用对象,类似于OOP中的Class,但是没有继承。一个Struct有一系列的拥有单独名称标识的强类型域(strongly typed fields)。在生成java代码时,每个Struct将单独生成一个类,由于定义时不会有方法,所以生成的Class代码中只有众多的存取方法。
  • Containers
    多种编程语言中常用的强类型容器,具体有3种:

    • list:元素的有序列表。会被翻译成STL vector,Java ArrayList,native arrays in scripting languages等等。
    • set:不同元素的无序列表。会被翻译成STL set, Java HashSet, set in Python等等。PHP中不支持set,所以会被看成语List相同。
    • map:严格不同的键值对map。可以被翻译成STL map, Java HashMap, PHP associative array, Python/Ruby dictionary等等。

    在实际使用中,容器常常作为field的类型使用。

  • Exceptions
    功能上Exception和Struct是一样的,不过Exception在翻译成目标语言的时候,都会继承相应的Exception类。生成Java代码时,也会生成一个单独的Exception文件。
  • Services
    定义一个service等同于在OOP中定义一个接口(或者纯虚拟抽象类)。Thrift编译器产生完整功能的,实现了这个接口的客户端和服务端stub。
    一个service有一系列命名了的functions组成,function有一系列参数和返回类型。

更加完整的定义还是得去看Thrift IDL,这里不细说了。

五、service生成代码的分析

interface/thrift/gen-java目录下,生成了很多的java文件,其中Cassandra.java是客户端的所在,其他则是一些数据结构的类、异常的类和枚举的类,所以这些闲散的类就没什么看头了,研究一下服务的类Cassandra.java。

我觉得Cassandra.java可以分为4个部分:接口,Client,Processor,辅助类

1、接口部分

接口部分有两个,IfaceAsyncIface,两者都是根据.thrift文件中的”service Cassandra{…}”这一项中的定义生成的,两个接口的不同之处是AsyncIface中的方法都比Iface中的多一个Call back的参数。

2、Client

Client就是实现了上述接口的类,因此同样有2个,一个是Client类,一个是AsyncClient类。Client类分为了4个部分,分别是:实例属性,Factory类,功能方法,sendrecv方法

2.1 实例属性

之所以把实例属性放在其他部分之前说,是由于实例属性体现了这是一个分布式程序,并且影响着其他部分。

实例属性只有3个:iprot_, oprot_, seqid。前2个都是org.apache.thrift.protocol.TProtocol类型的,可以理解为提供了分别设置输入协议对象和输出协议对象的能力,虽然使用时可以设置为一个。seqid则表示这客户端的编号,每次收到的消息都要比较seqid是否相同。

2.2 Factory类

Factory类就是提供了2个public级别的Client的构造器,2种构造器的参数列表略有不同,一个是只传入了一个协议对象,这将会把iprot_和oprot_设为相同对象;另一个则传入了2个协议对象,分别对应iprot_和oprot_

2.3 功能方法

这里的功能方法即为接口Iface中定义的方法,有趣的地方是所有的功能方法都有着统一的形式,即顺序执行send和recv方法。比如下面这个例子

public void add(ByteBuffer key, ColumnParent column_parent, CounterColumn column, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, org.apache.thrift.TException
    {
      send_add(key, column_parent, column, consistency_level);
      recv_add();
    }

2.4 sendrecv方法

这里是真正涉及到了分布式的地方,每一个功能方法都对应着一个send方法和recv方法。send方法写消息,用oprot_写,recv方法收消息,从iprot_中读。比如下面这个例子

    public void send_add(ByteBuffer key, ColumnParent column_parent, CounterColumn column, ConsistencyLevel consistency_level) throws org.apache.thrift.TException
    {
      oprot_.writeMessageBegin(new org.apache.thrift.protocol.TMessage("add", org.apache.thrift.protocol.TMessageType.CALL, ++seqid_));
      add_args args = new add_args();
      args.setKey(key);
      args.setColumn_parent(column_parent);
      args.setColumn(column);
      args.setConsistency_level(consistency_level);
      args.write(oprot_);
      oprot_.writeMessageEnd();
      oprot_.getTransport().flush();
    }

    public void recv_add() throws InvalidRequestException, UnavailableException, TimedOutException, org.apache.thrift.TException
    {
      org.apache.thrift.protocol.TMessage msg = iprot_.readMessageBegin();
      if (msg.type == org.apache.thrift.protocol.TMessageType.EXCEPTION) {
        org.apache.thrift.TApplicationException x = org.apache.thrift.TApplicationException.read(iprot_);
        iprot_.readMessageEnd();
        throw x;
      }
      if (msg.seqid != seqid_) {
        throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.BAD_SEQUENCE_ID, "add failed: out of sequence response");
      }
      add_result result = new add_result();
      result.read(iprot_);
      iprot_.readMessageEnd();
      if (result.ire != null) {
        throw result.ire;
      }
      if (result.ue != null) {
        throw result.ue;
      }
      if (result.te != null) {
        throw result.te;
      }
      return;
    }

这里就涉及到了Thrift中的Message的结构,见下一章。

3、Processor

Processor类实现了org.apache.thrift.TProcessor接口,这个类封装了各种从输入输出流中读写数据的操作,在Thrift架构中在传输之上,应用之下,连接了底层操作和用户功能实现。Processor类大致分为这么几个部分:构造器,process方法,具体功能类

3.1 构造器

构造这个类的实例时,需要传入Iface中定义的方法的具体实现,具体来说,就是一个实现了Iface接口的对象,然后构造器中新建了各个功能类的的实例放入processMap_。具体代码如下:

public static class Processor implements org.apache.thrift.TProcessor {
  private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class.getName());
  public Processor(Iface iface)
  {
    iface_ = iface;
    processMap_.put("login", new login());
    processMap_.put("set_keyspace", new set_keyspace());
    processMap_.put("get", new get());
    processMap_.put("get_slice", new get_slice());
    processMap_.put("get_count", new get_count());
    processMap_.put("multiget_slice", new multiget_slice());
    processMap_.put("multiget_count", new multiget_count());
    processMap_.put("get_range_slices", new get_range_slices());
    processMap_.put("get_indexed_slices", new get_indexed_slices());
    processMap_.put("insert", new insert());
    processMap_.put("add", new add());
    processMap_.put("remove", new remove());
    processMap_.put("remove_counter", new remove_counter());
    processMap_.put("batch_mutate", new batch_mutate());
    processMap_.put("truncate", new truncate());
    processMap_.put("describe_schema_versions", new describe_schema_versions());
    processMap_.put("describe_keyspaces", new describe_keyspaces());
    processMap_.put("describe_cluster_name", new describe_cluster_name());
    processMap_.put("describe_version", new describe_version());
    processMap_.put("describe_ring", new describe_ring());
    processMap_.put("describe_partitioner", new describe_partitioner());
    processMap_.put("describe_snitch", new describe_snitch());
    processMap_.put("describe_keyspace", new describe_keyspace());
    processMap_.put("describe_splits", new describe_splits());
    processMap_.put("system_add_column_family", new system_add_column_family());
    processMap_.put("system_drop_column_family", new system_drop_column_family());
    processMap_.put("system_add_keyspace", new system_add_keyspace());
    processMap_.put("system_drop_keyspace", new system_drop_keyspace());
    processMap_.put("system_update_keyspace", new system_update_keyspace());
    processMap_.put("system_update_column_family", new system_update_column_family());
    processMap_.put("execute_cql_query", new execute_cql_query());
  }
…
private Iface iface_;
protected final HashMap<String,ProcessFunction> processMap_ = new HashMap<String,ProcessFunction>();
…

3.2 process方法

这个process方法是使用Processor类进行处理时的入口,通过读入输入流中的Message,寻找是否有相应的功能类来处理,如果有,则转至相应功能类进行处理,否则,写回一个Exception。具体代码如下:

public boolean process(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException
{
  org.apache.thrift.protocol.TMessage msg = iprot.readMessageBegin();
  ProcessFunction fn = processMap_.get(msg.name);
  if (fn == null) {
    org.apache.thrift.protocol.TProtocolUtil.skip(iprot, org.apache.thrift.protocol.TType.STRUCT);
    iprot.readMessageEnd();
    org.apache.thrift.TApplicationException x = new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");
    oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage(msg.name, org.apache.thrift.protocol.TMessageType.EXCEPTION, msg.seqid));
    x.write(oprot);
    oprot.writeMessageEnd();
    oprot.getTransport().flush();
    return true;
  }
  fn.process(msg.seqid, iprot, oprot);
  return true;
}

3.3 具体功能类

对已Iface中定义的每个方法,这里都建立了一个类来与其对应,这样的类都实现了processFunction接口,其中只实现了一个process方法,用来处理输入输出流部分的操作,比如读入输入参数,写回结果,写回异常等等当然,最后还是要使用构造时传入的具体处理功能实现。这里代码比较多,随便贴一个来看看:

    protected static interface ProcessFunction {
      public void process(int seqid, org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException;
    }
…

    private class get_slice implements ProcessFunction {
      public void process(int seqid, org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException
      {
        get_slice_args args = new get_slice_args();
        try {
          args.read(iprot);
        } catch (org.apache.thrift.protocol.TProtocolException e) {
          iprot.readMessageEnd();
          org.apache.thrift.TApplicationException x = new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.PROTOCOL_ERROR, e.getMessage());
          oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("get_slice", org.apache.thrift.protocol.TMessageType.EXCEPTION, seqid));
          x.write(oprot);
          oprot.writeMessageEnd();
          oprot.getTransport().flush();
          return;
        }
        iprot.readMessageEnd();
        get_slice_result result = new get_slice_result();
        try {
          result.success = iface_.get_slice(args.key, args.column_parent, args.predicate, args.consistency_level);
        } catch (InvalidRequestException ire) {
          result.ire = ire;
        } catch (UnavailableException ue) {
          result.ue = ue;
        } catch (TimedOutException te) {
          result.te = te;
        } catch (Throwable th) {
          LOGGER.error("Internal error processing get_slice", th);
          org.apache.thrift.TApplicationException x = new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, "Internal error processing get_slice");
          oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("get_slice", org.apache.thrift.protocol.TMessageType.EXCEPTION, seqid));
          x.write(oprot);
          oprot.writeMessageEnd();
          oprot.getTransport().flush();
          return;
        }
        oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("get_slice", org.apache.thrift.protocol.TMessageType.REPLY, seqid));
        result.write(oprot);
        oprot.writeMessageEnd();
        oprot.getTransport().flush();
      }

    }

4、辅助类

我所称的辅助类其实也跟所有的功能方法有关,每一个功能方法都有2个辅助类,分别是args和result。比如对于add功能,就有add_args和add_result两个辅助类。

args类是根据.thrift文件中定义的方法中的参数列表生成的,主要是对参数列表中的每一项建立各种方法:get、set、isSet、unset等等,还定义了一些特定类型的struct对象和field对象(这里见下一章),这里的代码比较冗长,主要是供Client中的send方法使用。

result类是根据.thrift文件中定义的方法返回值和异常列表生成的,注意这里如果有返回值的话,那么会储存在success属性中,主要是对返回值和异常建立各种方法:get、set、isSet、unset等等,还定义了一些特定类型的struct对象和field对象(这里见下一章),这个类主要供Client中的recv方法使用。

六、Thrift中的Message的结构

既然客户端和服务端之间的通讯的媒介是Message,那么我们就简单看下Message的结构是怎样的。

因为是生成代码,所以Cassandra.java中的代码结构就有迹可循。这一节我们先用一个例子来看一下消息是如何被一点点组织起来,完成后又被一点点读取出来的,这里用get_range_slice功能,然后总结一下Message的通用结构。

    public List<KeySlice> get_range_slices(ColumnParent column_parent, SlicePredicate predicate, KeyRange range, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, org.apache.thrift.TException
    {
      send_get_range_slices(column_parent, predicate, range, consistency_level);
      return recv_get_range_slices();
    }
    public void send_get_range_slices(ColumnParent column_parent, SlicePredicate predicate, KeyRange range, ConsistencyLevel consistency_level) throws org.apache.thrift.TException
    {
      oprot_.writeMessageBegin(new org.apache.thrift.protocol.TMessage("get_range_slices", org.apache.thrift.protocol.TMessageType.CALL, ++seqid_));
      get_range_slices_args args = new get_range_slices_args();
      args.setColumn_parent(column_parent);
      args.setPredicate(predicate);
      args.setRange(range);
      args.setConsistency_level(consistency_level);
      args.write(oprot_);
      oprot_.writeMessageEnd();
      oprot_.getTransport().flush();
    }

1、Message Begin和End

send方法中的第一句就是写入一个Message Begin。这里实际上给了我们2个信息,一个是这个Begin究竟包括了哪些内容,另一个是如何去写入一个Message Begin。

1.1 Message Begin包括的内容

且不论不同的协议会如何对待传入的参数,我们知道了这个Message Begin是一个org.apache.thrift.protocol.TMessage类型的。这里是这个类的构造器:

public TMessage(String n, byte t, int s) {
name = n;
type = t;
seqid = s;
}
public final String name;
public final byte type;
public final int seqid;

由此可见,一个Message应该包括的内容有3个:name,type,seqid

  • name:根据上面send方法中的设置来看,name被设置为功能名称,如果使用TMessage的无参数构造器的话,name会被设为空字符串””。
  • type:虽然这是一个byte类型的,但是实际上是从TMessageType中选择的,具体含义见源码,不过如果使用TMessage的无参数构造器的话,type会被置为TType.STOP。
    public final class TMessageType {
      public static final byte CALL  = 1;
      public static final byte REPLY = 2;
      public static final byte EXCEPTION = 3;
      public static final byte ONEWAY = 4;
    }
  • seqid:这个就是用来标记客户端的。

1.2 不同协议的write

语句中写的是oprot_.writeMessageBegin,所以注定不同的协议会有不用的写法。先看看0.6版本中有哪些协议:

  • TBinaryProtocol
  • TCompactProtocol
  • TJSONProtocol
  • TSimpleJSONProtocol

这些协议都继承了TProtocol这个抽象类。在源码提供的测试程序中使用的是TBinaryProtocol。

当所有的写入完成后,需要执行writeMessageEnd方法。TBinaryProtocol中这是一个空方法。TJSONProtocol中就不是空方法。

2、args写入

send方法中的第二部分就是把参数写入,这里就借助了args辅助类来执行这个操作,当然,具体执行的之后,实际上还是调用的oprot_中的write方法,具体如下:

    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
      validate();

      oprot.writeStructBegin(STRUCT_DESC);
      if (this.column_parent != null) {
        oprot.writeFieldBegin(COLUMN_PARENT_FIELD_DESC);
        this.column_parent.write(oprot);
        oprot.writeFieldEnd();
      }
      if (this.predicate != null) {
        oprot.writeFieldBegin(PREDICATE_FIELD_DESC);
        this.predicate.write(oprot);
        oprot.writeFieldEnd();
      }
      if (this.range != null) {
        oprot.writeFieldBegin(RANGE_FIELD_DESC);
        this.range.write(oprot);
        oprot.writeFieldEnd();
      }
      if (this.consistency_level != null) {
        oprot.writeFieldBegin(CONSISTENCY_LEVEL_FIELD_DESC);
        oprot.writeI32(this.consistency_level.getValue());
        oprot.writeFieldEnd();
      }
      oprot.writeFieldStop();
      oprot.writeStructEnd();
    }

从这里已经可以看出Message中的大致结构了,依次是Message级,Struct级,Field级

3、Message结构

Message的结构有三级,依次是Message级,Struct级,Field级。这是Thrift中通用的逻辑结构,不同的协议实现的不同。大致的示意图如下:

Message Begin
Struct Begin
Field Begin
arg
Field End
Struct End
Message End

其中arg是包含特定数据的地方,比如String,Int,Map,List等等,这里的结构就不固定了,arg部分的写入一般是在args类中的write方法里完成的。

在Cassandra.java中,用的是TBinaryProtocol,Message结构中的很多部分没有用到,比如writeMessageEnd方法,writeStructBegin方法,writeStructEnd方法,writeFieldEnd方法都是空方法,而在Message Begin和Field Begin这两处,则写入了一些标识信息,例如Field Begin中就标识了arg中放入的数据的类型,如下:

Message Begin: name, type, seqid
Struct Begin: nothing
Field Begin: field type, field id
arg: blablabla
Field End: nothing
Struct End: nothing
Message End: nothing

TBinaryProtocol类中的具体代码如下:

  public void writeMessageBegin(TMessage message) throws TException {
    if (strictWrite_) {
      int version = VERSION_1 | message.type;
      writeI32(version);
      writeString(message.name);
      writeI32(message.seqid);
    } else {
      writeString(message.name);
      writeByte(message.type);
      writeI32(message.seqid);
    }
  }

  public void writeMessageEnd() {}

  public void writeStructBegin(TStruct struct) {}

  public void writeStructEnd() {}

  public void writeFieldBegin(TField field) throws TException {
    writeByte(field.type);
    writeI16(field.id);
  }

  public void writeFieldEnd() {}

  public void writeFieldStop() throws TException {
    writeByte(TType.STOP);
  }

所以Message大致就是这么回事,关于上面的代码还有几个需要解释的地方:

  • 几种type:之前提到过Message的type,这里又出现了Field的type,两者是不同的,如同字面上的意思,Message的type表示了整个Message的类型,而Field的type仅表示了这个Field的类型Message的type是在TMessageType类中定义,Field的type是在TType类中定义,具体如下:
    /**
    * Type constants in the Thrift protocol.
    */
    public final class TType {
      public static final byte STOP   = 0;
      public static final byte VOID   = 1;
      public static final byte BOOL   = 2;
      public static final byte BYTE   = 3;
      public static final byte DOUBLE = 4;
      public static final byte I16    = 6;
      public static final byte I32    = 8;
      public static final byte I64    = 10;
      public static final byte STRING = 11;
      public static final byte STRUCT = 12;
      public static final byte MAP    = 13;
      public static final byte SET    = 14;
      public static final byte LIST   = 15;
      public static final byte ENUM   = 16;
    }

    于是这里就列出了所有arg中可能出现的数据的类型,对于String及String之后的类型,都有专门的writeXXXBegin、writeXXXEnd、readXXXBegin和readXXXEnd方法。

  • 关于Field Stop:在白皮书(http://thrift.apache.org/static/files/thrift-20070401.pdf)中提到,“所有的write方法都有一个对应的read方法,除了writeFieldStop这个例外这是一个特殊的方法,用来标记一个Struct的结束”。

七、Thrift的结构和简单使用(Java版)

首先有两篇介绍Thrift的文章,跟之前的不同是没有讲代码,http://diwakergupta.github.com/thrift-missing-guide/#_versioning_compatibilityhttp://dongxicheng.org/search-engine/thrift-guide/ ,前者是鸟文的,后者是中文的,不过翻译了不少前者的内容。

1、Thrift网络栈

下面这个是我画的很丑的Thrift 网络栈:

/—————————————————————
|        Server                                         |
|        (Single-threaded, event-driven etc) |
|—————————————————————|
|        Processer                                     |
|        (compiler generated)                     |
|—————————————————————|
|        Protocol                                       |
|        (Binary, JSON, Compact etc)           |
|—————————————————————|
|        Transport                                     |
|        (raw TCP, HTTP etc)                     |
|—————————————————————/

之后的内容大部分摘自http://dongxicheng.org/search-engine/thrift-guide/

1.1 Transport

Transport层提供了一个简单的网络读写抽象层。这使得thrift底层的transport从系统其它部分(如:序列化/反序列化)解耦。以下是一些Transport接口提供的方法:

  • open
  • close
  • read
  • write
  • flush

除了以上几个接口,Thrift使用ServerTransport接口接受或者创建原始transport对象。正如名字暗示的那样,ServerTransport用在server端,为到来的连接创建Transport对象。

  • open
  • listen
  • accept
  • close

1.2 Protocol

Protocol抽象层定义了一种将内存中数据结构映射成可传输格式的机制。换句话说,Protocol定义了datatype怎样使用底层的Transport对自己进行编解码。因此,Protocol的实现要给出编码机制并负责对数据进行序列化。

Protocol接口的定义如下:

  • writeMessageBegin(name, type, seq)
  • writeMessageEnd()
  • writeStructBegin(name)
  • writeStructEnd()
  • writeFieldBegin(name, type, id)
  • writeFieldEnd()
  • writeFieldStop()
  • writeMapBegin(ktype, vtype, size)
  • writeMapEnd()
  • writeListBegin(etype, size)
  • writeListEnd()
  • writeSetBegin(etype, size)
  • writeSetEnd()
  • writeBool(bool)
  • writeByte(byte)
  • writeI16(i16)
  • writeI32(i32)
  • writeI64(i64)
  • writeDouble(double)
  • writeString(string)
  • name, type, seq = readMessageBegin()
  • readMessageEnd()
  • name = readStructBegin()
  • readStructEnd()
  • name, type, id = readFieldBegin()
  • readFieldEnd()
  • k, v, size = readMapBegin()
  • readMapEnd()
  • etype, size = readListBegin()
  • readListEnd()
  • etype, size = readSetBegin()
  • readSetEnd()
  • bool = readBool()
  • byte = readByte()
  • i16 = readI16()
  • i32 = readI32()
  • i64 = readI64()
  • double = readDouble()
  • string = readString()

下面是一些对大部分thrift支持的语言均可用的protocol:

  • binary:简单的二进制编码
  • Compact:具体见THRIFT-11
  • Json

1.3 Processor

Processor封装了从输入数据流中读数据和向输出数据流中写数据的操作。读写数据流用Protocol对象表示。Processor的结构体非常简单:

/**
 * A processor is a generic object which operates upon an input stream and
 * writes to some output stream.
 *
 */
public interface TProcessor {
  public boolean process(TProtocol in, TProtocol out)
    throws TException;
}

与服务相关的processor实现由编译器产生。Processor主要工作流程如下:从连接中读取数据(使用输入protocol),将处理授权给handler(由用户实现),最后将结果写到连接上(使用输出protocol)。

1.4 Server

Server将以上所有特性集成在一起:

  1. 创建一个transport对象
  2. 为transport对象创建输入输出protocol
  3. 基于输入输出protocol创建processor
  4. 等待连接请求并将之交给processor处理

2、Client 和 Server的编写

2.1 Client编写

好吧, 这个Client不是指Cassandra.java中的Client,而是指单独写的一个可以通过使用Cassandra.java来对服务器进行操作的客户端。其实只是一个简单的客户端的的话并不需要太多代码,这个是部分代码:

    public Cassandra.Client createClient(InetAddress addr) throws TException
    {
        TTransport transport    = new TSocket(
                                    addr.getHostAddress(),
                                    CLIENT_PORT,
                                    200000);
        transport               = new TFramedTransport(transport);
        TProtocol  protocol     = new TBinaryProtocol(transport);

        Cassandra.Client client = new Cassandra.Client(protocol);
        transport.open();

        return client;
    }

这里先建了个TSocket对象,然后以此为参数建立TTransport对象,再以此TTransport对象建立TProtocol对象,最后以此TProtocol对象建立客户端。

使用这个客户端也很方便,代码如下:

	public void testInsert() throws Exception
    {

        List hosts = controller.getHosts();
        final String keyspace = "TestInsert";
        addKeyspace(keyspace, 1);
        Cassandra.Client client = controller.createClient(hosts.get(0));
        try{
        	client.set_keyspace(keyspace);

	        ByteBuffer key = newKey();

	        insert(client, key, "Standard1", "c1", "v1", 0, ConsistencyLevel.ONE);
	        insert(client, key, "Standard1", "c2", "v2", 0, ConsistencyLevel.ONE);

	        // block until the column is available
	        new Get(client, "Standard1", key).name("c1").value("v1").perform(ConsistencyLevel.ONE);
	        new Get(client, "Standard1", key).name("c2").value("v2").perform(ConsistencyLevel.ONE);

	        List coscs = get_slice(client, key, "Standard1", ConsistencyLevel.ONE);
	        assertColumnEqual("c1", "v1", 0, coscs.get(0).column);
	        assertColumnEqual("c2", "v2", 0, coscs.get(1).column);
		} catch(Exception e){
			e.printStackTrace();
		} finally {
			client.send_system_drop_keyspace(keyspace);
		}
    }

2.2 Server编写

这一部分就不是用户写的了,而是属于Cassandra的一部分,org.apache.cassandra.thrift.CassandraServer就是这个server,这个类实现了Iface中定义的各种功能,其对象将会作为参数传给Processor,由于Processor中已经做了输入输出流的读取,因此CassandraServer类只需要专注于功能实现就行了,其实还是调用其他内部接口,比如下面这个方法:

    public List get_range_slices(ColumnParent column_parent, SlicePredicate predicate, KeyRange range, ConsistencyLevel consistency_level)
    throws InvalidRequestException, UnavailableException, TException, TimedOutException
    {
        logger.debug("range_slice");

        String keyspace = state().getKeyspace();
        state().hasColumnFamilyAccess(column_parent.column_family, Permission.READ);

        CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family);
        ThriftValidation.validateColumnParent(metadata, column_parent);
        ThriftValidation.validatePredicate(metadata, column_parent, predicate);
        ThriftValidation.validateKeyRange(range);
        ThriftValidation.validateConsistencyLevel(keyspace, consistency_level);

        List rows;
        try
        {
            IPartitioner p = StorageService.getPartitioner();
            AbstractBounds bounds;
            if (range.start_key == null)
            {
                Token.TokenFactory tokenFactory = p.getTokenFactory();
                Token left = tokenFactory.fromString(range.start_token);
                Token right = tokenFactory.fromString(range.end_token);
                bounds = new Range(left, right);
            }
            else
            {
                bounds = new Bounds(p.getToken(range.start_key), p.getToken(range.end_key));
            }
            try
            {
                schedule();
                rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_parent, predicate, bounds, range.count), consistency_level);
            }
            finally
            {
                release();
            }
            assert rows != null;
        }
        catch (TimeoutException e)
        {
        	throw new TimedOutException();
        }
        catch (IOException e)
        {
            throw new RuntimeException(e);
        }        return thriftifyKeySlices(rows, column_parent, predicate);
    }

这里在获得了key的bounds之后,尝试调度此线程获得资源,然后调用了StorageProxy.getRangeSlice方法。

八、总结

这篇水文从Cassandra的生成代码入手,简单介绍了.thrift文件,分析了生成代码的结构,Thrift中的Message结构,最后简单介绍了一下Thrift的结构和如何写代码去使用Thrift。

Advertisements

Cassandra 0.8.0流程分析(1)——CassandraDaemon启动流程

一、入口

入口方法是CassandraDaemon中的main方法,里面只有一个CassandraDaemon().activate()的调用。

    public static void main(String[] args)
    {
        new CassandraDaemon().activate();
    }

这里调用的是AbstractCassandraDaemon中的方法。Cassandra虽然是用java写的,但是确实如我从前看到的一篇文章所说的那样,很有C的风格。具体来说,类中的static变量比比皆是,还有就是代码中常用static{}代码块,这个就是用来在第一次调用到这个类时一定会执行到的部分,于是,这就成了C语言中的面向过程编程了。而且由于步进时是不会自动在这些地方停留的,于是,必须手动在这个代码块中设置断点,否则总是直接执行完了,对于我们这种废柴读代码者来说很是难过,不过习惯了也就好了。

二、setup

继续刚才的过程,AbstractCassandraDaemon首先启动了log4j的相关功能,设置了定期检查配置更新。然后正式进入到activate()方法中。

这个方法做了2件事,也就是2步,第一步是初始化daemon,第二步是启动daemon。

初始化的工作其实是调用AbstractCassandraDaemon中的setup()方法。这其实是个非常繁复的过程。注意下面的类的初始化基本都包括了log4j的Logger注册,和Mbean的注册,所以就不单独写了。

1、设置logger

2、检查CLibrary

3、DatabaseDescriptor初始化

全名org.apache.cassandra.config.DatabaseDescriptor。很类似,这个类的初始化也是放在了static中,说的笼统一点,这里就是把各种配置文件读取了,然后存放在相应的变量中。但是如果真的要分的话,还是有2个部分:

第1部分是读取cassadnra.yaml,将其中的配置参数导入到各个变量中,整个过程中还涉及到对参数值的合法性检查,根据设置的类名启动相应的类或建立相应的对象等等。

第2部分是建立各种system tables,这些系统表的配置是直接写在代码中的(hardcode),而不是从配置中读取。具体来说,就是建立了一个名为system的keyspace的MetaData,然后其中包括以下的columnFamily:

  • StatusCf:persistent metadata for the local node,本节点的持久化元数据
  • HintsCf:hinted handoff data,便签式提交数据
  • MigrationsCf:individual schema mutations,单个(?)schema改变
  • SchemaCf:current state of the schema,当前schema状态
  • IndexCf:indexes that have been completed,已完成索引
  • NodeIdCf:nodeId and their metadata,节点ID和相应元数据

相关的代码如下:

            // Hardcoded system tables
            KSMetaData systemMeta = new KSMetaData(Table.SYSTEM_TABLE,
                                                   LocalStrategy.class,
                                                   KSMetaData.optsWithRF(1),
                                                   CFMetaData.StatusCf,
                                                   CFMetaData.HintsCf,
                                                   CFMetaData.MigrationsCf,
                                                   CFMetaData.SchemaCf,
                                                   CFMetaData.IndexCf,
                                                   CFMetaData.NodeIdCf);
            CFMetaData.map(CFMetaData.StatusCf);
            CFMetaData.map(CFMetaData.HintsCf);
            CFMetaData.map(CFMetaData.MigrationsCf);
            CFMetaData.map(CFMetaData.SchemaCf);
            CFMetaData.map(CFMetaData.IndexCf);
            CFMetaData.map(CFMetaData.NodeIdCf);
            tables.put(Table.SYSTEM_TABLE, systemMeta);

从这里也可以看出一个keyspace的元数据需要包括哪些内容(名,副本策略,副本策略参数,ColumnFamily的元数据)。注意这里仅仅是建立了相关的元数据,并没有真正生成相应的keyspace的对象。

这个类的初始化很重要,是整个daemon启动的基础。

4、StorageService初始化

这个类的全名是org.apache.cassandra.service.StorageService,实际上是在DatabaseDescriptor初始化的过程中调用到了这个类的方法, 于是在第一次调用时初始化了。StorageService的初始化包括3个部分。

第一个部分定义了一系列的谓词(VERBS)和谓词阶段(verbStages),如下:

/* All verb handler identifiers */
    public enum Verb
    {
        MUTATION,
        BINARY,
        READ_REPAIR,
        READ,
        REQUEST_RESPONSE, // client-initiated reads and writes
        STREAM_INITIATE, // Deprecated
        STREAM_INITIATE_DONE, // Deprecated
        STREAM_REPLY,
        STREAM_REQUEST,
        RANGE_SLICE,
        BOOTSTRAP_TOKEN,
        TREE_REQUEST,
        TREE_RESPONSE,
        JOIN, // Deprecated
        GOSSIP_DIGEST_SYN,
        GOSSIP_DIGEST_ACK,
        GOSSIP_DIGEST_ACK2,
        DEFINITIONS_ANNOUNCE,
        DEFINITIONS_UPDATE_RESPONSE,
        TRUNCATE,
        SCHEMA_CHECK,
        INDEX_SCAN,
        REPLICATION_FINISHED,
        INTERNAL_RESPONSE, // responses to internal calls
        COUNTER_MUTATION,
        // use as padding for backwards compatability where a previous version needs to validate a verb from the future.
        UNUSED_1,
        UNUSED_2,
        UNUSED_3,
        ;
        // remember to add new verbs at the end, since we serialize by ordinal
    }
    public static final Verb[] VERBS = Verb.values();

    public static final EnumMap<StorageService.Verb, Stage> verbStages = new EnumMap<StorageService.Verb, Stage>(StorageService.Verb.class)
    {{
        put(Verb.MUTATION, Stage.MUTATION);
        put(Verb.BINARY, Stage.MUTATION);
        put(Verb.READ_REPAIR, Stage.MUTATION);
        put(Verb.READ, Stage.READ);
        put(Verb.REQUEST_RESPONSE, Stage.REQUEST_RESPONSE);
        put(Verb.STREAM_REPLY, Stage.MISC); // TODO does this really belong on misc? I've just copied old behavior here
        put(Verb.STREAM_REQUEST, Stage.STREAM);
        put(Verb.RANGE_SLICE, Stage.READ);
        put(Verb.BOOTSTRAP_TOKEN, Stage.MISC);
        put(Verb.TREE_REQUEST, Stage.ANTI_ENTROPY);
        put(Verb.TREE_RESPONSE, Stage.ANTI_ENTROPY);
        put(Verb.GOSSIP_DIGEST_ACK, Stage.GOSSIP);
        put(Verb.GOSSIP_DIGEST_ACK2, Stage.GOSSIP);
        put(Verb.GOSSIP_DIGEST_SYN, Stage.GOSSIP);
        put(Verb.DEFINITIONS_ANNOUNCE, Stage.READ);
        put(Verb.DEFINITIONS_UPDATE_RESPONSE, Stage.READ);
        put(Verb.TRUNCATE, Stage.MUTATION);
        put(Verb.SCHEMA_CHECK, Stage.MIGRATION);
        put(Verb.INDEX_SCAN, Stage.READ);
        put(Verb.REPLICATION_FINISHED, Stage.MISC);
        put(Verb.INTERNAL_RESPONSE, Stage.INTERNAL_RESPONSE);
        put(Verb.COUNTER_MUTATION, Stage.MUTATION);
        put(Verb.UNUSED_1, Stage.INTERNAL_RESPONSE);
        put(Verb.UNUSED_2, Stage.INTERNAL_RESPONSE);
        put(Verb.UNUSED_3, Stage.INTERNAL_RESPONSE);
    }};

第二个部分是建立了2个线程池,一个是用来处理短任务,一个是用来处理非周期性长任务,如下:

    /**
     * This pool is used for periodic short (sub-second) tasks.
     */
     public static final RetryingScheduledThreadPoolExecutor scheduledTasks = new RetryingScheduledThreadPoolExecutor("ScheduledTasks");

    /**
     * This pool is used by tasks that can have longer execution times, and usually are non periodic.
     */
    public static final RetryingScheduledThreadPoolExecutor tasks = new RetryingScheduledThreadPoolExecutor("NonPeriodicTasks");

第三个部分是实例化了一个标记为public static final的StorageService对象。在实例化的过程中,首先注册了谓词处理(register the verb handlers),然后又初始化了一个org.apache.cassandra.streaming.StreamingService(是的,层层调用。。。),不过这个类的初始化仅仅是做了实例化,实例化的过程中除了注册Mbean什么都没做。。。

5、获取网络地址和端口(address and port)

6、ColumnFamilyStore初始化

同样的,这个类的初始化是在第一次调用到这个类时完成的,过程是完成所有static的定义和static块中代码的执行。

带有static的定义有3个,分别是

  • flushSorter :一个用来执行flush任务中的排序阶段的线程池,由于是CPU密集型(CPU-bound),所以会根据处理器数量的线程。
  • flushWriter :在排序之后,由此线程池进行写磁盘,由于是磁盘密集型(disk-bound),所以可以和flushSourter同时运作。注意,flushSorter和flushWriter处理的是Memtable和BinaryMemtable的flush,对于BinaryMemtable的flush,这两个线程池已经足够了,这两个线程池都是private的
  • postFlushExecutor : 不同于上面两个线程池,这个是用来处理live Memtable的flush。live Memtable的flush更复杂一些,需要switchMemtable做额外的2件事(这里只会由switchMemtable去调用submitFlush):第一,将这个Memtable放到memtablesPendingFlush中,直到flush完成,并且它已经被转为SSTableReader,加到了ssTable_中;第二,等到flush完成后,在commitLogUpdater中加入一项标记,markCompacted,调用onMemtableFlush。这允许在多核系统中多个flush同时进行,并且以正确的顺序调用onMemtableFlush,正确的顺序对于replay很重要,否就就要restart,因为当onMemtableFlush被调用时,是假设在给定位置之上的内容都已经被固化到SSTable中了。注意到这个线程池是public的,所以是会被别的类调用的,在这个类中有2处调用,一处是添加已flush的标记到commit log的header中,一处是添加已compact的标记到commitlogUpdater中。

static块中,则从StorageService.tasks线程池中建立一个线程,设置延迟1秒开始,两次执行间隔为1秒,执行的内容是一个org.apache.cassandra.db.MeteredFlusher对象。

7、MeteredFlusher线程的定期运作

首先,先获取非活跃Memtable的大小。

然后就是flush的操作,flush的过程分2部分,设m是管线pipeline中能够存在的最大Memtable数量,第一部分则是将所有已使用内存超过已分配内存的1/m的ColumnFamily进行flush,第二部分则是对剩下的Memtable根据大小进行排序,对超过阀值的进行flush

8、清洗系统表的目录

这一步是对系统表所在的目录做清理,保证作为系统基石的系统表不会出错,可以理解,系统有可能在任何时候down机,于是固化到硬盘上的数据文件也就可能处于各种各样的脏数据状态,这里就是为了纠正这些可能的错误,虽然手段很粗暴。具体来说,就是对每个系统表调用ColumnFamilyStore.scrubDataDirectories方法,代码如下:

        // check the system table to keep user from shooting self in foot by changing partitioner, cluster name, etc.
        // we do a one-off scrub of the system table first; we can't load the list of the rest of the tables,
        // until system table is opened.
        for (CFMetaData cfm : DatabaseDescriptor.getTableMetaData(Table.SYSTEM_TABLE).values())
            ColumnFamilyStore.scrubDataDirectories(Table.SYSTEM_TABLE, cfm.cfName);

在方法的执行过程中,所做的工作是移除相应的ColumnFamily所在目录下的不必要的文件,所谓不必要的文件,包括以下这些:临时文件(temp files),孤儿(orphans,指缺少data file的),零长度文件(zero-length files),已合并sstable(compacted sstables)。至于无法辨认的文件,将会被略过。经过这个方法之后,一系列满足上述描述的Descriptor将会被移除。

在具体的实现中,首先处理掉的是各个temp files和compacted sstables,因为有标记,所以很好清除;其次,是orphans,需要检查data file的标记;再其次,是未完成的cache文件,需要对文件名进行匹配,然后删掉;最后是清理这个ColumnFamily的一级索引,具体的操作是对一个ColumnFamily中的各个Column的Index做一次scrubDataDirectories递归调用,这里有趣的是,在原本该填入column family name的地方,填入的是针对某个column的indexName,虽然我还没有看到那部分的代码,但是从这里可以推测,Cassandra中主索引的实现,是以特殊的ColumnFamily的方式实现的。

9、Table的初始化

全名是org.apache.cassandra.db.Table。这个类的初始化自然也是在第一次调用到它的时候才会发生,有趣的是,这个类的第一次调用发生的位置是有可能不确定的。

第一处出现的地方是在SystemTable的checkHealth()方法中:

            table = Table.open(Table.SYSTEM_TABLE);

checkHealth()会在AbstractCassandraDaemon的setup()方法中被调用,这是真正的起始调用处。所以调用过程其实是这样:AbstractCassandraDaemon.setup()->SystemTable.checkHealth()->Table.open(Table.SYSTEM_TABLE)

第二处出现的地方是在ColumnFamilyStore的all()方法中:

        for (Table table : Table.all())

真正的起始调用处是在MeteredFlusher中的run()方法,调用过程是这样:MeteredFlusher.run()->MeteredFlusher.countFlushingBytes()->ColumnFamilyStore.all()->Table.all()->Table.open(tableName)

由于这两个调用的地方处于不同的线程之中,所以谁先被执行到是不确定的,然而,重要的地方是Table这个类的很多方法是需要在不同线程中进行同步的。也就是说,这些方法同一时间只能有一个线程去访问。open()方法就是其中一个。

有点扯远了,不过,明白Table的用处对于理解它的初始化是有帮助的。这里的static块中的代码没有什么特点,仅仅是检查相关的keyspace的目录有没有建立好,没建立好就建一下。更加重要的是Table中各种变量的存在意义。

如同我之前的文章所说,keyspace其实就是table,在代码中也得到了体现。首先,是2个static的变量,一个是重用读写锁:

    /**
     * accesses to CFS.memtable should acquire this for thread safety.
     * Table.maybeSwitchMemtable should aquire the writeLock; see that method for the full explanation.
     *
     * (Enabling fairness in the RRWL is observed to decrease throughput, so we leave it off.)
     */
    static final ReentrantReadWriteLock switchLock = new ReentrantReadWriteLock();

一个就是用来储存所有Table的实例映射的变量:

    /** Table objects, one per keyspace.  only one instance should ever exist for any given keyspace. */
    private static final Map<String, Table> instances = new NonBlockingHashMap<String, Table>();

其他的非static变量则是一个table应该拥有的属性:name,columnFamilyStores(每个column family都有一个columnFamilyStore),indexLocks,flushTask,replicationStrategy。

无论如何,Table.open()方法都是至关重要的一个入口。其实这个方法做的事情很简单,就是从instances中取得相应名字的table,如果找不到的话,就新建一个。需要注意的是必须保证每个keyspace只有一个table对象,所以在新建table的代码块外面加上了synchronized标记,用来在多线程中同步。

Table的构造器则做是在读取配置之后,建立各个实例属性(也就是上面提到的那些,name,columnFamilyStores,indexLocks,flushTask,replicationStrategy)。

10、系统表健康检查

说是健康检查,实际上也就是打开系统表的一个Column Family,比了比里面的值。

第一步打开系统表:

            table = Table.open(Table.SYSTEM_TABLE);

这一步的结果有两种:1)成功,继续下面的操作;2)发生异常,注释上说的是当更改了partitioner(从OPP改成RP)的时候会发生,系统表的文件还在,却无法读取。

第二步是读取系统表中的一部分(一行两列),如下面的子表(粗体是具体值,带引号的是字符串,不带的是变量,灰色是变量名,通常都是常量,蓝色是说明,下同):

column family: SCHEMA_CF=”Schema
columns: PARTITIONER=”Partioner CLUSTERNAME=”ClusterName
Key: LOCATION_KEY=”L  value  value

具体代码如下:

        SortedSet<ByteBuffer> cols = new TreeSet<ByteBuffer>(BytesType.instance);
        cols.add(PARTITIONER);
        cols.add(CLUSTERNAME);
        QueryFilter filter = QueryFilter.getNamesFilter(decorate(LOCATION_KEY), new QueryPath(STATUS_CF), cols);
        ColumnFamily cf = table.getColumnFamilyStore(STATUS_CF).getColumnFamily(filter);

此处的情况就略为复杂一些,可能发生的结果有三种:

  • 1)cf不为空,读取成功,great;
  • 2)cf为空,于是再次尝试取得名为 STATUS_CF 的系统表中的ColumnFamily,检查其中的SSTable部分:
    • 2.1)如果SSTable部分不为空,那就抛出异常,因为这意味着这个CF的文件存在,但是却读不出来,这种情况是在更改了partitioner(从RP改成了OPP)的时候会发生。如果为空,则进入2.2;
    • 2.2)如果SSTable部分为空,意味着没有找到系统表的这部分文件,没关系,这会被认为是新的节点,于是进入处理新节点的流程;

此外,这里的QueryFilter对象的建立,ColumnFamily对象的获得,都是一系列比较复杂的过程,这也是后话。

第三步是检查读取到的partitioner和clustername是否与从配置文件中读取的相同,我们需要他们相同!

11、载入Schema

第一步是尝试获得最新的MigrationId。获取的位置依然是System Table,存入UUID类型的变量中,这里假设这个变量是version吧,version下面还会用到,讲起来比较方便。具体位置如下表:

column family: SCHEMA_CF=”Schema
columns: LAST_MIGRATION_KEY=”Last Migration
Key: LAST_MIGRATION_KEY=”Last Migration  value

第二步,判断这个值是否找到,这时,会发生2种情况:1,找不到,表现为version == null。然后此时再查看是否有数据文件存在,如果数据文件还在,那么就是系统无法读取schema,需要用户用CLI重新定义schema,如果没有数据文件存在,那么很好,说明是一个什么表都没建立的空节点;2,找到了这个版本号,那么就会调用org.apache.cassandra.db.DefsTable.loadFromStorage(UUID version)方法。

第三步,接上一步的最后一个分支,是从存储中载入某个版本的keyspace。具体来说,是从System Table的下面DEFINITION_SCHEMA_COLUMN_NAME列中读出一个json串

column family: SCHEMA_CF=”Schema
columns: DEFINITION_SCHEMA_COLUMN_NAME=”Avro/Schema 每个keyspace一个column,name这里是keyspace的name,直接是文本
Key: version  value是Schema的JSON 每个keyspace的定义, 根据左边的schema编码之后的数据,读出后需要用相应的schema解码

然后依次经过几个类型的容器,IColumn=>ByteBuffer=>String=>Schema,获得一个Schema对象。最终通过反序列化得到所有keyspace的元数据的集合Collection<KSMetaData>

  • IColumn=>ByteBuffer:直接取IColumn的value
  • ByteBuffer=>String:调用ByteBufferUtil.string(value),获得一个json字符串
  • String=>Schema:调用Schema.parse(s),这个方法来自Avro项目,具体的过程不在次讨论,简单介绍一下Avro中的Schema支持,以下翻译自(http://www.cloudera.com/blog/2009/11/avro-a-new-format-for-data-interchange/):“

    Avro使用JSON来定义一个数据结构的schema。举例来说,一个二维的点可以定义成以下的Avro记录:

    {“type”: “record”, “name”: “Point”,
    “fields”: [
    {“name”: “x”, “type”: “int”},
    {“name”: “y”, “type”: “int”},
    ]
    }

    这个记录的每个实例都被序列化成简单的两个整数,没有额外的每记录(per-record)或每域(per-field)的注解。整数使用可变长的zig-zag编码写下。因此,较小坐标值的点就能仅用两个字节来写下:100个点会需要大概200字节。

    在记录(records)类型和数值(numeric)类型之外,Avro还包括了对数组(arrays),映射(maps),枚举(enums),变量(variable),定长二进制数据(fixed-length binary data)以及字符串 (strings)的支持。Avro还定义了一个容器文件格式(container file format),以提供良好的支持给MapReduce以及其他分析框架。细节见Avro specification.

  • Collection<KSMetaData>:使用第三步开始得到的Schema对象依次反序列化这个ColumnFamily中的其他Column的值,每次生成一个KsDef类的对象,合起来就是所有的keyspace的元数据集合。这个Schema相当于是keyspace元数据的元数据。

第四步,对所有的keyspace:创建keyspace名到columnfamily名的映射<cfm.ksName, cfm.cfName>,然后添加到CFMetaData类的静态变量cfIdMap中;然后将这个keyspace的元数据,连同版本号一起添加到DatabaseDescriptor的相应静态变量中。

12、清洗所有表的目录

类似上述第8步,只是清洗的对象是所有的表,因为他们的元数据已经由上一步获得了,存在了DatabaseDescriptor.tables里。

13、打开所有表

对所有的表执行Table.open(table)方法。代码如下:

        // initialize keyspaces
        for (String table : DatabaseDescriptor.getTables())
        {
            if (logger.isDebugEnabled())
                logger.debug("opening keyspace " + table);
            Table.open(table);
        }

14、启动垃圾收集检查器的定期运作

这里的垃圾收集检查器是指GCInspector类,这个类也用到了Singleton Pattern,只被实例化一次。但是注意这一点:这个类的作用是定期对sun的类进行垃圾收集。实例化时,会在MBeanServer中注册。然后会启动一个线程定期运作。定期运作时,如果当完成一次完整的垃圾收集后仍然使用很多的内存,则会根据需要进行1)降低缓存大小;2)flush最大的Memtable。

代码就一句:

        try
        {
            GCInspector.instance.start();
        }
        catch (Throwable t)
        {
            logger.warn("Unable to start GCInspector (currently only supported on the Sun JVM)");
        }

15、CommitLog恢复

CommitLog的恢复是在server端启动时完成的,作用是处理未完成的操作。

第一步,从DatabaseDescriptor中获得commitlog的位置,然后检查大小,看是否需要恢复(空文件不需要恢复)。如果需要,那么对需要恢复的commitlog进行排序(会有多个commitlog文件),排序依据是修改时间

第二步,使用这些commitlog文件进行恢复。这一步是CommitLog恢复的主要操作所在,过程还是比较复杂,而且似乎不同版本间的这个方法是不一样的,这里的仅对0.8.0。

  1. 获得所有sstable文件的ReplayPosition。
    先来说说ReplayPosition,这个类有用的是两个属性:segment和position。segment在程序里表现为时间点的值,用这个语句生成:System.currentTimeMillis(),segment会在2个地方出现(至少到目前为止,我只看到两个地方),一个是commitlog的文件名中,另一个是sstable4元组(一个SSTable包括Data.db,Filter.db,Index.db和Statistics.db)中的Statistics.db。而position则是文件中的文件指针的位置,用来标记从文件的何处开始读。于是,这里要做的就是从每个ColumnFamily的各个SSTable的Statistics.db文件中读出每个SSTable的ReplayPosition,然后取每个ColumnFamily的各个SSTable的ReplayPosition中的最大值,存入cfposition变量中。具体的文件格式在以后的文章中再说。
  2. 获取cfposition中最小的那个ReplayPosition,存入globalPosition。这里的最小指的是segment最小,也就是时间最早,代表着上一次系统结束前最早操作的那个column family的时间点。这个时间点后面会用到。
  3. 获取commitlog中最小的那个的segment。正如上面所说,commitlog的文件名中就包含了这个segment,所以直接解析即可。
  4. 对每个commitlog,获得执行恢复操作所需要的commitlog文件的位置。判断的依据是比较globalPosition.segment和commitlog.segment(代码中不是这样写的,但是这里这样写方便叙述,包括下面提到的replayPosition为了方便叙述也写成commitlog.replayPosition)的值,会发生三种情况:
    1)globalPosition.segment<commitlog.segment:说明这个commitlog所做的操作还没有执行,所以把commitlog.replayPosition值设为0
    2)globalPosition.segment==commitlog.segment:说明这个commitlog所做的操作正杂执行,所以把commitlog.replayPosition值设为globalPosition.position
    3)globalPosition.segment>commitlog.segment:说明这个commitlog所做的操作已经执行过了,所以把commitlog.replayPosition值设为reader.length(),也就是这个commitlog的末尾。
  5. 如果要恢复,从replayPosition每次读取一项(entry),然后将读取到的每一项反序列化成RowMutation对象
  6. 将得到的RowMutation对象添加到SEDA中MUTATION阶段的线程池中,执行提交。
    futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
  7. 最后等所有的更改结束之后flush涉及到的table。

第三步,删除所有的commitlog。

16、启动服务器

第一步,获得各节点的Token,即一系列<token,endpoint>,获取地址在系统表中,具体如下:

column family: STATUS_CF=”LocationInfo
columns: column name是各token值
Key: RING_KEY=”Ring column value是相应的endpoint值

第二步,更新系统中使用到token的地方,一处时tokenMetadata_变量,一处是Gossiper。

第三步,定义了一个ShutdownHook,添加到了Runtime中。

第四步,加入到Token Ring中。从这里开始是真正开始把服务器当做网络中的一个节点,所以各种设置监听。

首先就是启动Gossiper。Gossiper作为Cassandra中很有特色的一个东西,还是比较重要的,其作用是维护集群的状态,通过gossip,每个节点都能知道集群中包含哪些节点,以及这些节点的状态,这使得Cassandra集群中的任何一个节点都可以完成任意key的路由,任意一个节点不可用都不会造成灾难性的后果。Gossiper的详细介绍以后再说,这里仅给出涉及到的部分。

  1. Gossiper采用 Singleton Pattern,仅有一个实例,在实例化的时候,做了两件事:设置了两个时间长度,用作gossip过程中的时间上限,然后将这个Gossiper实例注册为FailureDetector的一个监听器。Gossiper和FailureDetector的羁绊在于IFailureDetectionEventListener接口,其中只有一个方法 convict(InetAddress ep),即标记一个节点死了,FailureDetector实现了”The Phi Accrual Failure Detector”,做出判决,然后由各监听器来执行。
  2. 在启动Gossiper之前,注册了2个预订用户(subscribers):StorageService实例和MigrationManager实例,他们的共同特点是要实现IEndpointStateChangeSubscriber接口,这个接口定义了5个方法,用于某节点a通知它感兴趣的那部分(?interested parties,不知道怎么翻。。)关于任意endpoint的状态改变。这个5个方法分别是:onJoin,onChange,onAlive,onDead,onRemove。
            // have to start the gossip service before we can see any info on other nodes.  this is necessary
            // for bootstrap to get the load info it needs.
            // (we won't be part of the storage ring though until we add a nodeId to our state, below.)
            Gossiper.instance.register(this);
            Gossiper.instance.register(migrationManager);
            Gossiper.instance.start(SystemTable.incrementAndGetGeneration()); // needed for node-ring gathering.

    然后,Gossiper启动,参数是新的版本号。在启动过程中,首先获得所有的seed节点。然后启动本机的心跳状态(HeartBeatState),将本机状态设为alive,并将<netAddress, EndpointState>映射存入endpointStateMap。接下来通知snitches,告诉他们gossip要开始了。最后启动定期运作的线程,定期GossipTask类中的run方法,运作的内容如下(摘自袁大星的文档):

    Cassandra内部有一个Gossiper,每隔一秒运行一次(在Gossiper.java的start方法中),按照以下规则向其他节点发 送同步消息:

    1) 随机取一个当前活着的节点,并向它发送同步请求

    2) 向随机一台不可达的机器发送同步请求

    3) 如果第一步中所选择的节点不是seed,或者当前活着的节点数少于seed数,则向随意一台seed发送同步请求

    第一和第二步好理解,通过第一步可以和当前活着的节点同步状态,以更新本地的状态,通过第二步可以尽早发现不可用的节点重新可用了。

    第三步中的第一个条件,如果第一步中的节点不是seed,则向随意一台seed发送同步请求也比较好理解,因为seed理论上总是有较多的节点状态 信息。

    第三步中第二个条件则有点难理解,当活着的节点数少于seed时,也需要向随机的seed发送同步消息。其实这里是为了避免出现seed孤岛。

    如果没有这个判断,考虑这样一种场景,有4台机器,{A, B, C, D},并且配置了它们都是seed,如果它们同时启动,可能会出现这样的情形:

    A节点起来,发现没有活着的节点,走到第三步,和任意一个种子同步,假设选择了B

    B节点和A完成同步,则认为A活着,它将和A同步,由于A是种子,B将不再和其他种子同步

    C节点起来,发现没有活着的节点,同样走到第三步,和任意一个种子同步,假设这次选择了D

    C节点和D完成同步,认为D活着,则它将和D同步,由于D也是种子,所以C也不再和其他种子同步

    这时就形成了两个孤岛,A和B互相同步,C和D之间互相同步,但是{A,B}和{C,D}之间将不再互相同步,它们也就不知道对方的存在了。

    加入第二个判断后,A和B同步完,发现只有一个节点活着,但是seed有4个,这时会再和任意一个seed通信,从而打破这个孤岛。

  3. MessagingService开始监听本地地址。这是用于消息传递的类。
  4. StorageLoadBalancer开始广播。这个类是这篇文章《Scalable range query processing for large-scale distributed database applications》的实现。用于监视负载信息,必要时会进行负载平衡,运行间隔是5分钟。
  5. 向所有seed节点声明自己的版本号。先用消息进行积极(actively)的声明,最后使用Gossip进行消极(passively)的声明。
  6. 在Gossiper的本地映射中添加应用状态。
            MessagingService.instance().listen(FBUtilities.getLocalAddress());
            StorageLoadBalancer.instance.startBroadcasting();
            MigrationManager.announce(DatabaseDescriptor.getDefsVersion(), DatabaseDescriptor.getSeeds());
            Gossiper.instance.addLocalApplicationState(ApplicationState.RELEASE_VERSION, valueFactory.releaseVersion());
  7. HintedHandOffManager注册Mbean。
  8. 判断是不是AutoBootstrap,然后分别有不同的处理流程。如果不是AutoBootstrap的话,那么就设置一下token就好了。如果是AutoBootstrap的话,事情就大条了,先要用StorageLoadBalancer获得负载信息,然后看看这个节点是不是已经是token ring中一部分,是的话就异常了,有幸进行下去的话,用获得的负载信息来决定本机的token值,然后再用这个token启动。

17、载入mx4j

这是为了能够使用JMX。实现的代码里充斥着reflection。。。

三、start

启动RPCServer。最终是调用了这个方法:

    protected void startServer()
    {
        if (server == null)
        {
            server = new ThriftServer(listenAddr, listenPort);
            server.start();
        }
    }

四、后记

在拖延症的影响下,这篇日志写了2个多月,真是汗颜啊。。。当然,也有第一次看这个代码的原因,往往一个方法要搞懂得看好久,不过,完成了这篇日志,还是很有好处的,了解了许多实现内部的细节,以后应该就能看的快些了。最后一些部分有些流水账,写的太急。还有一些方法的细节部分没有写,准备以后专门用别的日志来写,这篇已经太长太长了。。。

Cassandra 0.8.0 源码分析——数据类型

一、Introduction

Cassandra中用到了一些可排序的数据类型,也就是放在db.marshal包中的那些类,他们实际上就是将一些基本类型,比如String,BigInteger,Long,ByteBuffer,UUID等,封装起来,最重要的是实现了Comparator接口,让这些封装后的数据类型可以用于排序集(例如SortedSet,TreeSet等),封装的过程中还加入了一些特有的方法。这些类的关系如下图(此外还有个异常类,不太重要):

二、设计特点

首先看看处于顶端的抽象类,AbstractType<T>,这个类虽然是抽象类,但是每次调用它的子类的构造器时总是会调用到它的构造器,这是其中的一个设计特点,先来看一个典型的AbstractType<T>的子类的域和构造器部分:

public class BytesType extends AbstractType<ByteBuffer>
{
    public static final BytesType instance = new BytesType();

    BytesType() {} // singleton
    ...
}

可以看到,这里使用了设计模式中的Singleton Pattern。仅有的那个静态常量是instance,当第一次调用到ByteType时,instance被初始化,实际上,instance成为了唯一的对外实例化ByteType的手段,而且成为了唯一的ByteType实例。这里用到的是传统的Singleton Pattern实现方法,一旦类被初始化,那么就会实例化,更好的是Bill Pugh的方案,不过已经不在本文讨论范围内。然而,可以看到,构造器中并无代码,于是,无疑是其父类的构造器被调用了,也就是AbstractType<T>中的构造器,那么,来看看AbstractType<T>的域和构造器部分:

public abstract class AbstractType<T> implements Comparator<ByteBuffer>
{
    public final Comparator<IndexInfo> indexComparator;
    public final Comparator<IndexInfo> indexReverseComparator;
    public final Comparator<IColumn> columnComparator;
    public final Comparator<IColumn> columnReverseComparator;
    public final Comparator<ByteBuffer> reverseComparator;

    protected AbstractType()
    {
        indexComparator = new Comparator<IndexInfo>()
        {
            public int compare(IndexInfo o1, IndexInfo o2)
            {
                return AbstractType.this.compare(o1.lastName, o2.lastName);
            }
        };
        indexReverseComparator = new Comparator<IndexInfo>()
        {
            public int compare(IndexInfo o1, IndexInfo o2)
            {
                return AbstractType.this.compare(o1.firstName, o2.firstName);
            }
        };
        columnComparator = new Comparator<IColumn>()
        {
            public int compare(IColumn c1, IColumn c2)
            {
                return AbstractType.this.compare(c1.name(), c2.name());
            }
        };
        columnReverseComparator = new Comparator<IColumn>()
        {
            public int compare(IColumn c1, IColumn c2)
            {
                return AbstractType.this.compare(c2.name(), c1.name());
            }
        };
        reverseComparator = new Comparator<ByteBuffer>()
        {
            public int compare(ByteBuffer o1, ByteBuffer o2)
            {
                if (o1.remaining() == 0)
                {
                    return o2.remaining() == 0 ? 0 : -1;
                }
                if (o2.remaining() == 0)
                {
                    return 1;
                }

                return -AbstractType.this.compare(o1, o2);
            }
        };
    }
    ...
}

可以看到,其实这里并没有做很多特别的操作,仅仅是初始化了5个Comparator,以供以后的子类调用,由于这5个对象仅仅实现了一个Comparator接口,所以它们存在的意义也就仅仅是提供一个compare方法,然后就可以作为参数填入。

除了上述的工作,剩下的就是实例化的意义。一旦实例化之后,就相当于得到了一个实现了Comparator接口的实例,于是在很多方法的调用中都可以将其填入。此外,Singleton的意义在于这个实例成为了某种特定的类型的一系列操作工具的入口,而不是什么保存某个特定类型数据的地方。

三、基本类型与封装后类型的关系

这里有一个问题,就是实现的接口Comparator<T>中,为什么填入的是ByteBuffer,这里有一篇关于Java中的Buffer的文章(http://zcdxzsz.iteye.com/blog/310917),ByteBuffer的存在相当于是所有其他基本Buffer的综合,然后在Buffer中以Byte的形式进行操作。然后,Comparator<ByteBuffer>迫使所有的最终子类实现一个方法,作为排序的依据:

    public int compare(ByteBuffer o1, ByteBuffer o2)

实现Comparator<ByteBuffer>的意义也就止于此。至于上面提到的5个Comparator,我的理解是他们没有独特的操作,连写一个子类的价值都没有,所以就只有5个变量而已 。

下一个让人关注的问题就是泛型AbstractType<T>是怎么工作的,自然,T的填入是根据需要封装的类型而决定的,具体来说,是下面几个:String,BigInteger,Long,ByteBuffer,UUID。填入的具体的T有什么作用呢?来看一看AbstractType<T>中是怎么定义的:

    public abstract T compose(ByteBuffer bytes);

    public abstract ByteBuffer decompose(T value);

    /** get a string representation of a particular type. */
    public abstract String toString(T t);

    /** get a string representation of the bytes suitable for log messages */
    public abstract String getString(ByteBuffer bytes);

    /** get a byte representation of the given string.
     *  defaults to unsupportedoperation so people deploying custom Types can update at their leisure. */
    public ByteBuffer fromString(String source) throws MarshalException
    {
        throw new UnsupportedOperationException();
    }

    /* validate that the byte array is a valid sequence for the type we are supposed to be comparing */
    public abstract void validate(ByteBuffer bytes) throws MarshalException;

    /** @deprecated; use reverseComparator field instead */
    public Comparator<ByteBuffer> getReverseComparator()
    {
        return reverseComparator;
    }

    /* convenience method */
    public String getString(Collection<ByteBuffer> names)
    {
        StringBuilder builder = new StringBuilder();
        for (ByteBuffer name : names)
        {
            builder.append(getString(name)).append(",");
        }
        return builder.toString();
    }

    /* convenience method */
    public String getColumnsString(Collection<IColumn> columns)
    {
        StringBuilder builder = new StringBuilder();
        for (IColumn column : columns)
        {
            builder.append(column.getString(this)).append(",");
        }
        return builder.toString();
    }

    public boolean isCommutative()
    {
        return false;
    }

    /** returns the class this AbstractType represents. */
    public abstract Class<T> getType();

    //
    // JDBC metadata
    //

    public abstract boolean isSigned();
    public abstract boolean isCaseSensitive();
    public abstract boolean isCurrency();
    public abstract int getPrecision(T obj);
    public abstract int getScale(T obj);
    public abstract int getJdbcType();
    public abstract boolean needsQuotes();

可见,关于T的重要的是compose()和decompose()方法,涉及到如何被封装的类型如何与ByteBuffer之间进行转换,其他涉及到T的方法,toString(),getType(),getPrecision(),getScale(),都是不甚重要的,甚至有些子类都不会使用getPrecision()和getScale(),估计是历史遗留问题。

于是,看完这些之后,就明白,marshal包中的类的作用就是

  1. 提供ByteBuffer中的compare()方法,使其能在排序集中使用;
  2. 提供被封装的类型与ByteBuffer之间的转换;
  3. 封装原始类型到具体的自定义类型,提供各类型的特有方法。

Cassandra 0.8.0源码包简介

一、overall

这个是Cassandra 0.8.0在Eclipse中配置完毕之后的有关于源码的部分,截图如下:



在正确配置之后,这个工程中应该有7个文件夹,分别是(1)java源代码(src/java);(2)生成java源代码(src/gen-java);(3)thrift生成java源代码(interface/thrift/gen-java);(4)cql的jdbc源代码(drivers/java/src);(5)cql的jdbc测试代码(drivers/java/gen-java);(6)单元测试代码(test/unit);(7)长时间测试代码(test/long)。

虽然文件夹是这么多,其实可以对所有的代码分个类,第一类是纯源代码,第二类是生成代码,第三类是测试代码。如果按照产品进行分类的话,那么也可以分为:第一类是Cassandra服务器端,第二类是Cassandra的Cli,第三类是CQL的JDBC,不过这里依赖关系比较多,所以也不尽然。

二、生成代码(gen-java)

那么先从不太重要的讲起,生成代码。所谓生成代码,就是在src版本中不存在,而是在build的时候生成的代码,在src版本中只有关于这些内容的中间语言版本。我的理解是,这是为了跨语言,比如我现在用的是java,那么就把那中间语言翻译成java,然后生成相应的java文件进行使用。

在上述的代码中,src/gen-java和interface/thrift/gen-java中的代码都是生成代码。Cassandra现在看来真是一个庞然大物,糅合了各种技术在其中,这里的生成代码的技术就用到了3种。

  • Antlr
    这个东西主要是生成自己的语法解析器,简单来说,用之前的JLine进行命令行的IO,然后把获得的命令在这里与相应的函数进行关联。使用的包是lib文件夹下的antlr-3.2.jar。具体用法见这里:http://wenku.baidu.com/view/d8580e5f804d2b160b4ec074.html。在源代码中,下面2个包使用Antlr生成的:src/gen-java下的org.apache.cassandra.cliorg.apache.cassandra.cql,原始文件是src/java/org/apache/cassandra/cli/Cli.gsrc/java/org/apache/cassandra/cql/Cql.g。具体的生成顺序(根据build.xml中),首先是检查生成语法(check-gen-xxx-grammar),从一个*.g 文件生成一个*.token文件,比如,在CLI中,就是从Cli.g文件生成了Cli.token文件,然后生成语法解析器,用生成xxxLexer.java和xxxParser.java,比如,在CLI中,生成了CliLexer.java和CliParser.java。
  • Avro
  • Thrift
    Avro和Thrift放在一起讲,下面的话引自这里:http://www.webguo.com/2011/02/11/avro_vs_thrift.html。“Avro和Thrift都是跨语言,基于二进制的高性能的通讯中间件. 它们都提供了数据序列化的功能和RPC服务. 总体功能上类似,但是哲学不一样. Thrift出自Facebook用于后台各个服务间的通讯,Thrift的设计强调统一的编程接口的多语言通讯框架. Avro出自Hadoop之父Doug Cutting, 在Thrift已经相当流行的情况下Avro的推出,其目标不仅是提供一套类似Thrift的通讯中间件更是要建立一个新的,标准性的云计算的数据交换和存储的Protocol。 这个和Thrift的理念不同,Thrift认为没有一个完美的方案可以解决所有问题,因此尽量保持一个Neutral框架,插入不同的实现并互相交互。而Avro偏向实用,排斥多种方案带来的 可能的混乱,主张建立一个统一的标准,并不介意采用特定的优化。Avro的创新之处在于融合了显式,declarative的Schema和高效二进制的数据表达,强调数据的自我描述,克服了以往单纯XML或二进制系统的缺陷。Avro对Schema动态加载功能,是Thrift编程接口所不具备的,符合了Hadoop上的Hive/Pig及NOSQL 等既属于ad hoc,又追求性能的应用需求.”在源代码中,使用Avro生成的是以下3个包:src/gen-java中的org.apache.cassandraorg.apache.cassandra.db.migration.avro以及org.apache.cassandra.utils.avro,他们都是从文件src/avro/internode.genavro中生成的。在源代码中,使用Thrift生成的是interface/thrift/gen-java中的org.apache.cassandra.thrift包,原始文件是interface/cassandra.thrift文件。

三、测试代码(test/unit, test/long and drivers/java/test)

没什么好讲的,就是针对每个包中的代码的测试。

四、源代码(src/java and drivers/java/src)

src/java中有34个包,有内容的有33个。正如同Cassandra的论文中的行文结构一样,Cassandra其实没有什么层次性,甚至连系统架构图都没有,然后再看各个包的话,可以发现其实是针对一个分布式系统的不同方面的实现,比如在论文中的系统架构章节提到的几个部分:Partitioning,Replication,Membership,Bootstrapping,Scaling the Cluster,Local Persistence,还有一些其他的实现技术,比如SEDA等等。这几个部分算是系统的几个主要模块,但是包与包之间又不是简单的模块关系,反而更像是根据分布式系统各部分相关论文的技术而做的实现,然后再以一种粗暴的方法拼接在一起,不牢固的地方再加上其他的实现加以过度。当然,这是我粗看之下的愚见,不能作数,但是Cassandra其实算不上优雅,更像是一个缝合憎恶。

所以,将这些包分成几个大的部分比较好:

  • 存储和数据模型
    这个部分如同名字,其实应该分为2个部分,但是由于这两部分的实现过于紧密(比如SStable和Memtable的实现就在不同的包中),只好归为一类。作为最底层的部分,存储部分的主要参照是Bigtable中的SStable+Memtable模型,使用这个模型就不可避免地需要用到BloomFilter和Compaction,当然还有一些cache的技术也算在这个部分,数据模型部分的主要参照也是Bigtable中的Keyspace+Row+ColumnFamily+Column的模型,不过Cassandra又在其中加上了SuperColumn一层,除了模型的实现,还有模型提供的各种操作的实现也在其中,此外,还要加上CommitLog的相关实现。具体的细节不在这里讨论,相关的包是以下几个:
    org.apache.cassandra.io
    org.apache.cassandra.io.sstable
    org.apache.cassandra.io.util
    org.apache.cassandra.db
    org.apache.cassandra.db.columniterator
    org.apache.cassandra.db.commitlog
    org.apache.cassandra.db.context
    org.apache.cassandra.db.filter
    org.apache.cassandra.db.marshal
    org.apache.cassandra.db.migration 
    org.apache.cassandra.cache 
  • P2P
    这个部分是用来实现P2P,也就是去中心化架构的,主要是实现了一个分布式哈希表(DHT),其中实现了几种不同的Partition方案供选择。相关的包是这个:
    org.apache.cassandra.dht 
  • 副本
    这个部分是用来实现分布式系统中常用的副本机制的,这里也提供了几种副本策略。关于副本的设置问题,在配置文件中有副本策略的设置,此外,还有副本数量的设置是写在每个Keyspace的Metadata中的。相关的包是这个:
    org.apache.cassandra.locator 
  • Gossip
    这个部分实现了Gossip,用来进行分布式系统中的成员管理。相关的类是下面这个:
    org.apache.cassandra.gms 
  • SEDA
    这个部分实现了SEDA的框架,用于更高效的并行操作。相关的类是下面这个:
    org.apache.cassandra.concurrent 
  • 通信
    这个部分实现了分布式系统内部用于通信的消息机制,以及内部传输的数据流机制。相关的包是以下几个:
    org.apache.cassandra.net
    org.apache.cassandra.net.io
    org.apache.cassandra.net.sink 
    org.apache.cassandra.streaming 
  • 请求调度
    对到达的请求进行简单的调度,相关包是:
    org.apache.cassandra.scheduler 
  • 服务封装
    这个部分实现了系统对外的调用接口,无论系统内部是如何实现的,对使用者来说,只需要从这里的服务中进行调用就可以了。此外,与hadoop的接口算在其中。相关的包是下面这2个:
    org.apache.cassandra.service 
    org.apache.cassandra.thrift 
    org.apache.cassandra.hadoop 
  • 客户端
    这个部分包括了用于管理系统的命令行接口和CQL客户端,至于Thrift客户端,那个在生成代码中。相关的包是下面这几个:
    org.apache.cassandra.cli
    org.apache.cassandra.client
    org.apache.cassandra.cql 
    org.apache.cassandra.tools 
  • 配置
    这个部分用来读取配置然后导入到系统中。相关的包是下面这个:
    org.apache.cassandra.config 
  • 授权与安全
    这个部分实现的是简单的用户授权管理和安全管理。相关的包是下面几个:
    org.apache.cassandra.auth
    org.apache.cassandra.security
    org.apache.cassandra.security.streaming  
  • 憎恶的缝合线
    好吧,这里卖萌了,其实这里就是我所说的用来对整个系统进行修修补补的地方,比如为了实现某个功能,但是少了个什么东西,于是就实现了放在这里,具体是下面2个包:
    org.apache.cassandra.utils
    org.apache.cassandra.utils.obs 

2011/07/11 通过JMX与Cassandra交互

1、首先去http://mx4j.sourceforge.net上下载3.0.1版本以上的MX4J库

2、解压后在lib文件夹下找到mx4j.jar和mx4j-tools.jar,拷贝到cassandra/lib下,重启cassandra

3、在jdk的bin目录中找到jconsole,启动,下面

3.1、如果是jconsole和cassandra都在一台机器上,那么在Local Process块中选择 org.apache.cassandra.thrift.CassandraDaemon

3.2、如果不在一台机器上,那么在Remote Process块中输入需要连接的<hostname>:<port>

——————————————–渣翻译来了————————————————————-

JMX:

JMX是一个Java API,通过两种方式提供了application的管理。一,JMX让你了解你的application的健康状况,和总体性能表现(例如memory,threads,和CPU usage),这些都是普遍适用于所有Java application的。二,JMX让你能够接触到你的application中(已经仪表化的application)的特定的方面。

所谓仪表化,就是指将application code包起来,以提供从application到JVM的hook,这使得JVM能够收集一些额外工具能够使用的数据。这些工具包括监视代理(monitoring agents),数据分析工具(data analysis tools),分析器(profilers)等。JMX不光能让你看到这些数据,还能在运行时更改application的值,以管理这些application(如果启动了这项功能)。

JMX所作的控制操作包括以下:

  • 低可用内存的检测,包括堆中每个刻度空间(graduation space)的大小
  • 进程信息,例如死锁检测,进程数量的峰值,以及当前活动进程数量。
  • 详细的类加载器 追踪
  • 日志级别控制
  • 应用的基本信息,例如application的运行时间,classpath等等

jconsole就是一个JMX代理,当然也有其他的JMX代理。

MBeans:

MBean,就是 managed bean,这是一个特殊的java bean类型,代表着JVM中一个可被管理的资源。MBeans与一个MBean server交互,以让他们的功能远程可用。

2011/06/28 BigTable中与Cassandra中的数据模型比较

一、Cassandra中的典型数据结构

二、BigTable中与Cassandra中的数据模型比较

BigTable中的数据模型更加像是建立在关系型数据库的表之上的,一个KeySpace就类似于从前的一个Table。所不同的是,BigTable中加入了几个新的特点:

1,按列存储,对于稀疏表更加友好;

2,Column Family,翻译过来就是列族,其实呢,就是把几个关联性强的列储存在一起,这个概念既是逻辑上的,也是物理上的,但是使用者其实并不需要太过关心于Column Family,设计时也不会从Column Family开始

3,历史数据,这里像是借鉴了数据库概念中的时态数据库(Temporal Database),保留了一个column中的历史数据,于是引入了timestamp。

所以呢,BigTable的数据模型更像是这样:

其中的值部分可能是稀疏的,但是按列存储避免了稀疏大表的资源浪费。

Cassandra中的数据模型更加像是建立在BigTable之上,然后加入了一个SuperColumn的概念。然后无论是网上流传的Cassandra中的数据模型使用方法,还是《Cassandra The Definitive Guide》中所介绍的数据模型部分,似乎都与BigTable中的数据模型迥异了,原本的BigTable还有与传统的Table的类比之处,如KeySpace与Table,Column与Column,但是在Cassandra中,KeySpace类似于Application,ColumnFamily类似于Table。个人觉得原因有两个:

1,由于引入了SuperColumn的概念,使得数据模型从BigTable中的KeySpace->Key->ColumnFamily->Column,变成了KeySpace->Key->ColumnFamily->Column或者KeySpace->Key->ColumnFamily->SuperColumn->SubColumn,于是数据维数上升了,而且变得不确定了,有时4维有时5维的数据模型就缺少一个通用的使用方法。虽然SuperColumn的引入丰富了整个数据模型,但是我认为由于原本的四维数据模型已经足够简约和使用,SuperColumn只能算作锦上添花,而且由于Cassandra本身功能不够强大,如果使用SuperColumn回来带来一些问题,例如索引问题,条件搜索的实现更加困难。

2,Cassandra的固有实现问题。可能在一开始的时候,Cassandra的开发人员对BigTable的模型理解就有偏差(这里并没有对不对的问题,只是与更广泛的理解略有不同),所以KeySpace的实现就不是当做一个Table来实现的,所以才会出现早期版本中必须重启整个Cassandra集群才能添加新KeySapce的情况,在后来的版本中,多KeySpace也并不被鼓励。

所以,Cassandra的数据模型更像是这样:

如图中所示,虽然这是一个KeySpace,但是这个KeySpace实际上是被切成了几份,Key虽然看似是同等的地位,但是实际上是分别专用于不同的ColumnFamily,并不能通用,他们的命名方式也各不相同。总的来说,这也是一个稀疏的大表,只不过空洞部分更加集中(图中XX部分),而有意义的ColumnFamily中(着色部分)空洞则少得多。而由于按列存储的机制,确实使得ColumnFamily看起来更像一个Table了。

比较BigTable和Cassandra中的数据模型,二者其实很相似,如果真的愿意的话,实际上可以互相转化。不过针对BigTable的设计更偏重于以row为单位,一个row中的内容可以非常丰富,而针对Cassandra的设计更偏重于以ColumnFamily为单位,KeySpace的意义被淡化了。

2011/06/20-21 在Eclipse中配置Cassandra_3

9. cassandra-cli在Eclipse中的配置

服务端搞定之后,接下来配置客户端,先分析bin版本中bin目录下的cassandra-cli.bat,有这么一段

:okClasspath
REM Include the build\classes\main directory so it works in development
set CASSANDRA_CLASSPATH=%CLASSPATH%;"%CASSANDRA_HOME%\build\classes\main";"%CASSANDRA_HOME%\build\classes\thrift"
goto runCli

说明在开发环境中,客户端的classpath需要”%CASSANDRA_HOME%\build\classes\main”和”%CASSANDRA_HOME%\build\classes\thrift”这两个地址,同样的,我们在Eclipse项目中加入以上地址,在Properties->Java Build Path->Libraries点Add Class Folder,加入这两个地址。这样就能成功运行客户端了。

客户端其实是个命令行控制的程序,具体的入口是org.apache.cassandra.cli.CliMain。

10.jline在Windows下的Eclipse中不能正常运行的问题

在Eclipse中虽然运行了cli,但是Eclipse中运行cli会直接跳到了结束,虽然出现了cli启动时的输出,但是还没等输入命令cli就已经中止了(terminated)。那么就一步步运行看看,发现之所以会结束,问题出在下面的代码

String prompt;
        String line = "";
        String currentStatement = "";
        boolean inCompoundStatement = false;

        while (line != null)
        {
            prompt = (inCompoundStatement) ? "...\t" : getPrompt(cliClient);

            try
            {
                line = reader.readLine(prompt);
            }
            catch (IOException e)
            {
                // retry on I/O Exception
            }

            if (line == null)
                return;

            line = line.trim();

            // skipping empty and comment lines
            if (line.isEmpty() || line.startsWith("--"))
                continue;

            currentStatement += line;

            if (line.endsWith(";") || line.equals("?"))
            {
                processStatement(currentStatement);
                currentStatement = "";
                inCompoundStatement = false;
            }
            else
            {
                currentStatement += " "; // ready for new line
                inCompoundStatement = true;
            }
        }

其中

line = reader.readLine(prompt);

这里调用jline包中的方法从Console中读一行,但是实际上在eclipse中反悔了null于是在下面的代码中

if (line == null)
    return;

导致了运行的结束。而真正的原因是在我步进了jline包的源码中时,才发现的(jline-0.9.94下载:http://sourceforge.net/projects/jline/files/jline/0.9.94/),当调用reader.readLine()方法时总是返回null。后来google了一下,发现这个问题不是个别情况,而是一个非常知名的问题,如这篇文章(http://whitesock.iteye.com/blog/692816)中所说,“JLine最知名的问题莫过于在Windows平台下的Eclipse中启动的程序中调用reader.readLine()方法时总是返回null(正确的行为是等待用户输入)”,我下载了最新的版本jline-1.0,问题仍然存在。

根据那篇文章中的解决办法,“笔者发现通过设置jline.WindowsTerminal.directConsole属性为false,可以解决返回null的问题”,我确实解决了问题,代码如下:

jline.WindowsTerminal winTerm=(jline.WindowsTerminal)reader.getTerminal();
winTerm.setDirectConsole(false);

然后,我想看看究竟是哪里导致了这个问题,在CliMain.java中,初始化reader时,调用了jline.ConsoleReader的构造器,在jline.ConsoleReader.class中,发现时以下语句初始化了terminal

this(in, out, bindings, Terminal.getTerminal());

继续跟进,Terminal.getTerminal()是个静态方法,下面是其代码

    public static Terminal getTerminal() {
        return setupTerminal();
    }

这里调用了Terminal.setupTerminal()方法,这个方法实际上是用来判断操作系统是windows还是unix,然后根据情况创建WindowsTerminal或者UnixTerminal,既然出问题的是Windows系统下,那么再继续跟进到WindowsTerminal类中,在这里发现了上文提到的directConsole属性,其初始化的代码写在构造器中,如下:

        String dir = System.getProperty("jline.WindowsTerminal.directConsole");
        if ("true".equals(dir)) {
            directConsole = Boolean.TRUE;
        } else if ("false".equals(dir)) {
            directConsole = Boolean.FALSE;
        }

而具体使用到这个变量的地方,是这里:

    public int readCharacter(final InputStream in) throws IOException {
        // if we can detect that we are directly wrapping the system
        // input, then bypass the input stream and read directly (which
        // allows us to access otherwise unreadable strokes, such as
        // the arrow keys)
        if (directConsole == Boolean.FALSE) {
            return super.readCharacter(in);
        } else if ((directConsole == Boolean.TRUE)
            || ((in == System.in) || (in instanceof FileInputStream
                && (((FileInputStream) in).getFD() == FileDescriptor.in)))) {
            return readByte();
        } else {
            return super.readCharacter(in);
        }
    }

根据我在debug的过程中的步进,发现实际上在这个3分支的判断中,进入的是第二个,也就是directConsole == Boolean.TRUE,这说明在WindowsTerminal的构造器中,System.getProperty(“jline.WindowsTerminal.directConsole”)得到的是true,
但是这种情况是很奇怪的,我在CliMain中加入了以下代码进行debug

        System.out.println("true".equals(System.getProperty("jline.WindowsTerminal.directConsole")));

发现这里打印出的值是false,也就是说,在WindowsTerminal的构造器中directConsole被赋予了错误的值,这就是为什么要手动将其设为false

2011/06/20 在Eclipse中配置Cassandra——配置文件介绍2

接上回

https://hycz.wordpress.com/2011/06/17/cassandra-yaml/

8.

今天介绍一下conf文件夹下的其他配置文件

首先是log4j-server.proprties,这个和cassandra.yaml一样是必须的配置文件。具体的用法参见http://kdboy.iteye.com/blog/208851

下面是passwd.properties,这是一个SimpleAuthenticator的用户密码配置,每行格式是username=password,如果-Dpasswd.mode=MD5,那么密码部分是md5 digest,否则,密码部分是明文。(用户名中空格用’\ ‘)

接下来是access.properties,这是一个SimpleAuthority的访问配置,每行格式是KEYSPACE[.COLUMNFAMILY].PERMISSION=USERS,其中,KEYSPACE是keyspace名,COLUMNFAMILY是column family名,PERMISSION是<ro>或者<rw>,意思是read only或read-write,USERS是一系列用户名,用逗号隔开,这些用户名必须在passwd.properties中定义。例:Keyspace1.Standard1.<rw>=jsmith,Elvis Presley,dilbert

最后是cassandra-topology.properties,这是一个定义cassandra集群拓扑的配置,每行格式是Cassandra Node IP=Data Center:Rack,其中,等号左边是ip,右边是数据中心名和机架名(似乎只是个名字,不用定义),这里主要是用来判断节点的临近性,由org.apache.cassandra.locator.PropertyFileSnitch使用。注意,如果ip不明的话,比如后来加入的节点,那么ip部分就用default代替。如果需要使用ipv6,那么需要在cassandra-env.sh中把其中一个参数设置改为:JVM_OPTS=”$JVM_OPTS -Djava.net.preferIPv4Stack=true”。

2011/06/15-17 在Eclipse中配置Cassandra——配置文件介绍

接上回https://hycz.wordpress.com/2011/06/14/20110614-%E5%9C%A8eclipse%E4%B8%AD%E9%85%8D%E7%BD%AEcassandra2/

7.
今天要对Cassandra进行配置,这话有点拗口,简单来说,就是改一下conf文件夹里的配置文件,因为原来的是默认设置,而且是针对linux的文件系统的,这里要改成windows下的。首先是cassandra.yaml文件(这坑好大。。。慢慢填),主要翻译翻译注释。。。。

1、cluster_name:集群名,主要用来防止机器加入到其他的逻辑集群中
2、initial_token:cassandra中有个token环,每个节点都负责其中的一部分,这个参数就是用来设定这台机器负责哪部分的。如果空着不写,那么就自动取负载最重的节点所拥有的token范围的一半。如果取不到负载信息,那么就随机取一段。
3、auto_bootstrap:cassandra中的节点其实还是分了2种,一个是seed节点,一个是non-seed节点,一个节点要加入集群实际上还是要一个入口,这个入口就是种子节点,所以一次能够加入集群的节点数量是有限的,即种子节点的数量,如果需要加入集群的节点数量多于这个阀值,那么就需要分阶段进行加入操作。这个参数是用来设定一个非种子节点是否在启动时自动从已有节点中迁移数据进来。默认false
4、hinted_handoff_enable:这个参数涉及了cassandra的一个机制,当一个写操作所针对的副本节点down掉的时候,cassandra会在另一台活着的副本几点上写一个hint(提示?暗示?还不如说是个便签。。。),这个hint用来表明那台down掉的节点需要重新执行这个写操作。如果所有的副本节点都死了,而且一致性等级是ConsistencyLevel.ANY(五种?ZERO,ONE,QUORUM,ALL,ANY,这里ANY是说只要一个写操作在任何地方成功了,hint写也算。。。那就算成功。。),协调节点(coordinating node)会在本地写下这个hint。这个机制主要是用来减少暂时失效节点通过活着的节点重新恢复到一致状态 所需要的时间,并且在不需要一致性的情况下提供额外的可用性。
     max_hint_window_in_ms:节点死掉之后保留hint的最长时间,死太久了,超过这个时间那么之前写的hint就被丢弃。
     hinted_handoff_throttle_delay_in_ms:每发送一行或者行碎片时sleep这么长时间?这里不确定
6、authenticator:认证后端,实现接口IAuthenticator,用来识别用户
7、authority:授权后端,实现接口IAuthority,用来限制访问,提供通行权
8、partitioner:这里是Cassandra中很重要的一部分,设置了Cassandra中数据的数据分割方式,用通俗的话来说,就是数据是以怎样的规则分布在整个集群的许多节点中的,数据的分割是以row为单位的,row使用key来唯一辨别的,所以其实就是key的分布。设置的值是个类名,这个类需要实现IPartitioner接口。Cassandra提供了几种分布方式如下

org.apache.cassandra.dht.RandomPartitioner
org.apache.cassandra.dht.ByteOrderedPartitioner
org.apache.cassandra.dht.OrderPreservingPartitioner (deprecated)
org.apache.cassandra.dht.CollatingOrderPreservingPartitioner(deprecated)

第一个是随机分割,使用md5;第二个是根据key的每个byte的字母顺序进行排序,这种排序支持row级别的(其实就是key啊)范围查询,但是对于顺序插入的负载,这种分割会存在热点问题,但是没办法啊。。。不然怎么顺序查询。。。泪奔;第三个是第二个的废弃版本,格式比第二个低效,使用UTF-8编码的字符串当做key;第四个似乎是和第三个差不多的,不过用了另一种排序方式,反正都被废弃了,不管了。
9、data_file_directories:存data的目录
     commitlog_directory:存commitlog的目录
     saved_caches_directory:存cache的目录
     commitlog_rotation_threshold_in_mb:一个阀值,单位MB,当commitlog超过这个大小时,就产生一个新的段
10、commitlog_sync:这里设置commitlog的同步方式,应该就是从内存中同步到磁盘上做持久化,这里有2种设置,一个是batch,一个是periodic。在batch模式中,只有当commit log同步到了磁盘上之后才会执行写操作,在执行同步之前,将会等待CommitLogSyncBatchWindowInMS毫秒,等其他的写操作结束。在periodic模式中,写操作会被立即执行,无视commitlog有没有写到磁盘上,而commitlog则是每隔commitlog_sync_period_in_ms这么长时间才周期性的进行同步。
      commitlog_sync_period_in_ms:commitlog以periodic模式同步时的周期值,单位毫秒。
11、seed_provider:这个参数是seed节点的设置,首先是要设置seed_provider的子参数- class_name,这里需要制定一个类来提供seed功能,这个类需要实现SeedProvider接口,然后设置子参数- seeds,也就是seed节点的地址,多个地址间用逗号隔开
12、flush_largest_memtables_at:这是第一个紧急状况压力阀值,这里的值设置在0-1.0之间,这个值的意思是每当full garbage collection之后,如果堆的使用率高于这个阀值的话,那么就会把最大的memtable flush到磁盘上。设到1.0就是永远不执行,也最好不要低于CMSInitiatingOccupancyFraction。
13、reduce_cache_sizes_at:这是第二个紧急状况压力阀值,在full garbage collecion之后,如果首次出现堆使用率高于这个阀值,那么cassandra就会降低cache的最大容量,并且降低cache的当前大小。
reduce_cache_capacity_to:当上述情况发生时,cache的最大容量降低比例
14、concurrent_reads:并发读的数量,当负载高于内存中能满足的数据时,读会成为cassandra的瓶颈,建议设置成16*驱动器数量
      concurrent_writes:并发写的数量,一般对于写操作,IO不会成为瓶颈,计算机的处理器core数才是影响其性能的关键,所以可以设置成8*core数量
15、memtable_total_space_in_mb:memtable的最大值,省略的话自动设置成堆的三分之一大小
        memtable_flush_writers:memtable flush的进程数量,由于flush进程会被磁盘io所阻塞,而且当被阻塞时,每个进程都会持有一个memtable,所以当堆很大,而且有很多数据目录时再去提高这个值。省略的话默认设置为数据目录的数量
      memtable_flush_queue_size:当memtable满了的时候能够进入等待flush队列的数目,最小应该设置为一个Column Family中辅助索引的最大数目
16、sliced_buffer_size_in_kb:处理连续的column slice的buffer大小
17、storage_port:TCP的端口
18、listen_address:其他节点用来连接的地址,也就是本机的地址,空着的话会自动调用InetAddress.getLocalHost()来填这个空,一般是填本机的hostname,不能填0.0.0.0
19、rpc_address:Thrift RPC service所绑定的地址,客户端会连接这个地址,0.0.0.0意味着监听所有,空着同上。
      rpc_port:Thrift RPC service绑定的端口
      rpc_keepalive:是否保持rpc连接存活,也就是用不用长连接吧
20、rpc_min_threads:对于客户端RPC,Cassandra使用一个进程进行处理,这是很昂贵的消耗,所以更好的方法是使用连接池。不过这里还是提供了rpc数量设置,这个参数是最小进程数。注释掉就默认16
      rpc_max_threads:这个参数是最大进程数,注释掉默认无上限
21、rpc_send_buff_size_in_bytes:rpc连接的socket buffer大小,可以注释掉
      rpc_recv_buff_size_in_bytes:rpc连接的socket buffer大小,可以注释掉
22、thrift_framed_transport_size_in_mb:thrift的框架大小,设成0就不用TFramedTransport,而用TSocket,但是不推荐设成0
23、thrift_max_message_length_in_mb:thrift消息的最大长度,包括所有的字段(field)和thrift内部的开销
24、incremental_backups:如果设为true的话,Cassandra会建立到每个已在本地flush后或stream之后的sstable的硬链接,这些sstable在Keyspace数据的备份或者子目录中。移除这些链接将是操作者的职责。一般设成false?这个参数没看懂。。。
25、snapshot_before_compaction:是否在每次compaction之前做snapshot,注意,如果设为true的话,cassandra是不会去清理这些snapshot的,所以要手动清理,估计会很多,很麻烦。。。一般设成false吧
26、compaciton_thread_priority:设置compaciton进程的优先级,注释掉的话默认设为1,在java中1是最低。。。。
27、column_index_size_in_kb:当一个row的内容超过这里设置的大小时,会添加column索引,默认设成了64,如果column的值都很大,或者有很多column的话,可以相应设置更大的值。这带来了另一个很矛盾的问题,即使只是读一个单独的column,Cassandra也需要反序列化(deserialize)这部分索引数据,所以这些索引数据是越小越好,一个column的情况可能比较极端,但至少能在读一行的一部分时(这种情况比较多见)能小一些;但是每次存取时是都需要先访问索引数据的,所以我们也不能一味图小,而使得索引中无法包括足够有用的信息。说了这么多废话,意思就是这个值要根据应用好好斟酌,不能太大,不能太小。默认64KB
28、in_memory_compaction_limit_in_mb:在内存中进行compaction的row的最大大小,超过这个值的话,超过的部分会暂时放到磁盘上,那么这个compaction效率就不行了。一个消息会log下来,用来辨别这个row key
29、concurrent_compactors:compation进程的数量,注释掉默认设置为处理器的数量,建议是使用多个进程进行处理,这样可以在读写混合型负载中保持读性能,原因是这样避免了在长时间的compaciton中sstable的积聚。。。其实嘛,就是compaciton的快一些。。
30、compaction_throughput_mb_per_sec:设置了一个阀值限制compaction输出的速率,单位是MB/s,要知道,compaciton是一个很差火的操作,同时消耗CPU,MEMORY,IO资源,所以compaction的时机非常重要,否则会对整个系统产生重大影响。compaction的相关设置都值得好好研究,如果希望系统高效的话,这个值的设置还是要结合实际使用情况。插入数据的速度越快,那么compaction的速度也要越快,以控制sstable的数量。一般设置这个值为插入数据速率的16到32倍比较好。设为0的话就没有阀值。
31、compaction_preheat_key_cache:是否在compaction过程中跟踪已缓存的row key,并且重新缓存这些row key在compact之后的sstable中的新位置。如果有足够大的key缓存的话,可以设为false(为什么。。。。)
32、rpc_timeout_in_ms:等待其他节点回复的时间,超过这个时间则执行的命令失败
33、phi_convict_threshold:这里就是论文中提到的Failure Detection所用到的phi,这里有一个算法,算出一个节点的phi值,如果超过阀值,那么就判断这个节点已经down掉。一般用不到,注释掉
34、endpoint_snitch:这里的参数值是一个实现了IEndpointSnitch接口的类,用来让Cassandra知晓网络拓扑,以更高效的route(这里忍不住吐槽一下,翻译成路由真是不知所云,还不如直接翻译成寻路)请求。snitch也是一个不好理解的地方,原意为告密,实际是理解为告知别人自己的拓扑结构么?那这个参数叫端点告知?Cassandra提供了3个策略

org.apache.cassandra.locator.SimpleSnitch:
org.apache.cassandra.locator.RackInferringSnitch:
org.apache.cassandra.locator.PropertyFileSnitch:

第一个是是简单策略,就是一个简单的邻近策略,能够提高缓存的locality(本地性?或者说集中性?),但是不能进行读修复(读修复能够进一步提高吞吐?);第二个是机架策略,邻近性由机架和数据中心决定,这里假设邻近性由不同节点的ip(ipv4)的第三个和第二个8位数(A.B.C.D中的B和C。。。)来决定,也就是说ip相近,那就认为更邻近;第三个是用配置文件显性地写好了哪些节点邻近,配置文件是cassandra-topology.properties。
35、dynamic_snitch:是否使用dynamic_snitch。dynamic_snitch是包裹在endpoint_snitch外,用来监控读延迟,以避免从已经变慢的主机上进行读取。
      dynamic_snitch_update_interval_in_ms:多久进行一次分数计算(代价昂贵?)
      dynamic_snitch_reset_interval_in_ms:多久充值所有机器的分数,通过这样的操作很可能可以让坏掉的节点恢复
      dynamic_snitch_badness_threshold:这里又提到了另一个机制,副本的pin(钉住?不知怎么翻译),意思就是反复从一个副本上读数据,这样cache的能力会得到提高。当这个参数的值(0.0到1.0闭区间)设置为大于0的时候,这个机制就会启动。但是基于dynamic snitch机制,副本是有分数的,dynamic snitch是用来监控读延迟,并且给主机打分,以评出优劣,然后选择更优的副本提供读,当一个已经pinned的副本变坏时,就需要换一个性能更优的副本pin住提供读。那么如何判断一个副本已经很坏了,需要换一个副本提供读呢?这里就用到了这个阀值,举个例子,当这个值为0.2时,当已经被pin的副本的分数比最快的副本的分数低20%时,cassandra就会选择另一个副本。

36、request_scheduler:这个参数是一个实现了RequestScheduler的类,用来基于特定的策略来调度传入的客户端请求,当一个Cassandra集群上有多租户时,这个调度会很有用。这个调度只是针对客户端请求,并不会影响内部节点通信。下面是提供的几个类

org.apache.cassandra.scheduler.NoScheduler
org.apache.cassandra.scheduler.RoundRobinScheduler

第一个是不使用任何调度;第二个是使用round robin调度客户端请求,每一个不同的request_scheduler_id都有一个单独的队列,具体的option会由request_scheduler_options设置
      request_scheduler_options:如果用NoScheduler,那么没有option,如果用RoundRobin,那么会有3个子选项:1,throttle_limit,每个客户端的请求数量阀值,超过这个数量的请求都会进入队列,大小设置一般是(并行读+并行写)*2;2,default_weight,可选选项,默认权值的设置,原先是1,但是可以自己设定;3,weights,也是可选选项,如果不写,那么权值会被设置成default_weight,这个参数是设定每轮RoundRobin中,基于scheduler id的权值,有多少请求会被处理,scheduler id的设置在下面。
      request_scheduler_id:基于这个id来进行请求调度,当前唯一的选择是keyspace
37、index_interval:对一个sstable的row key进行采样的大小,采样越大,index越有效,但是要消耗更多的空间
38、encryption_options:节点间的加密

2011/06/14 在Eclipse中配置Cassandra_2

接上回https://hycz.wordpress.com/2011/06/13/20110613-%E5%9C%A8eclipse%E4%B8%AD%E9%85%8D%E7%BD%AEcassandra/

  1. 见上回
  2. 见上回
  3. 见上回
  4. 遇到问题,开始Debug的时候出现
    “‘Launching CassandraDaemon’ has encountered a problem.
    Cannot connect to VM”Details中:
    “Cannot connect to VMselect failed”不光是这个项目,我新建了一个最简单的project写了个helloworld都不能用debug,说明这是eclipse层的问题。具体出错处是什么socket出错,网上的说法就几种:1,有其他程序占用了Eclipse的Debug端口,导致出错;2,操作系统的socket出了问题要重置。对于第一种解释,虽然我表示同意是另一个程序影响了Eclipse,但是实在难以找到相应的程序,对于第二种解释,我又找不到win7下的重置sock的工具。无奈之下,就用了360的LSP修复,修复之后Debug能用了,但是不能打开网页了。我在修复前看了一下,使用到LSP的非系统程序只有2个,都是我的游戏代理,一个是27代理,一个是迅雷加速器,恐怕问题就出在他们身上。无论如何,网页是打不开了,qq还在,于是重启看看。重启之后,依然不能上网,连qq都上不去了,打开360,居然要求自动修复,那就自动修复吧,结果瑞星狂叫360是木马,现在吧,我觉得瑞星更像木马一些。修复完了,按要求重启,qq能上了,但是网页打不开,这种情况好像DNS坏了一样,就用cmd ping了一下百度,居然ping的通,然后我又用Access Connections诊断了一下网络故障,他叫我检查防火墙设置。360防火墙怎么莫名其妙开了,关掉,不能开网页,瑞星防火墙,关掉,能开网页了。瑞星果然是娱乐货,没事弄个免费,天天弹那些狗屎免费游戏,真的考虑要不用它了。又启动了一下迅雷网游加速器,果然问题再现了,使用他自带的LSP修复,然后问题消失。果然还是LSP引起的问题么。这种诡异的问题不要再碰到了啊!这样一来,代理还是能用,只是用完了要修复,不能一边wow一边debug了。。
  5. 类中的static{}属性,似乎是在调用类的方法时,会首先执行的代码,在Cassandra的org.apache.cassandra.service.AbstractCassandraDaemon类中,就有这么一段,是用来初始化logging的
    //Initialize logging in such a way that it checks for config changes every 10 seconds.
        static
        {
        	String config = System.getProperty("log4j.configuration", "log4j-server.properties");
            URL configLocation = null;
            try
            {
                // try loading from a physical location first.
                configLocation = new URL(config);
            }
            catch (MalformedURLException ex)
            {
                // load from the classpath.
                configLocation = AbstractCassandraDaemon.class.getClassLoader().getResource(config);
                if (configLocation == null)
                    throw new RuntimeException("Couldn't figure out log4j configuration.");
            }
            PropertyConfigurator.configureAndWatch(configLocation.getFile(), 10000);
            org.apache.log4j.Logger.getLogger(AbstractCassandraDaemon.class).info("Logging initialized");
        }

    这里首先定义了一个变量config,这个config的属性关键字是log4j.configuration,然后给了一个默认值log4j-server.properties,这个变量的主要目的是储存log4j的配置文件地址,默认的文件名是log4j-server.properties,如果在启动时手动设置了其他值的话,那么就会是那个手动设置的值。

    然后首先尝试通过物理地址查找这个配置文件(try中),如果找不到(catch中),那么就到启动时设置的CLASSPATH中去找这个文件。

  6. 再看cassandra-0.8.0的bin版本中的bin文件夹中的cassandra.bat文件(我用的是windows7。。。) ,通过分析这个文件中所作的操作,可以知道如何在Eclipse中配置Cassandra的运行参数。以下摘自cassandra.bat文件。
    if "%OS%" == "Windows_NT" setlocalif NOT DEFINED CASSANDRA_HOME set CASSANDRA_HOME=%~dp0..
    if NOT DEFINED CASSANDRA_MAIN set CASSANDRA_MAIN=org.apache.cassandra.thrift.CassandraDaemon
    if NOT DEFINED JAVA_HOME goto err

    这段分别定义了cassandra的根目录地址,cassandra的入口程序,检查了JAVA_HOME是否定义。

    REM ***** JAVA options *****
    set JAVA_OPTS=^
     -ea^
     -Xms1G^
     -Xmx1G^
     -XX:+HeapDumpOnOutOfMemoryError^
     -XX:+UseParNewGC^
     -XX:+UseConcMarkSweepGC^
     -XX:+CMSParallelRemarkEnabled^
     -XX:SurvivorRatio=8^
     -XX:MaxTenuringThreshold=1^
     -XX:CMSInitiatingOccupancyFraction=75^
     -XX:+UseCMSInitiatingOccupancyOnly^
     -Dcom.sun.management.jmxremote.port=7199^
     -Dcom.sun.management.jmxremote.ssl=false^
     -Dcom.sun.management.jmxremote.authenticate=false^
     -Dlog4j.configuration=log4j-server.properties^
     -Dlog4j.defaultInitOverride=true

    这段是各种参数,需要把这段复制到Eclipse中Run->Debug Configration中建立的Debug项中的VM arguments中(注意把每行最后的^删掉)。

    REM Ensure that any user defined CLASSPATH variables are not used on startup
    set CLASSPATH="%CASSANDRA_HOME%\conf"

    这里就定义了CLASSPATH,原来这里把conf文件夹加入到了CLASSPATH里(CLASSPATH里不止一个地址),怪不得能找到。所以在Eclipse的项目的Properties->Java Build Path->Libraries点Add Class Folder,加入这个conf地址,就不会报错了

    bat文件中其他的设置也是大同小异,就不写了。