深入浅出 jackrabbit 八 索引合并(上)

我们从文本提取的逻辑中走出来,回到主体流程。

在前面的文章中,我们可以看到一次索引创建的操作,可能会产生多个 persistentindex 对象,而这些对象其实代表着一个索引目录。随着创建索引的次数越来越多,那么索引目录也在增多,但是索引目录中的数据却不是很多,所以我们需要把多个目录合并,其实也就是索引的合并。

 

执行这个操作的类是 IndexMerger ,看其定义为:

1.class IndexMerger extends Thread implements IndexListener
2.
3./*由此可见它是一个线程,并且同时充当着listener的角色,看看它的构造方法:
4.*/
5.IndexMerger(MultiIndex multiIndex) {
6.
7.        this.multiIndex = multiIndex;
8.        setName("IndexMerger");
9.        setDaemon(true);
10.        try {
11.            mergerIdle.acquire();
12.
13.        } catch (InterruptedException e) {
14.            // will never happen, lock is free upon construction
15.            throw new InternalError("Unable to acquire mutex after construction");
16.        }
17.}

还是一个 deamon 线程。而且一构造就来了一个 mergerIdle.acquire(); 真是迫不及待啊。啥意思啊?得到一把锁,一把非阻塞的锁。

 

在创建完 IndexMerger ,那么就有可能把 PersistentIndex 加进来了,因为 Merger 类必须知道哪些 PersistentIndex 是需要 Merger 的,那么我们看看负责这段逻辑的代码:这段代码主要负责 3 个功能,一个是初始化 indexBuckets ,这个一个 ArrayList ,其中放的是需要 Merger 的 PersistentIndex 的列表,也就是我们可以认为 indexBucket 里放的还是 list ,这里有一个非常奇怪的设计,就是在初始化的时候将 PersistentIndex 按照 docnums 的范围分组了,一组就是一个 indexBucket 。

第二个是把需要加入的 PersistentIndex 加入到对应的分组中。

第三个是判断是否需要合并,如果需要就加到一个队列中,等待被合并。

先看第一段代码:

