Browse code

BM-14968 Chore: zero-copy write when dealing with incoming streams

Thomas Cataldo authored on 16/07/2019 04:41:37
Showing 2 changed files
... ...
@@ -41,6 +41,7 @@ public class TokensHandler {
41 41
 	long frameId = 0;
42 42
 
43 43
 	private ReplicationFrameBuilder frameBuilder;
44
+	private static final byte[] DELIM = "\r\n".getBytes();
44 45
 
45 46
 	public TokensHandler() {
46 47
 		frameBuilder = new ReplicationFrameBuilder(frameId++);
... ...
@@ -73,7 +74,7 @@ public class TokensHandler {
73 74
 
74 75
 			}
75 76
 		} else {
76
-			parser.delimitedMode("\r\n".getBytes());
77
+			parser.delimitedMode(DELIM);
77 78
 			delimitedMode = true;
78 79
 		}
79 80
 
... ...
@@ -20,11 +20,12 @@ package net.bluemind.backend.mail.parsing;
20 20
 import java.io.File;
21 21
 import java.io.IOException;
22 22
 import java.io.InputStream;
23
-import java.io.OutputStream;
23
+import java.nio.ByteBuffer;
24
+import java.nio.channels.SeekableByteChannel;
24 25
 import java.nio.file.Files;
25 26
 import java.nio.file.Path;
27
+import java.nio.file.StandardOpenOption;
26 28
 import java.util.concurrent.CompletableFuture;
27
-import java.util.concurrent.atomic.AtomicLong;
28 29
 import java.util.function.Function;
29 30
 
30 31
 import org.slf4j.Logger;
... ...
@@ -49,26 +50,26 @@ public class EZInputStreamAdapter {
49 50
 
50 51
 	private static class ResetableOutput {
51 52
 		private final Path file;
52
-		private final OutputStream fileOut;
53
+		private final SeekableByteChannel fileOut;
53 54
 		private boolean closed;
54 55
 		private boolean reset;
55 56
 
56
-		public ResetableOutput(File f) {
57
-			this.file = f.toPath();
57
+		public ResetableOutput(Path path) {
58
+			this.file = path;
58 59
 			try {
59
-				this.fileOut = Files.newOutputStream(file);
60
+				this.fileOut = Files.newByteChannel(file, StandardOpenOption.CREATE,
61
+						StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE);
60 62
 			} catch (IOException e) {
61 63
 				throw new AdaptException(e);
62 64
 			}
63 65
 		}
64 66
 
65
-		public void write(byte[] data) throws IOException {
67
+		public void write(ByteBuffer data) throws IOException {
66 68
 			fileOut.write(data);
67 69
 		}
68 70
 
69 71
 		public void close() {
70 72
 			try {
71
-				fileOut.flush();
72 73
 				fileOut.close();
73 74
 			} catch (Exception e) {
74 75
 				logger.error(e.getMessage(), e);
... ...
@@ -103,30 +104,27 @@ public class EZInputStreamAdapter {
103 104
 
104 105
 	}
105 106
 
106
-	private static boolean shm = isShmAvailable();
107
+	private static final Path shmParent = new File("/dev/shm/sync.bodies/").toPath();
108
+	private static final boolean shm = isShmAvailable();
107 109
 
108 110
 	private static boolean isShmAvailable() {
109 111
 		File shm = new File("/dev/shm");
110 112
 		boolean ret = shm.exists() && shm.isDirectory();
111 113
 		if (ret) {
112
-			File out = new File(shm, "sync.bodies");
113
-			out.mkdirs();
114
+			shmParent.toFile().mkdirs();
114 115
 		}
115 116
 		return ret;
116 117
 	}
117 118
 
118
-	private static final AtomicLong tmpName = new AtomicLong();
119
-
120 119
 	private static ResetableOutput output() {
121
-		if (shm) {
122
-			File f = new File("/dev/shm/sync.bodies/body." + tmpName.incrementAndGet());
123
-			return new ResetableOutput(f);
124
-		} else {
125
-			try {
126
-				return new ResetableOutput(File.createTempFile("ez-is-adapt", ".stream"));
127
-			} catch (IOException e) {
128
-				throw new AdaptException(e);
120
+		try {
121
+			if (shm) {
122
+				return new ResetableOutput(Files.createTempFile(shmParent, "ez-is-adapt", ".stream"));
123
+			} else {
124
+				return new ResetableOutput(Files.createTempFile("ez-is-adapt", ".stream"));
129 125
 			}
126
+		} catch (IOException e) {
127
+			throw new AdaptException(e);
130 128
 		}
131 129
 	}
132 130
 
... ...
@@ -172,9 +170,8 @@ public class EZInputStreamAdapter {
172 170
 
173 171
 		});
174 172
 		vxStream.dataHandler(buf -> {
175
-			byte[] toAdd = buf.getBytes();
176 173
 			try {
177
-				diskCopy.write(toAdd);
174
+				diskCopy.write(buf.getByteBuf().nioBuffer());
178 175
 			} catch (IOException e) {
179 176
 				logger.error(e.getMessage());
180 177
 			}