Hycz's Blog

Life is a game. Why so serious?

Cassandra 0.8.0 源码分析——Thrift相关

一、引子

这两天在写一个简单的cassandra测试程序,中间参考了test/distributed下的代码,不过其中用到的whirr实在蛋疼,首先这里用到的是whirr0.4版本,跟最新的0.7版本差别太大,其次是没有详细的文档说明如何在服务器端配置能够使用whirr进行集群管理,所以我就自己写了个简单的,把whirr的部分都砍掉了。

写这个简单的测试程序的过程中,出了些开始觉得诡异,后来觉得愚蠢的错误,不过也因此给我理由步进进去看了看源码,比如libthrift-0.6里的源码,还有自动生成的代码。

二、Introduction

首先推荐一个不错的thrift介绍,http://kingxss.iteye.com/blog/1413696

其他介绍的话就不说了,那篇博客里有写,这里就说下自己的感受。Thrift就是定义了一套自己的语法、传输协议,以此提供了与具体语言无关的一系列定义,在需要的时候翻译成相应的语言,同时提供了服务器客户端之间的底层通讯功能,于是可以让使用者专注于实现自己的功能

具体的表现,就是我接触到的Cassandra中thrift的使用,在cassandra.thrift文件中定义好数据结构struct,服务功能service,以及枚举enum、异常exception等部分后,使用thrift根据这个文件中的定义,自动生成大量的代码,在Cassandra中,就是interface/thrift/gen-java下的代码。

三、真的是生成代码?

