feat: add changes delay (#1739)
* 新增 changesDelay 配置,调用 /-/all/changes 接口时默认返回 delay 之前的 changes * 防止出现 since 和当前时间接近时,changes 异步插入,导致 changes 计算失败的问题
This commit is contained in:
2
.github/workflows/nodejs.yml
vendored
2
.github/workflows/nodejs.yml
vendored
@@ -28,7 +28,7 @@ jobs:
|
|||||||
strategy:
|
strategy:
|
||||||
fail-fast: false
|
fail-fast: false
|
||||||
matrix:
|
matrix:
|
||||||
node-version: [10, 12, 14, 16]
|
node-version: [14, 16]
|
||||||
os: [ubuntu-latest]
|
os: [ubuntu-latest]
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
|
|||||||
@@ -201,6 +201,11 @@ var config = {
|
|||||||
officialNpmReplicate: 'https://replicate.npmjs.com',
|
officialNpmReplicate: 'https://replicate.npmjs.com',
|
||||||
cnpmRegistry: 'https://r.cnpmjs.com',
|
cnpmRegistry: 'https://r.cnpmjs.com',
|
||||||
|
|
||||||
|
// /-/all/changes
|
||||||
|
// since different changes are aggregated through many tables
|
||||||
|
// prevent changesStream changes collisions
|
||||||
|
changesDelay: 5000,
|
||||||
|
|
||||||
// sync source, upstream registry
|
// sync source, upstream registry
|
||||||
// If you want to directly sync from official npm's registry
|
// If you want to directly sync from official npm's registry
|
||||||
// please drop them an email first
|
// please drop them an email first
|
||||||
|
|||||||
@@ -203,16 +203,23 @@ exports.listPublicModuleNamesByUser = function* (username) {
|
|||||||
};
|
};
|
||||||
|
|
||||||
exports.listModelSince = function(Model, attributes, mapper) {
|
exports.listModelSince = function(Model, attributes, mapper) {
|
||||||
|
|
||||||
return function*(since, limit) {
|
return function*(since, limit) {
|
||||||
|
|
||||||
var start = ensureSinceIsDate(since);
|
var start = ensureSinceIsDate(since);
|
||||||
var findCondition = {
|
var findCondition = {
|
||||||
attributes: attributes,
|
attributes: attributes,
|
||||||
where: {
|
where: {
|
||||||
gmt_modified: {
|
gmt_modified: {
|
||||||
gte: start
|
gte: start,
|
||||||
|
// 添加延时,防止同一时间多个数据未同步
|
||||||
|
lte: new Date(Date.now() - config.changesDelay || 5000),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
order: [['gmt_modified', 'ASC'], ['id', 'ASC']],
|
order: [
|
||||||
|
["gmt_modified", "ASC"],
|
||||||
|
["id", "ASC"],
|
||||||
|
],
|
||||||
};
|
};
|
||||||
if (limit) {
|
if (limit) {
|
||||||
findCondition.limit = limit;
|
findCondition.limit = limit;
|
||||||
|
|||||||
@@ -3,6 +3,7 @@
|
|||||||
var should = require('should');
|
var should = require('should');
|
||||||
var request = require('supertest');
|
var request = require('supertest');
|
||||||
var mm = require('mm');
|
var mm = require('mm');
|
||||||
|
var config = require('../../../../config');
|
||||||
var app = require('../../../../servers/registry');
|
var app = require('../../../../servers/registry');
|
||||||
var utils = require('../../../utils');
|
var utils = require('../../../utils');
|
||||||
var CHANGE_TYPE = require('../../../../services/common').CHANGE_TYPE;
|
var CHANGE_TYPE = require('../../../../services/common').CHANGE_TYPE;
|
||||||
@@ -12,6 +13,7 @@ describe('test/controllers/registry/package/changes.test.js', function () {
|
|||||||
|
|
||||||
var since;
|
var since;
|
||||||
before(function (done) {
|
before(function (done) {
|
||||||
|
mm(config, 'changesDelay', 0);
|
||||||
setTimeout(() => {
|
setTimeout(() => {
|
||||||
since = Date.now();
|
since = Date.now();
|
||||||
var pkg = utils.getPackage('@cnpmtest/test_changes', '0.0.1', utils.admin, 'alpha');
|
var pkg = utils.getPackage('@cnpmtest/test_changes', '0.0.1', utils.admin, 'alpha');
|
||||||
@@ -39,6 +41,7 @@ describe('test/controllers/registry/package/changes.test.js', function () {
|
|||||||
.expect(200, function (err, res) {
|
.expect(200, function (err, res) {
|
||||||
should.not.exist(err);
|
should.not.exist(err);
|
||||||
res.body.results.should.be.an.Array();
|
res.body.results.should.be.an.Array();
|
||||||
|
|
||||||
res.body.results
|
res.body.results
|
||||||
.filter(function (item) {
|
.filter(function (item) {
|
||||||
return item.type === CHANGE_TYPE.PACKAGE_VERSION_ADDED;
|
return item.type === CHANGE_TYPE.PACKAGE_VERSION_ADDED;
|
||||||
@@ -53,6 +56,22 @@ describe('test/controllers/registry/package/changes.test.js', function () {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('changes delay should work', function(done) {
|
||||||
|
mm(config, 'changesDelay', 10000);
|
||||||
|
request(app)
|
||||||
|
.get("/-/all/changes?since=" + since)
|
||||||
|
.expect(200, function (err, res) {
|
||||||
|
should.not.exist(err);
|
||||||
|
res.body.results.should.be.an.Array();
|
||||||
|
res.body.results
|
||||||
|
.filter(function (item) {
|
||||||
|
return item.type === CHANGE_TYPE.PACKAGE_VERSION_ADDED;
|
||||||
|
})
|
||||||
|
.length.should.equal(0);
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
})
|
||||||
|
|
||||||
it('since should work', function (done) {
|
it('since should work', function (done) {
|
||||||
var now = Date.now();
|
var now = Date.now();
|
||||||
request(app)
|
request(app)
|
||||||
@@ -66,6 +85,7 @@ describe('test/controllers/registry/package/changes.test.js', function () {
|
|||||||
});
|
});
|
||||||
|
|
||||||
it('limit should work', function (done) {
|
it('limit should work', function (done) {
|
||||||
|
mm(config, 'changesDelay', 0);
|
||||||
request(app)
|
request(app)
|
||||||
.get('/-/all/changes?limit=1&since=' + since)
|
.get('/-/all/changes?limit=1&since=' + since)
|
||||||
.expect(200, function (err, res) {
|
.expect(200, function (err, res) {
|
||||||
|
|||||||
@@ -5,6 +5,8 @@ var sleep = require('co-sleep');
|
|||||||
var Package = require('../../services/package');
|
var Package = require('../../services/package');
|
||||||
var utils = require('../utils');
|
var utils = require('../utils');
|
||||||
var common = require('../../services/common');
|
var common = require('../../services/common');
|
||||||
|
var config = require('../../config');
|
||||||
|
var mm = require('mm');
|
||||||
|
|
||||||
describe('test/services/package.test.js', function () {
|
describe('test/services/package.test.js', function () {
|
||||||
describe('addModuleTag()', function () {
|
describe('addModuleTag()', function () {
|
||||||
@@ -132,6 +134,7 @@ describe('test/services/package.test.js', function () {
|
|||||||
|
|
||||||
describe('listModelSince()', function () {
|
describe('listModelSince()', function () {
|
||||||
it('list tags since', function* () {
|
it('list tags since', function* () {
|
||||||
|
mm(config, 'changesDelay', 0);
|
||||||
yield utils.createModule('test-listModuleSince-module-0', '1.0.0');
|
yield utils.createModule('test-listModuleSince-module-0', '1.0.0');
|
||||||
yield sleep(2100);
|
yield sleep(2100);
|
||||||
var start = Date.now() - 1000;
|
var start = Date.now() - 1000;
|
||||||
@@ -162,6 +165,7 @@ describe('test/services/package.test.js', function () {
|
|||||||
]);
|
]);
|
||||||
});
|
});
|
||||||
it('list package version since', function* () {
|
it('list package version since', function* () {
|
||||||
|
mm(config, 'changesDelay', 0);
|
||||||
yield utils.createModule('test-listModuleSince-module-0', '1.0.0');
|
yield utils.createModule('test-listModuleSince-module-0', '1.0.0');
|
||||||
yield sleep(2100);
|
yield sleep(2100);
|
||||||
var start = Date.now() - 1000;
|
var start = Date.now() - 1000;
|
||||||
|
|||||||
Reference in New Issue
Block a user