#include "../../include/linux/libcfs/libcfs.h"
-
#include "../include/lustre_dlm.h"
#include "../include/lustre_net.h"
#include "../include/lustre/lustre_user.h"
#include "../include/lustre_param.h"
#include "../include/lustre_fid.h"
#include "../include/obd_class.h"
+#include "../include/obd.h"
#include "osc_internal.h"
#include "osc_cl_internal.h"
+atomic_t osc_pool_req_count; /* requests added to osc_rq_pool so far */
+unsigned int osc_reqpool_maxreqcount; /* upper limit for the count above */
+struct ptlrpc_request_pool *osc_rq_pool; /* created in osc_init() */
+
+/* max memory used for request pool, unit is MB;
+ * osc_init() rejects 0 and values >= 4096 (1 << 12) */
+static unsigned int osc_reqpool_mem_max = 5;
+module_param(osc_reqpool_mem_max, uint, 0444);
+
struct osc_brw_async_args {
struct obdo *aa_oa;
int aa_requested_nob;
struct client_obd *aa_cli;
struct list_head aa_oaps;
struct list_head aa_exts;
- struct obd_capa *aa_ocapa;
struct cl_req *aa_clerq;
};
return lmm_size;
if (*lmmp != NULL && lsm == NULL) {
- OBD_FREE(*lmmp, lmm_size);
+ kfree(*lmmp);
*lmmp = NULL;
return 0;
} else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
}
if (*lmmp == NULL) {
- OBD_ALLOC(*lmmp, lmm_size);
- if (*lmmp == NULL)
+ *lmmp = kzalloc(lmm_size, GFP_NOFS);
+ if (!*lmmp)
return -ENOMEM;
}
return lsm_size;
if (*lsmp != NULL && lmm == NULL) {
- OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
- OBD_FREE(*lsmp, lsm_size);
+ kfree((*lsmp)->lsm_oinfo[0]);
+ kfree(*lsmp);
*lsmp = NULL;
return 0;
}
if (*lsmp == NULL) {
- OBD_ALLOC(*lsmp, lsm_size);
+ *lsmp = kzalloc(lsm_size, GFP_NOFS);
if (unlikely(*lsmp == NULL))
return -ENOMEM;
- OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
+ (*lsmp)->lsm_oinfo[0] = kzalloc(sizeof(struct lov_oinfo),
+ GFP_NOFS);
if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
- OBD_FREE(*lsmp, lsm_size);
+ kfree(*lsmp);
return -ENOMEM;
}
loi_init((*lsmp)->lsm_oinfo[0]);
return lsm_size;
}
-static inline void osc_pack_capa(struct ptlrpc_request *req,
- struct ost_body *body, void *capa)
-{
- struct obd_capa *oc = (struct obd_capa *)capa;
- struct lustre_capa *c;
-
- if (!capa)
- return;
-
- c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
- LASSERT(c);
- capa_cpy(c, oc);
- body->oa.o_valid |= OBD_MD_FLOSSCAPA;
- DEBUG_CAPA(D_SEC, c, "pack");
-}
-
static inline void osc_pack_req_body(struct ptlrpc_request *req,
struct obd_info *oinfo)
{
lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
oinfo->oi_oa);
- osc_pack_capa(req, body, oinfo->oi_capa);
-}
-
-static inline void osc_set_capa_size(struct ptlrpc_request *req,
- const struct req_msg_field *field,
- struct obd_capa *oc)
-{
- if (oc == NULL)
- req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
- else
- /* it is already calculated as sizeof struct obd_capa */
- ;
}
static int osc_getattr_interpret(const struct lu_env *env,
{
struct ptlrpc_request *req;
struct osc_async_args *aa;
- int rc;
+ int rc;
req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
if (req == NULL)
return -ENOMEM;
- osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
if (rc) {
ptlrpc_request_free(req);
struct obd_info *oinfo)
{
struct ptlrpc_request *req;
- struct ost_body *body;
- int rc;
+ struct ost_body *body;
+ int rc;
req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
if (req == NULL)
return -ENOMEM;
- osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
if (rc) {
ptlrpc_request_free(req);
struct obd_info *oinfo, struct obd_trans_info *oti)
{
struct ptlrpc_request *req;
- struct ost_body *body;
- int rc;
+ struct ost_body *body;
+ int rc;
LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
if (req == NULL)
return -ENOMEM;
- osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
if (rc) {
ptlrpc_request_free(req);
obd_enqueue_update_f upcall, void *cookie,
struct ptlrpc_request_set *rqset)
{
- struct ptlrpc_request *req;
+ struct ptlrpc_request *req;
struct osc_setattr_args *sa;
- int rc;
+ int rc;
req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
if (req == NULL)
return -ENOMEM;
- osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
if (rc) {
ptlrpc_request_free(req);
/* do mds to ost setattr asynchronously */
if (!rqset) {
/* Do not wait for response. */
- ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+ ptlrpcd_add_req(req);
} else {
req->rq_interpret_reply =
(ptlrpc_interpterer_t)osc_setattr_interpret;
- CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
+ CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
sa = ptlrpc_req_async_args(req);
sa->sa_oa = oinfo->oi_oa;
sa->sa_upcall = upcall;
sa->sa_cookie = cookie;
if (rqset == PTLRPCD_SET)
- ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+ ptlrpcd_add_req(req);
else
ptlrpc_set_add_req(rqset, req);
}
struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
struct ptlrpc_request *req;
- struct ost_body *body;
- struct lov_stripe_md *lsm;
- int rc;
+ struct ost_body *body;
+ struct lov_stripe_md *lsm;
+ int rc;
LASSERT(oa);
LASSERT(ea);
obd_enqueue_update_f upcall, void *cookie,
struct ptlrpc_request_set *rqset)
{
- struct ptlrpc_request *req;
+ struct ptlrpc_request *req;
struct osc_setattr_args *sa;
- struct ost_body *body;
- int rc;
+ struct ost_body *body;
+ int rc;
req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
if (req == NULL)
return -ENOMEM;
- osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
if (rc) {
ptlrpc_request_free(req);
LASSERT(body);
lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
oinfo->oi_oa);
- osc_pack_capa(req, body, oinfo->oi_capa);
ptlrpc_request_set_replen(req);
req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
- CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
+ CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
sa = ptlrpc_req_async_args(req);
- sa->sa_oa = oinfo->oi_oa;
+ sa->sa_oa = oinfo->oi_oa;
sa->sa_upcall = upcall;
sa->sa_cookie = cookie;
if (rqset == PTLRPCD_SET)
- ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+ ptlrpcd_add_req(req);
else
ptlrpc_set_add_req(rqset, req);
body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
if (body == NULL) {
- CERROR ("can't unpack ost_body\n");
+ CERROR("can't unpack ost_body\n");
rc = -EPROTO;
goto out;
}
struct ptlrpc_request_set *rqset)
{
struct ptlrpc_request *req;
- struct ost_body *body;
+ struct ost_body *body;
struct osc_fsync_args *fa;
- int rc;
+ int rc;
req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
if (req == NULL)
return -ENOMEM;
- osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
if (rc) {
ptlrpc_request_free(req);
LASSERT(body);
lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
oinfo->oi_oa);
- osc_pack_capa(req, body, oinfo->oi_capa);
ptlrpc_request_set_replen(req);
req->rq_interpret_reply = osc_sync_interpret;
fa->fa_cookie = cookie;
if (rqset == PTLRPCD_SET)
- ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+ ptlrpcd_add_req(req);
else
ptlrpc_set_add_req(rqset, req);
* cookies to the MDS after committing destroy transactions. */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
struct obdo *oa, struct lov_stripe_md *ea,
- struct obd_trans_info *oti, struct obd_export *md_export,
- void *capa)
+ struct obd_trans_info *oti, struct obd_export *md_export)
{
- struct client_obd *cli = &exp->exp_obd->u.cli;
+ struct client_obd *cli = &exp->exp_obd->u.cli;
struct ptlrpc_request *req;
- struct ost_body *body;
+ struct ost_body *body;
LIST_HEAD(cancels);
int rc, count;
return -ENOMEM;
}
- osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
0, &cancels, count);
if (rc) {
LASSERT(body);
lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
- osc_pack_capa(req, body, (struct obd_capa *)capa);
ptlrpc_request_set_replen(req);
/* If osc_destroy is for destroying the unlink orphan,
}
/* Do not wait for response */
- ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+ ptlrpcd_add_req(req);
return 0;
}
LASSERT(body);
osc_update_grant(cli, body);
out:
- OBDO_FREE(oa);
+ kmem_cache_free(obdo_cachep, oa);
return rc;
}
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
- int rc = 0;
+ int rc = 0;
struct ost_body *body;
client_obd_list_lock(&cli->cl_loi_list_lock);
}
client_obd_list_unlock(&cli->cl_loi_list_lock);
- OBD_ALLOC_PTR(body);
+ body = kzalloc(sizeof(*body), GFP_NOFS);
if (!body)
return -ENOMEM;
sizeof(*body), body, NULL);
if (rc != 0)
__osc_update_grant(cli, body->oa.o_grant);
- OBD_FREE_PTR(body);
+ kfree(body);
return rc;
}
if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
client->cl_avail_grant > brw_size)
return 1;
- else
- osc_update_next_shrink(client);
+
+ osc_update_next_shrink(client);
}
return 0;
}
/* skip bytes read OK */
while (nob_read > 0) {
- LASSERT (page_count > 0);
+ LASSERT(page_count > 0);
if (pga[i]->count > nob_read) {
/* EOF inside this page */
int requested_nob, int niocount,
u32 page_count, struct brw_page **pga)
{
- int i;
- __u32 *remote_rcs;
+ int i;
+ __u32 *remote_rcs;
remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
sizeof(*remote_rcs) *
}
static u32 osc_checksum_bulk(int nob, u32 pg_count,
- struct brw_page **pga, int opc,
- cksum_type_t cksum_type)
+ struct brw_page **pga, int opc,
+ cksum_type_t cksum_type)
{
- __u32 cksum;
- int i = 0;
- struct cfs_crypto_hash_desc *hdesc;
- unsigned int bufsize;
- int err;
- unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
+ __u32 cksum;
+ int i = 0;
+ struct cfs_crypto_hash_desc *hdesc;
+ unsigned int bufsize;
+ int err;
+ unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
LASSERT(pg_count > 0);
OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
unsigned char *ptr = kmap(pga[i]->pg);
int off = pga[i]->off & ~CFS_PAGE_MASK;
+
memcpy(ptr + off, "bad1", min(4, nob));
kunmap(pga[i]->pg);
}
struct lov_stripe_md *lsm, u32 page_count,
struct brw_page **pga,
struct ptlrpc_request **reqp,
- struct obd_capa *ocapa, int reserve,
+ int reserve,
int resend)
{
- struct ptlrpc_request *req;
+ struct ptlrpc_request *req;
struct ptlrpc_bulk_desc *desc;
- struct ost_body *body;
- struct obd_ioobj *ioobj;
- struct niobuf_remote *niobuf;
+ struct ost_body *body;
+ struct obd_ioobj *ioobj;
+ struct niobuf_remote *niobuf;
int niocount, i, requested_nob, opc, rc;
struct osc_brw_async_args *aa;
- struct req_capsule *pill;
+ struct req_capsule *pill;
struct brw_page *pg_prev;
if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
if ((cmd & OBD_BRW_WRITE) != 0) {
opc = OST_WRITE;
req = ptlrpc_request_alloc_pool(cli->cl_import,
- cli->cl_import->imp_rq_pool,
+ osc_rq_pool,
&RQF_OST_BRW_WRITE);
} else {
opc = OST_READ;
sizeof(*ioobj));
req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
niocount * sizeof(*niobuf));
- osc_set_capa_size(req, &RMF_CAPA1, ocapa);
rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
if (rc) {
* "max - 1" for old client compatibility sending "0", and also so the
* actual maximum is a power-of-two number, not one less. LU-1431 */
ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
- osc_pack_capa(req, body, ocapa);
LASSERT(page_count > 0);
pg_prev = pga[0];
for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
niobuf->len += pg->count;
} else {
niobuf->offset = pg->off;
- niobuf->len = pg->count;
- niobuf->flags = pg->flag;
+ niobuf->len = pg->count;
+ niobuf->flags = pg->flag;
}
pg_prev = pg;
}
aa->aa_ppga = pga;
aa->aa_cli = cli;
INIT_LIST_HEAD(&aa->aa_oaps);
- if (ocapa && reserve)
- aa->aa_ocapa = capa_get(ocapa);
*reqp = req;
return 0;
}
if (rc != req->rq_bulk->bd_nob_transferred) {
- CERROR ("Unexpected rc %d (%d transferred)\n",
+ CERROR("Unexpected rc %d (%d transferred)\n",
rc, req->rq_bulk->bd_nob_transferred);
return -EPROTO;
}
if (body->oa.o_valid & OBD_MD_FLCKSUM) {
static int cksum_counter;
- __u32 server_cksum = body->oa.o_cksum;
- char *via;
- char *router;
+ __u32 server_cksum = body->oa.o_cksum;
+ char *via = "";
+ char *router = "";
cksum_type_t cksum_type;
- cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
+ cksum_type = cksum_type_unpack(body->oa.o_valid&OBD_MD_FLFLAGS ?
body->oa.o_flags : 0);
client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
aa->aa_ppga, OST_READ,
cksum_type);
- if (peer->nid == req->rq_bulk->bd_sender) {
- via = router = "";
- } else {
+ if (peer->nid != req->rq_bulk->bd_sender) {
via = " via ";
router = libcfs_nid2str(req->rq_bulk->bd_sender);
}
"redo for recoverable error %d", rc);
rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
- OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
+ OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
aa->aa_cli, aa->aa_oa,
NULL /* lsm unused by osc currently */,
aa->aa_page_count, aa->aa_ppga,
- &new_req, aa->aa_ocapa, 0, 1);
+ &new_req, 0, 1);
if (rc)
return rc;
/* cap resend delay to the current request timeout, this is similar to
* what ptlrpc does (see after_reply()) */
if (aa->aa_resends > new_req->rq_timeout)
- new_req->rq_sent = get_seconds() + new_req->rq_timeout;
+ new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
else
- new_req->rq_sent = get_seconds() + aa->aa_resends;
+ new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
new_req->rq_generation_set = 1;
new_req->rq_import_generation = request->rq_import_generation;
}
}
- new_aa->aa_ocapa = aa->aa_ocapa;
- aa->aa_ocapa = NULL;
-
/* XXX: This code will run into problems if we ever support adding
* a series of BRW RPCs to a self-defined ptlrpc_request_set and
* waiting for all of them to be finished. We should inherit the
* request set from the old request. */
- ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
+ ptlrpcd_add_req(new_req);
DEBUG_REQ(D_INFO, new_req, "new request");
return 0;
/*
 * Release the array of brw_page pointers @ppga.
 * @ppga must not be NULL (asserted). @count is only consumed by the
 * size-based OBD_FREE form; kfree() does not take a size.
 */
static void osc_release_ppga(struct brw_page **ppga, u32 count)
{
LASSERT(ppga != NULL);
- OBD_FREE(ppga, sizeof(*ppga) * count);
+ kfree(ppga);
}
static int brw_interpret(const struct lu_env *env,
struct osc_brw_async_args *aa = data;
struct osc_extent *ext;
struct osc_extent *tmp;
- struct cl_object *obj = NULL;
+ struct cl_object *obj = NULL;
struct client_obd *cli = aa->aa_cli;
rc = osc_brw_fini_request(req, rc);
rc = -EIO;
}
- if (aa->aa_ocapa) {
- capa_put(aa->aa_ocapa);
- aa->aa_ocapa = NULL;
- }
-
list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
if (obj == NULL && rc == 0) {
obj = osc2cl(ext->oe_obj);
}
cl_object_put(env, obj);
}
- OBDO_FREE(aa->aa_oa);
+ kmem_cache_free(obdo_cachep, aa->aa_oa);
cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
req->rq_bulk->bd_nob_transferred);
osc_wake_cache_waiters(cli);
client_obd_list_unlock(&cli->cl_loi_list_lock);
- osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
+ osc_io_unplug(env, cli, NULL);
return rc;
}
* Extents in the list must be in OES_RPC state.
*/
int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
- struct list_head *ext_list, int cmd, pdl_policy_t pol)
-{
- struct ptlrpc_request *req = NULL;
- struct osc_extent *ext;
- struct brw_page **pga = NULL;
- struct osc_brw_async_args *aa = NULL;
- struct obdo *oa = NULL;
- struct osc_async_page *oap;
- struct osc_async_page *tmp;
- struct cl_req *clerq = NULL;
- enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
- CRT_READ;
- struct ldlm_lock *lock = NULL;
- struct cl_req_attr *crattr = NULL;
- u64 starting_offset = OBD_OBJECT_EOF;
- u64 ending_offset = 0;
- int mpflag = 0;
- int mem_tight = 0;
- int page_count = 0;
- int i;
- int rc;
- struct ost_body *body;
+ struct list_head *ext_list, int cmd)
+{
+ struct ptlrpc_request *req = NULL;
+ struct osc_extent *ext;
+ struct brw_page **pga = NULL;
+ struct osc_brw_async_args *aa = NULL;
+ struct obdo *oa = NULL;
+ struct osc_async_page *oap;
+ struct osc_async_page *tmp;
+ struct cl_req *clerq = NULL;
+ enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
+ struct ldlm_lock *lock = NULL;
+ struct cl_req_attr *crattr = NULL;
+ u64 starting_offset = OBD_OBJECT_EOF;
+ u64 ending_offset = 0;
+ int mpflag = 0;
+ int mem_tight = 0;
+ int page_count = 0;
+ int i;
+ int rc;
+ struct ost_body *body;
LIST_HEAD(rpc_list);
LASSERT(!list_empty(ext_list));
if (mem_tight)
mpflag = cfs_memory_pressure_get_and_set();
- OBD_ALLOC(crattr, sizeof(*crattr));
- if (crattr == NULL) {
+ crattr = kzalloc(sizeof(*crattr), GFP_NOFS);
+ if (!crattr) {
rc = -ENOMEM;
goto out;
}
- OBD_ALLOC(pga, sizeof(*pga) * page_count);
+ pga = kcalloc(page_count, sizeof(*pga), GFP_NOFS);
if (pga == NULL) {
rc = -ENOMEM;
goto out;
}
- OBDO_ALLOC(oa);
+ oa = kmem_cache_alloc(obdo_cachep, GFP_NOFS | __GFP_ZERO);
if (oa == NULL) {
rc = -ENOMEM;
goto out;
i = 0;
list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
struct cl_page *page = oap2cl_page(oap);
+
if (clerq == NULL) {
clerq = cl_req_alloc(env, page, crt,
1 /* only 1-object rpcs for now */);
sort_brw_pages(pga, page_count);
rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
- pga, &req, crattr->cra_capa, 1, 0);
+ pga, &req, 1, 0);
if (rc != 0) {
CERROR("prep_req failed: %d\n", rc);
goto out;
page_count, aa, cli->cl_r_in_flight,
cli->cl_w_in_flight);
- /* XXX: Maybe the caller can check the RPC bulk descriptor to
- * see which CPU/NUMA node the majority of pages were allocated
- * on, and try to assign the async RPC to the CPU core
- * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
- *
- * But on the other hand, we expect that multiple ptlrpcd
- * threads and the initial write sponsor can run in parallel,
- * especially when data checksum is enabled, which is CPU-bound
- * operation and single ptlrpcd thread cannot process in time.
- * So more ptlrpcd threads sharing BRW load
- * (with PDL_POLICY_ROUND) seems better.
- */
- ptlrpcd_add_req(req, pol, -1);
+ ptlrpcd_add_req(req);
rc = 0;
out:
if (mem_tight != 0)
cfs_memory_pressure_restore(mpflag);
- if (crattr != NULL) {
- capa_put(crattr->cra_capa);
- OBD_FREE(crattr, sizeof(*crattr));
- }
+ kfree(crattr);
if (rc != 0) {
LASSERT(req == NULL);
if (oa)
- OBDO_FREE(oa);
- if (pga)
- OBD_FREE(pga, sizeof(*pga) * page_count);
+ kmem_cache_free(obdo_cachep, oa);
+ kfree(pga);
/* this should happen rarely and is pretty bad, it makes the
* pending list not follow the dirty order */
while (!list_empty(ext_list)) {
/* The request was created before ldlm_cli_enqueue call. */
if (rc == ELDLM_LOCK_ABORTED) {
struct ldlm_reply *rep;
+
rep = req_capsule_server_get(&req->rq_pill,
&RMF_DLM_REP);
ldlm_lock_decref(lockh, mode);
LDLM_LOCK_PUT(matched);
return -ECANCELED;
- } else if (osc_set_lock_data_with_check(matched, einfo)) {
+ }
+
+ if (osc_set_lock_data_with_check(matched, einfo)) {
*flags |= LDLM_FL_LVB_READY;
/* addref the lock only if not async requests and PW
* lock is matched whereas we asked for PR. */
ldlm_lock_decref(lockh, einfo->ei_mode);
LDLM_LOCK_PUT(matched);
return ELDLM_OK;
- } else {
- ldlm_lock_decref(lockh, mode);
- LDLM_LOCK_PUT(matched);
}
+
+ ldlm_lock_decref(lockh, mode);
+ LDLM_LOCK_PUT(matched);
}
no_match:
if (intent) {
LIST_HEAD(cancels);
+
req = ptlrpc_request_alloc(class_exp2cliimp(exp),
&RQF_LDLM_ENQUEUE_LVB);
if (req == NULL)
if (rqset) {
if (!rc) {
struct osc_enqueue_args *aa;
+
CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
aa = ptlrpc_req_async_args(req);
aa->oa_ei = einfo;
req->rq_interpret_reply =
(ptlrpc_interpterer_t)osc_enqueue_interpret;
if (rqset == PTLRPCD_SET)
- ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+ ptlrpcd_add_req(req);
else
ptlrpc_set_add_req(rqset, req);
} else if (intent) {
struct obd_info *oinfo, __u64 max_age,
struct ptlrpc_request_set *rqset)
{
- struct obd_device *obd = class_exp2obd(exp);
+ struct obd_device *obd = class_exp2obd(exp);
struct ptlrpc_request *req;
struct osc_async_args *aa;
- int rc;
+ int rc;
/* We could possibly pass max_age in the request (as an absolute
* timestamp or a "seconds.usec ago") so the target can avoid doing
static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
struct obd_statfs *osfs, __u64 max_age, __u32 flags)
{
- struct obd_device *obd = class_exp2obd(exp);
- struct obd_statfs *msfs;
+ struct obd_device *obd = class_exp2obd(exp);
+ struct obd_statfs *msfs;
struct ptlrpc_request *req;
- struct obd_import *imp = NULL;
+ struct obd_import *imp = NULL;
int rc;
/*Since the request might also come from lprocfs, so we need
* because lov_user_md_vX and lov_mds_md_vX have the same size */
if (lum.lmm_stripe_count > 0) {
lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
- OBD_ALLOC(lumk, lum_size);
+ lumk = kzalloc(lum_size, GFP_NOFS);
if (!lumk)
return -ENOMEM;
rc = -EFAULT;
if (lumk != &lum)
- OBD_FREE(lumk, lum_size);
+ kfree(lumk);
return rc;
}
-
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
void *karg, void *uarg)
{
buf = NULL;
len = 0;
- if (obd_ioctl_getdata(&buf, &len, (void *)uarg)) {
+ if (obd_ioctl_getdata(&buf, &len, uarg)) {
err = -EINVAL;
goto out;
}
memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
- err = copy_to_user((void *)uarg, buf, len);
+ err = copy_to_user(uarg, buf, len);
if (err)
err = -EFAULT;
obd_ioctl_freedata(buf, len);
data->ioc_offset);
goto out;
case OBD_IOC_POLL_QUOTACHECK:
- err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
+ err = osc_quota_poll_check(exp, karg);
goto out;
case OBD_IOC_PING_TARGET:
err = ptlrpc_obd_ping(obd);
return 0;
} else if (KEY_IS(KEY_LAST_ID)) {
struct ptlrpc_request *req;
- u64 *reply;
- char *tmp;
- int rc;
+ u64 *reply;
+ char *tmp;
+ int rc;
req = ptlrpc_request_alloc(class_exp2cliimp(exp),
&RQF_OST_GET_INFO_LAST_ID);
ptlrpc_req_finished(req);
return rc;
} else if (KEY_IS(KEY_FIEMAP)) {
- struct ll_fiemap_info_key *fm_key =
- (struct ll_fiemap_info_key *)key;
- struct ldlm_res_id res_id;
- ldlm_policy_data_t policy;
- struct lustre_handle lockh;
- ldlm_mode_t mode = 0;
- struct ptlrpc_request *req;
- struct ll_user_fiemap *reply;
- char *tmp;
- int rc;
+ struct ll_fiemap_info_key *fm_key = key;
+ struct ldlm_res_id res_id;
+ ldlm_policy_data_t policy;
+ struct lustre_handle lockh;
+ ldlm_mode_t mode = 0;
+ struct ptlrpc_request *req;
+ struct ll_user_fiemap *reply;
+ char *tmp;
+ int rc;
if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
goto skip_locking;
void *val, struct ptlrpc_request_set *set)
{
struct ptlrpc_request *req;
- struct obd_device *obd = exp->exp_obd;
- struct obd_import *imp = class_exp2cliimp(exp);
- char *tmp;
- int rc;
+ struct obd_device *obd = exp->exp_obd;
+ struct obd_import *imp = class_exp2cliimp(exp);
+ char *tmp;
+ int rc;
OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
struct client_obd *cli = &obd->u.cli;
LASSERT(cli->cl_cache == NULL); /* only once */
- cli->cl_cache = (struct cl_client_cache *)val;
+ cli->cl_cache = val;
atomic_inc(&cli->cl_cache->ccc_users);
cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
aa = ptlrpc_req_async_args(req);
- OBDO_ALLOC(oa);
+ oa = kmem_cache_alloc(obdo_cachep, GFP_NOFS | __GFP_ZERO);
if (!oa) {
ptlrpc_req_finished(req);
return -ENOMEM;
LASSERT(set != NULL);
ptlrpc_set_add_req(set, req);
ptlrpc_check_set(NULL, set);
- } else
- ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+ } else {
+ ptlrpcd_add_req(req);
+ }
return 0;
}
}
case IMP_EVENT_INVALIDATE: {
struct ldlm_namespace *ns = obd->obd_namespace;
- struct lu_env *env;
- int refcheck;
+ struct lu_env *env;
+ int refcheck;
env = cl_env_get(&refcheck);
if (!IS_ERR(env)) {
cli = &obd->u.cli;
/* all pages go to failing rpcs due to the invalid
* import */
- osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
+ osc_io_unplug(env, cli, NULL);
ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
cl_env_put(env, &refcheck);
/* See bug 7198 */
if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
- imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
+ imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
break;
CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
- osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
+ osc_io_unplug(env, cli, NULL);
return 0;
}
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
struct lprocfs_static_vars lvars = { NULL };
- struct client_obd *cli = &obd->u.cli;
- void *handler;
- int rc;
+ struct client_obd *cli = &obd->u.cli;
+ void *handler;
+ int rc;
+ int adding;
+ int added;
+ int req_count;
rc = ptlrpcd_addref();
if (rc)
cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
lprocfs_osc_init_vars(&lvars);
- if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
+ if (lprocfs_obd_setup(obd, lvars.obd_vars, lvars.sysfs_vars) == 0) {
lproc_osc_attach_seqstat(obd);
sptlrpc_lprocfs_cliobd_attach(obd);
ptlrpc_lprocfs_register_obd(obd);
}
- /* We need to allocate a few requests more, because
- * brw_interpret tries to create new requests before freeing
- * previous ones, Ideally we want to have 2x max_rpcs_in_flight
- * reserved, but I'm afraid that might be too much wasted RAM
- * in fact, so 2 is just my guess and still should work. */
- cli->cl_import->imp_rq_pool =
- ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
- OST_MAXREQSIZE,
- ptlrpc_add_rqs_to_pool);
+ /*
+ * We try to control the total number of requests with an upper limit
+ * osc_reqpool_maxreqcount. There might be some race which will cause
+ * over-limit allocation, but it is fine.
+ */
+ req_count = atomic_read(&osc_pool_req_count);
+ if (req_count < osc_reqpool_maxreqcount) {
+ adding = cli->cl_max_rpcs_in_flight + 2;
+ if (req_count + adding > osc_reqpool_maxreqcount)
+ adding = osc_reqpool_maxreqcount - req_count;
+
+ added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
+ atomic_add(added, &osc_pool_req_count);
+ }
INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
switch (stage) {
case OBD_CLEANUP_EARLY: {
struct obd_import *imp;
+
imp = obd->u.cli.cl_import;
CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
/* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
static int __init osc_init(void)
{
struct lprocfs_static_vars lvars = { NULL };
+ unsigned int reqpool_size;
+ unsigned int reqsize;
int rc;
/* print an address of _any_ initialized kernel symbol from this
lprocfs_osc_init_vars(&lvars);
- rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
+ rc = class_register_type(&osc_obd_ops, NULL,
LUSTRE_OSC_NAME, &osc_device_type);
- if (rc) {
- lu_kmem_fini(osc_caches);
- return rc;
- }
+ if (rc)
+ goto out_kmem;
spin_lock_init(&osc_ast_guard);
lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
+ /* This is obviously too much memory, only prevent overflow here */
+ if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0) {
+ rc = -EINVAL;
+ goto out_type;
+ }
+
+ reqpool_size = osc_reqpool_mem_max << 20;
+
+ reqsize = 1;
+ while (reqsize < OST_MAXREQSIZE)
+ reqsize = reqsize << 1;
+
+ /*
+ * We don't enlarge the request count in OSC pool according to
+ * cl_max_rpcs_in_flight. The allocation from the pool will only be
+ * tried after normal allocation failed. So a small OSC pool won't
+ * cause much performance degradation in most cases.
+ */
+ osc_reqpool_maxreqcount = reqpool_size / reqsize;
+
+ atomic_set(&osc_pool_req_count, 0);
+ osc_rq_pool = ptlrpc_init_rq_pool(0, OST_MAXREQSIZE,
+ ptlrpc_add_rqs_to_pool);
+
+ if (osc_rq_pool)
+ return 0;
+
+ rc = -ENOMEM;
+
+out_type:
+ class_unregister_type(LUSTRE_OSC_NAME);
+out_kmem:
+ lu_kmem_fini(osc_caches);
return rc;
}
{
class_unregister_type(LUSTRE_OSC_NAME);
lu_kmem_fini(osc_caches);
+ ptlrpc_free_rq_pool(osc_rq_pool);
}
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");