开始我一直奇怪的就是,为何.thrift文件只有650+行,却生成了Cassandra.java这个有30000+行的庞然大物,而且.thrift文件中没有业务逻辑,仅仅是一些功能的名称。所以我下了个thrift的可执行程序(http://www.apache.org/dyn/closer.cgi?path=/thrift/0.8.0/thrift-0.8.0.exe)自己生成了一次。

首先我写了个简单的定义Hello.thrift,代码如下:

namespace java service.demo   
service Hello{   
 string helloString(1:string para)   
 i32 helloInt(1:i32 para)   
 bool helloBoolean(1:bool para)   
 void helloVoid()   
 string helloNull()   
}

执行命令thrift-0.8.0.exe -gen java Hello.thrift后,查看了一下生成代码的大小,真是吓人,仅仅几行的定义就生成了几千行的代码。

再看看cassandra.thrift,本身就有650+行,执行命令thrift-0.8.0.exe -gen java cassandra.thrift之后,确实生成了在原本项目中interface/thrift/gen-java目录下的代码。

四、生成了什么代码

刚才提到.thrift文件被叫做接口定义(interface definition),使用Thrift IDL(Thrift interface description language)(http://thrift.apache.org/docs/idl/)写成,这种文件一般包括了很多thrift types 和 services。services也是thrift types的一种,不过这里定义的services还需要由服务端实现,由客户端调用

所以,重要的就是理解.thrift文件中的各种项。官网上有2处提到了这些类型,一个是Thrift IDL(http://thrift.apache.org/docs/idl/),一个是Thrift Types(http://thrift.apache.org/docs/types/),我认为前者是完整的描述,后者则是其中常用的一些类型。

先说说下面列举一些常用的Type:

  • Base Types
    就是下面的7种基本类型,不用太多解释:

    • bool: A boolean value (true or false)
    • byte: An 8-bit signed integer
    • i16: A 16-bit signed integer
    • i32: A 32-bit signed integer
    • i64: A 64-bit signed integer
    • double: A 64-bit floating point number
    • string: A text string encoded using UTF-8 encoding
  • Special Types
    只有一种binary类型,当前只是string类型的一种特殊形式。

    • binary: a sequence of unencoded bytes
  • Structs
    定义了一个通用对象,类似于OOP中的Class,但是没有继承。一个Struct有一系列的拥有单独名称标识的强类型域(strongly typed fields)。在生成java代码时,每个Struct将单独生成一个类,由于定义时不会有方法,所以生成的Class代码中只有众多的存取方法。
  • Containers
    多种编程语言中常用的强类型容器,具体有3种:

    • list:元素的有序列表。会被翻译成STL vector,Java ArrayList,native arrays in scripting languages等等。
    • set:不同元素的无序列表。会被翻译成STL set, Java HashSet, set in Python等等。PHP中不支持set,所以会被看成语List相同。
    • map:严格不同的键值对map。可以被翻译成STL map, Java HashMap, PHP associative array, Python/Ruby dictionary等等。

    在实际使用中,容器常常作为field的类型使用。

  • Exceptions
    功能上Exception和Struct是一样的,不过Exception在翻译成目标语言的时候,都会继承相应的Exception类。生成Java代码时,也会生成一个单独的Exception文件。
  • Services
    定义一个service等同于在OOP中定义一个接口(或者纯虚拟抽象类)。Thrift编译器产生完整功能的,实现了这个接口的客户端和服务端stub。
    一个service有一系列命名了的functions组成,function有一系列参数和返回类型。

更加完整的定义还是得去看Thrift IDL,这里不细说了。

五、service生成代码的分析

interface/thrift/gen-java目录下,生成了很多的java文件,其中Cassandra.java是客户端的所在,其他则是一些数据结构的类、异常的类和枚举的类,所以这些闲散的类就没什么看头了,研究一下服务的类Cassandra.java。

我觉得Cassandra.java可以分为4个部分:接口,Client,Processor,辅助类

1、接口部分

接口部分有两个,IfaceAsyncIface,两者都是根据.thrift文件中的”service Cassandra{…}”这一项中的定义生成的,两个接口的不同之处是AsyncIface中的方法都比Iface中的多一个Call back的参数。

2、Client

Client就是实现了上述接口的类,因此同样有2个,一个是Client类,一个是AsyncClient类。Client类分为了4个部分,分别是:实例属性,Factory类,功能方法,sendrecv方法

2.1 实例属性

之所以把实例属性放在其他部分之前说,是由于实例属性体现了这是一个分布式程序,并且影响着其他部分。

实例属性只有3个:iprot_, oprot_, seqid。前2个都是org.apache.thrift.protocol.TProtocol类型的,可以理解为提供了分别设置输入协议对象和输出协议对象的能力,虽然使用时可以设置为一个。seqid则表示这客户端的编号,每次收到的消息都要比较seqid是否相同。

2.2 Factory类

Factory类就是提供了2个public级别的Client的构造器,2种构造器的参数列表略有不同,一个是只传入了一个协议对象,这将会把iprot_和oprot_设为相同对象;另一个则传入了2个协议对象,分别对应iprot_和oprot_

2.3 功能方法

这里的功能方法即为接口Iface中定义的方法,有趣的地方是所有的功能方法都有着统一的形式,即顺序执行send和recv方法。比如下面这个例子

public void add(ByteBuffer key, ColumnParent column_parent, CounterColumn column, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, org.apache.thrift.TException
    {
      send_add(key, column_parent, column, consistency_level);
      recv_add();
    }

2.4 sendrecv方法

这里是真正涉及到了分布式的地方,每一个功能方法都对应着一个send方法和recv方法。send方法写消息,用oprot_写,recv方法收消息,从iprot_中读。比如下面这个例子

    public void send_add(ByteBuffer key, ColumnParent column_parent, CounterColumn column, ConsistencyLevel consistency_level) throws org.apache.thrift.TException
    {
      oprot_.writeMessageBegin(new org.apache.thrift.protocol.TMessage("add", org.apache.thrift.protocol.TMessageType.CALL, ++seqid_));
      add_args args = new add_args();
      args.setKey(key);
      args.setColumn_parent(column_parent);
      args.setColumn(column);
      args.setConsistency_level(consistency_level);
      args.write(oprot_);
      oprot_.writeMessageEnd();
      oprot_.getTransport().flush();
    }

    public void recv_add() throws InvalidRequestException, UnavailableException, TimedOutException, org.apache.thrift.TException
    {
      org.apache.thrift.protocol.TMessage msg = iprot_.readMessageBegin();
      if (msg.type == org.apache.thrift.protocol.TMessageType.EXCEPTION) {
        org.apache.thrift.TApplicationException x = org.apache.thrift.TApplicationException.read(iprot_);
        iprot_.readMessageEnd();
        throw x;
      }
      if (msg.seqid != seqid_) {
        throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.BAD_SEQUENCE_ID, "add failed: out of sequence response");
      }
      add_result result = new add_result();
      result.read(iprot_);
      iprot_.readMessageEnd();
      if (result.ire != null) {
        throw result.ire;
      }
      if (result.ue != null) {
        throw result.ue;
      }
      if (result.te != null) {
        throw result.te;
      }
      return;
    }

这里就涉及到了Thrift中的Message的结构,见下一章。

3、Processor

Processor类实现了org.apache.thrift.TProcessor接口,这个类封装了各种从输入输出流中读写数据的操作,在Thrift架构中在传输之上,应用之下,连接了底层操作和用户功能实现。Processor类大致分为这么几个部分:构造器,process方法,具体功能类

3.1 构造器

构造这个类的实例时,需要传入Iface中定义的方法的具体实现,具体来说,就是一个实现了Iface接口的对象,然后构造器中新建了各个功能类的的实例放入processMap_。具体代码如下:

public static class Processor implements org.apache.thrift.TProcessor {
  private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class.getName());
  public Processor(Iface iface)
  {
    iface_ = iface;
    processMap_.put("login", new login());
    processMap_.put("set_keyspace", new set_keyspace());
    processMap_.put("get", new get());
    processMap_.put("get_slice", new get_slice());
    processMap_.put("get_count", new get_count());
    processMap_.put("multiget_slice", new multiget_slice());
    processMap_.put("multiget_count", new multiget_count());
    processMap_.put("get_range_slices", new get_range_slices());
    processMap_.put("get_indexed_slices", new get_indexed_slices());
    processMap_.put("insert", new insert());
    processMap_.put("add", new add());
    processMap_.put("remove", new remove());
    processMap_.put("remove_counter", new remove_counter());
    processMap_.put("batch_mutate", new batch_mutate());
    processMap_.put("truncate", new truncate());
    processMap_.put("describe_schema_versions", new describe_schema_versions());
    processMap_.put("describe_keyspaces", new describe_keyspaces());
    processMap_.put("describe_cluster_name", new describe_cluster_name());
    processMap_.put("describe_version", new describe_version());
    processMap_.put("describe_ring", new describe_ring());
    processMap_.put("describe_partitioner", new describe_partitioner());
    processMap_.put("describe_snitch", new describe_snitch());
    processMap_.put("describe_keyspace", new describe_keyspace());
    processMap_.put("describe_splits", new describe_splits());
    processMap_.put("system_add_column_family", new system_add_column_family());
    processMap_.put("system_drop_column_family", new system_drop_column_family());
    processMap_.put("system_add_keyspace", new system_add_keyspace());
    processMap_.put("system_drop_keyspace", new system_drop_keyspace());
    processMap_.put("system_update_keyspace", new system_update_keyspace());
    processMap_.put("system_update_column_family", new system_update_column_family());
    processMap_.put("execute_cql_query", new execute_cql_query());
  }
…
private Iface iface_;
protected final HashMap<String,ProcessFunction> processMap_ = new HashMap<String,ProcessFunction>();
…

3.2 process方法

这个process方法是使用Processor类进行处理时的入口,通过读入输入流中的Message,寻找是否有相应的功能类来处理,如果有,则转至相应功能类进行处理,否则,写回一个Exception。具体代码如下:

public boolean process(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException
{
  org.apache.thrift.protocol.TMessage msg = iprot.readMessageBegin();
  ProcessFunction fn = processMap_.get(msg.name);
  if (fn == null) {
    org.apache.thrift.protocol.TProtocolUtil.skip(iprot, org.apache.thrift.protocol.TType.STRUCT);
    iprot.readMessageEnd();
    org.apache.thrift.TApplicationException x = new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");
    oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage(msg.name, org.apache.thrift.protocol.TMessageType.EXCEPTION, msg.seqid));
    x.write(oprot);
    oprot.writeMessageEnd();
    oprot.getTransport().flush();
    return true;
  }
  fn.process(msg.seqid, iprot, oprot);
  return true;
}

3.3 具体功能类

对已Iface中定义的每个方法,这里都建立了一个类来与其对应,这样的类都实现了processFunction接口,其中只实现了一个process方法,用来处理输入输出流部分的操作,比如读入输入参数,写回结果,写回异常等等当然,最后还是要使用构造时传入的具体处理功能实现。这里代码比较多,随便贴一个来看看:

    protected static interface ProcessFunction {
      public void process(int seqid, org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException;
    }
…

    private class get_slice implements ProcessFunction {
      public void process(int seqid, org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException
      {
        get_slice_args args = new get_slice_args();
        try {
          args.read(iprot);
        } catch (org.apache.thrift.protocol.TProtocolException e) {
          iprot.readMessageEnd();
          org.apache.thrift.TApplicationException x = new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.PROTOCOL_ERROR, e.getMessage());
          oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("get_slice", org.apache.thrift.protocol.TMessageType.EXCEPTION, seqid));
          x.write(oprot);
          oprot.writeMessageEnd();
          oprot.getTransport().flush();
          return;
        }
        iprot.readMessageEnd();
        get_slice_result result = new get_slice_result();
        try {
          result.success = iface_.get_slice(args.key, args.column_parent, args.predicate, args.consistency_level);
        } catch (InvalidRequestException ire) {
          result.ire = ire;
        } catch (UnavailableException ue) {
          result.ue = ue;
        } catch (TimedOutException te) {
          result.te = te;
        } catch (Throwable th) {
          LOGGER.error("Internal error processing get_slice", th);
          org.apache.thrift.TApplicationException x = new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, "Internal error processing get_slice");
          oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("get_slice", org.apache.thrift.protocol.TMessageType.EXCEPTION, seqid));
          x.write(oprot);
          oprot.writeMessageEnd();
          oprot.getTransport().flush();
          return;
        }
        oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("get_slice", org.apache.thrift.protocol.TMessageType.REPLY, seqid));
        result.write(oprot);
        oprot.writeMessageEnd();
        oprot.getTransport().flush();
      }

    }

