StarPU Internal Handbook
Loading...
Searching...
No Matches
starpu_mpi_private.h
Go to the documentation of this file.
1/* StarPU --- Runtime system for heterogeneous multicore architectures.
2 *
3 * Copyright (C) 2010-2023 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
4 *
5 * StarPU is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU Lesser General Public License as published by
7 * the Free Software Foundation; either version 2.1 of the License, or (at
8 * your option) any later version.
9 *
10 * StarPU is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
13 *
14 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
15 */
16
17#ifndef __STARPU_MPI_PRIVATE_H__
18#define __STARPU_MPI_PRIVATE_H__
19
20#include <starpu.h>
21#include <common/config.h>
22#include <common/uthash.h>
23#include <starpu_mpi.h>
24#include <starpu_mpi_fxt.h>
25#include <common/list.h>
26#include <common/prio_list.h>
28#include <core/simgrid.h>
29
32#ifdef __cplusplus
33extern "C"
34{
35#endif
36
37#ifdef STARPU_SIMGRID
38extern starpu_pthread_wait_t _starpu_mpi_thread_wait;
39extern starpu_pthread_queue_t _starpu_mpi_thread_dontsleep;
40
42{
43 MPI_Request *request;
44 MPI_Status *status;
45 starpu_pthread_queue_t *queue;
46 unsigned *done;
47};
48
49int _starpu_mpi_simgrid_mpi_test(unsigned *done, int *flag);
50void _starpu_mpi_simgrid_wait_req(MPI_Request *request, MPI_Status *status, starpu_pthread_queue_t *queue, unsigned *done);
51#endif
52
/* Cache-aware variants of the send / receive entry points. Both return the
 * struct _starpu_mpi_req describing the communication and take an
 * `int *cache_flag`.
 * NOTE(review): presumably *cache_flag is an output telling the caller whether
 * the communication cache already covered the transfer -- confirm against the
 * definitions, which are not part of this header. */
53struct _starpu_mpi_req* _starpu_mpi_isend_cache_aware(starpu_data_handle_t data_handle, int dest, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *_arg, int sequential_consistency, int* cache_flag);
54struct _starpu_mpi_req* _starpu_mpi_irecv_cache_aware(starpu_data_handle_t data_handle, int source, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *_arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count, int* cache_flag);
55
/* Rank printed as a prefix by the message macros of this header. While it is
 * still -1, those macros fetch it with
 * starpu_mpi_comm_rank(MPI_COMM_WORLD, &_starpu_debug_rank). */
56extern int _starpu_debug_rank;
57char *_starpu_mpi_get_mpi_error_code(int code);
/* When non-zero, _STARPU_MPI_COMM_DEBUG() prints its trace line
 * (only in STARPU_MPI_VERBOSE builds). */
58extern int _starpu_mpi_comm_debug;
59
60#ifdef STARPU_MPI_VERBOSE
/* Inclusive range of levels for which _STARPU_MPI_DEBUG() prints. */
61extern int _starpu_debug_level_min;
62extern int _starpu_debug_level_max;
63void _starpu_mpi_set_debug_level_min(int level);
64void _starpu_mpi_set_debug_level_max(int level);
65#endif
/* Global settings of the MPI layer.
 * NOTE(review): presumably filled in by _starpu_mpi_env_init() from the
 * environment -- confirm in its definition. */
66extern int _starpu_mpi_fake_world_size;
67extern int _starpu_mpi_fake_world_rank;
68extern int _starpu_mpi_use_prio;
69extern int _starpu_mpi_nobind;
70extern int _starpu_mpi_thread_cpuid;
71extern int _starpu_mpi_use_coop_sends;
72extern int _starpu_mpi_mem_throttle;
73extern int _starpu_mpi_recv_wait_finalize;
74extern int _starpu_mpi_has_cuda;
75extern int _starpu_mpi_cuda_devid;
76void _starpu_mpi_env_init(void);
77
/* STARPU_MPI_ASSERT_MSG(x, msg, ...): check condition x and, when it does not
 * hold, print a printf-style message to stderr prefixed with the MPI rank and
 * the name of the calling function. One of three variants is selected at
 * preprocessing time. */
78#ifdef STARPU_NO_ASSERT
/* Assertions disabled: x only appears inside `if (0)`, so it is compiled but
 * never evaluated, and msg is ignored. */
79# define STARPU_MPI_ASSERT_MSG(x, msg, ...) do { if (0) { (void) (x); }} while(0)
80#else
81# if defined(__CUDACC__) && defined(STARPU_HAVE_WINDOWS)
/* CUDA compiler on Windows: no assert() is used; after printing, the failure
 * path stores through a NULL pointer.
 * NOTE(review): the declaration below is not `extern`, although the same
 * variable is declared `extern` earlier in this header -- confirm that
 * defining it from a header is intended. */
82int _starpu_debug_rank;
83# define STARPU_MPI_ASSERT_MSG(x, msg, ...) \
84 do \
85 { \
86 if (STARPU_UNLIKELY(!(x))) \
87 { \
88 if (_starpu_debug_rank == -1) starpu_mpi_comm_rank(MPI_COMM_WORLD, &_starpu_debug_rank); \
89 fprintf(stderr, "\n[%d][starpu_mpi][%s][assert failure] " msg "\n\n", _starpu_debug_rank, __starpu_func__, ## __VA_ARGS__); *(int*)NULL = 0; \
90 } \
91 } while(0)
92# else
/* Default: print the message on failure, then run assert(x). x is expanded
 * both in the `if` and in assert(), so it must be free of side effects. */
93# define STARPU_MPI_ASSERT_MSG(x, msg, ...) \
94 do \
95 { \
96 if (STARPU_UNLIKELY(!(x))) \
97 { \
98 if (_starpu_debug_rank == -1) starpu_mpi_comm_rank(MPI_COMM_WORLD, &_starpu_debug_rank); \
99 fprintf(stderr, "\n[%d][starpu_mpi][%s][assert failure] " msg "\n\n", _starpu_debug_rank, __starpu_func__, ## __VA_ARGS__); \
100 } \
101 assert(x); \
102 } while(0)
103
104# endif
105#endif
106
/*
 * Checked allocation helpers: each one performs the allocation and then runs
 * STARPU_MPI_ASSERT_MSG on the result, reporting the requested byte count.
 *
 *  - _STARPU_MPI_MALLOC(ptr, size):         ptr = malloc(size)
 *  - _STARPU_MPI_CALLOC(ptr, nmemb, size):  ptr = calloc(nmemb, size)
 *  - _STARPU_MPI_REALLOC(ptr, size):        ptr = realloc(ptr, size); ptr is
 *    only overwritten after the check, so the old block is not lost when the
 *    assertion fires.
 *
 * Every macro parameter is parenthesized where it is part of an expression,
 * so that arguments such as `n + 1` yield the right byte count in the message.
 * `ptr` is expanded more than once and must be free of side effects.
 */
#define _STARPU_MPI_MALLOC(ptr, size) do { (ptr) = malloc(size); STARPU_MPI_ASSERT_MSG((ptr) != NULL, "Cannot allocate %ld bytes\n", (long) (size)); } while (0)
#define _STARPU_MPI_CALLOC(ptr, nmemb, size) do { (ptr) = calloc(nmemb, size); STARPU_MPI_ASSERT_MSG((ptr) != NULL, "Cannot allocate %ld bytes\n", (long) ((nmemb)*(size))); } while (0)
#define _STARPU_MPI_REALLOC(ptr, size) do { void *_new_ptr = realloc(ptr, size); STARPU_MPI_ASSERT_MSG(_new_ptr != NULL, "Cannot reallocate %ld bytes\n", (long) (size)); (ptr) = _new_ptr; } while (0)
110
/* Debug tracing macros. They print to stderr in STARPU_MPI_VERBOSE builds and
 * expand to empty statements otherwise. */
111#ifdef STARPU_MPI_VERBOSE
/* _STARPU_MPI_COMM_DEBUG: when _starpu_mpi_comm_debug is non-zero, query the
 * rank in `comm`, the size of `datatype` and the name of `comm`, then print a
 * single colon-separated line and flush stderr. `way` is a string marker
 * ("-->" or "<--" when coming from the two wrappers below).
 * NOTE(review): the name buffer is 128 bytes while MPI_Comm_get_name() is
 * specified in terms of MPI_MAX_OBJECT_NAME -- confirm 128 is large enough for
 * every supported MPI implementation. */
112# define _STARPU_MPI_COMM_DEBUG(ptr, count, datatype, node, tag, utag, comm, way) \
113 do \
114 { \
115 if (_starpu_mpi_comm_debug) \
116 { \
117 int __size; \
118 char _comm_name[128]; \
119 int _comm_name_len; \
120 int _rank; \
121 starpu_mpi_comm_rank(comm, &_rank); \
122 MPI_Type_size(datatype, &__size); \
123 MPI_Comm_get_name(comm, _comm_name, &_comm_name_len); \
124 fprintf(stderr, "[%d][starpu_mpi] :%d:%s:%d:%d:%ld:%s:%p:%ld:%d:%s:%d\n", _rank, _rank, way, node, tag, utag, _comm_name, ptr, count, __size, __starpu_func__ , __LINE__); \
125 fflush(stderr); \
126 } \
127 } while(0)
/* Same trace with the direction marker fixed: "-->" towards `dest`,
 * "<--" from `source`. */
128# define _STARPU_MPI_COMM_TO_DEBUG(ptr, count, datatype, dest, tag, utag, comm) _STARPU_MPI_COMM_DEBUG(ptr, count, datatype, dest, tag, utag, comm, "-->")
129# define _STARPU_MPI_COMM_FROM_DEBUG(ptr, count, datatype, source, tag, utag, comm) _STARPU_MPI_COMM_DEBUG(ptr, count, datatype, source, tag, utag, comm, "<--")
/* _STARPU_MPI_DEBUG: printf-style message, printed only when _starpu_silent is
 * zero and `level` lies within [_starpu_debug_level_min,
 * _starpu_debug_level_max]. The line is indented by (rank + 1) * 4 spaces and
 * prefixed with the rank, the calling function and the line number. */
130# define _STARPU_MPI_DEBUG(level, fmt, ...) \
131 do \
132 { \
133 if (!_starpu_silent && _starpu_debug_level_min <= level && level <= _starpu_debug_level_max) \
134 { \
135 if (_starpu_debug_rank == -1) starpu_mpi_comm_rank(MPI_COMM_WORLD, &_starpu_debug_rank); \
136 fprintf(stderr, "%*s[%d][starpu_mpi][%s:%d] " fmt , (_starpu_debug_rank+1)*4, "", _starpu_debug_rank, __starpu_func__ , __LINE__,## __VA_ARGS__); \
137 fflush(stderr); \
138 } \
139 } while(0)
140#else
/* Non-verbose build: the macros keep their statement form but do nothing, and
 * their arguments are not evaluated. */
141# define _STARPU_MPI_COMM_DEBUG(ptr, count, datatype, node, tag, utag, comm, way) do { } while(0)
142# define _STARPU_MPI_COMM_TO_DEBUG(ptr, count, datatype, dest, tag, utag, comm) do { } while(0)
143# define _STARPU_MPI_COMM_FROM_DEBUG(ptr, count, datatype, source, tag, utag, comm) do { } while(0)
144# define _STARPU_MPI_DEBUG(level, fmt, ...) do { } while(0)
145#endif
146
/*
 * _STARPU_MPI_DISP(fmt, ...): printf-style message on stderr, skipped when
 * _starpu_silent is set. The line is indented by (rank + 1) * 4 spaces and
 * prefixed with the rank, the calling function and the line number.
 */
#define _STARPU_MPI_DISP(fmt, ...) \
	do \
	{ \
		if (!_starpu_silent) \
		{ \
			if (_starpu_debug_rank == -1) \
			{ \
				starpu_mpi_comm_rank(MPI_COMM_WORLD, &_starpu_debug_rank); \
			} \
			fprintf(stderr, "%*s[%d][starpu_mpi][%s:%d] " fmt , (_starpu_debug_rank+1)*4, "", _starpu_debug_rank, __starpu_func__ , __LINE__ ,## __VA_ARGS__); \
			fflush(stderr); \
		} \
	} while(0)

/*
 * _STARPU_MPI_MSG(fmt, ...): same prefix without the indentation, and printed
 * regardless of _starpu_silent.
 */
#define _STARPU_MPI_MSG(fmt, ...) \
	do \
	{ \
		if (_starpu_debug_rank == -1) \
		{ \
			starpu_mpi_comm_rank(MPI_COMM_WORLD, &_starpu_debug_rank); \
		} \
		fprintf(stderr, "[%d][starpu_mpi][%s:%d] " fmt , _starpu_debug_rank, __starpu_func__ , __LINE__ ,## __VA_ARGS__); \
		fflush(stderr); \
	} while(0)
154
/*
 * Function entry / exit markers.
 *
 * With STARPU_MPI_EXTRA_VERBOSE, _STARPU_MPI_LOG_IN() prints "-->" and
 * _STARPU_MPI_LOG_OUT() prints "<--" on stderr (unless _starpu_silent is set),
 * indented by (rank + 1) * 4 spaces and prefixed with the rank, the calling
 * function and the line number.
 *
 * Without it, both expand to an empty `do { } while(0)` statement, like the
 * other disabled macros of this header, so that a call site needs its trailing
 * semicolon in every build configuration.
 */
#ifdef STARPU_MPI_EXTRA_VERBOSE
# define _STARPU_MPI_LOG_IN() do { if (!_starpu_silent) { \
	if (_starpu_debug_rank == -1) starpu_mpi_comm_rank(MPI_COMM_WORLD, &_starpu_debug_rank); \
	fprintf(stderr, "%*s[%d][starpu_mpi][%s:%d] -->\n", (_starpu_debug_rank+1)*4, "", _starpu_debug_rank, __starpu_func__ , __LINE__); \
	fflush(stderr); }} while(0)
# define _STARPU_MPI_LOG_OUT() do { if (!_starpu_silent) { \
	if (_starpu_debug_rank == -1) starpu_mpi_comm_rank(MPI_COMM_WORLD, &_starpu_debug_rank); \
	fprintf(stderr, "%*s[%d][starpu_mpi][%s:%d] <--\n", (_starpu_debug_rank+1)*4, "", _starpu_debug_rank, __starpu_func__, __LINE__); \
	fflush(stderr); }} while(0)
#else
# define _STARPU_MPI_LOG_IN() do { } while(0)
# define _STARPU_MPI_LOG_OUT() do { } while(0)
#endif
168
/*
 * Kind of operation a request stands for. The value is handed to
 * _starpu_mpi_request_fill() and kept in the request's `request_type` field.
 * Enumerators are numbered consecutively from 0.
 */
enum _starpu_mpi_request_type
{
	SEND_REQ = 0,	/* 0 */
	RECV_REQ,	/* 1 */
	WAIT_REQ,	/* 2 */
	TEST_REQ,	/* 3 */
	BARRIER_REQ,	/* 4 */
	PROBE_REQ,	/* 5 */
	UNKNOWN_REQ	/* 6 */
};
179
181{
182 MPI_Comm comm;
183 int rank;
184};
185
187{
188 struct _starpu_mpi_node node;
189 starpu_mpi_tag_t data_tag;
190};
191
192MULTILIST_CREATE_TYPE(_starpu_mpi_req, coop_sends)
195{
196 starpu_data_handle_t data_handle;
197
199 struct _starpu_mpi_req_multilist_coop_sends reqs;
200 struct _starpu_mpi_data *mpi_data;
201
203 struct _starpu_spinlock lock;
204 struct _starpu_mpi_req **reqs_array;
205 unsigned n;
206 unsigned redirects_sent;
207
208 /* Used to trace dependencies */
209 long pre_sync_jobid;
210};
211
214{
215 int magic;
216 struct _starpu_mpi_node_tag node_tag;
217 char *cache_sent;
218 unsigned int cache_received;
219 unsigned int ft_induced_cache_received:1;
220 unsigned int ft_induced_cache_received_count:1;
221 unsigned int modified:1; // Whether the data has been modified since the registration.
222
226
232
235};
236
237struct _starpu_mpi_data *_starpu_mpi_data_get(starpu_data_handle_t data_handle);
238
240struct _starpu_mpi_req;
243 starpu_data_handle_t data_handle;
244
245 int prio;
246 unsigned node; /* Which StarPU memory node this will read from / write to */
247
249 MPI_Datatype datatype;
250 char *datatype_name;
251 void *ptr;
252 starpu_ssize_t count;
253 int registered_datatype; // = 0: datatype is not predefined by StarPU; = 1: otherwise; initialized with -1
254
255 struct _starpu_mpi_req_backend *backend;
256
258 struct _starpu_mpi_node_tag node_tag;
259 void (*func)(struct _starpu_mpi_req *);
260
261 MPI_Status *status;
262 struct _starpu_mpi_req_multilist_coop_sends coop_sends;
263 struct _starpu_mpi_coop_sends *coop_sends_head;
264
265 int *flag;
266 unsigned sync;
267
270
271 int ret;
272
274 enum _starpu_mpi_request_type request_type;
275
276 unsigned submitted;
277 unsigned completed;
278 unsigned posted;
279
282 void *callback_arg;
283 void (*callback)(void *);
284
285 int sequential_consistency;
286
287 long pre_sync_jobid;
288 long post_sync_jobid;
289
290#ifdef STARPU_SIMGRID
291 MPI_Status status_store;
292 starpu_pthread_queue_t queue;
293 unsigned done;
294#endif
295);
296PRIO_LIST_TYPE(_starpu_mpi_req, prio)
297
298MULTILIST_CREATE_INLINES(struct _starpu_mpi_req, _starpu_mpi_req, coop_sends)
299
300
/* Entry points used to post a request.
 * NOTE(review): the descriptions that accompanied these prototypes are not
 * visible in this listing; the notes below only restate what the signatures
 * show. */
301void _starpu_mpi_req_willpost(struct _starpu_mpi_req *req);
306
/* Common path for sends and receives: takes the request, the access mode to
 * use on its data and the sequential-consistency setting. */
307void _starpu_mpi_isend_irecv_common(struct _starpu_mpi_req *req, enum starpu_data_access_mode mode, int sequential_consistency);
308
/* The tree builder below is compiled out. */
309#if 0
311void _starpu_mpi_coop_sends_build_tree(struct _starpu_mpi_coop_sends *coop_sends);
312#endif
/* Cooperative-send counterpart of the common path, keyed by data handle. */
314void _starpu_mpi_coop_send(starpu_data_handle_t data_handle, struct _starpu_mpi_req *req, enum starpu_data_access_mode mode, int sequential_consistency);
315
/* submit_control and submit_data are two separate int switches.
 * NOTE(review): presumably they select which part of the cooperative send
 * (control messages / data) gets submitted -- confirm in the definition. */
324void _starpu_mpi_submit_coop_sends(struct _starpu_mpi_coop_sends *coop_sends, int submit_control, int submit_data);
325
326/*
327 * Fills post_sync_jobid with the reduction synchronization task jobid
328 */
329void _starpu_mpi_redux_fill_post_sync_jobid(const void * const redux_data_args, long * const post_sync_jobid);
330
/* Request life cycle.
 * _starpu_mpi_request_init() receives the address of a request pointer.
 * NOTE(review): presumably it allocates *req and gives it default values --
 * confirm in the definition. */
331void _starpu_mpi_request_init(struct _starpu_mpi_req **req);
/* Returns a request described by the given handle, peer rank (srcdst), tag,
 * communicator, flags, completion callback with its argument, request type and
 * the `func` to run on the request. */
332struct _starpu_mpi_req * _starpu_mpi_request_fill(starpu_data_handle_t data_handle,
333 int srcdst, starpu_mpi_tag_t data_tag, MPI_Comm comm,
334 unsigned detached, unsigned sync, int prio, void (*callback)(void *), void *arg,
335 enum _starpu_mpi_request_type request_type, void (*func)(struct _starpu_mpi_req *),
336 int sequential_consistency,
337 int is_internal_req,
338 starpu_ssize_t count);
339
340void _starpu_mpi_request_destroy(struct _starpu_mpi_req *req);
341
/* Maps a request type to a string.
 * NOTE(review): presumably a printable name that the caller must not free --
 * confirm ownership in the definition. */
342char *_starpu_mpi_request_type(enum _starpu_mpi_request_type request_type);
343
/* Receive entry point; unlike _starpu_mpi_irecv_cache_aware() it takes `prio`
 * and has no cache_flag parameter. */
344struct _starpu_mpi_req *_starpu_mpi_irecv_common(starpu_data_handle_t data_handle, int source, starpu_mpi_tag_t data_tag, MPI_Comm comm, unsigned detached, unsigned sync, void (*callback)(void *), void *arg, int sequential_consistency, int is_internal_req, starpu_ssize_t count, int prio);
345
/* NOTE(review): presumably returns the StarPU memory node to use for
 * data_handle under the given access mode -- confirm. */
346int _starpu_mpi_choose_node(starpu_data_handle_t data_handle, enum starpu_data_access_mode mode);
347
348void _starpu_mpi_data_flush(starpu_data_handle_t data_handle);
349
352
354{
355 int initialize_mpi;
356 int *argc;
357 char ***argv;
358 MPI_Comm comm;
360 int fargc;
362 char **fargv;
363 int rank;
364 int world_size;
365};
366
371{
372 void (*_starpu_mpi_backend_init)(struct starpu_conf *conf);
373 void (*_starpu_mpi_backend_shutdown)(void);
374 int (*_starpu_mpi_backend_reserve_core)(void);
375 void (*_starpu_mpi_backend_request_init)(struct _starpu_mpi_req *req);
376 void (*_starpu_mpi_backend_request_fill)(struct _starpu_mpi_req *req, int is_internal_req);
377 void (*_starpu_mpi_backend_request_destroy)(struct _starpu_mpi_req *req);
378 void (*_starpu_mpi_backend_data_clear)(starpu_data_handle_t data_handle);
379 void (*_starpu_mpi_backend_data_register)(starpu_data_handle_t data_handle, starpu_mpi_tag_t data_tag);
380 void (*_starpu_mpi_backend_comm_register)(MPI_Comm comm);
381
382 int (*_starpu_mpi_backend_progress_init)(struct _starpu_mpi_argc_argv *argc_argv);
383 void (*_starpu_mpi_backend_progress_shutdown)(void **value);
384#ifdef STARPU_SIMGRID
385 void (*_starpu_mpi_backend_wait_for_initialization)();
386#endif
387
388 int (*_starpu_mpi_backend_barrier)(MPI_Comm comm);
389 int (*_starpu_mpi_backend_wait_for_all)(MPI_Comm comm);
390 int (*_starpu_mpi_backend_wait)(starpu_mpi_req *public_req, MPI_Status *status);
391 int (*_starpu_mpi_backend_test)(starpu_mpi_req *public_req, int *flag, MPI_Status *status);
392
393 void (*_starpu_mpi_backend_isend_size_func)(struct _starpu_mpi_req *req);
394 void (*_starpu_mpi_backend_irecv_size_func)(struct _starpu_mpi_req *req);
395};
396
397extern struct _starpu_mpi_backend _mpi_backend;
398#ifdef __cplusplus
399}
400#endif
401
402#endif // __STARPU_MPI_PRIVATE_H__
#define struct
Definition list.h:175
Definition starpu_mpi_mpi_backend.h:59
void _starpu_mpi_coop_send(starpu_data_handle_t data_handle, struct _starpu_mpi_req *req, enum starpu_data_access_mode mode, int sequential_consistency)
void _starpu_mpi_tags_init(void)
struct _starpu_spinlock coop_lock
Definition starpu_mpi_private.h:229
char * redux_map
Definition starpu_mpi_private.h:225
struct _starpu_mpi_coop_sends * coop_sends
Definition starpu_mpi_private.h:231
char ** fargv
Definition starpu_mpi_private.h:362
int fargc
Definition starpu_mpi_private.h:360
unsigned nb_future_sends
Definition starpu_mpi_private.h:234
void _starpu_mpi_submit_ready_request(void *arg)
void _starpu_mpi_release_req_data(struct _starpu_mpi_req *req)
void _starpu_mpi_submit_coop_sends(struct _starpu_mpi_coop_sends *coop_sends, int submit_control, int submit_data)
Definition starpu_mpi_private.h:354
Definition starpu_mpi_private.h:195
Definition starpu_mpi_private.h:214
Definition starpu_mpi_private.h:181
Definition starpu_mpi_private.h:187
Definition starpu_mpi_private.h:42
Definition starpu_spinlock.h:82
Definition starpu_mpi_private.h:371
Definition starpu_mpi_private.h:241
enum _starpu_mpi_request_type request_type
Definition starpu_mpi_private.h:274
MPI_Datatype datatype
Definition starpu_mpi_private.h:249
int detached
Definition starpu_mpi_private.h:281
size_t reserved_size
Definition starpu_mpi_private.h:269
starpu_data_handle_t data_handle
Definition starpu_mpi_private.h:243