engines.js

772 lines, 451 LOC, 386 covered (85%)

9 1
/* ***** BEGIN LICENSE BLOCK *****
2
 * Version: MPL 1.1/GPL 2.0/LGPL 2.1
3
 *
4
 * The contents of this file are subject to the Mozilla Public License Version
5
 * 1.1 (the "License"); you may not use this file except in compliance with
6
 * the License. You may obtain a copy of the License at
7
 * http://www.mozilla.org/MPL/
8
 *
9
 * Software distributed under the License is distributed on an "AS IS" basis,
10
 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
11
 * for the specific language governing rights and limitations under the
12
 * License.
13
 *
14
 * The Original Code is Bookmarks Sync.
15
 *
16
 * The Initial Developer of the Original Code is Mozilla.
17
 * Portions created by the Initial Developer are Copyright (C) 2007
18
 * the Initial Developer. All Rights Reserved.
19
 *
20
 * Contributor(s):
21
 *  Dan Mills <thunder@mozilla.com>
22
 *  Myk Melez <myk@mozilla.org>
23
 *
24
 * Alternatively, the contents of this file may be used under the terms of
25
 * either the GNU General Public License Version 2 or later (the "GPL"), or
26
 * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
27
 * in which case the provisions of the GPL or the LGPL are applicable instead
28
 * of those above. If you wish to allow use of your version of this file only
29
 * under the terms of either the GPL or the LGPL, and not to allow others to
30
 * use your version of this file under the terms of the MPL, indicate your
31
 * decision by deleting the provisions above and replace them with the notice
32
 * and other provisions required by the GPL or the LGPL. If you do not delete
33
 * the provisions above, a recipient may use your version of this file under
34
 * the terms of any one of the MPL, the GPL or the LGPL.
35
 *
36
 * ***** END LICENSE BLOCK ***** */
37
117 38
const EXPORTED_SYMBOLS = ['Engines', 'Engine', 'SyncEngine'];
39
36 40
const Cc = Components.classes;
36 41
const Ci = Components.interfaces;
36 42
const Cr = Components.results;
36 43
const Cu = Components.utils;
44
45 45
Cu.import("resource://weave/ext/Observers.js");
45 46
Cu.import("resource://weave/ext/Sync.js");
45 47
Cu.import("resource://weave/log4moz.js");
45 48
Cu.import("resource://weave/constants.js");
45 49
Cu.import("resource://weave/util.js");
45 50
Cu.import("resource://weave/resource.js");
45 51
Cu.import("resource://weave/identity.js");
45 52
Cu.import("resource://weave/stores.js");
45 53
Cu.import("resource://weave/trackers.js");
54
45 55
Cu.import("resource://weave/base_records/wbo.js");
45 56
Cu.import("resource://weave/base_records/keys.js");
45 57
Cu.import("resource://weave/base_records/crypto.js");
45 58
Cu.import("resource://weave/base_records/collection.js");
59
60
// Singleton service, holds registered engines
61
63 62
// NOTE(review): presumably installs a lazily-constructed 'Engines' property on
// this module scope, built from EngineManagerSvc on first access -- confirm
// against Utils.lazy in util.js
Utils.lazy(this, 'Engines', EngineManagerSvc);
63
27 64
/**
 * Manager holding the table of registered engines, keyed by engine name.
 * Its logger is "Service.Engines"; the log level name is read from the
 * "log.logger.service.engines" pref and defaults to "Debug".
 */
