-
Notifications
You must be signed in to change notification settings - Fork 15
/
data_file_parser.js
106 lines (87 loc) · 2.68 KB
/
data_file_parser.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
var fs = require('fs');
var EventEmitter = require('events').EventEmitter;
var util = require('util');
var constants = require('./constants');
var DataEntry = require('./data_entry');
var headerOffsets = constants.headerOffsets;
var sizes = constants.sizes;
/**
 * Event-emitting parser bound to a single data file.
 *
 * Instances inherit from EventEmitter; `parse()` reads `file.filename`.
 *
 * @constructor
 * @param {Object} file - Data file descriptor, stored as `this.file`.
 */
var DataFileParser = module.exports = function(file) {
  // Run the EventEmitter constructor so per-instance listener state is set up
  // on this object, as the Node.js docs show for util.inherits() subclasses.
  EventEmitter.call(this);
  this.file = file;
};

util.inherits(DataFileParser, EventEmitter);
/**
 * Streams `this.file.filename` from disk and emits one `entry` event per
 * record found in it.
 *
 * Each record is read as a fixed-size header of `sizes.header` bytes followed
 * by `keysize + valsize` bytes of payload. The two sizes are read from the
 * header at `headerOffsets.keysize` (UInt16BE) and `headerOffsets.valsize`
 * (UInt32BE). Header and payload are handed together to
 * `DataEntry.fromBuffer`, and the resulting entry gets `valuePosition` set to
 * the running byte offset just past that record's header and key.
 *
 * Records may straddle stream chunks; unconsumed bytes are carried over to the
 * next `data` event. Bytes still pending when the stream ends (an incomplete
 * trailing record) are dropped without an event.
 *
 * Emits:
 *   - `entry` (DataEntry) for every complete record, in file order
 *   - `end` when the underlying stream ends
 *   - `error` (Error) when the underlying stream reports an error
 */
DataFileParser.prototype.parse = function() {
  var self = this;

  // The stream is private to this call, so let it close its own descriptor on
  // end/error (the default). With `autoClose: false` nothing ever closed it.
  var stream = fs.createReadStream(this.file.filename);

  stream.on('error', function(err) {
    self.emit('error', err);
  });

  stream.on('end', function() {
    self.emit('end');
  });

  // Bytes received but not yet consumed by a complete header or payload.
  var pending = Buffer.alloc(0);
  // Header of the record currently being assembled; null between records.
  var headerBuf = null;
  // Payload length (key + value) announced by `headerBuf`.
  var kvLength = 0;
  // Running byte offset used to compute each entry's `valuePosition`.
  var position = 0;

  stream.on('data', function(chunk) {
    var data = pending.length ? Buffer.concat([pending, chunk]) : chunk;
    var offset = 0;

    // Every pass consumes at least a header, so the loop always terminates.
    for (;;) {
      if (headerBuf === null) {
        if (data.length - offset < sizes.header) {
          break;
        }

        headerBuf = data.slice(offset, offset + sizes.header);
        var keylen = headerBuf.readUInt16BE(headerOffsets.keysize);
        var vallen = headerBuf.readUInt32BE(headerOffsets.valsize);
        kvLength = keylen + vallen;
        offset += sizes.header;
        // Skip the header and the key so `position` lands on the value.
        position += sizes.header + keylen;
      }

      // Checked even when no bytes remain, so a record with an empty payload
      // is emitted right away instead of waiting for (or missing) more data.
      if (data.length - offset < kvLength) {
        break;
      }

      var kvBuf = data.slice(offset, offset + kvLength);
      var entry = DataEntry.fromBuffer(Buffer.concat([headerBuf, kvBuf]));
      entry.valuePosition = position;
      self.emit('entry', entry);

      position += entry.valueSize;
      offset += kvLength;
      headerBuf = null;
    }

    pending = data.slice(offset);
  });
};