772 lines, 451 LOC, 386 covered (85%)
9 | 1 | /* ***** BEGIN LICENSE BLOCK ***** |
2 | * Version: MPL 1.1/GPL 2.0/LGPL 2.1 |
|
3 | * |
|
4 | * The contents of this file are subject to the Mozilla Public License Version |
|
5 | * 1.1 (the "License"); you may not use this file except in compliance with |
|
6 | * the License. You may obtain a copy of the License at |
|
7 | * http://www.mozilla.org/MPL/ |
|
8 | * |
|
9 | * Software distributed under the License is distributed on an "AS IS" basis, |
|
10 | * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License |
|
11 | * for the specific language governing rights and limitations under the |
|
12 | * License. |
|
13 | * |
|
14 | * The Original Code is Bookmarks Sync. |
|
15 | * |
|
16 | * The Initial Developer of the Original Code is Mozilla. |
|
17 | * Portions created by the Initial Developer are Copyright (C) 2007 |
|
18 | * the Initial Developer. All Rights Reserved. |
|
19 | * |
|
20 | * Contributor(s): |
|
21 | * Dan Mills <thunder@mozilla.com> |
|
22 | * Myk Melez <myk@mozilla.org> |
|
23 | * |
|
24 | * Alternatively, the contents of this file may be used under the terms of |
|
25 | * either the GNU General Public License Version 2 or later (the "GPL"), or |
|
26 | * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"), |
|
27 | * in which case the provisions of the GPL or the LGPL are applicable instead |
|
28 | * of those above. If you wish to allow use of your version of this file only |
|
29 | * under the terms of either the GPL or the LGPL, and not to allow others to |
|
30 | * use your version of this file under the terms of the MPL, indicate your |
|
31 | * decision by deleting the provisions above and replace them with the notice |
|
32 | * and other provisions required by the GPL or the LGPL. If you do not delete |
|
33 | * the provisions above, a recipient may use your version of this file under |
|
34 | * the terms of any one of the MPL, the GPL or the LGPL. |
|
35 | * |
|
36 | * ***** END LICENSE BLOCK ***** */ |
|
37 | ||
117 | 38 | const EXPORTED_SYMBOLS = ['Engines', 'Engine', 'SyncEngine']; |
39 | ||
36 | 40 | const Cc = Components.classes; |
36 | 41 | const Ci = Components.interfaces; |
36 | 42 | const Cr = Components.results; |
36 | 43 | const Cu = Components.utils; |
44 | ||
45 | 45 | Cu.import("resource://weave/ext/Observers.js"); |
45 | 46 | Cu.import("resource://weave/ext/Sync.js"); |
45 | 47 | Cu.import("resource://weave/log4moz.js"); |
45 | 48 | Cu.import("resource://weave/constants.js"); |
45 | 49 | Cu.import("resource://weave/util.js"); |
45 | 50 | Cu.import("resource://weave/resource.js"); |
45 | 51 | Cu.import("resource://weave/identity.js"); |
45 | 52 | Cu.import("resource://weave/stores.js"); |
45 | 53 | Cu.import("resource://weave/trackers.js"); |
54 | ||
45 | 55 | Cu.import("resource://weave/base_records/wbo.js"); |
45 | 56 | Cu.import("resource://weave/base_records/keys.js"); |
45 | 57 | Cu.import("resource://weave/base_records/crypto.js"); |
45 | 58 | Cu.import("resource://weave/base_records/collection.js"); |
59 | ||
// Singleton service that holds the registered engines; exposed as `Engines`
// through Utils.lazy (presumably instantiated on first access -- confirm in
// util.js)
Utils.lazy(this, 'Engines', EngineManagerSvc);

/**
 * Holder for engine instances, stored in `_engines` keyed by engine name.
 * The logger level comes from the "log.logger.service.engines" pref and
 * falls back to "Debug".
 */