function EngineManagerSvc() {
  this._engines = {};

  let logger = Log4Moz.repository.getLogger("Service.Engines");
  this._log = logger;

  let levelName = Svc.Prefs.get("log.logger.service.engines", "Debug");
  logger.level = Log4Moz.Level[levelName];
}
18 70
EngineManagerSvc.prototype = {
  /**
   * Look up registered engines by name.
   *
   * @param name
   *        Engine name, or an array of engine names
   * @return The engine registered under that name (undefined when there is
   *         none, which is also logged); for an array of names, an array
   *         holding only the engines that were found
   */
  get: function EngMgr_get(name) {
    // Return an array of engines if we have an array of names
    if (Utils.isArray(name)) {
      let found = [];
      name.forEach(function(oneName) {
        let engine = this.get(oneName);
        if (engine)
          found.push(engine);
      }, this);
      return found;
    }

    let engine = this._engines[name];
    if (!engine)
      this._log.debug("Could not get engine: " + name);
    return engine;
  },

  /**
   * @return Array of every engine registered on this manager
   */
  getAll: function EngMgr_getAll() {
    // Read this manager's own table rather than the global singleton's, so
    // the method also works on instances other than 'Engines'
    let all = [];
    for (let name in this._engines)
      all.push(this._engines[name]);
    return all;
  },

  /**
   * @return Array of the registered engines whose 'enabled' value is truthy
   */
  getEnabled: function EngMgr_getEnabled() {
    return this.getAll().filter(function(engine) {
      return engine.enabled;
    });
  },

  /**
   * Register an Engine to the service. Alternatively, give an array of engine
   * objects to register.
   *
   * @param engineObject
   *        Engine constructor used to get an instance of the engine, or an
   *        array of such constructors
   * @return The engine object if constructing it failed; undefined otherwise
   *         (a name that is already registered is only logged). For an array,
   *         an array of these per-item results.
   */
  register: function EngMgr_register(engineObject) {
    if (Utils.isArray(engineObject))
      return engineObject.map(this.register, this);

    try {
      let engine = new engineObject();
      let name = engine.name;
      if (name in this._engines)
        this._log.error("Engine '" + name + "' is already registered!");
      else
        this._engines[name] = engine;
    }
    catch(ex) {
      let mesg = ex.message ? ex.message : ex;

      // Best-effort name for the message: engineObject.prototype.name, or ""
      let name = engineObject || "";
      name = name.prototype || "";
      name = name.name || "";

      let out = "Could not initialize engine '" + name + "': " + mesg;
      dump(out);
      this._log.error(out);

      return engineObject;
    }
  },

  /**
   * Remove an engine from the manager.
   *
   * @param val
   *        Engine instance, or the name the engine was registered under
   */
  unregister: function EngMgr_unregister(val) {
    let name = val;
    if (val instanceof Engine)
      name = val.name;
    delete this._engines[name];
  }
};
135
60 136
/**
 * Base engine.
 *
 * @param name
 *        Display-cased engine name; when omitted the engine is "Unnamed".
 *        The lower-cased form is stored as 'name' and used for the logger
 *        level pref key.
 */
