1   /*
2    * Copyright (C) 2017, Google Inc. and others
3    *
4    * This program and the accompanying materials are made available under the
5    * terms of the Eclipse Distribution License v. 1.0 which is available at
6    * https://www.eclipse.org/org/documents/edl-v10.php.
7    *
8    * SPDX-License-Identifier: BSD-3-Clause
9    */
10  
11  package org.eclipse.jgit.internal.storage.dfs;
12  
13  import static java.util.concurrent.TimeUnit.MILLISECONDS;
14  import static org.eclipse.jgit.lib.Constants.OBJ_BLOB;
15  import static org.junit.Assert.assertEquals;
16  import static org.junit.Assert.assertTrue;
17  
18  import java.time.Duration;
19  import java.util.Arrays;
20  import java.util.Collections;
21  import java.util.HashMap;
22  import java.util.List;
23  import java.util.Map;
24  import java.util.concurrent.atomic.AtomicInteger;
25  import java.util.stream.LongStream;
26  import java.util.concurrent.Callable;
27  import java.util.concurrent.ExecutorService;
28  import java.util.concurrent.Executors;
29  
30  import org.eclipse.jgit.internal.storage.dfs.DfsBlockCacheConfig.IndexEventConsumer;
31  import org.eclipse.jgit.internal.storage.pack.PackExt;
32  import org.eclipse.jgit.junit.TestRepository;
33  import org.eclipse.jgit.junit.TestRng;
34  import org.eclipse.jgit.lib.ObjectId;
35  import org.eclipse.jgit.lib.ObjectInserter;
36  import org.eclipse.jgit.lib.ObjectReader;
37  import org.eclipse.jgit.revwalk.RevCommit;
38  import org.junit.Before;
39  import org.junit.Rule;
40  import org.junit.Test;
41  import org.junit.rules.TestName;
42  
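/**
 * Tests for {@link DfsBlockCache}: block reuse across repositories sharing a
 * pack, the cache hot map, index event consumers, and concurrent loading of
 * pack, bitmap and reverse indexes.
 */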
public class DfsBlockCacheTest {
	@Rule
	public TestName testName = new TestName();
	private TestRng rng;
	private DfsBlockCache cache;
	private ExecutorService pool;

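	// Each test seeds its own RNG with the test method name, creates a fresh
	// thread pool, and reconfigures the shared block cache.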
	@Before
	public void setUp() {
		rng = new TestRng(testName.getMethodName());
		pool = Executors.newFixedThreadPool(10);
		resetCache();
	}

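	// Blocks cached while the pack is written through r1 are keyed by the
	// pack's stream key, so a second repository sharing the same description
	// reads the blob back without any misses and without growing the cache.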
	@SuppressWarnings("resource")
	@Test
	public void streamKeyReusesBlocks() throws Exception {
		DfsRepositoryDescription repo = new DfsRepositoryDescription("test");
		InMemoryRepository r1 = new InMemoryRepository(repo);
		byte[] content = rng.nextBytes(424242);
		ObjectId id;
		try (ObjectInserter ins = r1.newObjectInserter()) {
			id = ins.insert(OBJ_BLOB, content);
			ins.flush();
		}

		long oldSize = LongStream.of(cache.getCurrentSize()).sum();
		assertTrue(oldSize > 2000);
		assertEquals(0, LongStream.of(cache.getHitCount()).sum());

		List<DfsPackDescription> packs = r1.getObjectDatabase().listPacks();
		InMemoryRepository r2 = new InMemoryRepository(repo);
		r2.getObjectDatabase().commitPack(packs, Collections.emptyList());
		try (ObjectReader rdr = r2.newObjectReader()) {
			byte[] actual = rdr.open(id, OBJ_BLOB).getBytes();
			assertTrue(Arrays.equals(content, actual));
		}
		assertEquals(0, LongStream.of(cache.getMissCount()).sum());
		assertEquals(oldSize, LongStream.of(cache.getCurrentSize()).sum());
	}

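	// Reads must still succeed when the readable channel block size (500) is
	// not aligned with the 512 byte cache block size set by resetCache().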
	@SuppressWarnings("resource")
	@Test
	public void weirdBlockSize() throws Exception {
		DfsRepositoryDescription repo = new DfsRepositoryDescription("test");
		InMemoryRepository r1 = new InMemoryRepository(repo);

		byte[] content1 = rng.nextBytes(4);
		byte[] content2 = rng.nextBytes(424242);
		ObjectId id1;
		ObjectId id2;
		try (ObjectInserter ins = r1.newObjectInserter()) {
			id1 = ins.insert(OBJ_BLOB, content1);
			id2 = ins.insert(OBJ_BLOB, content2);
			ins.flush();
		}

		resetCache();
		List<DfsPackDescription> packs = r1.getObjectDatabase().listPacks();

		InMemoryRepository r2 = new InMemoryRepository(repo);
		r2.getObjectDatabase().setReadableChannelBlockSizeForTest(500);
		r2.getObjectDatabase().commitPack(packs, Collections.emptyList());
		try (ObjectReader rdr = r2.newObjectReader()) {
			byte[] actual = rdr.open(id1, OBJ_BLOB).getBytes();
			assertTrue(Arrays.equals(content1, actual));
		}

		InMemoryRepository r3 = new InMemoryRepository(repo);
		r3.getObjectDatabase().setReadableChannelBlockSizeForTest(500);
		r3.getObjectDatabase().commitPack(packs, Collections.emptyList());
		try (ObjectReader rdr = r3.newObjectReader()) {
			byte[] actual = rdr.open(id2, OBJ_BLOB).getBytes();
			assertTrue(Arrays.equals(content2, actual));
		}
	}

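	// With PackExt.INDEX marked hot, filling the small cache with a second
	// pack evicts PACK blocks while the INDEX blocks stay resident.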
	@SuppressWarnings("resource")
	@Test
	public void hasCacheHotMap() throws Exception {
		Map<PackExt, Integer> cacheHotMap = new HashMap<>();
		// Pack index will be kept in cache longer.
		cacheHotMap.put(PackExt.INDEX, Integer.valueOf(3));
		DfsBlockCache.reconfigure(new DfsBlockCacheConfig().setBlockSize(512)
				.setBlockLimit(512 * 4).setCacheHotMap(cacheHotMap));
		cache = DfsBlockCache.getInstance();

		DfsRepositoryDescription repo = new DfsRepositoryDescription("test");
		InMemoryRepository r1 = new InMemoryRepository(repo);
		byte[] content = rng.nextBytes(424242);
		ObjectId id;
		try (ObjectInserter ins = r1.newObjectInserter()) {
			id = ins.insert(OBJ_BLOB, content);
			ins.flush();
		}

		try (ObjectReader rdr = r1.newObjectReader()) {
			byte[] actual = rdr.open(id, OBJ_BLOB).getBytes();
			assertTrue(Arrays.equals(content, actual));
		}
		// All cache entries are hot and cache is at capacity.
		assertTrue(LongStream.of(cache.getHitCount()).sum() > 0);
		assertEquals(99, cache.getFillPercentage());

		InMemoryRepository r2 = new InMemoryRepository(repo);
		content = rng.nextBytes(424242);
		try (ObjectInserter ins = r2.newObjectInserter()) {
			ins.insert(OBJ_BLOB, content);
			ins.flush();
		}
		assertEquals(0, LongStream.of(cache.getMissCount()).sum());
		assertTrue(cache.getEvictions()[PackExt.PACK.getPosition()] > 0);
		assertEquals(0, cache.getEvictions()[PackExt.INDEX.getPosition()]);
	}

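	// This consumer does not override shouldReportEvictedEvent(), so only the
	// requested-index event is counted even though an INDEX block is evicted.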
	@SuppressWarnings("resource")
	@Test
	public void hasIndexEventConsumerOnlyLoaded() throws Exception {
		AtomicInteger loaded = new AtomicInteger();
		IndexEventConsumer indexEventConsumer = new IndexEventConsumer() {
			@Override
			public void acceptRequestedEvent(int packExtPos, boolean cacheHit,
					long loadMicros, long bytes,
					Duration lastEvictionDuration) {
				assertEquals(PackExt.INDEX.getPosition(), packExtPos);
				assertTrue(cacheHit);
				assertTrue(lastEvictionDuration.isZero());
				loaded.incrementAndGet();
			}
		};

		DfsBlockCache.reconfigure(new DfsBlockCacheConfig().setBlockSize(512)
				.setBlockLimit(512 * 4)
				.setIndexEventConsumer(indexEventConsumer));
		cache = DfsBlockCache.getInstance();

		DfsRepositoryDescription repo = new DfsRepositoryDescription("test");
		InMemoryRepository r1 = new InMemoryRepository(repo);
		byte[] content = rng.nextBytes(424242);
		ObjectId id;
		try (ObjectInserter ins = r1.newObjectInserter()) {
			id = ins.insert(OBJ_BLOB, content);
			ins.flush();
		}

		try (ObjectReader rdr = r1.newObjectReader()) {
			byte[] actual = rdr.open(id, OBJ_BLOB).getBytes();
			assertTrue(Arrays.equals(content, actual));
		}
		// All cache entries are hot and cache is at capacity.
		assertTrue(LongStream.of(cache.getHitCount()).sum() > 0);
		assertEquals(99, cache.getFillPercentage());

		InMemoryRepository r2 = new InMemoryRepository(repo);
		content = rng.nextBytes(424242);
		try (ObjectInserter ins = r2.newObjectInserter()) {
			ins.insert(OBJ_BLOB, content);
			ins.flush();
		}
		assertTrue(cache.getEvictions()[PackExt.PACK.getPosition()] > 0);
		assertEquals(1, cache.getEvictions()[PackExt.INDEX.getPosition()]);
		assertEquals(1, loaded.get());
	}

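	// Same scenario as above, but shouldReportEvictedEvent() returns true, so
	// the INDEX eviction is reported to the consumer as well.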
	@SuppressWarnings("resource")
	@Test
	public void hasIndexEventConsumerLoadedAndEvicted() throws Exception {
		AtomicInteger loaded = new AtomicInteger();
		AtomicInteger evicted = new AtomicInteger();
		IndexEventConsumer indexEventConsumer = new IndexEventConsumer() {
			@Override
			public void acceptRequestedEvent(int packExtPos, boolean cacheHit,
					long loadMicros, long bytes,
					Duration lastEvictionDuration) {
				assertEquals(PackExt.INDEX.getPosition(), packExtPos);
				assertTrue(cacheHit);
				assertTrue(lastEvictionDuration.isZero());
				loaded.incrementAndGet();
			}

			@Override
			public void acceptEvictedEvent(int packExtPos, long bytes,
					int totalCacheHitCount, Duration lastEvictionDuration) {
				assertEquals(PackExt.INDEX.getPosition(), packExtPos);
				assertTrue(totalCacheHitCount > 0);
				assertTrue(lastEvictionDuration.isZero());
				evicted.incrementAndGet();
			}

			@Override
			public boolean shouldReportEvictedEvent() {
				return true;
			}
		};

		DfsBlockCache.reconfigure(new DfsBlockCacheConfig().setBlockSize(512)
				.setBlockLimit(512 * 4)
				.setIndexEventConsumer(indexEventConsumer));
		cache = DfsBlockCache.getInstance();

		DfsRepositoryDescription repo = new DfsRepositoryDescription("test");
		InMemoryRepository r1 = new InMemoryRepository(repo);
		byte[] content = rng.nextBytes(424242);
		ObjectId id;
		try (ObjectInserter ins = r1.newObjectInserter()) {
			id = ins.insert(OBJ_BLOB, content);
			ins.flush();
		}

		try (ObjectReader rdr = r1.newObjectReader()) {
			byte[] actual = rdr.open(id, OBJ_BLOB).getBytes();
			assertTrue(Arrays.equals(content, actual));
		}
		// All cache entries are hot and cache is at capacity.
		assertTrue(LongStream.of(cache.getHitCount()).sum() > 0);
		assertEquals(99, cache.getFillPercentage());

		InMemoryRepository r2 = new InMemoryRepository(repo);
		content = rng.nextBytes(424242);
		try (ObjectInserter ins = r2.newObjectInserter()) {
			ins.insert(OBJ_BLOB, content);
			ins.flush();
		}
		assertTrue(cache.getEvictions()[PackExt.PACK.getPosition()] > 0);
		assertEquals(1, cache.getEvictions()[PackExt.INDEX.getPosition()]);
		assertEquals(1, loaded.get());
		assertEquals(1, evicted.get());
	}

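	// Even with a concurrency level of 1 (all loads serialized), each index
	// is loaded exactly once: one miss per pack extension.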
	@Test
	public void noConcurrencySerializedReads_oneRepo() throws Exception {
		InMemoryRepository r1 = createRepoWithBitmap("test");
		// Reset the cache with a concurrency level of 1, i.e. no concurrency.
		resetCache(1);

		DfsReader reader = (DfsReader) r1.newObjectReader();
		for (DfsPackFile pack : r1.getObjectDatabase().getPacks()) {
			// Only load non-garbage pack with bitmap.
			if (pack.isGarbage()) {
				continue;
			}
			asyncRun(() -> pack.getBitmapIndex(reader));
			asyncRun(() -> pack.getPackIndex(reader));
			asyncRun(() -> pack.getBitmapIndex(reader));
		}
		waitForExecutorPoolTermination();

		assertEquals(1, cache.getMissCount()[PackExt.BITMAP_INDEX.ordinal()]);
		assertEquals(1, cache.getMissCount()[PackExt.INDEX.ordinal()]);
		assertEquals(1, cache.getMissCount()[PackExt.REVERSE_INDEX.ordinal()]);
	}

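	// The two repositories are backed by distinct packs, so each records its
	// own miss per extension even when loads are serialized.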
	@SuppressWarnings("resource")
	@Test
	public void noConcurrencySerializedReads_twoRepos() throws Exception {
		InMemoryRepository r1 = createRepoWithBitmap("test1");
		InMemoryRepository r2 = createRepoWithBitmap("test2");
		resetCache(1);

		DfsReader reader = (DfsReader) r1.newObjectReader();
		DfsPackFile[] r1Packs = r1.getObjectDatabase().getPacks();
		DfsPackFile[] r2Packs = r2.getObjectDatabase().getPacks();
		// Safety check that both repos have the same number of packs.
		assertEquals(r1Packs.length, r2Packs.length);

		for (int i = 0; i < r1.getObjectDatabase().getPacks().length; ++i) {
			DfsPackFile pack1 = r1Packs[i];
			DfsPackFile pack2 = r2Packs[i];
			if (pack1.isGarbage() || pack2.isGarbage()) {
				continue;
			}
			asyncRun(() -> pack1.getBitmapIndex(reader));
			asyncRun(() -> pack2.getBitmapIndex(reader));
		}

		waitForExecutorPoolTermination();
		assertEquals(2, cache.getMissCount()[PackExt.BITMAP_INDEX.ordinal()]);
		assertEquals(2, cache.getMissCount()[PackExt.INDEX.ordinal()]);
		assertEquals(2, cache.getMissCount()[PackExt.REVERSE_INDEX.ordinal()]);
	}

	@SuppressWarnings("resource")
	@Test
	public void lowConcurrencyParallelReads_twoRepos() throws Exception {
		InMemoryRepository r1 = createRepoWithBitmap("test1");
		InMemoryRepository r2 = createRepoWithBitmap("test2");
		resetCache(2);

		DfsReader reader = (DfsReader) r1.newObjectReader();
		DfsPackFile[] r1Packs = r1.getObjectDatabase().getPacks();
		DfsPackFile[] r2Packs = r2.getObjectDatabase().getPacks();
		// Safety check that both repos have the same number of packs.
		assertEquals(r1Packs.length, r2Packs.length);

		for (int i = 0; i < r1.getObjectDatabase().getPacks().length; ++i) {
			DfsPackFile pack1 = r1Packs[i];
			DfsPackFile pack2 = r2Packs[i];
			if (pack1.isGarbage() || pack2.isGarbage()) {
				continue;
			}
			asyncRun(() -> pack1.getBitmapIndex(reader));
			asyncRun(() -> pack2.getBitmapIndex(reader));
		}

		waitForExecutorPoolTermination();
		assertEquals(2, cache.getMissCount()[PackExt.BITMAP_INDEX.ordinal()]);
		assertEquals(2, cache.getMissCount()[PackExt.INDEX.ordinal()]);
		assertEquals(2, cache.getMissCount()[PackExt.REVERSE_INDEX.ordinal()]);
	}

	@SuppressWarnings("resource")
	@Test
	public void lowConcurrencyParallelReads_twoReposAndIndex()
			throws Exception {
		InMemoryRepository r1 = createRepoWithBitmap("test1");
		InMemoryRepository r2 = createRepoWithBitmap("test2");
		resetCache(2);

		DfsReader reader = (DfsReader) r1.newObjectReader();
		DfsPackFile[] r1Packs = r1.getObjectDatabase().getPacks();
		DfsPackFile[] r2Packs = r2.getObjectDatabase().getPacks();
		// Safety check that both repos have the same number of packs.
		assertEquals(r1Packs.length, r2Packs.length);

		for (int i = 0; i < r1.getObjectDatabase().getPacks().length; ++i) {
			DfsPackFile pack1 = r1Packs[i];
			DfsPackFile pack2 = r2Packs[i];
			if (pack1.isGarbage() || pack2.isGarbage()) {
				continue;
			}
			asyncRun(() -> pack1.getBitmapIndex(reader));
			asyncRun(() -> pack1.getPackIndex(reader));
			asyncRun(() -> pack2.getBitmapIndex(reader));
		}
		waitForExecutorPoolTermination();

		assertEquals(2, cache.getMissCount()[PackExt.BITMAP_INDEX.ordinal()]);
		// Index is loaded once for each repo.
		assertEquals(2, cache.getMissCount()[PackExt.INDEX.ordinal()]);
		assertEquals(2, cache.getMissCount()[PackExt.REVERSE_INDEX.ordinal()]);
	}

	@Test
	public void highConcurrencyParallelReads_oneRepo() throws Exception {
		InMemoryRepository r1 = createRepoWithBitmap("test");
		resetCache();

		DfsReader reader = (DfsReader) r1.newObjectReader();
		for (DfsPackFile pack : r1.getObjectDatabase().getPacks()) {
			// Only load non-garbage pack with bitmap.
			if (pack.isGarbage()) {
				continue;
			}
			asyncRun(() -> pack.getBitmapIndex(reader));
			asyncRun(() -> pack.getPackIndex(reader));
			asyncRun(() -> pack.getBitmapIndex(reader));
		}
		waitForExecutorPoolTermination();

		assertEquals(1, cache.getMissCount()[PackExt.BITMAP_INDEX.ordinal()]);
		assertEquals(1, cache.getMissCount()[PackExt.INDEX.ordinal()]);
		assertEquals(1, cache.getMissCount()[PackExt.REVERSE_INDEX.ordinal()]);
	}

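	// Same as above, but the reverse index is loaded in parallel with the
	// pack index; it is still only missed once.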
	@Test
	public void highConcurrencyParallelReads_oneRepoParallelReverseIndex()
			throws Exception {
		InMemoryRepository r1 = createRepoWithBitmap("test");
		resetCache();

		DfsReader reader = (DfsReader) r1.newObjectReader();
		reader.getOptions().setLoadRevIndexInParallel(true);
		for (DfsPackFile pack : r1.getObjectDatabase().getPacks()) {
			// Only load non-garbage pack with bitmap.
			if (pack.isGarbage()) {
				continue;
			}
			asyncRun(() -> pack.getBitmapIndex(reader));
			asyncRun(() -> pack.getPackIndex(reader));
			asyncRun(() -> pack.getBitmapIndex(reader));
		}
		waitForExecutorPoolTermination();

		assertEquals(1, cache.getMissCount()[PackExt.BITMAP_INDEX.ordinal()]);
		assertEquals(1, cache.getMissCount()[PackExt.INDEX.ordinal()]);
		assertEquals(1, cache.getMissCount()[PackExt.REVERSE_INDEX.ordinal()]);
	}

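	// Reconfigure the shared cache singleton with 512 byte blocks, a 1 MiB
	// limit, and the given concurrency level (32 by default).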
	private void resetCache() {
		resetCache(32);
	}

	private void resetCache(int concurrencyLevel) {
		DfsBlockCache.reconfigure(new DfsBlockCacheConfig().setBlockSize(512)
				.setConcurrencyLevel(concurrencyLevel).setBlockLimit(1 << 20));
		cache = DfsBlockCache.getInstance();
	}

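	// Create a repository with two branches and run the garbage collector so
	// the resulting pack is accompanied by a bitmap index.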
	private InMemoryRepository createRepoWithBitmap(String repoName)
			throws Exception {
		DfsRepositoryDescription repoDesc = new DfsRepositoryDescription(
				repoName);
		InMemoryRepository repo = new InMemoryRepository(repoDesc);
		try (TestRepository<InMemoryRepository> repository = new TestRepository<>(
				repo)) {
			RevCommit commit = repository.branch("/refs/ref1" + repoName)
					.commit().add("blob1", "blob1" + repoName).create();
			repository.branch("/refs/ref2" + repoName).commit()
					.add("blob2", "blob2" + repoName).parent(commit).create();
		}
		new DfsGarbageCollector(repo).pack(null);
		return repo;
	}

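	// Fire-and-forget execution on the shared pool; exceptions from the
	// callable are deliberately swallowed, since the tests verify behaviour
	// through the cache counters afterwards.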
	private void asyncRun(Callable<?> call) {
		pool.execute(() -> {
			try {
				call.call();
			} catch (Exception e) {
				// Ignore.
			}
		});
	}

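	// Give queued tasks up to 500ms to finish; a timeout most likely means
	// the loading threads deadlocked.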
	private void waitForExecutorPoolTermination() throws Exception {
		pool.shutdown();
		pool.awaitTermination(500, MILLISECONDS);
		assertTrue("Threads did not complete, likely due to a deadlock.",
				pool.isTerminated());
	}
}
473 }