Example as code, documentation template for sphinx build
[kvmfornfv.git] / qemu / migration / ram.c
1 /*
2  * QEMU System Emulator
3  *
4  * Copyright (c) 2003-2008 Fabrice Bellard
5  * Copyright (c) 2011-2015 Red Hat Inc
6  *
7  * Authors:
8  *  Juan Quintela <quintela@redhat.com>
9  *
10  * Permission is hereby granted, free of charge, to any person obtaining a copy
11  * of this software and associated documentation files (the "Software"), to deal
12  * in the Software without restriction, including without limitation the rights
13  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14  * copies of the Software, and to permit persons to whom the Software is
15  * furnished to do so, subject to the following conditions:
16  *
17  * The above copyright notice and this permission notice shall be included in
18  * all copies or substantial portions of the Software.
19  *
20  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
23  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26  * THE SOFTWARE.
27  */
28 #include <stdint.h>
29 #include <zlib.h>
30 #include "qemu/bitops.h"
31 #include "qemu/bitmap.h"
32 #include "qemu/timer.h"
33 #include "qemu/main-loop.h"
34 #include "migration/migration.h"
35 #include "exec/address-spaces.h"
36 #include "migration/page_cache.h"
37 #include "qemu/error-report.h"
38 #include "trace.h"
39 #include "exec/ram_addr.h"
40 #include "qemu/rcu_queue.h"
41
42 #ifdef DEBUG_MIGRATION_RAM
43 #define DPRINTF(fmt, ...) \
44     do { fprintf(stdout, "migration_ram: " fmt, ## __VA_ARGS__); } while (0)
45 #else
46 #define DPRINTF(fmt, ...) \
47     do { } while (0)
48 #endif
49
50 static bool mig_throttle_on;
51 static int dirty_rate_high_cnt;
52 static void check_guest_throttling(void);
53
54 static uint64_t bitmap_sync_count;
55
56 /***********************************************************/
57 /* ram save/restore */
58
59 #define RAM_SAVE_FLAG_FULL     0x01 /* Obsolete, not used anymore */
60 #define RAM_SAVE_FLAG_COMPRESS 0x02
61 #define RAM_SAVE_FLAG_MEM_SIZE 0x04
62 #define RAM_SAVE_FLAG_PAGE     0x08
63 #define RAM_SAVE_FLAG_EOS      0x10
64 #define RAM_SAVE_FLAG_CONTINUE 0x20
65 #define RAM_SAVE_FLAG_XBZRLE   0x40
66 /* 0x80 is reserved in migration.h start with 0x100 next */
67 #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
68
69 static const uint8_t ZERO_TARGET_PAGE[TARGET_PAGE_SIZE];
70
71 static inline bool is_zero_range(uint8_t *p, uint64_t size)
72 {
73     return buffer_find_nonzero_offset(p, size) == size;
74 }
75
76 /* struct contains XBZRLE cache and a static page
77    used by the compression */
78 static struct {
79     /* buffer used for XBZRLE encoding */
80     uint8_t *encoded_buf;
81     /* buffer for storing page content */
82     uint8_t *current_buf;
83     /* Cache for XBZRLE, Protected by lock. */
84     PageCache *cache;
85     QemuMutex lock;
86 } XBZRLE;
87
88 /* buffer used for XBZRLE decoding */
89 static uint8_t *xbzrle_decoded_buf;
90
91 static void XBZRLE_cache_lock(void)
92 {
93     if (migrate_use_xbzrle())
94         qemu_mutex_lock(&XBZRLE.lock);
95 }
96
97 static void XBZRLE_cache_unlock(void)
98 {
99     if (migrate_use_xbzrle())
100         qemu_mutex_unlock(&XBZRLE.lock);
101 }
102
103 /*
104  * called from qmp_migrate_set_cache_size in main thread, possibly while
105  * a migration is in progress.
106  * A running migration maybe using the cache and might finish during this
107  * call, hence changes to the cache are protected by XBZRLE.lock().
108  */
109 int64_t xbzrle_cache_resize(int64_t new_size)
110 {
111     PageCache *new_cache;
112     int64_t ret;
113
114     if (new_size < TARGET_PAGE_SIZE) {
115         return -1;
116     }
117
118     XBZRLE_cache_lock();
119
120     if (XBZRLE.cache != NULL) {
121         if (pow2floor(new_size) == migrate_xbzrle_cache_size()) {
122             goto out_new_size;
123         }
124         new_cache = cache_init(new_size / TARGET_PAGE_SIZE,
125                                         TARGET_PAGE_SIZE);
126         if (!new_cache) {
127             error_report("Error creating cache");
128             ret = -1;
129             goto out;
130         }
131
132         cache_fini(XBZRLE.cache);
133         XBZRLE.cache = new_cache;
134     }
135
136 out_new_size:
137     ret = pow2floor(new_size);
138 out:
139     XBZRLE_cache_unlock();
140     return ret;
141 }
142
143 /* accounting for migration statistics */
144 typedef struct AccountingInfo {
145     uint64_t dup_pages;
146     uint64_t skipped_pages;
147     uint64_t norm_pages;
148     uint64_t iterations;
149     uint64_t xbzrle_bytes;
150     uint64_t xbzrle_pages;
151     uint64_t xbzrle_cache_miss;
152     double xbzrle_cache_miss_rate;
153     uint64_t xbzrle_overflows;
154 } AccountingInfo;
155
156 static AccountingInfo acct_info;
157
158 static void acct_clear(void)
159 {
160     memset(&acct_info, 0, sizeof(acct_info));
161 }
162
163 uint64_t dup_mig_bytes_transferred(void)
164 {
165     return acct_info.dup_pages * TARGET_PAGE_SIZE;
166 }
167
168 uint64_t dup_mig_pages_transferred(void)
169 {
170     return acct_info.dup_pages;
171 }
172
173 uint64_t skipped_mig_bytes_transferred(void)
174 {
175     return acct_info.skipped_pages * TARGET_PAGE_SIZE;
176 }
177
178 uint64_t skipped_mig_pages_transferred(void)
179 {
180     return acct_info.skipped_pages;
181 }
182
183 uint64_t norm_mig_bytes_transferred(void)
184 {
185     return acct_info.norm_pages * TARGET_PAGE_SIZE;
186 }
187
188 uint64_t norm_mig_pages_transferred(void)
189 {
190     return acct_info.norm_pages;
191 }
192
193 uint64_t xbzrle_mig_bytes_transferred(void)
194 {
195     return acct_info.xbzrle_bytes;
196 }
197
198 uint64_t xbzrle_mig_pages_transferred(void)
199 {
200     return acct_info.xbzrle_pages;
201 }
202
203 uint64_t xbzrle_mig_pages_cache_miss(void)
204 {
205     return acct_info.xbzrle_cache_miss;
206 }
207
208 double xbzrle_mig_cache_miss_rate(void)
209 {
210     return acct_info.xbzrle_cache_miss_rate;
211 }
212
213 uint64_t xbzrle_mig_pages_overflow(void)
214 {
215     return acct_info.xbzrle_overflows;
216 }
217
218 /* This is the last block that we have visited serching for dirty pages
219  */
220 static RAMBlock *last_seen_block;
221 /* This is the last block from where we have sent data */
222 static RAMBlock *last_sent_block;
223 static ram_addr_t last_offset;
224 static unsigned long *migration_bitmap;
225 static QemuMutex migration_bitmap_mutex;
226 static uint64_t migration_dirty_pages;
227 static uint32_t last_version;
228 static bool ram_bulk_stage;
229
230 struct CompressParam {
231     bool start;
232     bool done;
233     QEMUFile *file;
234     QemuMutex mutex;
235     QemuCond cond;
236     RAMBlock *block;
237     ram_addr_t offset;
238 };
239 typedef struct CompressParam CompressParam;
240
241 struct DecompressParam {
242     bool start;
243     QemuMutex mutex;
244     QemuCond cond;
245     void *des;
246     uint8 *compbuf;
247     int len;
248 };
249 typedef struct DecompressParam DecompressParam;
250
251 static CompressParam *comp_param;
252 static QemuThread *compress_threads;
253 /* comp_done_cond is used to wake up the migration thread when
254  * one of the compression threads has finished the compression.
255  * comp_done_lock is used to co-work with comp_done_cond.
256  */
257 static QemuMutex *comp_done_lock;
258 static QemuCond *comp_done_cond;
259 /* The empty QEMUFileOps will be used by file in CompressParam */
260 static const QEMUFileOps empty_ops = { };
261
262 static bool compression_switch;
263 static bool quit_comp_thread;
264 static bool quit_decomp_thread;
265 static DecompressParam *decomp_param;
266 static QemuThread *decompress_threads;
267 static uint8_t *compressed_data_buf;
268
269 static int do_compress_ram_page(CompressParam *param);
270
271 static void *do_data_compress(void *opaque)
272 {
273     CompressParam *param = opaque;
274
275     while (!quit_comp_thread) {
276         qemu_mutex_lock(&param->mutex);
277         /* Re-check the quit_comp_thread in case of
278          * terminate_compression_threads is called just before
279          * qemu_mutex_lock(&param->mutex) and after
280          * while(!quit_comp_thread), re-check it here can make
281          * sure the compression thread terminate as expected.
282          */
283         while (!param->start && !quit_comp_thread) {
284             qemu_cond_wait(&param->cond, &param->mutex);
285         }
286         if (!quit_comp_thread) {
287             do_compress_ram_page(param);
288         }
289         param->start = false;
290         qemu_mutex_unlock(&param->mutex);
291
292         qemu_mutex_lock(comp_done_lock);
293         param->done = true;
294         qemu_cond_signal(comp_done_cond);
295         qemu_mutex_unlock(comp_done_lock);
296     }
297
298     return NULL;
299 }
300
301 static inline void terminate_compression_threads(void)
302 {
303     int idx, thread_count;
304
305     thread_count = migrate_compress_threads();
306     quit_comp_thread = true;
307     for (idx = 0; idx < thread_count; idx++) {
308         qemu_mutex_lock(&comp_param[idx].mutex);
309         qemu_cond_signal(&comp_param[idx].cond);
310         qemu_mutex_unlock(&comp_param[idx].mutex);
311     }
312 }
313
314 void migrate_compress_threads_join(void)
315 {
316     int i, thread_count;
317
318     if (!migrate_use_compression()) {
319         return;
320     }
321     terminate_compression_threads();
322     thread_count = migrate_compress_threads();
323     for (i = 0; i < thread_count; i++) {
324         qemu_thread_join(compress_threads + i);
325         qemu_fclose(comp_param[i].file);
326         qemu_mutex_destroy(&comp_param[i].mutex);
327         qemu_cond_destroy(&comp_param[i].cond);
328     }
329     qemu_mutex_destroy(comp_done_lock);
330     qemu_cond_destroy(comp_done_cond);
331     g_free(compress_threads);
332     g_free(comp_param);
333     g_free(comp_done_cond);
334     g_free(comp_done_lock);
335     compress_threads = NULL;
336     comp_param = NULL;
337     comp_done_cond = NULL;
338     comp_done_lock = NULL;
339 }
340
341 void migrate_compress_threads_create(void)
342 {
343     int i, thread_count;
344
345     if (!migrate_use_compression()) {
346         return;
347     }
348     quit_comp_thread = false;
349     compression_switch = true;
350     thread_count = migrate_compress_threads();
351     compress_threads = g_new0(QemuThread, thread_count);
352     comp_param = g_new0(CompressParam, thread_count);
353     comp_done_cond = g_new0(QemuCond, 1);
354     comp_done_lock = g_new0(QemuMutex, 1);
355     qemu_cond_init(comp_done_cond);
356     qemu_mutex_init(comp_done_lock);
357     for (i = 0; i < thread_count; i++) {
358         /* com_param[i].file is just used as a dummy buffer to save data, set
359          * it's ops to empty.
360          */
361         comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
362         comp_param[i].done = true;
363         qemu_mutex_init(&comp_param[i].mutex);
364         qemu_cond_init(&comp_param[i].cond);
365         qemu_thread_create(compress_threads + i, "compress",
366                            do_data_compress, comp_param + i,
367                            QEMU_THREAD_JOINABLE);
368     }
369 }
370
371 /**
372  * save_page_header: Write page header to wire
373  *
374  * If this is the 1st block, it also writes the block identification
375  *
376  * Returns: Number of bytes written
377  *
378  * @f: QEMUFile where to send the data
379  * @block: block that contains the page we want to send
380  * @offset: offset inside the block for the page
381  *          in the lower bits, it contains flags
382  */
383 static size_t save_page_header(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
384 {
385     size_t size, len;
386
387     qemu_put_be64(f, offset);
388     size = 8;
389
390     if (!(offset & RAM_SAVE_FLAG_CONTINUE)) {
391         len = strlen(block->idstr);
392         qemu_put_byte(f, len);
393         qemu_put_buffer(f, (uint8_t *)block->idstr, len);
394         size += 1 + len;
395     }
396     return size;
397 }
398
399 /* Update the xbzrle cache to reflect a page that's been sent as all 0.
400  * The important thing is that a stale (not-yet-0'd) page be replaced
401  * by the new data.
402  * As a bonus, if the page wasn't in the cache it gets added so that
403  * when a small write is made into the 0'd page it gets XBZRLE sent
404  */
405 static void xbzrle_cache_zero_page(ram_addr_t current_addr)
406 {
407     if (ram_bulk_stage || !migrate_use_xbzrle()) {
408         return;
409     }
410
411     /* We don't care if this fails to allocate a new cache page
412      * as long as it updated an old one */
413     cache_insert(XBZRLE.cache, current_addr, ZERO_TARGET_PAGE,
414                  bitmap_sync_count);
415 }
416
417 #define ENCODING_FLAG_XBZRLE 0x1
418
419 /**
420  * save_xbzrle_page: compress and send current page
421  *
422  * Returns: 1 means that we wrote the page
423  *          0 means that page is identical to the one already sent
424  *          -1 means that xbzrle would be longer than normal
425  *
426  * @f: QEMUFile where to send the data
427  * @current_data:
428  * @current_addr:
429  * @block: block that contains the page we want to send
430  * @offset: offset inside the block for the page
431  * @last_stage: if we are at the completion stage
432  * @bytes_transferred: increase it with the number of transferred bytes
433  */
434 static int save_xbzrle_page(QEMUFile *f, uint8_t **current_data,
435                             ram_addr_t current_addr, RAMBlock *block,
436                             ram_addr_t offset, bool last_stage,
437                             uint64_t *bytes_transferred)
438 {
439     int encoded_len = 0, bytes_xbzrle;
440     uint8_t *prev_cached_page;
441
442     if (!cache_is_cached(XBZRLE.cache, current_addr, bitmap_sync_count)) {
443         acct_info.xbzrle_cache_miss++;
444         if (!last_stage) {
445             if (cache_insert(XBZRLE.cache, current_addr, *current_data,
446                              bitmap_sync_count) == -1) {
447                 return -1;
448             } else {
449                 /* update *current_data when the page has been
450                    inserted into cache */
451                 *current_data = get_cached_data(XBZRLE.cache, current_addr);
452             }
453         }
454         return -1;
455     }
456
457     prev_cached_page = get_cached_data(XBZRLE.cache, current_addr);
458
459     /* save current buffer into memory */
460     memcpy(XBZRLE.current_buf, *current_data, TARGET_PAGE_SIZE);
461
462     /* XBZRLE encoding (if there is no overflow) */
463     encoded_len = xbzrle_encode_buffer(prev_cached_page, XBZRLE.current_buf,
464                                        TARGET_PAGE_SIZE, XBZRLE.encoded_buf,
465                                        TARGET_PAGE_SIZE);
466     if (encoded_len == 0) {
467         DPRINTF("Skipping unmodified page\n");
468         return 0;
469     } else if (encoded_len == -1) {
470         DPRINTF("Overflow\n");
471         acct_info.xbzrle_overflows++;
472         /* update data in the cache */
473         if (!last_stage) {
474             memcpy(prev_cached_page, *current_data, TARGET_PAGE_SIZE);
475             *current_data = prev_cached_page;
476         }
477         return -1;
478     }
479
480     /* we need to update the data in the cache, in order to get the same data */
481     if (!last_stage) {
482         memcpy(prev_cached_page, XBZRLE.current_buf, TARGET_PAGE_SIZE);
483     }
484
485     /* Send XBZRLE based compressed page */
486     bytes_xbzrle = save_page_header(f, block, offset | RAM_SAVE_FLAG_XBZRLE);
487     qemu_put_byte(f, ENCODING_FLAG_XBZRLE);
488     qemu_put_be16(f, encoded_len);
489     qemu_put_buffer(f, XBZRLE.encoded_buf, encoded_len);
490     bytes_xbzrle += encoded_len + 1 + 2;
491     acct_info.xbzrle_pages++;
492     acct_info.xbzrle_bytes += bytes_xbzrle;
493     *bytes_transferred += bytes_xbzrle;
494
495     return 1;
496 }
497
498 /* Called with rcu_read_lock() to protect migration_bitmap */
499 static inline
500 ram_addr_t migration_bitmap_find_and_reset_dirty(MemoryRegion *mr,
501                                                  ram_addr_t start)
502 {
503     unsigned long base = mr->ram_addr >> TARGET_PAGE_BITS;
504     unsigned long nr = base + (start >> TARGET_PAGE_BITS);
505     uint64_t mr_size = TARGET_PAGE_ALIGN(memory_region_size(mr));
506     unsigned long size = base + (mr_size >> TARGET_PAGE_BITS);
507     unsigned long *bitmap;
508
509     unsigned long next;
510
511     bitmap = atomic_rcu_read(&migration_bitmap);
512     if (ram_bulk_stage && nr > base) {
513         next = nr + 1;
514     } else {
515         next = find_next_bit(bitmap, size, nr);
516     }
517
518     if (next < size) {
519         clear_bit(next, bitmap);
520         migration_dirty_pages--;
521     }
522     return (next - base) << TARGET_PAGE_BITS;
523 }
524
525 /* Called with rcu_read_lock() to protect migration_bitmap */
526 static void migration_bitmap_sync_range(ram_addr_t start, ram_addr_t length)
527 {
528     unsigned long *bitmap;
529     bitmap = atomic_rcu_read(&migration_bitmap);
530     migration_dirty_pages +=
531         cpu_physical_memory_sync_dirty_bitmap(bitmap, start, length);
532 }
533
534
535 /* Fix me: there are too many global variables used in migration process. */
536 static int64_t start_time;
537 static int64_t bytes_xfer_prev;
538 static int64_t num_dirty_pages_period;
539 static uint64_t xbzrle_cache_miss_prev;
540 static uint64_t iterations_prev;
541
542 static void migration_bitmap_sync_init(void)
543 {
544     start_time = 0;
545     bytes_xfer_prev = 0;
546     num_dirty_pages_period = 0;
547     xbzrle_cache_miss_prev = 0;
548     iterations_prev = 0;
549 }
550
551 /* Called with iothread lock held, to protect ram_list.dirty_memory[] */
552 static void migration_bitmap_sync(void)
553 {
554     RAMBlock *block;
555     uint64_t num_dirty_pages_init = migration_dirty_pages;
556     MigrationState *s = migrate_get_current();
557     int64_t end_time;
558     int64_t bytes_xfer_now;
559
560     bitmap_sync_count++;
561
562     if (!bytes_xfer_prev) {
563         bytes_xfer_prev = ram_bytes_transferred();
564     }
565
566     if (!start_time) {
567         start_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
568     }
569
570     trace_migration_bitmap_sync_start();
571     address_space_sync_dirty_bitmap(&address_space_memory);
572
573     qemu_mutex_lock(&migration_bitmap_mutex);
574     rcu_read_lock();
575     QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
576         migration_bitmap_sync_range(block->mr->ram_addr, block->used_length);
577     }
578     rcu_read_unlock();
579     qemu_mutex_unlock(&migration_bitmap_mutex);
580
581     trace_migration_bitmap_sync_end(migration_dirty_pages
582                                     - num_dirty_pages_init);
583     num_dirty_pages_period += migration_dirty_pages - num_dirty_pages_init;
584     end_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
585
586     /* more than 1 second = 1000 millisecons */
587     if (end_time > start_time + 1000) {
588         if (migrate_auto_converge()) {
589             /* The following detection logic can be refined later. For now:
590                Check to see if the dirtied bytes is 50% more than the approx.
591                amount of bytes that just got transferred since the last time we
592                were in this routine. If that happens >N times (for now N==4)
593                we turn on the throttle down logic */
594             bytes_xfer_now = ram_bytes_transferred();
595             if (s->dirty_pages_rate &&
596                (num_dirty_pages_period * TARGET_PAGE_SIZE >
597                    (bytes_xfer_now - bytes_xfer_prev)/2) &&
598                (dirty_rate_high_cnt++ > 4)) {
599                     trace_migration_throttle();
600                     mig_throttle_on = true;
601                     dirty_rate_high_cnt = 0;
602              }
603              bytes_xfer_prev = bytes_xfer_now;
604         } else {
605              mig_throttle_on = false;
606         }
607         if (migrate_use_xbzrle()) {
608             if (iterations_prev != acct_info.iterations) {
609                 acct_info.xbzrle_cache_miss_rate =
610                    (double)(acct_info.xbzrle_cache_miss -
611                             xbzrle_cache_miss_prev) /
612                    (acct_info.iterations - iterations_prev);
613             }
614             iterations_prev = acct_info.iterations;
615             xbzrle_cache_miss_prev = acct_info.xbzrle_cache_miss;
616         }
617         s->dirty_pages_rate = num_dirty_pages_period * 1000
618             / (end_time - start_time);
619         s->dirty_bytes_rate = s->dirty_pages_rate * TARGET_PAGE_SIZE;
620         start_time = end_time;
621         num_dirty_pages_period = 0;
622     }
623     s->dirty_sync_count = bitmap_sync_count;
624 }
625
626 /**
627  * save_zero_page: Send the zero page to the stream
628  *
629  * Returns: Number of pages written.
630  *
631  * @f: QEMUFile where to send the data
632  * @block: block that contains the page we want to send
633  * @offset: offset inside the block for the page
634  * @p: pointer to the page
635  * @bytes_transferred: increase it with the number of transferred bytes
636  */
637 static int save_zero_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset,
638                           uint8_t *p, uint64_t *bytes_transferred)
639 {
640     int pages = -1;
641
642     if (is_zero_range(p, TARGET_PAGE_SIZE)) {
643         acct_info.dup_pages++;
644         *bytes_transferred += save_page_header(f, block,
645                                                offset | RAM_SAVE_FLAG_COMPRESS);
646         qemu_put_byte(f, 0);
647         *bytes_transferred += 1;
648         pages = 1;
649     }
650
651     return pages;
652 }
653
654 /**
655  * ram_save_page: Send the given page to the stream
656  *
657  * Returns: Number of pages written.
658  *
659  * @f: QEMUFile where to send the data
660  * @block: block that contains the page we want to send
661  * @offset: offset inside the block for the page
662  * @last_stage: if we are at the completion stage
663  * @bytes_transferred: increase it with the number of transferred bytes
664  */
665 static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
666                          bool last_stage, uint64_t *bytes_transferred)
667 {
668     int pages = -1;
669     uint64_t bytes_xmit;
670     ram_addr_t current_addr;
671     MemoryRegion *mr = block->mr;
672     uint8_t *p;
673     int ret;
674     bool send_async = true;
675
676     p = memory_region_get_ram_ptr(mr) + offset;
677
678     /* In doubt sent page as normal */
679     bytes_xmit = 0;
680     ret = ram_control_save_page(f, block->offset,
681                            offset, TARGET_PAGE_SIZE, &bytes_xmit);
682     if (bytes_xmit) {
683         *bytes_transferred += bytes_xmit;
684         pages = 1;
685     }
686
687     XBZRLE_cache_lock();
688
689     current_addr = block->offset + offset;
690
691     if (block == last_sent_block) {
692         offset |= RAM_SAVE_FLAG_CONTINUE;
693     }
694     if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
695         if (ret != RAM_SAVE_CONTROL_DELAYED) {
696             if (bytes_xmit > 0) {
697                 acct_info.norm_pages++;
698             } else if (bytes_xmit == 0) {
699                 acct_info.dup_pages++;
700             }
701         }
702     } else {
703         pages = save_zero_page(f, block, offset, p, bytes_transferred);
704         if (pages > 0) {
705             /* Must let xbzrle know, otherwise a previous (now 0'd) cached
706              * page would be stale
707              */
708             xbzrle_cache_zero_page(current_addr);
709         } else if (!ram_bulk_stage && migrate_use_xbzrle()) {
710             pages = save_xbzrle_page(f, &p, current_addr, block,
711                                      offset, last_stage, bytes_transferred);
712             if (!last_stage) {
713                 /* Can't send this cached data async, since the cache page
714                  * might get updated before it gets to the wire
715                  */
716                 send_async = false;
717             }
718         }
719     }
720
721     /* XBZRLE overflow or normal page */
722     if (pages == -1) {
723         *bytes_transferred += save_page_header(f, block,
724                                                offset | RAM_SAVE_FLAG_PAGE);
725         if (send_async) {
726             qemu_put_buffer_async(f, p, TARGET_PAGE_SIZE);
727         } else {
728             qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
729         }
730         *bytes_transferred += TARGET_PAGE_SIZE;
731         pages = 1;
732         acct_info.norm_pages++;
733     }
734
735     XBZRLE_cache_unlock();
736
737     return pages;
738 }
739
740 static int do_compress_ram_page(CompressParam *param)
741 {
742     int bytes_sent, blen;
743     uint8_t *p;
744     RAMBlock *block = param->block;
745     ram_addr_t offset = param->offset;
746
747     p = memory_region_get_ram_ptr(block->mr) + (offset & TARGET_PAGE_MASK);
748
749     bytes_sent = save_page_header(param->file, block, offset |
750                                   RAM_SAVE_FLAG_COMPRESS_PAGE);
751     blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE,
752                                      migrate_compress_level());
753     bytes_sent += blen;
754
755     return bytes_sent;
756 }
757
758 static inline void start_compression(CompressParam *param)
759 {
760     param->done = false;
761     qemu_mutex_lock(&param->mutex);
762     param->start = true;
763     qemu_cond_signal(&param->cond);
764     qemu_mutex_unlock(&param->mutex);
765 }
766
767 static inline void start_decompression(DecompressParam *param)
768 {
769     qemu_mutex_lock(&param->mutex);
770     param->start = true;
771     qemu_cond_signal(&param->cond);
772     qemu_mutex_unlock(&param->mutex);
773 }
774
775 static uint64_t bytes_transferred;
776
777 static void flush_compressed_data(QEMUFile *f)
778 {
779     int idx, len, thread_count;
780
781     if (!migrate_use_compression()) {
782         return;
783     }
784     thread_count = migrate_compress_threads();
785     for (idx = 0; idx < thread_count; idx++) {
786         if (!comp_param[idx].done) {
787             qemu_mutex_lock(comp_done_lock);
788             while (!comp_param[idx].done && !quit_comp_thread) {
789                 qemu_cond_wait(comp_done_cond, comp_done_lock);
790             }
791             qemu_mutex_unlock(comp_done_lock);
792         }
793         if (!quit_comp_thread) {
794             len = qemu_put_qemu_file(f, comp_param[idx].file);
795             bytes_transferred += len;
796         }
797     }
798 }
799
800 static inline void set_compress_params(CompressParam *param, RAMBlock *block,
801                                        ram_addr_t offset)
802 {
803     param->block = block;
804     param->offset = offset;
805 }
806
807 static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
808                                            ram_addr_t offset,
809                                            uint64_t *bytes_transferred)
810 {
811     int idx, thread_count, bytes_xmit = -1, pages = -1;
812
813     thread_count = migrate_compress_threads();
814     qemu_mutex_lock(comp_done_lock);
815     while (true) {
816         for (idx = 0; idx < thread_count; idx++) {
817             if (comp_param[idx].done) {
818                 bytes_xmit = qemu_put_qemu_file(f, comp_param[idx].file);
819                 set_compress_params(&comp_param[idx], block, offset);
820                 start_compression(&comp_param[idx]);
821                 pages = 1;
822                 acct_info.norm_pages++;
823                 *bytes_transferred += bytes_xmit;
824                 break;
825             }
826         }
827         if (pages > 0) {
828             break;
829         } else {
830             qemu_cond_wait(comp_done_cond, comp_done_lock);
831         }
832     }
833     qemu_mutex_unlock(comp_done_lock);
834
835     return pages;
836 }
837
838 /**
839  * ram_save_compressed_page: compress the given page and send it to the stream
840  *
841  * Returns: Number of pages written.
842  *
843  * @f: QEMUFile where to send the data
844  * @block: block that contains the page we want to send
845  * @offset: offset inside the block for the page
846  * @last_stage: if we are at the completion stage
847  * @bytes_transferred: increase it with the number of transferred bytes
848  */
849 static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block,
850                                     ram_addr_t offset, bool last_stage,
851                                     uint64_t *bytes_transferred)
852 {
853     int pages = -1;
854     uint64_t bytes_xmit;
855     MemoryRegion *mr = block->mr;
856     uint8_t *p;
857     int ret;
858
859     p = memory_region_get_ram_ptr(mr) + offset;
860
861     bytes_xmit = 0;
862     ret = ram_control_save_page(f, block->offset,
863                                 offset, TARGET_PAGE_SIZE, &bytes_xmit);
864     if (bytes_xmit) {
865         *bytes_transferred += bytes_xmit;
866         pages = 1;
867     }
868     if (block == last_sent_block) {
869         offset |= RAM_SAVE_FLAG_CONTINUE;
870     }
871     if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
872         if (ret != RAM_SAVE_CONTROL_DELAYED) {
873             if (bytes_xmit > 0) {
874                 acct_info.norm_pages++;
875             } else if (bytes_xmit == 0) {
876                 acct_info.dup_pages++;
877             }
878         }
879     } else {
880         /* When starting the process of a new block, the first page of
881          * the block should be sent out before other pages in the same
882          * block, and all the pages in last block should have been sent
883          * out, keeping this order is important, because the 'cont' flag
884          * is used to avoid resending the block name.
885          */
886         if (block != last_sent_block) {
887             flush_compressed_data(f);
888             pages = save_zero_page(f, block, offset, p, bytes_transferred);
889             if (pages == -1) {
890                 set_compress_params(&comp_param[0], block, offset);
891                 /* Use the qemu thread to compress the data to make sure the
892                  * first page is sent out before other pages
893                  */
894                 bytes_xmit = do_compress_ram_page(&comp_param[0]);
895                 acct_info.norm_pages++;
896                 qemu_put_qemu_file(f, comp_param[0].file);
897                 *bytes_transferred += bytes_xmit;
898                 pages = 1;
899             }
900         } else {
901             pages = save_zero_page(f, block, offset, p, bytes_transferred);
902             if (pages == -1) {
903                 pages = compress_page_with_multi_thread(f, block, offset,
904                                                         bytes_transferred);
905             }
906         }
907     }
908
909     return pages;
910 }
911
912 /**
913  * ram_find_and_save_block: Finds a dirty page and sends it to f
914  *
915  * Called within an RCU critical section.
916  *
917  * Returns:  The number of pages written
918  *           0 means no dirty pages
919  *
920  * @f: QEMUFile where to send the data
921  * @last_stage: if we are at the completion stage
922  * @bytes_transferred: increase it with the number of transferred bytes
923  */
924
925 static int ram_find_and_save_block(QEMUFile *f, bool last_stage,
926                                    uint64_t *bytes_transferred)
927 {
928     RAMBlock *block = last_seen_block;
929     ram_addr_t offset = last_offset;
930     bool complete_round = false;
931     int pages = 0;
932     MemoryRegion *mr;
933
934     if (!block)
935         block = QLIST_FIRST_RCU(&ram_list.blocks);
936
937     while (true) {
938         mr = block->mr;
939         offset = migration_bitmap_find_and_reset_dirty(mr, offset);
940         if (complete_round && block == last_seen_block &&
941             offset >= last_offset) {
942             break;
943         }
944         if (offset >= block->used_length) {
945             offset = 0;
946             block = QLIST_NEXT_RCU(block, next);
947             if (!block) {
948                 block = QLIST_FIRST_RCU(&ram_list.blocks);
949                 complete_round = true;
950                 ram_bulk_stage = false;
951                 if (migrate_use_xbzrle()) {
952                     /* If xbzrle is on, stop using the data compression at this
953                      * point. In theory, xbzrle can do better than compression.
954                      */
955                     flush_compressed_data(f);
956                     compression_switch = false;
957                 }
958             }
959         } else {
960             if (compression_switch && migrate_use_compression()) {
961                 pages = ram_save_compressed_page(f, block, offset, last_stage,
962                                                  bytes_transferred);
963             } else {
964                 pages = ram_save_page(f, block, offset, last_stage,
965                                       bytes_transferred);
966             }
967
968             /* if page is unmodified, continue to the next */
969             if (pages > 0) {
970                 last_sent_block = block;
971                 break;
972             }
973         }
974     }
975
976     last_seen_block = block;
977     last_offset = offset;
978
979     return pages;
980 }
981
982 void acct_update_position(QEMUFile *f, size_t size, bool zero)
983 {
984     uint64_t pages = size / TARGET_PAGE_SIZE;
985     if (zero) {
986         acct_info.dup_pages += pages;
987     } else {
988         acct_info.norm_pages += pages;
989         bytes_transferred += size;
990         qemu_update_position(f, size);
991     }
992 }
993
994 static ram_addr_t ram_save_remaining(void)
995 {
996     return migration_dirty_pages;
997 }
998
999 uint64_t ram_bytes_remaining(void)
1000 {
1001     return ram_save_remaining() * TARGET_PAGE_SIZE;
1002 }
1003
1004 uint64_t ram_bytes_transferred(void)
1005 {
1006     return bytes_transferred;
1007 }
1008
1009 uint64_t ram_bytes_total(void)
1010 {
1011     RAMBlock *block;
1012     uint64_t total = 0;
1013
1014     rcu_read_lock();
1015     QLIST_FOREACH_RCU(block, &ram_list.blocks, next)
1016         total += block->used_length;
1017     rcu_read_unlock();
1018     return total;
1019 }
1020
1021 void free_xbzrle_decoded_buf(void)
1022 {
1023     g_free(xbzrle_decoded_buf);
1024     xbzrle_decoded_buf = NULL;
1025 }
1026
1027 static void migration_end(void)
1028 {
1029     /* caller have hold iothread lock or is in a bh, so there is
1030      * no writing race against this migration_bitmap
1031      */
1032     unsigned long *bitmap = migration_bitmap;
1033     atomic_rcu_set(&migration_bitmap, NULL);
1034     if (bitmap) {
1035         memory_global_dirty_log_stop();
1036         synchronize_rcu();
1037         g_free(bitmap);
1038     }
1039
1040     XBZRLE_cache_lock();
1041     if (XBZRLE.cache) {
1042         cache_fini(XBZRLE.cache);
1043         g_free(XBZRLE.encoded_buf);
1044         g_free(XBZRLE.current_buf);
1045         XBZRLE.cache = NULL;
1046         XBZRLE.encoded_buf = NULL;
1047         XBZRLE.current_buf = NULL;
1048     }
1049     XBZRLE_cache_unlock();
1050 }
1051
1052 static void ram_migration_cancel(void *opaque)
1053 {
1054     migration_end();
1055 }
1056
1057 static void reset_ram_globals(void)
1058 {
1059     last_seen_block = NULL;
1060     last_sent_block = NULL;
1061     last_offset = 0;
1062     last_version = ram_list.version;
1063     ram_bulk_stage = true;
1064 }
1065
1066 #define MAX_WAIT 50 /* ms, half buffered_file limit */
1067
1068 void migration_bitmap_extend(ram_addr_t old, ram_addr_t new)
1069 {
1070     /* called in qemu main thread, so there is
1071      * no writing race against this migration_bitmap
1072      */
1073     if (migration_bitmap) {
1074         unsigned long *old_bitmap = migration_bitmap, *bitmap;
1075         bitmap = bitmap_new(new);
1076
1077         /* prevent migration_bitmap content from being set bit
1078          * by migration_bitmap_sync_range() at the same time.
1079          * it is safe to migration if migration_bitmap is cleared bit
1080          * at the same time.
1081          */
1082         qemu_mutex_lock(&migration_bitmap_mutex);
1083         bitmap_copy(bitmap, old_bitmap, old);
1084         bitmap_set(bitmap, old, new - old);
1085         atomic_rcu_set(&migration_bitmap, bitmap);
1086         qemu_mutex_unlock(&migration_bitmap_mutex);
1087         migration_dirty_pages += new - old;
1088         synchronize_rcu();
1089         g_free(old_bitmap);
1090     }
1091 }
1092
1093 /* Each of ram_save_setup, ram_save_iterate and ram_save_complete has
1094  * long-running RCU critical section.  When rcu-reclaims in the code
1095  * start to become numerous it will be necessary to reduce the
1096  * granularity of these critical sections.
1097  */
1098
1099 static int ram_save_setup(QEMUFile *f, void *opaque)
1100 {
1101     RAMBlock *block;
1102     int64_t ram_bitmap_pages; /* Size of bitmap in pages, including gaps */
1103
1104     mig_throttle_on = false;
1105     dirty_rate_high_cnt = 0;
1106     bitmap_sync_count = 0;
1107     migration_bitmap_sync_init();
1108     qemu_mutex_init(&migration_bitmap_mutex);
1109
1110     if (migrate_use_xbzrle()) {
1111         XBZRLE_cache_lock();
1112         XBZRLE.cache = cache_init(migrate_xbzrle_cache_size() /
1113                                   TARGET_PAGE_SIZE,
1114                                   TARGET_PAGE_SIZE);
1115         if (!XBZRLE.cache) {
1116             XBZRLE_cache_unlock();
1117             error_report("Error creating cache");
1118             return -1;
1119         }
1120         XBZRLE_cache_unlock();
1121
1122         /* We prefer not to abort if there is no memory */
1123         XBZRLE.encoded_buf = g_try_malloc0(TARGET_PAGE_SIZE);
1124         if (!XBZRLE.encoded_buf) {
1125             error_report("Error allocating encoded_buf");
1126             return -1;
1127         }
1128
1129         XBZRLE.current_buf = g_try_malloc(TARGET_PAGE_SIZE);
1130         if (!XBZRLE.current_buf) {
1131             error_report("Error allocating current_buf");
1132             g_free(XBZRLE.encoded_buf);
1133             XBZRLE.encoded_buf = NULL;
1134             return -1;
1135         }
1136
1137         acct_clear();
1138     }
1139
1140     /* iothread lock needed for ram_list.dirty_memory[] */
1141     qemu_mutex_lock_iothread();
1142     qemu_mutex_lock_ramlist();
1143     rcu_read_lock();
1144     bytes_transferred = 0;
1145     reset_ram_globals();
1146
1147     ram_bitmap_pages = last_ram_offset() >> TARGET_PAGE_BITS;
1148     migration_bitmap = bitmap_new(ram_bitmap_pages);
1149     bitmap_set(migration_bitmap, 0, ram_bitmap_pages);
1150
1151     /*
1152      * Count the total number of pages used by ram blocks not including any
1153      * gaps due to alignment or unplugs.
1154      */
1155     migration_dirty_pages = ram_bytes_total() >> TARGET_PAGE_BITS;
1156
1157     memory_global_dirty_log_start();
1158     migration_bitmap_sync();
1159     qemu_mutex_unlock_ramlist();
1160     qemu_mutex_unlock_iothread();
1161
1162     qemu_put_be64(f, ram_bytes_total() | RAM_SAVE_FLAG_MEM_SIZE);
1163
1164     QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
1165         qemu_put_byte(f, strlen(block->idstr));
1166         qemu_put_buffer(f, (uint8_t *)block->idstr, strlen(block->idstr));
1167         qemu_put_be64(f, block->used_length);
1168     }
1169
1170     rcu_read_unlock();
1171
1172     ram_control_before_iterate(f, RAM_CONTROL_SETUP);
1173     ram_control_after_iterate(f, RAM_CONTROL_SETUP);
1174
1175     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
1176
1177     return 0;
1178 }
1179
1180 static int ram_save_iterate(QEMUFile *f, void *opaque)
1181 {
1182     int ret;
1183     int i;
1184     int64_t t0;
1185     int pages_sent = 0;
1186
1187     rcu_read_lock();
1188     if (ram_list.version != last_version) {
1189         reset_ram_globals();
1190     }
1191
1192     /* Read version before ram_list.blocks */
1193     smp_rmb();
1194
1195     ram_control_before_iterate(f, RAM_CONTROL_ROUND);
1196
1197     t0 = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
1198     i = 0;
1199     while ((ret = qemu_file_rate_limit(f)) == 0) {
1200         int pages;
1201
1202         pages = ram_find_and_save_block(f, false, &bytes_transferred);
1203         /* no more pages to sent */
1204         if (pages == 0) {
1205             break;
1206         }
1207         pages_sent += pages;
1208         acct_info.iterations++;
1209         check_guest_throttling();
1210         /* we want to check in the 1st loop, just in case it was the 1st time
1211            and we had to sync the dirty bitmap.
1212            qemu_get_clock_ns() is a bit expensive, so we only check each some
1213            iterations
1214         */
1215         if ((i & 63) == 0) {
1216             uint64_t t1 = (qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - t0) / 1000000;
1217             if (t1 > MAX_WAIT) {
1218                 DPRINTF("big wait: %" PRIu64 " milliseconds, %d iterations\n",
1219                         t1, i);
1220                 break;
1221             }
1222         }
1223         i++;
1224     }
1225     flush_compressed_data(f);
1226     rcu_read_unlock();
1227
1228     /*
1229      * Must occur before EOS (or any QEMUFile operation)
1230      * because of RDMA protocol.
1231      */
1232     ram_control_after_iterate(f, RAM_CONTROL_ROUND);
1233
1234     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
1235     bytes_transferred += 8;
1236
1237     ret = qemu_file_get_error(f);
1238     if (ret < 0) {
1239         return ret;
1240     }
1241
1242     return pages_sent;
1243 }
1244
1245 /* Called with iothread lock */
1246 static int ram_save_complete(QEMUFile *f, void *opaque)
1247 {
1248     rcu_read_lock();
1249
1250     migration_bitmap_sync();
1251
1252     ram_control_before_iterate(f, RAM_CONTROL_FINISH);
1253
1254     /* try transferring iterative blocks of memory */
1255
1256     /* flush all remaining blocks regardless of rate limiting */
1257     while (true) {
1258         int pages;
1259
1260         pages = ram_find_and_save_block(f, true, &bytes_transferred);
1261         /* no more blocks to sent */
1262         if (pages == 0) {
1263             break;
1264         }
1265     }
1266
1267     flush_compressed_data(f);
1268     ram_control_after_iterate(f, RAM_CONTROL_FINISH);
1269
1270     rcu_read_unlock();
1271
1272     migration_end();
1273     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
1274
1275     return 0;
1276 }
1277
1278 static uint64_t ram_save_pending(QEMUFile *f, void *opaque, uint64_t max_size)
1279 {
1280     uint64_t remaining_size;
1281
1282     remaining_size = ram_save_remaining() * TARGET_PAGE_SIZE;
1283
1284     if (remaining_size < max_size) {
1285         qemu_mutex_lock_iothread();
1286         rcu_read_lock();
1287         migration_bitmap_sync();
1288         rcu_read_unlock();
1289         qemu_mutex_unlock_iothread();
1290         remaining_size = ram_save_remaining() * TARGET_PAGE_SIZE;
1291     }
1292     return remaining_size;
1293 }
1294
1295 static int load_xbzrle(QEMUFile *f, ram_addr_t addr, void *host)
1296 {
1297     unsigned int xh_len;
1298     int xh_flags;
1299
1300     if (!xbzrle_decoded_buf) {
1301         xbzrle_decoded_buf = g_malloc(TARGET_PAGE_SIZE);
1302     }
1303
1304     /* extract RLE header */
1305     xh_flags = qemu_get_byte(f);
1306     xh_len = qemu_get_be16(f);
1307
1308     if (xh_flags != ENCODING_FLAG_XBZRLE) {
1309         error_report("Failed to load XBZRLE page - wrong compression!");
1310         return -1;
1311     }
1312
1313     if (xh_len > TARGET_PAGE_SIZE) {
1314         error_report("Failed to load XBZRLE page - len overflow!");
1315         return -1;
1316     }
1317     /* load data and decode */
1318     qemu_get_buffer(f, xbzrle_decoded_buf, xh_len);
1319
1320     /* decode RLE */
1321     if (xbzrle_decode_buffer(xbzrle_decoded_buf, xh_len, host,
1322                              TARGET_PAGE_SIZE) == -1) {
1323         error_report("Failed to load XBZRLE page - decode error!");
1324         return -1;
1325     }
1326
1327     return 0;
1328 }
1329
1330 /* Must be called from within a rcu critical section.
1331  * Returns a pointer from within the RCU-protected ram_list.
1332  */
1333 static inline void *host_from_stream_offset(QEMUFile *f,
1334                                             ram_addr_t offset,
1335                                             int flags)
1336 {
1337     static RAMBlock *block = NULL;
1338     char id[256];
1339     uint8_t len;
1340
1341     if (flags & RAM_SAVE_FLAG_CONTINUE) {
1342         if (!block || block->max_length <= offset) {
1343             error_report("Ack, bad migration stream!");
1344             return NULL;
1345         }
1346
1347         return memory_region_get_ram_ptr(block->mr) + offset;
1348     }
1349
1350     len = qemu_get_byte(f);
1351     qemu_get_buffer(f, (uint8_t *)id, len);
1352     id[len] = 0;
1353
1354     QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
1355         if (!strncmp(id, block->idstr, sizeof(id)) &&
1356             block->max_length > offset) {
1357             return memory_region_get_ram_ptr(block->mr) + offset;
1358         }
1359     }
1360
1361     error_report("Can't find block %s!", id);
1362     return NULL;
1363 }
1364
1365 /*
1366  * If a page (or a whole RDMA chunk) has been
1367  * determined to be zero, then zap it.
1368  */
1369 void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
1370 {
1371     if (ch != 0 || !is_zero_range(host, size)) {
1372         memset(host, ch, size);
1373     }
1374 }
1375
1376 static void *do_data_decompress(void *opaque)
1377 {
1378     DecompressParam *param = opaque;
1379     unsigned long pagesize;
1380
1381     while (!quit_decomp_thread) {
1382         qemu_mutex_lock(&param->mutex);
1383         while (!param->start && !quit_decomp_thread) {
1384             qemu_cond_wait(&param->cond, &param->mutex);
1385             pagesize = TARGET_PAGE_SIZE;
1386             if (!quit_decomp_thread) {
1387                 /* uncompress() will return failed in some case, especially
1388                  * when the page is dirted when doing the compression, it's
1389                  * not a problem because the dirty page will be retransferred
1390                  * and uncompress() won't break the data in other pages.
1391                  */
1392                 uncompress((Bytef *)param->des, &pagesize,
1393                            (const Bytef *)param->compbuf, param->len);
1394             }
1395             param->start = false;
1396         }
1397         qemu_mutex_unlock(&param->mutex);
1398     }
1399
1400     return NULL;
1401 }
1402
1403 void migrate_decompress_threads_create(void)
1404 {
1405     int i, thread_count;
1406
1407     thread_count = migrate_decompress_threads();
1408     decompress_threads = g_new0(QemuThread, thread_count);
1409     decomp_param = g_new0(DecompressParam, thread_count);
1410     compressed_data_buf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
1411     quit_decomp_thread = false;
1412     for (i = 0; i < thread_count; i++) {
1413         qemu_mutex_init(&decomp_param[i].mutex);
1414         qemu_cond_init(&decomp_param[i].cond);
1415         decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
1416         qemu_thread_create(decompress_threads + i, "decompress",
1417                            do_data_decompress, decomp_param + i,
1418                            QEMU_THREAD_JOINABLE);
1419     }
1420 }
1421
1422 void migrate_decompress_threads_join(void)
1423 {
1424     int i, thread_count;
1425
1426     quit_decomp_thread = true;
1427     thread_count = migrate_decompress_threads();
1428     for (i = 0; i < thread_count; i++) {
1429         qemu_mutex_lock(&decomp_param[i].mutex);
1430         qemu_cond_signal(&decomp_param[i].cond);
1431         qemu_mutex_unlock(&decomp_param[i].mutex);
1432     }
1433     for (i = 0; i < thread_count; i++) {
1434         qemu_thread_join(decompress_threads + i);
1435         qemu_mutex_destroy(&decomp_param[i].mutex);
1436         qemu_cond_destroy(&decomp_param[i].cond);
1437         g_free(decomp_param[i].compbuf);
1438     }
1439     g_free(decompress_threads);
1440     g_free(decomp_param);
1441     g_free(compressed_data_buf);
1442     decompress_threads = NULL;
1443     decomp_param = NULL;
1444     compressed_data_buf = NULL;
1445 }
1446
1447 static void decompress_data_with_multi_threads(uint8_t *compbuf,
1448                                                void *host, int len)
1449 {
1450     int idx, thread_count;
1451
1452     thread_count = migrate_decompress_threads();
1453     while (true) {
1454         for (idx = 0; idx < thread_count; idx++) {
1455             if (!decomp_param[idx].start) {
1456                 memcpy(decomp_param[idx].compbuf, compbuf, len);
1457                 decomp_param[idx].des = host;
1458                 decomp_param[idx].len = len;
1459                 start_decompression(&decomp_param[idx]);
1460                 break;
1461             }
1462         }
1463         if (idx < thread_count) {
1464             break;
1465         }
1466     }
1467 }
1468
1469 static int ram_load(QEMUFile *f, void *opaque, int version_id)
1470 {
1471     int flags = 0, ret = 0;
1472     static uint64_t seq_iter;
1473     int len = 0;
1474
1475     seq_iter++;
1476
1477     if (version_id != 4) {
1478         ret = -EINVAL;
1479     }
1480
1481     /* This RCU critical section can be very long running.
1482      * When RCU reclaims in the code start to become numerous,
1483      * it will be necessary to reduce the granularity of this
1484      * critical section.
1485      */
1486     rcu_read_lock();
1487     while (!ret && !(flags & RAM_SAVE_FLAG_EOS)) {
1488         ram_addr_t addr, total_ram_bytes;
1489         void *host;
1490         uint8_t ch;
1491
1492         addr = qemu_get_be64(f);
1493         flags = addr & ~TARGET_PAGE_MASK;
1494         addr &= TARGET_PAGE_MASK;
1495
1496         switch (flags & ~RAM_SAVE_FLAG_CONTINUE) {
1497         case RAM_SAVE_FLAG_MEM_SIZE:
1498             /* Synchronize RAM block list */
1499             total_ram_bytes = addr;
1500             while (!ret && total_ram_bytes) {
1501                 RAMBlock *block;
1502                 char id[256];
1503                 ram_addr_t length;
1504
1505                 len = qemu_get_byte(f);
1506                 qemu_get_buffer(f, (uint8_t *)id, len);
1507                 id[len] = 0;
1508                 length = qemu_get_be64(f);
1509
1510                 QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
1511                     if (!strncmp(id, block->idstr, sizeof(id))) {
1512                         if (length != block->used_length) {
1513                             Error *local_err = NULL;
1514
1515                             ret = qemu_ram_resize(block->offset, length, &local_err);
1516                             if (local_err) {
1517                                 error_report_err(local_err);
1518                             }
1519                         }
1520                         ram_control_load_hook(f, RAM_CONTROL_BLOCK_REG,
1521                                               block->idstr);
1522                         break;
1523                     }
1524                 }
1525
1526                 if (!block) {
1527                     error_report("Unknown ramblock \"%s\", cannot "
1528                                  "accept migration", id);
1529                     ret = -EINVAL;
1530                 }
1531
1532                 total_ram_bytes -= length;
1533             }
1534             break;
1535         case RAM_SAVE_FLAG_COMPRESS:
1536             host = host_from_stream_offset(f, addr, flags);
1537             if (!host) {
1538                 error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
1539                 ret = -EINVAL;
1540                 break;
1541             }
1542             ch = qemu_get_byte(f);
1543             ram_handle_compressed(host, ch, TARGET_PAGE_SIZE);
1544             break;
1545         case RAM_SAVE_FLAG_PAGE:
1546             host = host_from_stream_offset(f, addr, flags);
1547             if (!host) {
1548                 error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
1549                 ret = -EINVAL;
1550                 break;
1551             }
1552             qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
1553             break;
1554         case RAM_SAVE_FLAG_COMPRESS_PAGE:
1555             host = host_from_stream_offset(f, addr, flags);
1556             if (!host) {
1557                 error_report("Invalid RAM offset " RAM_ADDR_FMT, addr);
1558                 ret = -EINVAL;
1559                 break;
1560             }
1561
1562             len = qemu_get_be32(f);
1563             if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) {
1564                 error_report("Invalid compressed data length: %d", len);
1565                 ret = -EINVAL;
1566                 break;
1567             }
1568             qemu_get_buffer(f, compressed_data_buf, len);
1569             decompress_data_with_multi_threads(compressed_data_buf, host, len);
1570             break;
1571         case RAM_SAVE_FLAG_XBZRLE:
1572             host = host_from_stream_offset(f, addr, flags);
1573             if (!host) {
1574                 error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
1575                 ret = -EINVAL;
1576                 break;
1577             }
1578             if (load_xbzrle(f, addr, host) < 0) {
1579                 error_report("Failed to decompress XBZRLE page at "
1580                              RAM_ADDR_FMT, addr);
1581                 ret = -EINVAL;
1582                 break;
1583             }
1584             break;
1585         case RAM_SAVE_FLAG_EOS:
1586             /* normal exit */
1587             break;
1588         default:
1589             if (flags & RAM_SAVE_FLAG_HOOK) {
1590                 ram_control_load_hook(f, RAM_CONTROL_HOOK, NULL);
1591             } else {
1592                 error_report("Unknown combination of migration flags: %#x",
1593                              flags);
1594                 ret = -EINVAL;
1595             }
1596         }
1597         if (!ret) {
1598             ret = qemu_file_get_error(f);
1599         }
1600     }
1601
1602     rcu_read_unlock();
1603     DPRINTF("Completed load of VM with exit code %d seq iteration "
1604             "%" PRIu64 "\n", ret, seq_iter);
1605     return ret;
1606 }
1607
1608 static SaveVMHandlers savevm_ram_handlers = {
1609     .save_live_setup = ram_save_setup,
1610     .save_live_iterate = ram_save_iterate,
1611     .save_live_complete = ram_save_complete,
1612     .save_live_pending = ram_save_pending,
1613     .load_state = ram_load,
1614     .cancel = ram_migration_cancel,
1615 };
1616
1617 void ram_mig_init(void)
1618 {
1619     qemu_mutex_init(&XBZRLE.lock);
1620     register_savevm_live(NULL, "ram", 0, 4, &savevm_ram_handlers, NULL);
1621 }
1622 /* Stub function that's gets run on the vcpu when its brought out of the
1623    VM to run inside qemu via async_run_on_cpu()*/
1624
1625 static void mig_sleep_cpu(void *opq)
1626 {
1627     qemu_mutex_unlock_iothread();
1628     g_usleep(30*1000);
1629     qemu_mutex_lock_iothread();
1630 }
1631
1632 /* To reduce the dirty rate explicitly disallow the VCPUs from spending
1633    much time in the VM. The migration thread will try to catchup.
1634    Workload will experience a performance drop.
1635 */
1636 static void mig_throttle_guest_down(void)
1637 {
1638     CPUState *cpu;
1639
1640     qemu_mutex_lock_iothread();
1641     CPU_FOREACH(cpu) {
1642         async_run_on_cpu(cpu, mig_sleep_cpu, NULL);
1643     }
1644     qemu_mutex_unlock_iothread();
1645 }
1646
1647 static void check_guest_throttling(void)
1648 {
1649     static int64_t t0;
1650     int64_t        t1;
1651
1652     if (!mig_throttle_on) {
1653         return;
1654     }
1655
1656     if (!t0)  {
1657         t0 = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
1658         return;
1659     }
1660
1661     t1 = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
1662
1663     /* If it has been more than 40 ms since the last time the guest
1664      * was throttled then do it again.
1665      */
1666     if (40 < (t1-t0)/1000000) {
1667         mig_throttle_guest_down();
1668         t0 = t1;
1669     }
1670 }