4、辅助类

我所称的辅助类其实也跟所有的功能方法有关,每一个功能方法都有2个辅助类,分别是args和result。比如对于add功能,就有add_args和add_result两个辅助类。

args类是根据.thrift文件中定义的方法中的参数列表生成的,主要是对参数列表中的每一项建立各种方法:get、set、isSet、unset等等,还定义了一些特定类型的struct对象和field对象(这里见下一章),这里的代码比较冗长,主要是供Client中的send方法使用。

result类是根据.thrift文件中定义的方法返回值和异常列表生成的,注意这里如果有返回值的话,那么会储存在success属性中,主要是对返回值和异常建立各种方法:get、set、isSet、unset等等,还定义了一些特定类型的struct对象和field对象(这里见下一章),这个类主要供Client中的recv方法使用。

六、Thrift中的Message的结构

既然客户端和服务端之间的通讯的媒介是Message,那么我们就简单看下Message的结构是怎样的。

因为是生成代码,所以Cassandra.java中的代码结构就有迹可循。这一节我们先用一个例子来看一下消息是如何被一点点组织起来,完成后又被一点点读取出来的,这里用get_range_slice功能,然后总结一下Message的通用结构。

    public List<KeySlice> get_range_slices(ColumnParent column_parent, SlicePredicate predicate, KeyRange range, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, org.apache.thrift.TException
    {
      send_get_range_slices(column_parent, predicate, range, consistency_level);
      return recv_get_range_slices();
    }
    public void send_get_range_slices(ColumnParent column_parent, SlicePredicate predicate, KeyRange range, ConsistencyLevel consistency_level) throws org.apache.thrift.TException
    {
      oprot_.writeMessageBegin(new org.apache.thrift.protocol.TMessage("get_range_slices", org.apache.thrift.protocol.TMessageType.CALL, ++seqid_));
      get_range_slices_args args = new get_range_slices_args();
      args.setColumn_parent(column_parent);
      args.setPredicate(predicate);
      args.setRange(range);
      args.setConsistency_level(consistency_level);
      args.write(oprot_);
      oprot_.writeMessageEnd();
      oprot_.getTransport().flush();
    }

1、Message Begin和End

send方法中的第一句就是写入一个Message Begin。这里实际上给了我们2个信息,一个是这个Begin究竟包括了哪些内容,另一个是如何去写入一个Message Begin。

1.1 Message Begin包括的内容

且不论不同的协议会如何对待传入的参数,我们知道了这个Message Begin是一个org.apache.thrift.protocol.TMessage类型的。这里是这个类的构造器:

public TMessage(String n, byte t, int s) {
name = n;
type = t;
seqid = s;
}
public final String name;
public final byte type;
public final int seqid;

由此可见,一个Message应该包括的内容有3个:name,type,seqid

  • name:根据上面send方法中的设置来看,name被设置为功能名称,如果使用TMessage的无参数构造器的话,name会被设为空字符串””。
  • type:虽然这是一个byte类型的,但是实际上是从TMessageType中选择的,具体含义见源码,不过如果使用TMessage的无参数构造器的话,type会被置为TType.STOP。
    public final class TMessageType {
      public static final byte CALL  = 1;
      public static final byte REPLY = 2;
      public static final byte EXCEPTION = 3;
      public static final byte ONEWAY = 4;
    }
  • seqid:这个就是用来标记客户端的。

1.2 不同协议的write

语句中写的是oprot_.writeMessageBegin,所以注定不同的协议会有不用的写法。先看看0.6版本中有哪些协议:

  • TBinaryProtocol
  • TCompactProtocol
  • TJSONProtocol
  • TSimpleJSONProtocol

这些协议都继承了TProtocol这个抽象类。在源码提供的测试程序中使用的是TBinaryProtocol。

当所有的写入完成后,需要执行writeMessageEnd方法。TBinaryProtocol中这是一个空方法。TJSONProtocol中就不是空方法。

2、args写入

send方法中的第二部分就是把参数写入,这里就借助了args辅助类来执行这个操作,当然,具体执行的之后,实际上还是调用的oprot_中的write方法,具体如下:

    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
      validate();

      oprot.writeStructBegin(STRUCT_DESC);
      if (this.column_parent != null) {
        oprot.writeFieldBegin(COLUMN_PARENT_FIELD_DESC);
        this.column_parent.write(oprot);
        oprot.writeFieldEnd();
      }
      if (this.predicate != null) {
        oprot.writeFieldBegin(PREDICATE_FIELD_DESC);
        this.predicate.write(oprot);
        oprot.writeFieldEnd();
      }
      if (this.range != null) {
        oprot.writeFieldBegin(RANGE_FIELD_DESC);
        this.range.write(oprot);
        oprot.writeFieldEnd();
      }
      if (this.consistency_level != null) {
        oprot.writeFieldBegin(CONSISTENCY_LEVEL_FIELD_DESC);
        oprot.writeI32(this.consistency_level.getValue());
        oprot.writeFieldEnd();
      }
      oprot.writeFieldStop();
      oprot.writeStructEnd();
    }