1.synchronized (lock) {
2.
3.            // initially create buckets
4.            if (indexBuckets.size() == 0) {
5.                long lower = 0;
6.// default minMergeDocs is 100
7.                long upper = minMergeDocs;
8.
9.//default maxMergeDocs is 2147483647
10.// IndexBucket实际上就是一个ArrayList
11.
12.                 while (upper < maxMergeDocs) {
13.
14.                    indexBuckets.add(new IndexBucket(lower, upper, true));
15.                    lower = upper + 1;
16.//default mergeFactor is 10
17.                    upper *= mergeFactor;
18.
19.                }
20.
21.                // one with upper = maxMergeDocs
22.
23.                indexBuckets.add(new IndexBucket(lower, maxMergeDocs, false));
24.
25.                // and another one as overflow, just in case...
26.
27.                indexBuckets.add(new IndexBucket(maxMergeDocs + 1, Long.MAX_VALUE, false));
28.            }
29. ············  

仔细阅读代码,我们发现,在初始化 indexBuckets 的代码中,其实按照范围来初始化的,比如当添加第一 IndexBucket 的时候 lower=0 , upper=100

new IndexBucket(0 100 , rue )

第二个则为: new IndexBucket(101, 100*10, t rue )

第三个则为: new IndexBucket(1001, 100*10*10, t rue )

第四个则为: new IndexBucket(10001, 100*10*10*10, t rue )

第五个则为: new IndexBucket(100001, 100*10*10*10*10, t rue )

````````````

 

一直持续下去直到 upper 小于 2147483647 ,且是 10 的最大幂。那么就是说 10 亿,当一个目录中有 10 亿个 document 的 index 数据时,这个目录将不再参与 merge 过程, indexBuckets 中总共有 8 个 IndexBucket, 不过在循环外面还有两个创建 IndexBucket 的语句,不过这两个都是不允许参加合并的,所以第 3 个参数是 false ,也就是说一共有 10 个,第九个是:

1.new IndexBucket(1000000001, 2147483647, false)  

那么第十个是: 

1.new IndexBucket(2147483648, 0x7fffffffffffffffL, false)  

搞清楚 indexBuckets 的 初始化之后,我们再来看看第二个步骤,把根据 docNums 把对应的 persistentindex 加入到 IndexBucket 中 : 

1.// put index in bucket
2.
3.            IndexBucket bucket = (IndexBucket) indexBuckets.get(indexBuckets.size() - 1);
4.
5.            for (int i = 0; i < indexBuckets.size(); i++) {
6.
7.                bucket = (IndexBucket) indexBuckets.get(i);
8.
9.                if (bucket.fits(numDocs)) {
10.                    break;
11.                }
12.            }
13.
14./*如果indexBuckets 没有值,那么就把Index 添加到第10个IndexBucket中,否则就从indexBuckets 的第一IndexBucket开始匹配,根据numDocs的值放到对应的IndexBucket中。*/
15.
16.            bucket.add(new Index(name, numDocs));
17.
18.            if (log.isDebugEnabled()) {
19.                log.debug("index added: name=" + name + ", numDocs=" + numDocs);
20.
21.            }
22.
23.            // if bucket does not allow merge, we don't have to continue
24.
25.//如果是最后两个IndexBucket,那么即刻退出
26.            if (!bucket.allowsMerge()) {
27.                return;
28.            }
29.
30.
31.
32./*这段代码没有什么难的,接着看第3个步骤:
33.*/            // check if we need a merge
34.//超过indexbucket中超过10个元素<其实就是10个目录>则开始合并
35.
36.            if (bucket.size() >= mergeFactor) {
37.
38.                long targetMergeDocs = bucket.upper;
39.                targetMergeDocs = Math.min(targetMergeDocs * mergeFactor, maxMergeDocs);
40.
41.                // sum up docs in bucket
42.
43.                List indexesToMerge = new ArrayList();
44.
45.                int mergeDocs = 0;
46.
47.                for (Iterator it = bucket.iterator(); it.hasNext() && mergeDocs <= targetMergeDocs;) {
48.
49.                    indexesToMerge.add(it.next());
50.
51.                }
52./* 结合上下文,indexesToMerge.size()这值会小于2吗?????*/
53.                if (indexesToMerge.size() > 2) {
54.
55.                    // found merge
56.
57.                    Index[] idxs = (Index[]) indexesToMerge.toArray(new Index[indexesToMerge.size()]);
58.
59.                    bucket.removeAll(indexesToMerge);
60.
61.                    if (log.isDebugEnabled()) {
62.
63.                        log.debug("requesting merge for " + indexesToMerge);
64.
65.                    }
66.
67.                    mergeTasks.add(new Merge(idxs));
68.
69.                    log.debug("merge queue now contains " + mergeTasks.size() + " tasks.");
70.                }
71.            }

这段代码的主要功能是把 indexbucket 里的 persistentindex 信息拿出来,而且量超过 2 的话就把他们加入到一个队列中,并将它们从该 indexbucket 里删除。通过这个步骤,那么 mergeTasks 队列中就存在一些需要合并的 index 了。

 

中场总结:

通过上面的方法和前面的索引提交的文章我们得到一些重要信息:当用户把 ramdirectory 中超过 100 的 docs 的 index data 刷到 fsdirectory 中时,新建一个目录,作为这个新 fsdirectory 的目录,接着把这个 fsdirectory 对应的 PersistentIndex 加到 IndexMerger 类的某个 IndexBucket 中,接着当某个 IndexBucket 中的 PersistentIndex 数量(即这些目录的数量)超过 10 ( mergefactor )的时候,就会执行合并的操作。

那么下面的问题是,合并之后,这 10 个目录将会何去何从,它们是把另外 9 个合并到其中一个中去呢还是怎么滴?接着看吧。

 

显然,这里又用到生产消费模型,任何调用 indexAdded 方法的都属性生产者,生产者根据一些条件,有选择的把需要合并的 persistentindex 放到 mergeTasks 的队列中,有了生产者肯定存在消费者,文章开头提过, IndexMerger 类是一个 deamon 线程,看看它的 run 方法,那么就发现,其实它就是消费者。它主要完成以下几个功能:

1 判断消费者是否空闲

2 判断队列中是否有退出命令

3 如果空闲则进入 wait 状态

4 根据 persistentindex 的名字取到所有的 persistentindex

   的 IndexReader 对象

5 再创建一个新的 PersistentIndex, , 原来的 index 文件合并到这个新的目录中

 

6 将前面的 IndexReader 对象添加到 PersistentIndex 的 indexwriter 方法中,并执行 optimize 。

7 关闭这些 readers

8 根据名字删除已经被合并的 PersistentIndex 的索引文件和目录等。

我们再来看看代码,代码中已经加入了 ahuaxuan 的注释:

1.
2.public void run() {
3.
4.        for (;;) {
5.            boolean isIdle = false;
6.
7.      //队列长度为0,表示消费者处于空闲状态,那么会进入wait状态
8.            if (mergeTasks.size() == 0) {
9.
10.                mergerIdle.release();
11.
12.                isIdle = true;
13.
14.            }
15.
16./*2判断队列中是否有退出命令
17.
18.*/
19.
20.            Merge task = (Merge) mergeTasks.remove();
21.
22.            if (task == QUIT) {
23.                mergerIdle.release();
24.                break;
25.            }
26.
27.            if (isIdle) {
28.
29.                try {
30.                    mergerIdle.acquire();
31.                } catch (InterruptedException e) {
32.
33.                    Thread.interrupted();
34.                    log.warn("Unable to acquire mergerIdle sync");
35.                }
36.            }
37.
38.             log.debug("accepted merge request");
39.
40.             // reset deleted documents
41.            deletedDocuments.clear();
42.
43.            // get readers
44.
45./*4 根据persistentindex的名字取到所有的persistentindex
46.
47.   的IndexReader对象
48.
49.*/
50.
51.            String[] names = new String[task.indexes.length];
52.
53.            for (int i = 0; i < task.indexes.length; i++) {
54.                names[i] = task.indexes[i].name;
55.            }
56.
57.            try {
58.
59.                log.debug("create new index");
60.
61./*再创建一个新的PersistentIndex,原来的index文件合并到这个新的目录中
62.
63.*/
64.                PersistentIndex index = multiIndex.getOrCreateIndex(null);
65.
66.                boolean success = false;
67.
68.                try {
69.
70.                    log.debug("get index readers from MultiIndex");
71.
72.                    IndexReader[] readers = multiIndex.getIndexReaders(names, this);
73.
74.                    try {
75.
76.                        // do the merge
77.
78.                        long time = System.currentTimeMillis();
79.
80./*6 将前面的IndexReader对象添加到PersistentIndex的indexwriter方法中,并执行optimize。
81.
82.*/
83.                        index.addIndexes(readers);
84.
85.                        time = System.currentTimeMillis() - time;
86.
87.                        int docCount = 0;
88.
89.                        for (int i = 0; i < readers.length; i++) {
90.                            docCount += readers[i].numDocs();
91.                        }
92.
93.                        log.info("merged " + docCount + " documents in " + time + " ms into " + index.getName() + ".");
94.
95.                    } finally {
96.                        for (int i = 0; i < readers.length; i++) {
97.
98./*7 关闭这些readers
99.
100.*/
101.
102.                            try {
103.                                readers[i].close();
104.                            } catch (IOException e) {
105.                                log.warn("Unable to close IndexReader: " + e);
106.                            }
107.                        }
108.                    }
109.
110.                    // inform multi index
111.
112.                    // if we cannot get the sync immediately we have to quit
113.
114.                    if (!indexReplacement.attempt(0)) {
115.                        log.debug("index merging canceled");
116.                        break;
117.                    }
118.
119.                    try {
120.                        log.debug("replace indexes");
121.                        multiIndex.replaceIndexes(names, index, deletedDocuments);
122.                    } finally {
123.                        indexReplacement.release();
124.                    }
125.                     success = true;
126.                 } finally {
127.
128.                    if (!success) {
129.
130.                        // delete index
131.
132.                        log.debug("deleting index " + index.getName());
133.
134./*8 根据名字删除已经被合并的PersistentIndex的索引文件和目录等。
135.
136.*/
137.                        multiIndex.deleteIndex(index);
138.                    }
139.                }
140.            } catch (Throwable e) {
141.                log.error("Error while merging indexes: " + e);
142.            }
143.        }
144.        log.info("IndexMerger terminated");
145.    }
146.

看到这里爱思考的同学们一定会意识到这里还漏了什么,是什么呢?前面讲到,一个 bucket 中超过 10 个目录,会被合并一个新的目录,那么也就是说这个新目录中至少有 1000 个 document 的索引数据,这样下来,如果我有 100000 个节点,而且恰好每个目录中之后 1000 个 document 的数据,那么就得用 100 个目录来存储数据了。这样带来的问题是,每做一次查询,都需要把 100 个 indexReader 传给 search ,即使使用多线程并行搜索,那目录数也还是太多了,而且如果是 100w 个节点,那就更不得了了,所以 jackrabbit 中一定还有机制会把这些目录合并成更大目录的逻辑。为什么这么说,因为之前在创建 indexbucket 中的时候,分了 8 个允许合并的段,而上面的逻辑只会用到前面一个 bucket ,后面的几个肯定是有用处的,那么是谁来触发它们的,它们在哪里呢?

 

我们看到在上面的 run 方法中,我们有一个方法没有讲到: multiIndex .replaceIndexes(names, index, deletedDocuments );

我们将会在这个方法中寻找到真相,同样, ahuaxuan 在代码中加入了自己的注释

/* obsoleteIndexes 是需要被删除的 dir ,因为他们的数据已经被合并到新的目录里, index 参数则表示那个对应那个新目录的 PersistentIndex , deleted 表示需要被删除的类 */

1.void replaceIndexes(String[] obsoleteIndexes,
2.                        PersistentIndex index,
3.                        Collection deleted)
4.
5.            throws IOException {
6.
7./*在multiIndex中,到处都是synchronized ,而且都是锁定multiindex对象,为啥呢? 详见后文*/
8.
9.        synchronized (this) {
10.
11./*这段代码在multiIndex#update方法中也出现过,你知道它的用途吗,其实可以猜出来*/
12.
13.            synchronized (updateMonitor) {
14.                updateInProgress = true;
15.            }
16.            try {
17.
18.                // if we are reindexing there is already an active transaction
19.
20.                if (!reindexing) {
21.
22.                    executeAndLog(new Start(Action.INTERNAL_TRANS_REPL_INDEXES));
23.
24.                }
25.
26.                // delete obsolete indexes
27.
28./*10个目录已经合并成一个了,那这个10个目录该删的就删,不需要犹豫*/
29.
30.                Set names = new HashSet(Arrays.asList(obsoleteIndexes));
31.
32.                for (Iterator it = names.iterator(); it.hasNext();) {
33.
34.                    // do not try to delete indexes that are already gone
35.
36.                    String indexName = (String) it.next();
37.
38.                    if (indexNames.contains(indexName)) {
39.                        executeAndLog(new DeleteIndex(getTransactionId(), indexName));
40.                    }
41.
42.                }
43.
44.                 // Index merger does not log an action when it creates the target
45.
46.                // index of the merge. We have to do this here.
47.
48./*还记得CreateIndex的作用吗?复习一下:根据名字获取PersistentIndex对象,如果名字不存在或者为null,则新建一个PersistentIndex对象,罗嗦一句,一个PersistentIndex代表一个目录*/
49.
50.                executeAndLog(new CreateIndex(getTransactionId(), index.getName()));
51.
52./*又来了AddIndex对象,还记得它的作用吗,将这个persistentIndex加入到*/
53.
54.                executeAndLog(new AddIndex(getTransactionId(), index.getName()));
55.
56.                // delete documents in index
57.
58.                for (Iterator it = deleted.iterator(); it.hasNext();) {
59.                    Term id = (Term) it.next();
60.                    index.removeDocument(id);
61.                }
62.
63.                index.commit();
64.
65.                if (!reindexing) {
66.                    // only commit if we are not reindexing
67.                    // when reindexing the final commit is done at the very end
68.                    executeAndLog(new Commit(getTransactionId()));
69.                }
70.
71.            } finally {
72.                synchronized (updateMonitor) {
73.                    updateInProgress = false;
74.                    updateMonitor.notifyAll();
75.                    releaseMultiReader();
76.                }
77.            }
78.        }
79.
80.        if (reindexing) {
81.            // do some cleanup right away when reindexing
82.            attemptDelete();
83.        }
84.    }

看完这段方法,我们发现,小的目录合并成大目录之后,这个大目录又被加到 indexbucket 等待下一次被合并,如此递归,一直当一个目录的 document 的 index 数据超过 10 亿,那么就不会再合并了.因为在前面的流程中,我们看到,ramdirectory中的数据只有满100才会加入到fsdirectory中,这意味着一开始用到的目录就是101-1000级别的目录(101-1000的目录表示这些目录中的document的index数据也只有101-1000个这个范围。)。这种目录超过10个就会合并成一个新目录。依次类推高层目录。见图中ahuaxuan的注释

说到这里,大部分人都知道了,很多参数可以控制合并的调优,这些参数在前文已经讲过了,不再赘述。
 
到这里,IndexMerger的主体流程基本上完成了,其实就是一个生产-消费模型+小目录生产大目录,大目录生成更大目录的算法,这样做的好处是什么?当然是尽量少改动索引文件,应该说是便于分布式的查询架构。但是在后文中,我们会详细分析jackrabbit还没有为分布式查询准备好的原因,它的这块设计还有待改进,人无完人,框架亦是如此,不用过于苛求,也不必抱怨,用的不爽,那么就---改它,再不行---重新实现(某个模块或者全部)。

TO BE CONTINUE 

时间: 2024-10-29 20:23:02

深入浅出 jackrabbit 八 索引合并(上)的相关文章

深入浅出 jackrabbit 九 索引合并(下)

在上文中,ahuaxuan讲到了索引创建的主体流程,但是索引合并其实还有一个较为重要的细节ahuaxuan没有详细阐述.本文中,ahuaxuan将会详细阐述这个问题          本文分成两部分内容          1  考虑应用拓机时的数据正确性问题.          2  jackrabbit是如何解决这些问题的.            而这个细节将会直接影响我们对query module的改造,这个细节虽然不难,但是却很重要,是jackrabbit中一个比较重要的设计.下面让我们一

mysql索引合并:一条sql可以使用多个索引

前言 mysql的索引合并并不是什么新特性.早在mysql5.0版本就已经实现.之所以还写这篇博文,是因为好多人还一直保留着一条sql语句只能使用一个索引的错误观念.本文会通过一些示例来说明如何使用索引合并. 什么是索引合并 下面我们看下mysql文档中对索引合并的说明: The Index Merge method is used to retrieve rows with several range scans and to merge their results into one. The

《深入理解ElasticSearch》——3.6 控制索引合并

3.6 控制索引合并 读者知道(我们已经在第1章中讨论过),在ElasticSearch中每个索引都会创建一到多个分片以及零到多个副本,也知道这些分片或副本本质上都是Lucene索引,而Lucene索引又基于多个索引段构建(至少一个索引段).索引文件中绝大部分数据都是只写一次,读多次,而只有用于保存文档删除信息的文件才会被多次更改.在某些时刻,当某种条件满足时,多个索引段会被拷贝合并到一个更大的索引段,而那些旧的索引段会被抛弃并从磁盘中删除,这个操作称为段合并(segment merging).

Oracle表段和索引段上的LOGGING与NOLOGGING

在有些情况下,对于表段和索引段可以采用记录日志的模式,也可以使用不记录日志的模式.如在对表段.索引段使用数据泵导入时,可以 使用NOLOGGING模式,而使用DATA GUARD或对可用性较高的场景中需要记录日志,甚至使用强制记录日志.本文介绍了在表段,索引段使用 LOGGING与NOLOGGING时产生redo的大小以及DIRECT INSERT APPEND 的使用方法. NOLOGGING跟数据库的运行模式有关,i和i的默认安装都是非归档模式,并且自动归档默认是禁用.在安装g.g时,可以选

第十二章——SQLServer统计信息(2)——非索引键上统计信息的影响

原文:第十二章--SQLServer统计信息(2)--非索引键上统计信息的影响 前言:         索引对性能方面总是扮演着一个重要的角色,实际上,查询优化器首先检查谓词上的统计信息,然后才决定用什么索引.一般情况下,默认会在创建索引时,索引列上均创建统计信息.但是不代表在非索引键上的统计信息对性能没有用.         如果表上的所有列都有索引,那么将会是数据库负担不起,同时也不是一个好想法,包括谓词中用到的所有列加索引同样也不是好方法.因为索引会带来负载.因为需要空间存放索引,且每个D

SQL Server 深入解析索引存储(上)

原文:SQL Server 深入解析索引存储(上) 标签:SQL SERVER/MSSQL SERVER/数据库/DBA/索引体系结构/堆 概述      最近要分享一个课件就重新把这块知识整理了一遍出来,篇幅有点长,想要理解的透彻还是要上机实践.       正文 聚集索引 --创建测试数据库 CREATE DATABASE Ixdata GO USE [Ixdata] GO ---创建测试表 CREATE TABLE Orders (ID INT PRIMARY KEY IDENTITY(1

深入浅出jackrabbit之十五 文档提取优化2.docx

/** *author:ahuaxuan *2009-10-22 */ 在上一篇文章中,我们讲到为什么要优化jackrabbit中的文档提取,同时也分析了进程模型和线程模型在分布式文档提取中的优劣. 在本文中,ahuaxuan将会介绍分布式文档提取的架构模型,以及它在整个非结构化数据库中的地位. 第二部分ahuaxuan将介绍几个用来提取文本的工具,然后将这些工具用在分布式文档提取中,以减轻jackrabbit的负担, 从这个角度看,本文是对上文的补充,这样从原因,到解决方案,以及所用到的技术工

深入浅出jackrabbit之十四 分布式文档提取

/** *author:ahuaxuan *2009-09-24 */ 前言: 本来针对jackrabbit这一系列的文章其实都是有顺序的,比如先讲索引的创建,然后讲索引的查询,等等,但是无奈总是有些横生的枝节,这些横生的枝节又让ahuaxuan有了一些新的想法.所以只能将这篇文章写到后面来了. 切入正题,今天这篇文章其实是对前面文本提取的一个补充.前面讲到文本提取的逻辑,逻辑有点小复杂,但是复杂的逻辑并不是这篇文章的关键,今天的话题是进程模型和线程模型,也许童鞋们会感到非常的奇怪,一个jack

SQL Server 2008中的nchar和nvarchar在索引方式上的一个区别

其实这两个类型最大的研究点就是在索引上的区别.现将我在使用SQL Server2008时的一个发现写下 .(注:本讨论未建立任何全文索引) 通常我们认为 like '%服装%' 这样的查询是无法利用索引的,基本就是主键扫描. 但是当我们为某个"定长字符型"字段建立 "非聚集" "唯一" 索引时发现,如果我们的查询结果只有一条的话便会利用这个索引.如果结果超过一条或者使用" 可变长字符型"字段建立的索引则不会使用索引. 另外,值