基于zbus的MySQL透明代理(<100行)

项目地址 https://git.oschina.net/rushmore/zbus

我们上次讲到zbus网络通讯的核心API:

Dispatcher -- 负责-NIO网络事件Selector引擎的管理,对Selector引擎负载均衡

IoAdaptor -- 网络事件的处理,服务器与客户端共用,负责读写,消息分包组包等

Session -- 代表网络链接,可以读写消息

实际的应用,我们几乎只需要做IoAdaptor的个性化实现就能完成高效的网络通讯服务,今天我们将举例说明如何个性化这个IoAdaptor。

我们今天要完成的目标是:实现MySQL服务器的透明代理。效果是,你访问代理服务器跟访问目标MySQL无差异。

我们在测试环境10.17.2.30:3306 这台机器上提供了MySql,在我们本地机器上跑起来我们今天基于zbus.NET实现的一个代理程序,就能达到下面的效果。


完成大概不到100 行的代码, Cool?Let’s roll!

首先,我们思考透明TCP代理到底在干啥,透明的TCP代理的业务逻辑其实非常简单,可以描述为,将来自代理上游(发起请求到代理)的数据转发到目标TCP服务器,把目标服务器回来的数据原路返回代理上游客户端。 注意这个原路,如何做到原路返回成为关键点。这个示例其实跟MySQL没有任何关系,原则上任何TCP层面的服务都应该适配。

基于zbus.NET怎么来将上面的逻辑在体现出来,也就是如何个性化IoAdaptor?直观的讲,我们要处理的几个事件应该包括:1)从上游客户端发起的链接请求--代理服务器的Accept事件,2)代理服务器连接目标服务器的Connect事件,3)上下游的数据事件onMessage。

zbus.NET的IoAdaptor提供的个性化事件如下

基本包括一个链接(客户端或者服务端)的生命周期,与消息的编解码。

我们的代理IoAdaptor就是逐一个性化处理。

第一步,编解码: 透明代理对消息内容不做理解,所以不需要编解码。

// 透传不需要编解码,简单返回ByteBuffer数据
    public IoBuffer encode(Object msg) {
        if (msg instanceof IoBuffer) {
            IoBuffer buff = (IoBuffer) msg;
            return buff;
        } else {
            throw new RuntimeException("Message Not Support");
        }
    }

    // 透传不需要编解码,简单返回ByteBuffer数据
    public Object decode(IoBuffer buff) {
        if (buff.remaining() > 0) {
            byte[] data = new byte[buff.remaining()];
            buff.readBytes(data);
            return IoBuffer.wrap(data);
        } else {
            return null;
        }
    }

第二步,代理服务接入:

@Override
    protected void onSessionAccepted(Session sess) throws IOException {
        Session target = null;
        Dispatcher dispatcher = sess.getDispatcher();
        try {
            target = dispatcher.createClientSession(targetAddress, this);
        } catch (Exception e) {
            sess.asyncClose();
            return;
        }
        sess.chain = target;
        target.chain = sess;
        dispatcher.registerSession(SelectionKey.OP_CONNECT, target);
    }

这里的逻辑思路是,代理服务器每接受到一个请求--通过onSessionAccepted表达,我们将同时创建一个到目标服务器的链接,今天的例子是目标MySQL服务器,注意上面的处理中把创建目标服务器Session过程与真正链接到目标服务分开(Dispatcher也提供合并二者的工具方法),是为了能在没有发生链接之前绑定上好上下游关系,通过Session的chain变量来表达,也就是当前Session的关联Session,关联好之后启动感兴趣Connect事件,逻辑处理完毕。

第三步,链接成功事件(第二步中需要链接到目标服务器)

@Override
    public void onSessionConnected(Session sess) throws IOException {
        Session chain = sess.chain;
        if(chain == null){
            sess.asyncClose();
            return;
        }
        if(sess.isActive() && chain.isActive()){
            sess.register(SelectionKey.OP_READ);
            chain.register(SelectionKey.OP_READ);
        }
    }

这里的一个核心是当上下游都处于链接正常态,上下游Session都启动感兴趣消息读事件(写事件是在读取处理中自动触发),为什么在这里做的原因是一定要等上下游都正常态后才启动双方消息处理,不然会出现字节丢失。

第四步,处理上下游数据事件

@Override
    protected void onMessage(Object msg, Session sess) throws IOException {
        Session chain = sess.chain;
        if(chain == null){
            sess.asyncClose();
            return;
        }
        chain.write(msg);
    }

是不是非常简单,类似pipeline,从一端的数据写到另外一端。

原则上面4步结束,整个透明代理就完成了,但是为了处理链接异常清理,我们增加了Session清理处理,如下

@Override
    public void onSessionToDestroy(Session sess) throws IOException {
        try {
            sess.close();
        } catch (IOException e) { //ignore
        }
        if (sess.chain == null) return;
        try {
            sess.chain.close();
            sess.chain.chain = null;
            sess.chain = null;
        } catch (IOException e) {
        }
    }