function Engine(name) {
  this.Name = name || "Unnamed";
  // Derive from this.Name so a missing name doesn't throw on toLowerCase()
  this.name = this.Name.toLowerCase();

  this._notify = Utils.notify("weave:engine:");
  this._log = Log4Moz.repository.getLogger("Engine." + this.Name);
  let level = Svc.Prefs.get("log.logger.engine." + this.name, "Debug");
  this._log.level = Log4Moz.Level[level];

  this._tracker; // initialize tracker to load previously changed IDs
  this._log.debug("Engine initialized");
}
18 148
Engine.prototype = {
  // Constructors used to build this engine's store and tracker; subclasses
  // should override both
  _storeObj: Store,
  _trackerObj: Tracker,

  // Name used to build the "engine.<prefName>" preference key
  get prefName() {
    return this.name;
  },

  // Whether the engine is enabled, as stored in the "engine.<prefName>" pref
  // (null when the pref has no value)
  get enabled() {
    return Svc.Prefs.get("engine." + this.prefName, null);
  },
  set enabled(val) {
    Svc.Prefs.set("engine." + this.prefName, !!val);
  },

  get score() {
    return this._tracker.score;
  },

  // The store is created on first access and then pinned on the instance by
  // replacing this getter with one that returns the same object
  get _store() {
    let store = new this._storeObj(this.Name);
    this.__defineGetter__("_store", function() { return store; });
    return store;
  },

  // Same create-once scheme as _store, for the tracker
  get _tracker() {
    let tracker = new this._trackerObj(this.Name);
    this.__defineGetter__("_tracker", function() { return tracker; });
    return tracker;
  },

  // Localized name if the string bundle has one, the raw Name otherwise
  get displayName() {
    try {
      return Str.engines.get(this.name);
    } catch (e) {}

    return this.Name;
  },

  /**
   * Run this engine's _sync (through the "sync" notification wrapper) if the
   * engine is enabled. While it runs, every function reachable on the engine
   * is temporarily wrapped to record how long each call takes; the totals are
   * logged at debug level afterwards, whether or not the sync threw.
   *
   * Throws a string if the engine has no _sync method; anything thrown by the
   * sync itself propagates.
   */
  sync: function Engine_sync() {
    if (!this.enabled)
      return;

    if (!this._sync)
      throw "engine does not implement _sync method";

    let times = {};
    let wrapped = {};
    // Find functions in any point of the prototype chain
    for (let _name in this) {
      // Copy the loop variable so each wrapper closes over its own name
      let name = _name;

      // Ignore certain constructors/functions
      if (name.search(/^_(.+Obj|notify)$/) == 0)
        continue;

      // Only track functions but skip the constructors
      if (typeof this[name] == "function") {
        times[name] = [];
        wrapped[name] = this[name];

        // Wrap the original function with a start/stop timer
        this[name] = function() {
          let start = Date.now();
          try {
            return wrapped[name].apply(this, arguments);
          }
          finally {
            times[name].push(Date.now() - start);
          }
        };
      }
    }

    try {
      this._notify("sync", this.name, this._sync)();
    }
    finally {
      // Restore original unwrapped functionality
      for (let name in wrapped)
        this[name] = wrapped[name];

      let stats = {};
      for (let name in times) {
        // Figure out stats on the times unless there's nothing
        let time = times[name];
        let num = time.length;
        if (num == 0)
          continue;

        // Track the min/max/sum of the values
        let stat = {
          num: num,
          sum: 0
        };
        time.forEach(function(val) {
          if (stat.min == null || val < stat.min)
            stat.min = val;
          if (stat.max == null || val > stat.max)
            stat.max = val;
          stat.sum += val;
        });

        stat.avg = Number((stat.sum / num).toFixed(2));
        stats[name] = stat;
      }

      stats.toString = function() {
        let sums = [];
        for (let name in this) {
          let stat = this[name];
          // Skips non-stat entries such as this toString function itself
          if (stat.sum != null)
            sums.push(name.replace(/^_/, "") + " " + stat.sum);
        }

        // Order certain functions first before any other random ones
        let nameOrder = ["sync", "processIncoming", "uploadOutgoing",
          "syncStartup", "syncFinish"];
        let getPos = function(str) {
          let pos = nameOrder.indexOf(str.split(" ")[0]);
          return pos != -1 ? pos : Infinity;
        };
        // A sort comparator must return a negative/zero/positive number; a
        // boolean can never say "a comes first". Compare explicitly rather
        // than subtracting, since Infinity - Infinity is NaN.
        let order = function(a, b) {
          let posA = getPos(a);
          let posB = getPos(b);
          if (posA == posB)
            return 0;
          return posA < posB ? -1 : 1;
        };

        return "Total (ms): " + sums.sort(order).join(", ");
      };

      this._log.debug(stats);
    }
  },

  /**
   * Get rid of any local meta-data
   *
   * Throws a string if the engine has no _resetClient method.
   */
  resetClient: function Engine_resetClient() {
    if (!this._resetClient)
      throw "engine does not implement _resetClient method";

    this._notify("reset-client", this.name, this._resetClient)();
  },

  // Reset local meta-data, then wipe the store and forget tracked changes
  _wipeClient: function Engine__wipeClient() {
    this.resetClient();
    this._log.debug("Deleting all local data");
    // Keep the tracker from recording the wipe itself as changes; restore it
    // even if the wipe throws so later changes aren't silently ignored
    this._tracker.ignoreAll = true;
    try {
      this._store.wipe();
    }
    finally {
      this._tracker.ignoreAll = false;
    }
    this._tracker.clearChangedIDs();
  },

  // Run _wipeClient through the "wipe-client" notification wrapper
  wipeClient: function Engine_wipeClient() {
    this._notify("wipe-client", this.name, this._wipeClient)();
  }
};
291
54 292
/**
 * Engine that syncs records with the server. Runs the base Engine
 * constructor (defaulting the name to "SyncEngine") and then loads the
 * saved list of GUIDs still waiting to be fetched.
 */
