#pragma once

#include "BlockInfo.h"
#include "GridMPI.h"
#include <algorithm>
#include <cassert>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <string>
#include <vector>

#include <mpi.h>
#include <omp.h>

namespace cubism
{

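// LoadBalancer: redistributes the blocks of a block-structured (AMR) grid
// among the ranks of the grid's MPI communicator, preserving the
// space-filling-curve ordering of the blocks.
//
// Intended call sequence (a sketch inferred from this header, not prescribed
// by it):
//    LoadBalancer<GridType> balancer(grid);
//    balancer.PrepareCompression();  // before blocks are coarsened
//    balancer.Balance_Diffusion();   // after each refinement/compression step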
template <typename TGrid>
class LoadBalancer
{

 public:
   typedef typename TGrid::Block BlockType;
   typedef typename TGrid::Block::ElementType ElementType;
   bool movedBlocks;   // set to true whenever blocks migrate between ranks
   int counterg{0};    // used as the message tag in Balance_Global()
   double beta;
   int flux_left_old;  // flux of blocks exchanged with the left neighbor in the previous pass
   int flux_right_old; // flux of blocks exchanged with the right neighbor in the previous pass

   MPI_Comm comm;      // world communicator of the grid

 protected:
   TGrid *m_refGrid;
   int rank, size;
   MPI_Datatype MPI_BLOCK;
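   // MPI_Block: plain buffer used to ship a single block between ranks.
   // It carries the block's (level, Z-index) pair followed by a raw copy of
   // the block data; MPI_BLOCK is the matching derived MPI datatype built in
   // the constructor.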
   struct MPI_Block
   {
      long long mn[2]; // mn[0] = refinement level, mn[1] = Z-order index of the block
      Real data[sizeof(BlockType) / sizeof(Real)]; // raw copy of the block's data
      MPI_Block(BlockInfo &info, bool Fillptr = true)
      {
         mn[0] = info.level;
         mn[1] = info.Z;
         if (Fillptr)
         {
            // copy the block's raw data into this message buffer
            Real *aux = &((BlockType *)info.ptrBlock)->data[0][0][0].member(0);
            std::memcpy(&data[0], aux, sizeof(BlockType));
         }
      }

      void prepare(BlockInfo &info, bool Fillptr = true)
      {
         mn[0] = info.level;
         mn[1] = info.Z;
         if (Fillptr)
         {
            Real *aux = &((BlockType *)info.ptrBlock)->data[0][0][0].member(0);
            std::memcpy(&data[0], aux, sizeof(BlockType));
         }
      }

      MPI_Block() {}
   };

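   // AddBlock: allocate a block received from another rank at (level, Z),
   // copy the received data into it, and update the tree metadata of the
   // neighboring levels.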
   void AddBlock(const int level, const long long Z, Real * data)
   {
      m_refGrid->_alloc(level, Z);
      BlockInfo &info = m_refGrid->getBlockInfoAll(level, Z);
      m_refGrid->Tree(info).setrank(m_refGrid->rank());
      BlockType *b1 = (BlockType *)info.ptrBlock;
      assert(b1 != NULL);
      Real *a1 = &b1->data[0][0][0].member(0);
      std::memcpy(a1, data, sizeof(BlockType));
      // Update the tree metadata of the neighboring levels: the (non-existing)
      // children of the new block point one level coarser, and its parent
      // position points one level finer.
      #if DIMENSION == 3
         int p[3];
         BlockInfo::inverse(Z, level, p[0], p[1], p[2]);
         if (level < m_refGrid->getlevelMax() - 1)
            for (int k1 = 0; k1 < 2; k1++)
            for (int j1 = 0; j1 < 2; j1++)
            for (int i1 = 0; i1 < 2; i1++)
            {
               const long long nc = m_refGrid->getZforward(level + 1, 2 * p[0] + i1, 2 * p[1] + j1, 2 * p[2] + k1);
               m_refGrid->Tree(level + 1, nc).setCheckCoarser();
            }
         if (level > 0)
         {
            const long long nf = m_refGrid->getZforward(level - 1, p[0] / 2, p[1] / 2, p[2] / 2);
            m_refGrid->Tree(level - 1, nf).setCheckFiner();
         }
      #else
         int p[2];
         BlockInfo::inverse(Z, level, p[0], p[1]);
         if (level < m_refGrid->getlevelMax() - 1)
            for (int j1 = 0; j1 < 2; j1++)
            for (int i1 = 0; i1 < 2; i1++)
            {
               const long long nc = m_refGrid->getZforward(level + 1, 2 * p[0] + i1, 2 * p[1] + j1);
               m_refGrid->Tree(level + 1, nc).setCheckCoarser();
            }
         if (level > 0)
         {
            const long long nf = m_refGrid->getZforward(level - 1, p[0] / 2, p[1] / 2);
            m_refGrid->Tree(level - 1, nf).setCheckFiner();
         }
      #endif
   }

 public:
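   // The constructor stores a reference to the grid and commits MPI_BLOCK,
   // the derived datatype used for all block transfers.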
   LoadBalancer(TGrid &grid)
   {
      comm = grid.getWorldComm();
      MPI_Comm_size(comm, &size);
      MPI_Comm_rank(comm, &rank);
      m_refGrid   = &grid;
      movedBlocks = false;
      // Derived MPI datatype matching struct MPI_Block:
      // two long longs (level, Z) followed by the block's raw data.
      MPI_Block dummy;
      int array_of_blocklengths[2]       = {2, sizeof(BlockType) / sizeof(Real)};
      MPI_Aint array_of_displacements[2] = {0, 2 * sizeof(long long)};
      MPI_Datatype array_of_types[2]     = {MPI_LONG_LONG, MPI_DOUBLE}; // note: assumes Real == double
      MPI_Type_create_struct(2, array_of_blocklengths, array_of_displacements, array_of_types, &MPI_BLOCK);
      MPI_Type_commit(&MPI_BLOCK);
      flux_left_old  = 0;
      flux_right_old = 0;
   }

   ~LoadBalancer() { MPI_Type_free(&MPI_BLOCK); }
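
   // ------------------------------------------------------------------
   // PrepareCompression(): before coarsening, gather all 2^DIM blocks of
   // each compression group on the rank that owns the group's "base"
   // block (the one with even indices), exchanging the block data with
   // point-to-point messages.
   // ------------------------------------------------------------------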

   void PrepareCompression()
   {
      m_refGrid->FillPos();
      std::vector<BlockInfo> &I = m_refGrid->getBlocksInfo();
      std::vector<std::vector<MPI_Block>> send_blocks(size);
      std::vector<std::vector<MPI_Block>> recv_blocks(size);

      for (auto &b : I)
      {
         assert(b.level >= 0);
         assert(b.index[0] >= 0);
         assert(b.index[1] >= 0);
         assert(b.index[2] >= 0);
         // nBlock: Z-index of the "base" block of the 2x2(x2) compression
         // group, i.e. the block whose indices are rounded down to even numbers
         #if DIMENSION == 3
            const long long nBlock = m_refGrid->getZforward(b.level, 2 * (b.index[0] / 2), 2 * (b.index[1] / 2), 2 * (b.index[2] / 2));
         #else
            const long long nBlock = m_refGrid->getZforward(b.level, 2 * (b.index[0] / 2), 2 * (b.index[1] / 2));
         #endif
         assert(nBlock >= 0);
         assert(b.Z >= 0);

         BlockInfo &base  = m_refGrid->getBlockInfoAll(b.level, nBlock);
         BlockInfo &bCopy = m_refGrid->getBlockInfoAll(b.level, b.Z);

         const int baserank = m_refGrid->Tree(b.level, nBlock).rank();
         const int brank    = m_refGrid->Tree(b.level, b.Z).rank();

         if (!m_refGrid->Tree(base).Exists()) continue;

         if (b.Z != nBlock && base.state == Compress)
         {
            // this block is not the base of its group: if the base lives on
            // another rank, send the block there and hand over ownership
            if (baserank != rank && brank == rank)
            {
               send_blocks[baserank].push_back({bCopy});
               m_refGrid->Tree(b.level, b.Z).setrank(baserank);
            }
         }
         else if (b.Z == nBlock && base.state == Compress)
         {
            // this block is the base of its group: post receives for all
            // siblings that currently live on other ranks
            #if DIMENSION == 3
               for (int k = 0; k < 2; k++)
                  for (int j = 0; j < 2; j++)
                     for (int i = 0; i < 2; i++)
                     {
                        const long long n = m_refGrid->getZforward(b.level, b.index[0] + i, b.index[1] + j, b.index[2] + k);
                        if (n == nBlock) continue;
                        BlockInfo &temp    = m_refGrid->getBlockInfoAll(b.level, n);
                        const int temprank = m_refGrid->Tree(b.level, n).rank();
                        if (temprank != rank)
                        {
                           recv_blocks[temprank].push_back({temp, false});
                           m_refGrid->Tree(b.level, n).setrank(baserank);
                        }
                     }
            #else
               for (int j = 0; j < 2; j++)
                  for (int i = 0; i < 2; i++)
                  {
                     const long long n = m_refGrid->getZforward(b.level, b.index[0] + i, b.index[1] + j);
                     if (n == nBlock) continue;
                     BlockInfo &temp    = m_refGrid->getBlockInfoAll(b.level, n);
                     const int temprank = m_refGrid->Tree(b.level, n).rank();
                     if (temprank != rank)
                     {
                        recv_blocks[temprank].push_back({temp, false});
                        m_refGrid->Tree(b.level, n).setrank(baserank);
                     }
                  }
            #endif
         }
      }

      // exchange the blocks with point-to-point messages
      std::vector<MPI_Request> requests;

      for (int r = 0; r < size; r++)
         if (r != rank)
         {
            if (recv_blocks[r].size() != 0)
            {
               MPI_Request req;
               requests.push_back(req);
               MPI_Irecv(&recv_blocks[r][0], recv_blocks[r].size(), MPI_BLOCK, r, 123450, comm, &requests.back());
            }
            if (send_blocks[r].size() != 0)
            {
               MPI_Request req;
               requests.push_back(req);
               MPI_Isend(&send_blocks[r][0], send_blocks[r].size(), MPI_BLOCK, r, 123450, comm, &requests.back());
            }
         }

      if (requests.size() != 0)
      {
         movedBlocks = true;
         MPI_Waitall(requests.size(), &requests[0], MPI_STATUSES_IGNORE);
      }
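
      // blocks that were sent away are deallocated and their grid positions
      // are marked "check coarser"; received blocks are allocated below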

      for (int r = 0; r < size; r++)
         for (int i = 0; i < (int)send_blocks[r].size(); i++)
         {
            m_refGrid->_dealloc(send_blocks[r][i].mn[0], send_blocks[r][i].mn[1]);
            m_refGrid->Tree(send_blocks[r][i].mn[0], send_blocks[r][i].mn[1]).setCheckCoarser();
         }

      for (int r = 0; r < size; r++)
         for (int i = 0; i < (int)recv_blocks[r].size(); i++)
         {
            const int level = (int) recv_blocks[r][i].mn[0];
            const long long Z = recv_blocks[r][i].mn[1];

            m_refGrid->_alloc(level, Z);
            BlockInfo info = m_refGrid->getBlockInfoAll(level, Z);
            BlockType *b1  = (BlockType *)info.ptrBlock;
            assert(b1 != NULL);
            Real *a1 = &b1->data[0][0][0].member(0);
            std::memcpy(a1, recv_blocks[r][i].data, sizeof(BlockType));
         }
   }
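
   // ------------------------------------------------------------------
   // Balance_Diffusion(): first-neighbor load balancing. Each rank
   // exchanges block counts with its left/right neighbor (in rank order)
   // and ships 1/4 of the difference in each direction, keeping the
   // space-filling-curve ordering intact. If the global imbalance is too
   // large (max/min > 1.1 or some rank is empty), Balance_Global() is
   // called instead.
   // ------------------------------------------------------------------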

   void Balance_Diffusion()
   {
      {
         // if the load is already very unbalanced (or some rank has no blocks
         // at all), fall back to a global redistribution
         long long b = m_refGrid->getBlocksInfo().size();
         long long max_b, min_b;
         MPI_Allreduce(&b, &max_b, 1, MPI_LONG_LONG, MPI_MAX, comm);
         MPI_Allreduce(&b, &min_b, 1, MPI_LONG_LONG, MPI_MIN, comm);
         const double ratio = static_cast<double>(max_b) / min_b;
         if (rank == 0) std::cout << "Load imbalance ratio = " << ratio << std::endl;
         if (ratio > 1.1 || min_b == 0)
         {
            Balance_Global();
            return;
         }
      }

      // exchange the number of blocks with the left/right neighboring ranks
      const int right = (rank == size - 1) ? MPI_PROC_NULL : rank + 1;
      const int left  = (rank == 0) ? MPI_PROC_NULL : rank - 1;

      const int my_blocks = m_refGrid->getBlocksInfo().size();

      int right_blocks, left_blocks;

      std::vector<MPI_Request> reqs(4);
      MPI_Irecv(&left_blocks, 1, MPI_INT, left, 123, comm, &reqs[0]);
      MPI_Irecv(&right_blocks, 1, MPI_INT, right, 456, comm, &reqs[1]);
      MPI_Isend(&my_blocks, 1, MPI_INT, left, 456, comm, &reqs[2]);
      MPI_Isend(&my_blocks, 1, MPI_INT, right, 123, comm, &reqs[3]);

      MPI_Waitall(4, &reqs[0], MPI_STATUSES_IGNORE);

      // send 1/nu of the block-count difference to each neighbor
      // (explicit "diffusion" of the load along the rank ordering)
      int nu         = 4;
      int flux_left  = (my_blocks - left_blocks) / nu;
      int flux_right = (my_blocks - right_blocks) / nu;
      if (rank == size - 1) flux_right = 0;
      if (rank == 0) flux_left = 0;

      std::vector<BlockInfo> SortedInfos = m_refGrid->getBlocksInfo();

      if (flux_right != 0 || flux_left != 0) std::sort(SortedInfos.begin(), SortedInfos.end());

      std::vector<MPI_Block> send_left;
      std::vector<MPI_Block> recv_left;
      std::vector<MPI_Block> send_right;
      std::vector<MPI_Block> recv_right;

      std::vector<MPI_Request> request;

      if (flux_left > 0) // then I will send blocks to my left rank
      {
         send_left.resize(flux_left);
         #pragma omp parallel for schedule(runtime)
         for (int i = 0; i < flux_left; i++) send_left[i].prepare(SortedInfos[i]);
         MPI_Request req;
         request.push_back(req);
         MPI_Isend(&send_left[0], send_left.size(), MPI_BLOCK, left, 7890, comm, &request.back());
      }
      else if (flux_left < 0) // then I will receive blocks from my left rank
      {
         recv_left.resize(abs(flux_left));
         MPI_Request req;
         request.push_back(req);
         MPI_Irecv(&recv_left[0], recv_left.size(), MPI_BLOCK, left, 4560, comm, &request.back());
      }
      if (flux_right > 0) // then I will send blocks to my right rank
      {
         send_right.resize(flux_right);
         #pragma omp parallel for schedule(runtime)
         for (int i = 0; i < flux_right; i++) send_right[i].prepare(SortedInfos[my_blocks - i - 1]);
         MPI_Request req;
         request.push_back(req);
         MPI_Isend(&send_right[0], send_right.size(), MPI_BLOCK, right, 4560, comm, &request.back());
      }
      else if (flux_right < 0) // then I will receive blocks from my right rank
      {
         recv_right.resize(abs(flux_right));
         MPI_Request req;
         request.push_back(req);
         MPI_Irecv(&recv_right[0], recv_right.size(), MPI_BLOCK, right, 7890, comm, &request.back());
      }

      if (request.size() != 0)
      {
         movedBlocks = true;
         MPI_Waitall(request.size(), &request[0], MPI_STATUSES_IGNORE);
      }

      for (int i = 0; i < flux_right; i++)
      {
         BlockInfo &info = SortedInfos[my_blocks - i - 1];
         m_refGrid->_dealloc(info.level, info.Z);
         m_refGrid->Tree(info.level, info.Z).setrank(right);
      }

      for (int i = 0; i < flux_left; i++)
      {
         BlockInfo &info = SortedInfos[i];
         m_refGrid->_dealloc(info.level, info.Z);
         m_refGrid->Tree(info.level, info.Z).setrank(left);
      }

      for (int i = 0; i < -flux_left; i++)
         AddBlock(recv_left[i].mn[0], recv_left[i].mn[1], recv_left[i].data);

      for (int i = 0; i < -flux_right; i++)
         AddBlock(recv_right[i].mn[0], recv_right[i].mn[1], recv_right[i].data);

      int temp = movedBlocks ? 1 : 0;
      MPI_Allreduce(MPI_IN_PLACE, &temp, 1, MPI_INT, MPI_SUM, comm);
      movedBlocks = (temp >= 1);
      #if 0
         if (movedBlocks == true)
         {
            int b = m_refGrid->getBlocksInfo().size();
            std::vector<int> all_b(size);
            MPI_Gather(&b, 1, MPI_INT, &all_b[0], 1, MPI_INT, 0, comm);
            if (rank == 0)
            {
               std::cout << "&~&~&~&~&~&~&~&~&~&~&~&~&~&~&~&~&~&~&~&~&~&~&~&~&~\n";
               std::cout << " Distribution of blocks among ranks: \n";
               for (int r = 0; r < size; r++) std::cout << all_b[r] << " | ";
               std::cout << "\n";
               std::cout << "&~&~&~&~&~&~&~&~&~&~&~&~&~&~&~&~&~&~&~&~&~&~&~&~&~\n";
               std::cout << std::endl;
            }
         }
      #endif
      m_refGrid->FillPos();
      flux_left_old  = flux_left;
      flux_right_old = flux_right;
   }
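
   // ------------------------------------------------------------------
   // Balance_Global(): redistribute all blocks so that every rank owns
   // (approximately) total_blocks/size of them, preserving the global
   // space-filling-curve ordering of the blocks across ranks.
   // ------------------------------------------------------------------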

   void Balance_Global()
   {
      long long b = m_refGrid->getBlocksInfo().size();
      std::vector<long long> all_b(size);
      MPI_Allgather(&b, 1, MPI_LONG_LONG, all_b.data(), 1, MPI_LONG_LONG, comm);

      std::vector<BlockInfo> SortedInfos = m_refGrid->getBlocksInfo();
      std::sort(SortedInfos.begin(), SortedInfos.end());

      std::vector<std::vector<MPI_Block>> send_blocks(size);
      std::vector<std::vector<MPI_Block>> recv_blocks(size);

      // target load per rank, plus the global (SFC-ordered) index range that
      // each rank currently holds and should hold after redistribution
      long long total_load = 0;
      for (int r = 0; r < size; r++) total_load += all_b[r];
      long long my_load = total_load / size;
      if (rank < (total_load % size)) my_load += 1;

      std::vector<long long> index_start(size);
      index_start[0] = 0;
      for (int r = 1; r < size; r++) index_start[r] = index_start[r - 1] + all_b[r - 1];

      long long ideal_index = (total_load / size) * rank;
      ideal_index += (rank < (total_load % size)) ? rank : (total_load % size);

      for (int r = 0; r < size; r++)
         if (rank != r)
         {
            { // check if I need to receive blocks
               // overlap of [my ideal index range] with [r's current range]
               const long long a1 = ideal_index;
               const long long a2 = ideal_index + my_load - 1;
               const long long b1 = index_start[r];
               const long long b2 = index_start[r] + all_b[r] - 1;
               const long long c1 = std::max(a1, b1);
               const long long c2 = std::min(a2, b2);
               if (c2 - c1 + 1 > 0) recv_blocks[r].resize(c2 - c1 + 1);
            }
            { // check if I need to send blocks
               long long other_ideal_index = (total_load / size) * r;
               other_ideal_index += (r < (total_load % size)) ? r : (total_load % size);
               long long other_load = total_load / size;
               if (r < (total_load % size)) other_load += 1;
               const long long a1 = other_ideal_index;
               const long long a2 = other_ideal_index + other_load - 1;
               const long long b1 = index_start[rank];
               const long long b2 = index_start[rank] + all_b[rank] - 1;
               const long long c1 = std::max(a1, b1);
               const long long c2 = std::min(a2, b2);
               if (c2 - c1 + 1 > 0) send_blocks[r].resize(c2 - c1 + 1);
            }
         }

      int tag = counterg;
      std::vector<MPI_Request> recv_request;
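      // post receives for the block ranges this rank will take over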
      for (int r = 0; r < size; r++)
         if (recv_blocks[r].size() != 0)
         {
            MPI_Request req;
            recv_request.push_back(req);
            MPI_Irecv(recv_blocks[r].data(), recv_blocks[r].size(), MPI_BLOCK, r, tag, comm, &recv_request.back());
         }

      std::vector<MPI_Request> send_request;
      long long counter_S = 0;
      long long counter_E = 0;
      for (int r = 0; r < size; r++)
         if (send_blocks[r].size() != 0)
         {
            // blocks destined for lower ranks are taken from the front of the
            // sorted (SFC-ordered) list, blocks for higher ranks from the back
            if (r < rank)
            {
               for (size_t i = 0; i < send_blocks[r].size(); i++) send_blocks[r][i].prepare(SortedInfos[counter_S + i]);
               counter_S += send_blocks[r].size();
            }
            else
            {
               for (size_t i = 0; i < send_blocks[r].size(); i++) send_blocks[r][i].prepare(SortedInfos[SortedInfos.size() - 1 - (counter_E + i)]);
               counter_E += send_blocks[r].size();
            }

            MPI_Request req;
            send_request.push_back(req);
            MPI_Isend(send_blocks[r].data(), send_blocks[r].size(), MPI_BLOCK, r, tag, comm, &send_request.back());
         }

      MPI_Waitall(send_request.size(), send_request.data(), MPI_STATUSES_IGNORE);
      MPI_Waitall(recv_request.size(), recv_request.data(), MPI_STATUSES_IGNORE);

      movedBlocks = true;

      counter_S = 0;
      counter_E = 0;
      for (int r = 0; r < size; r++)
         if (send_blocks[r].size() != 0)
         {
            if (r < rank)
            {
               for (size_t i = 0; i < send_blocks[r].size(); i++)
               {
                  BlockInfo &info = SortedInfos[counter_S + i];
                  m_refGrid->_dealloc(info.level, info.Z);
                  m_refGrid->Tree(info.level, info.Z).setrank(r);
               }
               counter_S += send_blocks[r].size();
            }
            else
            {
               for (size_t i = 0; i < send_blocks[r].size(); i++)
               {
                  BlockInfo &info = SortedInfos[SortedInfos.size() - 1 - (counter_E + i)];
                  m_refGrid->_dealloc(info.level, info.Z);
                  m_refGrid->Tree(info.level, info.Z).setrank(r);
               }
               counter_E += send_blocks[r].size();
            }
         }

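      // allocate the received blocks locally and update the tree metadata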
      for (int r = 0; r < size; r++)
         if (recv_blocks[r].size() != 0)
         {
            for (size_t i = 0; i < recv_blocks[r].size(); i++)
               AddBlock(recv_blocks[r][i].mn[0], recv_blocks[r][i].mn[1], recv_blocks[r][i].data);
         }
      m_refGrid->FillPos();
   }
};

} // namespace cubism