从这里已经可以看出Message中的大致结构了,依次是Message级,Struct级,Field级

3、Message结构

Message的结构有三级,依次是Message级,Struct级,Field级。这是Thrift中通用的逻辑结构,不同的协议实现的不同。大致的示意图如下:

Message Begin
Struct Begin
Field Begin
arg
Field End
Struct End
Message End

其中arg是包含特定数据的地方,比如String,Int,Map,List等等,这里的结构就不固定了,arg部分的写入一般是在args类中的write方法里完成的。

在Cassandra.java中,用的是TBinaryProtocol,Message结构中的很多部分没有用到,比如writeMessageEnd方法,writeStructBegin方法,writeStructEnd方法,writeFieldEnd方法都是空方法,而在Message Begin和Field Begin这两处,则写入了一些标识信息,例如Field Begin中就标识了arg中放入的数据的类型,如下:

Message Begin: name, type, seqid
Struct Begin: nothing
Field Begin: field type, field id
arg: blablabla
Field End: nothing
Struct End: nothing
Message End: nothing

TBinaryProtocol类中的具体代码如下:

  public void writeMessageBegin(TMessage message) throws TException {
    if (strictWrite_) {
      int version = VERSION_1 | message.type;
      writeI32(version);
      writeString(message.name);
      writeI32(message.seqid);
    } else {
      writeString(message.name);
      writeByte(message.type);
      writeI32(message.seqid);
    }
  }

  public void writeMessageEnd() {}

  public void writeStructBegin(TStruct struct) {}

  public void writeStructEnd() {}

  public void writeFieldBegin(TField field) throws TException {
    writeByte(field.type);
    writeI16(field.id);
  }

  public void writeFieldEnd() {}

  public void writeFieldStop() throws TException {
    writeByte(TType.STOP);
  }

