dubbox 增加google-gprc/protobuf支持

好久没写东西了,今年实在太忙,基本都在搞业务开发,晚上来补一篇,作为今年的收官博客。google-rpc 正式发布以来,受到了不少人的关注,这么知名的rpc框架,不集成到dubbox中有点说不过去。

但是grpc的思路与其它rpc(比如:avro/thrift)有些不一样,并非直接采用 "接口定义+服务实现"的套路,而是采用了"抽象类派生"的做法,见下面的示例:

 1 syntax = "proto3";
 2
 3 option java_multiple_files = true;
 4 option java_package = "com.cnblogs.yjmyzz.demo.service.api.grpc";
 5 option java_outer_classname = "GrpcHelloServiceProto";
 6
 7 package hello;
 8
 9 service GrpcHelloService {
10     rpc ping (PingRequest) returns (PingResponse) {}
11 }
12
13 message PingRequest{}
14
15 message PingResponse {
16     string message = 1;
17 }

View Code

这是一段protobuf的定义文件,最终生成的java代码为:

  1 package com.cnblogs.yjmyzz.demo.service.api.grpc;
  2
  3 import static io.grpc.stub.ClientCalls.asyncUnaryCall;
  4 import static io.grpc.stub.ClientCalls.asyncServerStreamingCall;
  5 import static io.grpc.stub.ClientCalls.asyncClientStreamingCall;
  6 import static io.grpc.stub.ClientCalls.asyncBidiStreamingCall;
  7 import static io.grpc.stub.ClientCalls.blockingUnaryCall;
  8 import static io.grpc.stub.ClientCalls.blockingServerStreamingCall;
  9 import static io.grpc.stub.ClientCalls.futureUnaryCall;
 10 import static io.grpc.MethodDescriptor.generateFullMethodName;
 11 import static io.grpc.stub.ServerCalls.asyncUnaryCall;
 12 import static io.grpc.stub.ServerCalls.asyncServerStreamingCall;
 13 import static io.grpc.stub.ServerCalls.asyncClientStreamingCall;
 14 import static io.grpc.stub.ServerCalls.asyncBidiStreamingCall;
 15 import static io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall;
 16 import static io.grpc.stub.ServerCalls.asyncUnimplementedStreamingCall;
 17
 18 /**
 19  */
 20 @javax.annotation.Generated(
 21     value = "by gRPC proto compiler (version 1.0.1)",
 22     comments = "Source: hello.proto")
 23 public class GrpcHelloServiceGrpc {
 24
 25   private GrpcHelloServiceGrpc() {}
 26
 27   public static final String SERVICE_NAME = "hello.GrpcHelloService";
 28
 29   // Static method descriptors that strictly reflect the proto.
 30   @io.grpc.ExperimentalApi("https://github.com/grpc/grpc-java/issues/1901")
 31   public static final io.grpc.MethodDescriptor<com.cnblogs.yjmyzz.demo.service.api.grpc.PingRequest,
 32       com.cnblogs.yjmyzz.demo.service.api.grpc.PingResponse> METHOD_PING =
 33       io.grpc.MethodDescriptor.create(
 34           io.grpc.MethodDescriptor.MethodType.UNARY,
 35           generateFullMethodName(
 36               "hello.GrpcHelloService", "ping"),
 37           io.grpc.protobuf.ProtoUtils.marshaller(com.cnblogs.yjmyzz.demo.service.api.grpc.PingRequest.getDefaultInstance()),
 38           io.grpc.protobuf.ProtoUtils.marshaller(com.cnblogs.yjmyzz.demo.service.api.grpc.PingResponse.getDefaultInstance()));
 39
 40   /**
 41    * Creates a new async stub that supports all call types for the service
 42    */
 43   public static GrpcHelloServiceStub newStub(io.grpc.Channel channel) {
 44     return new GrpcHelloServiceStub(channel);
 45   }
 46
 47   /**
 48    * Creates a new blocking-style stub that supports unary and streaming output calls on the service
 49    */
 50   public static GrpcHelloServiceBlockingStub newBlockingStub(
 51       io.grpc.Channel channel) {
 52     return new GrpcHelloServiceBlockingStub(channel);
 53   }
 54
 55   /**
 56    * Creates a new ListenableFuture-style stub that supports unary and streaming output calls on the service
 57    */
 58   public static GrpcHelloServiceFutureStub newFutureStub(
 59       io.grpc.Channel channel) {
 60     return new GrpcHelloServiceFutureStub(channel);
 61   }
 62
 63   /**
 64    */
 65   public static abstract class GrpcHelloServiceImplBase implements io.grpc.BindableService {
 66
 67     /**
 68      */
 69     public void ping(com.cnblogs.yjmyzz.demo.service.api.grpc.PingRequest request,
 70         io.grpc.stub.StreamObserver<com.cnblogs.yjmyzz.demo.service.api.grpc.PingResponse> responseObserver) {
 71       asyncUnimplementedUnaryCall(METHOD_PING, responseObserver);
 72     }
 73
 74     @java.lang.Override public io.grpc.ServerServiceDefinition bindService() {
 75       return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
 76           .addMethod(
 77             METHOD_PING,
 78             asyncUnaryCall(
 79               new MethodHandlers<
 80                 com.cnblogs.yjmyzz.demo.service.api.grpc.PingRequest,
 81                 com.cnblogs.yjmyzz.demo.service.api.grpc.PingResponse>(
 82                   this, METHODID_PING)))
 83           .build();
 84     }
 85   }
 86
 87   /**
 88    */
 89   public static final class GrpcHelloServiceStub extends io.grpc.stub.AbstractStub<GrpcHelloServiceStub> {
 90     private GrpcHelloServiceStub(io.grpc.Channel channel) {
 91       super(channel);
 92     }
 93
 94     private GrpcHelloServiceStub(io.grpc.Channel channel,
 95         io.grpc.CallOptions callOptions) {
 96       super(channel, callOptions);
 97     }
 98
 99     @java.lang.Override
100     protected GrpcHelloServiceStub build(io.grpc.Channel channel,
101         io.grpc.CallOptions callOptions) {
102       return new GrpcHelloServiceStub(channel, callOptions);
103     }
104
105     /**
106      */
107     public void ping(com.cnblogs.yjmyzz.demo.service.api.grpc.PingRequest request,
108         io.grpc.stub.StreamObserver<com.cnblogs.yjmyzz.demo.service.api.grpc.PingResponse> responseObserver) {
109       asyncUnaryCall(
110           getChannel().newCall(METHOD_PING, getCallOptions()), request, responseObserver);
111     }
112   }
113
114   /**
115    */
116   public static final class GrpcHelloServiceBlockingStub extends io.grpc.stub.AbstractStub<GrpcHelloServiceBlockingStub> {
117     private GrpcHelloServiceBlockingStub(io.grpc.Channel channel) {
118       super(channel);
119     }
120
121     private GrpcHelloServiceBlockingStub(io.grpc.Channel channel,
122         io.grpc.CallOptions callOptions) {
123       super(channel, callOptions);
124     }
125
126     @java.lang.Override
127     protected GrpcHelloServiceBlockingStub build(io.grpc.Channel channel,
128         io.grpc.CallOptions callOptions) {
129       return new GrpcHelloServiceBlockingStub(channel, callOptions);
130     }
131
132     /**
133      */
134     public com.cnblogs.yjmyzz.demo.service.api.grpc.PingResponse ping(com.cnblogs.yjmyzz.demo.service.api.grpc.PingRequest request) {
135       return blockingUnaryCall(
136           getChannel(), METHOD_PING, getCallOptions(), request);
137     }
138   }
139
140   /**
141    */
142   public static final class GrpcHelloServiceFutureStub extends io.grpc.stub.AbstractStub<GrpcHelloServiceFutureStub> {
143     private GrpcHelloServiceFutureStub(io.grpc.Channel channel) {
144       super(channel);
145     }
146
147     private GrpcHelloServiceFutureStub(io.grpc.Channel channel,
148         io.grpc.CallOptions callOptions) {
149       super(channel, callOptions);
150     }
151
152     @java.lang.Override
153     protected GrpcHelloServiceFutureStub build(io.grpc.Channel channel,
154         io.grpc.CallOptions callOptions) {
155       return new GrpcHelloServiceFutureStub(channel, callOptions);
156     }
157
158     /**
159      */
160     public com.google.common.util.concurrent.ListenableFuture<com.cnblogs.yjmyzz.demo.service.api.grpc.PingResponse> ping(
161         com.cnblogs.yjmyzz.demo.service.api.grpc.PingRequest request) {
162       return futureUnaryCall(
163           getChannel().newCall(METHOD_PING, getCallOptions()), request);
164     }
165   }
166
167   private static final int METHODID_PING = 0;
168
169   private static class MethodHandlers<Req, Resp> implements
170       io.grpc.stub.ServerCalls.UnaryMethod<Req, Resp>,
171       io.grpc.stub.ServerCalls.ServerStreamingMethod<Req, Resp>,
172       io.grpc.stub.ServerCalls.ClientStreamingMethod<Req, Resp>,
173       io.grpc.stub.ServerCalls.BidiStreamingMethod<Req, Resp> {
174     private final GrpcHelloServiceImplBase serviceImpl;
175     private final int methodId;
176
177     public MethodHandlers(GrpcHelloServiceImplBase serviceImpl, int methodId) {
178       this.serviceImpl = serviceImpl;
179       this.methodId = methodId;
180     }
181
182     @java.lang.Override
183     @java.lang.SuppressWarnings("unchecked")
184     public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserver) {
185       switch (methodId) {
186         case METHODID_PING:
187           serviceImpl.ping((com.cnblogs.yjmyzz.demo.service.api.grpc.PingRequest) request,
188               (io.grpc.stub.StreamObserver<com.cnblogs.yjmyzz.demo.service.api.grpc.PingResponse>) responseObserver);
189           break;
190         default:
191           throw new AssertionError();
192       }
193     }
194
195     @java.lang.Override
196     @java.lang.SuppressWarnings("unchecked")
197     public io.grpc.stub.StreamObserver<Req> invoke(
198         io.grpc.stub.StreamObserver<Resp> responseObserver) {
199       switch (methodId) {
200         default:
201           throw new AssertionError();
202       }
203     }
204   }
205
206   public static io.grpc.ServiceDescriptor getServiceDescriptor() {
207     return new io.grpc.ServiceDescriptor(SERVICE_NAME,
208         METHOD_PING);
209   }
210
211 }

