refactor: Sync existing packages from cnpmcore changes stream (#1707)

closes https://github.com/cnpm/cnpmjs.org/issues/1704
This commit is contained in:
fengmk2
2022-02-12 01:06:45 +08:00
committed by GitHub
parent f2c4b9f1c8
commit 9f8dca4ac0
3 changed files with 94 additions and 57 deletions

View File

@@ -153,7 +153,7 @@ exports.getAllToday = function* (timeout) {
};
exports.getShort = function* (timeout) {
const registry = config.sourceNpmRegistryIsCNpm ? config.sourceNpmRegistry : 'https://r.cnpmjs.org';
const registry = config.sourceNpmRegistryIsCNpm ? config.sourceNpmRegistry : 'https://registry.npmmirror.com';
var r = yield request('/-/short', {
timeout: timeout || 300000,
// registry.npmjs.org/-/short is 404 now therefore have a fallback
@@ -198,3 +198,43 @@ exports.getPopular = function* (top, timeout) {
return r[0];
});
};
exports.getChangesUpdateSeq = function* () {
const registry = config.sourceNpmRegistryIsCNpm ? config.sourceNpmRegistry : 'https://registry.npmmirror.com';
const r = yield request('/', {
timeout: 30000,
registry: registry,
});
const data = r.data || {};
if (r.status !== 200) {
if (data.code && data.message) {
const url = registry + '/';
const err = new Error(data.message + ', url: ' + url);
err.name = data.code;
err.url = url;
throw err;
}
}
return data.update_seq || 0;
};
exports.listChanges = function* (updateSeq) {
const registry = config.sourceNpmRegistryIsCNpm ? config.sourceNpmRegistry : 'https://registry.npmmirror.com';
const changesUrl = `/_changes?since=${updateSeq}`;
const r = yield request(changesUrl, {
timeout: 30000,
registry: registry,
});
const data = r.data || {};
if (r.status !== 200) {
if (data.code && data.message) {
const url = registry + changesUrl;
const err = new Error(data.message + ', url: ' + url);
err.name = data.code;
err.url = url;
throw err;
}
}
// {"results":[{"seq":1988653,"type":"PACKAGE_VERSION_ADDED","id":"dsr-package-mercy-magot-thorp-sward","changes":[{"version":"1.0.1"}]},
return data.results || [];
};

View File

@@ -34,35 +34,38 @@ module.exports = function* sync() {
throw new Error('can not found total info');
}
var allPackages;
if (!info.last_exist_sync_time) {
var pkgs = yield npmService.getShort();
debug('First time sync all packages from official registry, got %d packages', pkgs.length);
if (info.last_sync_module) {
// start from last success
var lastIndex = pkgs.indexOf(info.last_sync_module);
if (lastIndex > 0) {
pkgs = pkgs.slice(lastIndex);
debug('recover from %d: %s', lastIndex, info.last_sync_module);
}
}
allPackages = pkgs;
} else {
debug('sync new module from last exist sync time: %s', info.last_exist_sync_time);
var result = yield npmService.fetchUpdatesSince(info.last_exist_sync_time);
allPackages = result.names;
syncTime = result.lastModified;
var lastSeq = info.last_exist_sync_time;
if (lastSeq && lastSeq > 132897820073) {
// ignore exists timestamp
lastSeq = null;
}
var packages = intersection(existPackages, allPackages);
if (!packages.length) {
if (!lastSeq) {
lastSeq = yield npmService.getChangesUpdateSeq();
}
if (!lastSeq) {
debug('no packages need be sync');
return {
successes: [],
fails: []
};
}
var updatesPackages = [];
var changes = yield npmService.listChanges(lastSeq);
changes.forEach(change => {
updatesPackages.push(change.id);
lastSeq = change.seq;
});
var packages = intersection(existPackages, updatesPackages);
debug('Total %d packages to sync, top 10: %j', packages.length, packages.slice(0, 10));
if (!packages.length) {
yield totalService.setLastExistSyncTime(lastSeq);
debug('no packages need be sync, lastSeq: %s, changes: %s', lastSeq, changes.length);
return {
successes: [],
fails: []
};
}
var worker = new SyncModuleWorker({
username: 'admin',
@@ -75,10 +78,10 @@ module.exports = function* sync() {
var end = thunkify.event(worker);
yield end();
debug('All packages sync done, successes %d, fails %d',
worker.successes.length, worker.fails.length);
debug('All packages sync done, successes %d, fails %d, lastSeq: %s, changes: %s',
worker.successes.length, worker.fails.length, lastSeq, changes.length);
yield totalService.setLastExistSyncTime(syncTime);
yield totalService.setLastExistSyncTime(lastSeq);
return {
successes: worker.successes,
fails: worker.fails

View File

@@ -1,19 +1,5 @@
/*!
* cnpmjs.org - test/sync/sync_exist.test.js
*
* Copyright(c) cnpmjs.org and other contributors.
* MIT Licensed
*
* Authors:
* dead_horse <dead_horse@qq.com> (http://deadhorse.me)
*/
'use strict';
/**
* Module dependencies.
*/
var mm = require('mm');
var config = require('../../config');
var sync = require('../../sync/sync_exist');
@@ -34,37 +20,45 @@ describe('test/sync/sync_exist.test.js', function () {
});
it('should sync first time ok', function *() {
mm.data(npmService, 'getShort', ['byte']);
mm.data(npmService, 'listChanges', [
{
seq: 1,
id: 'byte',
}
]);
mm.data(totalService, 'getTotalInfo', {last_exist_sync_time: 0});
var data = yield sync();
data.successes[0].should.equal('byte');
});
it('should sync common ok', function *() {
mm.data(npmService, 'getAllSince', {
_updated: Date.now(),
byte: {},
});
mm.data(totalService, 'getTotalInfo', {last_exist_sync_time: Date.now()});
var data = yield sync();
data.successes[0].should.equal('byte');
mm.data(npmService, 'getAllSince', []);
var data = yield sync();
data.successes.should.eql([]);
});
it('should sync with array format data', function *() {
mm.data(npmService, 'getAllSince', [
mm.data(npmService, 'listChanges', [
{
name: 'byte',
seq: 2,
id: 'byte',
}
]);
mm.data(totalService, 'getTotalInfo', {last_exist_sync_time: Date.now()});
var data = yield sync();
data.successes[0].should.equal('byte');
mm.data(npmService, 'getAllSince', []);
mm.data(npmService, 'listChanges', []);
var data = yield sync();
data.successes.should.eql([]);
});
it('should sync with array format data', function *() {
mm.data(npmService, 'listChanges', [
{
seq: 3,
id: 'byte',
}
]);
mm.data(totalService, 'getTotalInfo', {last_exist_sync_time: Date.now()});
var data = yield sync();
data.successes[0].should.equal('byte');
mm.data(npmService, 'listChanges', []);
var data = yield sync();
data.successes.should.eql([]);
});