所以Message大致就是这么回事,关于上面的代码还有几个需要解释的地方:

  • 几种type:之前提到过Message的type,这里又出现了Field的type,两者是不同的,如同字面上的意思,Message的type表示了整个Message的类型,而Field的type仅表示了这个Field的类型Message的type是在TMessageType类中定义,Field的type是在TType类中定义,具体如下:
    /**
    * Type constants in the Thrift protocol.
    */
    public final class TType {
      public static final byte STOP   = 0;
      public static final byte VOID   = 1;
      public static final byte BOOL   = 2;
      public static final byte BYTE   = 3;
      public static final byte DOUBLE = 4;
      public static final byte I16    = 6;
      public static final byte I32    = 8;
      public static final byte I64    = 10;
      public static final byte STRING = 11;
      public static final byte STRUCT = 12;
      public static final byte MAP    = 13;
      public static final byte SET    = 14;
      public static final byte LIST   = 15;
      public static final byte ENUM   = 16;
    }

    于是这里就列出了所有arg中可能出现的数据的类型,对于String及String之后的类型,都有专门的writeXXXBegin、writeXXXEnd、readXXXBegin和readXXXEnd方法。

  • 关于Field Stop:在白皮书(http://thrift.apache.org/static/files/thrift-20070401.pdf)中提到,“所有的write方法都有一个对应的read方法,除了writeFieldStop这个例外这是一个特殊的方法,用来标记一个Struct的结束”。

七、Thrift的结构和简单使用(Java版)

首先有两篇介绍Thrift的文章,跟之前的不同是没有讲代码,http://diwakergupta.github.com/thrift-missing-guide/#_versioning_compatibilityhttp://dongxicheng.org/search-engine/thrift-guide/ ,前者是鸟文的,后者是中文的,不过翻译了不少前者的内容。

1、Thrift网络栈

下面这个是我画的很丑的Thrift 网络栈:

/—————————————————————
|        Server                                         |
|        (Single-threaded, event-driven etc) |
|—————————————————————|
|        Processer                                     |
|        (compiler generated)                     |
|—————————————————————|
|        Protocol                                       |
|        (Binary, JSON, Compact etc)           |
|—————————————————————|
|        Transport                                     |
|        (raw TCP, HTTP etc)                     |
|—————————————————————/

之后的内容大部分摘自http://dongxicheng.org/search-engine/thrift-guide/

1.1 Transport

Transport层提供了一个简单的网络读写抽象层。这使得thrift底层的transport从系统其它部分(如:序列化/反序列化)解耦。以下是一些Transport接口提供的方法:

  • open
  • close
  • read
  • write
  • flush

除了以上几个接口,Thrift使用ServerTransport接口接受或者创建原始transport对象。正如名字暗示的那样,ServerTransport用在server端,为到来的连接创建Transport对象。

  • open
  • listen
  • accept
  • close

1.2 Protocol

Protocol抽象层定义了一种将内存中数据结构映射成可传输格式的机制。换句话说,Protocol定义了datatype怎样使用底层的Transport对自己进行编解码。因此,Protocol的实现要给出编码机制并负责对数据进行序列化。

Protocol接口的定义如下:

  • writeMessageBegin(name, type, seq)
  • writeMessageEnd()
  • writeStructBegin(name)
  • writeStructEnd()
  • writeFieldBegin(name, type, id)
  • writeFieldEnd()
  • writeFieldStop()
  • writeMapBegin(ktype, vtype, size)
  • writeMapEnd()
  • writeListBegin(etype, size)
  • writeListEnd()
  • writeSetBegin(etype, size)
  • writeSetEnd()
  • writeBool(bool)
  • writeByte(byte)
  • writeI16(i16)
  • writeI32(i32)
  • writeI64(i64)
  • writeDouble(double)
  • writeString(string)
  • name, type, seq = readMessageBegin()
  • readMessageEnd()
  • name = readStructBegin()
  • readStructEnd()
  • name, type, id = readFieldBegin()
  • readFieldEnd()
  • k, v, size = readMapBegin()
  • readMapEnd()
  • etype, size = readListBegin()
  • readListEnd()
  • etype, size = readSetBegin()
  • readSetEnd()
  • bool = readBool()
  • byte = readByte()
  • i16 = readI16()
  • i32 = readI32()
  • i64 = readI64()
  • double = readDouble()
  • string = readString()

下面是一些对大部分thrift支持的语言均可用的protocol:

  • binary:简单的二进制编码
  • Compact:具体见THRIFT-11
  • Json

1.3 Processor

Processor封装了从输入数据流中读数据和向输出数据流中写数据的操作。读写数据流用Protocol对象表示。Processor的结构体非常简单:

/**
 * A processor is a generic object which operates upon an input stream and
 * writes to some output stream.
 *
 */
public interface TProcessor {
  public boolean process(TProtocol in, TProtocol out)
    throws TException;
}

与服务相关的processor实现由编译器产生。Processor主要工作流程如下:从连接中读取数据(使用输入protocol),将处理授权给handler(由用户实现),最后将结果写到连接上(使用输出protocol)。

1.4 Server

Server将以上所有特性集成在一起:

  1. 创建一个transport对象
  2. 为transport对象创建输入输出protocol
  3. 基于输入输出protocol创建processor
  4. 等待连接请求并将之交给processor处理

2、Client 和 Server的编写

2.1 Client编写

好吧, 这个Client不是指Cassandra.java中的Client,而是指单独写的一个可以通过使用Cassandra.java来对服务器进行操作的客户端。其实只是一个简单的客户端的的话并不需要太多代码,这个是部分代码:

    public Cassandra.Client createClient(InetAddress addr) throws TException
    {
        TTransport transport    = new TSocket(
                                    addr.getHostAddress(),
                                    CLIENT_PORT,
                                    200000);
        transport               = new TFramedTransport(transport);
        TProtocol  protocol     = new TBinaryProtocol(transport);

        Cassandra.Client client = new Cassandra.Client(protocol);
        transport.open();

        return client;
    }

这里先建了个TSocket对象,然后以此为参数建立TTransport对象,再以此TTransport对象建立TProtocol对象,最后以此TProtocol对象建立客户端。

使用这个客户端也很方便,代码如下:

	public void testInsert() throws Exception
    {

        List hosts = controller.getHosts();
        final String keyspace = "TestInsert";
        addKeyspace(keyspace, 1);
        Cassandra.Client client = controller.createClient(hosts.get(0));
        try{
        	client.set_keyspace(keyspace);

	        ByteBuffer key = newKey();

	        insert(client, key, "Standard1", "c1", "v1", 0, ConsistencyLevel.ONE);
	        insert(client, key, "Standard1", "c2", "v2", 0, ConsistencyLevel.ONE);

	        // block until the column is available
	        new Get(client, "Standard1", key).name("c1").value("v1").perform(ConsistencyLevel.ONE);
	        new Get(client, "Standard1", key).name("c2").value("v2").perform(ConsistencyLevel.ONE);

	        List coscs = get_slice(client, key, "Standard1", ConsistencyLevel.ONE);
	        assertColumnEqual("c1", "v1", 0, coscs.get(0).column);
	        assertColumnEqual("c2", "v2", 0, coscs.get(1).column);
		} catch(Exception e){
			e.printStackTrace();
		} finally {
			client.send_system_drop_keyspace(keyspace);
		}
    }

2.2 Server编写

这一部分就不是用户写的了,而是属于Cassandra的一部分,org.apache.cassandra.thrift.CassandraServer就是这个server,这个类实现了Iface中定义的各种功能,其对象将会作为参数传给Processor,由于Processor中已经做了输入输出流的读取,因此CassandraServer类只需要专注于功能实现就行了,其实还是调用其他内部接口,比如下面这个方法:

    public List get_range_slices(ColumnParent column_parent, SlicePredicate predicate, KeyRange range, ConsistencyLevel consistency_level)
    throws InvalidRequestException, UnavailableException, TException, TimedOutException
    {
        logger.debug("range_slice");

        String keyspace = state().getKeyspace();
        state().hasColumnFamilyAccess(column_parent.column_family, Permission.READ);

        CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family);
        ThriftValidation.validateColumnParent(metadata, column_parent);
        ThriftValidation.validatePredicate(metadata, column_parent, predicate);
        ThriftValidation.validateKeyRange(range);
        ThriftValidation.validateConsistencyLevel(keyspace, consistency_level);

        List rows;
        try
        {
            IPartitioner p = StorageService.getPartitioner();
            AbstractBounds bounds;
            if (range.start_key == null)
            {
                Token.TokenFactory tokenFactory = p.getTokenFactory();
                Token left = tokenFactory.fromString(range.start_token);
                Token right = tokenFactory.fromString(range.end_token);
                bounds = new Range(left, right);
            }
            else
            {
                bounds = new Bounds(p.getToken(range.start_key), p.getToken(range.end_key));
            }
            try
            {
                schedule();
                rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_parent, predicate, bounds, range.count), consistency_level);
            }
            finally
            {
                release();
            }
            assert rows != null;
        }
        catch (TimeoutException e)
        {
        	throw new TimedOutException();
        }
        catch (IOException e)
        {
            throw new RuntimeException(e);
        }        return thriftifyKeySlices(rows, column_parent, predicate);
    }

这里在获得了key的bounds之后,尝试调度此线程获得资源,然后调用了StorageProxy.getRangeSlice方法。

八、总结

这篇水文从Cassandra的生成代码入手,简单介绍了.thrift文件,分析了生成代码的结构,Thrift中的Message结构,最后简单介绍了一下Thrift的结构和如何写代码去使用Thrift。

Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: