Hycz's Blog

Life is a game. Why so serious?

Category Archives: Apache

Cassandra 0.8.0 源码分析——Thrift相关

一、引子

这两天在写一个简单的cassandra测试程序,中间参考了test/distributed下的代码,不过其中用到的whirr实在蛋疼,首先这里用到的是whirr0.4版本,跟最新的0.7版本差别太大,其次是没有详细的文档说明如何在服务器端配置能够使用whirr进行集群管理,所以我就自己写了个简单的,把whirr的部分都砍掉了。

写这个简单的测试程序的过程中,出了些开始觉得诡异,后来觉得愚蠢的错误,不过也因此给我理由步进进去看了看源码,比如libthrift-0.6里的源码,还有自动生成的代码。

二、Introduction

首先推荐一个不错的thrift介绍,http://kingxss.iteye.com/blog/1413696

其他介绍的话就不说了,那篇博客里有写,这里就说下自己的感受。Thrift就是定义了一套自己的语法、传输协议,以此提供了与具体语言无关的一系列定义,在需要的时候翻译成相应的语言,同时提供了服务器客户端之间的底层通讯功能,于是可以让使用者专注于实现自己的功能

具体的表现,就是我接触到的Cassandra中thrift的使用,在cassandra.thrift文件中定义好数据结构struct,服务功能service,以及枚举enum、异常exception等部分后,使用thrift根据这个文件中的定义,自动生成大量的代码,在Cassandra中,就是interface/thrift/gen-java下的代码。

三、真的是生成代码?

开始我一直奇怪的就是,为何.thrift文件只有650+行,却生成了Cassandra.java这个有30000+行的庞然大物,而且.thrift文件中没有业务逻辑,仅仅是一些功能的名称。所以我下了个thrift的可执行程序(http://www.apache.org/dyn/closer.cgi?path=/thrift/0.8.0/thrift-0.8.0.exe)自己生成了一次。

首先我写了个简单的定义Hello.thrift,代码如下:

namespace java service.demo   
service Hello{   
 string helloString(1:string para)   
 i32 helloInt(1:i32 para)   
 bool helloBoolean(1:bool para)   
 void helloVoid()   
 string helloNull()   
}

执行命令thrift-0.8.0.exe -gen java Hello.thrift后,查看了一下生成代码的大小,真是吓人,仅仅几行的定义就生成了几千行的代码。

再看看cassandra.thrift,本身就有650+行,执行命令thrift-0.8.0.exe -gen java cassandra.thrift之后,确实生成了在原本项目中interface/thrift/gen-java目录下的代码。

四、生成了什么代码

刚才提到.thrift文件被叫做接口定义(interface definition),使用Thrift IDL(Thrift interface description language)(http://thrift.apache.org/docs/idl/)写成,这种文件一般包括了很多thrift types 和 services。services也是thrift types的一种,不过这里定义的services还需要由服务端实现,由客户端调用

所以,重要的就是理解.thrift文件中的各种项。官网上有2处提到了这些类型,一个是Thrift IDL(http://thrift.apache.org/docs/idl/),一个是Thrift Types(http://thrift.apache.org/docs/types/),我认为前者是完整的描述,后者则是其中常用的一些类型。

先说说下面列举一些常用的Type:

  • Base Types
    就是下面的7种基本类型,不用太多解释:

    • bool: A boolean value (true or false)
    • byte: An 8-bit signed integer
    • i16: A 16-bit signed integer
    • i32: A 32-bit signed integer
    • i64: A 64-bit signed integer
    • double: A 64-bit floating point number
    • string: A text string encoded using UTF-8 encoding
  • Special Types
    只有一种binary类型,当前只是string类型的一种特殊形式。

    • binary: a sequence of unencoded bytes
  • Structs
    定义了一个通用对象,类似于OOP中的Class,但是没有继承。一个Struct有一系列的拥有单独名称标识的强类型域(strongly typed fields)。在生成java代码时,每个Struct将单独生成一个类,由于定义时不会有方法,所以生成的Class代码中只有众多的存取方法。
  • Containers
    多种编程语言中常用的强类型容器,具体有3种:

    • list:元素的有序列表。会被翻译成STL vector,Java ArrayList,native arrays in scripting languages等等。
    • set:不同元素的无序列表。会被翻译成STL set, Java HashSet, set in Python等等。PHP中不支持set,所以会被看成语List相同。
    • map:严格不同的键值对map。可以被翻译成STL map, Java HashMap, PHP associative array, Python/Ruby dictionary等等。

    在实际使用中,容器常常作为field的类型使用。

  • Exceptions
    功能上Exception和Struct是一样的,不过Exception在翻译成目标语言的时候,都会继承相应的Exception类。生成Java代码时,也会生成一个单独的Exception文件。
  • Services
    定义一个service等同于在OOP中定义一个接口(或者纯虚拟抽象类)。Thrift编译器产生完整功能的,实现了这个接口的客户端和服务端stub。
    一个service有一系列命名了的functions组成,function有一系列参数和返回类型。

更加完整的定义还是得去看Thrift IDL,这里不细说了。

五、service生成代码的分析

interface/thrift/gen-java目录下,生成了很多的java文件,其中Cassandra.java是客户端的所在,其他则是一些数据结构的类、异常的类和枚举的类,所以这些闲散的类就没什么看头了,研究一下服务的类Cassandra.java。

我觉得Cassandra.java可以分为4个部分:接口,Client,Processor,辅助类

1、接口部分

接口部分有两个,IfaceAsyncIface,两者都是根据.thrift文件中的”service Cassandra{…}”这一项中的定义生成的,两个接口的不同之处是AsyncIface中的方法都比Iface中的多一个Call back的参数。

2、Client

Client就是实现了上述接口的类,因此同样有2个,一个是Client类,一个是AsyncClient类。Client类分为了4个部分,分别是:实例属性,Factory类,功能方法,sendrecv方法

2.1 实例属性

之所以把实例属性放在其他部分之前说,是由于实例属性体现了这是一个分布式程序,并且影响着其他部分。

实例属性只有3个:iprot_, oprot_, seqid。前2个都是org.apache.thrift.protocol.TProtocol类型的,可以理解为提供了分别设置输入协议对象和输出协议对象的能力,虽然使用时可以设置为一个。seqid则表示这客户端的编号,每次收到的消息都要比较seqid是否相同。

2.2 Factory类

Factory类就是提供了2个public级别的Client的构造器,2种构造器的参数列表略有不同,一个是只传入了一个协议对象,这将会把iprot_和oprot_设为相同对象;另一个则传入了2个协议对象,分别对应iprot_和oprot_

2.3 功能方法

这里的功能方法即为接口Iface中定义的方法,有趣的地方是所有的功能方法都有着统一的形式,即顺序执行send和recv方法。比如下面这个例子

public void add(ByteBuffer key, ColumnParent column_parent, CounterColumn column, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, org.apache.thrift.TException
    {
      send_add(key, column_parent, column, consistency_level);
      recv_add();
    }

2.4 sendrecv方法

这里是真正涉及到了分布式的地方,每一个功能方法都对应着一个send方法和recv方法。send方法写消息,用oprot_写,recv方法收消息,从iprot_中读。比如下面这个例子

    public void send_add(ByteBuffer key, ColumnParent column_parent, CounterColumn column, ConsistencyLevel consistency_level) throws org.apache.thrift.TException
    {
      oprot_.writeMessageBegin(new org.apache.thrift.protocol.TMessage("add", org.apache.thrift.protocol.TMessageType.CALL, ++seqid_));
      add_args args = new add_args();
      args.setKey(key);
      args.setColumn_parent(column_parent);
      args.setColumn(column);
      args.setConsistency_level(consistency_level);
      args.write(oprot_);
      oprot_.writeMessageEnd();
      oprot_.getTransport().flush();
    }

    public void recv_add() throws InvalidRequestException, UnavailableException, TimedOutException, org.apache.thrift.TException
    {
      org.apache.thrift.protocol.TMessage msg = iprot_.readMessageBegin();
      if (msg.type == org.apache.thrift.protocol.TMessageType.EXCEPTION) {
        org.apache.thrift.TApplicationException x = org.apache.thrift.TApplicationException.read(iprot_);
        iprot_.readMessageEnd();
        throw x;
      }
      if (msg.seqid != seqid_) {
        throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.BAD_SEQUENCE_ID, "add failed: out of sequence response");
      }
      add_result result = new add_result();
      result.read(iprot_);
      iprot_.readMessageEnd();
      if (result.ire != null) {
        throw result.ire;
      }
      if (result.ue != null) {
        throw result.ue;
      }
      if (result.te != null) {
        throw result.te;
      }
      return;
    }

这里就涉及到了Thrift中的Message的结构,见下一章。

3、Processor

Processor类实现了org.apache.thrift.TProcessor接口,这个类封装了各种从输入输出流中读写数据的操作,在Thrift架构中在传输之上,应用之下,连接了底层操作和用户功能实现。Processor类大致分为这么几个部分:构造器,process方法,具体功能类

3.1 构造器

构造这个类的实例时,需要传入Iface中定义的方法的具体实现,具体来说,就是一个实现了Iface接口的对象,然后构造器中新建了各个功能类的的实例放入processMap_。具体代码如下:

public static class Processor implements org.apache.thrift.TProcessor {
  private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class.getName());
  public Processor(Iface iface)
  {
    iface_ = iface;
    processMap_.put("login", new login());
    processMap_.put("set_keyspace", new set_keyspace());
    processMap_.put("get", new get());
    processMap_.put("get_slice", new get_slice());
    processMap_.put("get_count", new get_count());
    processMap_.put("multiget_slice", new multiget_slice());
    processMap_.put("multiget_count", new multiget_count());
    processMap_.put("get_range_slices", new get_range_slices());
    processMap_.put("get_indexed_slices", new get_indexed_slices());
    processMap_.put("insert", new insert());
    processMap_.put("add", new add());
    processMap_.put("remove", new remove());
    processMap_.put("remove_counter", new remove_counter());
    processMap_.put("batch_mutate", new batch_mutate());
    processMap_.put("truncate", new truncate());
    processMap_.put("describe_schema_versions", new describe_schema_versions());
    processMap_.put("describe_keyspaces", new describe_keyspaces());
    processMap_.put("describe_cluster_name", new describe_cluster_name());
    processMap_.put("describe_version", new describe_version());
    processMap_.put("describe_ring", new describe_ring());
    processMap_.put("describe_partitioner", new describe_partitioner());
    processMap_.put("describe_snitch", new describe_snitch());
    processMap_.put("describe_keyspace", new describe_keyspace());
    processMap_.put("describe_splits", new describe_splits());
    processMap_.put("system_add_column_family", new system_add_column_family());
    processMap_.put("system_drop_column_family", new system_drop_column_family());
    processMap_.put("system_add_keyspace", new system_add_keyspace());
    processMap_.put("system_drop_keyspace", new system_drop_keyspace());
    processMap_.put("system_update_keyspace", new system_update_keyspace());
    processMap_.put("system_update_column_family", new system_update_column_family());
    processMap_.put("execute_cql_query", new execute_cql_query());
  }
…
private Iface iface_;
protected final HashMap<String,ProcessFunction> processMap_ = new HashMap<String,ProcessFunction>();
…

3.2 process方法

这个process方法是使用Processor类进行处理时的入口,通过读入输入流中的Message,寻找是否有相应的功能类来处理,如果有,则转至相应功能类进行处理,否则,写回一个Exception。具体代码如下:

public boolean process(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException
{
  org.apache.thrift.protocol.TMessage msg = iprot.readMessageBegin();
  ProcessFunction fn = processMap_.get(msg.name);
  if (fn == null) {
    org.apache.thrift.protocol.TProtocolUtil.skip(iprot, org.apache.thrift.protocol.TType.STRUCT);
    iprot.readMessageEnd();
    org.apache.thrift.TApplicationException x = new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");
    oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage(msg.name, org.apache.thrift.protocol.TMessageType.EXCEPTION, msg.seqid));
    x.write(oprot);
    oprot.writeMessageEnd();
    oprot.getTransport().flush();
    return true;
  }
  fn.process(msg.seqid, iprot, oprot);
  return true;
}

3.3 具体功能类

对已Iface中定义的每个方法,这里都建立了一个类来与其对应,这样的类都实现了processFunction接口,其中只实现了一个process方法,用来处理输入输出流部分的操作,比如读入输入参数,写回结果,写回异常等等当然,最后还是要使用构造时传入的具体处理功能实现。这里代码比较多,随便贴一个来看看:

    protected static interface ProcessFunction {
      public void process(int seqid, org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException;
    }
…

    private class get_slice implements ProcessFunction {
      public void process(int seqid, org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException
      {
        get_slice_args args = new get_slice_args();
        try {
          args.read(iprot);
        } catch (org.apache.thrift.protocol.TProtocolException e) {
          iprot.readMessageEnd();
          org.apache.thrift.TApplicationException x = new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.PROTOCOL_ERROR, e.getMessage());
          oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("get_slice", org.apache.thrift.protocol.TMessageType.EXCEPTION, seqid));
          x.write(oprot);
          oprot.writeMessageEnd();
          oprot.getTransport().flush();
          return;
        }
        iprot.readMessageEnd();
        get_slice_result result = new get_slice_result();
        try {
          result.success = iface_.get_slice(args.key, args.column_parent, args.predicate, args.consistency_level);
        } catch (InvalidRequestException ire) {
          result.ire = ire;
        } catch (UnavailableException ue) {
          result.ue = ue;
        } catch (TimedOutException te) {
          result.te = te;
        } catch (Throwable th) {
          LOGGER.error("Internal error processing get_slice", th);
          org.apache.thrift.TApplicationException x = new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, "Internal error processing get_slice");
          oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("get_slice", org.apache.thrift.protocol.TMessageType.EXCEPTION, seqid));
          x.write(oprot);
          oprot.writeMessageEnd();
          oprot.getTransport().flush();
          return;
        }
        oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("get_slice", org.apache.thrift.protocol.TMessageType.REPLY, seqid));
        result.write(oprot);
        oprot.writeMessageEnd();
        oprot.getTransport().flush();
      }

    }

4、辅助类

我所称的辅助类其实也跟所有的功能方法有关,每一个功能方法都有2个辅助类,分别是args和result。比如对于add功能,就有add_args和add_result两个辅助类。

args类是根据.thrift文件中定义的方法中的参数列表生成的,主要是对参数列表中的每一项建立各种方法:get、set、isSet、unset等等,还定义了一些特定类型的struct对象和field对象(这里见下一章),这里的代码比较冗长,主要是供Client中的send方法使用。

result类是根据.thrift文件中定义的方法返回值和异常列表生成的,注意这里如果有返回值的话,那么会储存在success属性中,主要是对返回值和异常建立各种方法:get、set、isSet、unset等等,还定义了一些特定类型的struct对象和field对象(这里见下一章),这个类主要供Client中的recv方法使用。

六、Thrift中的Message的结构

既然客户端和服务端之间的通讯的媒介是Message,那么我们就简单看下Message的结构是怎样的。

因为是生成代码,所以Cassandra.java中的代码结构就有迹可循。这一节我们先用一个例子来看一下消息是如何被一点点组织起来,完成后又被一点点读取出来的,这里用get_range_slice功能,然后总结一下Message的通用结构。

    public List<KeySlice> get_range_slices(ColumnParent column_parent, SlicePredicate predicate, KeyRange range, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, org.apache.thrift.TException
    {
      send_get_range_slices(column_parent, predicate, range, consistency_level);
      return recv_get_range_slices();
    }
    public void send_get_range_slices(ColumnParent column_parent, SlicePredicate predicate, KeyRange range, ConsistencyLevel consistency_level) throws org.apache.thrift.TException
    {
      oprot_.writeMessageBegin(new org.apache.thrift.protocol.TMessage("get_range_slices", org.apache.thrift.protocol.TMessageType.CALL, ++seqid_));
      get_range_slices_args args = new get_range_slices_args();
      args.setColumn_parent(column_parent);
      args.setPredicate(predicate);
      args.setRange(range);
      args.setConsistency_level(consistency_level);
      args.write(oprot_);
      oprot_.writeMessageEnd();
      oprot_.getTransport().flush();
    }

1、Message Begin和End

send方法中的第一句就是写入一个Message Begin。这里实际上给了我们2个信息,一个是这个Begin究竟包括了哪些内容,另一个是如何去写入一个Message Begin。

1.1 Message Begin包括的内容

且不论不同的协议会如何对待传入的参数,我们知道了这个Message Begin是一个org.apache.thrift.protocol.TMessage类型的。这里是这个类的构造器:

public TMessage(String n, byte t, int s) {
name = n;
type = t;
seqid = s;
}
public final String name;
public final byte type;
public final int seqid;

由此可见,一个Message应该包括的内容有3个:name,type,seqid

  • name:根据上面send方法中的设置来看,name被设置为功能名称,如果使用TMessage的无参数构造器的话,name会被设为空字符串””。
  • type:虽然这是一个byte类型的,但是实际上是从TMessageType中选择的,具体含义见源码,不过如果使用TMessage的无参数构造器的话,type会被置为TType.STOP。
    public final class TMessageType {
      public static final byte CALL  = 1;
      public static final byte REPLY = 2;
      public static final byte EXCEPTION = 3;
      public static final byte ONEWAY = 4;
    }
  • seqid:这个就是用来标记客户端的。

1.2 不同协议的write

语句中写的是oprot_.writeMessageBegin,所以注定不同的协议会有不用的写法。先看看0.6版本中有哪些协议:

  • TBinaryProtocol
  • TCompactProtocol
  • TJSONProtocol
  • TSimpleJSONProtocol

这些协议都继承了TProtocol这个抽象类。在源码提供的测试程序中使用的是TBinaryProtocol。

当所有的写入完成后,需要执行writeMessageEnd方法。TBinaryProtocol中这是一个空方法。TJSONProtocol中就不是空方法。

2、args写入

send方法中的第二部分就是把参数写入,这里就借助了args辅助类来执行这个操作,当然,具体执行的之后,实际上还是调用的oprot_中的write方法,具体如下:

    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
      validate();

      oprot.writeStructBegin(STRUCT_DESC);
      if (this.column_parent != null) {
        oprot.writeFieldBegin(COLUMN_PARENT_FIELD_DESC);
        this.column_parent.write(oprot);
        oprot.writeFieldEnd();
      }
      if (this.predicate != null) {
        oprot.writeFieldBegin(PREDICATE_FIELD_DESC);
        this.predicate.write(oprot);
        oprot.writeFieldEnd();
      }
      if (this.range != null) {
        oprot.writeFieldBegin(RANGE_FIELD_DESC);
        this.range.write(oprot);
        oprot.writeFieldEnd();
      }
      if (this.consistency_level != null) {
        oprot.writeFieldBegin(CONSISTENCY_LEVEL_FIELD_DESC);
        oprot.writeI32(this.consistency_level.getValue());
        oprot.writeFieldEnd();
      }
      oprot.writeFieldStop();
      oprot.writeStructEnd();
    }

从这里已经可以看出Message中的大致结构了,依次是Message级,Struct级,Field级

3、Message结构

Message的结构有三级,依次是Message级,Struct级,Field级。这是Thrift中通用的逻辑结构,不同的协议实现的不同。大致的示意图如下:

Message Begin
Struct Begin
Field Begin
arg
Field End
Struct End
Message End

其中arg是包含特定数据的地方,比如String,Int,Map,List等等,这里的结构就不固定了,arg部分的写入一般是在args类中的write方法里完成的。

在Cassandra.java中,用的是TBinaryProtocol,Message结构中的很多部分没有用到,比如writeMessageEnd方法,writeStructBegin方法,writeStructEnd方法,writeFieldEnd方法都是空方法,而在Message Begin和Field Begin这两处,则写入了一些标识信息,例如Field Begin中就标识了arg中放入的数据的类型,如下:

Message Begin: name, type, seqid
Struct Begin: nothing
Field Begin: field type, field id
arg: blablabla
Field End: nothing
Struct End: nothing
Message End: nothing

TBinaryProtocol类中的具体代码如下:

  public void writeMessageBegin(TMessage message) throws TException {
    if (strictWrite_) {
      int version = VERSION_1 | message.type;
      writeI32(version);
      writeString(message.name);
      writeI32(message.seqid);
    } else {
      writeString(message.name);
      writeByte(message.type);
      writeI32(message.seqid);
    }
  }

  public void writeMessageEnd() {}

  public void writeStructBegin(TStruct struct) {}

  public void writeStructEnd() {}

  public void writeFieldBegin(TField field) throws TException {
    writeByte(field.type);
    writeI16(field.id);
  }

  public void writeFieldEnd() {}

  public void writeFieldStop() throws TException {
    writeByte(TType.STOP);
  }

所以Message大致就是这么回事,关于上面的代码还有几个需要解释的地方:

  • 几种type:之前提到过Message的type,这里又出现了Field的type,两者是不同的,如同字面上的意思,Message的type表示了整个Message的类型,而Field的type仅表示了这个Field的类型Message的type是在TMessageType类中定义,Field的type是在TType类中定义,具体如下:
    /**
    * Type constants in the Thrift protocol.
    */
    public final class TType {
      public static final byte STOP   = 0;
      public static final byte VOID   = 1;
      public static final byte BOOL   = 2;
      public static final byte BYTE   = 3;
      public static final byte DOUBLE = 4;
      public static final byte I16    = 6;
      public static final byte I32    = 8;
      public static final byte I64    = 10;
      public static final byte STRING = 11;
      public static final byte STRUCT = 12;
      public static final byte MAP    = 13;
      public static final byte SET    = 14;
      public static final byte LIST   = 15;
      public static final byte ENUM   = 16;
    }

    于是这里就列出了所有arg中可能出现的数据的类型,对于String及String之后的类型,都有专门的writeXXXBegin、writeXXXEnd、readXXXBegin和readXXXEnd方法。

  • 关于Field Stop:在白皮书(http://thrift.apache.org/static/files/thrift-20070401.pdf)中提到,“所有的write方法都有一个对应的read方法,除了writeFieldStop这个例外这是一个特殊的方法,用来标记一个Struct的结束”。

七、Thrift的结构和简单使用(Java版)

首先有两篇介绍Thrift的文章,跟之前的不同是没有讲代码,http://diwakergupta.github.com/thrift-missing-guide/#_versioning_compatibilityhttp://dongxicheng.org/search-engine/thrift-guide/ ,前者是鸟文的,后者是中文的,不过翻译了不少前者的内容。

1、Thrift网络栈

下面这个是我画的很丑的Thrift 网络栈:

/—————————————————————
|        Server                                         |
|        (Single-threaded, event-driven etc) |
|—————————————————————|
|        Processer                                     |
|        (compiler generated)                     |
|—————————————————————|
|        Protocol                                       |
|        (Binary, JSON, Compact etc)           |
|—————————————————————|
|        Transport                                     |
|        (raw TCP, HTTP etc)                     |
|—————————————————————/

之后的内容大部分摘自http://dongxicheng.org/search-engine/thrift-guide/

1.1 Transport

Transport层提供了一个简单的网络读写抽象层。这使得thrift底层的transport从系统其它部分(如:序列化/反序列化)解耦。以下是一些Transport接口提供的方法:

  • open
  • close
  • read
  • write
  • flush

除了以上几个接口,Thrift使用ServerTransport接口接受或者创建原始transport对象。正如名字暗示的那样,ServerTransport用在server端,为到来的连接创建Transport对象。

  • open
  • listen
  • accept
  • close

1.2 Protocol

Protocol抽象层定义了一种将内存中数据结构映射成可传输格式的机制。换句话说,Protocol定义了datatype怎样使用底层的Transport对自己进行编解码。因此,Protocol的实现要给出编码机制并负责对数据进行序列化。

Protocol接口的定义如下:

  • writeMessageBegin(name, type, seq)
  • writeMessageEnd()
  • writeStructBegin(name)
  • writeStructEnd()
  • writeFieldBegin(name, type, id)
  • writeFieldEnd()
  • writeFieldStop()
  • writeMapBegin(ktype, vtype, size)
  • writeMapEnd()
  • writeListBegin(etype, size)
  • writeListEnd()
  • writeSetBegin(etype, size)
  • writeSetEnd()
  • writeBool(bool)
  • writeByte(byte)
  • writeI16(i16)
  • writeI32(i32)
  • writeI64(i64)
  • writeDouble(double)
  • writeString(string)
  • name, type, seq = readMessageBegin()
  • readMessageEnd()
  • name = readStructBegin()
  • readStructEnd()
  • name, type, id = readFieldBegin()
  • readFieldEnd()
  • k, v, size = readMapBegin()
  • readMapEnd()
  • etype, size = readListBegin()
  • readListEnd()
  • etype, size = readSetBegin()
  • readSetEnd()
  • bool = readBool()
  • byte = readByte()
  • i16 = readI16()
  • i32 = readI32()
  • i64 = readI64()
  • double = readDouble()
  • string = readString()

下面是一些对大部分thrift支持的语言均可用的protocol:

  • binary:简单的二进制编码
  • Compact:具体见THRIFT-11
  • Json

1.3 Processor

Processor封装了从输入数据流中读数据和向输出数据流中写数据的操作。读写数据流用Protocol对象表示。Processor的结构体非常简单:

/**
 * A processor is a generic object which operates upon an input stream and
 * writes to some output stream.
 *
 */
public interface TProcessor {
  public boolean process(TProtocol in, TProtocol out)
    throws TException;
}

与服务相关的processor实现由编译器产生。Processor主要工作流程如下:从连接中读取数据(使用输入protocol),将处理授权给handler(由用户实现),最后将结果写到连接上(使用输出protocol)。

1.4 Server

Server将以上所有特性集成在一起:

  1. 创建一个transport对象
  2. 为transport对象创建输入输出protocol
  3. 基于输入输出protocol创建processor
  4. 等待连接请求并将之交给processor处理

2、Client 和 Server的编写

2.1 Client编写

好吧, 这个Client不是指Cassandra.java中的Client,而是指单独写的一个可以通过使用Cassandra.java来对服务器进行操作的客户端。其实只是一个简单的客户端的的话并不需要太多代码,这个是部分代码:

    public Cassandra.Client createClient(InetAddress addr) throws TException
    {
        TTransport transport    = new TSocket(
                                    addr.getHostAddress(),
                                    CLIENT_PORT,
                                    200000);
        transport               = new TFramedTransport(transport);
        TProtocol  protocol     = new TBinaryProtocol(transport);

        Cassandra.Client client = new Cassandra.Client(protocol);
        transport.open();

        return client;
    }

这里先建了个TSocket对象,然后以此为参数建立TTransport对象,再以此TTransport对象建立TProtocol对象,最后以此TProtocol对象建立客户端。

使用这个客户端也很方便,代码如下:

	public void testInsert() throws Exception
    {

        List hosts = controller.getHosts();
        final String keyspace = "TestInsert";
        addKeyspace(keyspace, 1);
        Cassandra.Client client = controller.createClient(hosts.get(0));
        try{
        	client.set_keyspace(keyspace);

	        ByteBuffer key = newKey();

	        insert(client, key, "Standard1", "c1", "v1", 0, ConsistencyLevel.ONE);
	        insert(client, key, "Standard1", "c2", "v2", 0, ConsistencyLevel.ONE);

	        // block until the column is available
	        new Get(client, "Standard1", key).name("c1").value("v1").perform(ConsistencyLevel.ONE);
	        new Get(client, "Standard1", key).name("c2").value("v2").perform(ConsistencyLevel.ONE);

	        List coscs = get_slice(client, key, "Standard1", ConsistencyLevel.ONE);
	        assertColumnEqual("c1", "v1", 0, coscs.get(0).column);
	        assertColumnEqual("c2", "v2", 0, coscs.get(1).column);
		} catch(Exception e){
			e.printStackTrace();
		} finally {
			client.send_system_drop_keyspace(keyspace);
		}
    }

2.2 Server编写

这一部分就不是用户写的了,而是属于Cassandra的一部分,org.apache.cassandra.thrift.CassandraServer就是这个server,这个类实现了Iface中定义的各种功能,其对象将会作为参数传给Processor,由于Processor中已经做了输入输出流的读取,因此CassandraServer类只需要专注于功能实现就行了,其实还是调用其他内部接口,比如下面这个方法:

    public List get_range_slices(ColumnParent column_parent, SlicePredicate predicate, KeyRange range, ConsistencyLevel consistency_level)
    throws InvalidRequestException, UnavailableException, TException, TimedOutException
    {
        logger.debug("range_slice");

        String keyspace = state().getKeyspace();
        state().hasColumnFamilyAccess(column_parent.column_family, Permission.READ);

        CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family);
        ThriftValidation.validateColumnParent(metadata, column_parent);
        ThriftValidation.validatePredicate(metadata, column_parent, predicate);
        ThriftValidation.validateKeyRange(range);
        ThriftValidation.validateConsistencyLevel(keyspace, consistency_level);

        List rows;
        try
        {
            IPartitioner p = StorageService.getPartitioner();
            AbstractBounds bounds;
            if (range.start_key == null)
            {
                Token.TokenFactory tokenFactory = p.getTokenFactory();
                Token left = tokenFactory.fromString(range.start_token);
                Token right = tokenFactory.fromString(range.end_token);
                bounds = new Range(left, right);
            }
            else
            {
                bounds = new Bounds(p.getToken(range.start_key), p.getToken(range.end_key));
            }
            try
            {
                schedule();
                rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_parent, predicate, bounds, range.count), consistency_level);
            }
            finally
            {
                release();
            }
            assert rows != null;
        }
        catch (TimeoutException e)
        {
        	throw new TimedOutException();
        }
        catch (IOException e)
        {
            throw new RuntimeException(e);
        }        return thriftifyKeySlices(rows, column_parent, predicate);
    }

这里在获得了key的bounds之后,尝试调度此线程获得资源,然后调用了StorageProxy.getRangeSlice方法。

八、总结

这篇水文从Cassandra的生成代码入手,简单介绍了.thrift文件,分析了生成代码的结构,Thrift中的Message结构,最后简单介绍了一下Thrift的结构和如何写代码去使用Thrift。

Advertisements

[转]使用Apache Avro

源地址:http://www.infoq.com/cn/articles/ApacheAvro

作者 Boris Lublinsky 译者 王恒涛 发布于 2011年3月11日

Avro[1]是最近加入到Apache的Hadoop家族的项目之一。为支持数据密集型应用,它定义了一种数据格式并在多种编程语言中支持这种格式。

Avro提供的功能类似于其他编组系统,如Thrift、Protocol Buffers等。而Avro的主要不同之处在于[2]:

  • “动态类型:Avro无需生成代码。数据总是伴以模式定义,这样就可以在不生成代码、静态数据类型的情况下对数据进行所有处理。这样有利于构建通用的数据处理系统和语言。
  • 无标记数据:由于在读取数据时有模式定义,这就大大减少了数据编辑所需的类型信息,从而减少序列化空间。
  • 不用手动分配的字段ID:当数据模式发生变化,处理数据时总是同时提供新旧模式,差异就可以用字段名来做符号化的分析。”

由于性能高、基本代码少和产出数据量精简等特点,Avro周围展开了众多活动——许多NoSQL实现,包括Hadoop、Cssandra等,都把Avro整合到它们的客户端API和储存功能中;已经有人对Avro与其他流行序列化框架做了Benchmark测试并得到结果[3],但是,目前尚无可供人们学习使用Avro的代码示例[4]。

在这篇文章中我将试着描述我使用Avro的经验,特别是:

  • 如何建立组件化Avro模式,使用组件搭建整体模式,分别保存在多个文件中
  • 在Avro中实现继承
  • 在Avro中实现多态
  • Avro文档的向后兼容性。

组件化Apache Avro模式

如Avro规范所述[5]Avro文档模式定义成JSON文件。在当前Avro实现中,模式类需要一个文件(或字符串)来表示内部模式。同XML模式不一样,Avro当前版本不支持向模式文档中导入(一个或多个)子模式,这往往迫使开发者编写非常复杂的模式定义[6],并大大复杂化了模式的重用。下面的代码示例给出了一个有趣的拆分和组合模式文件的例子。它基于模式类提供的一个toString()方法,该方法返回一个JSON字符串以表示给定的模式定义。用这种办法,我提供了一个简单AvroUtils,能够自动完成上述功能:

package com.navteq.avro.common;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;

public class AvroUtils {

        private static Map<String, Schema> schemas = new HashMap<String, Schema>();

        private AvroUtils(){}

        public static void addSchema(String name, Schema schema){
               schemas.put(name, schema);

        }

