谈disruptor的单线程数据库操作

对远程数据库的操作,采用disruptor能够很好解决死锁,

  首先是定义一个抽象类,实现Runnable接口


public abstract class  Task implements Runnable  {

public Task(){}

}

public class TaskEvent {

private Task tk;

public Task getTask() {

return tk;

}

public void setTask(Task tk) {

this.tk = tk;

}

public final static EventFactory<TaskEvent> EVENT_FACTORY = new EventFactory<TaskEvent>() {

public TaskEvent newInstance() {

return new TaskEvent();

}

};

public class TaskEventHandler implements EventHandler<TaskEvent> {

//  执行接口函数onEvent执行

public void onEvent(TaskEvent event, long sequence,

boolean endOfBatch) throws Exception {

event.getTask().run();

}

}

}

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.ScheduledExecutorService;

import java.util.concurrent.TimeUnit;

import com.zhenhai.bonecp.CustomThreadFactory;

import com.zhenhai.disruptor.BatchEventProcessor;

import com.zhenhai.disruptor.RingBuffer;

import com.zhenhai.disruptor.SequenceBarrier;

import com.zhenhai.disruptor.YieldingWaitStrategy;

import com.zhenhai.disruptor.dsl.ProducerType;

/**

*     使用方法

DisruptorHelper.initAndStart();

Task tt=new Taska();

DisruptorHelper.produce(tt);

DisruptorHelper.shutdown();

*

*

*/

public class DisruptorHelper {

/**

* ringbuffer容量,最好是2的N次方

*/

private static final int BUFFER_SIZE = 1024 * 1;

private static int group=2;

private RingBuffer<TaskEvent> ringBuffer[];

private SequenceBarrier sequenceBarrier[];

private TaskEventHandler handler[];

private BatchEventProcessor<TaskEvent> batchEventProcessor[];

private  static DisruptorHelper instance;

private static boolean inited = false;

private static ScheduledExecutorService taskTimer=null;

//JDK 创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。

private    ExecutorService execute[];

//启动监视线程

static {

System.out.println("init DisruptorHelper!!!!!!!!!!!!!!!!!");

instance = new DisruptorHelper();

instance.init();

inited = true;

System.out.println("init DisruptorHelper end!!!!!!!!!!!!!!!!!");

}

**

* 静态类

* @return

*/

private DisruptorHelper(){ }

/**

* 初始化

*/

private void init(){

execute=new ExecutorService[group];

ringBuffer=new RingBuffer[group];

sequenceBarrier=new SequenceBarrier[group];

handler=new TaskEventHandler[group];

batchEventProcessor=new BatchEventProcessor[group];

////////////////定时执行////////////////

//初始化ringbuffer,存放Event

for(int i=0;i<group;i++){

ringBuffer[i] = RingBuffer.create(ProducerType.SINGLE, TaskEvent.EVENT_FACTORY, BUFFER_SIZE, new YieldingWaitStrategy());

sequenceBarrier[i] = ringBuffer[i].newBarrier();

handler[i] = new TaskEventHandler();

batchEventProcessor[i] = new BatchEventProcessor<TaskEvent>(ringBuffer[i], sequenceBarrier[i], handler[i]);

ringBuffer[i].addGatingSequences(batchEventProcessor[i].getSequence());

execute[i]= Executors.newSingleThreadExecutor();

execute[i].submit(instance.batchEventProcessor[i]);

}

this.taskTimer =  Executors.newScheduledThreadPool(10, new CustomThreadFactory("DisruptorHelper-scheduler", true));

inited = true;

}

/**

* 执行定时器

* @param tk

*/

private void produce(int index,Task tk){

//System.out.println("index:="+index);

if(index<0||index>=group) {

System.out.println("out of group index:="+index);

return;

}

// if capacity less than 10%, don't use ringbuffer anymore

System.out.println("capacity:="+ringBuffer[index].remainingCapacity());

if(ringBuffer[index].remainingCapacity() < BUFFER_SIZE * 0.1) {

System.out.println("disruptor:ringbuffer avaliable capacity is less than 10 %");

// do something

}else {

long sequence = ringBuffer[index].next();

//将状态报告存入ringBuffer的该序列号中

ringBuffer[index].get(sequence).setTask(tk);

//通知消费者该资源可以消费

ringBuffer[index].publish(sequence);

}

}

/**

* 获得容器的capacity的数量

* @param index

* @return

*/

private long  remainingcapacity(int index){

//System.out.println("index:="+index);

if(index<0||index>=group) {

System.out.println("out of group index:="+index);

return 0L;

}

long capacity= ringBuffer[index].remainingCapacity();

return capacity;

}

private void shutdown0(){

for(int i=0;i<group;i++){

execute[i].shutdown();

}

}

////////////////////////////////下面是静态方法提供调用////////////////////////////////////////////////////////

/**

* 直接消费

* @param tk

*/

public static void addTask(int priority,Task tk){

instance.produce(priority,tk);

}

/**

* 定时消费

* @param tk

* @param delay

* @param period

*/

public static void scheduleTask(int priority,Task tk,long delay,long period){

Runnable timerTask = new ScheduledTask(priority, tk);

taskTimer.scheduleAtFixedRate(timerTask, delay, period, TimeUnit.MILLISECONDS);

}

/**

* 定点执行

* @param tk

* @param hourse

* @param minus

* @param sec

* @return

*/

public static Runnable scheduleTask(int priority,Task tk, int hourse,int minus,int sec)

{

Runnable timerTask = new ScheduledTask(priority, tk);

//每天2:30分执行

long delay = Helper.calcDelay(hourse,minus,sec);

long period = Helper.ONE_DAY;

System.out.println("delay:"+(delay/1000)+"secs");

taskTimer.scheduleAtFixedRate(timerTask, delay, period, TimeUnit.MILLISECONDS);

return timerTask;

}

//对定时执行的程序进行分装

private static class ScheduledTask implements Runnable

{

private int priority;

private Task task;

ScheduledTask(int priority, Task task)

{

this.priority = priority;

this.task = task;

}

public void run()

{

try{

instance.produce(priority,task);

}catch(Exception e){

System.out.println("catch exception in DisruptorHelper!");

}

}

}

public static long getRemainingCapatiye(int index){

return instance.getRemainingCapatiye(index);

}

public static void shutdown(){

if(!inited){

throw new RuntimeException("Disruptor还没有初始化!");

}

instance.shutdown0();

}

}

