webtail 文件读取,文件监控,websocket

文件读取

先介绍下背景,之前遇到过服务器上需要长时间tail一个日志,之前经常是通过一个终端连到服务器上,但是对于长时间观察,就不太方便:老是要开着终端,没法用手机等移动设备查看;多人共享查看比较麻烦,都需要登录到服务器上。

webtail实现类似于tail,能够持续读取一个文件,并将文件内容通过websocket,实时推送到web端。webtail文件读取基于linux的inotify,所以没有可移植性,websocket使用基于asio的websocketpp,代码维护在这里。

webtail代码维护在https://gitorious.org/webtail/webtail,目前还是一个简单可用的小应用,还有很多可以提升的地方。

本文介绍下文件读取的部分。

文件读取分为两个部分,文件读取和文件监控。

文件读取:
由于unix的文件多次打开独立维护file table,共享v-node table(参照APUE 3.10 file sharing)。因此只维护一个文件描述符和当前文件偏移,每次读取通过fstat获取文档当前大小,和维护的文件偏移进行对比,如果小于文件大小,则读取文件直到末尾,并输出。
这里有几点是模仿tailf的:

首先是不修改文件访问时间。这是通过在open系统调用中,增加O_NOATIME,按照man open的解释:Do not update the file last access time (st_atime in the inode) when the file is read(2). This flag is intended for use by indexing or backup programs, where its use can significantly reduce the amount of disk activity. This flag may not be effective on all file systems. One example is NFS, where the server main?tains the access time. 启动了这个参数,通过read调用读取文件的时候,不会修改atime了。

其次是首次读取的时候,读取文件最后10行(由于websocket发送没有做缓存,所以从web端没法看见)。这里代码也是参照了tailf:

 代码如下 复制代码
char *buffer = new char[initLen * BUFSIZ];
char *p = buffer;
char lineBuffer[BUFSIZ];
int readSize;
int lineCount = 0;
bool head = true;
int i = 0, j = 0;
while((readSize = ::read(fd, lineBuffer, BUFSIZ - 1)) > 0) {
    for(i = 0, j = 0; i < readSize; ++i) {
        // read line, save to buffer
        if(lineBuffer[i] == 'n') {
            std::memcpy(p, lineBuffer + j, i - j + 1);
            std::memset(p + i - j + 1, '', 1);
            j = i + 1;
            if(++lineCount >= initLen) {
                lineCount = 0;
                head = false;
            }
            p = buffer + (lineCount * BUFSIZ);
        }
    }
    // read break in the middle of line
    if(j < i) {
        // finished read all files
        if(readSize < BUFSIZ) {
            std::memcpy(p, lineBuffer + j, i - j + 1);
            std::memset(p + i - j + 1, '', 1);
            ++lineCount;
        } else if (j == 0){     // long line drop?
            continue;
        } else {
            // not finished, seek to line begin
            curPos = lseek(fd, j - i -1, SEEK_CUR);
        }
    }
}

std::string initReadResult;
if(head) {
    for(i = 0; i < lineCount; ++i) {
        initReadResult += (buffer + i * BUFSIZ);
    }
} else {
    for(i = lineCount; i < initLen; ++i) {
        initReadResult += (buffer + i * BUFSIZ);
    }
  
    for(i = 0; i < lineCount; ++i) {
        initReadResult += (buffer + i * BUFSIZ);
    }
}

curPos = lseek(fd, 0, SEEK_CUR);
delete buffer;

首先声明一段用来保存最终n行的缓存,缓存最长行长度是BUFSIZ(这个在linux中的定义是8192字节),然后每次读取BUFSIZ-1个字节(最后一个用来放,类似fgets的实现),解析其中的换行符(由于只打算在linux中使用,所以只解析n)。最后根据状态,从缓存中读取最后n行数据到string中。

如果不是初始读取,之前已经说过逻辑了,将fstat获取到的文件大小和保存的当前偏移做比较,如果有新的内容,则读取,没有就直接返回。

文件监控

文件监控直接通过了linux的inotify接口实现。这里没有考虑移植性,也就没像tailf那样,通过宏来判断是否支持inotify,如果不支持,降级使用循环轮寻的方式读取。