        public static Schema getSchema(String name){
               return schemas.get(name);

        }

        public static String resolveSchema(String sc){

                String result = sc;
                for(Map.Entry<String, Schema> entry : schemas.entrySet())
                      result = replace(result, entry.getKey(),
                                        entry.getValue().toString());
                return result;

        }

        static String replace(String str, String pattern, String replace) {

                int s = 0;
                int e = 0;
                StringBuffer result = new StringBuffer();
                while ((e = str.indexOf(pattern, s)) >= 0) {
                result.append(str.substring(s, e));
                result.append(replace);
                s = e+pattern.length();

        }
        result.append(str.substring(s));
        return result.toString();

}

public static Schema parseSchema(String schemaString){

        String completeSchema = resolveSchema(schemaString);
        Schema schema = Schema.parse(completeSchema);
        String name = schema.getFullName();
        schemas.put(name, schema);
        return schema;

}

public static Schema parseSchema(InputStream in)throws IOException {

    StringBuffer out = new StringBuffer();
    byte[] b = new byte[4096];
    for (int n; (n = in.read(b)) != -1;) {
     out.append(new String(b, 0, n));
    }
    return parseSchema(out.toString());

}

public static Schema parseSchema(File file)throws IOException {

        FileInputStream fis = new FileInputStream(file);
        return parseSchema(fis);
    }
}

清单1 AvroUtils类

这个简单实现基于全局(静态)模式注册表,它由完全限定的模式名及与之对应的对象构成。对于每一个要解析的新模式,该实现在注册表中搜索已保存的完全限定模式名,并且在给定的模式中做字符串替换。模式字符串被解析之后,它的全名和模式名都存储在注册表中。

下面是一个简单的测试,展示如何使用这个类:

package com.navteq.avro.common;

import java.io.File;

import org.junit.Test;

public class AvroUtilsTest {

       private static final String schemaDescription =
         "{ \n" +
            " \"namespace\": \"com.navteq.avro\", \n" +
            " \"name\": \"FacebookUser\", \n" +
            " \"type\": \"record\",\n" +
            " \"fields\": [\n" +
                     " {\"name\": \"name\", \"type\": [\"string\", \"null\"] },\n" +
                     " {\"name\": \"num_likes\", \"type\": \"int\"},\n" +
                     " {\"name\": \"num_photos\", \"type\": \"int\"},\n" +
            " {\"name\": \"num_groups\", \"type\": \"int\"} ]\n" +
         "}";

       private static final String schemaDescriptionExt =
         " { \n" +
             " \"namespace\": \"com.navteq.avro\", \n" +
             " \"name\": \"FacebookSpecialUser\", \n" +
             " \"type\": \"record\",\n" +
             " \"fields\": [\n" +
                      " {\"name\": \"user\", \"type\": com.navteq.avro.FacebookUser },\n" +
                      " {\"name\": \"specialData\", \"type\": \"int\"} ]\n" +
          "}";

       @Test
       public void testParseSchema() throws Exception{

               AvroUtils.parseSchema(schemaDescription);
               Schema extended = AvroUtils.parseSchema(schemaDescriptionExt);
               System.out.println(extended.toString(true));
       }
}

清单2 AvroUtils测试

在这个测试中,第一个模式的完全限定名是com.navteq.avro.FacebookUser,替换正常运行并打印出以下结果:

{
  "type" : "record",
  "name" : "FacebookSpecialUser",
  "namespace" : "com.navteq.avro",
  "fields" : [ {
    "name" : "user",
    "type" : {
      "type" : "record",
      "name" : "FacebookUser",
      "fields" : [ {
        "name" : "name",
        "type" : [ "string", "null" ]
      }, {
        "name" : "num_likes",
        "type" : "int"
      }, {
        "name" : "num_photos",
        "type" : "int"
      }, {
        "name" : "num_groups",
        "type" : "int"
      } ]
    }
  }, {
    "name" : "specialData",
    "type" : "int"
  } ]
}

清单3 AvroUtilsTest的执行结果

使用Apache Avro实现继承

一种常见的定义数据的方法是通过继承——使用现有的数据定义并添加参数。虽然技术上Avro不支持继承[7],但要是实现一个类继承的结构非常简单。

如果我们有一个基类的定义——FacebookUser,如下:

{
"namespace": "com.navteq.avro",
"name": "FacebookUser",
"type": "record",
"fields": [
  {"name": "name", "type": ["string", "null"] },
  {"name": "num_likes", "type": "int"},
  {"name": "num_photos", "type": "int"},
  {"name": "num_groups", "type": "int"} ]
}

清单4 Facebook用户记录的定义

要创建一个FacebookSpecialUser定义非常简单,它大概是这样的:

{
    "namespace": "com.navteq.avro",
  "name": "FacebookSpecialUser",
  "type": "record",
  "fields": [
    {"name": "user", "type": com.navteq.avro.FacebookUser },
      {"name": "specialData", "type": "int"}
    ]
}

清单5 Facebook特殊的用户记录的定义

一个特殊的用户定义包含两个字段——Facebook的用户类型的用户和一个int类型的数据字段。

特殊Facebook用户的简单测试类如下:

package com.navteq.avro.inheritance;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.File;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.util.Utf8;
import org.junit.Before;
import org.junit.Test;

import com.navteq.avro.common.AvroUtils;

public class TestSimpleInheritance {

        private Schema schema;
        private Schema subSchema;

        @Before
        public void setUp() throws Exception {

                subSchema = AvroUtils.parseSchema(new File("resources/facebookUser.avro"));
                schema = AvroUtils.parseSchema(new File("resources/FacebookSpecialUser.avro"));

        }

        @Test
        public void testSimpleInheritance() throws Exception{
                ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
                GenericDatumWriter writer =
                            new GenericDatumWriter(schema);
                Encoder encoder = new BinaryEncoder(outputStream);

                GenericRecord subRecord1 = new GenericData.Record(subSchema);
                subRecord1.put("name", new Utf8("Doctor Who"));
                subRecord1.put("num_likes", 1);
                subRecord1.put("num_photos", 0);
                subRecord1.put("num_groups", 423);
                GenericRecord record1 = new GenericData.Record(schema);
                record1.put("user", subRecord1);
                record1.put("specialData", 1);

                writer.write(record1, encoder);

                GenericRecord subRecord2 = new GenericData.Record(subSchema);
                subRecord2.put("name", new org.apache.avro.util.Utf8("Doctor WhoWho"));
                subRecord2.put("num_likes", 2);
                subRecord2.put("num_photos", 0);
                subRecord2.put("num_groups", 424);
                GenericRecord record2 = new GenericData.Record(schema);
                record2.put("user", subRecord2);
                record2.put("specialData", 2);

                writer.write(record2, encoder);

                encoder.flush();

                ByteArrayInputStream inputStream =
                        new ByteArrayInputStream(outputStream.toByteArray());
                Decoder decoder = DecoderFactory.defaultFactory().
                        createBinaryDecoder(inputStream, null);
                GenericDatumReader reader =
                        new GenericDatumReader(schema);
                while(true){
                        try{
                              GenericRecord result = reader.read(null, decoder);
                              System.out.println(result);
                        }
                        catch(EOFException eof){
                                break;
                        }
                        catch(Exception ex){
                                ex.printStackTrace();
                        }
                }
        }
}[8]

清单6 一个特殊的Facebook用户的测试类

运行这个测试类产生预期的结果:

{"user": {"name": "Doctor Who", "num_likes": 1, "num_photos": 0,
"num_groups": 423}, "specialData": 1}
{"user": {"name": "Doctor WhoWho", "num_likes": 2, "num_photos": 0,
"num_groups": 424}, "specialData": 2}

清单7 Facebook特殊用户的测试结果

如果唯一需要的是有包含基础数据和其他参数的记录,此代码工作正常,但它不提供多态性——读取相同记录时,没办法知道到底读的是哪个类型的记录。

使用ApacheAvro实现多态性

与谷歌protocol buffers不同[9],Avro不支持可选参数[10],上述继承的实现不适应于多态性的实现——这是由于必须具备特殊的数据参数。幸运的是,Avro支持联合体,允许省略某些记录的参数。下面的定义可用于创建一个多态的纪录。对于基准纪录,我将使用清单4中描述的例子。为了扩展我们将使用以下两个定义:

{
     "namespace": "com.navteq.avro",
   "name": "FacebookSpecialUserExtension1",
   "type": "record",
   "fields": [
      {"name": "specialData1", "type": "int"}
     ]
}

清单8 首条扩展记录的定义

{
     "namespace": "com.navteq.avro",
   "name": "FacebookSpecialUserExtension2",
   "type": "record",
   "fields": [
      {"name": "specialData2", "type": "int"}
     ]
}

清单9 第二条扩展记录的定义

有了以上两个定义一个多态记录可以定义如下:

{
     "namespace": "com.navteq.avro",
   "name": "FacebookSpecialUser",
   "type": "record",
   "fields": [
      {"name": "type", "type": "string" },
      {"name": "user", "type": com.navteq.avro.FacebookUser },
        {"name": "extension1", "type":
            [com.navteq.avro.FacebookSpecialUserExtension1, "null"]},
        {"name": "extension2", "type":
            [com.navteq.avro.FacebookSpecialUserExtension2, "null"]}
      ]
}

清单10 Facebook特殊用户的多态定义

这里扩展1和扩展2都是可选的且二者皆可。为了使处理更简单,我添加了一个类型字段,可以用来明确定义的记录类型。

下面给出一个更好的多态记录的定义:

{
     "namespace": "com.navteq.avro",
   "name": "FacebookSpecialUser1",
   "type": "record",
   "fields": [
      {"name": "type", "type": "string" },
      {"name": "user", "type": com.navteq.avro.FacebookUser },
        {"name": "extension", "type":
            [com.navteq.avro.FacebookSpecialUserExtension1,
            com.navteq.avro.FacebookSpecialUserExtension2,
            "null"]}
      ]
}

清单11 Facebook特殊用户的改进多态定义

下面给出一个多态Facebook特殊用户的简单测试类:

package com.navteq.avro.inheritance;
package com.navteq.avro.inheritance;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.File;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.JsonDecoder;
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.util.Utf8;
import org.junit.Before;
import org.junit.Test;

import com.navteq.avro.common.AvroUtils;

public class TestInheritance {

        private Schema FBUser;
        private Schema base;
        private Schema ext1;
        private Schema ext2;

        @Before
        public void setUp() throws Exception {

                 base = AvroUtils.parseSchema(new File("resources/facebookUser.avro"));
                 ext1 = AvroUtils.parseSchema(
                         new File("resources/FacebookSpecialUserExtension1.avro"));
                 ext2 = AvroUtils.parseSchema(
                         new File("resources/FacebookSpecialUserExtension2.avro"));
                 FBUser = AvroUtils.parseSchema(new File("resources/FacebooklUserInheritance.avro"));
}

        @Test
        public void testInheritanceBinary() throws Exception{
                 ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
                 GenericDatumWriter writer =
                         new GenericDatumWriter(FBUser);
                 Encoder encoder = new BinaryEncoder(outputStream);

                 GenericRecord baseRecord = new GenericData.Record(base);
                 baseRecord.put("name", new Utf8("Doctor Who"));
                 baseRecord.put("num_likes", 1);
                 baseRecord.put("num_photos", 0);
                 baseRecord.put("num_groups", 423);
                 GenericRecord FBrecord = new GenericData.Record(FBUser);
                 FBrecord.put("type", "base");
                 FBrecord.put("user", baseRecord);

                 writer.write(FBrecord, encoder);

                 baseRecord = new GenericData.Record(base);
                 baseRecord.put("name", new Utf8("Doctor WhoWho"));
                 baseRecord.put("num_likes", 1);
                 baseRecord.put("num_photos", 0);
                 baseRecord.put("num_groups", 423);
                 GenericRecord extRecord = new GenericData.Record(ext1);
                 extRecord.put("specialData1", 1);
                 FBrecord = new GenericData.Record(FBUser);
                 FBrecord.put("type", "extension1");
                 FBrecord.put("user", baseRecord);
                 FBrecord.put("extension", extRecord);

                 writer.write(FBrecord, encoder);

                 baseRecord = new GenericData.Record(base);
                 baseRecord.put("name", new org.apache.avro.util.Utf8("Doctor WhoWhoWho"));
                 baseRecord.put("num_likes", 2);
                 baseRecord.put("num_photos", 0);
                 baseRecord.put("num_groups", 424);
                 extRecord = new GenericData.Record(ext2);
                 extRecord.put("specialData2", 2);
                 FBrecord = new GenericData.Record(FBUser);
                 FBrecord.put("type", "extension2");
                 FBrecord.put("user", baseRecord);
                 FBrecord.put("extension", extRecord);

                 writer.write(FBrecord, encoder);

                 encoder.flush();                 byte[] data = outputStream.toByteArray();
                 ByteArrayInputStream inputStream = new ByteArrayInputStream(data);
                 Decoder decoder =
                        DecoderFactory.defaultFactory().createBinaryDecoder(inputStream, null);
                 GenericDatumReader reader =
                        new GenericDatumReader(FBUser);
                 while(true){
                        try{
                               GenericRecord result = reader.read(null, decoder);
                               System.out.println(result);
                        }
                        catch(EOFException eof){
                               break;
                        }
                        catch(Exception ex){
                               ex.printStackTrace();
                        }
                 }
        }
}

清单12 一条多态Facebook用户记录的测试类

运行这个测试类产生的预期结果:

{"type": "base", "user": {"name": "Doctor Who", "num_likes": 1, "num_photos":
0, "num_groups": 423}, "extension": null}
{"type": "extension1", "user": {"name": "Doctor WhoWho", "num_likes": 1,
"num_photos": 0, "num_groups": 423}, "extension": {"specialData1": 1}}
{"type": "extension2", "user": {"name": "Doctor WhoWhoWho", "num_likes": 2,
"num_photos": 0, "num_groups": 424}, "extension": {"specialData2": 2}}

清单13 多态Facebook用户记录测试的执行结果

使用ApacheAvro的向后兼容性

XML的优势之一就是当模式定义使用可选参数扩展时具备向后兼容性。我们介绍一个第三扩展记录的定义来测试Avro的这个特性:

{
     "namespace": "com.navteq.avro",
   "name": "FacebookSpecialUserExtension3",
   "type": "record",
   "fields": [
      {"name": "specialData3", "type": "int"}
   ]
}

清单14 第三扩展记录的定义

多态记录的变更定义如下:

{
     "namespace": "com.navteq.avro",
   "name": "FacebookSpecialUser11",
   "type": "record",
   "fields": [
     {"name": "type", "type": "string" },
     {"name": "user", "type": com.navteq.avro.FacebookUser },
       {"name": "extension", "type":
          [com.navteq.avro.FacebookSpecialUserExtension1,
          com.navteq.avro.FacebookSpecialUserExtension2,
          com.navteq.avro.FacebookSpecialUserExtension3,
          "null"]}
     ]
}

清单15 Facebook特殊用户的改进多态定义

为了能读取清单15中记录定义中的记录,清单12中的代码在修改后(但仍然用清单11中的记录定义来写数据)生成下列结果:

{"type": "base", "user": {"name": "Doctor Who", "num_likes": 1, "num_photos":
0, "num_groups": 423}, "extension": {"specialData3": 10}}
java.lang.ArrayIndexOutOfBoundsException
      at java.lang.System.arraycopy(Native Method)
      at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:331)       at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:265)       at org.apache.avro.io.ValidatingDecoder.readString(ValidatingDecoder.java:99)       at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:318)       at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:312)       at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:120)       at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:142)       at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:114)       at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:105)       at com.navteq.avro.inheritance.TestInheritance.testInheritanceBinary(TestInheritance.java:119)       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)       at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)       at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)       at java.lang.reflect.Method.invoke(Unknown Source)

清单16 多态Facebook用户记录对扩展定义测试的执行结果

虽然Avro提供了一个能够解决这个问题的API——GenericDatumReader<GenericRecord>构造函数可以使用两个参数——分别用来写记录与读记录的模式,但这不总是解决向后兼容问题的一定可行的方法,因为它必须要记住用来写每条记录的所有模式。

一个更合适的解决方案是:从二进制编码器/解码器(它建立记录的二进制表象)切换到JSON编码器/解码器。在这种情况下代码有效,并产生以下结果:

{"type": "base", "user": {"name": "Doctor Who", "num_likes": 1, "num_photos":
0, "num_groups": 423}, "extension": null}
{"type": "extension1", "user": {"name": "Doctor WhoWho", "num_likes": 1,
"num_photos": 0, "num_groups": 423}, "extension": {"specialData1": 1}}
{"type": "extension2", "user": {"name": "Doctor WhoWhoWho", "num_likes": 2,
"num_photos": 0, "num_groups": 424}, "extension": {"specialData2": 2}}

清单17 应用JSON编码多态Facebook用户记录对扩展定义测试的执行结果

通过JSON的编码器,实际的数据转换成JSON:

{"type":"base","user":{"name":{"string":"Doctor
Who"},"num_likes":1,"num_photos":0,"num_groups":423},"extension":null}
{"type":"extension1","user":{"name":{"string":"Doctor
WhoWho"},"num_likes":1,"num_photos":0,"num_groups":423},"extension":{"FacebookSpecialUserExtension1":{"specialData1":1}}}
{"type":"extension2","user":{"name":{"string":"Doctor
WhoWhoWho"},"num_likes":2,"num_photos":0,"num_groups":424},"extension":{"FacebookSpecialUserExtension2":{"specialData2":2}}}

清单18 JSON编码下所转换的数据

还有一个需要考虑的问题,在我的测试中,同样的数据在二进制编码下产生的Avro记录的大小为89字节,而在JSON编码下产生了473字节。

结论

当前实现的Avro不直接支持模式的组件化或模式组件重用,但像本文中描述的一个简单的框架能够为这些特性提供支持。尽管Avro不直接支持多态性,文中利用适当的模式设计可以简单地实现多态数据模式。至于真正意义上向后兼容性问题,只有使用JSON编码的时候Avro才支持[11]。最后一点和Avro的特性没有多大关系,更多的是来自JSON。最后一点严重限制了Avro适用性(如果向后兼容性是必须的),使其使用范围局限为一种高级的JSON编组和处理API。

除了一般的(这里所用到的)Avro方法,也可以使用一个特定的Avro。这时候,可通过(Avro)生产特定的记录而非普通的记录。尽管有些说法指出[12]Avro的特定应用能够获得性能提升,以我使用当前Avro版本(1.4.1)的经验来看,两者有着同样的性能表现。


[1] http://hadoop.apache.org/avro/

[2] http://avro.apache.org/docs/1.4.1/

[3] http://code.google.com/p/thrift-protobuf-compare/wiki/Benchmarking

[4] 我在Avro编组Avro Map Reduce发现的几篇

[5] http://avro.apache.org/docs/current/spec.html

[6] 很有趣,Avro IDL支持子IDL

[7] 与明确支持类型定义中的基类型的XML不同

[8] 关于上面的代码需要指出的一点是,模式解析是在构造函数中完成的,原因在于构造解析是Avro实现中最昂贵的操作。

[9] http://code.google.com/p/protobuf/

[10] Avro支持“Null”,这不同于可选参数,在Avro中“Null”表示某个属性没有值

[11] 或者如果有旧版本的模式

[12] http://code.google.com/p/thrift-protobuf-compare/wiki/Benchmarking

查看英文原文:Using Apache Avro


感谢马国耀对本文的审校。

给InfoQ中文站投稿或者参与内容翻译工作,请邮件至editors@cn.infoq.com。也欢迎大家加入到InfoQ中文站用户讨论组中与我们的编辑和其他读者朋友交流。

[转]Log4j使用总结

一、介绍

Log4j是Apache的一个开放源代码项目,通过使用Log4j,我们可以控制日志信息输送的目的地是控制台、文件、GUI组件、甚至是套接口服务 器、NT的事件记录器、UNIX Syslog守护进程等;我们也可以控制每一条日志的输出格式;通过定义每一条日志信息的级别,我们能够更加细致地控制日志的生成过程。

Log4j由三个重要的组件构成:日志信息的优先级,日志信息的输出目的地,日志信息的输出格式。日志信息的优先级从高到低有ERROR、WARN、 INFO、DEBUG,分别用来指定这条日志信息的重要程度;日志信息的输出目的地指定了日志将打印到控制台还是文件中;而输出格式则控制了日志信息的显示内容。

二、配置文件

其实您也可以完全不使用配置文件,而是在代码中配置Log4j环境。但是,使用配置文件将使您的应用程序更加灵活。

Log4j支持两种配置文件格式,一种是XML格式的文件,一种是properties格式的文件。下面我们介绍使用properties格式做为配置文件的方法:

示例:

log4j.rootLogger=INFO, A1

log4j.appender.A1=org.apache.log4j.ConsoleAppender

log4j.appender.A1.layout=org.apache.log4j.PatternLayout

log4j.appender.A1.layout.ConversionPattern=%-4r %-5p [%t] %37c %3x – %m%n

1. 配置根Logger,其语法为:

log4j.rootLogger = [ level ] , appenderName, appenderName, …

其中,level 是日志记录的优先级,分为OFF、FATAL、ERROR、WARN、INFO、DEBUG、ALL或者您定义的级别。Log4j建议只使用四个级别,优先级从高到低分别是ERROR、WARN、INFO、DEBUG。通过在这里定义的级别,您可以控制到应用程序中相应级别的日志信息的开关。比如在这里定义了INFO级别,则应用程序中所有DEBUG级别的日志信息将不被打印出来。

appenderName就是指定日志信息输出到哪个地方。您可以同时指定多个输出目的地。

2. 配置日志信息输出目的地Appender,其语法为:

log4j.appender.appenderName = fully.qualified.name.of.appender.class

log4j.appender.appenderName.option1 = value1

log4j.appender.appenderName.option = valueN

其中,Log4j提供的appender有以下几种:

org.apache.log4j.ConsoleAppender(控制台),

org.apache.log4j.FileAppender(文件),

org.apache.log4j.DailyRollingFileAppender(每天产生一个日志文件),

org.apache.log4j.RollingFileAppender(文件大小到达指定尺寸的时候产生一个新的文件),

org.apache.log4j.WriterAppender(将日志信息以流格式发送到任意指定的地方)

(1).ConsoleAppender选项

Threshold=WARN:指定日志消息的输出最低层次。

ImmediateFlush=true:默认值是true,意谓着所有的消息都会被立即输出。

Target=System.err:默认情况下是:System.out,指定输出控制台

(2).FileAppender 选项

Threshold=WARN:指定日志消息的输出最低层次。

ImmediateFlush=true:默认值是true,意谓着所有的消息都会被立即输出。

File=mylog.txt:指定消息输出到mylog.txt文件。

Append=false:默认值是true,即将消息增加到指定文件中,false指将消息覆盖指定的文件内容。

(3).DailyRollingFileAppender 选项

Threshold=WARN:指定日志消息的输出最低层次。

ImmediateFlush=true:默认值是true,意谓着所有的消息都会被立即输出。

File=mylog.txt:指定消息输出到mylog.txt文件。

Append=false:默认值是true,即将消息增加到指定文件中,false指将消息覆盖指定的文件内容。

DatePattern=’.’yyyy-ww:每周滚动一次文件,即每周产生一个新的文件。当然也可以指定按月、周、天、时和分。即对应的格式如下:

1)’.’yyyy-MM: 每月

2)’.’yyyy-ww: 每周

3)’.’yyyy-MM-dd: 每天

4)’.’yyyy-MM-dd-a: 每天两次

5)’.’yyyy-MM-dd-HH: 每小时

6)’.’yyyy-MM-dd-HH-mm: 每分钟

(4).RollingFileAppender 选项

Threshold=WARN:指定日志消息的输出最低层次。

ImmediateFlush=true:默认值是true,意谓着所有的消息都会被立即输出。

File=mylog.txt:指定消息输出到mylog.txt文件。

Append=false:默认值是true,即将消息增加到指定文件中,false指将消息覆盖指定的文件内容。

MaxFileSize=100KB: 后缀可以是KB, MB 或者是 GB. 在日志文件到达该大小时,将会自动滚动,即将原来的内容移到mylog.log.1文件。

MaxBackupIndex=2:指定可以产生的滚动文件的最大数。

3. 配置日志信息的布局,其语法为:

log4j.appender.appenderName.layout = fully.qualified.name.of.layout.class

log4j.appender.appenderName.layout.option1 = value1

log4j.appender.appenderName.layout.option = valueN

其中,Log4j提供的layout有以下几种:

org.apache.log4j.HTMLLayout(以HTML表格形式布局),

org.apache.log4j.PatternLayout(可以灵活地指定布局模式),

org.apache.log4j.SimpleLayout(包含日志信息的级别和信息字符串),

org.apache.log4j.TTCCLayout(包含日志产生的时间、线程、类别等等信息)

4、输出格式设置

在配置文件中可以通过log4j.appender.A1.layout.ConversionPattern设置日志输出格式。

参数:

%p: 输出日志信息优先级,即DEBUG,INFO,WARN,ERROR,FATAL,

%d: 输出日志时间点的日期或时间,默认格式为ISO8601,也可以在其后指定格式,比如:%d{yyy MMM dd HH:mm:ss,SSS},输出类似:2002年10月18日 22:10:28,921

%r: 输出自应用启动到输出该log信息耗费的毫秒数

%c: 输出日志信息所属的类目,通常就是所在类的全名

%t: 输出产生该日志事件的线程名

%l: 输出日志事件的发生位置,相当于%C.%M(%F:%L)的组合,包括类目名、发生的线程,以及在代码中的行数。举例:Testlog4.main(TestLog4.java:10)

%x: 输出和当前线程相关联的NDC(嵌套诊断环境),尤其用到像java servlets这样的多客户多线程的应用中。

%%: 输出一个”%”字符

%F: 输出日志消息产生时所在的文件名称

%L: 输出代码中的行号

%m: 输出代码中指定的消息,产生的日志具体信息

%n: 输出一个回车换行符,Windows平台为”\r\n”,Unix平台为”\n”输出日志信息换行

可以在%与模式字符之间加上修饰符来控制其最小宽度、最大宽度、和文本的对齐方式。如:

1)%20c:指定输出category的名称,最小的宽度是20,如果category的名称小于20的话,默认的情况下右对齐。

2)%-20c:指定输出category的名称,最小的宽度是20,如果category的名称小于20的话,”-”号指定左对齐。

3)%.30c:指定输出category的名称,最大的宽度是30,如果category的名称大于30的话,就会将左边多出的字符截掉,但小于30的话也不会有空格。

