Nodejs 实时监控文件内容的变化及按行读取文本文件
Nodejs 实时监控文件内容的变化及按行读取文本文件
需求问题
在使用nodejs开发项目的过程中,有一个需求需要实时监控指定文件的变更并按行读取最新的文件内容,在nodejs中有相应地API么?具体怎么使用呢~~~
fs.watchFile-实时监控文件变化
nodejs的fs模块提供了watchFile方法,在文件内容每次发生变化时就触发相应回调函数。回调函数提供2个参数-当前的文件状态对象fs.state和文件发生变化时的文件状态fs.state,比较2者的最后修改时间就能知道文件内容是否发生过变化。unwatchfile取消对指定文件的监控
fs.read/fs.createReadStream-读取文件内容
在nodejs中读取文件内容最基本的API为fs.read,也可以通过创建一个文件流,监听data/end事件读取文件内容
fs.read方法可以自由的控制要读取多少内容从那个位置读取,对于一个大文件来说需要反复调用才能获取全部内容。
fs.read(fd, buffer, offset, length, position, callback)
fd为打开的文件描述符,也就数文件对象
buffer指定此次read操作读取的数据写入的缓存区对象
offset读取的数据从缓冲区对象的哪个位置开始写入
length此次read操作想要读取的数据长度
position指定从fd的哪个位置开始读取数据,指定为null,那么数据此文件的当前位置开始读取
callback 函数提供3个参数err、bytesRead,buffer-是否发生了错误,本地读到的字节长度,保存读到的数据的缓冲区
fs.createReadStream 文件流的方式则用通过事件监听的方式顺序的读取文件内容,无法从指定的位置开始读取文件内容
var fileReadStream=fs.createReadStream(filename,{ encoding:'utf8'});var fileContent="";fileReadStream.on('data',functon(data){ fileContent+=data;});fileReadStream.on('end',function(){ console.log('file read ');});
Readline模块-按行读取文件内容
nodejs提供readline模块按行读取文件流数据内容的方法。通过创建一个readline的interface对象,监听readline事件就可以获取input输入流的行数据。当输入流读取完毕后将触发close事件
var readline = require('readline');var rl = readline.createInterface({ input: fs.createReadStream(filename,{ enconding:'utf8' }), output: null});rl.on('readline',function(line){ console.log('got line content:%s',line);});
解决方案
文件内容变化的监控用fs.watchFile方法实现,最新内容的读取就只有fs.read方法可以使用了。
因为无论是使用readline模块还是使用fs.createReadStream文件流接口都无法从特定的位置(也就是上次文件内容的最后更新位置)开始读取数据,只能从头开始读取文件内容,这个对于-文件监控并读取最新的内容-这个需求来说是不合适也是完全没有效率的。
使用fs.read方法需要提供一种机制能够按行解析出数据并且能够触发消息通知监听者,有行数据读取完成。并且在读取到的数据长度不够获取没有检测到换行标识符(\r\n)时,保留这些数据。
最终的方案是这样的:
fs.watchFile检查到文件内容发生了变化
调用fs.read方法读取指定的字节数据到到buffer缓冲对象中
将获取的字节数据和上次read行解析后遗留的内容合并成一个新的buffer对象
将buffer对象通过换行符解析出行数据
每解析出一条行数据,就触发一个line事件,通知监听者已读到新的行数据
保留行解析后遗留数据
如果本次read读取到的实际数据长度小于buffer缓冲区长度,说明已经到达文件的末尾,没有更多地数据能够读取到了。回到1 等到内容变化的通知
否则回到2,并从上次读取的最后位置开始读取
如果读取行数据内容为
===END===
或者行解析后的遗留内容为===END===
,那么将调用fs.unwatchFile停止文件内容的监控并不再调用fs.read
代码
var fs=require('fs');var EM = require("events").EventEmitter;var util = require('util');var EventEmitter = require('events').EventEmitter;var newlines = [ //13, // \r 10 // \n];function createLineReader(fileName) { if (!(this instanceof createLineReader)) return new createLineReader(fileName,monitorFlag); var self=this; var currentFileUpdateFlag=0; var fileOPFlag="a+"; fs.open(fileName,fileOPFlag,function(error,fd){ var buffer; var remainder = null; fs.watchFile(fileName,{ persistent: true, interval: 1000 },function(curr, prev){ //console.log('the current mtime is: ' + curr.mtime); //console.log('the previous mtime was: ' + prev.mtime); if(curr.mtime>prev.mtime){ //文件内容有变化,那么通知相应的进程可以执行相关操作。例如读物文件写入数据库等 continueReadData(); }else{ //console.log('curr.mtime<=prev.mtime'); } }); //先读取原来文件中内容 continueReadData(); function continueReadData(){//var fileUpdateFlag=fileUpdateFlagIn;buffer=new Buffer(2048);var start = 0,i=0,tmp;fs.read(fd,buffer,0,buffer.length,null,function(err, bytesRead, buffer){ var data=buffer.slice(0,bytesRead) if(remainder != null){//append newly received data chunk //console.log("remainder length:"+remainder.length); tmp = new Buffer(remainder.length+bytesRead); remainder.copy(tmp); //data=buffer.slice(0,bytesRead); data.copy(tmp,remainder.length) data = tmp; } //console.log("data length:"+data.length); for(i=0; i<data.length; i++){ if(newlines.indexOf(data[i]) >=0){ //\r \n new linevar line = data.slice(start,i);self.emit("line", line);start = i+1; } } if(start<data.length){ remainder = data.slice(start); if(remainder.toString()==='===END==='){self.emit("end");stopWatch();return; } }else{ remainder = null; } if(bytesRead<buffer.length){ return; }else{ //console.log('~~continue~~'); continueReadData(); }}); } function stopWatch(){fs.unwatchFile(fileName); } });}util.inherits(createLineReader, EventEmitter);module.exports=createLineReader;