function SyncEngine(name) {
  let engineName = name ? name : "SyncEngine";
  Engine.call(this, engineName);
  this.loadToFetch();
}
18 296
SyncEngine.prototype = {
27 297
  __proto__: Engine.prototype,
18 298
  _recordObj: CryptoWrapper,
18 299
  version: 1,
300
29130 301
  get storageURL() Svc.Prefs.get("clusterURL") + Svc.Prefs.get("storageAPI") +
24260 302
    "/" + ID.get("WeaveID").username + "/storage/",
303
308 304
  get engineURL() this.storageURL + this.name,
305
16545 306
  get cryptoMetaURL() this.storageURL + "crypto/" + this.name,
307
48 308
  get metaURL() this.storageURL + "meta/global",
309
28 310
  get syncID() {
311
    // Generate a random syncID if we don't have one
90 312
    let syncID = Svc.Prefs.get(this.name + ".syncID", "");
75 313
    return syncID == "" ? this.syncID = Utils.makeGUID() : syncID;
314
  },
26 315
  set syncID(value) {
80 316
    Svc.Prefs.set(this.name + ".syncID", value);
317
  },
318
138 319
  get lastSync() {
1320 320
    return parseFloat(Svc.Prefs.get(this.name + ".lastSync", "0"));
321
  },
53 322
  set lastSync(value) {
323
    // Reset the pref in-case it's a number instead of a string
280 324
    Svc.Prefs.reset(this.name + ".lastSync");
325
    // Store the value as a string to keep floating point precision
420 326
    Svc.Prefs.set(this.name + ".lastSync", value.toString());
327
  },
28 328
  resetLastSync: function SyncEngine_resetLastSync() {
90 329
    this._log.debug("Resetting " + this.name + " last sync time");
80 330
    Svc.Prefs.reset(this.name + ".lastSync");
100 331
    Svc.Prefs.set(this.name + ".lastSync", "0");
332
  },
333
579 334
  get toFetch() this._toFetch,
54 335
  set toFetch(val) {
108 336
    this._toFetch = val;
360 337
    Utils.jsonSave("toFetch/" + this.name, this, val);
338
  },
339
55 340
  loadToFetch: function loadToFetch() {
341
    // Initialize to empty if there's no file
111 342
    this._toFetch = [];
485 343
    Utils.jsonLoad("toFetch/" + this.name, this, Utils.bind2(this, function(o)
53 344
      this._toFetch = o));
345
  },
346
347
  // Create a new record using the store and add in crypto fields
2373 348
  _createRecord: function SyncEngine__createRecord(id) {
14130 349
    let record = this._store.createRecord(id);
7065 350
    record.id = id;
7065 351
    record.encryption = this.cryptoMetaURL;
4710 352
    return record;
353
  },
354
355
  // Any setup that needs to happen at the beginning of each sync.
356
  // Makes sure crypto records and keys are all set-up
22 357
  _syncStartup: function SyncEngine__syncStartup() {
24 358
    this._log.trace("Ensuring server crypto records are there");
359
360
    // Determine if we need to wipe on outdated versions
8 361
    let wipeServerData = false;
20 362
    let metaGlobal = Records.get(this.metaURL);
24 363
    let engines = metaGlobal.payload.engines || {};
24 364
    let engineData = engines[this.name] || {};
365
366
    // Assume missing versions are 0 and wipe the server
18 367
    if ((engineData.version || 0) < this.version) {
20 368
      this._log.debug("Old engine data: " + [engineData.version, this.version]);
369
370
      // Prepare to clear the server and upload everything
4 371
      wipeServerData = true;
6 372
      this.syncID = "";
373
374
      // Set the newer version and newly generated syncID
6 375
      engineData.version = this.version;
6 376
      engineData.syncID = this.syncID;
377
378
      // Put the new data back into meta/global and mark for upload
8 379
      engines[this.name] = engineData;
6 380
      metaGlobal.payload.engines = engines;
8 381
      metaGlobal.changed = true;
382
    }
383
    // Don't sync this engine if the server has newer data
7 384
    else if (engineData.version > this.version) {
8 385
      let error = new String("New data: " + [engineData.version, this.version]);
3 386
      error.failureCode = VERSION_OUT_OF_DATE;
2 387
      throw error;
388
    }
389
    // Changes to syncID mean we'll need to upload everything
3 390
    else if (engineData.syncID != this.syncID) {
10 391
      this._log.debug("Engine syncIDs: " + [engineData.syncID, this.syncID]);
3 392
      this.syncID = engineData.syncID;
4 393
      this._resetClient();
394
    };
395
396
    // Try getting/unwrapping the crypto record
15 397
    let meta = CryptoMetas.get(this.cryptoMetaURL);
6 398
    if (meta) {
2 399
      try {
4 400
        let pubkey = PubKeys.getDefaultKey();
5 401
        let privkey = PrivKeys.get(pubkey.privateKeyUri);
8 402
        meta.getKey(privkey, ID.get("WeaveCryptoID"));
403
      }
3 404
      catch(ex) {
405
        // Remove traces of this bad cryptometa and tainted data
8 406
        this._log.debug("Purging bad data after failed unwrap crypto: " + ex);
5 407
        CryptoMetas.del(this.cryptoMetaURL);
2 408
        meta = null;
4 409
        wipeServerData = true;
410
      }
411
    }
412
413
    // Delete all server data and reupload on bad version or meta
6 414
    if (wipeServerData) {
12 415
      new Resource(this.engineURL).delete();
8 416
      this._resetClient();
417
    }
418
419
    // Generate a new crypto record
12 420
    if (!meta) {
15 421
      let symkey = Svc.Crypto.generateRandomKey();
12 422
      let pubkey = PubKeys.getDefaultKey();
12 423
      meta = new CryptoMeta(this.cryptoMetaURL);
18 424
      meta.addUnwrappedKey(pubkey, symkey);
12 425
      let res = new Resource(meta.uri);
15 426
      let resp = res.put(meta);
9 427
      if (!resp.success) {
428
        this._log.debug("Metarecord upload fail:" + resp);
429
        resp.failureCode = ENGINE_METARECORD_UPLOAD_FAIL;
430
        throw resp;
431
      }
432
433
      // Cache the cryto meta that we just put on the server
21 434
      CryptoMetas.set(meta.uri, meta);
435
    }
436
437
    // Mark all items to be uploaded, but treat them as changed from long ago
9 438
    if (!this.lastSync) {
18 439
      this._log.debug("First sync, uploading all items");
23 440
      for (let id in this._store.getAllIDs())
21 441
        this._tracker.addChangedID(id, 0);
442
    }
443
48 444
    let outnum = [i for (i in this._tracker.changedIDs)].length;
24 445
    this._log.info(outnum + " outgoing items pre-reconciliation");
446
447
    // Keep track of what to delete at the end of sync
15 448
    this._delete = {};
449
  },
450
451
  // Generate outgoing records
46 452
  _processIncoming: function SyncEngine__processIncoming() {
168 453
    this._log.trace("Downloading & applying server changes");
454
455
    // Figure out how many total items to fetch this sync; do less on mobile.
456
    // 50 is hardcoded here because of URL length restrictions.
457
    // (GUIDs are 10 chars)
56 458
    let fetchNum = Infinity;
196 459
    if (Svc.Prefs.get("client.type") == "mobile")
50 460
      fetchNum = 50;
461
140 462
    let newitems = new Collection(this.engineURL, this._recordObj);
84 463
    newitems.newer = this.lastSync;
84 464
    newitems.full = true;
84 465
    newitems.sort = "index";
84 466
    newitems.limit = fetchNum;
467
196 468
    let count = {applied: 0, reconciled: 0};
56 469
    let handled = [];
1447 470
    newitems.recordHandler = Utils.bind2(this, function(item) {
471
      // Grab a later last modified if possible
8751 472
      if (this.lastModified == null || item.modified > this.lastModified)
27 473
        this.lastModified = item.modified;
474
475
      // Remember which records were processed
6255 476
      handled.push(item.id);
477
1251 478
      try {
10008 479
        item.decrypt(ID.get("WeaveCryptoID"));
6255 480
        if (this._reconcile(item)) {
3741 481
          count.applied++;
3741 482
          this._tracker.ignoreAll = true;
8729 483
          this._store.applyIncoming(item);
484
        } else {
12 485
          count.reconciled++;
1283 486
          this._log.trace("Skipping reconciled incoming item " + item.id);
487
        }
488
      }
489
      catch(ex) {
490
        this._log.warn("Error processing record: " + Utils.exceptionStr(ex));
491
      }
3753 492
      this._tracker.ignoreAll = false;
7506 493
      Sync.sleep(0);
494
    });
495
496
    // Only bother getting data from the server if there's new things
194 497
    if (this.lastModified == null || this.lastModified > this.lastSync) {
24 498
      let resp = newitems.get();
18 499
      if (!resp.success) {
500
        resp.failureCode = ENGINE_DOWNLOAD_FAIL;
501
        throw resp;
502
      }
503
504
      // Subtract out the number of items we just got
36 505
      fetchNum -= handled.length;
506
    }
507
508
    // Check if we got the maximum that we requested; get the rest if so
113 509
    if (handled.length == newitems.limit) {
4 510
      let guidColl = new Collection(this.engineURL);
3 511
      guidColl.newer = this.lastSync;
3 512
      guidColl.sort = "index";
513
4 514
      let guids = guidColl.get();
3 515
      if (!guids.success)
516
        throw guids;
517
518
      // Figure out which guids weren't just fetched then remove any guids that
519
      // were already waiting and prepend the new ones
6 520
      let extra = Utils.arraySub(guids.obj, handled);
4 521
      if (extra.length > 0)
11 522
        this.toFetch = extra.concat(Utils.arraySub(this.toFetch, extra));
523
    }
524
525
    // Process any backlog of GUIDs if we haven't fetched too many this sync
536 526
    while (this.toFetch.length > 0 && fetchNum > 0) {
527
      // Reuse the original query, but get rid of the restricting params
72 528
      newitems.limit = 0;
72 529
      newitems.newer = 0;
530
531
      // Get the first bunch of records and save the rest for later
216 532
      let minFetch = Math.min(150, this.toFetch.length, fetchNum);
192 533
      newitems.ids = this.toFetch.slice(0, minFetch);
168 534
      this.toFetch = this.toFetch.slice(minFetch);
96 535
      fetchNum -= minFetch;
536
537
      // Reuse the existing record handler set earlier
96 538
      let resp = newitems.get();
72 539
      if (!resp.success) {
540
        resp.failureCode = ENGINE_DOWNLOAD_FAIL;
24 541
        throw resp;
542
      }
543
    }
544
84 545
    if (this.lastSync < this.lastModified)
15 546
      this.lastSync = this.lastModified;
547
196 548
    this._log.info(["Records:", count.applied, "applied,", count.reconciled,
336 549
      "reconciled,", this.toFetch.length, "left to fetch"].join(" "));
550
  },
551
552
  /**
553
   * Find a GUID of an item that is a duplicate of the incoming item but happens
554
   * to have a different GUID
555
   *
556
   * @return GUID of the similar item; falsy otherwise
557
   */
18 558
  _findDupe: function _findDupe(item) {
559
    // By default, assume there's no dupe items for the engine
560
  },
561
25 562
  _isEqual: function SyncEngine__isEqual(item) {
35 563
    let local = this._createRecord(item.id);
49 564
    if (this._log.level <= Log4Moz.Level.Trace)
565
      this._log.trace("Local record: " + local);
42 566
    if (Utils.deepEquals(item.cleartext, local.cleartext)) {
24 567
      this._log.trace("Local record is the same");
8 568
      return true;
569
    } else {
18 570
      this._log.trace("Local record is different");
6 571
      return false;
572
    }
573
  },
574
20 575
  _deleteId: function _deleteId(id) {
12 576
    this._tracker.removeChangedID(id);
577
578
    // Remember this id to delete at the end of sync
10 579
    if (this._delete.ids == null)
5 580
      this._delete.ids = [id];
581
    else
9 582
      this._delete.ids.push(id);
583
  },
584
20 585
  _handleDupe: function _handleDupe(item, dupeId) {
586
    // Prefer shorter guids; for ties, just do an ASCII compare
14 587
    let preferLocal = dupeId.length < item.id.length ||
9 588
      (dupeId.length == item.id.length && dupeId < item.id);
589
4 590
    if (preferLocal) {
10 591
      this._log.trace("Preferring local id: " + [dupeId, item.id]);
5 592
      this._deleteId(item.id);
3 593
      item.id = dupeId;
8 594
      this._tracker.addChangedID(dupeId, 0);
595
    }
596
    else {
10 597
      this._log.trace("Switching local id to incoming: " + [item.id, dupeId]);
7 598
      this._store.changeItemID(dupeId, item.id);
5 599
      this._deleteId(dupeId);
2 600
    }
601
  },
602
603
  // Reconcile incoming and existing records.  Return true if server
604
  // data should be applied.
1269 605
  _reconcile: function SyncEngine__reconcile(item) {
8757 606
    if (this._log.level <= Log4Moz.Level.Trace)
607
      this._log.trace("Incoming: " + item);
608
7506 609
    this._log.trace("Reconcile step 1: Check for conflicts");
6257 610
    if (item.id in this._tracker.changedIDs) {
611
      // If the incoming and local changes are the same, skip
10 612
      if (this._isEqual(item)) {
6 613
        this._tracker.removeChangedID(item.id);
4 614
        return false;
615
      }
616
617
      // Records differ so figure out which to take
5 618
      let recordAge = Resource.serverTime - item.modified;
12 619
      let localAge = Date.now() / 1000 - this._tracker.changedIDs[item.id];
10 620
      this._log.trace("Record age vs local age: " + [recordAge, localAge]);
621
622
      // Apply the record if the record is newer (server wins)
6 623
      return recordAge < localAge;
624
    }
625
7494 626
    this._log.trace("Reconcile step 2: Check for updates");
7494 627
    if (this._store.itemExists(item.id))
30 628
      return !this._isEqual(item);
629
7464 630
    this._log.trace("Reconcile step 2.5: Don't dupe deletes");
2488 631
    if (item.deleted)
632
      return true;
633
7464 634
    this._log.trace("Reconcile step 3: Find dupes");
6220 635
    let dupeId = this._findDupe(item);
2488 636
    if (dupeId)
12 637
      this._handleDupe(item, dupeId);
638
639
    // Apply the incoming item (now that the dupe is the right id)
2488 640
    return true;
641
  },
642
643
  // Upload outgoing records
20 644
  _uploadOutgoing: function SyncEngine__uploadOutgoing() {
14104 645
    let outnum = [i for (i in this._tracker.changedIDs)].length;
6 646
    if (outnum) {
18 647
      this._log.trace("Preparing " + outnum + " outgoing records");
648
649
      // collection we'll upload
8 650
      let up = new Collection(this.engineURL);
4 651
      let count = 0;
652
653
      // Upload what we've got so far in the collection
37 654
      let doUpload = Utils.bind2(this, function(desc) {
275 655
        this._log.info("Uploading " + desc + " of " + outnum + " records");
100 656
        let resp = up.post();
75 657
        if (!resp.success) {
658
          this._log.debug("Uploading records failed: " + resp);
659
          resp.failureCode = ENGINE_UPLOAD_FAIL;
660
          throw resp;
661
        }
662
663
        // Record the modified time of the upload
100 664
        let modified = resp.headers["x-weave-timestamp"];
75 665
        if (modified > this.lastSync)
75 666
          this.lastSync = modified;
667
125 668
        up.clearRecords();
669
      });
670
4704 671
      for (let id in this._tracker.changedIDs) {
4692 672
        try {
11730 673
          let out = this._createRecord(id);
16422 674
          if (this._log.level <= Log4Moz.Level.Trace)
675
            this._log.trace("Outgoing: " + out);
676
18768 677
          out.encrypt(ID.get("WeaveCryptoID"));
16422 678
          up.pushData(out);
679
        }
680
        catch(ex) {
681
          this._log.warn("Error creating record: " + Utils.exceptionStr(ex));
682
        }
683
684
        // Partial upload
11730 685
        if ((++count % MAX_UPLOAD_RECORDS) == 0)
253 686
          doUpload((count - MAX_UPLOAD_RECORDS) + " - " + count + " out");
687
16430 688
        Sync.sleep(0);
689
      }
690
691
      // Final upload
10 692
      if (count % MAX_UPLOAD_RECORDS > 0)
17 693
        doUpload(count >= MAX_UPLOAD_RECORDS ? "last batch" : "all");
694
    }
12 695
    this._tracker.clearChangedIDs();
696
  },
697
698
  // Any cleanup necessary.
699
  // Save the current snapshot so as to calculate changes at next sync
21 700
  _syncFinish: function SyncEngine__syncFinish() {
18 701
    this._log.trace("Finishing up sync");
15 702
    this._tracker.resetScore();
703
42 704
    let doDelete = Utils.bind2(this, function(key, val) {
120 705
      let coll = new Collection(this.engineURL, this._recordObj);
96 706
      coll[key] = val;
120 707
      coll.delete();
708
    });
709
51 710
    for (let [key, val] in Iterator(this._delete)) {
711
      // Remove the key for future uses
12 712
      delete this._delete[key];
713
714
      // Send a simple delete for the property
21 715
      if (key != "ids" || val.length <= 100)
12 716
        doDelete(key, val);
717
      else {
718
        // For many ids, split into chunks of at most 100
133 719
        while (val.length > 0) {
198 720
          doDelete(key, val.slice(0, 100));
110 721
          val = val.slice(100);
722
        }
723
      }
3 724
    }
725
  },
726
19 727
  _sync: function SyncEngine__sync() {
1 728
    try {
3 729
      this._syncStartup();
730
      Observers.notify("weave:engine:sync:status", "process-incoming");
731
      this._processIncoming();
732
      Observers.notify("weave:engine:sync:status", "upload-outgoing");
733
      this._uploadOutgoing();
734
      this._syncFinish();
735
    }
3 736
    catch (e) {
6 737
      this._log.warn("Sync failed");
2 738
      throw e;
739
    }
740
  },
741
18 742
  _testDecrypt: function _testDecrypt() {
743
    // Report failure even if there's nothing to decrypt
744
    let canDecrypt = false;
745
746
    // Fetch the most recently uploaded record and try to decrypt it
747
    let test = new Collection(this.engineURL, this._recordObj);
748
    test.limit = 1;
749
    test.sort = "newest";
750
    test.full = true;
751
    test.recordHandler = function(record) {
752
      record.decrypt(ID.get("WeaveCryptoID"));
753
      canDecrypt = true;
754
    };
755
756
    // Any failure fetching/decrypting will just result in false
757
    try {
758
      this._log.trace("Trying to decrypt a record from the server..");
759
      test.get();
760
    }
761
    catch(ex) {
762
      this._log.debug("Failed test decrypt: " + Utils.exceptionStr(ex));
763
    }
764
765
    return canDecrypt;
766
  },
767
54 768
  _resetClient: function SyncEngine__resetClient() {
36 769
    this.resetLastSync();
36 770
    this.toFetch = [];
771
  }
9 772
};