4)%20.30c:如果category的名称小于20就补空格,并且右对齐,如果其名称长于30字符,就从左边交远销出的字符截掉。

三、在程序中的使用

在程序中使用Log4j之前,首先要将commons-logging.jar和logging-log4j-1.2.9.jar导入到classpath中,并将log4j.properties放于src根目录中。接下来就可以使用了。

1.得到记录器

使用Log4j,第一步就是获取日志记录器,这个记录器将负责控制日志信息。其语法为:

public static Logger getLogger( String name),

通过指定的名字获得记录器,如果必要的话,则为这个名字创建一个新的记录器。Name一般取本类的名字,比如:

static Logger logger = Logger.getLogger ( ServerWithLog4j.class.getName () ) ;

注:推荐使用commons-logging结合log4j进行日志记录

private static Log logger = LogFactory.getLog(Yourclass.class);

2.插入记录信息(格式化日志信息)

当上两个必要步骤执行完毕,您就可以轻松地使用不同优先级别的日志记录语句插入到您想记录日志的任何地方,其语法如下:

Logger.debug ( Object message ) ;

Logger.info ( Object message ) ;

Logger.warn ( Object message ) ;

Logger.error ( Object message ) ;

四、Log4j比较全面的配置

LOG4J的配置之简单使它遍及于越来越多的应用中了:Log4J配置文件实现了输出到控制台、文件、回滚文件、发送日志邮件、输出到数据库日志表、自定义标签等全套功能。择其一二使用就够用了。

log4j.rootLogger=DEBUG,CONSOLE,A1,im
log4j.addivity.org.apache=true
# 应用于控制台
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.Threshold=DEBUG
log4j.appender.CONSOLE.Target=System.out
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=[framework] %d - %c -%-4r [%t] %-5p %c %x - %m%n
#log4j.appender.CONSOLE.layout.ConversionPattern=[start]%d{DATE}[DATE]%n%p[PRIORITY]%n%x[NDC]%n%t[thread] n%c[CATEGORY]%n%m[MESSAGE]%n%n
#应用于文件
log4j.appender.FILE=org.apache.log4j.FileAppender
log4j.appender.FILE.File=file.log
log4j.appender.FILE.Append=false
log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
log4j.appender.FILE.layout.ConversionPattern=[framework] %d - %c -%-4r [%t] %-5p %c %x - %m%n
# Use this layout for LogFactor 5 analysis
# 应用于文件回滚
log4j.appender.ROLLING_FILE=org.apache.log4j.RollingFileAppender
log4j.appender.ROLLING_FILE.Threshold=ERROR
log4j.appender.ROLLING_FILE.File=rolling.log //文件位置,也可以用变量${java.home}、rolling.log
log4j.appender.ROLLING_FILE.Append=true //true:添加 false:覆盖
log4j.appender.ROLLING_FILE.MaxFileSize=10KB //文件最大尺寸
log4j.appender.ROLLING_FILE.MaxBackupIndex=1 //备份数
log4j.appender.ROLLING_FILE.layout=org.apache.log4j.PatternLayout
log4j.appender.ROLLING_FILE.layout.ConversionPattern=[framework] %d - %c -%-4r [%t] %-5p %c %x - %m%n

#应用于socket
log4j.appender.SOCKET=org.apache.log4j.RollingFileAppender
log4j.appender.SOCKET.RemoteHost=localhost
log4j.appender.SOCKET.Port=5001
log4j.appender.SOCKET.LocationInfo=true
# Set up for Log Facter 5
log4j.appender.SOCKET.layout=org.apache.log4j.PatternLayout
log4j.appender.SOCET.layout.ConversionPattern=[start]%d{DATE}[DATE]%n%p[PRIORITY]%n%x[NDC]%n%t[thread]%n%c[CATEGORY]%n%m[MESSAGE]%n%n

# Log Factor 5 Appender
log4j.appender.LF5_APPENDER=org.apache.log4j.lf5.LF5Appender
log4j.appender.LF5_APPENDER.MaxNumberOfRecords=2000
# 发送日志给邮件
log4j.appender.MAIL=org.apache.log4j.net.SMTPAppender
log4j.appender.MAIL.Threshold=FATAL
log4j.appender.MAIL.BufferSize=10
log4j.appender.MAIL.From=web@www.wuset.com
log4j.appender.MAIL.SMTPHost=www.wusetu.com
log4j.appender.MAIL.Subject=Log4J Message
log4j.appender.MAIL.To=web@www.wusetu.com
log4j.appender.MAIL.layout=org.apache.log4j.PatternLayout
log4j.appender.MAIL.layout.ConversionPattern=[framework] %d - %c -%-4r [%t] %-5p %c %x - %m%n
# 用于数据库
log4j.appender.DATABASE=org.apache.log4j.jdbc.JDBCAppender
log4j.appender.DATABASE.URL=jdbc:mysql://localhost:3306/test
log4j.appender.DATABASE.driver=com.mysql.jdbc.Driver
log4j.appender.DATABASE.user=root
log4j.appender.DATABASE.password=
log4j.appender.DATABASE.sql=INSERT INTO LOG4J (Message) VALUES (’[framework] %d - %c -%-4r [%t] %-5p %c %x - %m%n’)
log4j.appender.DATABASE.layout=org.apache.log4j.PatternLayout
log4j.appender.DATABASE.layout.ConversionPattern=[framework] %d - %c -%-4r [%t] %-5p %c %x - %m%n

log4j.appender.A1=org.apache.log4j.DailyRollingFileAppender
log4j.appender.A1.File=SampleMessages.log4j
log4j.appender.A1.DatePattern=yyyyMMdd-HH’.log4j’
log4j.appender.A1.layout=org.apache.log4j.xml.XMLLayout
#自定义Appender
log4j.appender.im = net.cybercorlin.util.logger.appender.IMAppender
log4j.appender.im.host = mail.cybercorlin.net
log4j.appender.im.username = username
log4j.appender.im.password = password
log4j.appender.im.recipient = corlin@cybercorlin.net
log4j.appender.im.layout=org.apache.log4j.PatternLayout
log4j.appender.im.layout.ConversionPattern =[framework] %d - %c -%-4r [%t] %-5p %c %x - %m%n