Browse code

Fix: add owner as map key

Thomas Fricker authored on 03/06/2019 14:41:32
Showing 7 changed files
... ...
@@ -59,7 +59,7 @@ public class EmlBuilder {
59 59
 
60 60
 	private static final Logger logger = LoggerFactory.getLogger(EmlBuilder.class);
61 61
 
62
-	public static Message of(MessageBody mb) {
62
+	public static Message of(MessageBody mb, String owner) {
63 63
 
64 64
 		MessageImpl msg = new MessageImpl();
65 65
 		msg.setDate(mb.date);
... ...
@@ -75,7 +75,7 @@ public class EmlBuilder {
75 75
 
76 76
 		Part structure = mb.structure;
77 77
 		try {
78
-			Body body = createBody(bbf, mb.structure);
78
+			Body body = createBody(bbf, mb.structure, owner);
79 79
 			if (body instanceof MultipartImpl) {
80 80
 				msg.setMultipart((MultipartImpl) body);
81 81
 			} else {
... ...
@@ -133,23 +133,23 @@ public class EmlBuilder {
133 133
 		}
134 134
 	}
135 135
 
136
-	private static Body createBody(BasicBodyFactory bbf, Part structure) throws IOException {
136
+	private static Body createBody(BasicBodyFactory bbf, Part structure, String owner) throws IOException {
137 137
 		Body body = null;
138 138
 		if (structure.children.isEmpty()) {
139 139
 			switch (structure.mime) {
140 140
 			case "text/plain":
141 141
 			case "text/html":
142
-				body = bbf.textBody(inputStream(null, null, null, structure).input, "utf-8");
142
+				body = bbf.textBody(inputStream(null, null, null, structure, owner).input, "utf-8");
143 143
 				break;
144 144
 			default:
145
-				body = bbf.binaryBody(inputStream(null, null, null, structure).input);
145
+				body = bbf.binaryBody(inputStream(null, null, null, structure, owner).input);
146 146
 			}
147 147
 		} else {
148 148
 			// multipart
149 149
 			MultipartImpl mp = new MultipartImpl(structure.mime.substring("multipart/".length()));
150 150
 			for (Part p : structure.children) {
151 151
 				logger.info("Adding part {}", p.mime);
152
-				Body childBody = createBody(bbf, p);
152
+				Body childBody = createBody(bbf, p, owner);
153 153
 				BodyPart bp = new BodyPart();
154 154
 				if (childBody instanceof MultipartImpl) {
155 155
 					bp.setMultipart((MultipartImpl) childBody);
... ...
@@ -184,7 +184,7 @@ public class EmlBuilder {
184 184
 		}
185 185
 	}
186 186
 
187
-	public static SizedStream inputStream(Long id, String previousBody, Date date, Part structure) {
187
+	public static SizedStream inputStream(Long id, String previousBody, Date date, Part structure, String owner) {
188 188
 		File emlInput = new File(Bodies.STAGING, structure.address + ".part");
189 189
 		if (!emlInput.exists()) {
190 190
 			throw ServerFault.notFound("Missing staging file " + emlInput.getAbsolutePath());
... ...
@@ -203,7 +203,7 @@ public class EmlBuilder {
203 203
 		try (InputStream in = new FileInputStream(emlInput)) {
204 204
 			Message asMessage = Mime4JHelper.parse(in);
205 205
 			net.bluemind.backend.mail.api.MessageBody.Header idHeader = net.bluemind.backend.mail.api.MessageBody.Header
206
-					.create(MailApiHeaders.X_BM_INTERNAL_ID, InstallationId.getIdentifier() + ":" + id);
206
+					.create(MailApiHeaders.X_BM_INTERNAL_ID, owner + "#" + InstallationId.getIdentifier() + ":" + id);
207 207
 			List<net.bluemind.backend.mail.api.MessageBody.Header> toAdd = Arrays.asList(idHeader);
208 208
 			if (previousBody != null) {
209 209
 				net.bluemind.backend.mail.api.MessageBody.Header prevHeader = net.bluemind.backend.mail.api.MessageBody.Header
... ...
@@ -120,7 +120,7 @@ public class BodyStreamProcessorTests {
120 120
 		out.close();
121 121
 		MessageBodyData result = BodyStreamProcessor.processBody(stream).get(2, TimeUnit.SECONDS);
122 122
 		updateAddr(partAddr, result.body.structure);
123
-		Message rebuilt = EmlBuilder.of(result.body);
123
+		Message rebuilt = EmlBuilder.of(result.body, "owner");
124 124
 		ByteArrayOutputStream msgOut = new ByteArrayOutputStream();
125 125
 		Mime4JHelper.serialize(rebuilt, msgOut);
126 126
 		System.out.println("Re-created:\n" + msgOut.toString());
... ...
@@ -312,13 +312,23 @@ public class ReplicationStackTests extends AbstractRollingReplicationTests {
312 312
 	}
313 313
 
314 314
 	protected ItemValue<MailboxItem> addDraft(ItemValue<MailboxFolder> inbox) throws IOException, InterruptedException {
315
-		IOfflineMgmt idAllocator = provider().instance(IOfflineMgmt.class, domainUid, userUid);
315
+		return addDraft(inbox, userUid);
316
+	}
317
+
318
+	protected ItemValue<MailboxItem> addDraft(ItemValue<MailboxFolder> inbox, String owner)
319
+			throws IOException, InterruptedException {
320
+		IOfflineMgmt idAllocator = provider().instance(IOfflineMgmt.class, domainUid, owner);
316 321
 		IdRange oneId = idAllocator.allocateOfflineIds(1);
317
-		return addDraft(inbox, oneId.globalCounter);
322
+		return addDraft(inbox, oneId.globalCounter, owner);
318 323
 	}
319 324
 
320 325
 	protected ItemValue<MailboxItem> addDraft(ItemValue<MailboxFolder> inbox, long id)
321 326
 			throws IOException, InterruptedException {
327
+		return addDraft(inbox, id, userUid);
328
+	}
329
+
330
+	protected ItemValue<MailboxItem> addDraft(ItemValue<MailboxFolder> inbox, long id, String owner)
331
+			throws IOException, InterruptedException {
322 332
 		assertNotNull(inbox);
323 333
 		IMailboxItems recordsApi = provider().instance(IMailboxItems.class, inbox.uid);
324 334
 		try (InputStream in = testEml()) {
... ...
@@ -342,7 +352,7 @@ public class ReplicationStackTests extends AbstractRollingReplicationTests {
342 352
 			Optional<Header> idHeader = reloaded.value.body.headers.stream()
343 353
 					.filter(h -> h.name.equals(MailApiHeaders.X_BM_INTERNAL_ID)).findAny();
344 354
 			assertTrue(idHeader.isPresent());
345
-			assertEquals(InstallationId.getIdentifier() + ":" + expectedId, idHeader.get().firstValue());
355
+			assertEquals(owner + "#" + InstallationId.getIdentifier() + ":" + expectedId, idHeader.get().firstValue());
346 356
 			return reloaded;
347 357
 		}
348 358
 	}
... ...
@@ -369,7 +379,7 @@ public class ReplicationStackTests extends AbstractRollingReplicationTests {
369 379
 		Optional<Header> idHeader = reloaded.value.body.headers.stream()
370 380
 				.filter(h -> h.name.equals(MailApiHeaders.X_BM_INTERNAL_ID)).findAny();
371 381
 		assertTrue(idHeader.isPresent());
372
-		assertEquals(InstallationId.getIdentifier() + ":" + expectedId, idHeader.get().firstValue());
382
+		assertEquals(userUid + "#" + InstallationId.getIdentifier() + ":" + expectedId, idHeader.get().firstValue());
373 383
 
374 384
 	}
375 385
 
... ...
@@ -988,7 +998,7 @@ public class ReplicationStackTests extends AbstractRollingReplicationTests {
988 998
 			}
989 999
 		}
990 1000
 		assertNotNull(sent);
991
-		ItemValue<MailboxItem> item = addDraft(sent);
1001
+		ItemValue<MailboxItem> item = addDraft(sent, mailshare.name);
992 1002
 		IDbMailboxRecords recs = provider().instance(IDbMailboxRecords.class, sent.uid);
993 1003
 		Stream fetched = recs.fetchComplete(item.value.imapUid);
994 1004
 		assertNotNull(fetched);
... ...
@@ -1040,7 +1050,7 @@ public class ReplicationStackTests extends AbstractRollingReplicationTests {
1040 1050
 		assertNotNull(root);
1041 1051
 		IMailboxItems itemsApi = provider().instance(IMailboxItems.class, root.uid);
1042 1052
 
1043
-		ItemValue<MailboxItem> added = addDraft(root);
1053
+		ItemValue<MailboxItem> added = addDraft(root, mailshare.name);
1044 1054
 		Part partsRoot = added.value.body.structure;
1045 1055
 		PartsWalker<Void> walk = new PartsWalker<>(null);
1046 1056
 		AtomicInteger partBytes = new AtomicInteger();
... ...
@@ -46,15 +46,17 @@ public class BodyInternalIdCache {
46 46
 	public static class ExpectedId {
47 47
 		public final long id;
48 48
 		public final String updateOfBody;
49
+		public final String owner;
49 50
 
50
-		public ExpectedId(long id, String updateOfBody) {
51
+		public ExpectedId(long id, String owner, String updateOfBody) {
51 52
 			this.id = id;
53
+			this.owner = owner;
52 54
 			this.updateOfBody = updateOfBody;
53 55
 		}
54 56
 
55 57
 		public String toString() {
56 58
 			return MoreObjects.toStringHelper(ExpectedId.class).add("id", id).add("updateOfBody", updateOfBody)
57
-					.toString();
59
+					.add("owner", owner).toString();
58 60
 		}
59 61
 	}
60 62
 
... ...
@@ -65,20 +67,30 @@ public class BodyInternalIdCache {
65 67
 		public VanishedBody(long id) {
66 68
 			this.id = id;
67 69
 		}
70
+
71
+		@Override
72
+		public String toString() {
73
+			return "VanishedBody [id=" + id + ", version=" + version + "]";
74
+		}
75
+
68 76
 	}
69 77
 
70
-	public static ExpectedId expectedRecordId(String bodyUid) {
71
-		return bodyUidToExpectedRecordId.getIfPresent(bodyUid);
78
+	public static ExpectedId expectedRecordId(String owner, String bodyUid) {
79
+		ExpectedId ifPresent = bodyUidToExpectedRecordId.getIfPresent(bodyUid);
80
+		if (ifPresent != null && ifPresent.owner.equals(owner)) {
81
+			return ifPresent;
82
+		}
83
+		return null;
72 84
 	}
73 85
 
74
-	public static VanishedBody vanishedBody(String bodyUid) {
75
-		return vanish.getIfPresent(bodyUid);
86
+	public static VanishedBody vanishedBody(String owner, String bodyUid) {
87
+		return vanish.getIfPresent(owner + "#" + bodyUid);
76 88
 	}
77 89
 
78 90
 	public static void storeExpectedRecordId(String bodyUid, ExpectedId id) {
79 91
 		bodyUidToExpectedRecordId.put(bodyUid, id);
80 92
 		if (id.updateOfBody != null) {
81
-			vanish.put(id.updateOfBody, new VanishedBody(id.id));
93
+			vanish.put(id.owner + "#" + id.updateOfBody, new VanishedBody(id.id));
82 94
 		}
83 95
 	}
84 96
 
... ...
@@ -113,7 +113,7 @@ public class DbMailboxRecordsService extends BaseMailboxRecordsService implement
113 113
 		SubtreeLocation recordsLocation = optRecordsLocation
114 114
 				.orElseThrow(() -> new ServerFault("Missing subtree location"));
115 115
 
116
-		ExpectedId knownInternalId = BodyInternalIdCache.expectedRecordId(mail.messageBody);
116
+		ExpectedId knownInternalId = BodyInternalIdCache.expectedRecordId(container.owner, mail.messageBody);
117 117
 		ItemVersion version = null;
118 118
 		boolean isUpdate = false;
119 119
 		if (knownInternalId != null) {
... ...
@@ -263,7 +263,7 @@ public class DbMailboxRecordsService extends BaseMailboxRecordsService implement
263 263
 					logger.debug("Skipping create on expunged record");
264 264
 					return;
265 265
 				}
266
-				VanishedBody vanished = BodyInternalIdCache.vanishedBody(mr.messageBody);
266
+				VanishedBody vanished = BodyInternalIdCache.vanishedBody(container.owner, mr.messageBody);
267 267
 				if (vanished != null) {
268 268
 					logger.info("Don't touch {} {} as it vanished", mr.imapUid, mr.messageBody);
269 269
 					upNotifs.add(UpdateNotif.of(vanished.version, mr));
... ...
@@ -281,7 +281,7 @@ public class DbMailboxRecordsService extends BaseMailboxRecordsService implement
281 281
 					version = storeService.createWithId(uid, expId, null, uid, mr);
282 282
 					GuidExpectedIdCache.invalidate(IMailReplicaUids.uniqueId(container.uid) + ":" + mr.messageBody);
283 283
 				} else {
284
-					ExpectedId knownInternalId = BodyInternalIdCache.expectedRecordId(mr.messageBody);
284
+					ExpectedId knownInternalId = BodyInternalIdCache.expectedRecordId(container.owner, mr.messageBody);
285 285
 					if (knownInternalId == null) {
286 286
 						try {
287 287
 							version = storeService.create(uid, uid, mr);
... ...
@@ -306,7 +306,8 @@ public class DbMailboxRecordsService extends BaseMailboxRecordsService implement
306 306
 								logger.info("Update record {} to point to a different body {}", knownInternalId,
307 307
 										mr.messageBody);
308 308
 								version = storeService.update(knownInternalId.id, uid, mr);
309
-								BodyInternalIdCache.vanishedBody(knownInternalId.updateOfBody).version = version;
309
+								BodyInternalIdCache.vanishedBody(container.owner,
310
+										knownInternalId.updateOfBody).version = version;
310 311
 								update = true;
311 312
 							} catch (ServerFault sf) {
312 313
 								version = storeService.createWithId(uid, knownInternalId.id, null, uid, mr);
... ...
@@ -335,7 +336,7 @@ public class DbMailboxRecordsService extends BaseMailboxRecordsService implement
335 336
 
336 337
 			AtomicInteger softDelete = new AtomicInteger();
337 338
 			toUpdate.forEach((String uid, MailboxRecord mr) -> {
338
-				VanishedBody vanished = BodyInternalIdCache.vanishedBody(mr.messageBody);
339
+				VanishedBody vanished = BodyInternalIdCache.vanishedBody(container.owner, mr.messageBody);
339 340
 				if (vanished != null) {
340 341
 					logger.info("Using version from vanished item {} and the old imap uid", vanished);
341 342
 					upNotifs.add(UpdateNotif.of(vanished.version, mr));
... ...
@@ -71,16 +71,19 @@ public class DbMessageBodiesService implements IDbMessageBodies {
71 71
 				if (idHeader.isPresent()) {
72 72
 					String idStr = idHeader.get().firstValue();
73 73
 					int instIdx = idStr.lastIndexOf(':');
74
-					if (instIdx > 0) {
75
-						String instId = idStr.substring(0, instIdx);
74
+					int ownerIdx = idStr.lastIndexOf('#');
75
+					if (instIdx > 0 && ownerIdx > 0 && ownerIdx < instIdx) {
76
+						String owner = idStr.substring(0, ownerIdx);
77
+						String instId = idStr.substring(ownerIdx + 1, instIdx);
76 78
 						if (InstallationId.getIdentifier().equals(instId)) {
77 79
 							long internalId = Long.parseLong(idStr.substring(instIdx + 1));
78 80
 							String prevBody = null;
79 81
 							if (prevHeader.isPresent()) {
80 82
 								prevBody = prevHeader.get().firstValue();
81 83
 							}
82
-							logger.warn("********** caching {} => {}", body.guid, internalId);
83
-							BodyInternalIdCache.storeExpectedRecordId(body.guid, new ExpectedId(internalId, prevBody));
84
+							logger.warn("********** caching {} => {} for owner {}", body.guid, internalId, owner);
85
+							BodyInternalIdCache.storeExpectedRecordId(body.guid,
86
+									new ExpectedId(internalId, owner, prevBody));
84 87
 						}
85 88
 					}
86 89
 				}
... ...
@@ -374,15 +374,15 @@ public class ImapMailboxRecordsService extends BaseMailboxRecordsService impleme
374 374
 	private SizedStream createEmlStructure(long id, String previousBody, MessageBody body) {
375 375
 		Part structure = body.structure;
376 376
 		if (structure.mime.equals("message/rfc822")) {
377
-			return EmlBuilder.inputStream(id, previousBody, body.date, structure);
377
+			return EmlBuilder.inputStream(id, previousBody, body.date, structure, container.owner);
378 378
 		} else {
379 379
 			try {
380
-				body.headers
381
-						.add(Header.create(MailApiHeaders.X_BM_INTERNAL_ID, InstallationId.getIdentifier() + ":" + id));
380
+				body.headers.add(Header.create(MailApiHeaders.X_BM_INTERNAL_ID,
381
+						container.owner + "#" + InstallationId.getIdentifier() + ":" + id));
382 382
 				if (previousBody != null) {
383 383
 					body.headers.add(Header.create(MailApiHeaders.X_BM_PREVIOUS_BODY, previousBody));
384 384
 				}
385
-				Message msg = EmlBuilder.of(body);
385
+				Message msg = EmlBuilder.of(body, container.owner);
386 386
 				return Mime4JHelper.asSizedStream(msg);
387 387
 			} catch (ServerFault sf) {
388 388
 				throw sf;