inotify的使用还是比较方便的基本上就是:inotify_init,inotify_add_watch,然后配合read系统调用,获取文件修改信息。因此实现也非常方便。

首先是在构造函数里面初始化inotify:

inotifyFd = inotify_init();

然后提供一个watch接口,通过传入前文描述的TFile对象和内容读取的回调函数,添加对应文件的监控和回调。

 代码如下 复制代码
void FileWatcher::watch ( boost::shared_ptr< TFile > tFile, std::list< FileWatcher::ReadCallBack > callBackList )
{
    if(!tFile->hasError() && !callBackList.empty()) {
        int wd = inotify_add_watch(inotifyFd, tFile->name().c_str(), IN_MODIFY);
        if(wd > 0) {
            tFileMap.insert(std::make_pair<int, boost::shared_ptr<TFile> >(wd, tFile));
            callBackMap.insert(std::make_pair<int, std::list<ReadCallBack> >(wd, callBackList));
          
            //init read
            std::string initContent = tFile->read();
            BOOST_FOREACH(ReadCallBack &callback, callBackList) {
                callback(initContent);
            }
        }
    }
}

这里通过TFile的文件名,向内核注册添加该文件的modify事件,并且在注册成功之后,进行初始读取(这里有个小问题,由于后面websocket端没有做缓存,所以由于初始读取的时候还没有任何websocket客户端连接,所以通过web无法读取初始内容,也就是文件最后10行)。同时,这个类维护两个hashmap,分别是监听描述符wd->tFile和wd->callbacklist。

监听完成后,就是启动监听,也就是通过读取fd,感知被监听文件的变更,由于这里只监听了文件修改,那么读取到这个事件之后,就可以对该文件进行增量读取(前文已经描述了读取方法)。

 代码如下 复制代码
char * buffer = new char[inotifyBufferSize];
while(!_interrupted) {
    if(read(inotifyFd, buffer, inotifyBufferSize) < 0) {
        if(errno == EINTR) {
            // interrupt
            delete buffer;
            return;
        }
    }
    struct inotify_event *event = ( struct inotify_event * ) buffer;
    int wd = event->wd;
    BOOST_AUTO(tFileIter, tFileMap.find(wd));
    if(tFileIter != tFileMap.end()) {
        boost::shared_ptr<TFile> tFile = tFileIter->second;
        std::string content = tFile->read();
        BOOST_AUTO(iter, callBackMap.find(wd));
        if(iter != callBackMap.end()) {
            std::list<ReadCallBack> callbacks = iter->second;
            BOOST_FOREACH(ReadCallBack &callback, callbacks) {
                callback(content);
            }
        }
    }
}
delete buffer;

这里参照inotify的文档,首先读取缓冲区大小设置为

static const int inotifyBufferSize = sizeof(struct inotify_event) + NAME_MAX + 1;

也就是inotify_event结构的长度,和名字最大值。由于inotify_event是变长字段(包含一个可选的文件名字段),所以这里采用了系统限制的文件名最大值NAME_MAX,这个宏在climits中定义,在linux中大小为255字节。
然后通过系统调用read,读取文件描述符inotifyFd,这里如果没有新的事件产生,read会进入阻塞状态,节省系统资源。如果有返回,则处理返回的inotify_event对象(注意在监听modify事件的时候,是没有文件名的)。通过结构中的wd,从之前保存的hashmap中获取对应的tFile对象进行增量读取,然后再读取wd对应的回调函数,将读取内容返回。
这里有个小问题需要处理,就是如何中断读取。之前为了在gtest中能够通过单元测试的方式进行测试,通过查看手册可以知道,如果read调用被系统信号中断,会标记错误码为EINTR。所以,当读取失败的时候,可以通过对ERRNO检查,判断是否是信号中断。

由于程序会一直运行,知道通过信号终止,所以析构变的不是很重要了。这里析构函数里面通过调用inotify_rm_watch将之前保存的wd全部去掉,然后通过close调用交inotify的文件描述符关闭即可:

 代码如下 复制代码
FileWatcher::~FileWatcher()
{
    if(inotifyFd > 0) {
        boost::unordered::unordered_map<int, boost::shared_ptr<TFile> >::iterator iter;
        for(iter = tFileMap.begin(); iter != tFileMap.end(); ++iter) {
            inotify_rm_watch(inotifyFd, iter->first);
        }
      
        close(inotifyFd);
    }
}