工作就是解决上下游链接清理链接。

至此为止我们的IoAdaptor个性化就完成了,是不是非常简单,现在我们要跑起来测试了,下面的代码就是上一次讲到重复的设置,没有新意。

public static void main(String[] args) throws Exception {
        Dispatcher dispatcher = new Dispatcher();
        IoAdaptor ioAdaptor = new TcpProxyAdaptor("10.17.2.30:3306");
        final Server server = new Server(dispatcher, ioAdaptor, 3306);
        server.start();
    }

骚年,包括渣渣import和少许注释加起来折腾了不到100行,该跑一跑了,还是那句话,不是HelloWorld,你可以规模压力测。看看你是否在本地代理出来了你的目标服务MySQL,gl,hf, gogogo.

完整代码可运行代码如下,也可直接到zbus示例代码库中找到

https://git.oschina.net/rushmore/zbus/blob/master/src/test/java/org/zbus/net/TcpProxyAdaptor.java?dir=0&filepath=src%2Ftest%2Fjava%2Forg%2Fzbus%2Fnet%2FTcpProxyAdaptor.java&oid=08abff381d93519485e1c0ee2c35f1d4f8d1814c&sha=a29272ed99a8f21ec19a14b403ebee53a385e9a4

package org.zbus.net;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import org.zbus.net.core.Dispatcher;
import org.zbus.net.core.IoAdaptor;
import org.zbus.net.core.IoBuffer;
import org.zbus.net.core.Session;
public class TcpProxyAdaptor extends IoAdaptor {
    private String targetAddress;

    public TcpProxyAdaptor(String targetAddress) {
        this.targetAddress = targetAddress;
    }

    // 透传不需要编解码,简单返回ByteBuffer数据
    public IoBuffer encode(Object msg) {
        if (msg instanceof IoBuffer) {
            IoBuffer buff = (IoBuffer) msg;
            return buff;
        } else {
            throw new RuntimeException("Message Not Support");
        }
    }

    // 透传不需要编解码,简单返回ByteBuffer数据
    public Object decode(IoBuffer buff) {
        if (buff.remaining() > 0) {
            byte[] data = new byte[buff.remaining()];
            buff.readBytes(data);
            return IoBuffer.wrap(data);
        } else {
            return null;
        }
    }

    @Override
    protected void onSessionAccepted(Session sess) throws IOException {
        Session target = null;
        Dispatcher dispatcher = sess.getDispatcher();
        try {
            target = dispatcher.createClientSession(targetAddress, this);
        } catch (Exception e) {
            sess.asyncClose();
            return;
        }
        sess.chain = target;
        target.chain = sess;
        dispatcher.registerSession(SelectionKey.OP_CONNECT, target);
    }

    @Override
    public void onSessionConnected(Session sess) throws IOException {
        Session chain = sess.chain;
        if(chain == null){
            sess.asyncClose();
            return;
        }
        if(sess.isActive() && chain.isActive()){
            sess.register(SelectionKey.OP_READ);
            chain.register(SelectionKey.OP_READ);
        }
    }

    @Override
    protected void onMessage(Object msg, Session sess) throws IOException {
        Session chain = sess.chain;
        if(chain == null){
            sess.asyncClose();
            return;
        }
        chain.write(msg);
    }

    @Override
    public void onSessionToDestroy(Session sess) throws IOException {
        try {
            sess.close();
        } catch (IOException e) { //ignore
        }
        if (sess.chain == null) return;
        try {
            sess.chain.close();
            sess.chain.chain = null;
            sess.chain = null;
        } catch (IOException e) {
        }
    }

    @SuppressWarnings("resource")
    public static void main(String[] args) throws Exception {
        Dispatcher dispatcher = new Dispatcher();
        IoAdaptor ioAdaptor = new TcpProxyAdaptor("10.17.2.30:3306");
        final Server server = new Server(dispatcher, ioAdaptor, 3306);
        server.setServerName("TcpProxyServer");
        server.start();
    }
}

