MongoDB sorts the results in memory, and the default mem limitation is
32M, if the sort operation consumes more than that it will return an
error:
OperationFailure: Executor error during find command: OperationFailed
Sort operation used more than the maximum
33554432 bytes of RAM. Add an
index, or specify a smaller limit.
To solve this problem, here we leverage aggregate() and
allowDiskUse=True, it is said will not be limited by memory
Change-Id: Id698ad1d02912e8b350a33a926fcccc390814fcc
Signed-off-by: SerenaFeng <feng.xiaowei@zte.com.cn>
query = {}
data = []
sort = kwargs.get('sort')
- page = kwargs.get('page')
- last = kwargs.get('last')
- per_page = kwargs.get('per_page')
+ page = kwargs.get('page', 0)
+ last = kwargs.get('last', 0)
+ per_page = kwargs.get('per_page', 0)
cursor = self._eval_db(self.table, 'find', query)
+ records_count = yield cursor.count()
+ records_nr = records_count
+ if (records_count > last) and (last > 0):
+ records_nr = last
+
+ pipelines = list()
+ if query:
+ pipelines.append({'$match': query})
if sort:
- cursor = cursor.sort(sort)
- if last and last != 0:
- cursor = cursor.limit(last)
- if page:
- records_count = yield cursor.count()
- total_pages, remainder = divmod(records_count, per_page)
+ pipelines.append({'$sort': sort})
+
+ if page > 0:
+ total_pages, remainder = divmod(records_nr, per_page)
if remainder > 0:
total_pages += 1
- cursor = cursor.skip((page - 1) * per_page).limit(per_page)
+ pipelines.append({'$skip': (page - 1) * per_page})
+ pipelines.append({'$limit': per_page})
+ else:
+ pipelines.append({'$limit': records_nr})
+
+ cursor = self._eval_db(self.table,
+ 'aggregate',
+ pipelines,
+ allowDiskUse=True)
+
while (yield cursor.fetch_next):
data.append(self.format_data(cursor.next_object()))
if res_op is None:
@in trust_indicator: query
@required trust_indicator: False
"""
- limitations = {'sort': [('start_date', -1)]}
+ limitations = {'sort': {'start_date': -1}}
last = self.get_query_argument('last', 0)
if last is not None:
last = self.get_int('last', last)
limitations.update({'last': last})
- page = self.get_query_argument('page', 1)
+ page = self.get_query_argument('page', None)
if page is not None:
page = self.get_int('page', page)
limitations.update({'page': page,
return self.collection.pop()
def sort(self, key_or_list):
- key = key_or_list[0][0]
- if key_or_list[0][1] == -1:
- reverse = True
- else:
- reverse = False
+ for k, v in key_or_list.iteritems():
+ if v == -1:
+ reverse = True
+ else:
+ reverse = False
- if key_or_list is not None:
self.collection = sorted(self.collection,
- key=itemgetter(key), reverse=reverse)
+ key=itemgetter(k), reverse=reverse)
return self
def limit(self, limit):
def find(self, *args):
return MemCursor(self._find(*args))
+ def _aggregate(self, *args, **kwargs):
+ res = self.contents
+ print args
+ for arg in args[0]:
+ for k, v in arg.iteritems():
+ if k == '$match':
+ res = self._find(v)
+ cursor = MemCursor(res)
+ for arg in args[0]:
+ for k, v in arg.iteritems():
+ if k == '$sort':
+ cursor = cursor.sort(v)
+ elif k == '$skip':
+ cursor = cursor.skip(v)
+ elif k == '$limit':
+ cursor = cursor.limit(v)
+ return cursor
+
+ def aggregate(self, *args, **kwargs):
+ return self._aggregate(*args, **kwargs)
+
def _update(self, spec, document, check_keys=True):
updated = False