-
Notifications
You must be signed in to change notification settings - Fork 0
/
replicate.js
135 lines (115 loc) · 3.93 KB
/
replicate.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
'use strict';
'use strong';
const url = require('url');
const path = require('path');
const util = require('util');
const async = require('async');
const stream = require('stream');
const endpoint = require('endpoint');
const DAWARequest = require('denmark-dawa');
const dawaSchema = require('denmark-dawa-schema');
const TransformEvents = require('./lib/transform-events.js');
const TransformSnapshot = require('./lib/transform-snapshot.js');
/**
 * Object-mode pass-through stream that tracks which DAWA sequence number
 * has been replicated so far.
 *
 * @param {number} [fromSequence] - Sequence number to start from. Any falsy
 *   value (including a missing argument) is treated as 0.
 */
function DAWAReplicate(fromSequence) {
  stream.PassThrough.call(this, { objectMode: true });

  const startVersion = fromSequence ? fromSequence : 0;

  // Both versions start out equal; nextVersion only moves ahead once an
  // update has been requested.
  this.currVersion = startVersion;
  this.nextVersion = startVersion;

  // Flag read and written by .update() to reject overlapping updates
  this._updateing = false;
}
// Make DAWAReplicate instances inherit from stream.PassThrough, matching the
// stream.PassThrough.call(this, ...) performed in the constructor.
util.inherits(DAWAReplicate, stream.PassThrough);
// The constructor itself is the module's export.
module.exports = DAWAReplicate;
/**
 * Fetch the latest sequence number from the
 * `/replikering/senestesekvensnummer` resource.
 *
 * @param {function(?Error, ?number)} callback - Called exactly once, with
 *   either an error and `null`, or `null` and the `sekvensnummer` of the
 *   first item in the response.
 */
DAWAReplicate.prototype.getLatestVersion = function (callback) {
  let finished = false;

  // Both the request and the collecting endpoint can report a failure, so
  // make sure the caller only ever hears about the first outcome.
  function finish(err, version) {
    if (finished) return;
    finished = true;
    callback(err, version);
  }

  const request = new DAWARequest('/replikering/senestesekvensnummer');

  // .pipe() does not forward errors from the source stream to the
  // destination, so a failing request must be handled explicitly.
  request.on('error', function (err) {
    finish(err, null);
  });

  request.pipe(endpoint({ objectMode: true }, function (err, items) {
    if (err) return finish(err, null);

    // An empty response would otherwise throw while reading items[0]
    if (!items || items.length === 0) {
      return finish(new Error('DAWA returned no sequence number'), null);
    }

    finish(null, items[0].sekvensnummer);
  }));
};
/**
 * Bring the replica up to a newer version.
 *
 * Call as `update(callback)` to target the version reported by
 * `getLatestVersion`, or as `update(nextVersion, callback)` to target a
 * specific version.
 *
 * The callback receives `(err)` on failure or `(null, version)` on success.
 * Only one update may run at a time; a call made while another update is in
 * progress is rejected through the callback.
 *
 * @throws {TypeError} If the last argument is not a function.
 */
DAWAReplicate.prototype.update = function () {
  const self = this;
  const callback = arguments[arguments.length - 1];

  // Fail fast: without this check a missing callback is only noticed after
  // the whole update has run, when the result is about to be reported.
  if (typeof callback !== 'function') {
    throw new TypeError('update requires a callback function as its last argument');
  }

  // Prevent simultaneous .update calls
  if (this._updateing) {
    return callback(new Error(`can't update while another update is running`));
  }
  this._updateing = true;

  // The next version is now known, so fetch the new events
  function onVersion(err, version) {
    if (err) {
      self._updateing = false;
      return callback(err);
    }

    self._update(version, function (err) {
      // Release the busy flag before reporting, so the callback itself may
      // start a new update.
      self._updateing = false;
      if (err) return callback(err);
      callback(null, version);
    });
  }

  // Allow nextVersion to be specified manually or fetched automatically
  if (arguments.length === 2) {
    const requestedVersion = arguments[0];
    process.nextTick(() => onVersion(null, requestedVersion));
  } else {
    this.getLatestVersion(onVersion);
  }
};
/**
 * Replicate every table in the replication schema up to `nextVersion`,
 * piping the resulting items into this stream one table at a time.
 *
 * Emits 'new-version', 'update-start', 'update-table-start',
 * 'update-table-end' and, on success, 'update-end'.
 *
 * @param {number} nextVersion - The version to replicate up to.
 * @param {function(?Error, number=)} callback - Called with `(err)` on
 *   failure or `(null, nextVersion)` once `currVersion` has been advanced.
 */
DAWAReplicate.prototype._update = function (nextVersion, callback) {
  const self = this;
  this.nextVersion = nextVersion;

  // Nothing new has happened, just short circuit it
  if (this.currVersion >= this.nextVersion) {
    return callback(null, nextVersion);
  }

  this.emit('new-version', nextVersion);

  // Fetch the latest replication schema
  dawaSchema(function (err, schema) {
    if (err) return callback(err);

    self.emit('update-start');

    // New data exists, update all tables
    async.eachSeries(
      Object.keys(schema),
      function (tableName, done) {
        self.emit('update-table-start', tableName);

        // A stream may emit 'error' and later 'end' (or several errors).
        // Only the first outcome may complete the table; calling `done`
        // twice would finish the series a second time.
        let settled = false;
        function doneAndEmit(err) {
          if (settled) return;
          settled = true;
          self.emit('update-table-end', tableName);
          done(err);
        }

        // Get the source
        const table = schema[tableName];
        const source = url.parse(table.source).pathname;

        // Pipe events to the main stream, keeping it open for the next table
        self._replicate(tableName, source)
          .on('error', doneAndEmit)
          .once('end', doneAndEmit)
          .pipe(self, { end: false });
      },
      function (err) {
        if (err) return callback(err);

        // When done update the current version attribute
        self.currVersion = self.nextVersion;
        self.emit('update-end');
        callback(null, nextVersion);
      }
    );
  });
};
/**
 * Create the stream of replication items for one table, covering the range
 * from `currVersion` to `nextVersion`.
 *
 * @param {string} tableName - Table name handed to the transform stream.
 * @param {string} source - URL path of the table's replication resource.
 * @returns {stream.Transform} The transform the request is piped into.
 *   Errors emitted by the underlying request are re-emitted on it.
 * @throws {RangeError} If `nextVersion` is not greater than `currVersion`.
 */
DAWAReplicate.prototype._replicate = function (tableName, source) {
  if (this.currVersion >= this.nextVersion) {
    throw new RangeError(`next version ${this.nextVersion} must be greater` +
                         ` than the current version ${this.currVersion}`);
  }

  let request;
  let transform;

  if (this.currVersion === 0) {
    // A current version of zero means nothing has been replicated yet, so
    // skip the update events and fetch a snapshot at the target version.
    request = new DAWARequest(source, {
      sekvensnummer: this.nextVersion
    });
    transform = new TransformSnapshot(tableName);
  } else {
    // Data already exists, fetch only the events after the current version.
    // `source` is a URL path, so it must be joined with '/' on every
    // platform; plain path.join would use '\' on Windows.
    request = new DAWARequest(path.posix.join(source, 'haendelser'), {
      sekvensnummerfra: this.currVersion + 1,
      sekvensnummertil: this.nextVersion
    });
    transform = new TransformEvents(tableName);
  }

  // .pipe() does not forward source errors, and callers only listen on the
  // returned transform, so surface request failures there.
  request.on('error', function (err) {
    transform.emit('error', err);
  });

  return request.pipe(transform);
};