最新内容请见作者的GitHub页:http://qaseven.github.io/

时间: 2024-08-01 23:58:40

谈disruptor的单线程数据库操作的相关文章

使用JAVA实现高并发无锁数据库操作步骤分享_java

1. 并发中如何无锁.一个很简单的思路,把并发转化成为单线程.Java的Disruptor就是一个很好的例子.如果用java的concurrentCollection类去做,原理就是启动一个线程,跑一个Queue,并发的时候,任务压入Queue,线程轮训读取这个Queue,然后一个个顺序执行. 在这个设计模式下,任何并发都会变成了单线程操作,而且速度非常快.现在的node.js, 或者比较普通的ARPG服务端都是这个设计,"大循环"架构.这样,我们原来的系统就有了2个环境:并发环境 +

利用对象池优化数据库操作

说到对象池,大家都不陌生.很多人都实现过,网上的代码也满天飞.说到连接池,更是谁人不知, 哪家不晓.也有不少人自己实现了连接池,试图对数据访问进行优化.归纳了一下,比较常见的思路如下 : 1.数据库连接的打开比较耗费资源,如果能避免重复的开关,可以提高效率. 2.如果有一个一直打开数据库连接,直到程序结束其生命才随之结束.长时间打开并重复使用将导致 连接对象的不稳定. 3.无法保证打开的连接一点问题都不出. 4.自动化管理数据库连接,解决频繁创建.分配.释放带来的问题 然而本人认为,这些担心相对

【技术干货】缓存随谈系列之一:数据库缓存