websocket

前面介绍了服务器端如何监听和增量读取文件,这里通过基于boost asio的websocketpp,实现了一个简单的websocket服务端,能够和浏览器进行通信,将读取到的文件通过websocket协议进行实时传送。

关于websocket的简单介绍,可以参考维基百科。websocket协议,在rfc6455中定义。

websocketpp对websocket和简单的http都进行了比较好的封装,只要实现几个handler,就可以完成对连接、消息等的操作和控制。主要需要处理的,可能有以下的handler:

 代码如下 复制代码
typedef lib::function<void(connection_hdl)> open_handler;
typedef lib::function<void(connection_hdl)> close_handler;
typedef lib::function<void(connection_hdl)> http_handler;
typedef lib::function<void(connection_hdl,message_ptr)> message_handler

分别处理连接创建,连接关闭,http请求和消息请求。其中connection_hdl是连接的weak_ptr。

这里对websocket使用很简单,唯一的需求,就是维护已经建立的连接(既创建连接的时候记录,关闭连接的时候移出),然后通过将自己的回调注册到文件监控类中,实时的将消息推送到websocket客户端。

首先,维护一个set,用来保存当前已经建立的所有连接:

typedef std::set<websocketpp::connection_hdl,boost::owner_less<websocketpp::connection_hdl> > ConnectionSet;

然后在建立连接的时候插入到这个set中:

 代码如下 复制代码
void WebSocketServer::onOpen ( websocketpp::connection_hdl hdl )
{
    boost::lock_guard<boost::mutex> lock(_mutex);
    _conns.insert(hdl);
}

在连接关闭的时候,移除连接:

 代码如下 复制代码
void WebSocketServer::onClose ( websocketpp::connection_hdl hdl )
{
    boost::lock_guard<boost::mutex> lock(_mutex);
    _conns.erase(hdl);
}

另外,定义一个让filewatcher调用的回调:

 代码如下 复制代码
void WebSocketServer::write ( const std::string& content )
{
    boost::lock_guard<boost::mutex> lock(_mutex);
    BOOST_AUTO(it, _conns.begin());
    for(; it != _conns.end(); ++it) {
        _s.send(*it, content, websocketpp::frame::opcode::text);
    }
}

这个回调很简单,就是当有读取到内容的时候,遍历连接集合,向每个连接发送具体的内容。

最后,为了方便用户访问,当发现是标准http请求的时候,我们返回一个简单的html,用于显示和建立websocket连接。如果不指定具体的http_handler,websocketpp在发现请求是http的时候,会返回http的426错误(Upgrade Required)。

 代码如下 复制代码
void WebSocketServer::httpHandler ( websocketpp::connection_hdl hdl )
{
    server::connection_ptr connPtr = _s.get_con_from_hdl(hdl);;
  
    connPtr->set_status(websocketpp::http::status_code::ok);
    connPtr->set_body(htmlContent);
}

另外,还有一个小坑。按照websocketpp的example,在启动的时候都是直接调用server的listen函数,而且使用的都是只有端口号的那个实现。实际使用过程中,发现这个只有端口号的实现,直接使用了ipv6协议。虽说如果本机同时支持ipv6和ipv4的情况下,两个协议对应的端口都会监听,但是遇到了服务器上关闭了ipv6,会导致boost asio抛出address_family_not_supported异常,导致应用被迫退出。为了兼容这种方式,对这个异常进行了抓取,重新降级尝试ipv4协议,这样能够很好的在只有ipv4的服务器上进行使用。

 代码如下 复制代码
try{
    _s.listen(port);
} catch(boost::system::system_error const& e) {
    if(e.code() == boost::asio::error::address_family_not_supported) {
        _s.listen(boost::asio::ip::tcp::v4(), port);
    }
} catch (...) {
    throw;
}
时间: 2024-09-20 08:03:55

webtail 文件读取,文件监控,websocket的相关文章

java中文件读取文件操作数据库