文章转载自 开源中国社区[https://www.oschina.net]

时间: 2024-10-04 06:24:28

基于zbus的MySQL透明代理(<100行)的相关文章

100行代码实现最简单的基于FFMPEG+SDL的视频播放器(SDL1.x)【转】

转自:http://blog.csdn.net/leixiaohua1020/article/details/8652605 版权声明:本文为博主原创文章,未经博主允许不得转载.   目录(?)[-] 简介 流程图 simplest_ffmpeg_player标准版代码 simplest_ffmpeg_player_suSU版代码 结果 FFMPEG相关学习资料 补充问题     ===================================================== 最简单的

用100行javascript实现HTML 5的3D贪吃蛇游戏

js1k.com收集了小于1k的javascript小例子,里面有很多很炫很酷的游戏和特 效,今年规则又增加了新花样,传统的classic类型基础上又增加了WebGL类型, 以及允许增加到2K的++类型,多次想尝试提交个小游戏但总无法写出让自己满意 还能控制在这么小的字节范围. 自己写不出来,站在巨人肩膀总是有机会吧,想起<基于HTML5的电信网管3D 机房监控应用>这篇提到的threejs,babylonjs和Hightopo的几种基于WebGL的3D 引擎,突然想挑战下自己实现个100行J

深入理解tomcat是中间件、正向代理、反向代理、透明代理以及IIS、Apache、Tomcat、Weblogic、WebSphere

       中间件(middleware)是基础软件的一大类,属于可复用软件的范畴.顾名思义,中间件处于操作系统软件与用户的应用软件的中间. 中间件是一种独立的系统软件或服务程序,分布式应用软件借助这种软件在不同的技术之间共享资源.中间件位于客户机/ 服务器的操作系统之上,管理计算机资源和网络通讯.是连接两个独立应用程序或独立系统的软件.相连接的系统,即使它们具有不同的接口,但通过中间件相互之间仍能交换信息.执行中间件的一个关键途径是信息传递.通过中间件,应用程序可以工作于多平台或OS环境.

substring-c# 截取字符串的问题 如果在一串字符中id 在100行

问题描述 c# 截取字符串的问题 如果在一串字符中id 在100行 c# 截取字符串的问题 如果在一串字符中id 在100行 我要获取id后面的10个数字 请问怎么截取呢 解决方案 string[] lines = s.Split(new string[] {"rn"}, StringSplitOptions.None); string s = lines[99]; string result = Regex.Match(s, "(?<=id)\d{10}")

MySQL CPU 占用 100% 的解决过程

MySQL CPU 占用 100% 的解决过程 服务器解决了 MySQL CPU 占用 100% 的问题,稍整理如下,希望对各位有所帮助. 朋友主机 (. MySQL CPU 占用 100% 的解决过程 今天早上仔细检查了一下.目前此网站的七日平均日 IP 为2000,PageView 为 3万左右.网站A 用的 database 目前有39个表,记录数 60.1万条,占空间 45MB.按这个数据,MySQL 不可能占用这么高的资源. 于是在服务器上运行命令,将 MySQL 当前的环境变量输出到

php基于单例模式封装mysql类完整实例_php技巧

本文实例讲述了php基于单例模式封装mysql类.分享给大家供大家参考,具体如下: 类: <?php header("content-type:text/html;charset=utf-8"); //封装一个类 /* 掌握满足单例模式的必要条件 (1)私有的构造方法-为了防止在类外使用new关键字实例化对象 (2)私有的成员属性-为了防止在类外引入这个存放对象的属性 (3)私有的克隆方法-为了防止在类外通过clone成生另一个对象 (4)公有的静态方法-为了让用户进行实例化对象

基于PHP+jQuery+MySql实现红蓝(顶踩)投票代码_php实例

先给大家展示效果图: 查看演示 下载源码 这是一个非常实用的投票实例,应用在双方观点对抗投票场景.用户可以选择支持代表自己观点的一方进行投票,本文以红蓝双方投票为例,通过前后台交互,直观展示红蓝双方投票数和所占比例,应用非常广泛. 本文是一篇综合知识应用类文章,需要您具备PHP.jQuery.MySQL以及html和css方面的基本知识. HTML 我们需要在页面中展示红蓝双方的观点,以及对应的投票数和比例,以及用于投票交互的手型图片,本例以#red和#blue分别表示红蓝双方..redhand

基于PHP和Mysql相结合使用jqGrid读取数据并显示_jquery

jqGrid可以动态读取和加载外部数据,本文将结合PHP和Mysql给大家讲解如何使用jqGrid读取数据并显示,以及可以通过输入关键字查询数据的ajax交互过程. 下面给大家展示效果图,喜欢的朋友可以阅读全文哦. jqGrid本身带有search和edit表格模块,但是这些模块会使得整个插件体积显得有点庞大,而且笔者认为jqGrid的搜索查询和编辑/添加功能不好用,所以笔者放弃jqGrid自有的search和edit表格模块,借助jquery利器来完成相关功能,符合项目的实际应用. XHTML

100行PHP代码实现socks5代理服务器_php技巧

前两天在B站上看到一个小伙纸100元组装个电脑打LOL画质流畅,突发奇想100行代码能(简单)实现个啥好玩的.我主要是做php开发的,于是就有了本文. 当然,由于php(不算swoole扩展)本身不擅长做网络服务端编程,所以这个代理,只是个玩具,离日常使用有点距离.如果想使用稳定可靠的加密(所以能禾斗学上网)代理,可以用这个:https://github.com/momaer/asocks-go也是100来行代码使用go实现. 写的过程中发现php多线程还是难的.比如我开始想每个连接新建一个线程