本文作者:   乔锐杰    现担任上海驻云信息科技有限公司运维总监/架构师.曾任职过黑客讲师.java软件工程师/网站架构师.高级运维.阿里云架构师等职位.维护过上千台服务器,主导过众安保险.新华社等千万级上云架构.在云端运维.分布式集群架构等方面有着丰富的经验. 以下正文 ​ 我是个很懒的人,喜欢自己偷着练"葵花宝典",唯一可以看到我之前网上写的安全方面的文章,还是好几年前的事情了.公司最近来了一群美女,可是热闹了,写稿奖励美女,我老兴奋了. 说起缓存相关技术,老多了, memca

自己动手写ASP.NET ORM框架(二):AdoHelper支持多数据库操作的封装(2)

在上一篇文章中已经分析了AdoHelper的部分代码,接下来将继续分析剩余的部分代码,这里分析ExecuteNonQuery方法的实现,代码块1-1: // <summary>//通过提供的参数,执行无结果集的数据库操作命令// 并返回执行数据库操作所影响的行数.// </summary>// <param name="connectionString">数据库连接字符串</param>// <param name="co

教您使用XML封装数据库操作语句的实现

xml|封装|数据|数据库|语句 在项目开发的过程当中,项目组开发成员的编程风格差异和数据库操作语句SQL的灵活性给项目组带来了越来越多的操作和维护难度. 比如: 从user表中取出所有数据,有的人会写成"select * from user",有的人会写成"select all from user",虽然在操作中不会有任何的错误,但在其他人读程序的过程时就会产生不好的感觉. 如果这种程序差异在项目中的数量级很多,那么在开发的过程当中程序就会出现各种各样的风格,在维

PHP实现的一个简单的数据库操作类

PHP实现的一个简单的数据库操作类 实现的功能: - 在实例化的时候能设置连接字符集 - 在实例化的时候能连接数据库 - 在实例化的时候能选择默认数据库 - 销毁对象时关闭数据库 代码如下: <?php // 数据库操作类MySQLDB class MySQLDB { // 声明属性 private $server; private $username; private $password; public $default_db; public $link; // 声明构造函数 public f

[原创] EasyASP v1.5简化ASP开发,包含数据库操作类

EasyASP v1.5简化ASP开发,包含数据库操作类 EasyASP是一个方便快速开发ASP的类,其中包含了一个数据库控制类(原clsDbCtrl.asp,对原代码作了优化和修改,包含对数据库的各类操作及存储过程的调用,全部封装在Easp.db中,使用起来会更方便,调用也更简单).而Easp类中提供了大量实用的ASP通用过程及方法,可以简化大部分的ASP操作.目前只提供了VBScript版,JScript版将来可能会提供.详细说明请下载帮助手册,里面有非常详细的使用方法说明及源码范例. 源码

简化你的ASP编写工作 (原数据库操作类clsdbCtrl.asp)

首先,认为ASP已经过时的人,请忽略此帖,谢谢.俺这里只是提供一种思路,希望能给很多还在学习和使用ASP的朋友一种方便.尽管俺现在也很少用ASP了,但俺相信只要能满足应用,就没有什么过时不过时的,也许它的语言没那么高级,但做出来的应用不一定就低级,呵呵.     EasyASP是一个方便快速开发ASP的类,其中包含了一个数据库控制类(原clsDbCtrl.asp,对原代码作了优化和修改,包含对数据库的各类操作及存储过程的调用,全部封装在Easp.db中,使用起来会更方便,调用也更简单).而Eas

Python数据库操作手册

数据|数据库 数据库的操作在现在的Python里面已经变得十分的好用,有了一套API标准.下面的就是讲讲如何的去使用这套框架定义.此框架包含以下部分模块接口 连接对象 游标对象 DBI辅助对象 数据类型与定义 如何实现的提示 从1.0到2.0的变化 例子 模块接口connect(parameters...) 其中的参数格式如下: dsn 数据源名称user 用户名(可选)password 密码(可选)host 主机名(可选)database 数据库名(可选)举个例子: connect(dsn='