View Code

其中:

  public static abstract class GrpcHelloServiceImplBase implements io.grpc.BindableService {

    /**
     */
    public void ping(com.cnblogs.yjmyzz.demo.service.api.grpc.PingRequest request,
        io.grpc.stub.StreamObserver<com.cnblogs.yjmyzz.demo.service.api.grpc.PingResponse> responseObserver) {
      asyncUnimplementedUnaryCall(METHOD_PING, responseObserver);
    }

    @java.lang.Override public io.grpc.ServerServiceDefinition bindService() {
      return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
          .addMethod(
            METHOD_PING,
            asyncUnaryCall(
              new MethodHandlers<
                com.cnblogs.yjmyzz.demo.service.api.grpc.PingRequest,
                com.cnblogs.yjmyzz.demo.service.api.grpc.PingResponse>(
                  this, METHODID_PING)))
          .build();
    }
  }

就是一个抽象类,而且调用时要借助stub来实现,而stub的生成,又要借助channel,所以在集成到dubbox中时,要花点心思。 

先定义一个辅助接口:

package com.alibaba.dubbo.rpc.protocol.grpc;

import io.grpc.BindableService;
import io.grpc.Channel;

/**
 * Created by yangjunming on 16/10/7.
 */
public interface GrpcBindableService extends BindableService {

