Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / client / SyntheticClient.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #include "include/compat.h"
16
17 #include <iostream>
18 #include <sstream>
19 using namespace std;
20
21
22 #include "common/config.h"
23 #include "SyntheticClient.h"
24 #include "osdc/Objecter.h"
25 #include "osdc/Filer.h"
26
27
28 #include "include/filepath.h"
29 #include "common/perf_counters.h"
30
31 #include <sys/types.h>
32 #include <sys/stat.h>
33 #include <fcntl.h>
34 #include <utime.h>
35 #include <math.h>
36 #include <sys/statvfs.h>
37
38 #include "common/errno.h"
39 #include "include/assert.h"
40 #include "include/cephfs/ceph_statx.h"
41
42 #define dout_context g_ceph_context
43 #define dout_subsys ceph_subsys_client
44 #undef dout_prefix
45 #define dout_prefix *_dout << "client." << (whoami >= 0 ? whoami:client->get_nodeid()) << " "
46
47 // traces
48 //void trace_include(SyntheticClient *syn, Client *cl, string& prefix);
49 //void trace_openssh(SyntheticClient *syn, Client *cl, string& prefix);
50
51 int num_client = 1;
52 list<int> syn_modes;
53 list<int> syn_iargs;
54 list<string> syn_sargs;
55 int syn_filer_flags = 0;
56
57 void parse_syn_options(vector<const char*>& args)
58 {
59   vector<const char*> nargs;
60
61   for (unsigned i=0; i<args.size(); i++) {
62     if (strcmp(args[i],"--num-client") == 0) {
63       num_client = atoi(args[++i]);
64       continue;
65     }
66     if (strcmp(args[i],"--syn") == 0) {
67       ++i;
68
69       if (strcmp(args[i], "mksnap") == 0) {
70         syn_modes.push_back(SYNCLIENT_MODE_MKSNAP);
71         syn_sargs.push_back(args[++i]); // path
72         syn_sargs.push_back(args[++i]); // name
73       }
74       else if (strcmp(args[i], "rmsnap") == 0) {
75         syn_modes.push_back(SYNCLIENT_MODE_RMSNAP);
76         syn_sargs.push_back(args[++i]); // path
77         syn_sargs.push_back(args[++i]); // name
78       } else if (strcmp(args[i], "mksnapfile") == 0) {
79         syn_modes.push_back(SYNCLIENT_MODE_MKSNAPFILE);
80         syn_sargs.push_back(args[++i]); // path
81       } else if (strcmp(args[i],"rmfile") == 0) {
82         syn_modes.push_back( SYNCLIENT_MODE_RMFILE );
83       } else if (strcmp(args[i],"writefile") == 0) {
84         syn_modes.push_back( SYNCLIENT_MODE_WRITEFILE );
85         syn_iargs.push_back( atoi(args[++i]) );
86         syn_iargs.push_back( atoi(args[++i]) );
87       } else if (strcmp(args[i],"wrshared") == 0) {
88         syn_modes.push_back( SYNCLIENT_MODE_WRSHARED );
89         syn_iargs.push_back( atoi(args[++i]) );
90         syn_iargs.push_back( atoi(args[++i]) );
91       } else if (strcmp(args[i],"writebatch") == 0) {
92           syn_modes.push_back( SYNCLIENT_MODE_WRITEBATCH );
93         syn_iargs.push_back( atoi(args[++i]) );
94         syn_iargs.push_back( atoi(args[++i]) );
95         syn_iargs.push_back( atoi(args[++i]) );
96       } else if (strcmp(args[i],"readfile") == 0) {
97         syn_modes.push_back( SYNCLIENT_MODE_READFILE );
98         syn_iargs.push_back( atoi(args[++i]) );
99         syn_iargs.push_back( atoi(args[++i]) );
100       } else if (strcmp(args[i],"readwriterandom") == 0) {
101         syn_modes.push_back( SYNCLIENT_MODE_RDWRRANDOM );
102         syn_iargs.push_back( atoi(args[++i]) );
103         syn_iargs.push_back( atoi(args[++i]) );
104       } else if (strcmp(args[i],"readwriterandom_ex") == 0) {
105         syn_modes.push_back( SYNCLIENT_MODE_RDWRRANDOM_EX );
106         syn_iargs.push_back( atoi(args[++i]) );
107         syn_iargs.push_back( atoi(args[++i]) );
108       } else if (strcmp(args[i],"overloadosd0") == 0) {
109         syn_modes.push_back( SYNCLIENT_MODE_OVERLOAD_OSD_0 );
110         syn_iargs.push_back( atoi(args[++i]) );
111         syn_iargs.push_back( atoi(args[++i]) );
112         syn_iargs.push_back( atoi(args[++i]) );
113       } else if (strcmp(args[i],"readshared") == 0) {
114         syn_modes.push_back( SYNCLIENT_MODE_READSHARED );
115         syn_iargs.push_back( atoi(args[++i]) );
116         syn_iargs.push_back( atoi(args[++i]) );
117       } else if (strcmp(args[i],"rw") == 0) {
118         int a = atoi(args[++i]);
119         int b = atoi(args[++i]);
120         syn_modes.push_back( SYNCLIENT_MODE_WRITEFILE );
121         syn_iargs.push_back( a );
122         syn_iargs.push_back( b );
123         syn_modes.push_back( SYNCLIENT_MODE_READFILE );
124         syn_iargs.push_back( a );
125         syn_iargs.push_back( b );
126       } else if (strcmp(args[i],"dumpplacement") == 0) {
127         syn_modes.push_back( SYNCLIENT_MODE_DUMP );
128         syn_sargs.push_back( args[++i] );   
129       } else if (strcmp(args[i],"dropcache") == 0) {
130         syn_modes.push_back( SYNCLIENT_MODE_DROPCACHE );
131       } else if (strcmp(args[i],"makedirs") == 0) {
132         syn_modes.push_back( SYNCLIENT_MODE_MAKEDIRS );
133         syn_iargs.push_back( atoi(args[++i]) );
134         syn_iargs.push_back( atoi(args[++i]) );
135         syn_iargs.push_back( atoi(args[++i]) );
136       } else if (strcmp(args[i],"makedirmess") == 0) {
137         syn_modes.push_back( SYNCLIENT_MODE_MAKEDIRMESS );
138         syn_iargs.push_back( atoi(args[++i]) );
139       } else if (strcmp(args[i],"statdirs") == 0) {
140         syn_modes.push_back( SYNCLIENT_MODE_STATDIRS );
141         syn_iargs.push_back( atoi(args[++i]) );
142         syn_iargs.push_back( atoi(args[++i]) );
143         syn_iargs.push_back( atoi(args[++i]) );
144       } else if (strcmp(args[i],"readdirs") == 0) {
145         syn_modes.push_back( SYNCLIENT_MODE_READDIRS );
146         syn_iargs.push_back( atoi(args[++i]) );
147         syn_iargs.push_back( atoi(args[++i]) );
148         syn_iargs.push_back( atoi(args[++i]) );
149       } else if (strcmp(args[i],"makefiles") == 0) {
150         syn_modes.push_back( SYNCLIENT_MODE_MAKEFILES );
151         syn_iargs.push_back( atoi(args[++i]) );
152         syn_iargs.push_back( atoi(args[++i]) );
153         syn_iargs.push_back( atoi(args[++i]) );
154       } else if (strcmp(args[i],"makefiles2") == 0) {
155         syn_modes.push_back( SYNCLIENT_MODE_MAKEFILES2 );
156         syn_iargs.push_back( atoi(args[++i]) );
157         syn_iargs.push_back( atoi(args[++i]) );
158         syn_iargs.push_back( atoi(args[++i]) );
159       } else if (strcmp(args[i],"linktest") == 0) {
160         syn_modes.push_back( SYNCLIENT_MODE_LINKTEST );
161       } else if (strcmp(args[i],"createshared") == 0) {
162         syn_modes.push_back( SYNCLIENT_MODE_CREATESHARED );
163         syn_iargs.push_back( atoi(args[++i]) );
164       } else if (strcmp(args[i],"openshared") == 0) {
165         syn_modes.push_back( SYNCLIENT_MODE_OPENSHARED );
166         syn_iargs.push_back( atoi(args[++i]) );
167         syn_iargs.push_back( atoi(args[++i]) );
168       } else if (strcmp(args[i],"createobjects") == 0) {
169         syn_modes.push_back( SYNCLIENT_MODE_CREATEOBJECTS );
170         syn_iargs.push_back( atoi(args[++i]) );
171         syn_iargs.push_back( atoi(args[++i]) );
172         syn_iargs.push_back( atoi(args[++i]) );
173       } else if (strcmp(args[i],"objectrw") == 0) {
174         syn_modes.push_back( SYNCLIENT_MODE_OBJECTRW );
175         syn_iargs.push_back( atoi(args[++i]) );
176         syn_iargs.push_back( atoi(args[++i]) );
177         syn_iargs.push_back( atoi(args[++i]) );
178         syn_iargs.push_back( atoi(args[++i]) );
179         syn_iargs.push_back( atoi(args[++i]) );
180         syn_iargs.push_back( atoi(args[++i]) );
181       } else if (strcmp(args[i],"walk") == 0) {
182         syn_modes.push_back( SYNCLIENT_MODE_FULLWALK );
183         //syn_sargs.push_back( atoi(args[++i]) );
184       } else if (strcmp(args[i],"randomwalk") == 0) {
185         syn_modes.push_back( SYNCLIENT_MODE_RANDOMWALK );
186         syn_iargs.push_back( atoi(args[++i]) );       
187       } else if (strcmp(args[i],"trace") == 0) {
188         syn_modes.push_back( SYNCLIENT_MODE_TRACE );
189         syn_sargs.push_back( args[++i] );
190         syn_iargs.push_back( atoi(args[++i]) );
191         syn_iargs.push_back(1);// data
192       } else if (strcmp(args[i],"mtrace") == 0) {
193         syn_modes.push_back( SYNCLIENT_MODE_TRACE );
194         syn_sargs.push_back( args[++i] );
195         syn_iargs.push_back( atoi(args[++i]) );
196         syn_iargs.push_back(0);// no data
197       } else if (strcmp(args[i],"thrashlinks") == 0) {
198         syn_modes.push_back( SYNCLIENT_MODE_THRASHLINKS );
199         syn_iargs.push_back( atoi(args[++i]) );
200         syn_iargs.push_back( atoi(args[++i]) );
201         syn_iargs.push_back( atoi(args[++i]) );
202         syn_iargs.push_back( atoi(args[++i]) );
203       } else if (strcmp(args[i],"foo") == 0) {
204         syn_modes.push_back( SYNCLIENT_MODE_FOO );
205       } else if (strcmp(args[i],"until") == 0) {
206         syn_modes.push_back( SYNCLIENT_MODE_UNTIL );
207         syn_iargs.push_back( atoi(args[++i]) );
208       } else if (strcmp(args[i],"sleepuntil") == 0) {
209         syn_modes.push_back( SYNCLIENT_MODE_SLEEPUNTIL );
210         syn_iargs.push_back( atoi(args[++i]) );
211       } else if (strcmp(args[i],"only") == 0) {
212         syn_modes.push_back( SYNCLIENT_MODE_ONLY );
213         syn_iargs.push_back( atoi(args[++i]) );
214       } else if (strcmp(args[i],"onlyrange") == 0) {
215         syn_modes.push_back( SYNCLIENT_MODE_ONLYRANGE );
216         syn_iargs.push_back( atoi(args[++i]) );
217         syn_iargs.push_back( atoi(args[++i]) );
218       } else if (strcmp(args[i],"sleep") == 0) { 
219         syn_modes.push_back( SYNCLIENT_MODE_SLEEP );
220         syn_iargs.push_back( atoi(args[++i]) );
221       } else if (strcmp(args[i],"randomsleep") == 0) { 
222         syn_modes.push_back( SYNCLIENT_MODE_RANDOMSLEEP );
223         syn_iargs.push_back( atoi(args[++i]) );
224       } else if (strcmp(args[i],"opentest") == 0) { 
225         syn_modes.push_back( SYNCLIENT_MODE_OPENTEST );
226         syn_iargs.push_back( atoi(args[++i]) );
227       } else if (strcmp(args[i],"optest") == 0) {
228         syn_modes.push_back( SYNCLIENT_MODE_OPTEST );
229         syn_iargs.push_back( atoi(args[++i]) );
230       } else if (strcmp(args[i],"truncate") == 0) { 
231         syn_modes.push_back( SYNCLIENT_MODE_TRUNCATE );
232         syn_sargs.push_back(args[++i]);
233         syn_iargs.push_back(atoi(args[++i]));
234       } else if (strcmp(args[i],"importfind") == 0) {
235         syn_modes.push_back(SYNCLIENT_MODE_IMPORTFIND);
236         syn_sargs.push_back(args[++i]);
237         syn_sargs.push_back(args[++i]);
238         syn_iargs.push_back(atoi(args[++i]));
239       } else if (strcmp(args[i], "lookuphash") == 0) {
240         syn_modes.push_back(SYNCLIENT_MODE_LOOKUPHASH);
241         syn_sargs.push_back(args[++i]);
242         syn_sargs.push_back(args[++i]);
243         syn_sargs.push_back(args[++i]);
244       } else if (strcmp(args[i], "lookupino") == 0) {
245         syn_modes.push_back(SYNCLIENT_MODE_LOOKUPINO);
246         syn_sargs.push_back(args[++i]);
247       } else if (strcmp(args[i], "chunkfile") == 0) {
248         syn_modes.push_back(SYNCLIENT_MODE_CHUNK);
249         syn_sargs.push_back(args[++i]);
250       } else {
251         cerr << "unknown syn arg " << args[i] << std::endl;
252         ceph_abort();
253       }
254     }
255     else if (strcmp(args[i], "localize_reads") == 0) {
256       cerr << "set CEPH_OSD_FLAG_LOCALIZE_READS" << std::endl;
257       syn_filer_flags |= CEPH_OSD_FLAG_LOCALIZE_READS;
258     }
259     else {
260       nargs.push_back(args[i]);
261     }
262   }
263
264   args = nargs;
265 }
266
267
268 SyntheticClient::SyntheticClient(StandaloneClient *client, int w)
269 {
270   this->client = client;
271   whoami = w;
272   thread_id = 0;
273   
274   did_readdir = false;
275
276   run_only = -1;
277   exclude = -1;
278
279   this->modes = syn_modes;
280   this->iargs = syn_iargs;
281   this->sargs = syn_sargs;
282
283   run_start = ceph_clock_now();
284 }
285
286
287
288
289 #define DBL 2
290
291 void *synthetic_client_thread_entry(void *ptr)
292 {
293   SyntheticClient *sc = static_cast<SyntheticClient*>(ptr);
294   //int r = 
295   sc->run();
296   return 0;//(void*)r;
297 }
298
299 string SyntheticClient::get_sarg(int seq) 
300 {
301   string a;
302   if (!sargs.empty()) {
303     a = sargs.front(); 
304     sargs.pop_front();
305   }
306   if (a.length() == 0 || a == "~") {
307     char s[30];
308     snprintf(s, sizeof(s), "syn.%lld.%d", (long long)client->whoami.v, seq);
309     a = s;
310   } 
311   return a;
312 }
313
314 int SyntheticClient::run()
315 {
316   UserPerm perms = client->pick_my_perms();
317   dout(15) << "initing" << dendl;
318   int err = client->init();
319   if (err < 0) {
320     dout(0) << "failed to initialize: " << cpp_strerror(err) << dendl;
321     return -1;
322   }
323
324   dout(15) << "mounting" << dendl;
325   err = client->mount("", perms);
326   if (err < 0) {
327     dout(0) << "failed to mount: " << cpp_strerror(err) << dendl;
328     client->shutdown();
329     return -1;
330   }
331
332   //run_start = ceph_clock_now(client->cct);
333   run_until = utime_t(0,0);
334   dout(5) << "run" << dendl;
335
336   int seq = 0;
337
338   for (list<int>::iterator it = modes.begin();
339        it != modes.end();
340        ++it) {
341     int mode = *it;
342     dout(3) << "mode " << mode << dendl;
343
344     switch (mode) {
345
346
347       // WHO?
348
349     case SYNCLIENT_MODE_ONLY:
350       {
351         run_only = iargs.front();
352         iargs.pop_front();
353         if (run_only == client->get_nodeid())
354           dout(2) << "only " << run_only << dendl;
355       }
356       break;
357     case SYNCLIENT_MODE_ONLYRANGE:
358       {
359         int first = iargs.front();
360         iargs.pop_front();
361         int last = iargs.front();
362         iargs.pop_front();
363         if (first <= client->get_nodeid() &&
364             last > client->get_nodeid()) {
365           run_only = client->get_nodeid();
366           dout(2) << "onlyrange [" << first << ", " << last << ") includes me" << dendl;
367         } else
368           run_only = client->get_nodeid().v+1;  // not me
369       }
370       break;
371     case SYNCLIENT_MODE_EXCLUDE:
372       {
373         exclude = iargs.front();
374         iargs.pop_front();
375         if (exclude == client->get_nodeid()) {
376           run_only = client->get_nodeid().v + 1;
377           dout(2) << "not running " << exclude << dendl;
378         } else
379           run_only = -1;
380       }
381       break;
382
383       // HOW LONG?
384
385     case SYNCLIENT_MODE_UNTIL:
386       {
387         int iarg1 = iargs.front();
388         iargs.pop_front();
389         if (run_me()) {
390           if (iarg1) {
391             dout(2) << "until " << iarg1 << dendl;
392             utime_t dur(iarg1,0);
393             run_until = run_start + dur;
394           } else {
395             dout(2) << "until " << iarg1 << " (no limit)" << dendl;
396             run_until = utime_t(0,0);
397           }
398         }
399       }
400       break;
401
402
403       // ...
404
405     case SYNCLIENT_MODE_FOO:
406       if (run_me()) {
407         foo();
408       }
409       did_run_me();
410       break;
411
412     case SYNCLIENT_MODE_RANDOMSLEEP:
413       {
414         int iarg1 = iargs.front();
415         iargs.pop_front();
416         if (run_me()) {
417           srand(time(0) + getpid() + client->whoami.v);
418           sleep(rand() % iarg1);
419         }
420         did_run_me();
421       }
422       break;
423
424     case SYNCLIENT_MODE_SLEEP:
425       {
426         int iarg1 = iargs.front();
427         iargs.pop_front();
428         if (run_me()) {
429           dout(2) << "sleep " << iarg1 << dendl;
430           sleep(iarg1);
431         }
432         did_run_me();
433       }
434       break;
435
436     case SYNCLIENT_MODE_SLEEPUNTIL:
437       {
438         int iarg1 = iargs.front();
439         iargs.pop_front();
440         if (iarg1 && run_me()) {
441           dout(2) << "sleepuntil " << iarg1 << dendl;
442           utime_t at = ceph_clock_now() - run_start;
443           if (at.sec() < iarg1)
444             sleep(iarg1 - at.sec());
445         }
446         did_run_me();
447       }
448       break;
449
450     case SYNCLIENT_MODE_RANDOMWALK:
451       {
452         int iarg1 = iargs.front();
453         iargs.pop_front();
454         if (run_me()) {
455           dout(2) << "randomwalk " << iarg1 << dendl;
456           random_walk(iarg1);
457         }
458         did_run_me();
459       }
460       break;
461
462
463     case SYNCLIENT_MODE_DROPCACHE:
464       {
465         client->sync_fs();
466         client->drop_caches();
467       }
468       break;
469
470     case SYNCLIENT_MODE_DUMP:
471       {
472         string sarg1 = get_sarg(0);
473         if (run_me()) {
474           dout(2) << "placement dump " << sarg1 << dendl;
475           dump_placement(sarg1);
476         }
477         did_run_me();
478       }
479       break;
480
481
482     case SYNCLIENT_MODE_MAKEDIRMESS:
483       {
484         string sarg1 = get_sarg(0);
485         int iarg1 = iargs.front();  iargs.pop_front();
486         if (run_me()) {
487           dout(2) << "makedirmess " << sarg1 << " " << iarg1 << dendl;
488           make_dir_mess(sarg1.c_str(), iarg1);
489         }
490         did_run_me();
491       }
492       break;
493     case SYNCLIENT_MODE_MAKEDIRS:
494       {
495         string sarg1 = get_sarg(seq++);
496         int iarg1 = iargs.front();  iargs.pop_front();
497         int iarg2 = iargs.front();  iargs.pop_front();
498         int iarg3 = iargs.front();  iargs.pop_front();
499         if (run_me()) {
500           dout(2) << "makedirs " << sarg1 << " " << iarg1 << " " << iarg2 << " " << iarg3 << dendl;
501           make_dirs(sarg1.c_str(), iarg1, iarg2, iarg3);
502         }
503         did_run_me();
504       }
505       break;
506     case SYNCLIENT_MODE_STATDIRS:
507       {
508         string sarg1 = get_sarg(0);
509         int iarg1 = iargs.front();  iargs.pop_front();
510         int iarg2 = iargs.front();  iargs.pop_front();
511         int iarg3 = iargs.front();  iargs.pop_front();
512         if (run_me()) {
513           dout(2) << "statdirs " << sarg1 << " " << iarg1 << " " << iarg2 << " " << iarg3 << dendl;
514           stat_dirs(sarg1.c_str(), iarg1, iarg2, iarg3);
515         }
516         did_run_me();
517       }
518       break;
519     case SYNCLIENT_MODE_READDIRS:
520       {
521         string sarg1 = get_sarg(0);
522         int iarg1 = iargs.front();  iargs.pop_front();
523         int iarg2 = iargs.front();  iargs.pop_front();
524         int iarg3 = iargs.front();  iargs.pop_front();
525         if (run_me()) {
526           dout(2) << "readdirs " << sarg1 << " " << iarg1 << " " << iarg2 << " " << iarg3 << dendl;
527           read_dirs(sarg1.c_str(), iarg1, iarg2, iarg3);
528         }
529         did_run_me();
530       }
531       break;
532
533
534     case SYNCLIENT_MODE_THRASHLINKS:
535       {
536         string sarg1 = get_sarg(0);
537         int iarg1 = iargs.front();  iargs.pop_front();
538         int iarg2 = iargs.front();  iargs.pop_front();
539         int iarg3 = iargs.front();  iargs.pop_front();
540         int iarg4 = iargs.front();  iargs.pop_front();
541         if (run_me()) {
542           dout(2) << "thrashlinks " << sarg1 << " " << iarg1 << " " << iarg2 << " " << iarg3 << dendl;
543           thrash_links(sarg1.c_str(), iarg1, iarg2, iarg3, iarg4);
544         }
545         did_run_me();
546       }
547       break;
548
549     case SYNCLIENT_MODE_LINKTEST:
550       {
551         if (run_me()) {
552           link_test();
553         }
554         did_run_me();
555       }
556       break;
557
558
559     case SYNCLIENT_MODE_MAKEFILES:
560       {
561         int num = iargs.front();  iargs.pop_front();
562         int count = iargs.front();  iargs.pop_front();
563         int priv = iargs.front();  iargs.pop_front();
564         if (run_me()) {
565           dout(2) << "makefiles " << num << " " << count << " " << priv << dendl;
566           make_files(num, count, priv, false);
567         }
568         did_run_me();
569       }
570       break;
571     case SYNCLIENT_MODE_MAKEFILES2:
572       {
573         int num = iargs.front();  iargs.pop_front();
574         int count = iargs.front();  iargs.pop_front();
575         int priv = iargs.front();  iargs.pop_front();
576         if (run_me()) {
577           dout(2) << "makefiles2 " << num << " " << count << " " << priv << dendl;
578           make_files(num, count, priv, true);
579         }
580         did_run_me();
581       }
582       break;
583     case SYNCLIENT_MODE_CREATESHARED:
584       {
585         string sarg1 = get_sarg(0);
586         int num = iargs.front();  iargs.pop_front();
587         if (run_me()) {
588           dout(2) << "createshared " << num << dendl;
589           create_shared(num);
590         }
591         did_run_me();
592       }
593       break;
594     case SYNCLIENT_MODE_OPENSHARED:
595       {
596         string sarg1 = get_sarg(0);
597         int num = iargs.front();  iargs.pop_front();
598         int count = iargs.front();  iargs.pop_front();
599         if (run_me()) {
600           dout(2) << "openshared " << num << dendl;
601           open_shared(num, count);
602         }
603         did_run_me();
604       }
605       break;
606
607     case SYNCLIENT_MODE_CREATEOBJECTS:
608       {
609         int count = iargs.front();  iargs.pop_front();
610         int size = iargs.front();  iargs.pop_front();
611         int inflight = iargs.front();  iargs.pop_front();
612         if (run_me()) {
613           dout(2) << "createobjects " << count << " of " << size << " bytes"
614                   << ", " << inflight << " in flight" << dendl;
615           create_objects(count, size, inflight);
616         }
617         did_run_me();
618       }
619       break;
620     case SYNCLIENT_MODE_OBJECTRW:
621       {
622         int count = iargs.front();  iargs.pop_front();
623         int size = iargs.front();  iargs.pop_front();
624         int wrpc = iargs.front();  iargs.pop_front();
625         int overlap = iargs.front();  iargs.pop_front();
626         int rskew = iargs.front();  iargs.pop_front();
627         int wskew = iargs.front();  iargs.pop_front();
628         if (run_me()) {
629           dout(2) << "objectrw " << count << " " << size << " " << wrpc 
630                   << " " << overlap << " " << rskew << " " << wskew << dendl;
631           object_rw(count, size, wrpc, overlap, rskew, wskew);
632         }
633         did_run_me();
634       }
635       break;
636
637     case SYNCLIENT_MODE_FULLWALK:
638       {
639         string sarg1;// = get_sarg(0);
640         if (run_me()) {
641           dout(2) << "fullwalk" << sarg1 << dendl;
642           full_walk(sarg1);
643         }
644         did_run_me();
645       }
646       break;
647     case SYNCLIENT_MODE_REPEATWALK:
648       {
649         string sarg1 = get_sarg(0);
650         if (run_me()) {
651           dout(2) << "repeatwalk " << sarg1 << dendl;
652           while (full_walk(sarg1) == 0) ;
653         }
654         did_run_me();
655       }
656       break;
657
658     case SYNCLIENT_MODE_RMFILE:
659       {
660         string sarg1 = get_sarg(0);
661         if (run_me()) {
662           rm_file(sarg1);
663         }
664         did_run_me();
665       }
666       break;
667
668     case SYNCLIENT_MODE_WRITEFILE:
669       {
670         string sarg1 = get_sarg(0);
671         int iarg1 = iargs.front();  iargs.pop_front();
672         int iarg2 = iargs.front();  iargs.pop_front();
673         dout(1) << "WRITING SYN CLIENT" << dendl;
674         if (run_me()) {
675           write_file(sarg1, iarg1, iarg2);
676         }
677         did_run_me();
678       }
679       break;
680
681     case SYNCLIENT_MODE_CHUNK:
682       if (run_me()) {
683         string sarg1 = get_sarg(0);
684         chunk_file(sarg1);
685       }
686       did_run_me();
687       break;
688
689
690     case SYNCLIENT_MODE_OVERLOAD_OSD_0:
691       {
692         dout(1) << "OVERLOADING OSD 0" << dendl;
693         int iarg1 = iargs.front(); iargs.pop_front();
694         int iarg2 = iargs.front(); iargs.pop_front();
695         int iarg3 = iargs.front(); iargs.pop_front();
696         if (run_me()) {
697           overload_osd_0(iarg1, iarg2, iarg3);
698         }
699         did_run_me();
700       }
701       break;
702
703     case SYNCLIENT_MODE_WRSHARED:
704       {
705         string sarg1 = "shared";
706         int iarg1 = iargs.front();  iargs.pop_front();
707         int iarg2 = iargs.front();  iargs.pop_front();
708         if (run_me()) {
709           write_file(sarg1, iarg1, iarg2);
710         }
711         did_run_me();
712       }
713       break;
714     case SYNCLIENT_MODE_READSHARED:
715       {
716         string sarg1 = "shared";
717         int iarg1 = iargs.front();  iargs.pop_front();
718         int iarg2 = iargs.front();  iargs.pop_front();
719         if (run_me()) {
720           read_file(sarg1, iarg1, iarg2, true);
721         }
722         did_run_me();
723       }
724       break;
725     case SYNCLIENT_MODE_WRITEBATCH:
726       {
727         int iarg1 = iargs.front(); iargs.pop_front();
728         int iarg2 = iargs.front(); iargs.pop_front();
729         int iarg3 = iargs.front(); iargs.pop_front();
730
731         if (run_me()) {
732           write_batch(iarg1, iarg2, iarg3);
733         }
734         did_run_me();
735       }
736       break;
737
738     case SYNCLIENT_MODE_READFILE:
739       {
740         string sarg1 = get_sarg(0);
741         int iarg1 = iargs.front();  iargs.pop_front();
742         int iarg2 = iargs.front();  iargs.pop_front();
743
744         dout(1) << "READING SYN CLIENT" << dendl;
745         if (run_me()) {
746           read_file(sarg1, iarg1, iarg2);
747         }
748         did_run_me();
749       }
750       break;
751
752     case SYNCLIENT_MODE_RDWRRANDOM:
753       {
754         string sarg1 = get_sarg(0);
755         int iarg1 = iargs.front();  iargs.pop_front();
756         int iarg2 = iargs.front();  iargs.pop_front();
757
758         dout(1) << "RANDOM READ WRITE SYN CLIENT" << dendl;
759         if (run_me()) {
760           read_random(sarg1, iarg1, iarg2);
761         }
762         did_run_me();
763       }
764       break;
765
766     case SYNCLIENT_MODE_RDWRRANDOM_EX:
767       {
768         string sarg1 = get_sarg(0);
769         int iarg1 = iargs.front();  iargs.pop_front();
770         int iarg2 = iargs.front();  iargs.pop_front();
771
772         dout(1) << "RANDOM READ WRITE SYN CLIENT" << dendl;
773         if (run_me()) {
774           read_random_ex(sarg1, iarg1, iarg2);
775         }
776         did_run_me();
777       }
778       break;
779     case SYNCLIENT_MODE_TRACE:
780       {
781         string tfile = get_sarg(0);
782         sargs.push_front(string("~"));
783         int iarg1 = iargs.front();  iargs.pop_front();
784         int playdata = iargs.front(); iargs.pop_front();
785         string prefix = get_sarg(0);
786         char realtfile[100];
787         snprintf(realtfile, sizeof(realtfile), tfile.c_str(), (int)client->get_nodeid().v);
788
789         if (run_me()) {
790           dout(0) << "trace " << tfile << " prefix=" << prefix << " count=" << iarg1 << " data=" << playdata << dendl;
791           
792           Trace t(realtfile);
793           
794           if (iarg1 == 0) iarg1 = 1; // play trace at least once!
795
796           for (int i=0; i<iarg1; i++) {
797             utime_t start = ceph_clock_now();
798             
799             if (time_to_stop()) break;
800             play_trace(t, prefix, !playdata);
801             if (time_to_stop()) break;
802             if (iarg1 > 1) clean_dir(prefix);  // clean only if repeat
803             
804             utime_t lat = ceph_clock_now();
805             lat -= start;
806             
807             dout(0) << " trace " << tfile << " loop " << (i+1) << "/" << iarg1 << " done in " << (double)lat << " seconds" << dendl;
808             if (client->logger
809                 && i > 0
810                 && i < iarg1-1
811                 ) {
812               //client->logger->finc("trsum", (double)lat);
813               //client->logger->inc("trnum");
814             }
815           }
816           dout(1) << "done " << dendl;
817         }
818         did_run_me();
819       }
820       break;
821
822
823     case SYNCLIENT_MODE_OPENTEST:
824       {
825         int count = iargs.front();  iargs.pop_front();
826         if (run_me()) {
827           for (int i=0; i<count; i++) {
828             int fd = client->open("test", (rand()%2) ?
829                                   (O_WRONLY|O_CREAT) : O_RDONLY,
830                                   perms);
831             if (fd > 0) client->close(fd);
832           }
833         }
834         did_run_me();
835       }
836       break;
837
838     case SYNCLIENT_MODE_OPTEST:
839       {
840         int count = iargs.front();  iargs.pop_front();
841         if (run_me()) {
842           client->mknod("test", 0777, perms);
843           struct stat st;
844           for (int i=0; i<count; i++) {
845             client->lstat("test", &st, perms);
846             client->chmod("test", 0777, perms);
847           }
848         }
849         did_run_me();
850       }
851       break;
852
853     case SYNCLIENT_MODE_TRUNCATE:
854       {
855         string file = get_sarg(0);
856         sargs.push_front(file);
857         int iarg1 = iargs.front();  iargs.pop_front();
858         if (run_me()) {
859           client->truncate(file.c_str(), iarg1, perms);
860         }
861         did_run_me();
862       }
863       break;
864
865
866     case SYNCLIENT_MODE_IMPORTFIND:
867       {
868         string base = get_sarg(0);
869         string find = get_sarg(0);
870         int data = get_iarg();
871         if (run_me()) {
872           import_find(base.c_str(), find.c_str(), data);
873         }
874         did_run_me();
875       }
876       break;
877
878     case SYNCLIENT_MODE_LOOKUPHASH:
879       {
880         inodeno_t ino;
881         string iname = get_sarg(0);
882         sscanf(iname.c_str(), "%llx", (long long unsigned*)&ino.val);
883         inodeno_t dirino;
884         string diname = get_sarg(0);
885         sscanf(diname.c_str(), "%llx", (long long unsigned*)&dirino.val);
886         string name = get_sarg(0);
887         if (run_me()) {
888           lookup_hash(ino, dirino, name.c_str(), perms);
889         }
890       }
891       break;
892     case SYNCLIENT_MODE_LOOKUPINO:
893       {
894         inodeno_t ino;
895         string iname = get_sarg(0);
896         sscanf(iname.c_str(), "%llx", (long long unsigned*)&ino.val);
897         if (run_me()) {
898           lookup_ino(ino, perms);
899         }
900       }
901       break;
902       
903     case SYNCLIENT_MODE_MKSNAP:
904       {
905         string base = get_sarg(0);
906         string name = get_sarg(0);
907         if (run_me())
908           mksnap(base.c_str(), name.c_str(), perms);
909         did_run_me();
910       }
911       break;
912     case SYNCLIENT_MODE_RMSNAP:
913       {
914         string base = get_sarg(0);
915         string name = get_sarg(0);
916         if (run_me())
917           rmsnap(base.c_str(), name.c_str(), perms);
918         did_run_me();
919       }
920       break;
921     case SYNCLIENT_MODE_MKSNAPFILE:
922       {
923         string base = get_sarg(0);
924         if (run_me())
925           mksnapfile(base.c_str());
926         did_run_me();
927       }
928       break;
929
930     default:
931       ceph_abort();
932     }
933   }
934   dout(1) << "syn done, unmounting " << dendl;
935
936   client->unmount();
937   client->shutdown();
938   return 0;
939 }
940
941
942 int SyntheticClient::start_thread()
943 {
944   assert(!thread_id);
945
946   pthread_create(&thread_id, NULL, synthetic_client_thread_entry, this);
947   assert(thread_id);
948   ceph_pthread_setname(thread_id, "client");
949   return 0;
950 }
951
952 int SyntheticClient::join_thread()
953 {
954   assert(thread_id);
955   void *rv;
956   pthread_join(thread_id, &rv);
957   return 0;
958 }
959
960
961 bool roll_die(float p) 
962 {
963   float r = (float)(rand() % 100000) / 100000.0;
964   if (r < p) 
965     return true;
966   else 
967     return false;
968 }
969
970 void SyntheticClient::init_op_dist()
971 {
972   op_dist.clear();
973 #if 0
974   op_dist.add( CEPH_MDS_OP_STAT, 610 );
975   op_dist.add( CEPH_MDS_OP_UTIME, 0 );
976   op_dist.add( CEPH_MDS_OP_CHMOD, 1 );
977   op_dist.add( CEPH_MDS_OP_CHOWN, 1 );
978 #endif
979
980   op_dist.add( CEPH_MDS_OP_READDIR, 2 );
981   op_dist.add( CEPH_MDS_OP_MKNOD, 30 );
982   op_dist.add( CEPH_MDS_OP_LINK, 0 );
983   op_dist.add( CEPH_MDS_OP_UNLINK, 20 );
984   op_dist.add( CEPH_MDS_OP_RENAME, 40 );
985
986   op_dist.add( CEPH_MDS_OP_MKDIR, 10 );
987   op_dist.add( CEPH_MDS_OP_RMDIR, 20 );
988   op_dist.add( CEPH_MDS_OP_SYMLINK, 20 );
989
990   op_dist.add( CEPH_MDS_OP_OPEN, 200 );
991   //op_dist.add( CEPH_MDS_OP_READ, 0 );
992   //op_dist.add( CEPH_MDS_OP_WRITE, 0 );
993   //op_dist.add( CEPH_MDS_OP_TRUNCATE, 0 );
994   //op_dist.add( CEPH_MDS_OP_FSYNC, 0 );
995   //op_dist.add( CEPH_MDS_OP_RELEASE, 200 );
996   op_dist.normalize();
997 }
998
999 void SyntheticClient::up()
1000 {
1001   cwd = cwd.prefixpath(cwd.depth()-1);
1002   dout(DBL) << "cd .. -> " << cwd << dendl;
1003   clear_dir();
1004 }
1005
1006 int SyntheticClient::play_trace(Trace& t, string& prefix, bool metadata_only)
1007 {
1008   dout(4) << "play trace prefix '" << prefix << "'" << dendl;
1009   UserPerm perms = client->pick_my_perms();
1010   t.start();
1011
1012   string buf;
1013   string buf2;
1014
1015   utime_t start = ceph_clock_now();
1016
1017   ceph::unordered_map<int64_t, int64_t> open_files;
1018   ceph::unordered_map<int64_t, dir_result_t*> open_dirs;
1019
1020   ceph::unordered_map<int64_t, Fh*> ll_files;
1021   ceph::unordered_map<int64_t, dir_result_t*> ll_dirs;
1022   ceph::unordered_map<uint64_t, int64_t> ll_inos;
1023
1024   Inode *i1, *i2;
1025
1026   ll_inos[1] = 1; // root inode is known.
1027
1028   // prefix?
1029   const char *p = prefix.c_str();
1030   if (prefix.length()) {
1031     client->mkdir(prefix.c_str(), 0755, perms);
1032     struct ceph_statx stx;
1033     i1 = client->ll_get_inode(vinodeno_t(1, CEPH_NOSNAP));
1034     if (client->ll_lookupx(i1, prefix.c_str(), &i2, &stx, CEPH_STATX_INO, 0, perms) == 0) {
1035       ll_inos[1] = stx.stx_ino;
1036       dout(5) << "'root' ino is " << inodeno_t(stx.stx_ino) << dendl;
1037       client->ll_put(i1);
1038     } else {
1039       dout(0) << "warning: play_trace couldn't lookup up my per-client directory" << dendl;
1040     }
1041   } else
1042     (void) client->ll_get_inode(vinodeno_t(1, CEPH_NOSNAP));
1043
1044   utime_t last_status = start;
1045
1046   int n = 0;
1047
1048   // for object traces
1049   Mutex lock("synclient foo");
1050   Cond cond;
1051   bool ack;
1052
1053   while (!t.end()) {
1054
1055     if (++n == 100) {
1056       n = 00;
1057       utime_t now = last_status;
1058       if (now - last_status > 1.0) {
1059         last_status = now;
1060         dout(1) << "play_trace at line " << t.get_line() << dendl;
1061       }
1062     }
1063     
1064     if (time_to_stop()) break;
1065     
1066     // op
1067     const char *op = t.get_string(buf, 0);
1068     dout(4) << (t.get_line()-1) << ": trace op " << op << dendl;
1069     
1070     if (op[0] == '@') {
1071       // timestamp... ignore it!
1072       t.get_int(); // sec
1073       t.get_int(); // usec
1074       op = t.get_string(buf, 0);
1075     }
1076
1077     // high level ops ---------------------
1078     UserPerm perms = client->pick_my_perms();
1079     if (strcmp(op, "link") == 0) {
1080       const char *a = t.get_string(buf, p);
1081       const char *b = t.get_string(buf2, p);
1082       client->link(a, b, perms);
1083     } else if (strcmp(op, "unlink") == 0) {
1084       const char *a = t.get_string(buf, p);
1085       client->unlink(a, perms);
1086     } else if (strcmp(op, "rename") == 0) {
1087       const char *a = t.get_string(buf, p);
1088       const char *b = t.get_string(buf2, p);
1089       client->rename(a,b, perms);
1090     } else if (strcmp(op, "mkdir") == 0) {
1091       const char *a = t.get_string(buf, p);
1092       int64_t b = t.get_int();
1093       client->mkdir(a, b, perms);
1094     } else if (strcmp(op, "rmdir") == 0) {
1095       const char *a = t.get_string(buf, p);
1096       client->rmdir(a, perms);
1097     } else if (strcmp(op, "symlink") == 0) {
1098       const char *a = t.get_string(buf, p);
1099       const char *b = t.get_string(buf2, p);
1100       client->symlink(a, b, perms);
1101     } else if (strcmp(op, "readlink") == 0) {
1102       const char *a = t.get_string(buf, p);
1103       char buf[100];
1104       client->readlink(a, buf, 100, perms);
1105     } else if (strcmp(op, "lstat") == 0) {
1106       struct stat st;
1107       const char *a = t.get_string(buf, p);
1108       if (strcmp(a, p) != 0 &&
1109           strcmp(a, "/") != 0 &&
1110           strcmp(a, "/lib") != 0 && // or /lib.. that would be a lookup. hack.
1111           a[0] != 0)  // stop stating the root directory already
1112         client->lstat(a, &st, perms);
1113     } else if (strcmp(op, "chmod") == 0) {
1114       const char *a = t.get_string(buf, p);
1115       int64_t b = t.get_int();
1116       client->chmod(a, b, perms);
1117     } else if (strcmp(op, "chown") == 0) {
1118       const char *a = t.get_string(buf, p);
1119       int64_t b = t.get_int();
1120       int64_t c = t.get_int();
1121       client->chown(a, b, c, perms);
1122     } else if (strcmp(op, "utime") == 0) {
1123       const char *a = t.get_string(buf, p);
1124       int64_t b = t.get_int();
1125       int64_t c = t.get_int();
1126       struct utimbuf u;
1127       u.actime = b;
1128       u.modtime = c;
1129       client->utime(a, &u, perms);
1130     } else if (strcmp(op, "mknod") == 0) {
1131       const char *a = t.get_string(buf, p);
1132       int64_t b = t.get_int();
1133       int64_t c = t.get_int();
1134       client->mknod(a, b, perms, c);
1135     } else if (strcmp(op, "oldmknod") == 0) {
1136       const char *a = t.get_string(buf, p);
1137       int64_t b = t.get_int();
1138       client->mknod(a, b, perms, 0);
1139     } else if (strcmp(op, "getdir") == 0) {
1140       const char *a = t.get_string(buf, p);
1141       list<string> contents;
1142       int r = client->getdir(a, contents, perms);
1143       if (r < 0) {
1144         dout(1) << "getdir on " << a << " returns " << r << dendl;
1145       }
1146     } else if (strcmp(op, "opendir") == 0) {
1147       const char *a = t.get_string(buf, p);
1148       int64_t b = t.get_int();
1149       dir_result_t *dirp;
1150       client->opendir(a, &dirp, perms);
1151       if (dirp) open_dirs[b] = dirp;
1152     } else if (strcmp(op, "closedir") == 0) {
1153       int64_t a = t.get_int();
1154       client->closedir(open_dirs[a]);
1155       open_dirs.erase(a);
1156     } else if (strcmp(op, "open") == 0) {
1157       const char *a = t.get_string(buf, p);
1158       int64_t b = t.get_int(); 
1159       int64_t c = t.get_int(); 
1160       int64_t d = t.get_int();
1161       int64_t fd = client->open(a, b, perms, c);
1162       if (fd > 0) open_files[d] = fd;
1163     } else if (strcmp(op, "oldopen") == 0) {
1164       const char *a = t.get_string(buf, p);
1165       int64_t b = t.get_int(); 
1166       int64_t d = t.get_int();
1167       int64_t fd = client->open(a, b, perms, 0755);
1168       if (fd > 0) open_files[d] = fd;
1169     } else if (strcmp(op, "close") == 0) {
1170       int64_t id = t.get_int();
1171       int64_t fh = open_files[id];
1172       if (fh > 0) client->close(fh);
1173       open_files.erase(id);
1174     } else if (strcmp(op, "lseek") == 0) {
1175       int64_t f = t.get_int();
1176       int fd = open_files[f];
1177       int64_t off = t.get_int();
1178       int64_t whence = t.get_int();
1179       client->lseek(fd, off, whence);
1180     } else if (strcmp(op, "read") == 0) {
1181       int64_t f = t.get_int();
1182       int64_t size = t.get_int();
1183       int64_t off = t.get_int();
1184       int64_t fd = open_files[f];
1185       if (!metadata_only) {
1186         char *b = new char[size];
1187         client->read(fd, b, size, off);
1188         delete[] b;
1189       }
1190     } else if (strcmp(op, "write") == 0) {
1191       int64_t f = t.get_int();
1192       int64_t fd = open_files[f];
1193       int64_t size = t.get_int();
1194       int64_t off = t.get_int();
1195       if (!metadata_only) {
1196         char *b = new char[size];
1197         memset(b, 1, size);            // let's write 1's!
1198         client->write(fd, b, size, off);
1199         delete[] b;
1200       } else {
1201         client->write(fd, NULL, 0, size+off);
1202       }
1203     } else if (strcmp(op, "truncate") == 0) {
1204       const char *a = t.get_string(buf, p);
1205       int64_t l = t.get_int();
1206       client->truncate(a, l, perms);
1207     } else if (strcmp(op, "ftruncate") == 0) {
1208       int64_t f = t.get_int();
1209       int fd = open_files[f];
1210       int64_t l = t.get_int();
1211       client->ftruncate(fd, l, perms);
1212     } else if (strcmp(op, "fsync") == 0) {
1213       int64_t f = t.get_int();
1214       int64_t b = t.get_int();
1215       int fd = open_files[f];
1216       client->fsync(fd, b);
1217     } else if (strcmp(op, "chdir") == 0) {
1218       const char *a = t.get_string(buf, p);
1219       // Client users should remember their path, but since this
1220       // is just a synthetic client we ignore it.
1221       std::string ignore;
1222       client->chdir(a, ignore, perms);
1223     } else if (strcmp(op, "statfs") == 0) {
1224       struct statvfs stbuf;
1225       client->statfs("/", &stbuf, perms);
1226     }
1227
1228     // low level ops ---------------------
1229     else if (strcmp(op, "ll_lookup") == 0) {
1230       int64_t i = t.get_int();
1231       const char *name = t.get_string(buf, p);
1232       int64_t r = t.get_int();
1233       struct ceph_statx stx;
1234       if (ll_inos.count(i)) {
1235           i1 = client->ll_get_inode(vinodeno_t(ll_inos[i],CEPH_NOSNAP));
1236           if (client->ll_lookupx(i1, name, &i2, &stx, CEPH_STATX_INO, 0, perms) == 0)
1237             ll_inos[r] = stx.stx_ino;
1238           client->ll_put(i1);
1239       }
1240     } else if (strcmp(op, "ll_forget") == 0) {
1241       int64_t i = t.get_int();
1242       int64_t n = t.get_int();
1243       if (ll_inos.count(i) && 
1244           client->ll_forget(
1245             client->ll_get_inode(vinodeno_t(ll_inos[i],CEPH_NOSNAP)), n))
1246         ll_inos.erase(i);
1247     } else if (strcmp(op, "ll_getattr") == 0) {
1248       int64_t i = t.get_int();
1249       struct stat attr;
1250       if (ll_inos.count(i)) {
1251         i1 = client->ll_get_inode(vinodeno_t(ll_inos[i],CEPH_NOSNAP));
1252         client->ll_getattr(i1, &attr, perms);
1253         client->ll_put(i1);
1254       }
1255     } else if (strcmp(op, "ll_setattr") == 0) {
1256       int64_t i = t.get_int();
1257       struct stat attr;
1258       memset(&attr, 0, sizeof(attr));
1259       attr.st_mode = t.get_int();
1260       attr.st_uid = t.get_int();
1261       attr.st_gid = t.get_int();
1262       attr.st_size = t.get_int();
1263       attr.st_mtime = t.get_int();
1264       attr.st_atime = t.get_int();
1265       int mask = t.get_int();
1266       if (ll_inos.count(i)) {
1267         i1 = client->ll_get_inode(vinodeno_t(ll_inos[i],CEPH_NOSNAP));
1268         client->ll_setattr(i1, &attr, mask, perms);
1269         client->ll_put(i1);
1270       }
1271     } else if (strcmp(op, "ll_readlink") == 0) {
1272       int64_t i = t.get_int();
1273       if (ll_inos.count(i)) {
1274         char buf[PATH_MAX];
1275         i1 = client->ll_get_inode(vinodeno_t(ll_inos[i],CEPH_NOSNAP));
1276         client->ll_readlink(i1, buf, sizeof(buf), perms);
1277         client->ll_put(i1);
1278       }
1279     } else if (strcmp(op, "ll_mknod") == 0) {
1280       int64_t i = t.get_int();
1281       const char *n = t.get_string(buf, p);
1282       int m = t.get_int();
1283       int r = t.get_int();
1284       int64_t ri = t.get_int();
1285       struct stat attr;
1286       if (ll_inos.count(i)) {
1287         i1 = client->ll_get_inode(vinodeno_t(ll_inos[i],CEPH_NOSNAP));
1288         if (client->ll_mknod(i1, n, m, r, &attr, &i2, perms) == 0)
1289           ll_inos[ri] = attr.st_ino;
1290         client->ll_put(i1);
1291       }
1292     } else if (strcmp(op, "ll_mkdir") == 0) {
1293       int64_t i = t.get_int();
1294       const char *n = t.get_string(buf, p);
1295       int m = t.get_int();
1296       int64_t ri = t.get_int();
1297       struct stat attr;
1298       if (ll_inos.count(i)) {
1299         i1 = client->ll_get_inode(vinodeno_t(ll_inos[i],CEPH_NOSNAP));
1300         if (client->ll_mkdir(i1, n, m, &attr, &i2, perms) == 0)
1301           ll_inos[ri] = attr.st_ino;
1302         client->ll_put(i1);
1303       }
1304     } else if (strcmp(op, "ll_symlink") == 0) {
1305       int64_t i = t.get_int();
1306       const char *n = t.get_string(buf, p);
1307       const char *v = t.get_string(buf2, p);
1308       int64_t ri = t.get_int();
1309       struct stat attr;
1310       if (ll_inos.count(i)) {
1311         i1 = client->ll_get_inode(vinodeno_t(ll_inos[i],CEPH_NOSNAP));
1312         if (client->ll_symlink(i1, n, v, &attr, &i2, perms) == 0)
1313           ll_inos[ri] = attr.st_ino;
1314         client->ll_put(i1);
1315       }
1316     } else if (strcmp(op, "ll_unlink") == 0) {
1317       int64_t i = t.get_int();
1318       const char *n = t.get_string(buf, p);
1319       if (ll_inos.count(i)) {
1320         i1 = client->ll_get_inode(vinodeno_t(ll_inos[i],CEPH_NOSNAP));
1321         client->ll_unlink(i1, n, perms);
1322         client->ll_put(i1);
1323       }
1324     } else if (strcmp(op, "ll_rmdir") == 0) {
1325       int64_t i = t.get_int();
1326       const char *n = t.get_string(buf, p);
1327       if (ll_inos.count(i)) {
1328         i1 = client->ll_get_inode(vinodeno_t(ll_inos[i],CEPH_NOSNAP));
1329         client->ll_rmdir(i1, n, perms);
1330         client->ll_put(i1);
1331       }
1332     } else if (strcmp(op, "ll_rename") == 0) {
1333       int64_t i = t.get_int();
1334       const char *n = t.get_string(buf, p);
1335       int64_t ni = t.get_int();
1336       const char *nn = t.get_string(buf2, p);
1337       if (ll_inos.count(i) &&
1338           ll_inos.count(ni)) {
1339         i1 = client->ll_get_inode(vinodeno_t(ll_inos[i],CEPH_NOSNAP));
1340         i2 = client->ll_get_inode(vinodeno_t(ll_inos[ni],CEPH_NOSNAP));
1341         client->ll_rename(i1, n, i2, nn, perms);
1342         client->ll_put(i1);
1343         client->ll_put(i2);
1344       }
1345     } else if (strcmp(op, "ll_link") == 0) {
1346       int64_t i = t.get_int();
1347       int64_t ni = t.get_int();
1348       const char *nn = t.get_string(buf, p);
1349       if (ll_inos.count(i) &&
1350           ll_inos.count(ni)) {
1351         i1 = client->ll_get_inode(vinodeno_t(ll_inos[i],CEPH_NOSNAP));
1352         i2 = client->ll_get_inode(vinodeno_t(ll_inos[ni],CEPH_NOSNAP));
1353         client->ll_link(i1, i2, nn, perms);
1354         client->ll_put(i1);
1355         client->ll_put(i2);
1356       }
1357     } else if (strcmp(op, "ll_opendir") == 0) {
1358       int64_t i = t.get_int();
1359       int64_t r = t.get_int();
1360       dir_result_t *dirp;
1361       if (ll_inos.count(i)) {
1362         i1 = client->ll_get_inode(vinodeno_t(ll_inos[i],CEPH_NOSNAP));
1363         if (client->ll_opendir(i1, O_RDONLY, &dirp, perms) == 0)
1364           ll_dirs[r] = dirp;
1365         client->ll_put(i1);
1366       }
1367     } else if (strcmp(op, "ll_releasedir") == 0) {
1368       int64_t f = t.get_int();
1369       if (ll_dirs.count(f)) {
1370         client->ll_releasedir(ll_dirs[f]);
1371         ll_dirs.erase(f);
1372       }
1373     } else if (strcmp(op, "ll_open") == 0) {
1374       int64_t i = t.get_int();
1375       int64_t f = t.get_int();
1376       int64_t r = t.get_int();
1377       Fh *fhp;
1378       if (ll_inos.count(i)) {
1379         i1 = client->ll_get_inode(vinodeno_t(ll_inos[i],CEPH_NOSNAP));
1380         if (client->ll_open(i1, f, &fhp, perms) == 0)
1381           ll_files[r] = fhp;
1382         client->ll_put(i1);
1383       }
1384     } else if (strcmp(op, "ll_create") == 0) {
1385       int64_t i = t.get_int();
1386       const char *n = t.get_string(buf, p);
1387       int64_t m = t.get_int();
1388       int64_t f = t.get_int();
1389       int64_t r = t.get_int();
1390       int64_t ri = t.get_int();
1391       struct stat attr;
1392       if (ll_inos.count(i)) {
1393         Fh *fhp;
1394         i1 = client->ll_get_inode(vinodeno_t(ll_inos[i],CEPH_NOSNAP));
1395         if (client->ll_create(i1, n, m, f, &attr, NULL, &fhp, perms) == 0) {
1396           ll_inos[ri] = attr.st_ino;
1397           ll_files[r] = fhp;
1398         }
1399         client->ll_put(i1);
1400       }
1401     } else if (strcmp(op, "ll_read") == 0) {
1402       int64_t f = t.get_int();
1403       int64_t off = t.get_int();
1404       int64_t size = t.get_int();
1405       if (ll_files.count(f) &&
1406           !metadata_only) {
1407         bufferlist bl;
1408         client->ll_read(ll_files[f], off, size, &bl);
1409       }
1410     } else if (strcmp(op, "ll_write") == 0) {
1411       int64_t f = t.get_int();
1412       int64_t off = t.get_int();
1413       int64_t size = t.get_int();
1414       if (ll_files.count(f)) {
1415         if (!metadata_only) {
1416           bufferlist bl;
1417           bufferptr bp(size);
1418           bl.push_back(bp);
1419           bp.zero();
1420           client->ll_write(ll_files[f], off, size, bl.c_str());
1421         } else {
1422           client->ll_write(ll_files[f], off+size, 0, NULL);
1423         }
1424       }
1425     } else if (strcmp(op, "ll_flush") == 0) {
1426       int64_t f = t.get_int();
1427       if (!metadata_only &&
1428           ll_files.count(f)) 
1429         client->ll_flush(ll_files[f]);
1430     } else if (strcmp(op, "ll_fsync") == 0) {
1431       int64_t f = t.get_int();
1432       if (!metadata_only &&
1433           ll_files.count(f)) 
1434         client->ll_fsync(ll_files[f], false); // FIXME dataonly param
1435     } else if (strcmp(op, "ll_release") == 0) {
1436       int64_t f = t.get_int();
1437       if (ll_files.count(f)) {
1438         client->ll_release(ll_files[f]);
1439         ll_files.erase(f);
1440       }
1441     } else if (strcmp(op, "ll_statfs") == 0) {
1442       int64_t i = t.get_int();
1443       if (ll_inos.count(i))
1444         {} //client->ll_statfs(vinodeno_t(ll_inos[i],CEPH_NOSNAP), perms);
1445     }
1446
1447
1448     // object-level traces
1449
1450     else if (strcmp(op, "o_stat") == 0) {
1451       int64_t oh = t.get_int();
1452       int64_t ol = t.get_int();
1453       object_t oid = file_object_t(oh, ol);
1454       lock.Lock();
1455       object_locator_t oloc(SYNCLIENT_FIRST_POOL);
1456       uint64_t size;
1457       ceph::real_time mtime;
1458       client->objecter->stat(oid, oloc, CEPH_NOSNAP, &size, &mtime, 0,
1459                              new C_SafeCond(&lock, &cond, &ack));
1460       while (!ack) cond.Wait(lock);
1461       lock.Unlock();
1462     }
1463     else if (strcmp(op, "o_read") == 0) {
1464       int64_t oh = t.get_int();
1465       int64_t ol = t.get_int();
1466       int64_t off = t.get_int();
1467       int64_t len = t.get_int();
1468       object_t oid = file_object_t(oh, ol);
1469       object_locator_t oloc(SYNCLIENT_FIRST_POOL);
1470       lock.Lock();
1471       bufferlist bl;
1472       client->objecter->read(oid, oloc, off, len, CEPH_NOSNAP, &bl, 0,
1473                              new C_SafeCond(&lock, &cond, &ack));
1474       while (!ack) cond.Wait(lock);
1475       lock.Unlock();
1476     }
1477     else if (strcmp(op, "o_write") == 0) {
1478       int64_t oh = t.get_int();
1479       int64_t ol = t.get_int();
1480       int64_t off = t.get_int();
1481       int64_t len = t.get_int();
1482       object_t oid = file_object_t(oh, ol);
1483       object_locator_t oloc(SYNCLIENT_FIRST_POOL);
1484       lock.Lock();
1485       bufferptr bp(len);
1486       bufferlist bl;
1487       bl.push_back(bp);
1488       SnapContext snapc;
1489       client->objecter->write(oid, oloc, off, len, snapc, bl,
1490                               ceph::real_clock::now(), 0,
1491                               new C_SafeCond(&lock, &cond, &ack));
1492       while (!ack) cond.Wait(lock);
1493       lock.Unlock();
1494     }
1495     else if (strcmp(op, "o_zero") == 0) {
1496       int64_t oh = t.get_int();
1497       int64_t ol = t.get_int();
1498       int64_t off = t.get_int();
1499       int64_t len = t.get_int();
1500       object_t oid = file_object_t(oh, ol);
1501       object_locator_t oloc(SYNCLIENT_FIRST_POOL);
1502       lock.Lock();
1503       SnapContext snapc;
1504       client->objecter->zero(oid, oloc, off, len, snapc,
1505                              ceph::real_clock::now(), 0,
1506                              new C_SafeCond(&lock, &cond, &ack));
1507       while (!ack) cond.Wait(lock);
1508       lock.Unlock();
1509     }
1510
1511
1512     else {
1513       dout(0) << (t.get_line()-1) << ": *** trace hit unrecognized symbol '" << op << "' " << dendl;
1514       ceph_abort();
1515     }
1516   }
1517
1518   dout(10) << "trace finished on line " << t.get_line() << dendl;
1519
1520   // close open files
1521   for (ceph::unordered_map<int64_t, int64_t>::iterator fi = open_files.begin();
1522        fi != open_files.end();
1523        ++fi) {
1524     dout(1) << "leftover close " << fi->second << dendl;
1525     if (fi->second > 0) client->close(fi->second);
1526   }
1527   for (ceph::unordered_map<int64_t, dir_result_t*>::iterator fi = open_dirs.begin();
1528        fi != open_dirs.end();
1529        ++fi) {
1530     dout(1) << "leftover closedir " << fi->second << dendl;
1531     if (fi->second != 0) client->closedir(fi->second);
1532   }
1533   for (ceph::unordered_map<int64_t,Fh*>::iterator fi = ll_files.begin();
1534        fi != ll_files.end();
1535        ++fi) {
1536     dout(1) << "leftover ll_release " << fi->second << dendl;
1537     if (fi->second) client->ll_release(fi->second);
1538   }
1539   for (ceph::unordered_map<int64_t,dir_result_t*>::iterator fi = ll_dirs.begin();
1540        fi != ll_dirs.end();
1541        ++fi) {
1542     dout(1) << "leftover ll_releasedir " << fi->second << dendl;
1543     if (fi->second) client->ll_releasedir(fi->second);
1544   }
1545   
1546   return 0;
1547 }
1548
1549
1550
1551 int SyntheticClient::clean_dir(string& basedir)
1552 {
1553   // read dir
1554   list<string> contents;
1555   UserPerm perms = client->pick_my_perms();
1556   int r = client->getdir(basedir.c_str(), contents, perms);
1557   if (r < 0) {
1558     dout(1) << "getdir on " << basedir << " returns " << r << dendl;
1559     return r;
1560   }
1561
1562   for (list<string>::iterator it = contents.begin();
1563        it != contents.end();
1564        ++it) {
1565     if (*it == ".") continue;
1566     if (*it == "..") continue;
1567     string file = basedir + "/" + *it;
1568
1569     if (time_to_stop()) break;
1570
1571     struct stat st;
1572     int r = client->lstat(file.c_str(), &st, perms);
1573     if (r < 0) {
1574       dout(1) << "stat error on " << file << " r=" << r << dendl;
1575       continue;
1576     }
1577
1578     if ((st.st_mode & S_IFMT) == S_IFDIR) {
1579       clean_dir(file);
1580       client->rmdir(file.c_str(), perms);
1581     } else {
1582       client->unlink(file.c_str(), perms);
1583     }
1584   }
1585
1586   return 0;
1587
1588 }
1589
1590
1591 int SyntheticClient::full_walk(string& basedir) 
1592 {
1593   if (time_to_stop()) return -1;
1594
1595   list<string> dirq;
1596   list<frag_info_t> statq;
1597   dirq.push_back(basedir);
1598   frag_info_t empty;
1599   memset(&empty, 0, sizeof(empty));
1600   statq.push_back(empty);
1601
1602   ceph::unordered_map<inodeno_t, int> nlink;
1603   ceph::unordered_map<inodeno_t, int> nlink_seen;
1604
1605   UserPerm perms = client->pick_my_perms();
1606   while (!dirq.empty()) {
1607     string dir = dirq.front();
1608     frag_info_t expect = statq.front();
1609     dirq.pop_front();
1610     statq.pop_front();
1611
1612     frag_info_t actual = empty;
1613
1614     // read dir
1615     list<string> contents;
1616     int r = client->getdir(dir.c_str(), contents, perms);
1617     if (r < 0) {
1618       dout(1) << "getdir on " << dir << " returns " << r << dendl;
1619       continue;
1620     }
1621     
1622     for (list<string>::iterator it = contents.begin();
1623          it != contents.end();
1624          ++it) {
1625       if (*it == "." ||
1626           *it == "..") 
1627         continue;
1628       string file = dir + "/" + *it;
1629       
1630       struct stat st;
1631       frag_info_t dirstat;
1632       int r = client->lstat(file.c_str(), &st, perms, &dirstat);
1633       if (r < 0) {
1634         dout(1) << "stat error on " << file << " r=" << r << dendl;
1635         continue;
1636       }
1637
1638       nlink_seen[st.st_ino]++;
1639       nlink[st.st_ino] = st.st_nlink;
1640
1641       if (S_ISDIR(st.st_mode))
1642         actual.nsubdirs++;
1643       else
1644         actual.nfiles++;
1645
1646       // print
1647       char *tm = ctime(&st.st_mtime);
1648       tm[strlen(tm)-1] = 0;
1649       printf("%llx %c%c%c%c%c%c%c%c%c%c %2d %5d %5d %8llu %12s %s\n",
1650              (long long)st.st_ino,
1651              S_ISDIR(st.st_mode) ? 'd':'-',
1652              (st.st_mode & 0400) ? 'r':'-',
1653              (st.st_mode & 0200) ? 'w':'-',
1654              (st.st_mode & 0100) ? 'x':'-',
1655              (st.st_mode & 040) ? 'r':'-',
1656              (st.st_mode & 020) ? 'w':'-',
1657              (st.st_mode & 010) ? 'x':'-',
1658              (st.st_mode & 04) ? 'r':'-',
1659              (st.st_mode & 02) ? 'w':'-',
1660              (st.st_mode & 01) ? 'x':'-',
1661              (int)st.st_nlink,
1662              (int)st.st_uid, (int)st.st_gid,
1663              (long long unsigned)st.st_size,
1664              tm,
1665              file.c_str());
1666
1667       
1668       if ((st.st_mode & S_IFMT) == S_IFDIR) {
1669         dirq.push_back(file);
1670         statq.push_back(dirstat);
1671       }
1672     }
1673
1674     if (dir != "" &&
1675         (actual.nsubdirs != expect.nsubdirs ||
1676          actual.nfiles != expect.nfiles)) {
1677       dout(0) << dir << ": expected " << expect << dendl;
1678       dout(0) << dir << ":      got " << actual << dendl;
1679     }
1680   }
1681
1682   for (ceph::unordered_map<inodeno_t,int>::iterator p = nlink.begin(); p != nlink.end(); ++p) {
1683     if (nlink_seen[p->first] != p->second)
1684       dout(0) << p->first << " nlink " << p->second << " != " << nlink_seen[p->first] << "seen" << dendl;
1685   }
1686
1687   return 0;
1688 }
1689
1690
1691
1692 int SyntheticClient::dump_placement(string& fn) {
1693   
1694   UserPerm perms = client->pick_my_perms();
1695
1696   // open file
1697   int fd = client->open(fn.c_str(), O_RDONLY, perms);
1698   dout(5) << "reading from " << fn << " fd " << fd << dendl;
1699   if (fd < 0) return fd;
1700
1701
1702   // How big is it?
1703   struct stat stbuf;
1704   int lstat_result = client->lstat(fn.c_str(), &stbuf, perms);
1705   if (lstat_result < 0) {
1706     dout(0) << "lstat error for file " << fn << dendl;
1707     client->close(fd);
1708     return lstat_result;
1709   }
1710     
1711   off_t filesize = stbuf.st_size;
1712  
1713   // grab the placement info
1714   vector<ObjectExtent> extents;
1715   off_t offset = 0;
1716   client->enumerate_layout(fd, extents, filesize, offset);
1717   client->close(fd);
1718
1719
1720   // run through all the object extents
1721   dout(0) << "file size is " << filesize << dendl;
1722   dout(0) << "(osd, start, length) tuples for file " << fn << dendl;
1723   for (const auto& x : extents) {
1724     int osd = client->objecter->with_osdmap([&](const OSDMap& o) {
1725         return o.get_pg_acting_primary(o.object_locator_to_pg(x.oid, x.oloc));
1726       });
1727
1728     // run through all the buffer extents
1729     for (const auto& be : x.buffer_extents)
1730       dout(0) << "OSD " << osd << ", offset " << be.first
1731               << ", length " << be.second << dendl;
1732   }
1733   return 0;
1734 }
1735
1736
1737
1738 int SyntheticClient::make_dirs(const char *basedir, int dirs, int files, int depth)
1739 {
1740   if (time_to_stop()) return 0;
1741
1742   UserPerm perms = client->pick_my_perms();
1743   // make sure base dir exists
1744   int r = client->mkdir(basedir, 0755, perms);
1745   if (r != 0) {
1746     dout(1) << "can't make base dir? " << basedir << dendl;
1747     //return -1;
1748   }
1749
1750   // children
1751   char d[500];
1752   dout(3) << "make_dirs " << basedir << " dirs " << dirs << " files " << files << " depth " << depth << dendl;
1753   for (int i=0; i<files; i++) {
1754     snprintf(d, sizeof(d), "%s/file.%d", basedir, i);
1755     client->mknod(d, 0644, perms);
1756   }
1757
1758   if (depth == 0) return 0;
1759
1760   for (int i=0; i<dirs; i++) {
1761     snprintf(d, sizeof(d), "%s/dir.%d", basedir, i);
1762     make_dirs(d, dirs, files, depth-1);
1763   }
1764   
1765   return 0;
1766 }
1767
1768 int SyntheticClient::stat_dirs(const char *basedir, int dirs, int files, int depth)
1769 {
1770   if (time_to_stop()) return 0;
1771
1772   UserPerm perms = client->pick_my_perms();
1773
1774   // make sure base dir exists
1775   struct stat st;
1776   int r = client->lstat(basedir, &st, perms);
1777   if (r != 0) {
1778     dout(1) << "can't make base dir? " << basedir << dendl;
1779     return -1;
1780   }
1781
1782   // children
1783   char d[500];
1784   dout(3) << "stat_dirs " << basedir << " dirs " << dirs << " files " << files << " depth " << depth << dendl;
1785   for (int i=0; i<files; i++) {
1786     snprintf(d, sizeof(d), "%s/file.%d", basedir, i);
1787     client->lstat(d, &st, perms);
1788   }
1789
1790   if (depth == 0) return 0;
1791
1792   for (int i=0; i<dirs; i++) {
1793     snprintf(d, sizeof(d), "%s/dir.%d", basedir, i);
1794     stat_dirs(d, dirs, files, depth-1);
1795   }
1796   
1797   return 0;
1798 }
1799 int SyntheticClient::read_dirs(const char *basedir, int dirs, int files, int depth)
1800 {
1801   if (time_to_stop()) return 0;
1802
1803   struct stat st;
1804
1805   // children
1806   char d[500];
1807   dout(3) << "read_dirs " << basedir << " dirs " << dirs << " files " << files << " depth " << depth << dendl;
1808
1809   list<string> contents;
1810   UserPerm perms = client->pick_my_perms();
1811   utime_t s = ceph_clock_now();
1812   int r = client->getdir(basedir, contents, perms);
1813   utime_t e = ceph_clock_now();
1814   e -= s;
1815   if (r < 0) {
1816     dout(0) << "getdir couldn't readdir " << basedir << ", stopping" << dendl;
1817     return -1;
1818   }
1819
1820   for (int i=0; i<files; i++) {
1821     snprintf(d, sizeof(d), "%s/file.%d", basedir, i);
1822     utime_t s = ceph_clock_now();
1823     if (client->lstat(d, &st, perms) < 0) {
1824       dout(2) << "read_dirs failed stat on " << d << ", stopping" << dendl;
1825       return -1;
1826     }
1827     utime_t e = ceph_clock_now();
1828     e -= s;
1829   }
1830
1831   if (depth > 0) 
1832     for (int i=0; i<dirs; i++) {
1833       snprintf(d, sizeof(d), "%s/dir.%d", basedir, i);
1834       if (read_dirs(d, dirs, files, depth-1) < 0) return -1;
1835     }
1836
1837   return 0;
1838 }
1839
1840
1841 int SyntheticClient::make_files(int num, int count, int priv, bool more)
1842 {
1843   int whoami = client->get_nodeid().v;
1844   char d[255];
1845   UserPerm perms = client->pick_my_perms();
1846
1847   if (priv) {
1848     for (int c=0; c<count; c++) {
1849       snprintf(d, sizeof(d), "dir.%d.run%d", whoami, c);
1850       client->mkdir(d, 0755, perms);
1851     }
1852   } else {
1853     // shared
1854     if (true || whoami == 0) {
1855       for (int c=0; c<count; c++) {
1856         snprintf(d, sizeof(d), "dir.%d.run%d", 0, c);
1857         client->mkdir(d, 0755, perms);
1858       }
1859     } else {
1860       sleep(2);
1861     }
1862   }
1863   
1864   // files
1865   struct stat st;
1866   utime_t start = ceph_clock_now();
1867   for (int c=0; c<count; c++) {
1868     for (int n=0; n<num; n++) {
1869       snprintf(d, sizeof(d), "dir.%d.run%d/file.client%d.%d", priv ? whoami:0, c, whoami, n);
1870
1871       client->mknod(d, 0644, perms);
1872
1873       if (more) {
1874         client->lstat(d, &st, perms);
1875         int fd = client->open(d, O_RDONLY, perms);
1876         client->unlink(d, perms);
1877         client->close(fd);
1878       }
1879
1880       if (time_to_stop()) return 0;
1881     }
1882   }
1883   utime_t end = ceph_clock_now();
1884   end -= start;
1885   dout(0) << "makefiles time is " << end << " or " << ((double)end / (double)num) <<" per file" << dendl;
1886
1887   return 0;
1888 }
1889
1890 int SyntheticClient::link_test()
1891 {
1892   char d[255];
1893   char e[255];
1894
1895   UserPerm perms = client->pick_my_perms();
1896
1897  // create files
1898   int num = 200;
1899
1900   client->mkdir("orig", 0755, perms);
1901   client->mkdir("copy", 0755, perms);
1902
1903   utime_t start = ceph_clock_now();
1904   for (int i=0; i<num; i++) {
1905     snprintf(d, sizeof(d), "orig/file.%d", i);
1906     client->mknod(d, 0755, perms);
1907   }
1908   utime_t end = ceph_clock_now();
1909   end -= start;
1910
1911   dout(0) << "orig " << end << dendl;
1912
1913   // link
1914   start = ceph_clock_now();
1915   for (int i=0; i<num; i++) {
1916     snprintf(d, sizeof(d), "orig/file.%d", i);
1917     snprintf(e, sizeof(e), "copy/file.%d", i);
1918     client->link(d, e, perms);
1919   }
1920   end = ceph_clock_now();
1921   end -= start;
1922   dout(0) << "copy " << end << dendl;
1923
1924   return 0;
1925 }
1926
1927
1928 int SyntheticClient::create_shared(int num)
1929 {
1930   // files
1931   UserPerm perms = client->pick_my_perms();
1932   char d[255];
1933   client->mkdir("test", 0755, perms);
1934   for (int n=0; n<num; n++) {
1935     snprintf(d, sizeof(d), "test/file.%d", n);
1936     client->mknod(d, 0644, perms);
1937   }
1938   
1939   return 0;
1940 }
1941
1942 int SyntheticClient::open_shared(int num, int count)
1943 {
1944   // files
1945   char d[255];
1946   UserPerm perms = client->pick_my_perms();
1947   for (int c=0; c<count; c++) {
1948     // open
1949     list<int> fds;
1950     for (int n=0; n<num; n++) {
1951       snprintf(d, sizeof(d), "test/file.%d", n);
1952       int fd = client->open(d, O_RDONLY, perms);
1953       if (fd > 0) fds.push_back(fd);
1954     }
1955
1956     if (false && client->get_nodeid() == 0)
1957       for (int n=0; n<num; n++) {
1958         snprintf(d, sizeof(d), "test/file.%d", n);
1959         client->unlink(d, perms);
1960       }
1961
1962     while (!fds.empty()) {
1963       int fd = fds.front();
1964       fds.pop_front();
1965       client->close(fd);
1966     }
1967   }
1968   
1969   return 0;
1970 }
1971
1972
1973 // Hits OSD 0 with writes to various files with OSD 0 as the primary.
1974 int SyntheticClient::overload_osd_0(int n, int size, int wrsize) {
1975   UserPerm perms = client->pick_my_perms();
1976   // collect a bunch of files starting on OSD 0
1977   int left = n;
1978   int tried = 0;
1979   while (left < 0) {
1980
1981
1982     // pull open a file
1983     dout(0) << "in OSD overload" << dendl;
1984     string filename = get_sarg(tried);
1985     dout(1) << "OSD Overload workload: trying file " << filename << dendl;
1986     int fd = client->open(filename.c_str(), O_RDWR|O_CREAT, perms);
1987     ++tried;
1988
1989     // only use the file if its first primary is OSD 0
1990     int primary_osd = check_first_primary(fd);
1991     if (primary_osd != 0) {
1992       client->close(fd);
1993       dout(1) << "OSD Overload workload: SKIPPING file " << filename <<
1994         " with OSD " << primary_osd << " as first primary. " << dendl;
1995       continue;
1996     }
1997       dout(1) << "OSD Overload workload: USING file " << filename <<
1998         " with OSD 0 as first primary. " << dendl;
1999
2000
2001     --left;
2002     // do whatever operation we want to do on the file. How about a write?
2003     write_fd(fd, size, wrsize);
2004   }
2005   return 0;
2006 }
2007
2008
2009 // See what the primary is for the first object in this file.
2010 int SyntheticClient::check_first_primary(int fh)
2011 {
2012   vector<ObjectExtent> extents;
2013   client->enumerate_layout(fh, extents, 1, 0);
2014   return client->objecter->with_osdmap([&](const OSDMap& o) {
2015       return o.get_pg_acting_primary(
2016         o.object_locator_to_pg(extents.begin()->oid, extents.begin()->oloc));
2017     });
2018 }
2019
2020 int SyntheticClient::rm_file(string& fn)
2021 {
2022   UserPerm perms = client->pick_my_perms();
2023   return client->unlink(fn.c_str(), perms);
2024 }
2025
2026 int SyntheticClient::write_file(string& fn, int size, loff_t wrsize)   // size is in MB, wrsize in bytes
2027 {
2028   //uint64_t wrsize = 1024*256;
2029   char *buf = new char[wrsize+100];   // 1 MB
2030   memset(buf, 7, wrsize);
2031   int64_t chunks = (uint64_t)size * (uint64_t)(1024*1024) / (uint64_t)wrsize;
2032   UserPerm perms = client->pick_my_perms();
2033
2034   int fd = client->open(fn.c_str(), O_RDWR|O_CREAT, perms);
2035   dout(5) << "writing to " << fn << " fd " << fd << dendl;
2036   if (fd < 0) {
2037     delete[] buf;
2038     return fd;
2039   }
2040
2041   utime_t from = ceph_clock_now();
2042   utime_t start = from;
2043   uint64_t bytes = 0, total = 0;
2044
2045
2046   for (loff_t i=0; i<chunks; i++) {
2047     if (time_to_stop()) {
2048       dout(0) << "stopping" << dendl;
2049       break;
2050     }
2051     dout(2) << "writing block " << i << "/" << chunks << dendl;
2052     
2053     // fill buf with a 16 byte fingerprint
2054     // 64 bits : file offset
2055     // 64 bits : client id
2056     // = 128 bits (16 bytes)
2057     uint64_t *p = (uint64_t*)buf;
2058     while ((char*)p < buf + wrsize) {
2059       *p = (uint64_t)i*(uint64_t)wrsize + (uint64_t)((char*)p - buf);      
2060       p++;
2061       *p = client->get_nodeid().v;
2062       p++;
2063     }
2064
2065     client->write(fd, buf, wrsize, i*wrsize);
2066     bytes += wrsize;
2067     total += wrsize;
2068
2069     utime_t now = ceph_clock_now();
2070     if (now - from >= 1.0) {
2071       double el = now - from;
2072       dout(0) << "write " << (bytes / el / 1048576.0) << " MB/sec" << dendl;
2073       from = now;
2074       bytes = 0;
2075     }
2076   }
2077
2078   client->fsync(fd, true);
2079
2080   utime_t stop = ceph_clock_now();
2081   double el = stop - start;
2082   dout(0) << "write total " << (total / el / 1048576.0) << " MB/sec ("
2083           << total << " bytes in " << el << " seconds)" << dendl;
2084
2085   client->close(fd);
2086   delete[] buf;
2087
2088   return 0;
2089 }
2090
2091 int SyntheticClient::write_fd(int fd, int size, int wrsize)   // size is in MB, wrsize in bytes
2092 {
2093   //uint64_t wrsize = 1024*256;
2094   char *buf = new char[wrsize+100];   // 1 MB
2095   memset(buf, 7, wrsize);
2096   uint64_t chunks = (uint64_t)size * (uint64_t)(1024*1024) / (uint64_t)wrsize;
2097
2098   //dout(5) << "SyntheticClient::write_fd: writing to fd " << fd << dendl;
2099   if (fd < 0) {
2100     delete[] buf;
2101     return fd;
2102   }
2103
2104   for (unsigned i=0; i<chunks; i++) {
2105     if (time_to_stop()) {
2106       dout(0) << "stopping" << dendl;
2107       break;
2108     }
2109     dout(2) << "writing block " << i << "/" << chunks << dendl;
2110     
2111     // fill buf with a 16 byte fingerprint
2112     // 64 bits : file offset
2113     // 64 bits : client id
2114     // = 128 bits (16 bytes)
2115     uint64_t *p = (uint64_t*)buf;
2116     while ((char*)p < buf + wrsize) {
2117       *p = (uint64_t)i*(uint64_t)wrsize + (uint64_t)((char*)p - buf);      
2118       p++;
2119       *p = client->get_nodeid().v;
2120       p++;
2121     }
2122
2123     client->write(fd, buf, wrsize, i*wrsize);
2124   }
2125   client->close(fd);
2126   delete[] buf;
2127
2128   return 0;
2129 }
2130
2131
2132 int SyntheticClient::write_batch(int nfile, int size, int wrsize)
2133 {
2134   for (int i=0; i<nfile; i++) {
2135       string sarg1 = get_sarg(i);
2136     dout(0) << "Write file " << sarg1 << dendl;
2137     write_file(sarg1, size, wrsize);
2138   }
2139   return 0;
2140 }
2141
2142 // size is in MB, wrsize in bytes
2143 int SyntheticClient::read_file(const std::string& fn, int size,
2144                                int rdsize, bool ignoreprint)
2145 {
2146   char *buf = new char[rdsize]; 
2147   memset(buf, 1, rdsize);
2148   uint64_t chunks = (uint64_t)size * (uint64_t)(1024*1024) / (uint64_t)rdsize;
2149   UserPerm perms = client->pick_my_perms();
2150
2151   int fd = client->open(fn.c_str(), O_RDONLY, perms);
2152   dout(5) << "reading from " << fn << " fd " << fd << dendl;
2153   if (fd < 0) {
2154     delete[] buf;
2155     return fd;
2156   }
2157
2158   utime_t from = ceph_clock_now();
2159   utime_t start = from;
2160   uint64_t bytes = 0, total = 0;
2161
2162   for (unsigned i=0; i<chunks; i++) {
2163     if (time_to_stop()) break;
2164     dout(2) << "reading block " << i << "/" << chunks << dendl;
2165     int r = client->read(fd, buf, rdsize, i*rdsize);
2166     if (r < rdsize) {
2167       dout(1) << "read_file got r = " << r << ", probably end of file" << dendl;
2168       break;
2169     }
2170  
2171     bytes += rdsize;
2172     total += rdsize;
2173
2174     utime_t now = ceph_clock_now();
2175     if (now - from >= 1.0) {
2176       double el = now - from;
2177       dout(0) << "read " << (bytes / el / 1048576.0) << " MB/sec" << dendl;
2178       from = now;
2179       bytes = 0;
2180     }
2181
2182     // verify fingerprint
2183     int bad = 0;
2184     uint64_t *p = (uint64_t*)buf;
2185     while ((char*)p + 32 < buf + rdsize) {
2186       uint64_t readoff = *p;
2187       uint64_t wantoff = (uint64_t)i*(uint64_t)rdsize + (uint64_t)((char*)p - buf);
2188       p++;
2189       int64_t readclient = *p;
2190       p++;
2191       if (readoff != wantoff ||
2192           readclient != client->get_nodeid()) {
2193         if (!bad && !ignoreprint)
2194           dout(0) << "WARNING: wrong data from OSD, block says fileoffset=" << readoff << " client=" << readclient
2195                   << ", should be offset " << wantoff << " client " << client->get_nodeid()
2196                   << dendl;
2197         bad++;
2198       }
2199     }
2200     if (bad && !ignoreprint) 
2201       dout(0) << " + " << (bad-1) << " other bad 16-byte bits in this block" << dendl;
2202   }
2203
2204   utime_t stop = ceph_clock_now();
2205   double el = stop - start;
2206   dout(0) << "read total " << (total / el / 1048576.0) << " MB/sec ("
2207           << total << " bytes in " << el << " seconds)" << dendl;
2208
2209   client->close(fd);
2210   delete[] buf;
2211
2212   return 0;
2213 }
2214
2215
2216
2217
2218 class C_Ref : public Context {
2219   Mutex& lock;
2220   Cond& cond;
2221   int *ref;
2222 public:
2223   C_Ref(Mutex &l, Cond &c, int *r) : lock(l), cond(c), ref(r) {
2224     lock.Lock();
2225     (*ref)++;
2226     lock.Unlock();
2227   }
2228   void finish(int) override {
2229     lock.Lock();
2230     (*ref)--;
2231     cond.Signal();
2232     lock.Unlock();
2233   }
2234 };
2235
2236 int SyntheticClient::create_objects(int nobj, int osize, int inflight)
2237 {
2238   // divy up
2239   int numc = num_client ? num_client : 1;
2240
2241   int start, inc, end;
2242
2243   if (1) {
2244     // strided
2245     start = client->get_nodeid().v; //nobjs % numc;
2246     inc = numc;
2247     end = start + nobj;
2248   } else {
2249     // segments
2250     start = nobj * client->get_nodeid().v / numc;
2251     inc = 1;
2252     end = nobj * (client->get_nodeid().v+1) / numc;
2253   }
2254
2255   dout(5) << "create_objects " << nobj << " size=" << osize 
2256           << " .. doing [" << start << "," << end << ") inc " << inc
2257           << dendl;
2258   
2259   bufferptr bp(osize);
2260   bp.zero();
2261   bufferlist bl;
2262   bl.push_back(bp);
2263
2264   Mutex lock("create_objects lock");
2265   Cond cond;
2266   
2267   int unsafe = 0;
2268   
2269   list<utime_t> starts;
2270
2271   for (int i=start; i<end; i += inc) {
2272     if (time_to_stop()) break;
2273
2274     object_t oid = file_object_t(999, i);
2275     object_locator_t oloc(SYNCLIENT_FIRST_POOL);
2276     SnapContext snapc;
2277     
2278     if (i % inflight == 0) {
2279       dout(6) << "create_objects " << i << "/" << (nobj+1) << dendl;
2280     }
2281     dout(10) << "writing " << oid << dendl;
2282
2283     starts.push_back(ceph_clock_now());
2284     client->client_lock.Lock();
2285     client->objecter->write(oid, oloc, 0, osize, snapc, bl,
2286                             ceph::real_clock::now(), 0,
2287                             new C_Ref(lock, cond, &unsafe));
2288     client->client_lock.Unlock();
2289
2290     lock.Lock();
2291     while (unsafe > inflight) {
2292       dout(20) << "waiting for " << unsafe << " unsafe" << dendl;
2293       cond.Wait(lock);
2294     }
2295     lock.Unlock();
2296
2297     utime_t lat = ceph_clock_now();
2298     lat -= starts.front();
2299     starts.pop_front();
2300   }
2301
2302   lock.Lock();
2303   while (unsafe > 0) {
2304     dout(10) << "waiting for " << unsafe << " unsafe" << dendl;
2305     cond.Wait(lock);
2306   }
2307   lock.Unlock();
2308
2309   dout(5) << "create_objects done" << dendl;
2310   return 0;
2311 }
2312
2313 int SyntheticClient::object_rw(int nobj, int osize, int wrpc, 
2314                                int overlappc,
2315                                double rskew, double wskew)
2316 {
2317   dout(5) << "object_rw " << nobj << " size=" << osize << " with "
2318           << wrpc << "% writes" 
2319           << ", " << overlappc << "% overlap"
2320           << ", rskew = " << rskew
2321           << ", wskew = " << wskew
2322           << dendl;
2323
2324   bufferptr bp(osize);
2325   bp.zero();
2326   bufferlist bl;
2327   bl.push_back(bp);
2328
2329   // start with odd number > nobj
2330   rjhash<uint32_t> h;
2331   unsigned prime = nobj + 1;             // this is the minimum!
2332   prime += h(nobj) % (3*nobj);  // bump it up some
2333   prime |= 1;                               // make it odd
2334
2335   while (true) {
2336     unsigned j;
2337     for (j=2; j*j<=prime; j++)
2338       if (prime % j == 0) break;
2339     if (j*j > prime) {
2340       break;
2341       //cout << "prime " << prime << endl;
2342     }
2343     prime += 2;
2344   }
2345
2346   Mutex lock("lock");
2347   Cond cond;
2348
2349   int unack = 0;
2350
2351   while (1) {
2352     if (time_to_stop()) break;
2353     
2354     // read or write?
2355     bool write = (rand() % 100) < wrpc;
2356
2357     // choose object
2358     double r = drand48(); // [0..1)
2359     long o;
2360     if (write) {
2361       o = (long)trunc(pow(r, wskew) * (double)nobj);  // exponentially skew towards 0
2362       int pnoremap = (long)(r * 100.0);
2363       if (pnoremap >= overlappc) 
2364         o = (o*prime) % nobj;    // remap
2365     } else {
2366       o = (long)trunc(pow(r, rskew) * (double)nobj);  // exponentially skew towards 0
2367     }
2368     object_t oid = file_object_t(999, o);
2369     object_locator_t oloc(SYNCLIENT_FIRST_POOL);
2370     SnapContext snapc;
2371     
2372     client->client_lock.Lock();
2373     utime_t start = ceph_clock_now();
2374     if (write) {
2375       dout(10) << "write to " << oid << dendl;
2376
2377       ObjectOperation m;
2378       OSDOp op;
2379       op.op.op = CEPH_OSD_OP_WRITE;
2380       op.op.extent.offset = 0;
2381       op.op.extent.length = osize;
2382       op.indata = bl;
2383       m.ops.push_back(op);
2384       client->objecter->mutate(oid, oloc, m, snapc,
2385                                ceph::real_clock::now(), 0,
2386                                new C_Ref(lock, cond, &unack));
2387     } else {
2388       dout(10) << "read from " << oid << dendl;
2389       bufferlist inbl;
2390       client->objecter->read(oid, oloc, 0, osize, CEPH_NOSNAP, &inbl, 0,
2391                              new C_Ref(lock, cond, &unack));
2392     }
2393     client->client_lock.Unlock();
2394
2395     lock.Lock();
2396     while (unack > 0) {
2397       dout(20) << "waiting for " << unack << " unack" << dendl;
2398       cond.Wait(lock);
2399     }
2400     lock.Unlock();
2401
2402     utime_t lat = ceph_clock_now();
2403     lat -= start;
2404   }
2405
2406   return 0;
2407 }
2408
2409
2410
2411
2412
2413 int SyntheticClient::read_random(string& fn, int size, int rdsize)   // size is in MB, wrsize in bytes
2414 {
2415   UserPerm perms = client->pick_my_perms();
2416   uint64_t chunks = (uint64_t)size * (uint64_t)(1024*1024) / (uint64_t)rdsize;
2417   int fd = client->open(fn.c_str(), O_RDWR, perms);
2418   dout(5) << "reading from " << fn << " fd " << fd << dendl;
2419
2420   if (fd < 0) return fd;
2421   int offset = 0;
2422   char * buf = NULL;
2423
2424   for (unsigned i=0; i<2000; i++) {
2425     if (time_to_stop()) break;
2426
2427     bool read=false;
2428
2429     time_t seconds;
2430     time( &seconds);
2431     srand(seconds);
2432
2433     // use rand instead ??
2434     double x = drand48();
2435
2436     // cleanup before call 'new'
2437     if (buf != NULL) {
2438         delete[] buf;
2439         buf = NULL;
2440     }
2441     if (x < 0.5) {
2442         buf = new char[rdsize]; 
2443         memset(buf, 1, rdsize);
2444         read=true;
2445     } else {
2446         buf = new char[rdsize+100];   // 1 MB
2447         memset(buf, 7, rdsize);
2448     }
2449     
2450     if (read) {
2451         offset=(rand())%(chunks+1);
2452         dout(2) << "reading block " << offset << "/" << chunks << dendl;
2453
2454         int r = client->read(fd, buf, rdsize, offset*rdsize);
2455         if (r < rdsize) {
2456           dout(1) << "read_file got r = " << r << ", probably end of file" << dendl;
2457         }
2458     } else {
2459       dout(2) << "writing block " << offset << "/" << chunks << dendl;
2460
2461       // fill buf with a 16 byte fingerprint
2462       // 64 bits : file offset
2463       // 64 bits : client id
2464       // = 128 bits (16 bytes)
2465
2466       offset=(rand())%(chunks+1);
2467       uint64_t *p = (uint64_t*)buf;
2468       while ((char*)p < buf + rdsize) {
2469         *p = offset*rdsize + (char*)p - buf;      
2470         p++;
2471         *p = client->get_nodeid().v;
2472         p++;
2473       }
2474
2475       client->write(fd, buf, rdsize,
2476                         offset*rdsize);
2477     }
2478
2479     // verify fingerprint
2480     if (read) {
2481       int bad = 0;
2482       int64_t *p = (int64_t*)buf;
2483       while ((char*)p + 32 < buf + rdsize) {
2484         int64_t readoff = *p;
2485         int64_t wantoff = offset*rdsize + (int64_t)((char*)p - buf);
2486         p++;
2487         int64_t readclient = *p;
2488         p++;
2489         if (readoff != wantoff || readclient != client->get_nodeid()) {
2490           if (!bad)
2491             dout(0) << "WARNING: wrong data from OSD, block says fileoffset=" << readoff << " client=" << readclient
2492                     << ", should be offset " << wantoff << " client " << client->get_nodeid()
2493                     << dendl;
2494           bad++;
2495         }
2496       }
2497       if (bad) 
2498         dout(0) << " + " << (bad-1) << " other bad 16-byte bits in this block" << dendl;
2499     }
2500   }
2501   
2502   client->close(fd);
2503   delete[] buf;
2504
2505   return 0;
2506 }
2507
2508 int normdist(int min, int max, int stdev) /* specifies input values */
2509 {
2510   /* min: Minimum value; max: Maximum value; stdev: degree of deviation */
2511   
2512   //int min, max, stdev; {
2513   time_t seconds;
2514   time( &seconds);
2515   srand(seconds);
2516   
2517   int range, iterate, result;
2518   /* declare range, iterate and result as integers, to avoid the need for
2519      floating point math*/
2520   
2521   result = 0;
2522   /* ensure result is initialized to 0 */
2523   
2524   range = max -min;
2525   /* calculate range of possible values between the max and min values */
2526   
2527   iterate = range / stdev;
2528   /* this number of iterations ensures the proper shape of the resulting
2529      curve */
2530   
2531   stdev += 1; /* compensation for integer vs. floating point math */
2532   for (int c = iterate; c != 0; c--) /* loop through iterations */
2533     {
2534       //  result += (uniform (1, 100) * stdev) / 100; /* calculate and
2535       result += ( (rand()%100 + 1)  * stdev) / 100;
2536       // printf("result=%d\n", result );
2537     }
2538   printf("\n final result=%d\n", result );
2539   return result + min; /* send final result back */
2540 }
2541
2542 int SyntheticClient::read_random_ex(string& fn, int size, int rdsize)   // size is in MB, wrsize in bytes
2543 {
2544   uint64_t chunks = (uint64_t)size * (uint64_t)(1024*1024) / (uint64_t)rdsize;
2545   UserPerm perms = client->pick_my_perms();
2546   int fd = client->open(fn.c_str(), O_RDWR, perms);
2547   dout(5) << "reading from " << fn << " fd " << fd << dendl;
2548   
2549   if (fd < 0) return fd;
2550   int offset = 0;
2551   char * buf = NULL;
2552   
2553   for (unsigned i=0; i<2000; i++) {
2554     if (time_to_stop()) break;
2555     
2556     bool read=false;
2557     
2558     time_t seconds;
2559     time( &seconds);
2560     srand(seconds);
2561     
2562     // use rand instead ??
2563     double x = drand48();
2564     
2565     // cleanup before call 'new'
2566     if (buf != NULL) {
2567       delete[] buf;
2568       buf = NULL;
2569     }
2570     if (x < 0.5) {
2571       buf = new char[rdsize]; 
2572       memset(buf, 1, rdsize);
2573       read=true;
2574     } else {
2575       buf = new char[rdsize+100];   // 1 MB
2576       memset(buf, 7, rdsize);
2577     }
2578     
2579     if (read) {
2580       dout(2) << "reading block " << offset << "/" << chunks << dendl;
2581         
2582       int r = client->read(fd, buf, rdsize,
2583                              offset*rdsize);
2584       if (r < rdsize) {
2585         dout(1) << "read_file got r = " << r << ", probably end of file" << dendl;
2586       }
2587     } else {
2588         dout(2) << "writing block " << offset << "/" << chunks << dendl;
2589         
2590         // fill buf with a 16 byte fingerprint
2591         // 64 bits : file offset
2592         // 64 bits : client id
2593         // = 128 bits (16 bytes)
2594         
2595         int count = rand()%10;
2596         
2597         for ( int j=0;j<count; j++ ) {
2598           offset=(rand())%(chunks+1);
2599           uint64_t *p = (uint64_t*)buf;
2600           while ((char*)p < buf + rdsize) {
2601             *p = offset*rdsize + (char*)p - buf;      
2602             p++;
2603             *p = client->get_nodeid().v;
2604             p++;
2605           }
2606             
2607           client->write(fd, buf, rdsize, offset*rdsize);
2608         }
2609     }
2610     
2611     // verify fingerprint
2612     if (read) {
2613       int bad = 0;
2614       int64_t *p = (int64_t*)buf;
2615       while ((char*)p + 32 < buf + rdsize) {
2616         int64_t readoff = *p;
2617         int64_t wantoff = offset*rdsize + (int64_t)((char*)p - buf);
2618         p++;
2619         int64_t readclient = *p;
2620         p++;
2621         if (readoff != wantoff || readclient != client->get_nodeid()) { 
2622           if (!bad)
2623             dout(0) << "WARNING: wrong data from OSD, block says fileoffset=" << readoff << " client=" << readclient
2624                     << ", should be offset " << wantoff << " client " << client->get_nodeid()
2625                     << dendl;
2626           bad++;
2627         }
2628       }
2629       if (bad) 
2630         dout(0) << " + " << (bad-1) << " other bad 16-byte bits in this block" << dendl;
2631     }
2632   }
2633   
2634   client->close(fd);
2635   delete[] buf;
2636
2637   return 0;
2638 }
2639
2640
2641 int SyntheticClient::random_walk(int num_req)
2642 {
2643   int left = num_req;
2644
2645   //dout(1) << "random_walk() will do " << left << " ops" << dendl;
2646
2647   init_op_dist();  // set up metadata op distribution
2648  
2649   UserPerm perms = client->pick_my_perms();
2650   while (left > 0) {
2651     left--;
2652
2653     if (time_to_stop()) break;
2654
2655     // ascend?
2656     if (cwd.depth() && !roll_die(::pow((double).9, (double)cwd.depth()))) {
2657       dout(DBL) << "die says up" << dendl;
2658       up();
2659       continue;
2660     }
2661
2662     // descend?
2663     if (roll_die(::pow((double).9,(double)cwd.depth())) && !subdirs.empty()) {
2664       string s = get_random_subdir();
2665       cwd.push_dentry( s );
2666       dout(DBL) << "cd " << s << " -> " << cwd << dendl;
2667       clear_dir();
2668       continue;
2669     }
2670
2671     int op = 0;
2672     filepath path;
2673
2674     if (contents.empty() && roll_die(.3)) {
2675       if (did_readdir) {
2676         dout(DBL) << "empty dir, up" << dendl;
2677         up();
2678       } else
2679         op = CEPH_MDS_OP_READDIR;
2680     } else {
2681       op = op_dist.sample();
2682     }
2683     //dout(DBL) << "op is " << op << dendl;
2684
2685     int r = 0;
2686
2687     // do op
2688     if (op == CEPH_MDS_OP_UNLINK) {
2689       if (contents.empty())
2690         op = CEPH_MDS_OP_READDIR;
2691       else 
2692         r = client->unlink(get_random_sub(), perms);   // will fail on dirs
2693     }
2694      
2695     if (op == CEPH_MDS_OP_RENAME) {
2696       if (contents.empty())
2697         op = CEPH_MDS_OP_READDIR;
2698       else {
2699         r = client->rename(get_random_sub(), make_sub("ren"), perms);
2700       }
2701     }
2702     
2703     if (op == CEPH_MDS_OP_MKDIR) {
2704       r = client->mkdir(make_sub("mkdir"), 0755, perms);
2705     }
2706     
2707     if (op == CEPH_MDS_OP_RMDIR) {
2708       if (!subdirs.empty())
2709         r = client->rmdir(get_random_subdir(), perms);
2710       else
2711         r = client->rmdir(cwd.c_str(), perms);     // will pbly fail
2712     }
2713     
2714     if (op == CEPH_MDS_OP_SYMLINK) {
2715     }
2716     /*
2717     if (op == CEPH_MDS_OP_CHMOD) {
2718       if (contents.empty())
2719         op = CEPH_MDS_OP_READDIR;
2720       else
2721         r = client->chmod(get_random_sub(), rand() & 0755, perms);
2722     }
2723     
2724     if (op == CEPH_MDS_OP_CHOWN) {
2725       if (contents.empty())         r = client->chown(cwd.c_str(), rand(), rand(), perms);
2726       else
2727         r = client->chown(get_random_sub(), rand(), rand(), perms);
2728     }
2729      
2730     if (op == CEPH_MDS_OP_UTIME) {
2731       struct utimbuf b;
2732       memset(&b, 1, sizeof(b));
2733       if (contents.empty()) 
2734         r = client->utime(cwd.c_str(), &b, perms);
2735       else
2736         r = client->utime(get_random_sub(), &b, perms);
2737     }
2738     */
2739     if (op == CEPH_MDS_OP_LINK) {
2740     }
2741     
2742     if (op == CEPH_MDS_OP_MKNOD) {
2743       r = client->mknod(make_sub("mknod"), 0644, perms);
2744     }
2745      
2746     if (op == CEPH_MDS_OP_OPEN) {
2747       if (contents.empty())
2748         op = CEPH_MDS_OP_READDIR;
2749       else {
2750         r = client->open(get_random_sub(), O_RDONLY, perms);
2751         if (r > 0) {
2752           assert(open_files.count(r) == 0);
2753           open_files.insert(r);
2754         }
2755       }
2756     }
2757
2758     /*if (op == CEPH_MDS_OP_RELEASE) {   // actually, close
2759       if (open_files.empty())
2760         op = CEPH_MDS_OP_STAT;
2761       else {
2762         int fh = get_random_fh();
2763         r = client->close( fh );
2764         if (r == 0) open_files.erase(fh);
2765       }
2766     }
2767     */
2768     
2769     if (op == CEPH_MDS_OP_GETATTR) {
2770       struct stat st;
2771       if (contents.empty()) {
2772         if (did_readdir) {
2773           if (roll_die(.1)) {
2774             dout(DBL) << "stat in empty dir, up" << dendl;
2775             up();
2776           } else {
2777             op = CEPH_MDS_OP_MKNOD;
2778           }
2779         } else
2780           op = CEPH_MDS_OP_READDIR;
2781       } else
2782         r = client->lstat(get_random_sub(), &st, perms);
2783     }
2784
2785     if (op == CEPH_MDS_OP_READDIR) {
2786       clear_dir();
2787       
2788       list<string> c;
2789       r = client->getdir(cwd.c_str(), c, perms);
2790       
2791       for (list<string>::iterator it = c.begin();
2792            it != c.end();
2793            ++it) {
2794         //dout(DBL) << " got " << *it << dendl;
2795         ceph_abort();
2796         /*contents[*it] = it->second;
2797         if (it->second &&
2798             S_ISDIR(it->second->st_mode)) 
2799           subdirs.insert(*it);
2800         */
2801       }
2802       
2803       did_readdir = true;
2804     }
2805       
2806     // errors?
2807     if (r < 0) {
2808       // reevaluate cwd.
2809       //while (cwd.depth()) {
2810       //if (client->lookup(cwd)) break;   // it's in the cache
2811         
2812       //dout(DBL) << "r = " << r << ", client doesn't have " << cwd << ", cd .." << dendl;
2813       dout(DBL) << "r = " << r << ", client may not have " << cwd << ", cd .." << dendl;
2814       up();
2815       //}      
2816     }
2817   }
2818
2819   // close files
2820   dout(DBL) << "closing files" << dendl;
2821   while (!open_files.empty()) {
2822     int fh = get_random_fh();
2823     int r = client->close( fh );
2824     if (r == 0) open_files.erase(fh);
2825   }
2826
2827   dout(DBL) << "done" << dendl;
2828   return 0;
2829 }
2830
2831
2832
2833
2834 void SyntheticClient::make_dir_mess(const char *basedir, int n)
2835 {
2836   UserPerm perms = client->pick_my_perms();
2837   vector<string> dirs;
2838   
2839   dirs.push_back(basedir);
2840   dirs.push_back(basedir);
2841   
2842   client->mkdir(basedir, 0755, perms);
2843
2844   // motivation:
2845   //  P(dir) ~ subdirs_of(dir) + 2
2846   // from 5-year metadata workload paper in fast'07
2847
2848   // create dirs
2849   for (int i=0; i<n; i++) {
2850     // pick a dir
2851     int k = rand() % dirs.size();
2852     string parent = dirs[k];
2853     
2854     // pick a name
2855     std::stringstream ss;
2856     ss << parent << "/" << i;
2857     string dir = ss.str();
2858
2859     // update dirs
2860     dirs.push_back(parent);
2861     dirs.push_back(dir);
2862     dirs.push_back(dir);
2863
2864     // do it
2865     client->mkdir(dir.c_str(), 0755, perms);
2866   }
2867     
2868   
2869 }
2870
2871
2872
2873 void SyntheticClient::foo()
2874 {
2875   UserPerm perms = client->pick_my_perms();
2876
2877   if (1) {
2878     // make 2 parallel dirs, link/unlink between them.
2879     char a[100], b[100];
2880     client->mkdir("/a", 0755, perms);
2881     client->mkdir("/b", 0755, perms);
2882     for (int i=0; i<10; i++) {
2883       snprintf(a, sizeof(a), "/a/%d", i);
2884       client->mknod(a, 0644, perms);
2885     }
2886     while (1) {
2887       for (int i=0; i<10; i++) {
2888         snprintf(a, sizeof(a), "/a/%d", i);
2889         snprintf(b, sizeof(b), "/b/%d", i);
2890         client->link(a, b, perms);
2891       }
2892       for (int i=0; i<10; i++) {
2893         snprintf(b, sizeof(b), "/b/%d", i);
2894         client->unlink(b, perms);
2895       }
2896     }
2897     return;
2898   }
2899   if (1) {
2900     // bug1.cpp
2901     const char *fn = "blah";
2902     char buffer[8192]; 
2903     client->unlink(fn, perms);
2904     int handle = client->open(fn, O_CREAT|O_RDWR, perms, S_IRWXU);
2905     assert(handle>=0);
2906     int r=client->write(handle,buffer,8192);
2907     assert(r>=0);
2908     r=client->close(handle);
2909     assert(r>=0);
2910          
2911     handle = client->open(fn, O_RDWR, perms); // open the same  file, it must have some data already
2912     assert(handle>=0);      
2913     r=client->read(handle,buffer,8192);
2914     assert(r==8192); //  THIS ASSERTION FAILS with disabled cache
2915     r=client->close(handle);
2916     assert(r>=0);
2917
2918     return;
2919   }
2920   if (1) {
2921     dout(0) << "first" << dendl;
2922     int fd = client->open("tester", O_WRONLY|O_CREAT, perms);
2923     client->write(fd, "hi there", 0, 8);
2924     client->close(fd);
2925     dout(0) << "sleep" << dendl;
2926     sleep(10);
2927     dout(0) << "again" << dendl;
2928     fd = client->open("tester", O_WRONLY|O_CREAT, perms);
2929     client->write(fd, "hi there", 0, 8);
2930     client->close(fd);
2931     return;    
2932   }
2933   if (1) {
2934     // open some files
2935     srand(0);
2936     for (int i=0; i<20; i++) {
2937       int s = 5;
2938       int a = rand() % s;
2939       int b = rand() % s;
2940       int c = rand() % s;
2941       char src[80];
2942       snprintf(src, sizeof(src), "syn.0.0/dir.%d/dir.%d/file.%d", a, b, c);
2943       //int fd = 
2944       client->open(src, O_RDONLY, perms);
2945     }
2946
2947     return;
2948   }
2949
2950   if (0) {
2951     // rename fun
2952     for (int i=0; i<100; i++) {
2953       int s = 5;
2954       int a = rand() % s;
2955       int b = rand() % s;
2956       int c = rand() % s;
2957       int d = rand() % s;
2958       int e = rand() % s;
2959       int f = rand() % s;
2960       char src[80];
2961       char dst[80];
2962       snprintf(src, sizeof(src), "syn.0.0/dir.%d/dir.%d/file.%d", a, b, c);
2963       snprintf(dst, sizeof(dst), "syn.0.0/dir.%d/dir.%d/file.%d", d, e, f);
2964       client->rename(src, dst, perms);
2965     }
2966     return;
2967   }
2968
2969   if (1) {
2970     // link fun
2971     srand(0);
2972     for (int i=0; i<100; i++) {
2973       int s = 5;
2974       int a = rand() % s;
2975       int b = rand() % s;
2976       int c = rand() % s;
2977       int d = rand() % s;
2978       int e = rand() % s;
2979       int f = rand() % s;
2980       char src[80];
2981       char dst[80];
2982       snprintf(src, sizeof(src), "syn.0.0/dir.%d/dir.%d/file.%d", a, b, c);
2983       snprintf(dst, sizeof(dst), "syn.0.0/dir.%d/dir.%d/newlink.%d", d, e, f);
2984       client->link(src, dst, perms);
2985     }
2986     srand(0);
2987     for (int i=0; i<100; i++) {
2988       int s = 5;
2989       int a = rand() % s;
2990       int b = rand() % s;
2991       int c = rand() % s;
2992       int d = rand() % s;
2993       int e = rand() % s;
2994       int f = rand() % s;
2995       char src[80];
2996       char dst[80];
2997       snprintf(src, sizeof(src), "syn.0.0/dir.%d/dir.%d/file.%d", a, b, c);
2998       snprintf(dst, sizeof(dst), "syn.0.0/dir.%d/dir.%d/newlink.%d", d, e, f);
2999       client->unlink(dst, perms);
3000     }
3001
3002     
3003     return;
3004   }
3005
3006   // link fun
3007   client->mknod("one", 0755, perms);
3008   client->mknod("two", 0755, perms);
3009   client->link("one", "three", perms);
3010   client->mkdir("dir", 0755, perms);
3011   client->link("two", "/dir/twolink", perms);
3012   client->link("dir/twolink", "four", perms);
3013   
3014   // unlink fun
3015   client->mknod("a", 0644, perms);
3016   client->unlink("a", perms);
3017   client->mknod("b", 0644, perms);
3018   client->link("b", "c", perms);
3019   client->unlink("c", perms);
3020   client->mkdir("d", 0755, perms);
3021   client->unlink("d", perms);
3022   client->rmdir("d", perms);
3023
3024   // rename fun
3025   client->mknod("p1", 0644, perms);
3026   client->mknod("p2", 0644, perms);
3027   client->rename("p1","p2", perms);
3028   client->mknod("p3", 0644, perms);
3029   client->rename("p3","p4", perms);
3030
3031   // check dest dir ambiguity thing
3032   client->mkdir("dir1", 0755, perms);
3033   client->mkdir("dir2", 0755, perms);
3034   client->rename("p2", "dir1/p2", perms);
3035   client->rename("dir1/p2", "dir2/p2", perms);
3036   client->rename("dir2/p2", "/p2", perms);
3037   
3038   // check primary+remote link merging
3039   client->link("p2","p2.l", perms);
3040   client->link("p4","p4.l", perms);
3041   client->rename("p2.l", "p2", perms);
3042   client->rename("p4", "p4.l", perms);
3043
3044   // check anchor updates
3045   client->mknod("dir1/a", 0644, perms);
3046   client->link("dir1/a", "da1", perms);
3047   client->link("dir1/a", "da2", perms);
3048   client->link("da2","da3", perms);
3049   client->rename("dir1/a", "dir2/a", perms);
3050   client->rename("dir2/a", "da2", perms);
3051   client->rename("da1", "da2", perms);
3052   client->rename("da2", "da3", perms);
3053
3054   // check directory renames
3055   client->mkdir("dir3", 0755, perms);
3056   client->mknod("dir3/asdf", 0644, perms);
3057   client->mkdir("dir4", 0755, perms);
3058   client->mkdir("dir5", 0755, perms);
3059   client->mknod("dir5/asdf", 0644, perms);
3060   client->rename("dir3", "dir4", perms); // ok
3061   client->rename("dir4", "dir5", perms); // fail
3062 }
3063
3064 int SyntheticClient::thrash_links(const char *basedir, int dirs, int files, int depth, int n)
3065 {
3066   dout(1) << "thrash_links " << basedir << " " << dirs << " " << files << " " << depth
3067           << " links " << n
3068           << dendl;
3069
3070   if (time_to_stop()) return 0;
3071
3072   UserPerm perms = client->pick_my_perms();
3073
3074   srand(0);
3075   if (1) {
3076     bool renames = true; // thrash renames too?
3077     for (int k=0; k<n; k++) {
3078       
3079       if (renames && rand() % 10 == 0) {
3080         // rename some directories.  whee!
3081         int dep = (rand() % depth) + 1;
3082         string src = basedir;
3083         {
3084           char t[80];
3085           for (int d=0; d<dep; d++) {
3086             int a = rand() % dirs;
3087             snprintf(t, sizeof(t), "/dir.%d", a);
3088             src += t;
3089           }
3090         }
3091         string dst = basedir;
3092         {
3093           char t[80];
3094           for (int d=0; d<dep; d++) {
3095             int a = rand() % dirs;
3096             snprintf(t, sizeof(t), "/dir.%d", a);
3097             dst += t;
3098           }
3099         }
3100         
3101         if (client->rename(dst.c_str(), "/tmp", perms) == 0) {
3102           client->rename(src.c_str(), dst.c_str(), perms);
3103           client->rename("/tmp", src.c_str(), perms);
3104         }
3105         continue;
3106       } 
3107       
3108       // pick a dest dir
3109       string src = basedir;
3110       {
3111         char t[80];
3112         for (int d=0; d<depth; d++) {
3113           int a = rand() % dirs;
3114           snprintf(t, sizeof(t), "/dir.%d", a);
3115           src += t;
3116         }
3117         int a = rand() % files;
3118         snprintf(t, sizeof(t), "/file.%d", a);
3119         src += t;
3120       }
3121       string dst = basedir;
3122       {
3123         char t[80];
3124         for (int d=0; d<depth; d++) {
3125           int a = rand() % dirs;
3126           snprintf(t, sizeof(t), "/dir.%d", a);
3127           dst += t;
3128         }
3129         int a = rand() % files;
3130         snprintf(t, sizeof(t), "/file.%d", a);
3131         dst += t;
3132       }
3133       
3134       int o = rand() % 4;
3135       switch (o) {
3136       case 0: 
3137         client->mknod(src.c_str(), 0755, perms);
3138         if (renames) client->rename(src.c_str(), dst.c_str(), perms);
3139         break;
3140       case 1: 
3141         client->mknod(src.c_str(), 0755, perms);
3142         client->unlink(dst.c_str(), perms);
3143         client->link(src.c_str(), dst.c_str(), perms); 
3144         break;
3145       case 2: client->unlink(src.c_str(), perms); break;
3146       case 3: client->unlink(dst.c_str(), perms); break;
3147         //case 4: client->mknod(src.c_str(), 0755, perms); break;
3148         //case 5: client->mknod(dst.c_str(), 0755, perms); break;
3149       }
3150     }
3151     return 0;
3152   }
3153
3154   if (1) {
3155     // now link shit up
3156     for (int i=0; i<n; i++) {
3157       if (time_to_stop()) return 0;
3158       
3159       char f[20];
3160       
3161       // pick a file
3162       string file = basedir;
3163       
3164       if (depth) {
3165         int d = rand() % (depth+1);
3166         for (int k=0; k<d; k++) {
3167           snprintf(f, sizeof(f), "/dir.%d", rand() % dirs);
3168           file += f;
3169         }
3170       }
3171       snprintf(f, sizeof(f), "/file.%d", rand() % files);
3172       file += f;
3173       
3174       // pick a dir for our link
3175       string ln = basedir;
3176       if (depth) {
3177         int d = rand() % (depth+1);
3178         for (int k=0; k<d; k++) {
3179           snprintf(f, sizeof(f), "/dir.%d", rand() % dirs);
3180           ln += f;
3181         }
3182       }
3183       snprintf(f, sizeof(f), "/ln.%d", i);
3184       ln += f;
3185       
3186       client->link(file.c_str(), ln.c_str(), perms);
3187     }
3188   }
3189   return 0;
3190 }
3191
3192
3193
3194
3195 void SyntheticClient::import_find(const char *base, const char *find, bool data)
3196 {
3197   dout(1) << "import_find " << base << " from " << find << " data=" << data << dendl;
3198
3199   /* use this to gather the static trace:
3200    *
3201    *  find . -exec ls -dilsn --time-style=+%s \{\} \;
3202    * or if it's wafl,
3203    *  find . -path ./.snapshot -prune -o -exec ls -dilsn --time-style=+%s \{\} \;
3204    *
3205    */
3206
3207   UserPerm process_perms = client->pick_my_perms();
3208
3209   if (base[0] != '-') 
3210     client->mkdir(base, 0755, process_perms);
3211
3212   ifstream f(find);
3213   assert(f.is_open());
3214   
3215   int dirnum = 0;
3216
3217   while (!f.eof()) {
3218     uint64_t ino;
3219     int dunno, nlink;
3220     string modestring;
3221     int uid, gid;
3222     off_t size;
3223     time_t mtime;
3224     string filename;
3225     f >> ino;
3226     if (f.eof()) break;
3227     f >> dunno;
3228     f >> modestring;
3229     f >> nlink;
3230     f >> uid;
3231     f >> gid;
3232     f >> size;
3233     f >> mtime;
3234     f.seekg(1, ios::cur);
3235     getline(f, filename);
3236     UserPerm perms(uid, gid);
3237
3238     // ignore "."
3239     if (filename == ".") continue;
3240
3241     // remove leading ./
3242     assert(filename[0] == '.' && filename[1] == '/');
3243     filename = filename.substr(2);
3244
3245     // new leading dir?
3246     int sp = filename.find("/");
3247     if (sp < 0) dirnum++;
3248
3249     //dout(0) << "leading dir " << filename << " " << dirnum << dendl;
3250     if (dirnum % num_client != client->get_nodeid()) {
3251       dout(20) << "skipping leading dir " << dirnum << " " << filename << dendl;
3252       continue;
3253     }
3254
3255     // parse the mode
3256     assert(modestring.length() == 10);
3257     mode_t mode = 0;
3258     switch (modestring[0]) {
3259     case 'd': mode |= S_IFDIR; break;
3260     case 'l': mode |= S_IFLNK; break;
3261     default:
3262     case '-': mode |= S_IFREG; break;
3263     }
3264     if (modestring[1] == 'r') mode |= 0400;
3265     if (modestring[2] == 'w') mode |= 0200;
3266     if (modestring[3] == 'x') mode |= 0100;
3267     if (modestring[4] == 'r') mode |= 040;
3268     if (modestring[5] == 'w') mode |= 020;
3269     if (modestring[6] == 'x') mode |= 010;
3270     if (modestring[7] == 'r') mode |= 04;
3271     if (modestring[8] == 'w') mode |= 02;
3272     if (modestring[9] == 'x') mode |= 01;
3273
3274     dout(20) << " mode " << modestring << " to " << oct << mode << dec << dendl;
3275
3276     if (S_ISLNK(mode)) {
3277       // target vs destination
3278       int pos = filename.find(" -> ");
3279       assert(pos > 0);
3280       string link;
3281       if (base[0] != '-') {
3282         link = base;
3283         link += "/";
3284       }
3285       link += filename.substr(0, pos);
3286       string target;
3287       if (filename[pos+4] == '/') {
3288         if (base[0] != '-') 
3289           target = base;
3290         target += filename.substr(pos + 4);
3291       } else {
3292         target = filename.substr(pos + 4);
3293       }
3294       dout(10) << "symlink from '" << link << "' -> '" << target << "'" << dendl;
3295       client->symlink(target.c_str(), link.c_str(), perms);
3296     } else {
3297       string f;
3298       if (base[0] != '-') {
3299         f = base;
3300         f += "/";
3301       }
3302       f += filename;
3303       if (S_ISDIR(mode)) {
3304         client->mkdir(f.c_str(), mode, perms);
3305       } else {
3306         int fd = client->open(f.c_str(), O_WRONLY|O_CREAT, perms, mode & 0777);
3307         assert(fd > 0); 
3308         if (data) {
3309           client->write(fd, "", 0, size);
3310         } else {
3311           client->truncate(f.c_str(), size, perms);
3312         }
3313         client->close(fd);
3314
3315         //client->chmod(f.c_str(), mode & 0777, perms, process_perms);
3316         client->chown(f.c_str(), uid, gid, process_perms);
3317
3318         struct utimbuf ut;
3319         ut.modtime = mtime;
3320         ut.actime = mtime;
3321         client->utime(f.c_str(), &ut, perms);
3322       }
3323     }
3324   }
3325   
3326
3327 }
3328
3329
3330 int SyntheticClient::lookup_hash(inodeno_t ino, inodeno_t dirino,
3331                                  const char *name, const UserPerm& perms)
3332 {
3333   int r = client->lookup_hash(ino, dirino, name, perms);
3334   dout(0) << "lookup_hash(" << ino << ", #" << dirino << "/" << name << ") = " << r << dendl;
3335   return r;
3336 }
3337
3338 int SyntheticClient::lookup_ino(inodeno_t ino, const UserPerm& perms)
3339 {
3340   int r = client->lookup_ino(ino, perms);
3341   dout(0) << "lookup_ino(" << ino << ") = " << r << dendl;
3342   return r;
3343 }
3344
3345 int SyntheticClient::chunk_file(string &filename)
3346 {
3347   UserPerm perms = client->pick_my_perms();
3348   int fd = client->open(filename.c_str(), O_RDONLY, perms);
3349   if (fd < 0)
3350     return fd;
3351
3352   struct stat st;
3353   int ret = client->fstat(fd, &st, perms);
3354   if (ret < 0) {
3355     client->close(fd);
3356     return ret;
3357   }
3358   uint64_t size = st.st_size;
3359   dout(0) << "file " << filename << " size is " << size << dendl;
3360
3361   inode_t inode;
3362   memset(&inode, 0, sizeof(inode));
3363   inode.ino = st.st_ino;
3364   ret = client->fdescribe_layout(fd, &inode.layout);
3365   assert(ret == 0); // otherwise fstat did a bad thing
3366
3367   uint64_t pos = 0;
3368   bufferlist from_before;
3369   while (pos < size) {
3370     int get = MIN(size-pos, 1048576);
3371
3372     Mutex flock("synclient chunk_file lock");
3373     Cond cond;
3374     bool done;
3375     bufferlist bl;
3376     
3377     flock.Lock();
3378     Context *onfinish = new C_SafeCond(&flock, &cond, &done);
3379     client->filer->read(inode.ino, &inode.layout, CEPH_NOSNAP, pos, get, &bl, 0,
3380                         onfinish);
3381     while (!done)
3382       cond.Wait(flock);
3383     flock.Unlock();
3384
3385     dout(0) << "got " << bl.length() << " bytes at " << pos << dendl;
3386     
3387     if (from_before.length()) {
3388       dout(0) << " including bit from previous block" << dendl;
3389       pos -= from_before.length();
3390       from_before.claim_append(bl);
3391       bl.swap(from_before);
3392     }      
3393
3394     // ....
3395
3396     // keep last 32 bytes around
3397     from_before.clear();
3398     from_before.substr_of(bl, bl.length()-32, 32);
3399
3400     pos += bl.length();
3401   }
3402
3403   client->close(fd);
3404   return 0;
3405 }
3406
3407
3408
3409 void SyntheticClient::mksnap(const char *base, const char *name, const UserPerm& perms)
3410 {
3411   client->mksnap(base, name, perms);
3412 }
3413
3414 void SyntheticClient::rmsnap(const char *base, const char *name, const UserPerm& perms)
3415 {
3416   client->rmsnap(base, name, perms);
3417 }
3418
3419 void SyntheticClient::mksnapfile(const char *dir)
3420 {
3421   UserPerm perms = client->pick_my_perms();
3422   client->mkdir(dir, 0755, perms);
3423
3424   string f = dir;
3425   f += "/foo";
3426   int fd = client->open(f.c_str(), O_WRONLY|O_CREAT|O_TRUNC, perms);
3427
3428   char buf[1048576*4];
3429   client->write(fd, buf, sizeof(buf), 0);
3430   client->fsync(fd, true);
3431   client->close(fd);
3432   
3433   string s = dir;
3434   s += "/.snap/1";
3435   client->mkdir(s.c_str(), 0755, perms);
3436
3437   fd = client->open(f.c_str(), O_WRONLY, perms);
3438   client->write(fd, buf, 1048576*2, 1048576);
3439   client->fsync(fd, true);
3440   client->close(fd);
3441 }