Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / libcephfs / recordlock.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2011 New Dream Network
7  *               2016 Red Hat
8  *
9  * This is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Lesser General Public
11  * License version 2.1, as published by the Free Software
12  * Foundation.  See file COPYING.
13  *
14  */
15
16 #include <pthread.h>
17 #include "gtest/gtest.h"
18 #ifndef GTEST_IS_THREADSAFE
19 #error "!GTEST_IS_THREADSAFE"
20 #endif
21
22 #include "include/cephfs/libcephfs.h"
23 #include <errno.h>
24 #include <sys/fcntl.h>
25 #include <unistd.h>
26 #include <sys/file.h>
27 #include <sys/types.h>
28 #include <sys/stat.h>
29 #include <dirent.h>
30 #include <sys/xattr.h>
31
32 #include <stdlib.h>
33 #include <semaphore.h>
34 #include <time.h>
35 #include <sys/mman.h>
36
37 #ifdef __linux__
38 #include <limits.h>
39 #endif
40
41 // Startup common: create and mount ceph fs
42 #define STARTUP_CEPH() do {                             \
43     ASSERT_EQ(0, ceph_create(&cmount, NULL));           \
44     ASSERT_EQ(0, ceph_conf_read_file(cmount, NULL));    \
45     ASSERT_EQ(0, ceph_conf_parse_env(cmount, NULL));    \
46     ASSERT_EQ(0, ceph_mount(cmount, NULL));             \
47   } while(0)
48
49 // Cleanup common: unmount and release ceph fs
50 #define CLEANUP_CEPH() do {                     \
51     ASSERT_EQ(0, ceph_unmount(cmount));         \
52     ASSERT_EQ(0, ceph_release(cmount));         \
53   } while(0)
54
55 static const mode_t fileMode = S_IRWXU | S_IRWXG | S_IRWXO;
56
57 // Default wait time for normal and "slow" operations
58 // (5" should be enough in case of network congestion)
59 static const long waitMs = 10;
60 static const long waitSlowMs = 5000;
61
62 // Get the absolute struct timespec reference from now + 'ms' milliseconds
63 static const struct timespec* abstime(struct timespec &ts, long ms) {
64   if (clock_gettime(CLOCK_REALTIME, &ts) == -1) {
65     abort();
66   }
67   ts.tv_nsec += ms * 1000000;
68   ts.tv_sec += ts.tv_nsec / 1000000000;
69   ts.tv_nsec %= 1000000000;
70   return &ts;
71 }
72
73 /* Basic locking */
74
75 TEST(LibCephFS, BasicRecordLocking) {
76   struct ceph_mount_info *cmount = NULL;
77   STARTUP_CEPH();
78
79   char c_file[1024];
80   sprintf(c_file, "recordlock_test_%d", getpid());
81   Fh *fh = NULL;
82   Inode *root = NULL, *inode = NULL;
83   struct ceph_statx stx;
84   int rc;
85   struct flock lock1, lock2;
86   UserPerm *perms = ceph_mount_perms(cmount);
87
88   // Get the root inode
89   rc = ceph_ll_lookup_root(cmount, &root);
90   ASSERT_EQ(rc, 0); 
91
92   // Get the inode and Fh corresponding to c_file
93   rc = ceph_ll_create(cmount, root, c_file, fileMode, O_RDWR | O_CREAT,
94                       &inode, &fh, &stx, 0, 0, perms);
95   ASSERT_EQ(rc, 0); 
96
97   // write lock twice
98   lock1.l_type = F_WRLCK;
99   lock1.l_whence = SEEK_SET;
100   lock1.l_start = 0;
101   lock1.l_len = 1024;
102   lock1.l_pid = getpid();
103   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, 42, false));
104
105   lock2.l_type = F_WRLCK;
106   lock2.l_whence = SEEK_SET;
107   lock2.l_start = 0;
108   lock2.l_len = 1024;
109   lock2.l_pid = getpid();
110   ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock2, 43, false));
111
112   // Now try a conflicting read lock
113   lock2.l_type = F_RDLCK;
114   lock2.l_whence = SEEK_SET;
115   lock2.l_start = 100;
116   lock2.l_len = 100;
117   lock2.l_pid = getpid();
118   ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock2, 43, false));
119
120   // Now do a getlk
121   ASSERT_EQ(0, ceph_ll_getlk(cmount, fh, &lock2, 43));
122   ASSERT_EQ(lock2.l_type, F_WRLCK);
123   ASSERT_EQ(lock2.l_start, 0);
124   ASSERT_EQ(lock2.l_len, 1024);
125   ASSERT_EQ(lock2.l_pid, getpid());
126
127   // Extend the range of the write lock
128   lock1.l_type = F_WRLCK;
129   lock1.l_whence = SEEK_SET;
130   lock1.l_start = 1024;
131   lock1.l_len = 1024;
132   lock1.l_pid = getpid();
133   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, 42, false));
134
135   // Now do a getlk
136   lock2.l_type = F_RDLCK;
137   lock2.l_whence = SEEK_SET;
138   lock2.l_start = 100;
139   lock2.l_len = 100;
140   lock2.l_pid = getpid();
141   ASSERT_EQ(0, ceph_ll_getlk(cmount, fh, &lock2, 43));
142   ASSERT_EQ(lock2.l_type, F_WRLCK);
143   ASSERT_EQ(lock2.l_start, 0);
144   ASSERT_EQ(lock2.l_len, 2048);
145   ASSERT_EQ(lock2.l_pid, getpid());
146
147   // Now release part of the range
148   lock1.l_type = F_UNLCK;
149   lock1.l_whence = SEEK_SET;
150   lock1.l_start = 512;
151   lock1.l_len = 1024;
152   lock1.l_pid = getpid();
153   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, 42, false));
154
155   // Now do a getlk to check 1st part
156   lock2.l_type = F_RDLCK;
157   lock2.l_whence = SEEK_SET;
158   lock2.l_start = 100;
159   lock2.l_len = 100;
160   lock2.l_pid = getpid();
161   ASSERT_EQ(0, ceph_ll_getlk(cmount, fh, &lock2, 43));
162   ASSERT_EQ(lock2.l_type, F_WRLCK);
163   ASSERT_EQ(lock2.l_start, 0);
164   ASSERT_EQ(lock2.l_len, 512);
165   ASSERT_EQ(lock2.l_pid, getpid());
166
167   // Now do a getlk to check 2nd part
168   lock2.l_type = F_RDLCK;
169   lock2.l_whence = SEEK_SET;
170   lock2.l_start = 2000;
171   lock2.l_len = 100;
172   lock2.l_pid = getpid();
173   ASSERT_EQ(0, ceph_ll_getlk(cmount, fh, &lock2, 43));
174   ASSERT_EQ(lock2.l_type, F_WRLCK);
175   ASSERT_EQ(lock2.l_start, 1536);
176   ASSERT_EQ(lock2.l_len, 512);
177   ASSERT_EQ(lock2.l_pid, getpid());
178
179   // Now do a getlk to check released part
180   lock2.l_type = F_RDLCK;
181   lock2.l_whence = SEEK_SET;
182   lock2.l_start = 512;
183   lock2.l_len = 1024;
184   lock2.l_pid = getpid();
185   ASSERT_EQ(0, ceph_ll_getlk(cmount, fh, &lock2, 43));
186   ASSERT_EQ(lock2.l_type, F_UNLCK);
187   ASSERT_EQ(lock2.l_start, 512);
188   ASSERT_EQ(lock2.l_len, 1024);
189   ASSERT_EQ(lock2.l_pid, getpid());
190
191   // Now downgrade the 1st part of the lock
192   lock1.l_type = F_RDLCK;
193   lock1.l_whence = SEEK_SET;
194   lock1.l_start = 0;
195   lock1.l_len = 512;
196   lock1.l_pid = getpid();
197   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, 42, false));
198
199   // Now do a getlk to check 1st part
200   lock2.l_type = F_WRLCK;
201   lock2.l_whence = SEEK_SET;
202   lock2.l_start = 100;
203   lock2.l_len = 100;
204   lock2.l_pid = getpid();
205   ASSERT_EQ(0, ceph_ll_getlk(cmount, fh, &lock2, 43));
206   ASSERT_EQ(lock2.l_type, F_RDLCK);
207   ASSERT_EQ(lock2.l_start, 0);
208   ASSERT_EQ(lock2.l_len, 512);
209   ASSERT_EQ(lock2.l_pid, getpid());
210
211   // Now upgrade the 1st part of the lock
212   lock1.l_type = F_WRLCK;
213   lock1.l_whence = SEEK_SET;
214   lock1.l_start = 0;
215   lock1.l_len = 512;
216   lock1.l_pid = getpid();
217   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, 42, false));
218
219   // Now do a getlk to check 1st part
220   lock2.l_type = F_WRLCK;
221   lock2.l_whence = SEEK_SET;
222   lock2.l_start = 100;
223   lock2.l_len = 100;
224   lock2.l_pid = getpid();
225   ASSERT_EQ(0, ceph_ll_getlk(cmount, fh, &lock2, 43));
226   ASSERT_EQ(lock2.l_type, F_WRLCK);
227   ASSERT_EQ(lock2.l_start, 0);
228   ASSERT_EQ(lock2.l_len, 512);
229   ASSERT_EQ(lock2.l_pid, getpid());
230
231   ASSERT_EQ(0, ceph_ll_close(cmount, fh));
232   ASSERT_EQ(0, ceph_ll_unlink(cmount, root, c_file, perms));
233   CLEANUP_CEPH();
234 }
235
236 /* Locking in different threads */
237
238 // Used by ConcurrentLocking test
239 struct str_ConcurrentRecordLocking {
240   const char *file;
241   struct ceph_mount_info *cmount;  // !NULL if shared
242   sem_t sem[2];
243   sem_t semReply[2];
244   void sem_init(int pshared) {
245     ASSERT_EQ(0, ::sem_init(&sem[0], pshared, 0));
246     ASSERT_EQ(0, ::sem_init(&sem[1], pshared, 0));
247     ASSERT_EQ(0, ::sem_init(&semReply[0], pshared, 0));
248     ASSERT_EQ(0, ::sem_init(&semReply[1], pshared, 0));
249   }
250   void sem_destroy() {
251     ASSERT_EQ(0, ::sem_destroy(&sem[0]));
252     ASSERT_EQ(0, ::sem_destroy(&sem[1]));
253     ASSERT_EQ(0, ::sem_destroy(&semReply[0]));
254     ASSERT_EQ(0, ::sem_destroy(&semReply[1]));
255   }
256 };
257
258 // Wakeup main (for (N) steps)
259 #define PING_MAIN(n) ASSERT_EQ(0, sem_post(&s.sem[n%2]))
260 // Wait for main to wake us up (for (RN) steps)
261 #define WAIT_MAIN(n) \
262   ASSERT_EQ(0, sem_timedwait(&s.semReply[n%2], abstime(ts, waitSlowMs)))
263
264 // Wakeup worker (for (RN) steps)
265 #define PING_WORKER(n) ASSERT_EQ(0, sem_post(&s.semReply[n%2]))
266 // Wait for worker to wake us up (for (N) steps)
267 #define WAIT_WORKER(n) \
268   ASSERT_EQ(0, sem_timedwait(&s.sem[n%2], abstime(ts, waitSlowMs)))
269 // Worker shall not wake us up (for (N) steps)
270 #define NOT_WAIT_WORKER(n) \
271   ASSERT_EQ(-1, sem_timedwait(&s.sem[n%2], abstime(ts, waitMs)))
272
273 // Do twice an operation
274 #define TWICE(EXPR) do {                        \
275     EXPR;                                       \
276     EXPR;                                       \
277   } while(0)
278
279 /* Locking in different threads */
280
281 // Used by ConcurrentLocking test
282 static void thread_ConcurrentRecordLocking(str_ConcurrentRecordLocking& s) {
283   struct ceph_mount_info *const cmount = s.cmount;
284   Fh *fh = NULL;
285   Inode *root = NULL, *inode = NULL;
286   struct ceph_statx stx;
287   struct flock lock1;
288   int rc;
289   struct timespec ts;
290
291   // Get the root inode
292   rc = ceph_ll_lookup_root(cmount, &root);
293   ASSERT_EQ(rc, 0); 
294
295   // Get the inode and Fh corresponding to c_file
296   rc = ceph_ll_create(cmount, root, s.file, fileMode, O_RDWR | O_CREAT,
297                       &inode, &fh, &stx, 0, 0, ceph_mount_perms(cmount));
298   ASSERT_EQ(rc, 0); 
299
300   lock1.l_type = F_WRLCK;
301   lock1.l_whence = SEEK_SET;
302   lock1.l_start = 0;
303   lock1.l_len = 1024;
304   lock1.l_pid = getpid();
305   ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
306
307   PING_MAIN(1); // (1)
308   lock1.l_type = F_WRLCK;
309   lock1.l_whence = SEEK_SET;
310   lock1.l_start = 0;
311   lock1.l_len = 1024;
312   lock1.l_pid = getpid();
313   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), true));
314   PING_MAIN(2); // (2)
315
316   lock1.l_type = F_UNLCK;
317   lock1.l_whence = SEEK_SET;
318   lock1.l_start = 0;
319   lock1.l_len = 1024;
320   lock1.l_pid = getpid();
321   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
322   PING_MAIN(3); // (3)
323
324   lock1.l_type = F_RDLCK;
325   lock1.l_whence = SEEK_SET;
326   lock1.l_start = 0;
327   lock1.l_len = 1024;
328   lock1.l_pid = getpid();
329   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), true));
330   PING_MAIN(4); // (4)
331
332   WAIT_MAIN(1); // (R1)
333   lock1.l_type = F_UNLCK;
334   lock1.l_whence = SEEK_SET;
335   lock1.l_start = 0;
336   lock1.l_len = 1024;
337   lock1.l_pid = getpid();
338   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
339   PING_MAIN(5); // (5)
340
341   WAIT_MAIN(2); // (R2)
342   lock1.l_type = F_WRLCK;
343   lock1.l_whence = SEEK_SET;
344   lock1.l_start = 0;
345   lock1.l_len = 1024;
346   lock1.l_pid = getpid();
347   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), true));
348   PING_MAIN(6); // (6)
349
350   WAIT_MAIN(3); // (R3)
351   lock1.l_type = F_UNLCK;
352   lock1.l_whence = SEEK_SET;
353   lock1.l_start = 0;
354   lock1.l_len = 1024;
355   lock1.l_pid = getpid();
356   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
357   PING_MAIN(7); // (7)
358
359   ASSERT_EQ(0, ceph_ll_close(cmount, fh));
360 }
361
362 // Used by ConcurrentRecordLocking test
363 static void* thread_ConcurrentRecordLocking_(void *arg) {
364   str_ConcurrentRecordLocking *const s =
365     reinterpret_cast<str_ConcurrentRecordLocking*>(arg);
366   thread_ConcurrentRecordLocking(*s);
367   return NULL;
368 }
369
370 TEST(LibCephFS, ConcurrentRecordLocking) {
371   const pid_t mypid = getpid();
372   struct ceph_mount_info *cmount;
373   STARTUP_CEPH();
374
375   char c_file[1024];
376   sprintf(c_file, "recordlock_test_%d", mypid);
377   Fh *fh = NULL;
378   Inode *root = NULL, *inode = NULL;
379   struct ceph_statx stx;
380   struct flock lock1;
381   int rc;
382   UserPerm *perms = ceph_mount_perms(cmount);
383
384   // Get the root inode
385   rc = ceph_ll_lookup_root(cmount, &root);
386   ASSERT_EQ(rc, 0); 
387
388   // Get the inode and Fh corresponding to c_file
389   rc = ceph_ll_create(cmount, root, c_file, fileMode, O_RDWR | O_CREAT,
390                       &inode, &fh, &stx, 0, 0, perms);
391   ASSERT_EQ(rc, 0); 
392
393   // Lock
394   lock1.l_type = F_WRLCK;
395   lock1.l_whence = SEEK_SET;
396   lock1.l_start = 0;
397   lock1.l_len = 1024;
398   lock1.l_pid = getpid();
399   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), true));
400
401   // Start locker thread
402   pthread_t thread;
403   struct timespec ts;
404   str_ConcurrentRecordLocking s = { c_file, cmount };
405   s.sem_init(0);
406   ASSERT_EQ(0, pthread_create(&thread, NULL, thread_ConcurrentRecordLocking_, &s));
407   // Synchronization point with thread (failure: thread is dead)
408   WAIT_WORKER(1); // (1)
409
410   // Shall not have lock immediately
411   NOT_WAIT_WORKER(2); // (2)
412
413   // Unlock
414   lock1.l_type = F_UNLCK;
415   lock1.l_whence = SEEK_SET;
416   lock1.l_start = 0;
417   lock1.l_len = 1024;
418   lock1.l_pid = getpid();
419   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
420
421   // Shall have lock
422   // Synchronization point with thread (failure: thread is dead)
423   WAIT_WORKER(2); // (2)
424
425   // Synchronization point with thread (failure: thread is dead)
426   WAIT_WORKER(3); // (3)
427
428   // Wait for thread to share lock
429   WAIT_WORKER(4); // (4)
430   lock1.l_type = F_WRLCK;
431   lock1.l_whence = SEEK_SET;
432   lock1.l_start = 0;
433   lock1.l_len = 1024;
434   lock1.l_pid = getpid();
435   ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
436   lock1.l_type = F_RDLCK;
437   lock1.l_whence = SEEK_SET;
438   lock1.l_start = 0;
439   lock1.l_len = 1024;
440   lock1.l_pid = getpid();
441   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
442
443   // Wake up thread to unlock shared lock
444   PING_WORKER(1); // (R1)
445   WAIT_WORKER(5); // (5)
446
447   // Now we can lock exclusively
448   // Upgrade to exclusive lock (as per POSIX)
449   lock1.l_type = F_WRLCK;
450   lock1.l_whence = SEEK_SET;
451   lock1.l_start = 0;
452   lock1.l_len = 1024;
453   lock1.l_pid = getpid();
454   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), true));
455
456   // Wake up thread to lock shared lock
457   PING_WORKER(2); // (R2)
458
459   // Shall not have lock immediately
460   NOT_WAIT_WORKER(6); // (6)
461
462   // Release lock ; thread will get it
463   lock1.l_type = F_UNLCK;
464   lock1.l_whence = SEEK_SET;
465   lock1.l_start = 0;
466   lock1.l_len = 1024;
467   lock1.l_pid = getpid();
468   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
469   WAIT_WORKER(6); // (6)
470
471   // We no longer have the lock
472   lock1.l_type = F_WRLCK;
473   lock1.l_whence = SEEK_SET;
474   lock1.l_start = 0;
475   lock1.l_len = 1024;
476   lock1.l_pid = getpid();
477   ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
478   lock1.l_type = F_RDLCK;
479   lock1.l_whence = SEEK_SET;
480   lock1.l_start = 0;
481   lock1.l_len = 1024;
482   lock1.l_pid = getpid();
483   ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
484
485   // Wake up thread to unlock exclusive lock
486   PING_WORKER(3); // (R3)
487   WAIT_WORKER(7); // (7)
488
489   // We can lock it again
490   lock1.l_type = F_WRLCK;
491   lock1.l_whence = SEEK_SET;
492   lock1.l_start = 0;
493   lock1.l_len = 1024;
494   lock1.l_pid = getpid();
495   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
496   lock1.l_type = F_UNLCK;
497   lock1.l_whence = SEEK_SET;
498   lock1.l_start = 0;
499   lock1.l_len = 1024;
500   lock1.l_pid = getpid();
501   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
502
503   // Cleanup
504   void *retval = (void*) (uintptr_t) -1;
505   ASSERT_EQ(0, pthread_join(thread, &retval));
506   ASSERT_EQ(NULL, retval);
507   s.sem_destroy();
508   ASSERT_EQ(0, ceph_ll_close(cmount, fh));
509   ASSERT_EQ(0, ceph_ll_unlink(cmount, root, c_file, perms));
510   CLEANUP_CEPH();
511 }
512
513 TEST(LibCephFS, ThreesomeRecordLocking) {
514   const pid_t mypid = getpid();
515   struct ceph_mount_info *cmount;
516   STARTUP_CEPH();
517
518   char c_file[1024];
519   sprintf(c_file, "recordlock_test_%d", mypid);
520   Fh *fh = NULL;
521   Inode *root = NULL, *inode = NULL;
522   struct ceph_statx stx;
523   struct flock lock1;
524   int rc;
525   UserPerm *perms = ceph_mount_perms(cmount);
526
527   // Get the root inode
528   rc = ceph_ll_lookup_root(cmount, &root);
529   ASSERT_EQ(rc, 0); 
530
531   // Get the inode and Fh corresponding to c_file
532   rc = ceph_ll_create(cmount, root, c_file, fileMode, O_RDWR | O_CREAT,
533                       &inode, &fh, &stx, 0, 0, perms);
534   ASSERT_EQ(rc, 0); 
535
536   // Lock
537   lock1.l_type = F_WRLCK;
538   lock1.l_whence = SEEK_SET;
539   lock1.l_start = 0;
540   lock1.l_len = 1024;
541   lock1.l_pid = getpid();
542   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), true));
543
544   // Start locker thread
545   pthread_t thread[2];
546   struct timespec ts;
547   str_ConcurrentRecordLocking s = { c_file, cmount };
548   s.sem_init(0);
549   ASSERT_EQ(0, pthread_create(&thread[0], NULL, thread_ConcurrentRecordLocking_, &s));
550   ASSERT_EQ(0, pthread_create(&thread[1], NULL, thread_ConcurrentRecordLocking_, &s));
551   // Synchronization point with thread (failure: thread is dead)
552   TWICE(WAIT_WORKER(1)); // (1)
553
554   // Shall not have lock immediately
555   NOT_WAIT_WORKER(2); // (2)
556
557   // Unlock
558   lock1.l_type = F_UNLCK;
559   lock1.l_whence = SEEK_SET;
560   lock1.l_start = 0;
561   lock1.l_len = 1024;
562   lock1.l_pid = getpid();
563   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
564
565   // Shall have lock
566   TWICE(// Synchronization point with thread (failure: thread is dead)
567         WAIT_WORKER(2); // (2)
568         
569         // Synchronization point with thread (failure: thread is dead)
570         WAIT_WORKER(3)); // (3)
571   
572   // Wait for thread to share lock
573   TWICE(WAIT_WORKER(4)); // (4)
574   lock1.l_type = F_WRLCK;
575   lock1.l_whence = SEEK_SET;
576   lock1.l_start = 0;
577   lock1.l_len = 1024;
578   lock1.l_pid = getpid();
579   ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
580   lock1.l_type = F_RDLCK;
581   lock1.l_whence = SEEK_SET;
582   lock1.l_start = 0;
583   lock1.l_len = 1024;
584   lock1.l_pid = getpid();
585   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
586
587   // Wake up thread to unlock shared lock
588   TWICE(PING_WORKER(1); // (R1)
589         WAIT_WORKER(5)); // (5)
590
591   // Now we can lock exclusively
592   // Upgrade to exclusive lock (as per POSIX)
593   lock1.l_type = F_WRLCK;
594   lock1.l_whence = SEEK_SET;
595   lock1.l_start = 0;
596   lock1.l_len = 1024;
597   lock1.l_pid = getpid();
598   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), true));
599
600   TWICE(  // Wake up thread to lock shared lock
601         PING_WORKER(2); // (R2)
602         
603         // Shall not have lock immediately
604         NOT_WAIT_WORKER(6)); // (6)
605   
606   // Release lock ; thread will get it
607   lock1.l_type = F_UNLCK;
608   lock1.l_whence = SEEK_SET;
609   lock1.l_start = 0;
610   lock1.l_len = 1024;
611   lock1.l_pid = getpid();
612   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
613   TWICE(WAIT_WORKER(6); // (6)
614         
615         // We no longer have the lock
616         lock1.l_type = F_WRLCK;
617         lock1.l_whence = SEEK_SET;
618         lock1.l_start = 0;
619         lock1.l_len = 1024;
620         lock1.l_pid = getpid();
621         ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
622         lock1.l_type = F_RDLCK;
623         lock1.l_whence = SEEK_SET;
624         lock1.l_start = 0;
625         lock1.l_len = 1024;
626         lock1.l_pid = getpid();
627         ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
628         
629         // Wake up thread to unlock exclusive lock
630         PING_WORKER(3); // (R3)
631         WAIT_WORKER(7); // (7)
632         );
633   
634   // We can lock it again
635   lock1.l_type = F_WRLCK;
636   lock1.l_whence = SEEK_SET;
637   lock1.l_start = 0;
638   lock1.l_len = 1024;
639   lock1.l_pid = getpid();
640   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
641   lock1.l_type = F_UNLCK;
642   lock1.l_whence = SEEK_SET;
643   lock1.l_start = 0;
644   lock1.l_len = 1024;
645   lock1.l_pid = getpid();
646   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
647
648   // Cleanup
649   void *retval = (void*) (uintptr_t) -1;
650   ASSERT_EQ(0, pthread_join(thread[0], &retval));
651   ASSERT_EQ(NULL, retval);
652   ASSERT_EQ(0, pthread_join(thread[1], &retval));
653   ASSERT_EQ(NULL, retval);
654   s.sem_destroy();
655   ASSERT_EQ(0, ceph_ll_close(cmount, fh));
656   ASSERT_EQ(0, ceph_ll_unlink(cmount, root, c_file, perms));
657   CLEANUP_CEPH();
658 }
659
660 /* Locking in different processes */
661
662 #define PROCESS_SLOW_MS() \
663   static const long waitMs = 100; \
664   (void) waitMs
665
666 // Used by ConcurrentLocking test
667 static void process_ConcurrentRecordLocking(str_ConcurrentRecordLocking& s) {
668   const pid_t mypid = getpid();
669   PROCESS_SLOW_MS();
670
671   struct ceph_mount_info *cmount = NULL;
672   struct timespec ts;
673   Fh *fh = NULL;
674   Inode *root = NULL, *inode = NULL;
675   struct ceph_statx stx;
676   int rc;
677   struct flock lock1;
678
679   STARTUP_CEPH();
680   s.cmount = cmount;
681
682   // Get the root inode
683   rc = ceph_ll_lookup_root(cmount, &root);
684   ASSERT_EQ(rc, 0); 
685
686   // Get the inode and Fh corresponding to c_file
687   rc = ceph_ll_create(cmount, root, s.file, fileMode, O_RDWR | O_CREAT,
688                       &inode, &fh, &stx, 0, 0, ceph_mount_perms(cmount));
689   ASSERT_EQ(rc, 0); 
690
691   WAIT_MAIN(1); // (R1)
692
693   lock1.l_type = F_WRLCK;
694   lock1.l_whence = SEEK_SET;
695   lock1.l_start = 0;
696   lock1.l_len = 1024;
697   lock1.l_pid = getpid();
698   ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
699   PING_MAIN(1); // (1)
700   lock1.l_type = F_WRLCK;
701   lock1.l_whence = SEEK_SET;
702   lock1.l_start = 0;
703   lock1.l_len = 1024;
704   lock1.l_pid = getpid();
705   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, true));
706   PING_MAIN(2); // (2)
707
708   lock1.l_type = F_UNLCK;
709   lock1.l_whence = SEEK_SET;
710   lock1.l_start = 0;
711   lock1.l_len = 1024;
712   lock1.l_pid = getpid();
713   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
714   PING_MAIN(3); // (3)
715
716   lock1.l_type = F_RDLCK;
717   lock1.l_whence = SEEK_SET;
718   lock1.l_start = 0;
719   lock1.l_len = 1024;
720   lock1.l_pid = getpid();
721   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, true));
722   PING_MAIN(4); // (4)
723
724   WAIT_MAIN(2); // (R2)
725   lock1.l_type = F_UNLCK;
726   lock1.l_whence = SEEK_SET;
727   lock1.l_start = 0;
728   lock1.l_len = 1024;
729   lock1.l_pid = getpid();
730   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
731   PING_MAIN(5); // (5)
732
733   WAIT_MAIN(3); // (R3)
734   lock1.l_type = F_WRLCK;
735   lock1.l_whence = SEEK_SET;
736   lock1.l_start = 0;
737   lock1.l_len = 1024;
738   lock1.l_pid = getpid();
739   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, true));
740   PING_MAIN(6); // (6)
741
742   WAIT_MAIN(4); // (R4)
743   lock1.l_type = F_UNLCK;
744   lock1.l_whence = SEEK_SET;
745   lock1.l_start = 0;
746   lock1.l_len = 1024;
747   lock1.l_pid = getpid();
748   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
749   PING_MAIN(7); // (7)
750
751   ASSERT_EQ(0, ceph_ll_close(cmount, fh));
752   CLEANUP_CEPH();
753
754   s.sem_destroy();
755   exit(EXIT_SUCCESS);
756 }
757
758 // Disabled because of fork() issues (http://tracker.ceph.com/issues/16556)
759 TEST(LibCephFS, DISABLED_InterProcessRecordLocking) {
760   PROCESS_SLOW_MS();
761   // Process synchronization
762   char c_file[1024];
763   const pid_t mypid = getpid();
764   sprintf(c_file, "recordlock_test_%d", mypid);
765   Fh *fh = NULL;
766   Inode *root = NULL, *inode = NULL;
767   struct ceph_statx stx;
768   struct flock lock1;
769   int rc;
770
771   // Note: the semaphores MUST be on a shared memory segment
772   str_ConcurrentRecordLocking *const shs =
773     reinterpret_cast<str_ConcurrentRecordLocking*>
774     (mmap(0, sizeof(*shs), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS,
775           -1, 0));
776   str_ConcurrentRecordLocking &s = *shs;
777   s.file = c_file;
778   s.sem_init(1);
779
780   // Start locker process
781   const pid_t pid = fork();
782   ASSERT_GE(pid, 0);
783   if (pid == 0) {
784     process_ConcurrentRecordLocking(s);
785     exit(EXIT_FAILURE);
786   }
787
788   struct timespec ts;
789   struct ceph_mount_info *cmount;
790   STARTUP_CEPH();
791   UserPerm *perms = ceph_mount_perms(cmount);
792
793   // Get the root inode
794   rc = ceph_ll_lookup_root(cmount, &root);
795   ASSERT_EQ(rc, 0); 
796
797   // Get the inode and Fh corresponding to c_file
798   rc = ceph_ll_create(cmount, root, c_file, fileMode, O_RDWR | O_CREAT,
799                       &inode, &fh, &stx, 0, 0, perms);
800   ASSERT_EQ(rc, 0); 
801
802   // Lock
803   lock1.l_type = F_WRLCK;
804   lock1.l_whence = SEEK_SET;
805   lock1.l_start = 0;
806   lock1.l_len = 1024;
807   lock1.l_pid = getpid();
808   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, true));
809
810   // Synchronization point with process (failure: process is dead)
811   PING_WORKER(1); // (R1)
812   WAIT_WORKER(1); // (1)
813
814   // Shall not have lock immediately
815   NOT_WAIT_WORKER(2); // (2)
816
817   // Unlock
818   lock1.l_type = F_UNLCK;
819   lock1.l_whence = SEEK_SET;
820   lock1.l_start = 0;
821   lock1.l_len = 1024;
822   lock1.l_pid = getpid();
823   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
824
825   // Shall have lock
826   // Synchronization point with process (failure: process is dead)
827   WAIT_WORKER(2); // (2)
828
829   // Synchronization point with process (failure: process is dead)
830   WAIT_WORKER(3); // (3)
831
832   // Wait for process to share lock
833   WAIT_WORKER(4); // (4)
834   lock1.l_type = F_WRLCK;
835   lock1.l_whence = SEEK_SET;
836   lock1.l_start = 0;
837   lock1.l_len = 1024;
838   lock1.l_pid = getpid();
839   ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
840   lock1.l_type = F_RDLCK;
841   lock1.l_whence = SEEK_SET;
842   lock1.l_start = 0;
843   lock1.l_len = 1024;
844   lock1.l_pid = getpid();
845   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
846
847   // Wake up process to unlock shared lock
848   PING_WORKER(2); // (R2)
849   WAIT_WORKER(5); // (5)
850
851   // Now we can lock exclusively
852   // Upgrade to exclusive lock (as per POSIX)
853   lock1.l_type = F_WRLCK;
854   lock1.l_whence = SEEK_SET;
855   lock1.l_start = 0;
856   lock1.l_len = 1024;
857   lock1.l_pid = getpid();
858   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, true));
859
860   // Wake up process to lock shared lock
861   PING_WORKER(3); // (R3)
862
863   // Shall not have lock immediately
864   NOT_WAIT_WORKER(6); // (6)
865
866   // Release lock ; process will get it
867   lock1.l_type = F_UNLCK;
868   lock1.l_whence = SEEK_SET;
869   lock1.l_start = 0;
870   lock1.l_len = 1024;
871   lock1.l_pid = getpid();
872   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
873   WAIT_WORKER(6); // (6)
874
875   // We no longer have the lock
876   lock1.l_type = F_WRLCK;
877   lock1.l_whence = SEEK_SET;
878   lock1.l_start = 0;
879   lock1.l_len = 1024;
880   lock1.l_pid = getpid();
881   ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
882   lock1.l_type = F_RDLCK;
883   lock1.l_whence = SEEK_SET;
884   lock1.l_start = 0;
885   lock1.l_len = 1024;
886   lock1.l_pid = getpid();
887   ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
888
889   // Wake up process to unlock exclusive lock
890   PING_WORKER(4); // (R4)
891   WAIT_WORKER(7); // (7)
892
893   // We can lock it again
894   lock1.l_type = F_WRLCK;
895   lock1.l_whence = SEEK_SET;
896   lock1.l_start = 0;
897   lock1.l_len = 1024;
898   lock1.l_pid = getpid();
899   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
900   lock1.l_type = F_UNLCK;
901   lock1.l_whence = SEEK_SET;
902   lock1.l_start = 0;
903   lock1.l_len = 1024;
904   lock1.l_pid = getpid();
905   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
906
907   // Wait pid
908   int status;
909   ASSERT_EQ(pid, waitpid(pid, &status, 0));
910   ASSERT_EQ(EXIT_SUCCESS, status);
911
912   // Cleanup
913   s.sem_destroy();
914   ASSERT_EQ(0, munmap(shs, sizeof(*shs)));
915   ASSERT_EQ(0, ceph_ll_close(cmount, fh));
916   ASSERT_EQ(0, ceph_ll_unlink(cmount, root, c_file, perms));
917   CLEANUP_CEPH();
918 }
919
920 // Disabled because of fork() issues (http://tracker.ceph.com/issues/16556)
921 TEST(LibCephFS, DISABLED_ThreesomeInterProcessRecordLocking) {
922   PROCESS_SLOW_MS();
923   // Process synchronization
924   char c_file[1024];
925   const pid_t mypid = getpid();
926   sprintf(c_file, "recordlock_test_%d", mypid);
927   Fh *fh = NULL;
928   Inode *root = NULL, *inode = NULL;
929   struct ceph_statx stx;
930   struct flock lock1;
931   int rc;
932
933   // Note: the semaphores MUST be on a shared memory segment
934   str_ConcurrentRecordLocking *const shs =
935     reinterpret_cast<str_ConcurrentRecordLocking*>
936     (mmap(0, sizeof(*shs), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS,
937           -1, 0));
938   str_ConcurrentRecordLocking &s = *shs;
939   s.file = c_file;
940   s.sem_init(1);
941
942   // Start locker processes
943   pid_t pid[2];
944   pid[0] = fork();
945   ASSERT_GE(pid[0], 0);
946   if (pid[0] == 0) {
947     process_ConcurrentRecordLocking(s);
948     exit(EXIT_FAILURE);
949   }
950   pid[1] = fork();
951   ASSERT_GE(pid[1], 0);
952   if (pid[1] == 0) {
953     process_ConcurrentRecordLocking(s);
954     exit(EXIT_FAILURE);
955   }
956
957   struct timespec ts;
958   struct ceph_mount_info *cmount;
959   STARTUP_CEPH();
960
961   // Get the root inode
962   rc = ceph_ll_lookup_root(cmount, &root);
963   ASSERT_EQ(rc, 0); 
964
965   // Get the inode and Fh corresponding to c_file
966   UserPerm *perms = ceph_mount_perms(cmount);
967   rc = ceph_ll_create(cmount, root, c_file, fileMode, O_RDWR | O_CREAT,
968                       &inode, &fh, &stx, 0, 0, perms);
969   ASSERT_EQ(rc, 0); 
970
971   // Lock
972   lock1.l_type = F_WRLCK;
973   lock1.l_whence = SEEK_SET;
974   lock1.l_start = 0;
975   lock1.l_len = 1024;
976   lock1.l_pid = getpid();
977   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, true));
978
979   // Synchronization point with process (failure: process is dead)
980   TWICE(PING_WORKER(1)); // (R1)
981   TWICE(WAIT_WORKER(1)); // (1)
982
983   // Shall not have lock immediately
984   NOT_WAIT_WORKER(2); // (2)
985
986   // Unlock
987   lock1.l_type = F_UNLCK;
988   lock1.l_whence = SEEK_SET;
989   lock1.l_start = 0;
990   lock1.l_len = 1024;
991   lock1.l_pid = getpid();
992   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
993
994   // Shall have lock
995   TWICE(// Synchronization point with process (failure: process is dead)
996         WAIT_WORKER(2); // (2)
997         
998         // Synchronization point with process (failure: process is dead)
999         WAIT_WORKER(3)); // (3)
1000   
1001   // Wait for process to share lock
1002   TWICE(WAIT_WORKER(4)); // (4)
1003   lock1.l_type = F_WRLCK;
1004   lock1.l_whence = SEEK_SET;
1005   lock1.l_start = 0;
1006   lock1.l_len = 1024;
1007   lock1.l_pid = getpid();
1008   ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
1009   lock1.l_type = F_RDLCK;
1010   lock1.l_whence = SEEK_SET;
1011   lock1.l_start = 0;
1012   lock1.l_len = 1024;
1013   lock1.l_pid = getpid();
1014   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
1015
1016   // Wake up process to unlock shared lock
1017   TWICE(PING_WORKER(2); // (R2)
1018         WAIT_WORKER(5)); // (5)
1019
1020   // Now we can lock exclusively
1021   // Upgrade to exclusive lock (as per POSIX)
1022   lock1.l_type = F_WRLCK;
1023   lock1.l_whence = SEEK_SET;
1024   lock1.l_start = 0;
1025   lock1.l_len = 1024;
1026   lock1.l_pid = getpid();
1027   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, true));
1028
1029   TWICE(  // Wake up process to lock shared lock
1030         PING_WORKER(3); // (R3)
1031         
1032         // Shall not have lock immediately
1033         NOT_WAIT_WORKER(6)); // (6)
1034   
1035   // Release lock ; process will get it
1036   lock1.l_type = F_UNLCK;
1037   lock1.l_whence = SEEK_SET;
1038   lock1.l_start = 0;
1039   lock1.l_len = 1024;
1040   lock1.l_pid = getpid();
1041   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
1042   TWICE(WAIT_WORKER(6); // (6)
1043         
1044         // We no longer have the lock
1045         lock1.l_type = F_WRLCK;
1046         lock1.l_whence = SEEK_SET;
1047         lock1.l_start = 0;
1048         lock1.l_len = 1024;
1049         lock1.l_pid = getpid();
1050         ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
1051         lock1.l_type = F_RDLCK;
1052         lock1.l_whence = SEEK_SET;
1053         lock1.l_start = 0;
1054         lock1.l_len = 1024;
1055         lock1.l_pid = getpid();
1056         ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
1057         
1058         // Wake up process to unlock exclusive lock
1059         PING_WORKER(4); // (R4)
1060         WAIT_WORKER(7); // (7)
1061         );
1062   
1063   // We can lock it again
1064   lock1.l_type = F_WRLCK;
1065   lock1.l_whence = SEEK_SET;
1066   lock1.l_start = 0;
1067   lock1.l_len = 1024;
1068   lock1.l_pid = getpid();
1069   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
1070   lock1.l_type = F_UNLCK;
1071   lock1.l_whence = SEEK_SET;
1072   lock1.l_start = 0;
1073   lock1.l_len = 1024;
1074   lock1.l_pid = getpid();
1075   ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
1076
1077   // Wait pids
1078   int status;
1079   ASSERT_EQ(pid[0], waitpid(pid[0], &status, 0));
1080   ASSERT_EQ(EXIT_SUCCESS, status);
1081   ASSERT_EQ(pid[1], waitpid(pid[1], &status, 0));
1082   ASSERT_EQ(EXIT_SUCCESS, status);
1083
1084   // Cleanup
1085   s.sem_destroy();
1086   ASSERT_EQ(0, munmap(shs, sizeof(*shs)));
1087   ASSERT_EQ(0, ceph_ll_close(cmount, fh));
1088   ASSERT_EQ(0, ceph_ll_unlink(cmount, root, c_file, perms));
1089   CLEANUP_CEPH();
1090 }