    Channel getChannel();

    void setChannel(Channel channel);
}

这个接口的目的,是为了最终调用时,能拿到channel,进而生成stub.

然后在实现具体gprc服务时,实现这个接口:

package com.cnblogs.yjmyzz.demo.service.impl.grpc;

import com.alibaba.dubbo.rpc.protocol.grpc.GrpcBindableService;
import com.cnblogs.yjmyzz.demo.service.api.grpc.GrpcHelloServiceGrpc;
import com.cnblogs.yjmyzz.demo.service.api.grpc.PingRequest;
import com.cnblogs.yjmyzz.demo.service.api.grpc.PingResponse;
import io.grpc.Channel;
import io.grpc.stub.StreamObserver;
import org.springframework.stereotype.Service;

/**
 * Created by yangjunming on 2016/11/3.
 */
@Service("grpcService")
public class HelloServiceImpl extends GrpcHelloServiceGrpc.GrpcHelloServiceImplBase implements GrpcBindableService {

    private Channel channel;

    public Channel getChannel() {
        return channel;
    }

    public void setChannel(Channel channel) {
        this.channel = channel;
    }

    @Override
    public void ping(PingRequest request,
                     StreamObserver<PingResponse> responseObserver) {
        PingResponse reply = PingResponse.newBuilder().setMessage("grpc is running").build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }
}

这样处理后,dubbox中添加grpc的协议就方便了:

package com.alibaba.dubbo.rpc.protocol.grpc;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.protocol.AbstractProxyProtocol;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Server;
import io.grpc.ServerBuilder;

