Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / libcephfs / flock.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2011 New Dream Network
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #include <pthread.h>
16 #include "gtest/gtest.h"
17 #ifndef GTEST_IS_THREADSAFE
18 #error "!GTEST_IS_THREADSAFE"
19 #endif
20
21 #include "include/cephfs/libcephfs.h"
22 #include <errno.h>
23 #include <fcntl.h>
24 #include <unistd.h>
25 #include <sys/file.h>
26 #include <sys/types.h>
27 #include <sys/stat.h>
28 #include <dirent.h>
29 #include <sys/xattr.h>
30
31 #include <stdlib.h>
32 #include <semaphore.h>
33 #include <time.h>
34 #include <sys/mman.h>
35
36 #ifdef __linux__
37 #include <limits.h>
38 #endif
39
40 // Startup common: create and mount ceph fs
41 #define STARTUP_CEPH() do {                             \
42     ASSERT_EQ(0, ceph_create(&cmount, NULL));           \
43     ASSERT_EQ(0, ceph_conf_read_file(cmount, NULL));    \
44     ASSERT_EQ(0, ceph_conf_parse_env(cmount, NULL));    \
45     ASSERT_EQ(0, ceph_mount(cmount, NULL));             \
46   } while(0)
47
48 // Cleanup common: unmount and release ceph fs
49 #define CLEANUP_CEPH() do {                     \
50     ASSERT_EQ(0, ceph_unmount(cmount));         \
51     ASSERT_EQ(0, ceph_release(cmount));         \
52   } while(0)
53
54 static const mode_t fileMode = S_IRWXU | S_IRWXG | S_IRWXO;
55
56 // Default wait time for normal and "slow" operations
57 // (5" should be enough in case of network congestion)
58 static const long waitMs = 10;
59 static const long waitSlowMs = 5000;
60
61 // Get the absolute struct timespec reference from now + 'ms' milliseconds
62 static const struct timespec* abstime(struct timespec &ts, long ms) {
63   if (clock_gettime(CLOCK_REALTIME, &ts) == -1) {
64     abort();
65   }
66   ts.tv_nsec += ms * 1000000;
67   ts.tv_sec += ts.tv_nsec / 1000000000;
68   ts.tv_nsec %= 1000000000;
69   return &ts;
70 }
71
72 /* Basic locking */
73 TEST(LibCephFS, BasicLocking) {
74   struct ceph_mount_info *cmount = NULL;
75   STARTUP_CEPH();
76
77   char c_file[1024];
78   sprintf(c_file, "/flock_test_%d", getpid());
79   const int fd = ceph_open(cmount, c_file, O_RDWR | O_CREAT, fileMode);
80   ASSERT_GE(fd, 0); 
81
82   // Lock exclusively twice
83   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, 42));
84   ASSERT_EQ(-EWOULDBLOCK, ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, 43));
85   ASSERT_EQ(-EWOULDBLOCK, ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, 44));
86   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, 42));
87
88   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, 43));
89   ASSERT_EQ(-EWOULDBLOCK, ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, 44));
90   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, 43));
91
92   // Lock shared three times
93   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_SH, 42));
94   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_SH, 43));
95   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_SH, 44));
96   // And then attempt to lock exclusively
97   ASSERT_EQ(-EWOULDBLOCK, ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, 45));
98   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, 42));
99   ASSERT_EQ(-EWOULDBLOCK, ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, 45));
100   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, 44));
101   ASSERT_EQ(-EWOULDBLOCK, ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, 45));
102   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, 43));
103   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, 45));
104   ASSERT_EQ(-EWOULDBLOCK, ceph_flock(cmount, fd, LOCK_SH | LOCK_NB, 42));
105   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, 45));
106
107   // Lock shared with upgrade to exclusive (POSIX) 
108   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_SH, 42));
109   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, 42));
110   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, 42));
111
112   // Lock exclusive with downgrade to shared (POSIX) 
113   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, 42));
114   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_SH, 42));
115   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, 42));
116
117   ASSERT_EQ(0, ceph_close(cmount, fd));
118   ASSERT_EQ(0, ceph_unlink(cmount, c_file));
119   CLEANUP_CEPH();
120 }
121
122 /* Locking in different threads */
123
124 // Used by ConcurrentLocking test
125 struct str_ConcurrentLocking {
126   const char *file;
127   struct ceph_mount_info *cmount;  // !NULL if shared
128   sem_t sem[2];
129   sem_t semReply[2];
130   void sem_init(int pshared) {
131     ASSERT_EQ(0, ::sem_init(&sem[0], pshared, 0));
132     ASSERT_EQ(0, ::sem_init(&sem[1], pshared, 0));
133     ASSERT_EQ(0, ::sem_init(&semReply[0], pshared, 0));
134     ASSERT_EQ(0, ::sem_init(&semReply[1], pshared, 0));
135   }
136   void sem_destroy() {
137     ASSERT_EQ(0, ::sem_destroy(&sem[0]));
138     ASSERT_EQ(0, ::sem_destroy(&sem[1]));
139     ASSERT_EQ(0, ::sem_destroy(&semReply[0]));
140     ASSERT_EQ(0, ::sem_destroy(&semReply[1]));
141   }
142 };
143
144 // Wakeup main (for (N) steps)
145 #define PING_MAIN(n) ASSERT_EQ(0, sem_post(&s.sem[n%2]))
146 // Wait for main to wake us up (for (RN) steps)
147 #define WAIT_MAIN(n) \
148   ASSERT_EQ(0, sem_timedwait(&s.semReply[n%2], abstime(ts, waitSlowMs)))
149
150 // Wakeup worker (for (RN) steps)
151 #define PING_WORKER(n) ASSERT_EQ(0, sem_post(&s.semReply[n%2]))
152 // Wait for worker to wake us up (for (N) steps)
153 #define WAIT_WORKER(n) \
154   ASSERT_EQ(0, sem_timedwait(&s.sem[n%2], abstime(ts, waitSlowMs)))
155 // Worker shall not wake us up (for (N) steps)
156 #define NOT_WAIT_WORKER(n) \
157   ASSERT_EQ(-1, sem_timedwait(&s.sem[n%2], abstime(ts, waitMs)))
158
159 // Do twice an operation
160 #define TWICE(EXPR) do {                        \
161     EXPR;                                       \
162     EXPR;                                       \
163   } while(0)
164
165 /* Locking in different threads */
166
167 // Used by ConcurrentLocking test
168 static void thread_ConcurrentLocking(str_ConcurrentLocking& s) {
169   struct ceph_mount_info *const cmount = s.cmount;
170   struct timespec ts;
171
172   const int fd = ceph_open(cmount, s.file, O_RDWR | O_CREAT, fileMode);
173   ASSERT_GE(fd, 0); 
174
175   ASSERT_EQ(-EWOULDBLOCK,
176             ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, pthread_self()));
177   PING_MAIN(1); // (1)
178   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, pthread_self()));
179   PING_MAIN(2); // (2)
180
181   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, pthread_self()));
182   PING_MAIN(3); // (3)
183
184   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_SH, pthread_self()));
185   PING_MAIN(4); // (4)
186
187   WAIT_MAIN(1); // (R1)
188   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, pthread_self()));
189   PING_MAIN(5); // (5)
190
191   WAIT_MAIN(2); // (R2)
192   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, pthread_self()));
193   PING_MAIN(6); // (6)
194
195   WAIT_MAIN(3); // (R3)
196   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, pthread_self()));
197   PING_MAIN(7); // (7)
198 }
199
200 // Used by ConcurrentLocking test
201 static void* thread_ConcurrentLocking_(void *arg) {
202   str_ConcurrentLocking *const s =
203     reinterpret_cast<str_ConcurrentLocking*>(arg);
204   thread_ConcurrentLocking(*s);
205   return NULL;
206 }
207
208 TEST(LibCephFS, ConcurrentLocking) {
209   const pid_t mypid = getpid();
210   struct ceph_mount_info *cmount;
211   STARTUP_CEPH();
212
213   char c_file[1024];
214   sprintf(c_file, "/flock_test_%d", mypid);
215   const int fd = ceph_open(cmount, c_file, O_RDWR | O_CREAT, fileMode);
216   ASSERT_GE(fd, 0); 
217
218   // Lock
219   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, pthread_self()));
220
221   // Start locker thread
222   pthread_t thread;
223   struct timespec ts;
224   str_ConcurrentLocking s = { c_file, cmount };
225   s.sem_init(0);
226   ASSERT_EQ(0, pthread_create(&thread, NULL, thread_ConcurrentLocking_, &s));
227   // Synchronization point with thread (failure: thread is dead)
228   WAIT_WORKER(1); // (1)
229
230   // Shall not have lock immediately
231   NOT_WAIT_WORKER(2); // (2)
232
233   // Unlock
234   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, pthread_self()));
235
236   // Shall have lock
237   // Synchronization point with thread (failure: thread is dead)
238   WAIT_WORKER(2); // (2)
239
240   // Synchronization point with thread (failure: thread is dead)
241   WAIT_WORKER(3); // (3)
242
243   // Wait for thread to share lock
244   WAIT_WORKER(4); // (4)
245   ASSERT_EQ(-EWOULDBLOCK,
246             ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, pthread_self()));
247   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_SH | LOCK_NB, pthread_self()));
248
249   // Wake up thread to unlock shared lock
250   PING_WORKER(1); // (R1)
251   WAIT_WORKER(5); // (5)
252
253   // Now we can lock exclusively
254   // Upgrade to exclusive lock (as per POSIX)
255   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, pthread_self()));
256
257   // Wake up thread to lock shared lock
258   PING_WORKER(2); // (R2)
259
260   // Shall not have lock immediately
261   NOT_WAIT_WORKER(6); // (6)
262
263   // Release lock ; thread will get it
264   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, pthread_self()));
265   WAIT_WORKER(6); // (6)
266
267   // We no longer have the lock
268   ASSERT_EQ(-EWOULDBLOCK,
269             ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, pthread_self()));
270   ASSERT_EQ(-EWOULDBLOCK,
271             ceph_flock(cmount, fd, LOCK_SH | LOCK_NB, pthread_self()));
272
273   // Wake up thread to unlock exclusive lock
274   PING_WORKER(3); // (R3)
275   WAIT_WORKER(7); // (7)
276
277   // We can lock it again
278   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, pthread_self()));
279   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, pthread_self()));
280
281   // Cleanup
282   void *retval = (void*) (uintptr_t) -1;
283   ASSERT_EQ(0, pthread_join(thread, &retval));
284   ASSERT_EQ(NULL, retval);
285   s.sem_destroy();
286   ASSERT_EQ(0, ceph_close(cmount, fd));
287   ASSERT_EQ(0, ceph_unlink(cmount, c_file));
288   CLEANUP_CEPH();
289 }
290
291 TEST(LibCephFS, ThreesomeLocking) {
292   const pid_t mypid = getpid();
293   struct ceph_mount_info *cmount;
294   STARTUP_CEPH();
295
296   char c_file[1024];
297   sprintf(c_file, "/flock_test_%d", mypid);
298   const int fd = ceph_open(cmount, c_file, O_RDWR | O_CREAT, fileMode);
299   ASSERT_GE(fd, 0); 
300
301   // Lock
302   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, pthread_self()));
303
304   // Start locker thread
305   pthread_t thread[2];
306   struct timespec ts;
307   str_ConcurrentLocking s = { c_file, cmount };
308   s.sem_init(0);
309   ASSERT_EQ(0, pthread_create(&thread[0], NULL, thread_ConcurrentLocking_, &s));
310   ASSERT_EQ(0, pthread_create(&thread[1], NULL, thread_ConcurrentLocking_, &s));
311   // Synchronization point with thread (failure: thread is dead)
312   TWICE(WAIT_WORKER(1)); // (1)
313
314   // Shall not have lock immediately
315   NOT_WAIT_WORKER(2); // (2)
316
317   // Unlock
318   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, pthread_self()));
319
320   // Shall have lock
321   TWICE(// Synchronization point with thread (failure: thread is dead)
322         WAIT_WORKER(2); // (2)
323         
324         // Synchronization point with thread (failure: thread is dead)
325         WAIT_WORKER(3)); // (3)
326   
327   // Wait for thread to share lock
328   TWICE(WAIT_WORKER(4)); // (4)
329   ASSERT_EQ(-EWOULDBLOCK,
330             ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, pthread_self()));
331   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_SH | LOCK_NB, pthread_self()));
332
333   // Wake up thread to unlock shared lock
334   TWICE(PING_WORKER(1); // (R1)
335         WAIT_WORKER(5)); // (5)
336
337   // Now we can lock exclusively
338   // Upgrade to exclusive lock (as per POSIX)
339   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, pthread_self()));
340
341   TWICE(  // Wake up thread to lock shared lock
342         PING_WORKER(2); // (R2)
343         
344         // Shall not have lock immediately
345         NOT_WAIT_WORKER(6)); // (6)
346   
347   // Release lock ; thread will get it
348   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, pthread_self()));
349   TWICE(WAIT_WORKER(6); // (6)
350         
351         // We no longer have the lock
352         ASSERT_EQ(-EWOULDBLOCK,
353                   ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, pthread_self()));
354         ASSERT_EQ(-EWOULDBLOCK,
355                   ceph_flock(cmount, fd, LOCK_SH | LOCK_NB, pthread_self()));
356         
357         // Wake up thread to unlock exclusive lock
358         PING_WORKER(3); // (R3)
359         WAIT_WORKER(7); // (7)
360         );
361   
362   // We can lock it again
363   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, pthread_self()));
364   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, pthread_self()));
365
366   // Cleanup
367   void *retval = (void*) (uintptr_t) -1;
368   ASSERT_EQ(0, pthread_join(thread[0], &retval));
369   ASSERT_EQ(NULL, retval);
370   ASSERT_EQ(0, pthread_join(thread[1], &retval));
371   ASSERT_EQ(NULL, retval);
372   s.sem_destroy();
373   ASSERT_EQ(0, ceph_close(cmount, fd));
374   ASSERT_EQ(0, ceph_unlink(cmount, c_file));
375   CLEANUP_CEPH();
376 }
377
378 /* Locking in different processes */
379
380 #define PROCESS_SLOW_MS() \
381   static const long waitMs = 100; \
382   (void) waitMs
383
384 // Used by ConcurrentLocking test
385 static void process_ConcurrentLocking(str_ConcurrentLocking& s) {
386   const pid_t mypid = getpid();
387   PROCESS_SLOW_MS();
388
389   struct ceph_mount_info *cmount = NULL;
390   struct timespec ts;
391
392   STARTUP_CEPH();
393   s.cmount = cmount;
394
395   const int fd = ceph_open(cmount, s.file, O_RDWR | O_CREAT, fileMode);
396   ASSERT_GE(fd, 0); 
397   WAIT_MAIN(1); // (R1)
398
399   ASSERT_EQ(-EWOULDBLOCK,
400             ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, mypid));
401   PING_MAIN(1); // (1)
402   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, mypid));
403   PING_MAIN(2); // (2)
404
405   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, mypid));
406   PING_MAIN(3); // (3)
407
408   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_SH, mypid));
409   PING_MAIN(4); // (4)
410
411   WAIT_MAIN(2); // (R2)
412   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, mypid));
413   PING_MAIN(5); // (5)
414
415   WAIT_MAIN(3); // (R3)
416   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, mypid));
417   PING_MAIN(6); // (6)
418
419   WAIT_MAIN(4); // (R4)
420   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, mypid));
421   PING_MAIN(7); // (7)
422
423   CLEANUP_CEPH();
424
425   s.sem_destroy();
426   exit(EXIT_SUCCESS);
427 }
428
429 // Disabled because of fork() issues (http://tracker.ceph.com/issues/16556)
430 TEST(LibCephFS, DISABLED_InterProcessLocking) {
431   PROCESS_SLOW_MS();
432   // Process synchronization
433   char c_file[1024];
434   const pid_t mypid = getpid();
435   sprintf(c_file, "/flock_test_%d", mypid);
436
437   // Note: the semaphores MUST be on a shared memory segment
438   str_ConcurrentLocking *const shs =
439     reinterpret_cast<str_ConcurrentLocking*>
440     (mmap(0, sizeof(*shs), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS,
441           -1, 0));
442   str_ConcurrentLocking &s = *shs;
443   s.file = c_file;
444   s.sem_init(1);
445
446   // Start locker process
447   const pid_t pid = fork();
448   ASSERT_GE(pid, 0);
449   if (pid == 0) {
450     process_ConcurrentLocking(s);
451     exit(EXIT_FAILURE);
452   }
453
454   struct timespec ts;
455   struct ceph_mount_info *cmount;
456   STARTUP_CEPH();
457
458   const int fd = ceph_open(cmount, c_file, O_RDWR | O_CREAT, fileMode);
459   ASSERT_GE(fd, 0); 
460
461   // Lock
462   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, mypid));
463
464   // Synchronization point with process (failure: process is dead)
465   PING_WORKER(1); // (R1)
466   WAIT_WORKER(1); // (1)
467
468   // Shall not have lock immediately
469   NOT_WAIT_WORKER(2); // (2)
470
471   // Unlock
472   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, mypid));
473
474   // Shall have lock
475   // Synchronization point with process (failure: process is dead)
476   WAIT_WORKER(2); // (2)
477
478   // Synchronization point with process (failure: process is dead)
479   WAIT_WORKER(3); // (3)
480
481   // Wait for process to share lock
482   WAIT_WORKER(4); // (4)
483   ASSERT_EQ(-EWOULDBLOCK, ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, mypid));
484   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_SH | LOCK_NB, mypid));
485
486   // Wake up process to unlock shared lock
487   PING_WORKER(2); // (R2)
488   WAIT_WORKER(5); // (5)
489
490   // Now we can lock exclusively
491   // Upgrade to exclusive lock (as per POSIX)
492   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, mypid));
493
494   // Wake up process to lock shared lock
495   PING_WORKER(3); // (R3)
496
497   // Shall not have lock immediately
498   NOT_WAIT_WORKER(6); // (6)
499
500   // Release lock ; process will get it
501   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, mypid));
502   WAIT_WORKER(6); // (6)
503
504   // We no longer have the lock
505   ASSERT_EQ(-EWOULDBLOCK, ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, mypid));
506   ASSERT_EQ(-EWOULDBLOCK, ceph_flock(cmount, fd, LOCK_SH | LOCK_NB, mypid));
507
508   // Wake up process to unlock exclusive lock
509   PING_WORKER(4); // (R4)
510   WAIT_WORKER(7); // (7)
511
512   // We can lock it again
513   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, mypid));
514   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, mypid));
515
516   // Wait pid
517   int status;
518   ASSERT_EQ(pid, waitpid(pid, &status, 0));
519   ASSERT_EQ(EXIT_SUCCESS, status);
520
521   // Cleanup
522   s.sem_destroy();
523   ASSERT_EQ(0, munmap(shs, sizeof(*shs)));
524   ASSERT_EQ(0, ceph_close(cmount, fd));
525   ASSERT_EQ(0, ceph_unlink(cmount, c_file));
526   CLEANUP_CEPH();
527 }
528
529 // Disabled because of fork() issues (http://tracker.ceph.com/issues/16556)
530 TEST(LibCephFS, DISABLED_ThreesomeInterProcessLocking) {
531   PROCESS_SLOW_MS();
532   // Process synchronization
533   char c_file[1024];
534   const pid_t mypid = getpid();
535   sprintf(c_file, "/flock_test_%d", mypid);
536
537   // Note: the semaphores MUST be on a shared memory segment
538   str_ConcurrentLocking *const shs =
539     reinterpret_cast<str_ConcurrentLocking*>
540     (mmap(0, sizeof(*shs), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS,
541           -1, 0));
542   str_ConcurrentLocking &s = *shs;
543   s.file = c_file;
544   s.sem_init(1);
545
546   // Start locker processes
547   pid_t pid[2];
548   pid[0] = fork();
549   ASSERT_GE(pid[0], 0);
550   if (pid[0] == 0) {
551     process_ConcurrentLocking(s);
552     exit(EXIT_FAILURE);
553   }
554   pid[1] = fork();
555   ASSERT_GE(pid[1], 0);
556   if (pid[1] == 0) {
557     process_ConcurrentLocking(s);
558     exit(EXIT_FAILURE);
559   }
560
561   struct timespec ts;
562   struct ceph_mount_info *cmount;
563   STARTUP_CEPH();
564
565   const int fd = ceph_open(cmount, c_file, O_RDWR | O_CREAT, fileMode);
566   ASSERT_GE(fd, 0); 
567
568   // Lock
569   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, mypid));
570
571   // Synchronization point with process (failure: process is dead)
572   TWICE(PING_WORKER(1)); // (R1)
573   TWICE(WAIT_WORKER(1)); // (1)
574
575   // Shall not have lock immediately
576   NOT_WAIT_WORKER(2); // (2)
577
578   // Unlock
579   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, mypid));
580
581   // Shall have lock
582   TWICE(// Synchronization point with process (failure: process is dead)
583         WAIT_WORKER(2); // (2)
584         
585         // Synchronization point with process (failure: process is dead)
586         WAIT_WORKER(3)); // (3)
587   
588   // Wait for process to share lock
589   TWICE(WAIT_WORKER(4)); // (4)
590   ASSERT_EQ(-EWOULDBLOCK,
591             ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, mypid));
592   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_SH | LOCK_NB, mypid));
593
594   // Wake up process to unlock shared lock
595   TWICE(PING_WORKER(2); // (R2)
596         WAIT_WORKER(5)); // (5)
597
598   // Now we can lock exclusively
599   // Upgrade to exclusive lock (as per POSIX)
600   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, mypid));
601
602   TWICE(  // Wake up process to lock shared lock
603         PING_WORKER(3); // (R3)
604         
605         // Shall not have lock immediately
606         NOT_WAIT_WORKER(6)); // (6)
607   
608   // Release lock ; process will get it
609   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, mypid));
610   TWICE(WAIT_WORKER(6); // (6)
611         
612         // We no longer have the lock
613         ASSERT_EQ(-EWOULDBLOCK,
614                   ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, mypid));
615         ASSERT_EQ(-EWOULDBLOCK,
616                   ceph_flock(cmount, fd, LOCK_SH | LOCK_NB, mypid));
617         
618         // Wake up process to unlock exclusive lock
619         PING_WORKER(4); // (R4)
620         WAIT_WORKER(7); // (7)
621         );
622   
623   // We can lock it again
624   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, mypid));
625   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, mypid));
626
627   // Wait pids
628   int status;
629   ASSERT_EQ(pid[0], waitpid(pid[0], &status, 0));
630   ASSERT_EQ(EXIT_SUCCESS, status);
631   ASSERT_EQ(pid[1], waitpid(pid[1], &status, 0));
632   ASSERT_EQ(EXIT_SUCCESS, status);
633
634   // Cleanup
635   s.sem_destroy();
636   ASSERT_EQ(0, munmap(shs, sizeof(*shs)));
637   ASSERT_EQ(0, ceph_close(cmount, fd));
638   ASSERT_EQ(0, ceph_unlink(cmount, c_file));
639   CLEANUP_CEPH();
640 }