migration: do cleanup operation after completion
[kvmfornfv.git] / qemu / migration / ram.c
1 /*
2  * QEMU System Emulator
3  *
4  * Copyright (c) 2003-2008 Fabrice Bellard
5  * Copyright (c) 2011-2015 Red Hat Inc
6  *
7  * Authors:
8  *  Juan Quintela <quintela@redhat.com>
9  *
10  * Permission is hereby granted, free of charge, to any person obtaining a copy
11  * of this software and associated documentation files (the "Software"), to deal
12  * in the Software without restriction, including without limitation the rights
13  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14  * copies of the Software, and to permit persons to whom the Software is
15  * furnished to do so, subject to the following conditions:
16  *
17  * The above copyright notice and this permission notice shall be included in
18  * all copies or substantial portions of the Software.
19  *
20  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
23  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26  * THE SOFTWARE.
27  */
28 #include <stdint.h>
29 #include <zlib.h>
30 #include "qemu/bitops.h"
31 #include "qemu/bitmap.h"
32 #include "qemu/timer.h"
33 #include "qemu/main-loop.h"
34 #include "migration/migration.h"
35 #include "exec/address-spaces.h"
36 #include "migration/page_cache.h"
37 #include "qemu/error-report.h"
38 #include "trace.h"
39 #include "exec/ram_addr.h"
40 #include "qemu/rcu_queue.h"
41
42 #ifdef DEBUG_MIGRATION_RAM
43 #define DPRINTF(fmt, ...) \
44     do { fprintf(stdout, "migration_ram: " fmt, ## __VA_ARGS__); } while (0)
45 #else
46 #define DPRINTF(fmt, ...) \
47     do { } while (0)
48 #endif
49
50 static bool mig_throttle_on;
51 static int dirty_rate_high_cnt;
52 static void check_guest_throttling(void);
53
54 static uint64_t bitmap_sync_count;
55
56 /***********************************************************/
57 /* ram save/restore */
58
59 #define RAM_SAVE_FLAG_FULL     0x01 /* Obsolete, not used anymore */
60 #define RAM_SAVE_FLAG_COMPRESS 0x02
61 #define RAM_SAVE_FLAG_MEM_SIZE 0x04
62 #define RAM_SAVE_FLAG_PAGE     0x08
63 #define RAM_SAVE_FLAG_EOS      0x10
64 #define RAM_SAVE_FLAG_CONTINUE 0x20
65 #define RAM_SAVE_FLAG_XBZRLE   0x40
66 /* 0x80 is reserved in migration.h start with 0x100 next */
67 #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
68
69 static const uint8_t ZERO_TARGET_PAGE[TARGET_PAGE_SIZE];
70
71 static inline bool is_zero_range(uint8_t *p, uint64_t size)
72 {
73     return buffer_find_nonzero_offset(p, size) == size;
74 }
75
76 /* struct contains XBZRLE cache and a static page
77    used by the compression */
78 static struct {
79     /* buffer used for XBZRLE encoding */
80     uint8_t *encoded_buf;
81     /* buffer for storing page content */
82     uint8_t *current_buf;
83     /* Cache for XBZRLE, Protected by lock. */
84     PageCache *cache;
85     QemuMutex lock;
86 } XBZRLE;
87
88 /* buffer used for XBZRLE decoding */
89 static uint8_t *xbzrle_decoded_buf;
90
91 static void XBZRLE_cache_lock(void)
92 {
93     if (migrate_use_xbzrle())
94         qemu_mutex_lock(&XBZRLE.lock);
95 }
96
97 static void XBZRLE_cache_unlock(void)
98 {
99     if (migrate_use_xbzrle())
100         qemu_mutex_unlock(&XBZRLE.lock);
101 }
102
103 /*
104  * called from qmp_migrate_set_cache_size in main thread, possibly while
105  * a migration is in progress.
106  * A running migration maybe using the cache and might finish during this
107  * call, hence changes to the cache are protected by XBZRLE.lock().
108  */
109 int64_t xbzrle_cache_resize(int64_t new_size)
110 {
111     PageCache *new_cache;
112     int64_t ret;
113
114     if (new_size < TARGET_PAGE_SIZE) {
115         return -1;
116     }
117
118     XBZRLE_cache_lock();
119
120     if (XBZRLE.cache != NULL) {
121         if (pow2floor(new_size) == migrate_xbzrle_cache_size()) {
122             goto out_new_size;
123         }
124         new_cache = cache_init(new_size / TARGET_PAGE_SIZE,
125                                         TARGET_PAGE_SIZE);
126         if (!new_cache) {
127             error_report("Error creating cache");
128             ret = -1;
129             goto out;
130         }
131
132         cache_fini(XBZRLE.cache);
133         XBZRLE.cache = new_cache;
134     }
135
136 out_new_size:
137     ret = pow2floor(new_size);
138 out:
139     XBZRLE_cache_unlock();
140     return ret;
141 }
142
143 /* accounting for migration statistics */
144 typedef struct AccountingInfo {
145     uint64_t dup_pages;
146     uint64_t skipped_pages;
147     uint64_t norm_pages;
148     uint64_t iterations;
149     uint64_t xbzrle_bytes;
150     uint64_t xbzrle_pages;
151     uint64_t xbzrle_cache_miss;
152     double xbzrle_cache_miss_rate;
153     uint64_t xbzrle_overflows;
154 } AccountingInfo;
155
156 static AccountingInfo acct_info;
157
158 static void acct_clear(void)
159 {
160     memset(&acct_info, 0, sizeof(acct_info));
161 }
162
163 uint64_t dup_mig_bytes_transferred(void)
164 {
165     return acct_info.dup_pages * TARGET_PAGE_SIZE;
166 }
167
168 uint64_t dup_mig_pages_transferred(void)
169 {
170     return acct_info.dup_pages;
171 }
172
173 uint64_t skipped_mig_bytes_transferred(void)
174 {
175     return acct_info.skipped_pages * TARGET_PAGE_SIZE;
176 }
177
178 uint64_t skipped_mig_pages_transferred(void)
179 {
180     return acct_info.skipped_pages;
181 }
182
183 uint64_t norm_mig_bytes_transferred(void)
184 {
185     return acct_info.norm_pages * TARGET_PAGE_SIZE;
186 }
187
188 uint64_t norm_mig_pages_transferred(void)
189 {
190     return acct_info.norm_pages;
191 }
192
193 uint64_t xbzrle_mig_bytes_transferred(void)
194 {
195     return acct_info.xbzrle_bytes;
196 }
197
198 uint64_t xbzrle_mig_pages_transferred(void)
199 {
200     return acct_info.xbzrle_pages;
201 }
202
203 uint64_t xbzrle_mig_pages_cache_miss(void)
204 {
205     return acct_info.xbzrle_cache_miss;
206 }
207
208 double xbzrle_mig_cache_miss_rate(void)
209 {
210     return acct_info.xbzrle_cache_miss_rate;
211 }
212
213 uint64_t xbzrle_mig_pages_overflow(void)
214 {
215     return acct_info.xbzrle_overflows;
216 }
217
218 /* This is the last block that we have visited serching for dirty pages
219  */
220 static RAMBlock *last_seen_block;
221 /* This is the last block from where we have sent data */
222 static RAMBlock *last_sent_block;
223 static ram_addr_t last_offset;
224 static unsigned long *migration_bitmap;
225 static QemuMutex migration_bitmap_mutex;
226 static uint64_t migration_dirty_pages;
227 static uint32_t last_version;
228 static bool ram_bulk_stage;
229
230 struct CompressParam {
231     bool start;
232     bool done;
233     QEMUFile *file;
234     QemuMutex mutex;
235     QemuCond cond;
236     RAMBlock *block;
237     ram_addr_t offset;
238 };
239 typedef struct CompressParam CompressParam;
240
241 struct DecompressParam {
242     bool start;
243     QemuMutex mutex;
244     QemuCond cond;
245     void *des;
246     uint8 *compbuf;
247     int len;
248 };
249 typedef struct DecompressParam DecompressParam;
250
251 static CompressParam *comp_param;
252 static QemuThread *compress_threads;
253 /* comp_done_cond is used to wake up the migration thread when
254  * one of the compression threads has finished the compression.
255  * comp_done_lock is used to co-work with comp_done_cond.
256  */
257 static QemuMutex *comp_done_lock;
258 static QemuCond *comp_done_cond;
259 /* The empty QEMUFileOps will be used by file in CompressParam */
260 static const QEMUFileOps empty_ops = { };
261
262 static bool compression_switch;
263 static bool quit_comp_thread;
264 static bool quit_decomp_thread;
265 static DecompressParam *decomp_param;
266 static QemuThread *decompress_threads;
267 static uint8_t *compressed_data_buf;
268
269 static int do_compress_ram_page(CompressParam *param);
270
271 static void *do_data_compress(void *opaque)
272 {
273     CompressParam *param = opaque;
274
275     while (!quit_comp_thread) {
276         qemu_mutex_lock(&param->mutex);
277         /* Re-check the quit_comp_thread in case of
278          * terminate_compression_threads is called just before
279          * qemu_mutex_lock(&param->mutex) and after
280          * while(!quit_comp_thread), re-check it here can make
281          * sure the compression thread terminate as expected.
282          */
283         while (!param->start && !quit_comp_thread) {
284             qemu_cond_wait(&param->cond, &param->mutex);
285         }
286         if (!quit_comp_thread) {
287             do_compress_ram_page(param);
288         }
289         param->start = false;
290         qemu_mutex_unlock(&param->mutex);
291
292         qemu_mutex_lock(comp_done_lock);
293         param->done = true;
294         qemu_cond_signal(comp_done_cond);
295         qemu_mutex_unlock(comp_done_lock);
296     }
297
298     return NULL;
299 }
300
301 static inline void terminate_compression_threads(void)
302 {
303     int idx, thread_count;
304
305     thread_count = migrate_compress_threads();
306     quit_comp_thread = true;
307     for (idx = 0; idx < thread_count; idx++) {
308         qemu_mutex_lock(&comp_param[idx].mutex);
309         qemu_cond_signal(&comp_param[idx].cond);
310         qemu_mutex_unlock(&comp_param[idx].mutex);
311     }
312 }
313
314 void migrate_compress_threads_join(void)
315 {
316     int i, thread_count;
317
318     if (!migrate_use_compression()) {
319         return;
320     }
321     terminate_compression_threads();
322     thread_count = migrate_compress_threads();
323     for (i = 0; i < thread_count; i++) {
324         qemu_thread_join(compress_threads + i);
325         qemu_fclose(comp_param[i].file);
326         qemu_mutex_destroy(&comp_param[i].mutex);
327         qemu_cond_destroy(&comp_param[i].cond);
328     }
329     qemu_mutex_destroy(comp_done_lock);
330     qemu_cond_destroy(comp_done_cond);
331     g_free(compress_threads);
332     g_free(comp_param);
333     g_free(comp_done_cond);
334     g_free(comp_done_lock);
335     compress_threads = NULL;
336     comp_param = NULL;
337     comp_done_cond = NULL;
338     comp_done_lock = NULL;
339 }
340
341 void migrate_compress_threads_create(void)
342 {
343     int i, thread_count;
344
345     if (!migrate_use_compression()) {
346         return;
347     }
348     quit_comp_thread = false;
349     compression_switch = true;
350     thread_count = migrate_compress_threads();
351     compress_threads = g_new0(QemuThread, thread_count);
352     comp_param = g_new0(CompressParam, thread_count);
353     comp_done_cond = g_new0(QemuCond, 1);
354     comp_done_lock = g_new0(QemuMutex, 1);
355     qemu_cond_init(comp_done_cond);
356     qemu_mutex_init(comp_done_lock);
357     for (i = 0; i < thread_count; i++) {
358         /* com_param[i].file is just used as a dummy buffer to save data, set
359          * it's ops to empty.
360          */
361         comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
362         comp_param[i].done = true;
363         qemu_mutex_init(&comp_param[i].mutex);
364         qemu_cond_init(&comp_param[i].cond);
365         qemu_thread_create(compress_threads + i, "compress",
366                            do_data_compress, comp_param + i,
367                            QEMU_THREAD_JOINABLE);
368     }
369 }
370
371 /**
372  * save_page_header: Write page header to wire
373  *
374  * If this is the 1st block, it also writes the block identification
375  *
376  * Returns: Number of bytes written
377  *
378  * @f: QEMUFile where to send the data
379  * @block: block that contains the page we want to send
380  * @offset: offset inside the block for the page
381  *          in the lower bits, it contains flags
382  */
383 static size_t save_page_header(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
384 {
385     size_t size, len;
386
387     qemu_put_be64(f, offset);
388     size = 8;
389
390     if (!(offset & RAM_SAVE_FLAG_CONTINUE)) {
391         len = strlen(block->idstr);
392         qemu_put_byte(f, len);
393         qemu_put_buffer(f, (uint8_t *)block->idstr, len);
394         size += 1 + len;
395     }
396     return size;
397 }
398
399 /* Update the xbzrle cache to reflect a page that's been sent as all 0.
400  * The important thing is that a stale (not-yet-0'd) page be replaced
401  * by the new data.
402  * As a bonus, if the page wasn't in the cache it gets added so that
403  * when a small write is made into the 0'd page it gets XBZRLE sent
404  */
405 static void xbzrle_cache_zero_page(ram_addr_t current_addr)
406 {
407     if (ram_bulk_stage || !migrate_use_xbzrle()) {
408         return;
409     }
410
411     /* We don't care if this fails to allocate a new cache page
412      * as long as it updated an old one */
413     cache_insert(XBZRLE.cache, current_addr, ZERO_TARGET_PAGE,
414                  bitmap_sync_count);
415 }
416
417 #define ENCODING_FLAG_XBZRLE 0x1
418
419 /**
420  * save_xbzrle_page: compress and send current page
421  *
422  * Returns: 1 means that we wrote the page
423  *          0 means that page is identical to the one already sent
424  *          -1 means that xbzrle would be longer than normal
425  *
426  * @f: QEMUFile where to send the data
427  * @current_data:
428  * @current_addr:
429  * @block: block that contains the page we want to send
430  * @offset: offset inside the block for the page
431  * @last_stage: if we are at the completion stage
432  * @bytes_transferred: increase it with the number of transferred bytes
433  */
434 static int save_xbzrle_page(QEMUFile *f, uint8_t **current_data,
435                             ram_addr_t current_addr, RAMBlock *block,
436                             ram_addr_t offset, bool last_stage,
437                             uint64_t *bytes_transferred)
438 {
439     int encoded_len = 0, bytes_xbzrle;
440     uint8_t *prev_cached_page;
441
442     if (!cache_is_cached(XBZRLE.cache, current_addr, bitmap_sync_count)) {
443         acct_info.xbzrle_cache_miss++;
444         if (!last_stage) {
445             if (cache_insert(XBZRLE.cache, current_addr, *current_data,
446                              bitmap_sync_count) == -1) {
447                 return -1;
448             } else {
449                 /* update *current_data when the page has been
450                    inserted into cache */
451                 *current_data = get_cached_data(XBZRLE.cache, current_addr);
452             }
453         }
454         return -1;
455     }
456
457     prev_cached_page = get_cached_data(XBZRLE.cache, current_addr);
458
459     /* save current buffer into memory */
460     memcpy(XBZRLE.current_buf, *current_data, TARGET_PAGE_SIZE);
461
462     /* XBZRLE encoding (if there is no overflow) */
463     encoded_len = xbzrle_encode_buffer(prev_cached_page, XBZRLE.current_buf,
464                                        TARGET_PAGE_SIZE, XBZRLE.encoded_buf,
465                                        TARGET_PAGE_SIZE);
466     if (encoded_len == 0) {
467         DPRINTF("Skipping unmodified page\n");
468         return 0;
469     } else if (encoded_len == -1) {
470         DPRINTF("Overflow\n");
471         acct_info.xbzrle_overflows++;
472         /* update data in the cache */
473         if (!last_stage) {
474             memcpy(prev_cached_page, *current_data, TARGET_PAGE_SIZE);
475             *current_data = prev_cached_page;
476         }
477         return -1;
478     }
479
480     /* we need to update the data in the cache, in order to get the same data */
481     if (!last_stage) {
482         memcpy(prev_cached_page, XBZRLE.current_buf, TARGET_PAGE_SIZE);
483     }
484
485     /* Send XBZRLE based compressed page */
486     bytes_xbzrle = save_page_header(f, block, offset | RAM_SAVE_FLAG_XBZRLE);
487     qemu_put_byte(f, ENCODING_FLAG_XBZRLE);
488     qemu_put_be16(f, encoded_len);
489     qemu_put_buffer(f, XBZRLE.encoded_buf, encoded_len);
490     bytes_xbzrle += encoded_len + 1 + 2;
491     acct_info.xbzrle_pages++;
492     acct_info.xbzrle_bytes += bytes_xbzrle;
493     *bytes_transferred += bytes_xbzrle;
494
495     return 1;
496 }
497
498 /* Called with rcu_read_lock() to protect migration_bitmap */
499 static inline
500 ram_addr_t migration_bitmap_find_and_reset_dirty(MemoryRegion *mr,
501                                                  ram_addr_t start)
502 {
503     unsigned long base = mr->ram_addr >> TARGET_PAGE_BITS;
504     unsigned long nr = base + (start >> TARGET_PAGE_BITS);
505     uint64_t mr_size = TARGET_PAGE_ALIGN(memory_region_size(mr));
506     unsigned long size = base + (mr_size >> TARGET_PAGE_BITS);
507     unsigned long *bitmap;
508
509     unsigned long next;
510
511     bitmap = atomic_rcu_read(&migration_bitmap);
512     if (ram_bulk_stage && nr > base) {
513         next = nr + 1;
514     } else {
515         next = find_next_bit(bitmap, size, nr);
516     }
517
518     if (next < size) {
519         clear_bit(next, bitmap);
520         migration_dirty_pages--;
521     }
522     return (next - base) << TARGET_PAGE_BITS;
523 }
524
525 /* Called with rcu_read_lock() to protect migration_bitmap */
526 static void migration_bitmap_sync_range(ram_addr_t start, ram_addr_t length)
527 {
528     unsigned long *bitmap;
529     bitmap = atomic_rcu_read(&migration_bitmap);
530     migration_dirty_pages +=
531         cpu_physical_memory_sync_dirty_bitmap(bitmap, start, length);
532 }
533
534
535 /* Fix me: there are too many global variables used in migration process. */
536 static int64_t start_time;
537 static int64_t bytes_xfer_prev;
538 static int64_t num_dirty_pages_period;
539 static uint64_t xbzrle_cache_miss_prev;
540 static uint64_t iterations_prev;
541
542 static void migration_bitmap_sync_init(void)
543 {
544     start_time = 0;
545     bytes_xfer_prev = 0;
546     num_dirty_pages_period = 0;
547     xbzrle_cache_miss_prev = 0;
548     iterations_prev = 0;
549 }
550
551 /* Called with iothread lock held, to protect ram_list.dirty_memory[] */
552 static void migration_bitmap_sync(void)
553 {
554     RAMBlock *block;
555     uint64_t num_dirty_pages_init = migration_dirty_pages;
556     MigrationState *s = migrate_get_current();
557     int64_t end_time;
558     int64_t bytes_xfer_now;
559
560     bitmap_sync_count++;
561
562     if (!bytes_xfer_prev) {
563         bytes_xfer_prev = ram_bytes_transferred();
564     }
565
566     if (!start_time) {
567         start_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
568     }
569
570     trace_migration_bitmap_sync_start();
571     address_space_sync_dirty_bitmap(&address_space_memory);
572
573     qemu_mutex_lock(&migration_bitmap_mutex);
574     rcu_read_lock();
575     QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
576         migration_bitmap_sync_range(block->mr->ram_addr, block->used_length);
577     }
578     rcu_read_unlock();
579     qemu_mutex_unlock(&migration_bitmap_mutex);
580
581     trace_migration_bitmap_sync_end(migration_dirty_pages
582                                     - num_dirty_pages_init);
583     num_dirty_pages_period += migration_dirty_pages - num_dirty_pages_init;
584     end_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
585
586     /* more than 1 second = 1000 millisecons */
587     if (end_time > start_time + 1000) {
588         if (migrate_auto_converge()) {
589             /* The following detection logic can be refined later. For now:
590                Check to see if the dirtied bytes is 50% more than the approx.
591                amount of bytes that just got transferred since the last time we
592                were in this routine. If that happens >N times (for now N==4)
593                we turn on the throttle down logic */
594             bytes_xfer_now = ram_bytes_transferred();
595             if (s->dirty_pages_rate &&
596                (num_dirty_pages_period * TARGET_PAGE_SIZE >
597                    (bytes_xfer_now - bytes_xfer_prev)/2) &&
598                (dirty_rate_high_cnt++ > 4)) {
599                     trace_migration_throttle();
600                     mig_throttle_on = true;
601                     dirty_rate_high_cnt = 0;
602              }
603              bytes_xfer_prev = bytes_xfer_now;
604         } else {
605              mig_throttle_on = false;
606         }
607         if (migrate_use_xbzrle()) {
608             if (iterations_prev != acct_info.iterations) {
609                 acct_info.xbzrle_cache_miss_rate =
610                    (double)(acct_info.xbzrle_cache_miss -
611                             xbzrle_cache_miss_prev) /
612                    (acct_info.iterations - iterations_prev);
613             }
614             iterations_prev = acct_info.iterations;
615             xbzrle_cache_miss_prev = acct_info.xbzrle_cache_miss;
616         }
617         s->dirty_pages_rate = num_dirty_pages_period * 1000
618             / (end_time - start_time);
619         s->dirty_bytes_rate = s->dirty_pages_rate * TARGET_PAGE_SIZE;
620         start_time = end_time;
621         num_dirty_pages_period = 0;
622     }
623     s->dirty_sync_count = bitmap_sync_count;
624 }
625
626 /**
627  * save_zero_page: Send the zero page to the stream
628  *
629  * Returns: Number of pages written.
630  *
631  * @f: QEMUFile where to send the data
632  * @block: block that contains the page we want to send
633  * @offset: offset inside the block for the page
634  * @p: pointer to the page
635  * @bytes_transferred: increase it with the number of transferred bytes
636  */
637 static int save_zero_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset,
638                           uint8_t *p, uint64_t *bytes_transferred)
639 {
640     int pages = -1;
641
642     if (is_zero_range(p, TARGET_PAGE_SIZE)) {
643         acct_info.dup_pages++;
644         *bytes_transferred += save_page_header(f, block,
645                                                offset | RAM_SAVE_FLAG_COMPRESS);
646         qemu_put_byte(f, 0);
647         *bytes_transferred += 1;
648         pages = 1;
649     }
650
651     return pages;
652 }
653
654 /**
655  * ram_save_page: Send the given page to the stream
656  *
657  * Returns: Number of pages written.
658  *
659  * @f: QEMUFile where to send the data
660  * @block: block that contains the page we want to send
661  * @offset: offset inside the block for the page
662  * @last_stage: if we are at the completion stage
663  * @bytes_transferred: increase it with the number of transferred bytes
664  */
665 static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
666                          bool last_stage, uint64_t *bytes_transferred)
667 {
668     int pages = -1;
669     uint64_t bytes_xmit;
670     ram_addr_t current_addr;
671     MemoryRegion *mr = block->mr;
672     uint8_t *p;
673     int ret;
674     bool send_async = true;
675
676     p = memory_region_get_ram_ptr(mr) + offset;
677
678     /* In doubt sent page as normal */
679     bytes_xmit = 0;
680     ret = ram_control_save_page(f, block->offset,
681                            offset, TARGET_PAGE_SIZE, &bytes_xmit);
682     if (bytes_xmit) {
683         *bytes_transferred += bytes_xmit;
684         pages = 1;
685     }
686
687     XBZRLE_cache_lock();
688
689     current_addr = block->offset + offset;
690
691     if (block == last_sent_block) {
692         offset |= RAM_SAVE_FLAG_CONTINUE;
693     }
694     if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
695         if (ret != RAM_SAVE_CONTROL_DELAYED) {
696             if (bytes_xmit > 0) {
697                 acct_info.norm_pages++;
698             } else if (bytes_xmit == 0) {
699                 acct_info.dup_pages++;
700             }
701         }
702     } else {
703         pages = save_zero_page(f, block, offset, p, bytes_transferred);
704         if (pages > 0) {
705             /* Must let xbzrle know, otherwise a previous (now 0'd) cached
706              * page would be stale
707              */
708             xbzrle_cache_zero_page(current_addr);
709         } else if (!ram_bulk_stage && migrate_use_xbzrle()) {
710             pages = save_xbzrle_page(f, &p, current_addr, block,
711                                      offset, last_stage, bytes_transferred);
712             if (!last_stage) {
713                 /* Can't send this cached data async, since the cache page
714                  * might get updated before it gets to the wire
715                  */
716                 send_async = false;
717             }
718         }
719     }
720
721     /* XBZRLE overflow or normal page */
722     if (pages == -1) {
723         *bytes_transferred += save_page_header(f, block,
724                                                offset | RAM_SAVE_FLAG_PAGE);
725         if (send_async) {
726             qemu_put_buffer_async(f, p, TARGET_PAGE_SIZE);
727         } else {
728             qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
729         }
730         *bytes_transferred += TARGET_PAGE_SIZE;
731         pages = 1;
732         acct_info.norm_pages++;
733     }
734
735     XBZRLE_cache_unlock();
736
737     return pages;
738 }
739
740 static int do_compress_ram_page(CompressParam *param)
741 {
742     int bytes_sent, blen;
743     uint8_t *p;
744     RAMBlock *block = param->block;
745     ram_addr_t offset = param->offset;
746
747     p = memory_region_get_ram_ptr(block->mr) + (offset & TARGET_PAGE_MASK);
748
749     bytes_sent = save_page_header(param->file, block, offset |
750                                   RAM_SAVE_FLAG_COMPRESS_PAGE);
751     blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE,
752                                      migrate_compress_level());
753     bytes_sent += blen;
754
755     return bytes_sent;
756 }
757
758 static inline void start_compression(CompressParam *param)
759 {
760     param->done = false;
761     qemu_mutex_lock(&param->mutex);
762     param->start = true;
763     qemu_cond_signal(&param->cond);
764     qemu_mutex_unlock(&param->mutex);
765 }
766
767 static inline void start_decompression(DecompressParam *param)
768 {
769     qemu_mutex_lock(&param->mutex);
770     param->start = true;
771     qemu_cond_signal(&param->cond);
772     qemu_mutex_unlock(&param->mutex);
773 }
774
775 static uint64_t bytes_transferred;
776
777 static void flush_compressed_data(QEMUFile *f)
778 {
779     int idx, len, thread_count;
780
781     if (!migrate_use_compression()) {
782         return;
783     }
784     thread_count = migrate_compress_threads();
785     for (idx = 0; idx < thread_count; idx++) {
786         if (!comp_param[idx].done) {
787             qemu_mutex_lock(comp_done_lock);
788             while (!comp_param[idx].done && !quit_comp_thread) {
789                 qemu_cond_wait(comp_done_cond, comp_done_lock);
790             }
791             qemu_mutex_unlock(comp_done_lock);
792         }
793         if (!quit_comp_thread) {
794             len = qemu_put_qemu_file(f, comp_param[idx].file);
795             bytes_transferred += len;
796         }
797     }
798 }
799
800 static inline void set_compress_params(CompressParam *param, RAMBlock *block,
801                                        ram_addr_t offset)
802 {
803     param->block = block;
804     param->offset = offset;
805 }
806
807 static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
808                                            ram_addr_t offset,
809                                            uint64_t *bytes_transferred)
810 {
811     int idx, thread_count, bytes_xmit = -1, pages = -1;
812
813     thread_count = migrate_compress_threads();
814     qemu_mutex_lock(comp_done_lock);
815     while (true) {
816         for (idx = 0; idx < thread_count; idx++) {
817             if (comp_param[idx].done) {
818                 bytes_xmit = qemu_put_qemu_file(f, comp_param[idx].file);
819                 set_compress_params(&comp_param[idx], block, offset);
820                 start_compression(&comp_param[idx]);
821                 pages = 1;
822                 acct_info.norm_pages++;
823                 *bytes_transferred += bytes_xmit;
824                 break;
825             }
826         }
827         if (pages > 0) {
828             break;
829         } else {
830             qemu_cond_wait(comp_done_cond, comp_done_lock);
831         }
832     }
833     qemu_mutex_unlock(comp_done_lock);
834
835     return pages;
836 }
837
838 /**
839  * ram_save_compressed_page: compress the given page and send it to the stream
840  *
841  * Returns: Number of pages written.
842  *
843  * @f: QEMUFile where to send the data
844  * @block: block that contains the page we want to send
845  * @offset: offset inside the block for the page
846  * @last_stage: if we are at the completion stage
847  * @bytes_transferred: increase it with the number of transferred bytes
848  */
849 static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block,
850                                     ram_addr_t offset, bool last_stage,
851                                     uint64_t *bytes_transferred)
852 {
853     int pages = -1;
854     uint64_t bytes_xmit;
855     MemoryRegion *mr = block->mr;
856     uint8_t *p;
857     int ret;
858
859     p = memory_region_get_ram_ptr(mr) + offset;
860
861     bytes_xmit = 0;
862     ret = ram_control_save_page(f, block->offset,
863                                 offset, TARGET_PAGE_SIZE, &bytes_xmit);
864     if (bytes_xmit) {
865         *bytes_transferred += bytes_xmit;
866         pages = 1;
867     }
868     if (block == last_sent_block) {
869         offset |= RAM_SAVE_FLAG_CONTINUE;
870     }
871     if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
872         if (ret != RAM_SAVE_CONTROL_DELAYED) {
873             if (bytes_xmit > 0) {
874                 acct_info.norm_pages++;
875             } else if (bytes_xmit == 0) {
876                 acct_info.dup_pages++;
877             }
878         }
879     } else {
880         /* When starting the process of a new block, the first page of
881          * the block should be sent out before other pages in the same
882          * block, and all the pages in last block should have been sent
883          * out, keeping this order is important, because the 'cont' flag
884          * is used to avoid resending the block name.
885          */
886         if (block != last_sent_block) {
887             flush_compressed_data(f);
888             pages = save_zero_page(f, block, offset, p, bytes_transferred);
889             if (pages == -1) {
890                 set_compress_params(&comp_param[0], block, offset);
891                 /* Use the qemu thread to compress the data to make sure the
892                  * first page is sent out before other pages
893                  */
894                 bytes_xmit = do_compress_ram_page(&comp_param[0]);
895                 acct_info.norm_pages++;
896                 qemu_put_qemu_file(f, comp_param[0].file);
897                 *bytes_transferred += bytes_xmit;
898                 pages = 1;
899             }
900         } else {
901             pages = save_zero_page(f, block, offset, p, bytes_transferred);
902             if (pages == -1) {
903                 pages = compress_page_with_multi_thread(f, block, offset,
904                                                         bytes_transferred);
905             }
906         }
907     }
908
909     return pages;
910 }
911
912 /**
913  * ram_find_and_save_block: Finds a dirty page and sends it to f
914  *
915  * Called within an RCU critical section.
916  *
917  * Returns:  The number of pages written
918  *           0 means no dirty pages
919  *
920  * @f: QEMUFile where to send the data
921  * @last_stage: if we are at the completion stage
922  * @bytes_transferred: increase it with the number of transferred bytes
923  */
924
925 static int ram_find_and_save_block(QEMUFile *f, bool last_stage,
926                                    uint64_t *bytes_transferred)
927 {
928     RAMBlock *block = last_seen_block;
929     ram_addr_t offset = last_offset;
930     bool complete_round = false;
931     int pages = 0;
932     MemoryRegion *mr;
933
934     if (!block)
935         block = QLIST_FIRST_RCU(&ram_list.blocks);
936
937     while (true) {
938         mr = block->mr;
939         offset = migration_bitmap_find_and_reset_dirty(mr, offset);
940         if (complete_round && block == last_seen_block &&
941             offset >= last_offset) {
942             break;
943         }
944         if (offset >= block->used_length) {
945             offset = 0;
946             block = QLIST_NEXT_RCU(block, next);
947             if (!block) {
948                 block = QLIST_FIRST_RCU(&ram_list.blocks);
949                 complete_round = true;
950                 ram_bulk_stage = false;
951                 if (migrate_use_xbzrle()) {
952                     /* If xbzrle is on, stop using the data compression at this
953                      * point. In theory, xbzrle can do better than compression.
954                      */
955                     flush_compressed_data(f);
956                     compression_switch = false;
957                 }
958             }
959         } else {
960             if (compression_switch && migrate_use_compression()) {
961                 pages = ram_save_compressed_page(f, block, offset, last_stage,
962                                                  bytes_transferred);
963             } else {
964                 pages = ram_save_page(f, block, offset, last_stage,
965                                       bytes_transferred);
966             }
967
968             /* if page is unmodified, continue to the next */
969             if (pages > 0) {
970                 last_sent_block = block;
971                 break;
972             }
973         }
974     }
975
976     last_seen_block = block;
977     last_offset = offset;
978
979     return pages;
980 }
981
982 void acct_update_position(QEMUFile *f, size_t size, bool zero)
983 {
984     uint64_t pages = size / TARGET_PAGE_SIZE;
985     if (zero) {
986         acct_info.dup_pages += pages;
987     } else {
988         acct_info.norm_pages += pages;
989         bytes_transferred += size;
990         qemu_update_position(f, size);
991     }
992 }
993
994 static ram_addr_t ram_save_remaining(void)
995 {
996     return migration_dirty_pages;
997 }
998
999 uint64_t ram_bytes_remaining(void)
1000 {
1001     return ram_save_remaining() * TARGET_PAGE_SIZE;
1002 }
1003
1004 uint64_t ram_bytes_transferred(void)
1005 {
1006     return bytes_transferred;
1007 }
1008
1009 uint64_t ram_bytes_total(void)
1010 {
1011     RAMBlock *block;
1012     uint64_t total = 0;
1013
1014     rcu_read_lock();
1015     QLIST_FOREACH_RCU(block, &ram_list.blocks, next)
1016         total += block->used_length;
1017     rcu_read_unlock();
1018     return total;
1019 }
1020
1021 void free_xbzrle_decoded_buf(void)
1022 {
1023     g_free(xbzrle_decoded_buf);
1024     xbzrle_decoded_buf = NULL;
1025 }
1026
1027 static void migration_end(void)
1028 {
1029     /* caller have hold iothread lock or is in a bh, so there is
1030      * no writing race against this migration_bitmap
1031      */
1032     unsigned long *bitmap = migration_bitmap;
1033     atomic_rcu_set(&migration_bitmap, NULL);
1034     if (bitmap) {
1035         memory_global_dirty_log_stop();
1036         synchronize_rcu();
1037         g_free(bitmap);
1038     }
1039
1040     XBZRLE_cache_lock();
1041     if (XBZRLE.cache) {
1042         cache_fini(XBZRLE.cache);
1043         g_free(XBZRLE.encoded_buf);
1044         g_free(XBZRLE.current_buf);
1045         XBZRLE.cache = NULL;
1046         XBZRLE.encoded_buf = NULL;
1047         XBZRLE.current_buf = NULL;
1048     }
1049     XBZRLE_cache_unlock();
1050 }
1051
1052 static void ram_migration_cancel(void *opaque)
1053 {
1054     migration_end();
1055 }
1056
1057 static void reset_ram_globals(void)
1058 {
1059     last_seen_block = NULL;
1060     last_sent_block = NULL;
1061     last_offset = 0;
1062     last_version = ram_list.version;
1063     ram_bulk_stage = true;
1064 }
1065
1066 #define MAX_WAIT 50 /* ms, half buffered_file limit */
1067
1068 void migration_bitmap_extend(ram_addr_t old, ram_addr_t new)
1069 {
1070     /* called in qemu main thread, so there is
1071      * no writing race against this migration_bitmap
1072      */
1073     if (migration_bitmap) {
1074         unsigned long *old_bitmap = migration_bitmap, *bitmap;
1075         bitmap = bitmap_new(new);
1076
1077         /* prevent migration_bitmap content from being set bit
1078          * by migration_bitmap_sync_range() at the same time.
1079          * it is safe to migration if migration_bitmap is cleared bit
1080          * at the same time.
1081          */
1082         qemu_mutex_lock(&migration_bitmap_mutex);
1083         bitmap_copy(bitmap, old_bitmap, old);
1084         bitmap_set(bitmap, old, new - old);
1085         atomic_rcu_set(&migration_bitmap, bitmap);
1086         qemu_mutex_unlock(&migration_bitmap_mutex);
1087         migration_dirty_pages += new - old;
1088         synchronize_rcu();
1089         g_free(old_bitmap);
1090     }
1091 }
1092
1093 /* Each of ram_save_setup, ram_save_iterate and ram_save_complete has
1094  * long-running RCU critical section.  When rcu-reclaims in the code
1095  * start to become numerous it will be necessary to reduce the
1096  * granularity of these critical sections.
1097  */
1098
1099 static int ram_save_setup(QEMUFile *f, void *opaque)
1100 {
1101     RAMBlock *block;
1102     int64_t ram_bitmap_pages; /* Size of bitmap in pages, including gaps */
1103
1104     mig_throttle_on = false;
1105     dirty_rate_high_cnt = 0;
1106     bitmap_sync_count = 0;
1107     migration_bitmap_sync_init();
1108     qemu_mutex_init(&migration_bitmap_mutex);
1109
1110     if (migrate_use_xbzrle()) {
1111         XBZRLE_cache_lock();
1112         XBZRLE.cache = cache_init(migrate_xbzrle_cache_size() /
1113                                   TARGET_PAGE_SIZE,
1114                                   TARGET_PAGE_SIZE);
1115         if (!XBZRLE.cache) {
1116             XBZRLE_cache_unlock();
1117             error_report("Error creating cache");
1118             return -1;
1119         }
1120         XBZRLE_cache_unlock();
1121
1122         /* We prefer not to abort if there is no memory */
1123         XBZRLE.encoded_buf = g_try_malloc0(TARGET_PAGE_SIZE);
1124         if (!XBZRLE.encoded_buf) {
1125             error_report("Error allocating encoded_buf");
1126             return -1;
1127         }
1128
1129         XBZRLE.current_buf = g_try_malloc(TARGET_PAGE_SIZE);
1130         if (!XBZRLE.current_buf) {
1131             error_report("Error allocating current_buf");
1132             g_free(XBZRLE.encoded_buf);
1133             XBZRLE.encoded_buf = NULL;
1134             return -1;
1135         }
1136
1137         acct_clear();
1138     }
1139
1140     /* iothread lock needed for ram_list.dirty_memory[] */
1141     qemu_mutex_lock_iothread();
1142     qemu_mutex_lock_ramlist();
1143     rcu_read_lock();
1144     bytes_transferred = 0;
1145     reset_ram_globals();
1146
1147     ram_bitmap_pages = last_ram_offset() >> TARGET_PAGE_BITS;
1148     migration_bitmap = bitmap_new(ram_bitmap_pages);
1149     bitmap_set(migration_bitmap, 0, ram_bitmap_pages);
1150
1151     /*
1152      * Count the total number of pages used by ram blocks not including any
1153      * gaps due to alignment or unplugs.
1154      */
1155     migration_dirty_pages = ram_bytes_total() >> TARGET_PAGE_BITS;
1156
1157     memory_global_dirty_log_start();
1158     migration_bitmap_sync();
1159     qemu_mutex_unlock_ramlist();
1160     qemu_mutex_unlock_iothread();
1161
1162     qemu_put_be64(f, ram_bytes_total() | RAM_SAVE_FLAG_MEM_SIZE);
1163
1164     QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
1165         qemu_put_byte(f, strlen(block->idstr));
1166         qemu_put_buffer(f, (uint8_t *)block->idstr, strlen(block->idstr));
1167         qemu_put_be64(f, block->used_length);
1168     }
1169
1170     rcu_read_unlock();
1171
1172     ram_control_before_iterate(f, RAM_CONTROL_SETUP);
1173     ram_control_after_iterate(f, RAM_CONTROL_SETUP);
1174
1175     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
1176
1177     return 0;
1178 }
1179
1180 static int ram_save_iterate(QEMUFile *f, void *opaque)
1181 {
1182     int ret;
1183     int i;
1184     int64_t t0;
1185     int pages_sent = 0;
1186
1187     rcu_read_lock();
1188     if (ram_list.version != last_version) {
1189         reset_ram_globals();
1190     }
1191
1192     /* Read version before ram_list.blocks */
1193     smp_rmb();
1194
1195     ram_control_before_iterate(f, RAM_CONTROL_ROUND);
1196
1197     t0 = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
1198     i = 0;
1199     while ((ret = qemu_file_rate_limit(f)) == 0) {
1200         int pages;
1201
1202         pages = ram_find_and_save_block(f, false, &bytes_transferred);
1203         /* no more pages to sent */
1204         if (pages == 0) {
1205             break;
1206         }
1207         pages_sent += pages;
1208         acct_info.iterations++;
1209         check_guest_throttling();
1210         /* we want to check in the 1st loop, just in case it was the 1st time
1211            and we had to sync the dirty bitmap.
1212            qemu_get_clock_ns() is a bit expensive, so we only check each some
1213            iterations
1214         */
1215         if ((i & 63) == 0) {
1216             uint64_t t1 = (qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - t0) / 1000000;
1217             if (t1 > MAX_WAIT) {
1218                 DPRINTF("big wait: %" PRIu64 " milliseconds, %d iterations\n",
1219                         t1, i);
1220                 break;
1221             }
1222         }
1223         i++;
1224     }
1225     flush_compressed_data(f);
1226     rcu_read_unlock();
1227
1228     /*
1229      * Must occur before EOS (or any QEMUFile operation)
1230      * because of RDMA protocol.
1231      */
1232     ram_control_after_iterate(f, RAM_CONTROL_ROUND);
1233
1234     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
1235     bytes_transferred += 8;
1236
1237     ret = qemu_file_get_error(f);
1238     if (ret < 0) {
1239         return ret;
1240     }
1241
1242     return pages_sent;
1243 }
1244
1245 /* Called with iothread lock */
1246 static int ram_save_complete(QEMUFile *f, void *opaque)
1247 {
1248     rcu_read_lock();
1249
1250     migration_bitmap_sync();
1251
1252     ram_control_before_iterate(f, RAM_CONTROL_FINISH);
1253
1254     /* try transferring iterative blocks of memory */
1255
1256     /* flush all remaining blocks regardless of rate limiting */
1257     while (true) {
1258         int pages;
1259
1260         pages = ram_find_and_save_block(f, true, &bytes_transferred);
1261         /* no more blocks to sent */
1262         if (pages == 0) {
1263             break;
1264         }
1265     }
1266
1267     flush_compressed_data(f);
1268     ram_control_after_iterate(f, RAM_CONTROL_FINISH);
1269
1270     rcu_read_unlock();
1271
1272     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
1273
1274     return 0;
1275 }
1276
1277 static uint64_t ram_save_pending(QEMUFile *f, void *opaque, uint64_t max_size)
1278 {
1279     uint64_t remaining_size;
1280
1281     remaining_size = ram_save_remaining() * TARGET_PAGE_SIZE;
1282
1283     if (remaining_size < max_size) {
1284         qemu_mutex_lock_iothread();
1285         rcu_read_lock();
1286         migration_bitmap_sync();
1287         rcu_read_unlock();
1288         qemu_mutex_unlock_iothread();
1289         remaining_size = ram_save_remaining() * TARGET_PAGE_SIZE;
1290     }
1291     return remaining_size;
1292 }
1293
1294 static int load_xbzrle(QEMUFile *f, ram_addr_t addr, void *host)
1295 {
1296     unsigned int xh_len;
1297     int xh_flags;
1298
1299     if (!xbzrle_decoded_buf) {
1300         xbzrle_decoded_buf = g_malloc(TARGET_PAGE_SIZE);
1301     }
1302
1303     /* extract RLE header */
1304     xh_flags = qemu_get_byte(f);
1305     xh_len = qemu_get_be16(f);
1306
1307     if (xh_flags != ENCODING_FLAG_XBZRLE) {
1308         error_report("Failed to load XBZRLE page - wrong compression!");
1309         return -1;
1310     }
1311
1312     if (xh_len > TARGET_PAGE_SIZE) {
1313         error_report("Failed to load XBZRLE page - len overflow!");
1314         return -1;
1315     }
1316     /* load data and decode */
1317     qemu_get_buffer(f, xbzrle_decoded_buf, xh_len);
1318
1319     /* decode RLE */
1320     if (xbzrle_decode_buffer(xbzrle_decoded_buf, xh_len, host,
1321                              TARGET_PAGE_SIZE) == -1) {
1322         error_report("Failed to load XBZRLE page - decode error!");
1323         return -1;
1324     }
1325
1326     return 0;
1327 }
1328
1329 /* Must be called from within a rcu critical section.
1330  * Returns a pointer from within the RCU-protected ram_list.
1331  */
1332 static inline void *host_from_stream_offset(QEMUFile *f,
1333                                             ram_addr_t offset,
1334                                             int flags)
1335 {
1336     static RAMBlock *block = NULL;
1337     char id[256];
1338     uint8_t len;
1339
1340     if (flags & RAM_SAVE_FLAG_CONTINUE) {
1341         if (!block || block->max_length <= offset) {
1342             error_report("Ack, bad migration stream!");
1343             return NULL;
1344         }
1345
1346         return memory_region_get_ram_ptr(block->mr) + offset;
1347     }
1348
1349     len = qemu_get_byte(f);
1350     qemu_get_buffer(f, (uint8_t *)id, len);
1351     id[len] = 0;
1352
1353     QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
1354         if (!strncmp(id, block->idstr, sizeof(id)) &&
1355             block->max_length > offset) {
1356             return memory_region_get_ram_ptr(block->mr) + offset;
1357         }
1358     }
1359
1360     error_report("Can't find block %s!", id);
1361     return NULL;
1362 }
1363
1364 /*
1365  * If a page (or a whole RDMA chunk) has been
1366  * determined to be zero, then zap it.
1367  */
1368 void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
1369 {
1370     if (ch != 0 || !is_zero_range(host, size)) {
1371         memset(host, ch, size);
1372     }
1373 }
1374
1375 static void *do_data_decompress(void *opaque)
1376 {
1377     DecompressParam *param = opaque;
1378     unsigned long pagesize;
1379
1380     while (!quit_decomp_thread) {
1381         qemu_mutex_lock(&param->mutex);
1382         while (!param->start && !quit_decomp_thread) {
1383             qemu_cond_wait(&param->cond, &param->mutex);
1384             pagesize = TARGET_PAGE_SIZE;
1385             if (!quit_decomp_thread) {
1386                 /* uncompress() will return failed in some case, especially
1387                  * when the page is dirted when doing the compression, it's
1388                  * not a problem because the dirty page will be retransferred
1389                  * and uncompress() won't break the data in other pages.
1390                  */
1391                 uncompress((Bytef *)param->des, &pagesize,
1392                            (const Bytef *)param->compbuf, param->len);
1393             }
1394             param->start = false;
1395         }
1396         qemu_mutex_unlock(&param->mutex);
1397     }
1398
1399     return NULL;
1400 }
1401
1402 void migrate_decompress_threads_create(void)
1403 {
1404     int i, thread_count;
1405
1406     thread_count = migrate_decompress_threads();
1407     decompress_threads = g_new0(QemuThread, thread_count);
1408     decomp_param = g_new0(DecompressParam, thread_count);
1409     compressed_data_buf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
1410     quit_decomp_thread = false;
1411     for (i = 0; i < thread_count; i++) {
1412         qemu_mutex_init(&decomp_param[i].mutex);
1413         qemu_cond_init(&decomp_param[i].cond);
1414         decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
1415         qemu_thread_create(decompress_threads + i, "decompress",
1416                            do_data_decompress, decomp_param + i,
1417                            QEMU_THREAD_JOINABLE);
1418     }
1419 }
1420
1421 void migrate_decompress_threads_join(void)
1422 {
1423     int i, thread_count;
1424
1425     quit_decomp_thread = true;
1426     thread_count = migrate_decompress_threads();
1427     for (i = 0; i < thread_count; i++) {
1428         qemu_mutex_lock(&decomp_param[i].mutex);
1429         qemu_cond_signal(&decomp_param[i].cond);
1430         qemu_mutex_unlock(&decomp_param[i].mutex);
1431     }
1432     for (i = 0; i < thread_count; i++) {
1433         qemu_thread_join(decompress_threads + i);
1434         qemu_mutex_destroy(&decomp_param[i].mutex);
1435         qemu_cond_destroy(&decomp_param[i].cond);
1436         g_free(decomp_param[i].compbuf);
1437     }
1438     g_free(decompress_threads);
1439     g_free(decomp_param);
1440     g_free(compressed_data_buf);
1441     decompress_threads = NULL;
1442     decomp_param = NULL;
1443     compressed_data_buf = NULL;
1444 }
1445
1446 static void decompress_data_with_multi_threads(uint8_t *compbuf,
1447                                                void *host, int len)
1448 {
1449     int idx, thread_count;
1450
1451     thread_count = migrate_decompress_threads();
1452     while (true) {
1453         for (idx = 0; idx < thread_count; idx++) {
1454             if (!decomp_param[idx].start) {
1455                 memcpy(decomp_param[idx].compbuf, compbuf, len);
1456                 decomp_param[idx].des = host;
1457                 decomp_param[idx].len = len;
1458                 start_decompression(&decomp_param[idx]);
1459                 break;
1460             }
1461         }
1462         if (idx < thread_count) {
1463             break;
1464         }
1465     }
1466 }
1467
1468 static int ram_load(QEMUFile *f, void *opaque, int version_id)
1469 {
1470     int flags = 0, ret = 0;
1471     static uint64_t seq_iter;
1472     int len = 0;
1473
1474     seq_iter++;
1475
1476     if (version_id != 4) {
1477         ret = -EINVAL;
1478     }
1479
1480     /* This RCU critical section can be very long running.
1481      * When RCU reclaims in the code start to become numerous,
1482      * it will be necessary to reduce the granularity of this
1483      * critical section.
1484      */
1485     rcu_read_lock();
1486     while (!ret && !(flags & RAM_SAVE_FLAG_EOS)) {
1487         ram_addr_t addr, total_ram_bytes;
1488         void *host;
1489         uint8_t ch;
1490
1491         addr = qemu_get_be64(f);
1492         flags = addr & ~TARGET_PAGE_MASK;
1493         addr &= TARGET_PAGE_MASK;
1494
1495         switch (flags & ~RAM_SAVE_FLAG_CONTINUE) {
1496         case RAM_SAVE_FLAG_MEM_SIZE:
1497             /* Synchronize RAM block list */
1498             total_ram_bytes = addr;
1499             while (!ret && total_ram_bytes) {
1500                 RAMBlock *block;
1501                 char id[256];
1502                 ram_addr_t length;
1503
1504                 len = qemu_get_byte(f);
1505                 qemu_get_buffer(f, (uint8_t *)id, len);
1506                 id[len] = 0;
1507                 length = qemu_get_be64(f);
1508
1509                 QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
1510                     if (!strncmp(id, block->idstr, sizeof(id))) {
1511                         if (length != block->used_length) {
1512                             Error *local_err = NULL;
1513
1514                             ret = qemu_ram_resize(block->offset, length, &local_err);
1515                             if (local_err) {
1516                                 error_report_err(local_err);
1517                             }
1518                         }
1519                         ram_control_load_hook(f, RAM_CONTROL_BLOCK_REG,
1520                                               block->idstr);
1521                         break;
1522                     }
1523                 }
1524
1525                 if (!block) {
1526                     error_report("Unknown ramblock \"%s\", cannot "
1527                                  "accept migration", id);
1528                     ret = -EINVAL;
1529                 }
1530
1531                 total_ram_bytes -= length;
1532             }
1533             break;
1534         case RAM_SAVE_FLAG_COMPRESS:
1535             host = host_from_stream_offset(f, addr, flags);
1536             if (!host) {
1537                 error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
1538                 ret = -EINVAL;
1539                 break;
1540             }
1541             ch = qemu_get_byte(f);
1542             ram_handle_compressed(host, ch, TARGET_PAGE_SIZE);
1543             break;
1544         case RAM_SAVE_FLAG_PAGE:
1545             host = host_from_stream_offset(f, addr, flags);
1546             if (!host) {
1547                 error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
1548                 ret = -EINVAL;
1549                 break;
1550             }
1551             qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
1552             break;
1553         case RAM_SAVE_FLAG_COMPRESS_PAGE:
1554             host = host_from_stream_offset(f, addr, flags);
1555             if (!host) {
1556                 error_report("Invalid RAM offset " RAM_ADDR_FMT, addr);
1557                 ret = -EINVAL;
1558                 break;
1559             }
1560
1561             len = qemu_get_be32(f);
1562             if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) {
1563                 error_report("Invalid compressed data length: %d", len);
1564                 ret = -EINVAL;
1565                 break;
1566             }
1567             qemu_get_buffer(f, compressed_data_buf, len);
1568             decompress_data_with_multi_threads(compressed_data_buf, host, len);
1569             break;
1570         case RAM_SAVE_FLAG_XBZRLE:
1571             host = host_from_stream_offset(f, addr, flags);
1572             if (!host) {
1573                 error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
1574                 ret = -EINVAL;
1575                 break;
1576             }
1577             if (load_xbzrle(f, addr, host) < 0) {
1578                 error_report("Failed to decompress XBZRLE page at "
1579                              RAM_ADDR_FMT, addr);
1580                 ret = -EINVAL;
1581                 break;
1582             }
1583             break;
1584         case RAM_SAVE_FLAG_EOS:
1585             /* normal exit */
1586             break;
1587         default:
1588             if (flags & RAM_SAVE_FLAG_HOOK) {
1589                 ram_control_load_hook(f, RAM_CONTROL_HOOK, NULL);
1590             } else {
1591                 error_report("Unknown combination of migration flags: %#x",
1592                              flags);
1593                 ret = -EINVAL;
1594             }
1595         }
1596         if (!ret) {
1597             ret = qemu_file_get_error(f);
1598         }
1599     }
1600
1601     rcu_read_unlock();
1602     DPRINTF("Completed load of VM with exit code %d seq iteration "
1603             "%" PRIu64 "\n", ret, seq_iter);
1604     return ret;
1605 }
1606
1607 static SaveVMHandlers savevm_ram_handlers = {
1608     .save_live_setup = ram_save_setup,
1609     .save_live_iterate = ram_save_iterate,
1610     .save_live_complete = ram_save_complete,
1611     .save_live_pending = ram_save_pending,
1612     .load_state = ram_load,
1613     .cancel = ram_migration_cancel,
1614 };
1615
1616 void ram_mig_init(void)
1617 {
1618     qemu_mutex_init(&XBZRLE.lock);
1619     register_savevm_live(NULL, "ram", 0, 4, &savevm_ram_handlers, NULL);
1620 }
1621 /* Stub function that's gets run on the vcpu when its brought out of the
1622    VM to run inside qemu via async_run_on_cpu()*/
1623
1624 static void mig_sleep_cpu(void *opq)
1625 {
1626     qemu_mutex_unlock_iothread();
1627     g_usleep(30*1000);
1628     qemu_mutex_lock_iothread();
1629 }
1630
1631 /* To reduce the dirty rate explicitly disallow the VCPUs from spending
1632    much time in the VM. The migration thread will try to catchup.
1633    Workload will experience a performance drop.
1634 */
1635 static void mig_throttle_guest_down(void)
1636 {
1637     CPUState *cpu;
1638
1639     qemu_mutex_lock_iothread();
1640     CPU_FOREACH(cpu) {
1641         async_run_on_cpu(cpu, mig_sleep_cpu, NULL);
1642     }
1643     qemu_mutex_unlock_iothread();
1644 }
1645
1646 static void check_guest_throttling(void)
1647 {
1648     static int64_t t0;
1649     int64_t        t1;
1650
1651     if (!mig_throttle_on) {
1652         return;
1653     }
1654
1655     if (!t0)  {
1656         t0 = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
1657         return;
1658     }
1659
1660     t1 = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
1661
1662     /* If it has been more than 40 ms since the last time the guest
1663      * was throttled then do it again.
1664      */
1665     if (40 < (t1-t0)/1000000) {
1666         mig_throttle_guest_down();
1667         t0 = t1;
1668     }
1669 }