import java.io.IOException;

/**
 * 为dubbo-rpc添加"google-gRPC"支持
 * by 杨俊明(http://yjmyzz.cnblogs.com/)
 */
public class GrpcProtocol extends AbstractProxyProtocol {
    public static final int DEFAULT_PORT = 50051;
    private static final Logger logger = LoggerFactory.getLogger(GrpcProtocol.class);

    public int getDefaultPort() {
        return DEFAULT_PORT;
    }

    public GrpcProtocol() {
        super(IOException.class, RpcException.class);
    }

    @Override
    protected <T> Runnable doExport(T impl, Class<T> type, URL url)
            throws RpcException {

        logger.info("impl => " + impl.getClass());
        logger.info("type => " + type.getName());
        logger.info("url => " + url);

        try {
            String clsName = url.getParameter("class");
            Class<?> cls = Class.forName(clsName);
            GrpcBindableService service = (GrpcBindableService) cls.newInstance();
            final Server grpcServer = ServerBuilder.forPort(url.getPort())
                    .addService(service)
                    .build()
                    .start();
            logger.info("grpc server started !");
            return new Runnable() {
                public void run() {
                    try {
                        logger.info("Close gRPC Server");
                        grpcServer.shutdown();
                    } catch (Throwable e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            };
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw new RpcException(e.getMessage(), e);
        }
    }

    @Override
    protected <T> T doRefer(Class<T> type, URL url) throws RpcException {
        logger.info("type => " + type.getName());
        logger.info("url => " + url);
        final ManagedChannel channel = ManagedChannelBuilder.forAddress(url.getHost(), url.getPort())
                .usePlaintext(true)
                .build();
        try {
            DefaultBindableService service = new DefaultBindableService();
            service.setChannel(channel);
            return (T) service;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw new RpcException(e.getMessage(), e);
        }
    }

}

在doExport暴露grpc服务时,通过类型转换成我们刚才定义的接口GrpcBindableService,解决了grpc服务的启动问题。

再来看如何引用这个服务,此为还要再定义一个辅助类:

package com.alibaba.dubbo.rpc.protocol.grpc;

import io.grpc.Channel;
import io.grpc.ServerServiceDefinition;

/**
 * Created by yangjunming on 16/10/7.
 */
public class DefaultBindableService implements GrpcBindableService {

    private Channel channel;

    @Override
    public Channel getChannel() {
        return channel;
    }

    @Override
    public void setChannel(Channel channel) {
        this.channel = channel;
    }

    @Override
    public ServerServiceDefinition bindService() {
        return null;
    }
}

这个类就是刚才定义的新接口GrpcBindableService的默认实现,目的是为了能将生成的channel通过setter方法保存下来。doRefer方法利用这个类,拿到了channel,最终给到grpc服务的调用方。

客户端调用示例:

    private static void testGrpc(ConfigurableApplicationContext ctx) throws InterruptedException {
        GrpcBindableService service = ctx.getBean(GrpcBindableService.class, "grpcService");
        AbstractStub stub = GrpcHelloServiceGrpc.newBlockingStub(service.getChannel());
        PingRequest request = PingRequest.newBuilder().build();
        logger.info("\n---------gprc协议测试开始---------");
        logger.info(stub.getClass().toString());
        PingResponse response = ((GrpcHelloServiceGrpc.GrpcHelloServiceBlockingStub) stub).ping(request);
        logger.info("\tping=>" + response.getMessage());
        ((ManagedChannel) stub.getChannel()).shutdown().awaitTermination(5, TimeUnit.SECONDS);
    }

完整示例代码请参考github上我重写的dubbo-demo示例 

最后:祝大家圣诞快乐!

时间: 2024-10-30 07:26:56

dubbox 增加google-gprc/protobuf支持的相关文章

dubbo/dubbox 增加原生thrift及avro支持

(facebook) thrift / (hadoop) avro / (google) probuf(grpc)是近几年来比较抢眼的高效序列化/rpc框架,dubbo框架虽然有thrift的支持,但是依赖的版本较早,只支持0.8.0,而且还对协议做一些扩展,并非原生的thrift协议. github上虽然也有朋友对dubbo做了扩展支持原生thrift,但是代码实在太多了,只需要一个类即可: Thrift2Protocal.java: package com.alibaba.dubbo.rpc

谷歌为其云数据库增加原生 MySQL 连接支持

Google 今天 宣布 已经为其云 SQL 服务 增加 原生 MySQL 连接支持.因此你可以在谷歌云平台上通过 MySQL Wire Protocol 完全管理 MySQL 服务. 该协议 (MySQL Wire Protocol) 可以让开发者在任意应用中访问复制的.受管的云 SQL 数据库,最主要的特性包括: 实现在 Google 计算引擎和 App Engine 中连接数据库的低延迟 使用你熟悉的 MySQL 管理工具 使用标准的驱动程序,例如 Connector/J, Connect

Google 为什么不支持Rss

     看到不少人发表关于Google为什么不支持Rss的问题和看法,这个问题以前不止一个人问起过我,我坚持的看法是Google在有新的赢利基础替代搜索之前是不会支持Rss的,而且我也没有看出来Google需要支持Rss的必要.「虽然我会去Hack google的服务,使得自己有Rss可用」     因为Rss太简单了,简单到将搜索引擎的门坎到了一种令Google感觉到一种压力的地步.     利用rss,可以简单的绕过搜索引擎里面最复杂的一个环节:HTML parse的过程,而这个过程,是众

google的protobuf比这样java原生的方式更有效率吗?

问题描述 google的protobuf比这样java原生的方式更有效率吗? @Override public void write(java.io.DataOutput do) throws IOException { do.writeUTF(this.string1); do.writeUTF(this.string1); do.writeLong(this.long1); } @Override public void readFields(java.io.DataInput di) th

Metasploit物联网安全渗透测试增加对硬件的支持

本文讲的是Metasploit物联网安全渗透测试增加对硬件的支持,开源的Metasploit渗透测试框架新增加了对硬件的支持,使研究人员能够借此研究物联网设备,汽车被选作首个研究用例. 自2009年10月起,安全厂商Rapid7就在引领开源Metasploit渗透测试框架项目,之前的研究以软件为主.然而,Rapid7于2月2日宣布了一项新的Metasploit的扩展功能,使安全研究人员直接链接到硬件从而进行漏洞检测. 新硬件兼容功能已被添加到开源Metasploit框架里,用户可以通过GitHu

Google Babel将支持PC、Android和iOS多个平台

[科技讯]4月11日消息,据国外媒体报道,一份谷歌关于Google Bable的备忘录显示,Google Babel除了支持PC平台以及Android平台外,iOS用户也将享受到这一应用服务, 不过遗憾的是备忘录同时也指出Google Babel将不会整合Google Voice功能.       下面则是来自备忘录的主要信息:     采用全 新的UI--我们设计了一个全新的UI,它将可以在所 有的客户端使用,而且用户的 对话体验质量也得到了提升:     时刻包括同步状态--只要用户在某台设

Google下架不支持国家的IAP应用

在中国,即使你的智能手机或平板安装了官方的Google Play应用商店,你也只能下载免费应用,因为Google Play不支持中国的应用内购买(IAP)或付费应用.Google Play只支持向32个国家和地区出售付费应用或IAP应用.现在,许多位于不支持国家内的开发者报告,他们的IAP应用被下架了.Goran Kukurin称他的应用在Google Play上架了4年之久,由于他所在国家不支持购买付费应用,所以他为付费版本实现了PayPal支付选项,但这款应用如今却被Google以违反开发商

Google 在 25 个新国家增加 Android 本地付款支持

Google近日宣布在25个新国家中为Android应用增加本地付款支持.这项重大的改变不仅意味着这些国家的用户通过本土的货币来购买应用或者内购程序,而且还意味着开发者能够更加高效从应用中盈利. http://static.oschina.net/uploads/img/201402/08080324_oRNc.jpg" > 根 据Android Police报道称本次新增的25个国家将会在未来1周内上线,这25个国家分别为澳大利亚,玻利维亚,保加利亚,智利,哥伦比亚,哥斯达黎加,埃及,印

Mac版迅雷增加前进后退手势支持

迅雷 for Mac 2.4.0已正式发布了,此次更新主要新增了Thunder Store软件卸载和用户评论等功能,同时也对基础下载速度和下载成功率进行了优化. 相关信息如下: 版本号:2.4.0 更新日期:2014.02.27 更新内容概要: Thunder: 1.新增离线任务批量取回功能: 2.新增有任务正在下载时阻止电脑休眠功能: 3. 修复当磁力链未完成种子下载时,进入离线速度为0的问题: 4.修复当BT子任务处于未运行状态时,进入离线速度为0的问题: 5. 修复部分BT和Emule任务