// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system * * Copyright (C) 2004-2006 Sage Weil * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software * Foundation. See file COPYING. * */ #include "include/compat.h" #include "include/types.h" #include "include/buffer.h" #include "osd/osd_types.h" #include #include "HashIndex.h" #include "common/errno.h" #include "common/debug.h" #define dout_context cct #define dout_subsys ceph_subsys_filestore const string HashIndex::SUBDIR_ATTR = "contents"; const string HashIndex::SETTINGS_ATTR = "settings"; const string HashIndex::IN_PROGRESS_OP_TAG = "in_progress_op"; /// hex digit to integer value int hex_to_int(char c) { if (c >= '0' && c <= '9') return c - '0'; if (c >= 'A' && c <= 'F') return c - 'A' + 10; ceph_abort(); } /// int value to hex digit char int_to_hex(int v) { assert(v < 16); if (v < 10) return '0' + v; return 'A' + v - 10; } /// reverse bits in a nibble (0..15) int reverse_nibble_bits(int in) { assert(in < 16); return ((in & 8) >> 3) | ((in & 4) >> 1) | ((in & 2) << 1) | ((in & 1) << 3); } /// reverse nibble bits in a hex digit char reverse_hexdigit_bits(char c) { return int_to_hex(reverse_nibble_bits(hex_to_int(c))); } /// reverse nibble bits in a hex string string reverse_hexdigit_bits_string(string s) { for (unsigned i=0; i(), IN_PROGRESS_OP_TAG, bl); if (r < 0) { // No in progress operations! return 0; } bufferlist::iterator i = bl.begin(); InProgressOp in_progress(i); subdir_info_s info; r = get_info(in_progress.path, &info); if (r == -ENOENT) { return end_split_or_merge(in_progress.path); } else if (r < 0) { return r; } if (in_progress.is_split()) return complete_split(in_progress.path, info); else if (in_progress.is_merge()) return complete_merge(in_progress.path, info); else if (in_progress.is_col_split()) { for (vector::iterator i = in_progress.path.begin(); i != in_progress.path.end(); ++i) { vector path(in_progress.path.begin(), i); int r = reset_attr(path); if (r < 0) return r; } return 0; } else return -EINVAL; } int HashIndex::reset_attr( const vector &path) { int exists = 0; int r = path_exists(path, &exists); if (r < 0) return r; if (!exists) return 0; map objects; vector subdirs; r = list_objects(path, 0, 0, &objects); if (r < 0) return r; r = list_subdirs(path, &subdirs); if (r < 0) return r; subdir_info_s info; info.hash_level = path.size(); info.objs = objects.size(); info.subdirs = subdirs.size(); return set_info(path, info); } int HashIndex::col_split_level( HashIndex &from, HashIndex &to, const vector &path, uint32_t inbits, uint32_t match, unsigned *mkdirred) { /* For each subdir, move, recurse, or ignore based on comparing the low order * bits of the hash represented by the subdir path with inbits, match passed * in. */ vector subdirs; int r = from.list_subdirs(path, &subdirs); if (r < 0) return r; map objects; r = from.list_objects(path, 0, 0, &objects); if (r < 0) return r; set to_move; for (vector::iterator i = subdirs.begin(); i != subdirs.end(); ++i) { uint32_t bits = 0; uint32_t hash = 0; vector sub_path(path.begin(), path.end()); sub_path.push_back(*i); path_to_hobject_hash_prefix(sub_path, &bits, &hash); if (bits < inbits) { if (hobject_t::match_hash(hash, bits, match)) { r = col_split_level( from, to, sub_path, inbits, match, mkdirred); if (r < 0) return r; if (*mkdirred > path.size()) *mkdirred = path.size(); } // else, skip, doesn't need to be moved or recursed into } else { if (hobject_t::match_hash(hash, inbits, match)) { to_move.insert(*i); } } // else, skip, doesn't need to be moved or recursed into } /* Then, do the same for each object */ map objs_to_move; for (map::iterator i = objects.begin(); i != objects.end(); ++i) { if (i->second.match(inbits, match)) { objs_to_move.insert(*i); } } if (objs_to_move.empty() && to_move.empty()) return 0; // Make parent directories as needed while (*mkdirred < path.size()) { ++*mkdirred; int exists = 0; vector creating_path(path.begin(), path.begin()+*mkdirred); r = to.path_exists(creating_path, &exists); if (r < 0) return r; if (exists) continue; subdir_info_s info; info.objs = 0; info.subdirs = 0; info.hash_level = creating_path.size(); if (*mkdirred < path.size() - 1) info.subdirs = 1; r = to.start_col_split(creating_path); if (r < 0) return r; r = to.create_path(creating_path); if (r < 0) return r; r = to.set_info(creating_path, info); if (r < 0) return r; r = to.end_split_or_merge(creating_path); if (r < 0) return r; } subdir_info_s from_info; subdir_info_s to_info; r = from.get_info(path, &from_info); if (r < 0) return r; r = to.get_info(path, &to_info); if (r < 0) return r; from.start_col_split(path); to.start_col_split(path); // Do subdir moves for (set::iterator i = to_move.begin(); i != to_move.end(); ++i) { from_info.subdirs--; to_info.subdirs++; r = move_subdir(from, to, path, *i); if (r < 0) return r; } for (map::iterator i = objs_to_move.begin(); i != objs_to_move.end(); ++i) { from_info.objs--; to_info.objs++; r = move_object(from, to, path, *i); if (r < 0) return r; } r = to.set_info(path, to_info); if (r < 0) return r; r = from.set_info(path, from_info); if (r < 0) return r; from.end_split_or_merge(path); to.end_split_or_merge(path); return 0; } int HashIndex::_split( uint32_t match, uint32_t bits, CollectionIndex* dest) { assert(collection_version() == dest->collection_version()); unsigned mkdirred = 0; return col_split_level( *this, *static_cast(dest), vector(), bits, match, &mkdirred); } int HashIndex::split_dirs(const vector &path) { dout(20) << __func__ << " " << path << dendl; subdir_info_s info; int r = get_info(path, &info); if (r < 0) { dout(10) << "error looking up info for " << path << ": " << cpp_strerror(r) << dendl; return r; } if (must_split(info)) { dout(1) << __func__ << " " << path << " has " << info.objs << " objects, starting split." << dendl; r = initiate_split(path, info); if (r < 0) { dout(10) << "error initiating split on " << path << ": " << cpp_strerror(r) << dendl; return r; } r = complete_split(path, info); dout(1) << __func__ << " " << path << " split completed." << dendl; if (r < 0) { dout(10) << "error completing split on " << path << ": " << cpp_strerror(r) << dendl; return r; } } vector subdirs; r = list_subdirs(path, &subdirs); if (r < 0) { dout(10) << "error listing subdirs of " << path << ": " << cpp_strerror(r) << dendl; return r; } for (vector::const_iterator it = subdirs.begin(); it != subdirs.end(); ++it) { vector subdir_path(path); subdir_path.push_back(*it); r = split_dirs(subdir_path); if (r < 0) { return r; } } return r; } int HashIndex::apply_layout_settings() { vector path; dout(10) << __func__ << " split multiple = " << split_multiplier << " merge threshold = " << merge_threshold << " split rand factor = " << cct->_conf->filestore_split_rand_factor << dendl; int r = write_settings(); if (r < 0) return r; return split_dirs(path); } int HashIndex::_init() { subdir_info_s info; vector path; int r = set_info(path, info); if (r < 0) return r; return write_settings(); } int HashIndex::write_settings() { if (cct->_conf->filestore_split_rand_factor > 0) { settings.split_rand_factor = rand() % cct->_conf->filestore_split_rand_factor; } else { settings.split_rand_factor = 0; } vector path; bufferlist bl; settings.encode(bl); return add_attr_path(path, SETTINGS_ATTR, bl); } int HashIndex::read_settings() { vector path; bufferlist bl; int r = get_attr_path(path, SETTINGS_ATTR, bl); if (r == -ENODATA) return 0; if (r < 0) { derr << __func__ << " error reading settings: " << cpp_strerror(r) << dendl; return r; } bufferlist::iterator it = bl.begin(); settings.decode(it); dout(20) << __func__ << " split_rand_factor = " << settings.split_rand_factor << dendl; return 0; } /* LFNIndex virtual method implementations */ int HashIndex::_created(const vector &path, const ghobject_t &oid, const string &mangled_name) { subdir_info_s info; int r; r = get_info(path, &info); if (r < 0) return r; info.objs++; r = set_info(path, info); if (r < 0) return r; if (must_split(info)) { dout(1) << __func__ << " " << path << " has " << info.objs << " objects, starting split." << dendl; int r = initiate_split(path, info); if (r < 0) return r; r = complete_split(path, info); dout(1) << __func__ << " " << path << " split completed." << dendl; return r; } else { return 0; } } int HashIndex::_remove(const vector &path, const ghobject_t &oid, const string &mangled_name) { int r; r = remove_object(path, oid); if (r < 0) return r; subdir_info_s info; r = get_info(path, &info); if (r < 0) return r; info.objs--; r = set_info(path, info); if (r < 0) return r; if (must_merge(info)) { r = initiate_merge(path, info); if (r < 0) return r; return complete_merge(path, info); } else { return 0; } } int HashIndex::_lookup(const ghobject_t &oid, vector *path, string *mangled_name, int *hardlink) { vector path_comp; get_path_components(oid, &path_comp); vector::iterator next = path_comp.begin(); int exists; while (1) { int r = path_exists(*path, &exists); if (r < 0) return r; if (!exists) { if (path->empty()) return -ENOENT; path->pop_back(); break; } if (next == path_comp.end()) break; path->push_back(*(next++)); } return get_mangled_name(*path, oid, mangled_name, hardlink); } int HashIndex::_collection_list_partial(const ghobject_t &start, const ghobject_t &end, int max_count, vector *ls, ghobject_t *next) { vector path; ghobject_t _next; if (!next) next = &_next; *next = start; dout(20) << __func__ << " start:" << start << " end:" << end << "-" << max_count << " ls.size " << ls->size() << dendl; return list_by_hash(path, end, max_count, next, ls); } int HashIndex::prep_delete() { return recursive_remove(vector()); } int HashIndex::_pre_hash_collection(uint32_t pg_num, uint64_t expected_num_objs) { int ret; vector path; subdir_info_s root_info; // Make sure there is neither objects nor sub-folders // in this collection ret = get_info(path, &root_info); if (ret < 0) return ret; // Do the folder splitting first ret = pre_split_folder(pg_num, expected_num_objs); if (ret < 0) return ret; // Initialize the folder info starting from root return init_split_folder(path, 0); } int HashIndex::pre_split_folder(uint32_t pg_num, uint64_t expected_num_objs) { // If folder merging is enabled (by setting the threshold positive), // no need to split if (merge_threshold > 0) return 0; const coll_t c = coll(); // Do not split if the expected number of objects in this collection is zero (by default) if (expected_num_objs == 0) return 0; // Calculate the number of leaf folders (which actually store files) // need to be created const uint64_t objs_per_folder = ((uint64_t)(abs(merge_threshold)) * (uint64_t)split_multiplier + settings.split_rand_factor) * 16; uint64_t leavies = expected_num_objs / objs_per_folder ; // No need to split if (leavies == 0 || expected_num_objs == objs_per_folder) return 0; spg_t spgid; if (!c.is_pg_prefix(&spgid)) return -EINVAL; const ps_t ps = spgid.pgid.ps(); // the most significant bits of pg_num const int pg_num_bits = calc_num_bits(pg_num - 1); ps_t tmp_id = ps; // calculate the number of levels we only create one sub folder int num = pg_num_bits / 4; // pg num's hex value is like 1xxx,xxxx,xxxx but not 1111,1111,1111, // so that splitting starts at level 3 if (pg_num_bits % 4 == 0 && pg_num < ((uint32_t)1 << pg_num_bits)) { --num; } int ret; // Start with creation that only has one subfolder vector paths; int dump_num = num; while (num-- > 0) { ps_t v = tmp_id & 0x0000000f; paths.push_back(to_hex(v)); ret = create_path(paths); if (ret < 0 && ret != -EEXIST) return ret; tmp_id = tmp_id >> 4; } // Starting from here, we can split by creating multiple subfolders const int left_bits = pg_num_bits - dump_num * 4; // this variable denotes how many bits (for this level) that can be // used for sub folder splitting int split_bits = 4 - left_bits; // the below logic is inspired by rados.h#ceph_stable_mod, // it basically determines how many sub-folders should we // create for splitting assert(pg_num_bits > 0); // otherwise BAD_SHIFT if (((1 << (pg_num_bits - 1)) | ps) >= pg_num) { ++split_bits; } const uint32_t subs = (1 << split_bits); // Calculate how many levels we create starting from here int level = 0; leavies /= subs; while (leavies > 1) { ++level; leavies = leavies >> 4; } for (uint32_t i = 0; i < subs; ++i) { assert(split_bits <= 4); // otherwise BAD_SHIFT int v = tmp_id | (i << ((4 - split_bits) % 4)); paths.push_back(to_hex(v)); ret = create_path(paths); if (ret < 0 && ret != -EEXIST) return ret; ret = recursive_create_path(paths, level); if (ret < 0) return ret; paths.pop_back(); } return 0; } int HashIndex::init_split_folder(vector &path, uint32_t hash_level) { // Get the number of sub directories for the current path vector subdirs; int ret = list_subdirs(path, &subdirs); if (ret < 0) return ret; subdir_info_s info; info.subdirs = subdirs.size(); info.hash_level = hash_level; ret = set_info(path, info); if (ret < 0) return ret; ret = fsync_dir(path); if (ret < 0) return ret; // Do the same for subdirs vector::const_iterator iter; for (iter = subdirs.begin(); iter != subdirs.end(); ++iter) { path.push_back(*iter); ret = init_split_folder(path, hash_level + 1); if (ret < 0) return ret; path.pop_back(); } return 0; } int HashIndex::recursive_create_path(vector& path, int level) { if (level == 0) return 0; for (int i = 0; i < 16; ++i) { path.push_back(to_hex(i)); int ret = create_path(path); if (ret < 0 && ret != -EEXIST) return ret; ret = recursive_create_path(path, level - 1); if (ret < 0) return ret; path.pop_back(); } return 0; } int HashIndex::recursive_remove(const vector &path) { return _recursive_remove(path, true); } int HashIndex::_recursive_remove(const vector &path, bool top) { vector subdirs; dout(20) << __func__ << " path=" << path << dendl; int r = list_subdirs(path, &subdirs); if (r < 0) return r; map objects; r = list_objects(path, 0, 0, &objects); if (r < 0) return r; if (!objects.empty()) return -ENOTEMPTY; vector subdir(path); for (vector::iterator i = subdirs.begin(); i != subdirs.end(); ++i) { subdir.push_back(*i); r = _recursive_remove(subdir, false); if (r < 0) return r; subdir.pop_back(); } if (top) return 0; else return remove_path(path); } int HashIndex::start_col_split(const vector &path) { bufferlist bl; InProgressOp op_tag(InProgressOp::COL_SPLIT, path); op_tag.encode(bl); int r = add_attr_path(vector(), IN_PROGRESS_OP_TAG, bl); if (r < 0) return r; return fsync_dir(vector()); } int HashIndex::start_split(const vector &path) { bufferlist bl; InProgressOp op_tag(InProgressOp::SPLIT, path); op_tag.encode(bl); int r = add_attr_path(vector(), IN_PROGRESS_OP_TAG, bl); if (r < 0) return r; return fsync_dir(vector()); } int HashIndex::start_merge(const vector &path) { bufferlist bl; InProgressOp op_tag(InProgressOp::MERGE, path); op_tag.encode(bl); int r = add_attr_path(vector(), IN_PROGRESS_OP_TAG, bl); if (r < 0) return r; return fsync_dir(vector()); } int HashIndex::end_split_or_merge(const vector &path) { return remove_attr_path(vector(), IN_PROGRESS_OP_TAG); } int HashIndex::get_info(const vector &path, subdir_info_s *info) { bufferlist buf; int r = get_attr_path(path, SUBDIR_ATTR, buf); if (r < 0) return r; bufferlist::iterator bufiter = buf.begin(); info->decode(bufiter); assert(path.size() == (unsigned)info->hash_level); return 0; } int HashIndex::set_info(const vector &path, const subdir_info_s &info) { bufferlist buf; assert(path.size() == (unsigned)info.hash_level); info.encode(buf); return add_attr_path(path, SUBDIR_ATTR, buf); } bool HashIndex::must_merge(const subdir_info_s &info) { return (info.hash_level > 0 && merge_threshold > 0 && info.objs < (unsigned)merge_threshold && info.subdirs == 0); } bool HashIndex::must_split(const subdir_info_s &info) { return (info.hash_level < (unsigned)MAX_HASH_LEVEL && info.objs > ((unsigned)(abs(merge_threshold) * split_multiplier + settings.split_rand_factor) * 16)); } int HashIndex::initiate_merge(const vector &path, subdir_info_s info) { return start_merge(path); } int HashIndex::complete_merge(const vector &path, subdir_info_s info) { vector dst = path; dst.pop_back(); subdir_info_s dstinfo; int r, exists; r = path_exists(path, &exists); if (r < 0) return r; r = get_info(dst, &dstinfo); if (r < 0) return r; if (exists) { r = move_objects(path, dst); if (r < 0) return r; r = reset_attr(dst); if (r < 0) return r; r = remove_path(path); if (r < 0) return r; } if (must_merge(dstinfo)) { r = initiate_merge(dst, dstinfo); if (r < 0) return r; r = fsync_dir(dst); if (r < 0) return r; return complete_merge(dst, dstinfo); } r = fsync_dir(dst); if (r < 0) return r; return end_split_or_merge(path); } int HashIndex::initiate_split(const vector &path, subdir_info_s info) { return start_split(path); } int HashIndex::complete_split(const vector &path, subdir_info_s info) { int level = info.hash_level; map objects; vector dst = path; int r; dst.push_back(""); r = list_objects(path, 0, 0, &objects); if (r < 0) return r; vector subdirs_vec; r = list_subdirs(path, &subdirs_vec); if (r < 0) return r; set subdirs; subdirs.insert(subdirs_vec.begin(), subdirs_vec.end()); map > mapped; map moved; int num_moved = 0; for (map::iterator i = objects.begin(); i != objects.end(); ++i) { vector new_path; get_path_components(i->second, &new_path); mapped[new_path[level]][i->first] = i->second; } for (map >::iterator i = mapped.begin(); i != mapped.end(); ) { dst[level] = i->first; /* If the info already exists, it must be correct, * we may be picking up a partially finished split */ subdir_info_s temp; // subdir has already been fully copied if (subdirs.count(i->first) && !get_info(dst, &temp)) { for (map::iterator j = i->second.begin(); j != i->second.end(); ++j) { moved[j->first] = j->second; num_moved++; objects.erase(j->first); } ++i; continue; } subdir_info_s info_new; info_new.objs = i->second.size(); info_new.subdirs = 0; info_new.hash_level = level + 1; if (must_merge(info_new) && !subdirs.count(i->first)) { mapped.erase(i++); continue; } // Subdir doesn't yet exist if (!subdirs.count(i->first)) { info.subdirs += 1; r = create_path(dst); if (r < 0) return r; } // else subdir has been created but only partially copied for (map::iterator j = i->second.begin(); j != i->second.end(); ++j) { moved[j->first] = j->second; num_moved++; objects.erase(j->first); r = link_object(path, dst, j->second, j->first); // May be a partially finished split if (r < 0 && r != -EEXIST) { return r; } } r = fsync_dir(dst); if (r < 0) return r; // Presence of info must imply that all objects have been copied r = set_info(dst, info_new); if (r < 0) return r; r = fsync_dir(dst); if (r < 0) return r; ++i; } r = remove_objects(path, moved, &objects); if (r < 0) return r; info.objs = objects.size(); r = reset_attr(path); if (r < 0) return r; r = fsync_dir(path); if (r < 0) return r; return end_split_or_merge(path); } void HashIndex::get_path_components(const ghobject_t &oid, vector *path) { char buf[MAX_HASH_LEVEL + 1]; snprintf(buf, sizeof(buf), "%.*X", MAX_HASH_LEVEL, (uint32_t)oid.hobj.get_nibblewise_key()); // Path components are the hex characters of oid.hobj.hash, least // significant first for (int i = 0; i < MAX_HASH_LEVEL; ++i) { path->push_back(string(&buf[i], 1)); } } string HashIndex::get_hash_str(uint32_t hash) { char buf[MAX_HASH_LEVEL + 1]; snprintf(buf, sizeof(buf), "%.*X", MAX_HASH_LEVEL, hash); string retval; for (int i = 0; i < MAX_HASH_LEVEL; ++i) { retval.push_back(buf[MAX_HASH_LEVEL - 1 - i]); } return retval; } string HashIndex::get_path_str(const ghobject_t &oid) { assert(!oid.is_max()); return get_hash_str(oid.hobj.get_hash()); } uint32_t HashIndex::hash_prefix_to_hash(string prefix) { while (prefix.size() < sizeof(uint32_t) * 2) { prefix.push_back('0'); } uint32_t hash; sscanf(prefix.c_str(), "%x", &hash); // nibble reverse hash = ((hash & 0x0f0f0f0f) << 4) | ((hash & 0xf0f0f0f0) >> 4); hash = ((hash & 0x00ff00ff) << 8) | ((hash & 0xff00ff00) >> 8); hash = ((hash & 0x0000ffff) << 16) | ((hash & 0xffff0000) >> 16); return hash; } int HashIndex::get_path_contents_by_hash_bitwise( const vector &path, const ghobject_t *next_object, set *hash_prefixes, set, CmpPairBitwise> *objects) { map rev_objects; int r; r = list_objects(path, 0, 0, &rev_objects); if (r < 0) return r; // bitwise sort for (map::iterator i = rev_objects.begin(); i != rev_objects.end(); ++i) { if (next_object && i->second < *next_object) continue; string hash_prefix = get_path_str(i->second); hash_prefixes->insert(hash_prefix); objects->insert(pair(hash_prefix, i->second)); } vector subdirs; r = list_subdirs(path, &subdirs); if (r < 0) return r; // sort subdirs bitwise (by reversing hex digit nibbles) std::sort(subdirs.begin(), subdirs.end(), cmp_hexdigit_bitwise); // Local to this function, we will convert the prefix strings // (previously simply the reversed hex digits) to also have each // digit's nibbles reversed. This will make the strings sort // bitwise. string cur_prefix; for (vector::const_iterator i = path.begin(); i != path.end(); ++i) { cur_prefix.append(reverse_hexdigit_bits_string(*i)); } string next_object_string; if (next_object) next_object_string = reverse_hexdigit_bits_string(get_path_str(*next_object)); for (vector::iterator i = subdirs.begin(); i != subdirs.end(); ++i) { string candidate = cur_prefix + reverse_hexdigit_bits_string(*i); if (next_object) { if (next_object->is_max()) continue; if (candidate < next_object_string.substr(0, candidate.size())) continue; } // re-reverse the hex digit nibbles for the caller hash_prefixes->insert(reverse_hexdigit_bits_string(candidate)); } return 0; } int HashIndex::list_by_hash(const vector &path, const ghobject_t &end, int max_count, ghobject_t *next, vector *out) { assert(out); return list_by_hash_bitwise(path, end, max_count, next, out); } int HashIndex::list_by_hash_bitwise( const vector &path, const ghobject_t& end, int max_count, ghobject_t *next, vector *out) { vector next_path = path; next_path.push_back(""); set hash_prefixes; set, CmpPairBitwise> objects; int r = get_path_contents_by_hash_bitwise(path, next, &hash_prefixes, &objects); if (r < 0) return r; for (set::iterator i = hash_prefixes.begin(); i != hash_prefixes.end(); ++i) { dout(20) << __func__ << " prefix " << *i << dendl; set, CmpPairBitwise>::iterator j = objects.lower_bound( make_pair(*i, ghobject_t())); if (j == objects.end() || j->first != *i) { *(next_path.rbegin()) = *(i->rbegin()); ghobject_t next_recurse; if (next) next_recurse = *next; r = list_by_hash_bitwise(next_path, end, max_count, &next_recurse, out); if (r < 0) return r; if (!next_recurse.is_max()) { if (next) *next = next_recurse; return 0; } } else { while (j != objects.end() && j->first == *i) { if (max_count > 0 && out->size() == (unsigned)max_count) { if (next) *next = j->second; return 0; } if (j->second >= end) { if (next) *next = j->second; return 0; } if (!next || j->second >= *next) { dout(20) << __func__ << " prefix " << *i << " ob " << j->second << dendl; out->push_back(j->second); } ++j; } } } if (next) *next = ghobject_t::get_max(); return 0; }