function EngineManagerSvc() {
  this._engines = {};

  let logger = Log4Moz.repository.getLogger("Service.Engines");
  let levelName = Svc.Prefs.get("log.logger.service.engines", "Debug");
  logger.level = Log4Moz.Level[levelName];
  this._log = logger;
}
|
EngineManagerSvc.prototype = {
  /**
   * Look up registered engines by name.
   *
   * @param name
   *        A single engine name, or an array of engine names
   * @return For a single name, the registered engine or undefined (a debug
   *         message is logged when nothing is registered under that name).
   *         For an array, an array holding only the engines that were found.
   */
  get: function EngMgr_get(name) {
    // Return an array of engines if we have an array of names
    if (Utils.isArray(name)) {
      let found = [];
      name.forEach(function(each) {
        let engine = this.get(each);
        if (engine)
          found.push(engine);
      }, this);
      return found;
    }

    let engine = this._engines[name];
    if (!engine)
      this._log.debug("Could not get engine: " + name);
    return engine;
  },

  /**
   * @return Array of every registered engine instance
   */
  getAll: function EngMgr_getAll() {
    // Read from this instance (like get/register/unregister do) instead of
    // reaching for the global Engines singleton
    let all = [];
    for (let name in this._engines)
      all.push(this._engines[name]);
    return all;
  },

  /**
   * @return Array of the registered engines whose `enabled` is truthy
   */
  getEnabled: function EngMgr_getEnabled() {
    return this.getAll().filter(function(engine) {
      return engine.enabled;
    });
  },

  /**
   * Register an Engine to the service. Alternatively, give an array of engine
   * objects to register.
   *
   * @param engineObject
   *        Engine object used to get an instance of the engine
   * @return The engine object if anything failed; undefined otherwise. For an
   *         array argument, an array of those per-item results. Registering a
   *         name twice only logs an error and keeps the first instance.
   */
  register: function EngMgr_register(engineObject) {
    if (Utils.isArray(engineObject))
      return engineObject.map(this.register, this);

    try {
      let engine = new engineObject();
      let name = engine.name;
      if (name in this._engines)
        this._log.error("Engine '" + name + "' is already registered!");
      else
        this._engines[name] = engine;
    }
    catch(ex) {
      let mesg = ex.message ? ex.message : ex;

      // Best effort at a readable name: engineObject.prototype.name, or ""
      let name = engineObject || "";
      name = name.prototype || "";
      name = name.name || "";

      let out = "Could not initialize engine '" + name + "': " + mesg;
      dump(out);
      this._log.error(out);

      return engineObject;
    }
  },

  /**
   * Remove an engine from the registry.
   *
   * @param val
   *        Either an Engine instance or the name it was registered under
   */
  unregister: function EngMgr_unregister(val) {
    let name = val;
    if (val instanceof Engine)
      name = val.name;
    delete this._engines[name];
  }
};
|
135 | ||
/**
 * Base constructor for engines: sets the names, the "weave:engine:" notifier
 * and the logger, then touches the tracker.
 *
 * @param name
 *        Engine name as displayed/logged (`Name`); the lowercase form is
 *        stored as `name`. Defaults to "Unnamed" when omitted.
 */
function Engine(name) {
  this.Name = name || "Unnamed";
  // Lowercase the defaulted value: calling toLowerCase() on the raw argument
  // threw a TypeError whenever no name was passed
  this.name = this.Name.toLowerCase();

  this._notify = Utils.notify("weave:engine:");
  this._log = Log4Moz.repository.getLogger("Engine." + this.Name);
  let level = Svc.Prefs.get("log.logger.engine." + this.name, "Debug");
  this._log.level = Log4Moz.Level[level];

  this._tracker; // initialize tracker to load previously changed IDs
  this._log.debug("Engine initialized");
}
|
Engine.prototype = {
  // _storeObj and _trackerObj should be overridden in subclasses
  _storeObj: Store,
  _trackerObj: Tracker,

  // The enabled state lives in the "engine.<prefName>" preference
  get prefName() { return this.name; },
  get enabled() { return Svc.Prefs.get("engine." + this.prefName, null); },
  set enabled(val) { Svc.Prefs.set("engine." + this.prefName, !!val); },

  get score() { return this._tracker.score; },

  // Build the store on first access, then redefine the getter on this
  // instance so later reads return the same object
  get _store() {
    let store = new this._storeObj(this.Name);
    this.__defineGetter__("_store", function() { return store; });
    return store;
  },

  // Same first-access construction and caching as _store, for the tracker
  get _tracker() {
    let tracker = new this._trackerObj(this.Name);
    this.__defineGetter__("_tracker", function() { return tracker; });
    return tracker;
  },

  // Localized engine name when Str.engines has one, otherwise `Name`
  get displayName() {
    try {
      return Str.engines.get(this.name);
    } catch (e) {
      // Deliberate best effort: any lookup failure falls back to Name
    }

    return this.Name;
  },

  /**
   * Run this engine's `_sync` (through the "sync" notification wrapper) and
   * log how long each of the engine's functions took. Does nothing when the
   * engine is not enabled.
   *
   * @throws a string when the engine has no `_sync` method; anything `_sync`
   *         throws propagates after the timers are removed and stats logged
   */
  sync: function Engine_sync() {
    if (!this.enabled)
      return;

    if (!this._sync)
      throw "engine does not implement _sync method";

    let times = {};
    let wrapped = {};
    // Find functions in any point of the prototype chain
    for (let _name in this) {
      // Copy into a per-iteration variable for the closure below
      let name = _name;

      // Ignore certain constructors/functions
      if (name.search(/^_(.+Obj|notify)$/) == 0)
        continue;

      // Only track functions but skip the constructors
      if (typeof this[name] == "function") {
        times[name] = [];
        wrapped[name] = this[name];

        // Wrap the original function with a start/stop timer
        this[name] = function() {
          let start = Date.now();
          try {
            return wrapped[name].apply(this, arguments);
          }
          finally {
            times[name].push(Date.now() - start);
          }
        };
      }
    }

    try {
      this._notify("sync", this.name, this._sync)();
    }
    finally {
      // Restore original unwrapped functionality
      for (let name in wrapped)
        this[name] = wrapped[name];

      let stats = {};
      for (let name in times) {
        // Figure out stats on the times unless there's nothing
        let time = times[name];
        let num = time.length;
        if (num == 0)
          continue;

        // Track the min/max/sum of the values
        let stat = {
          num: num,
          sum: 0
        };
        time.forEach(function(val) {
          if (stat.min == null || val < stat.min)
            stat.min = val;
          if (stat.max == null || val > stat.max)
            stat.max = val;
          stat.sum += val;
        });

        stat.avg = Number((stat.sum / num).toFixed(2));
        stats[name] = stat;
      }

      // Summarize as "Total (ms): <name> <sum>, ..." when logged as a string
      stats.toString = function() {
        let sums = [];
        for (let name in this) {
          let stat = this[name];
          // Skips toString itself, which has no sum
          if (stat.sum != null)
            sums.push(name.replace(/^_/, "") + " " + stat.sum);
        }

        // Order certain functions first before any other random ones
        let nameOrder = ["sync", "processIncoming", "uploadOutgoing",
                         "syncStartup", "syncFinish"];
        let getPos = function(str) {
          let pos = nameOrder.indexOf(str.split(" ")[0]);
          return pos != -1 ? pos : Infinity;
        };
        // A sort comparator must return negative/zero/positive; the boolean
        // "a > b" form never says "a before b". Avoid subtraction because
        // Infinity - Infinity is NaN.
        let order = function(a, b) {
          let posA = getPos(a);
          let posB = getPos(b);
          if (posA == posB)
            return 0;
          return posA < posB ? -1 : 1;
        };

        return "Total (ms): " + sums.sort(order).join(", ");
      };

      this._log.debug(stats);
    }
  },

  /**
   * Get rid of any local meta-data
   *
   * @throws a string when the engine has no `_resetClient` method
   */
  resetClient: function Engine_resetClient() {
    if (!this._resetClient)
      throw "engine does not implement _resetClient method";

    this._notify("reset-client", this.name, this._resetClient)();
  },

  // Reset local meta-data, wipe the store with the tracker's ignoreAll flag
  // set, then clear the tracker's changed IDs
  _wipeClient: function Engine__wipeClient() {
    this.resetClient();
    this._log.debug("Deleting all local data");
    this._tracker.ignoreAll = true;
    this._store.wipe();
    this._tracker.ignoreAll = false;
    this._tracker.clearChangedIDs();
  },

  // Run _wipeClient through the "wipe-client" notification wrapper
  wipeClient: function Engine_wipeClient() {
    this._notify("wipe-client", this.name, this._wipeClient)();
  }
};
|
291 | ||
/**
 * Engine that syncs records with the server.
 *
 * @param name
 *        Engine name handed to the Engine constructor; "SyncEngine" when
 *        omitted
 */
function SyncEngine(name) {
  let engineName = name || "SyncEngine";
  Engine.call(this, engineName);

  // Pick up any saved backlog of GUIDs that still need fetching
  this.loadToFetch();
}
|
18 | 296 | SyncEngine.prototype = { |
27 | 297 | __proto__: Engine.prototype, |
18 | 298 | _recordObj: CryptoWrapper, |
18 | 299 | version: 1, |
300 | ||
29130 | 301 | get storageURL() Svc.Prefs.get("clusterURL") + Svc.Prefs.get("storageAPI") + |
24260 | 302 | "/" + ID.get("WeaveID").username + "/storage/", |
303 | ||
308 | 304 | get engineURL() this.storageURL + this.name, |
305 | ||
16545 | 306 | get cryptoMetaURL() this.storageURL + "crypto/" + this.name, |
307 | ||
48 | 308 | get metaURL() this.storageURL + "meta/global", |
309 | ||
28 | 310 | get syncID() { |
311 | // Generate a random syncID if we don't have one |
|
90 | 312 | let syncID = Svc.Prefs.get(this.name + ".syncID", ""); |
75 | 313 | return syncID == "" ? this.syncID = Utils.makeGUID() : syncID; |
314 | }, |
|
26 | 315 | set syncID(value) { |
80 | 316 | Svc.Prefs.set(this.name + ".syncID", value); |
317 | }, |
|
318 | ||
138 | 319 | get lastSync() { |
1320 | 320 | return parseFloat(Svc.Prefs.get(this.name + ".lastSync", "0")); |
321 | }, |
|
53 | 322 | set lastSync(value) { |
323 | // Reset the pref in-case it's a number instead of a string |
|
280 | 324 | Svc.Prefs.reset(this.name + ".lastSync"); |
325 | // Store the value as a string to keep floating point precision |
|
420 | 326 | Svc.Prefs.set(this.name + ".lastSync", value.toString()); |
327 | }, |
|
28 | 328 | resetLastSync: function SyncEngine_resetLastSync() { |
90 | 329 | this._log.debug("Resetting " + this.name + " last sync time"); |
80 | 330 | Svc.Prefs.reset(this.name + ".lastSync"); |
100 | 331 | Svc.Prefs.set(this.name + ".lastSync", "0"); |
332 | }, |
|
333 | ||
579 | 334 | get toFetch() this._toFetch, |
54 | 335 | set toFetch(val) { |
108 | 336 | this._toFetch = val; |
360 | 337 | Utils.jsonSave("toFetch/" + this.name, this, val); |
338 | }, |
|
339 | ||
55 | 340 | loadToFetch: function loadToFetch() { |
341 | // Initialize to empty if there's no file |
|
111 | 342 | this._toFetch = []; |
485 | 343 | Utils.jsonLoad("toFetch/" + this.name, this, Utils.bind2(this, function(o) |
53 | 344 | this._toFetch = o)); |
345 | }, |
|
346 | ||
347 | // Create a new record using the store and add in crypto fields |
|
2373 | 348 | _createRecord: function SyncEngine__createRecord(id) { |
14130 | 349 | let record = this._store.createRecord(id); |
7065 | 350 | record.id = id; |
7065 | 351 | record.encryption = this.cryptoMetaURL; |
4710 | 352 | return record; |
353 | }, |
|
354 | ||
355 | // Any setup that needs to happen at the beginning of each sync. |
|
356 | // Makes sure crypto records and keys are all set-up |
|
22 | 357 | _syncStartup: function SyncEngine__syncStartup() { |
24 | 358 | this._log.trace("Ensuring server crypto records are there"); |
359 | ||
360 | // Determine if we need to wipe on outdated versions |
|
8 | 361 | let wipeServerData = false; |
20 | 362 | let metaGlobal = Records.get(this.metaURL); |
24 | 363 | let engines = metaGlobal.payload.engines || {}; |
24 | 364 | let engineData = engines[this.name] || {}; |
365 | ||
366 | // Assume missing versions are 0 and wipe the server |
|
18 | 367 | if ((engineData.version || 0) < this.version) { |
20 | 368 | this._log.debug("Old engine data: " + [engineData.version, this.version]); |
369 | ||
370 | // Prepare to clear the server and upload everything |
|
4 | 371 | wipeServerData = true; |
6 | 372 | this.syncID = ""; |
373 | ||
374 | // Set the newer version and newly generated syncID |
|
6 | 375 | engineData.version = this.version; |
6 | 376 | engineData.syncID = this.syncID; |
377 | ||
378 | // Put the new data back into meta/global and mark for upload |
|
8 | 379 | engines[this.name] = engineData; |
6 | 380 | metaGlobal.payload.engines = engines; |
8 | 381 | metaGlobal.changed = true; |
382 | } |
|
383 | // Don't sync this engine if the server has newer data |
|
7 | 384 | else if (engineData.version > this.version) { |
8 | 385 | let error = new String("New data: " + [engineData.version, this.version]); |
3 | 386 | error.failureCode = VERSION_OUT_OF_DATE; |
2 | 387 | throw error; |
388 | } |
|
389 | // Changes to syncID mean we'll need to upload everything |
|
3 | 390 | else if (engineData.syncID != this.syncID) { |
10 | 391 | this._log.debug("Engine syncIDs: " + [engineData.syncID, this.syncID]); |
3 | 392 | this.syncID = engineData.syncID; |
4 | 393 | this._resetClient(); |
394 | }; |
|
395 | ||
396 | // Try getting/unwrapping the crypto record |
|
15 | 397 | let meta = CryptoMetas.get(this.cryptoMetaURL); |
6 | 398 | if (meta) { |
2 | 399 | try { |
4 | 400 | let pubkey = PubKeys.getDefaultKey(); |
5 | 401 | let privkey = PrivKeys.get(pubkey.privateKeyUri); |
8 | 402 | meta.getKey(privkey, ID.get("WeaveCryptoID")); |
403 | } |
|
3 | 404 | catch(ex) { |
405 | // Remove traces of this bad cryptometa and tainted data |
|
8 | 406 | this._log.debug("Purging bad data after failed unwrap crypto: " + ex); |
5 | 407 | CryptoMetas.del(this.cryptoMetaURL); |
2 | 408 | meta = null; |
4 | 409 | wipeServerData = true; |
410 | } |
|
411 | } |
|
412 | ||
413 | // Delete all server data and reupload on bad version or meta |
|
6 | 414 | if (wipeServerData) { |
12 | 415 | new Resource(this.engineURL).delete(); |
8 | 416 | this._resetClient(); |
417 | } |
|
418 | ||
419 | // Generate a new crypto record |
|
12 | 420 | if (!meta) { |
15 | 421 | let symkey = Svc.Crypto.generateRandomKey(); |
12 | 422 | let pubkey = PubKeys.getDefaultKey(); |
12 | 423 | meta = new CryptoMeta(this.cryptoMetaURL); |
18 | 424 | meta.addUnwrappedKey(pubkey, symkey); |
12 | 425 | let res = new Resource(meta.uri); |
15 | 426 | let resp = res.put(meta); |
9 | 427 | if (!resp.success) { |
428 | this._log.debug("Metarecord upload fail:" + resp); |
|
429 | resp.failureCode = ENGINE_METARECORD_UPLOAD_FAIL; |
|
430 | throw resp; |
|
431 | } |
|
432 | ||
433 | // Cache the cryto meta that we just put on the server |
|
21 | 434 | CryptoMetas.set(meta.uri, meta); |
435 | } |
|
436 | ||
437 | // Mark all items to be uploaded, but treat them as changed from long ago |
|
9 | 438 | if (!this.lastSync) { |
18 | 439 | this._log.debug("First sync, uploading all items"); |
23 | 440 | for (let id in this._store.getAllIDs()) |
21 | 441 | this._tracker.addChangedID(id, 0); |
442 | } |
|
443 | ||
48 | 444 | let outnum = [i for (i in this._tracker.changedIDs)].length; |
24 | 445 | this._log.info(outnum + " outgoing items pre-reconciliation"); |
446 | ||
447 | // Keep track of what to delete at the end of sync |
|
15 | 448 | this._delete = {}; |
449 | }, |
|
450 | ||
451 | // Generate outgoing records |
|
46 | 452 | _processIncoming: function SyncEngine__processIncoming() { |
168 | 453 | this._log.trace("Downloading & applying server changes"); |
454 | ||
455 | // Figure out how many total items to fetch this sync; do less on mobile. |
|
456 | // 50 is hardcoded here because of URL length restrictions. |
|
457 | // (GUIDs are 10 chars) |
|
56 | 458 | let fetchNum = Infinity; |
196 | 459 | if (Svc.Prefs.get("client.type") == "mobile") |
50 | 460 | fetchNum = 50; |
461 | ||
140 | 462 | let newitems = new Collection(this.engineURL, this._recordObj); |
84 | 463 | newitems.newer = this.lastSync; |
84 | 464 | newitems.full = true; |
84 | 465 | newitems.sort = "index"; |
84 | 466 | newitems.limit = fetchNum; |
467 | ||
196 | 468 | let count = {applied: 0, reconciled: 0}; |
56 | 469 | let handled = []; |
1447 | 470 | newitems.recordHandler = Utils.bind2(this, function(item) { |
471 | // Grab a later last modified if possible |
|
8751 | 472 | if (this.lastModified == null || item.modified > this.lastModified) |
27 | 473 | this.lastModified = item.modified; |
474 | ||
475 | // Remember which records were processed |
|
6255 | 476 | handled.push(item.id); |
477 | ||
1251 | 478 | try { |
10008 | 479 | item.decrypt(ID.get("WeaveCryptoID")); |
6255 | 480 | if (this._reconcile(item)) { |
3741 | 481 | count.applied++; |
3741 | 482 | this._tracker.ignoreAll = true; |
8729 | 483 | this._store.applyIncoming(item); |
484 | } else { |
|
12 | 485 | count.reconciled++; |
1283 | 486 | this._log.trace("Skipping reconciled incoming item " + item.id); |
487 | } |
|
488 | } |
|
489 | catch(ex) { |
|
490 | this._log.warn("Error processing record: " + Utils.exceptionStr(ex)); |
|
491 | } |
|
3753 | 492 | this._tracker.ignoreAll = false; |
7506 | 493 | Sync.sleep(0); |
494 | }); |
|
495 | ||
496 | // Only bother getting data from the server if there's new things |
|
194 | 497 | if (this.lastModified == null || this.lastModified > this.lastSync) { |
24 | 498 | let resp = newitems.get(); |
18 | 499 | if (!resp.success) { |
500 | resp.failureCode = ENGINE_DOWNLOAD_FAIL; |
|
501 | throw resp; |
|
502 | } |
|
503 | ||
504 | // Subtract out the number of items we just got |
|
36 | 505 | fetchNum -= handled.length; |
506 | } |
|
507 | ||
508 | // Check if we got the maximum that we requested; get the rest if so |
|
113 | 509 | if (handled.length == newitems.limit) { |
4 | 510 | let guidColl = new Collection(this.engineURL); |
3 | 511 | guidColl.newer = this.lastSync; |
3 | 512 | guidColl.sort = "index"; |
513 | ||
4 | 514 | let guids = guidColl.get(); |
3 | 515 | if (!guids.success) |
516 | throw guids; |
|
517 | ||
518 | // Figure out which guids weren't just fetched then remove any guids that |
|
519 | // were already waiting and prepend the new ones |
|
6 | 520 | let extra = Utils.arraySub(guids.obj, handled); |
4 | 521 | if (extra.length > 0) |
11 | 522 | this.toFetch = extra.concat(Utils.arraySub(this.toFetch, extra)); |
523 | } |
|
524 | ||
525 | // Process any backlog of GUIDs if we haven't fetched too many this sync |
|
536 | 526 | while (this.toFetch.length > 0 && fetchNum > 0) { |
527 | // Reuse the original query, but get rid of the restricting params |
|
72 | 528 | newitems.limit = 0; |
72 | 529 | newitems.newer = 0; |
530 | ||
531 | // Get the first bunch of records and save the rest for later |
|
216 | 532 | let minFetch = Math.min(150, this.toFetch.length, fetchNum); |
192 | 533 | newitems.ids = this.toFetch.slice(0, minFetch); |
168 | 534 | this.toFetch = this.toFetch.slice(minFetch); |
96 | 535 | fetchNum -= minFetch; |
536 | ||
537 | // Reuse the existing record handler set earlier |
|
96 | 538 | let resp = newitems.get(); |
72 | 539 | if (!resp.success) { |
540 | resp.failureCode = ENGINE_DOWNLOAD_FAIL; |
|
24 | 541 | throw resp; |
542 | } |
|
543 | } |
|
544 | ||
84 | 545 | if (this.lastSync < this.lastModified) |
15 | 546 | this.lastSync = this.lastModified; |
547 | ||
196 | 548 | this._log.info(["Records:", count.applied, "applied,", count.reconciled, |
336 | 549 | "reconciled,", this.toFetch.length, "left to fetch"].join(" ")); |
550 | }, |
|
551 | ||
552 | /** |
|
553 | * Find a GUID of an item that is a duplicate of the incoming item but happens |
|
554 | * to have a different GUID |
|
555 | * |
|
556 | * @return GUID of the similar item; falsy otherwise |
|
557 | */ |
|
18 | 558 | _findDupe: function _findDupe(item) { |
559 | // By default, assume there's no dupe items for the engine |
|
560 | }, |
|
561 | ||
25 | 562 | _isEqual: function SyncEngine__isEqual(item) { |
35 | 563 | let local = this._createRecord(item.id); |
49 | 564 | if (this._log.level <= Log4Moz.Level.Trace) |
565 | this._log.trace("Local record: " + local); |
|
42 | 566 | if (Utils.deepEquals(item.cleartext, local.cleartext)) { |
24 | 567 | this._log.trace("Local record is the same"); |
8 | 568 | return true; |
569 | } else { |
|
18 | 570 | this._log.trace("Local record is different"); |
6 | 571 | return false; |
572 | } |
|
573 | }, |
|
574 | ||
20 | 575 | _deleteId: function _deleteId(id) { |
12 | 576 | this._tracker.removeChangedID(id); |
577 | ||
578 | // Remember this id to delete at the end of sync |
|
10 | 579 | if (this._delete.ids == null) |
5 | 580 | this._delete.ids = [id]; |
581 | else |
|
9 | 582 | this._delete.ids.push(id); |
583 | }, |
|
584 | ||
20 | 585 | _handleDupe: function _handleDupe(item, dupeId) { |
586 | // Prefer shorter guids; for ties, just do an ASCII compare |
|
14 | 587 | let preferLocal = dupeId.length < item.id.length || |
9 | 588 | (dupeId.length == item.id.length && dupeId < item.id); |
589 | ||
4 | 590 | if (preferLocal) { |
10 | 591 | this._log.trace("Preferring local id: " + [dupeId, item.id]); |
5 | 592 | this._deleteId(item.id); |
3 | 593 | item.id = dupeId; |
8 | 594 | this._tracker.addChangedID(dupeId, 0); |
595 | } |
|
596 | else { |
|
10 | 597 | this._log.trace("Switching local id to incoming: " + [item.id, dupeId]); |
7 | 598 | this._store.changeItemID(dupeId, item.id); |
5 | 599 | this._deleteId(dupeId); |
2 | 600 | } |
601 | }, |
|
602 | ||
603 | // Reconcile incoming and existing records. Return true if server |
|
604 | // data should be applied. |
|
1269 | 605 | _reconcile: function SyncEngine__reconcile(item) { |
8757 | 606 | if (this._log.level <= Log4Moz.Level.Trace) |
607 | this._log.trace("Incoming: " + item); |
|
608 | ||
7506 | 609 | this._log.trace("Reconcile step 1: Check for conflicts"); |
6257 | 610 | if (item.id in this._tracker.changedIDs) { |
611 | // If the incoming and local changes are the same, skip |
|
10 | 612 | if (this._isEqual(item)) { |
6 | 613 | this._tracker.removeChangedID(item.id); |
4 | 614 | return false; |
615 | } |
|
616 | ||
617 | // Records differ so figure out which to take |
|
5 | 618 | let recordAge = Resource.serverTime - item.modified; |
12 | 619 | let localAge = Date.now() / 1000 - this._tracker.changedIDs[item.id]; |
10 | 620 | this._log.trace("Record age vs local age: " + [recordAge, localAge]); |
621 | ||
622 | // Apply the record if the record is newer (server wins) |
|
6 | 623 | return recordAge < localAge; |
624 | } |
|
625 | ||
7494 | 626 | this._log.trace("Reconcile step 2: Check for updates"); |
7494 | 627 | if (this._store.itemExists(item.id)) |
30 | 628 | return !this._isEqual(item); |
629 | ||
7464 | 630 | this._log.trace("Reconcile step 2.5: Don't dupe deletes"); |
2488 | 631 | if (item.deleted) |
632 | return true; |
|
633 | ||
7464 | 634 | this._log.trace("Reconcile step 3: Find dupes"); |
6220 | 635 | let dupeId = this._findDupe(item); |
2488 | 636 | if (dupeId) |
12 | 637 | this._handleDupe(item, dupeId); |
638 | ||
639 | // Apply the incoming item (now that the dupe is the right id) |
|
2488 | 640 | return true; |
641 | }, |
|
642 | ||
643 | // Upload outgoing records |
|
20 | 644 | _uploadOutgoing: function SyncEngine__uploadOutgoing() { |
14104 | 645 | let outnum = [i for (i in this._tracker.changedIDs)].length; |
6 | 646 | if (outnum) { |
18 | 647 | this._log.trace("Preparing " + outnum + " outgoing records"); |
648 | ||
649 | // collection we'll upload |
|
8 | 650 | let up = new Collection(this.engineURL); |
4 | 651 | let count = 0; |
652 | ||
653 | // Upload what we've got so far in the collection |
|
37 | 654 | let doUpload = Utils.bind2(this, function(desc) { |
275 | 655 | this._log.info("Uploading " + desc + " of " + outnum + " records"); |
100 | 656 | let resp = up.post(); |
75 | 657 | if (!resp.success) { |
658 | this._log.debug("Uploading records failed: " + resp); |
|
659 | resp.failureCode = ENGINE_UPLOAD_FAIL; |
|
660 | throw resp; |
|
661 | } |
|
662 | ||
663 | // Record the modified time of the upload |
|
100 | 664 | let modified = resp.headers["x-weave-timestamp"]; |
75 | 665 | if (modified > this.lastSync) |
75 | 666 | this.lastSync = modified; |
667 | ||
125 | 668 | up.clearRecords(); |
669 | }); |
|
670 | ||
4704 | 671 | for (let id in this._tracker.changedIDs) { |
4692 | 672 | try { |
11730 | 673 | let out = this._createRecord(id); |
16422 | 674 | if (this._log.level <= Log4Moz.Level.Trace) |
675 | this._log.trace("Outgoing: " + out); |
|
676 | ||
18768 | 677 | out.encrypt(ID.get("WeaveCryptoID")); |
16422 | 678 | up.pushData(out); |
679 | } |
|
680 | catch(ex) { |
|
681 | this._log.warn("Error creating record: " + Utils.exceptionStr(ex)); |
|
682 | } |
|
683 | ||
684 | // Partial upload |
|
11730 | 685 | if ((++count % MAX_UPLOAD_RECORDS) == 0) |
253 | 686 | doUpload((count - MAX_UPLOAD_RECORDS) + " - " + count + " out"); |
687 | ||
16430 | 688 | Sync.sleep(0); |
689 | } |
|
690 | ||
691 | // Final upload |
|
10 | 692 | if (count % MAX_UPLOAD_RECORDS > 0) |
17 | 693 | doUpload(count >= MAX_UPLOAD_RECORDS ? "last batch" : "all"); |
694 | } |
|
12 | 695 | this._tracker.clearChangedIDs(); |
696 | }, |
|
697 | ||
698 | // Any cleanup necessary. |
|
699 | // Save the current snapshot so as to calculate changes at next sync |
|
21 | 700 | _syncFinish: function SyncEngine__syncFinish() { |
18 | 701 | this._log.trace("Finishing up sync"); |
15 | 702 | this._tracker.resetScore(); |
703 | ||
42 | 704 | let doDelete = Utils.bind2(this, function(key, val) { |
120 | 705 | let coll = new Collection(this.engineURL, this._recordObj); |
96 | 706 | coll[key] = val; |
120 | 707 | coll.delete(); |
708 | }); |
|
709 | ||
51 | 710 | for (let [key, val] in Iterator(this._delete)) { |
711 | // Remove the key for future uses |
|
12 | 712 | delete this._delete[key]; |
713 | ||
714 | // Send a simple delete for the property |
|
21 | 715 | if (key != "ids" || val.length <= 100) |
12 | 716 | doDelete(key, val); |
717 | else { |
|
718 | // For many ids, split into chunks of at most 100 |
|
133 | 719 | while (val.length > 0) { |
198 | 720 | doDelete(key, val.slice(0, 100)); |
110 | 721 | val = val.slice(100); |
722 | } |
|
723 | } |
|
3 | 724 | } |
725 | }, |
|
726 | ||
19 | 727 | _sync: function SyncEngine__sync() { |
1 | 728 | try { |
3 | 729 | this._syncStartup(); |
730 | Observers.notify("weave:engine:sync:status", "process-incoming"); |
|
731 | this._processIncoming(); |
|
732 | Observers.notify("weave:engine:sync:status", "upload-outgoing"); |
|
733 | this._uploadOutgoing(); |
|
734 | this._syncFinish(); |
|
735 | } |
|
3 | 736 | catch (e) { |
6 | 737 | this._log.warn("Sync failed"); |
2 | 738 | throw e; |
739 | } |
|
740 | }, |
|
741 | ||
18 | 742 | _testDecrypt: function _testDecrypt() { |
743 | // Report failure even if there's nothing to decrypt |
|
744 | let canDecrypt = false; |
|
745 | ||
746 | // Fetch the most recently uploaded record and try to decrypt it |
|
747 | let test = new Collection(this.engineURL, this._recordObj); |
|
748 | test.limit = 1; |
|
749 | test.sort = "newest"; |
|
750 | test.full = true; |
|
751 | test.recordHandler = function(record) { |
|
752 | record.decrypt(ID.get("WeaveCryptoID")); |
|
753 | canDecrypt = true; |
|
754 | }; |
|
755 | ||
756 | // Any failure fetching/decrypting will just result in false |
|
757 | try { |
|
758 | this._log.trace("Trying to decrypt a record from the server.."); |
|
759 | test.get(); |
|
760 | } |
|
761 | catch(ex) { |
|
762 | this._log.debug("Failed test decrypt: " + Utils.exceptionStr(ex)); |
|
763 | } |
|
764 | ||
765 | return canDecrypt; |
|
766 | }, |
|
767 | ||
54 | 768 | _resetClient: function SyncEngine__resetClient() { |
36 | 769 | this.resetLastSync(); |
36 | 770 | this.toFetch = []; |
771 | } |
|
9 | 772 | }; |