问题描述 java中文件读取文件操作数据库 给个例子 谢谢 解决方案 连接数据库:public class DBConnect {public Connection getConnect(String driver, String url, String user, String pwd) {Connection conn = null;try {Class.forName(driver);conn = DriverManager.getConnection(url, user, pwd);}

ASP教程:读取文件和写文件

ASP教程:读取文件和写文件 读取文件操作: '------------------------------------------------- '函数名称:ReadTextFile '作用:利用AdoDb.Stream对象来读取UTF-8格式的文本文件 '---------------------------------------------------- Function ReadFromTextFile (FileUrl,CharSet)     dim str     set stm

android从资源文件中读取文件流并显示的方法_Android

本文实例讲述了android从资源文件中读取文件流并显示的方法.分享给大家供大家参考.具体如下: 在android中,假如有的文本文件,比如TXT放在raw下,要直接读取出来,放到屏幕中显示,可以这样: private void doRaw(){ InputStream is = this.getResources().openRawResource(R.raw.ziliao); try{ doRead(is); }catch(IOException e){ e.printStackTrace(

.NET Core的文件系统[1]:读取并监控文件的变化

ASP.NET Core 具有很多针对文件读取的应用.比如我们倾向于采用JSON文件来定义配置,所以应用就会涉及针对配置文件读取.如果用户发送一个针对物理文件的HTTP请求,应用会根据指定的路径读取目标文件的内容并对请求予以响应.在一个ASP.NET Core MVC应用中,针对View的动态编译会涉及到根据预定义的路径映射关系来读取目标View.这些不同应用场景都会出现一个FileProvider对象的身影,以此对象为核心的文件系统提供了统一的API来读取文件的内容并监控内容的改变. [ 本文

监控一个文件夹,如果有图片创建,就读取文件流

问题描述 本人用的FileSystemWatcher,但是文件创建的时候读取只能读取到一部分,貌似文件还没有创建完成就读取了,请教各位大神有解决方法吗?谢谢!高分悬赏 解决方案 解决方案二:绑定元以下!解决方案三:延迟一定的时间后再读取解决方案四:///<summary>///当文件夹内监控内容发生变化时///</summary>///<paramname="sender"></param>///<paramname="e

linux内核-linux io 读取文件问题

问题描述 linux io 读取文件问题 我遇到一个困扰了我很久的问题, 到时我项目挂掉好多次, 每次读硬盘过高时,项目就会挂, 我现在想问的是: 谁能告诉我, 是读哪些文件名字? linux 下面怎么查看呢...我不需要实时监控的vmstat,iostat命令哦.. 解决方案 http://www.51know.info/system_security/inotify.htmlhttp://www.infoq.com/cn/articles/inotify-linux-file-system-

Java中读取文件进度条的实现

实现功能描述: 当读取一个大文件时,一时半会儿无法看到读取结果,就需要显示一个进度条,是程序员明白已经读了多少文件,可以估算读取还需要多少时间. 实现这个功能比较简单,用到的类有两个:ProgressMonitorInputStream(主要是整个类) 和 ProgressMonitor ,它们在javax.swing中 大体思路,你要首先知道整个文件的大小,和当前已经读取文件的大小,获得整个文件大小的方法 ProgressMonitorInputStream monitor; /** * @p

java读取文件显示进度条的实现方法_java

实现这个功能比较简单,用到的类有两个:ProgressMonitorInputStream(主要是整个类) 和 ProgressMonitor ,它们在javax.swing中大体思路,你要首先知道整个文件的大小,和当前已经读取文件的大小,获得整个文件大小的方法 复制代码 代码如下: ProgressMonitorInputStream monitor;/*** @param 表示此进度条要依附在哪个组件上* @param 显示在此进度条上的消息* @param 需要监控的输入流*/monito

通过 File API 使用 JavaScript 读取文件

通过 File API 使用 JavaScript 读取文件 By Eric Bidelman 已发布: 六月 18th, 2010Comments: 273 简介 HTML5 终于为我们提供了一种通过 File API 规范与本地文件交互的标准方式.为了举例说明其功能,可使用 File API 在向服务器发送图片的过程中创建图片的缩略图预览,或者允许应用程序在用户离线时保存文件引用.另外,您可以使用客户端逻辑来验证上传内容的 mimetype 与其文件扩展名是否匹配,或者限制上传内容的大小.