Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / common / Thread.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2011 New Dream Network
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #include "include/compat.h"
16 #include "common/Thread.h"
17 #include "common/code_environment.h"
18 #include "common/debug.h"
19 #include "common/signal.h"
20 #include "common/io_priority.h"
21
22 #ifdef HAVE_SCHED
23 #include <sched.h>
24 #endif
25
26 static int _set_affinity(int id)
27 {
28 #ifdef HAVE_SCHED
29   if (id >= 0 && id < CPU_SETSIZE) {
30     cpu_set_t cpuset;
31     CPU_ZERO(&cpuset);
32
33     CPU_SET(id, &cpuset);
34
35     if (sched_setaffinity(0, sizeof(cpuset), &cpuset) < 0)
36       return -errno;
37     /* guaranteed to take effect immediately */
38     sched_yield();
39   }
40 #endif
41   return 0;
42 }
43
44 Thread::Thread()
45   : thread_id(0),
46     pid(0),
47     ioprio_class(-1),
48     ioprio_priority(-1),
49     cpuid(-1),
50     thread_name(NULL)
51 {
52 }
53
54 Thread::~Thread()
55 {
56 }
57
58 void *Thread::_entry_func(void *arg) {
59   void *r = ((Thread*)arg)->entry_wrapper();
60   return r;
61 }
62
63 void *Thread::entry_wrapper()
64 {
65   int p = ceph_gettid(); // may return -ENOSYS on other platforms
66   if (p > 0)
67     pid = p;
68   if (pid &&
69       ioprio_class >= 0 &&
70       ioprio_priority >= 0) {
71     ceph_ioprio_set(IOPRIO_WHO_PROCESS,
72                     pid,
73                     IOPRIO_PRIO_VALUE(ioprio_class, ioprio_priority));
74   }
75   if (pid && cpuid >= 0)
76     _set_affinity(cpuid);
77
78   ceph_pthread_setname(pthread_self(), thread_name);
79   return entry();
80 }
81
82 const pthread_t &Thread::get_thread_id() const
83 {
84   return thread_id;
85 }
86
87 bool Thread::is_started() const
88 {
89   return thread_id != 0;
90 }
91
92 bool Thread::am_self() const
93 {
94   return (pthread_self() == thread_id);
95 }
96
97 int Thread::kill(int signal)
98 {
99   if (thread_id)
100     return pthread_kill(thread_id, signal);
101   else
102     return -EINVAL;
103 }
104
105 int Thread::try_create(size_t stacksize)
106 {
107   pthread_attr_t *thread_attr = NULL;
108   pthread_attr_t thread_attr_loc;
109   
110   stacksize &= CEPH_PAGE_MASK;  // must be multiple of page
111   if (stacksize) {
112     thread_attr = &thread_attr_loc;
113     pthread_attr_init(thread_attr);
114     pthread_attr_setstacksize(thread_attr, stacksize);
115   }
116
117   int r;
118
119   // The child thread will inherit our signal mask.  Set our signal mask to
120   // the set of signals we want to block.  (It's ok to block signals more
121   // signals than usual for a little while-- they will just be delivered to
122   // another thread or delieverd to this thread later.)
123   sigset_t old_sigset;
124   if (g_code_env == CODE_ENVIRONMENT_LIBRARY) {
125     block_signals(NULL, &old_sigset);
126   }
127   else {
128     int to_block[] = { SIGPIPE , 0 };
129     block_signals(to_block, &old_sigset);
130   }
131   r = pthread_create(&thread_id, thread_attr, _entry_func, (void*)this);
132   restore_sigset(&old_sigset);
133
134   if (thread_attr) {
135     pthread_attr_destroy(thread_attr);  
136   }
137
138   return r;
139 }
140
141 void Thread::create(const char *name, size_t stacksize)
142 {
143   assert(strlen(name) < 16);
144   thread_name = name;
145
146   int ret = try_create(stacksize);
147   if (ret != 0) {
148     char buf[256];
149     snprintf(buf, sizeof(buf), "Thread::try_create(): pthread_create "
150              "failed with error %d", ret);
151     dout_emergency(buf);
152     assert(ret == 0);
153   }
154 }
155
156 int Thread::join(void **prval)
157 {
158   if (thread_id == 0) {
159     assert("join on thread that was never started" == 0);
160     return -EINVAL;
161   }
162
163   int status = pthread_join(thread_id, prval);
164   if (status != 0) {
165     char buf[256];
166     snprintf(buf, sizeof(buf), "Thread::join(): pthread_join "
167              "failed with error %d\n", status);
168     dout_emergency(buf);
169     assert(status == 0);
170   }
171
172   thread_id = 0;
173   return status;
174 }
175
176 int Thread::detach()
177 {
178   return pthread_detach(thread_id);
179 }
180
181 int Thread::set_ioprio(int cls, int prio)
182 {
183   // fixme, maybe: this can race with create()
184   ioprio_class = cls;
185   ioprio_priority = prio;
186   if (pid && cls >= 0 && prio >= 0)
187     return ceph_ioprio_set(IOPRIO_WHO_PROCESS,
188                            pid,
189                            IOPRIO_PRIO_VALUE(cls, prio));
190   return 0;
191 }
192
193 int Thread::set_affinity(int id)
194 {
195   int r = 0;
196   cpuid = id;
197   if (pid && ceph_gettid() == pid)
198     r = _set_affinity(id);
199   return r;
200 }