corosync 3.1.9
totemsrp.c
Go to the documentation of this file.
1/*
2 * Copyright (c) 2003-2006 MontaVista Software, Inc.
3 * Copyright (c) 2006-2018 Red Hat, Inc.
4 *
5 * All rights reserved.
6 *
7 * Author: Steven Dake (sdake@redhat.com)
8 *
9 * This software licensed under BSD license, the text of which follows:
10 *
11 * Redistribution and use in source and binary forms, with or without
12 * modification, are permitted provided that the following conditions are met:
13 *
14 * - Redistributions of source code must retain the above copyright notice,
15 * this list of conditions and the following disclaimer.
16 * - Redistributions in binary form must reproduce the above copyright notice,
17 * this list of conditions and the following disclaimer in the documentation
18 * and/or other materials provided with the distribution.
19 * - Neither the name of the MontaVista Software, Inc. nor the names of its
20 * contributors may be used to endorse or promote products derived from this
21 * software without specific prior written permission.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
33 * THE POSSIBILITY OF SUCH DAMAGE.
34 */
35
36/*
37 * The first version of this code was based upon Yair Amir's PhD thesis:
38 * https://corosync.github.io/corosync/doc/Yair_phd.ps.gz (ch4,5).
39 *
40 * The current version of totemsrp implements the Totem protocol specified in:
41 * https://corosync.github.io/corosync/doc/tocssrp95.ps.gz
42 *
43 * The deviations from the above published protocols are:
44 * - token hold mode where token doesn't rotate on unused ring - reduces cpu
45 * usage on 1.6ghz xeon from 35% to less than .1 % as measured by top
46 */
47
48#include <config.h>
49
50#include <assert.h>
51#ifdef HAVE_ALLOCA_H
52#include <alloca.h>
53#endif
54#include <sys/mman.h>
55#include <sys/types.h>
56#include <sys/stat.h>
57#include <sys/socket.h>
58#include <netdb.h>
59#include <sys/un.h>
60#include <sys/ioctl.h>
61#include <sys/param.h>
62#include <netinet/in.h>
63#include <arpa/inet.h>
64#include <unistd.h>
65#include <fcntl.h>
66#include <stdlib.h>
67#include <stdio.h>
68#include <errno.h>
69#include <sched.h>
70#include <time.h>
71#include <sys/time.h>
72#include <sys/poll.h>
73#include <sys/uio.h>
74#include <limits.h>
75
76#include <qb/qblist.h>
77#include <qb/qbdefs.h>
78#include <qb/qbutil.h>
79#include <qb/qbloop.h>
80
81#include <corosync/swab.h>
82#include <corosync/sq.h>
83
84#define LOGSYS_UTILS_ONLY 1
85#include <corosync/logsys.h>
86
87#include "totemsrp.h"
88#include "totemnet.h"
89
90#include "icmap.h"
91#include "totemconfig.h"
92
93#include "cs_queue.h"
94
95#define LOCALHOST_IP inet_addr("127.0.0.1")
96#define QUEUE_RTR_ITEMS_SIZE_MAX 16384 /* allow 16384 retransmit items */
97#define RETRANS_MESSAGE_QUEUE_SIZE_MAX 16384 /* allow 16384 messages to be queued */
98#define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500 /* allow 500 messages to be queued */
99#define MAXIOVS 5
100#define RETRANSMIT_ENTRIES_MAX 30
101#define TOKEN_SIZE_MAX 64000 /* bytes */
102#define LEAVE_DUMMY_NODEID 0
103
/*
 * SRP address.
 *
 * Carries the numeric node id of a processor.  The set helpers in this
 * file treat two addresses as the same member when their nodeid matches.
 */
struct srp_addr {
	unsigned int nodeid;
};
110
111/*
112 * Rollover handling:
113 * SEQNO_START_MSG is the starting sequence number after a new configuration
114 * This should remain zero, unless testing overflow in which case
115 * 0x7ffff000 and 0xfffff000 are good starting values.
116 *
117 * SEQNO_START_TOKEN is the starting sequence number after a new configuration
118 * for a token. This should remain zero, unless testing overflow in which
119 * case 0x7fffff00 or 0xffffff00 are good starting values.
120 */
121#define SEQNO_START_MSG 0x0
122#define SEQNO_START_TOKEN 0x0
123
124/*
125 * These can be used to test different rollover points
126 * #define SEQNO_START_MSG 0xfffffe00
127 * #define SEQNO_START_TOKEN 0xfffffe00
128 */
129
130/*
131 * These can be used to test the error recovery algorithms
132 * #define TEST_DROP_ORF_TOKEN_PERCENTAGE 30
133 * #define TEST_DROP_COMMIT_TOKEN_PERCENTAGE 30
134 * #define TEST_DROP_MCAST_PERCENTAGE 50
135 * #define TEST_RECOVERY_MSG_COUNT 300
136 */
137
138/*
139 * we compare incoming messages to determine if their endian is
140 * different - if so convert them
141 *
142 * do not change
143 */
144#define ENDIAN_LOCAL 0xff22
145
147 MESSAGE_TYPE_ORF_TOKEN = 0, /* Ordering, Reliability, Flow (ORF) control Token */
148 MESSAGE_TYPE_MCAST = 1, /* ring ordered multicast message */
149 MESSAGE_TYPE_MEMB_MERGE_DETECT = 2, /* merge rings if there are available rings */
150 MESSAGE_TYPE_MEMB_JOIN = 3, /* membership join message */
151 MESSAGE_TYPE_MEMB_COMMIT_TOKEN = 4, /* membership commit token */
152 MESSAGE_TYPE_TOKEN_HOLD_CANCEL = 5, /* cancel the holding of the token */
153};
154
159
160/*
161 * New membership algorithm local variables
162 */
165 int set;
166};
167
168
170 struct qb_list_head list;
171 int (*callback_fn) (enum totem_callback_token_type type, const void *);
173 int delete;
174 void *data;
175};
176
177
179 int mcast;
180 int token;
181};
182
183struct mcast {
186 unsigned int seq;
189 unsigned int node_id;
191} __attribute__((packed));
192
193
194struct rtr_item {
196 unsigned int seq;
197}__attribute__((packed));
198
199
200struct orf_token {
202 unsigned int seq;
203 unsigned int token_seq;
204 unsigned int aru;
205 unsigned int aru_addr;
207 unsigned int backlog;
208 unsigned int fcc;
212}__attribute__((packed));
213
214
215struct memb_join {
218 unsigned int proc_list_entries;
220 unsigned long long ring_seq;
221 unsigned char end_of_memb_join[0];
222/*
223 * These parts of the data structure are dynamic:
224 * struct srp_addr proc_list[];
225 * struct srp_addr failed_list[];
226 */
227} __attribute__((packed));
228
229
235
236
241
242
245 unsigned int aru;
246 unsigned int high_delivered;
247 unsigned int received_flg;
248}__attribute__((packed));
249
250
253 unsigned int token_seq;
255 unsigned int retrans_flg;
258 unsigned char end_of_commit_token[0];
259/*
260 * These parts of the data structure are dynamic:
261 *
262 * struct srp_addr addr[PROCESSOR_COUNT_MAX];
263 * struct memb_commit_token_memb_entry memb_list[PROCESSOR_COUNT_MAX];
264 */
265}__attribute__((packed));
266
268 struct mcast *mcast;
269 unsigned int msg_len;
271
273 struct mcast *mcast;
274 unsigned int msg_len;
275};
276
283
286
288
289 /*
290 * Flow control mcasts and remcasts on last and current orf_token
291 */
293
295
297
299
301
303
305
307
309
311
313
315
317
319
321
323
325
327
329
331
333
335
337
339
341
343
345
347
348 unsigned int my_last_aru;
349
351
353
355
356 unsigned int my_install_seq;
357
359
361
363
365
367
368 /*
369 * Queues used to order, deliver, and recover messages
370 */
372
374
376
378
380
381 /*
382 * Received up to and including
383 */
384 unsigned int my_aru;
385
386 unsigned int my_high_delivered;
387
389
390 struct qb_list_head token_callback_sent_listhead;
391
393
395
396 unsigned int my_token_seq;
397
398 /*
399 * Timers
400 */
401 qb_loop_timer_handle timer_pause_timeout;
402
403 qb_loop_timer_handle timer_orf_token_timeout;
404
405 qb_loop_timer_handle timer_orf_token_warning;
406
408
410
411 qb_loop_timer_handle timer_merge_detect_timeout;
412
414
416
418
419 qb_loop_timer_handle timer_heartbeat_timeout;
420
421 /*
422 * Function and data used to log messages
423 */
425
427
429
431
433
435
437
439 int level,
440 int subsys,
441 const char *function,
442 const char *file,
443 int line,
444 const char *format, ...)__attribute__((format(printf, 6, 7)));;
445
447
448//TODO struct srp_addr next_memb;
449
451
453
455 unsigned int nodeid,
456 const void *msg,
457 unsigned int msg_len,
458 int endian_conversion_required);
459
461 enum totem_configuration_type configuration_type,
462 const unsigned int *member_list, size_t member_list_entries,
463 const unsigned int *left_list, size_t left_list_entries,
464 const unsigned int *joined_list, size_t joined_list_entries,
465 const struct memb_ring_id *ring_id);
466
468
471
474 unsigned int nodeid);
475
477 const struct memb_ring_id *memb_ring_id,
478 unsigned int nodeid);
479
481
483
484 unsigned long long token_ring_id_seq;
485
486 unsigned int last_released;
487
488 unsigned int set_aru;
489
491
493
495
496 unsigned int my_last_seq;
497
498 struct timeval tv_old;
499
501
503
504 unsigned int use_heartbeat;
505
506 unsigned int my_trc;
507
508 unsigned int my_pbl;
509
510 unsigned int my_cbl;
511
513
515
517
519
521
523
525
527
531};
532
534 int count;
536 struct totemsrp_instance *instance,
537 const void *msg,
538 size_t msg_len,
539 int endian_conversion_needed);
540};
541
561
562const char* gather_state_from_desc [] = {
563 [TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT] = "consensus timeout",
565 [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE] = "The token was lost in the OPERATIONAL state.",
566 [TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED] = "The consensus timeout expired.",
567 [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE] = "The token was lost in the COMMIT state.",
568 [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE] = "The token was lost in the RECOVERY state.",
569 [TOTEMSRP_GSFROM_FAILED_TO_RECEIVE] = "failed to receive",
570 [TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE] = "foreign message in operational state",
571 [TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE] = "foreign message in gather state",
572 [TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE] = "merge during operational state",
573 [TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE] = "merge during gather state",
574 [TOTEMSRP_GSFROM_MERGE_DURING_JOIN] = "merge during join",
575 [TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE] = "join during operational state",
576 [TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE] = "join during commit state",
577 [TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY] = "join during recovery",
578 [TOTEMSRP_GSFROM_INTERFACE_CHANGE] = "interface change",
579};
580
581/*
582 * forward decls
583 */
584static int message_handler_orf_token (
585 struct totemsrp_instance *instance,
586 const void *msg,
587 size_t msg_len,
588 int endian_conversion_needed);
589
590static int message_handler_mcast (
591 struct totemsrp_instance *instance,
592 const void *msg,
593 size_t msg_len,
594 int endian_conversion_needed);
595
596static int message_handler_memb_merge_detect (
597 struct totemsrp_instance *instance,
598 const void *msg,
599 size_t msg_len,
600 int endian_conversion_needed);
601
602static int message_handler_memb_join (
603 struct totemsrp_instance *instance,
604 const void *msg,
605 size_t msg_len,
606 int endian_conversion_needed);
607
608static int message_handler_memb_commit_token (
609 struct totemsrp_instance *instance,
610 const void *msg,
611 size_t msg_len,
612 int endian_conversion_needed);
613
614static int message_handler_token_hold_cancel (
615 struct totemsrp_instance *instance,
616 const void *msg,
617 size_t msg_len,
618 int endian_conversion_needed);
619
620static void totemsrp_instance_initialize (struct totemsrp_instance *instance);
621
622static void srp_addr_to_nodeid (
623 struct totemsrp_instance *instance,
624 unsigned int *nodeid_out,
625 struct srp_addr *srp_addr_in,
626 unsigned int entries);
627
628static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b);
629
630static void memb_leave_message_send (struct totemsrp_instance *instance);
631
632static void token_callbacks_execute (struct totemsrp_instance *instance, enum totem_callback_token_type type);
633static void memb_state_gather_enter (struct totemsrp_instance *instance, enum gather_state_from gather_from);
634static void messages_deliver_to_app (struct totemsrp_instance *instance, int skip, unsigned int end_point);
635static int orf_token_mcast (struct totemsrp_instance *instance, struct orf_token *oken,
636 int fcc_mcasts_allowed);
637static void messages_free (struct totemsrp_instance *instance, unsigned int token_aru);
638
639static void memb_ring_id_set (struct totemsrp_instance *instance,
640 const struct memb_ring_id *ring_id);
641static void target_set_completed (void *context);
642static void memb_state_commit_token_update (struct totemsrp_instance *instance);
643static void memb_state_commit_token_target_set (struct totemsrp_instance *instance);
644static int memb_state_commit_token_send (struct totemsrp_instance *instance);
645static int memb_state_commit_token_send_recovery (struct totemsrp_instance *instance, struct memb_commit_token *memb_commit_token);
646static void memb_state_commit_token_create (struct totemsrp_instance *instance);
647static int token_hold_cancel_send (struct totemsrp_instance *instance);
648static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out);
649static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out);
650static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out);
651static void mcast_endian_convert (const struct mcast *in, struct mcast *out);
652static void memb_merge_detect_endian_convert (
653 const struct memb_merge_detect *in,
654 struct memb_merge_detect *out);
655static struct srp_addr srp_addr_endian_convert (struct srp_addr in);
656static void timer_function_orf_token_timeout (void *data);
657static void timer_function_orf_token_warning (void *data);
658static void timer_function_pause_timeout (void *data);
659static void timer_function_heartbeat_timeout (void *data);
660static void timer_function_token_retransmit_timeout (void *data);
661static void timer_function_token_hold_retransmit_timeout (void *data);
662static void timer_function_merge_detect_timeout (void *data);
663static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance);
664static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr);
665static const char* gsfrom_to_msg(enum gather_state_from gsfrom);
666
667int main_deliver_fn (
668 void *context,
669 const void *msg,
670 unsigned int msg_len,
671 const struct sockaddr_storage *system_from);
672
674 void *context,
675 const struct totem_ip_address *iface_address,
676 unsigned int iface_no);
677
679 6,
680 {
681 message_handler_orf_token, /* MESSAGE_TYPE_ORF_TOKEN */
682 message_handler_mcast, /* MESSAGE_TYPE_MCAST */
683 message_handler_memb_merge_detect, /* MESSAGE_TYPE_MEMB_MERGE_DETECT */
684 message_handler_memb_join, /* MESSAGE_TYPE_MEMB_JOIN */
685 message_handler_memb_commit_token, /* MESSAGE_TYPE_MEMB_COMMIT_TOKEN */
686 message_handler_token_hold_cancel /* MESSAGE_TYPE_TOKEN_HOLD_CANCEL */
687 }
688};
689
/*
 * Emit one log record through the logging callback stored on the
 * totemsrp instance.
 *
 * A variable named `instance` must be in scope at the call site; its
 * totemsrp_log_printf callback receives the level, the instance's
 * subsystem id, the calling function/file/line and the printf-style
 * format with its arguments.
 *
 * The expansion intentionally does NOT end in a semicolon: the caller
 * supplies it, so `log_printf (...);` is exactly one statement and is
 * safe as the body of an unbraced if that is followed by else.
 */
#define log_printf(level, format, args...)		\
do {							\
	instance->totemsrp_log_printf (			\
		level, instance->totemsrp_subsys_id,	\
		__FUNCTION__, __FILE__, __LINE__,	\
		format, ##args);			\
} while (0)
/*
 * Log a caller-supplied message with an error description appended.
 *
 * The text for err_num is obtained from qb_strerror_r() using a local
 * scratch buffer of LOGSYS_MAX_PERROR_MSG_LEN bytes, then the caller's
 * format is extended with ": <text> (<err_num>)" and handed to the
 * instance's logging callback.  A variable named `instance` must be in
 * scope at the call site.  Note that err_num is expanded twice.
 */
#define LOGSYS_PERROR(err_num, level, fmt, args...)					\
do {											\
	char _perror_buf[LOGSYS_MAX_PERROR_MSG_LEN];					\
	const char *_perror_text;							\
	_perror_text = qb_strerror_r(err_num, _perror_buf, sizeof(_perror_buf));	\
	instance->totemsrp_log_printf (							\
		level,									\
		instance->totemsrp_subsys_id,						\
		__FUNCTION__,								\
		__FILE__,								\
		__LINE__,								\
		fmt ": %s (%d)\n", ##args, _perror_text, err_num);			\
} while(0)
706
707static const char* gsfrom_to_msg(enum gather_state_from gsfrom)
708{
709 if (gsfrom <= TOTEMSRP_GSFROM_MAX) {
710 return gather_state_from_desc[gsfrom];
711 }
712 else {
713 return "UNKNOWN";
714 }
715}
716
717static void totemsrp_instance_initialize (struct totemsrp_instance *instance)
718{
719 memset (instance, 0, sizeof (struct totemsrp_instance));
720
721 qb_list_init (&instance->token_callback_received_listhead);
722
723 qb_list_init (&instance->token_callback_sent_listhead);
724
725 instance->my_received_flg = 1;
726
727 instance->my_token_seq = SEQNO_START_TOKEN - 1;
728
730
731 instance->set_aru = -1;
732
733 instance->my_aru = SEQNO_START_MSG;
734
736
738
739 instance->orf_token_discard = 0;
740
741 instance->originated_orf_token = 0;
742
743 instance->commit_token = (struct memb_commit_token *)instance->commit_token_storage;
744
745 instance->waiting_trans_ack = 1;
746}
747
748static int pause_flush (struct totemsrp_instance *instance)
749{
750 uint64_t now_msec;
751 uint64_t timestamp_msec;
752 int res = 0;
753
754 now_msec = (qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC);
755 timestamp_msec = instance->pause_timestamp / QB_TIME_NS_IN_MSEC;
756
757 if ((now_msec - timestamp_msec) > (instance->totem_config->token_timeout / 2)) {
759 "Process pause detected for %d ms, flushing membership messages.", (unsigned int)(now_msec - timestamp_msec));
760 /*
761 * -1 indicates an error from recvmsg
762 */
763 do {
765 } while (res == -1);
766 }
767 return (res);
768}
769
770static int token_event_stats_collector (enum totem_callback_token_type type, const void *void_instance)
771{
772 struct totemsrp_instance *instance = (struct totemsrp_instance *)void_instance;
773 uint64_t time_now;
774
775 time_now = (qb_util_nano_current_get() / QB_TIME_NS_IN_MSEC);
776
778 /* incr latest token the index */
779 if (instance->stats.latest_token == (TOTEM_TOKEN_STATS_MAX - 1))
780 instance->stats.latest_token = 0;
781 else
782 instance->stats.latest_token++;
783
784 if (instance->stats.earliest_token == instance->stats.latest_token) {
785 /* we have filled up the array, start overwriting */
786 if (instance->stats.earliest_token == (TOTEM_TOKEN_STATS_MAX - 1))
787 instance->stats.earliest_token = 0;
788 else
789 instance->stats.earliest_token++;
790
791 instance->stats.token[instance->stats.earliest_token].rx = 0;
792 instance->stats.token[instance->stats.earliest_token].tx = 0;
793 instance->stats.token[instance->stats.earliest_token].backlog_calc = 0;
794 }
795
796 instance->stats.token[instance->stats.latest_token].rx = time_now;
797 instance->stats.token[instance->stats.latest_token].tx = 0; /* in case we drop the token */
798 } else {
799 instance->stats.token[instance->stats.latest_token].tx = time_now;
800 }
801 return 0;
802}
803
804static void totempg_mtu_changed(void *context, int net_mtu)
805{
806 struct totemsrp_instance *instance = context;
807
808 instance->totem_config->net_mtu = net_mtu - 2 * sizeof (struct mcast);
809
811 "Net MTU changed to %d, new value is %d",
812 net_mtu, instance->totem_config->net_mtu);
813}
814
815/*
816 * Exported interfaces
817 */
819 qb_loop_t *poll_handle,
820 void **srp_context,
822 totempg_stats_t *stats,
823
824 void (*deliver_fn) (
825 unsigned int nodeid,
826 const void *msg,
827 unsigned int msg_len,
828 int endian_conversion_required),
829
830 void (*confchg_fn) (
831 enum totem_configuration_type configuration_type,
832 const unsigned int *member_list, size_t member_list_entries,
833 const unsigned int *left_list, size_t left_list_entries,
834 const unsigned int *joined_list, size_t joined_list_entries,
835 const struct memb_ring_id *ring_id),
836 void (*waiting_trans_ack_cb_fn) (
837 int waiting_trans_ack))
838{
839 struct totemsrp_instance *instance;
840 int res;
841
842 instance = malloc (sizeof (struct totemsrp_instance));
843 if (instance == NULL) {
844 goto error_exit;
845 }
846
847 totemsrp_instance_initialize (instance);
848
849 instance->totemsrp_waiting_trans_ack_cb_fn = waiting_trans_ack_cb_fn;
851
852 stats->srp = &instance->stats;
853 instance->stats.latest_token = 0;
854 instance->stats.earliest_token = 0;
855
856 instance->totem_config = totem_config;
857
858 /*
859 * Configure logging
860 */
869
870 /*
871 * Configure totem store and load functions
872 */
875
876 /*
877 * Initialize local variables for totemsrp
878 */
880
881 /*
882 * Display totem configuration
883 */
885 "Token Timeout (%d ms) retransmit timeout (%d ms)",
888 uint32_t token_warning_ms = totem_config->token_warning * totem_config->token_timeout / 100;
890 "Token warning every %d ms (%d%% of Token Timeout)",
891 token_warning_ms, totem_config->token_warning);
892 if (token_warning_ms < totem_config->token_retransmit_timeout)
894 "The token warning interval (%d ms) is less than the token retransmit timeout (%d ms) "
895 "which can lead to spurious token warnings. Consider increasing the token_warning parameter.",
896 token_warning_ms, totem_config->token_retransmit_timeout);
897 } else {
899 "Token warnings disabled");
900 }
902 "token hold (%d ms) retransmits before loss (%d retrans)",
905 "join (%d ms) send_join (%d ms) consensus (%d ms) merge (%d ms)",
909
912 "downcheck (%d ms) fail to recv const (%d msgs)",
915 "seqno unchanged const (%d rotations) Maximum network MTU %d", totem_config->seqno_unchanged_const, totem_config->net_mtu);
916
918 "window size per rotation (%d messages) maximum messages per rotation (%d messages)",
920
922 "missed count const (%d messages)",
924
926 "send threads (%d threads)", totem_config->threads);
927
929 "heartbeat_failures_allowed (%d)", totem_config->heartbeat_failures_allowed);
931 "max_network_delay (%d ms)", totem_config->max_network_delay);
932
933
934 cs_queue_init (&instance->retrans_message_queue, RETRANS_MESSAGE_QUEUE_SIZE_MAX,
935 sizeof (struct message_item), instance->threaded_mode_enabled);
936
937 sq_init (&instance->regular_sort_queue,
938 QUEUE_RTR_ITEMS_SIZE_MAX, sizeof (struct sort_queue_item), 0);
939
940 sq_init (&instance->recovery_sort_queue,
941 QUEUE_RTR_ITEMS_SIZE_MAX, sizeof (struct sort_queue_item), 0);
942
943 instance->totemsrp_poll_handle = poll_handle;
944
945 instance->totemsrp_deliver_fn = deliver_fn;
946
947 instance->totemsrp_confchg_fn = confchg_fn;
948 instance->use_heartbeat = 1;
949
950 timer_function_pause_timeout (instance);
951
954 "HeartBeat is Disabled. To enable set heartbeat_failures_allowed > 0");
955 instance->use_heartbeat = 0;
956 }
957
958 if (instance->use_heartbeat) {
959 instance->heartbeat_timeout
962
963 if (instance->heartbeat_timeout >= totem_config->token_timeout) {
965 "total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)",
966 instance->heartbeat_timeout,
969 "heartbeat_timeout = heartbeat_failures_allowed * token_retransmit_timeout + max_network_delay");
971 "heartbeat timeout should be less than the token timeout. Heartbeat is disabled!!");
972 instance->use_heartbeat = 0;
973 }
974 else {
976 "total heartbeat_timeout (%d ms)", instance->heartbeat_timeout);
977 }
978 }
979
980 res = totemnet_initialize (
981 poll_handle,
982 &instance->totemnet_context,
984 stats->srp,
985 instance,
988 totempg_mtu_changed,
989 target_set_completed);
990 if (res == -1) {
991 goto error_exit;
992 }
993
994 instance->my_id.nodeid = instance->totem_config->interfaces[instance->lowest_active_if].boundto.nodeid;
995
996 /*
997 * Must have net_mtu adjusted by totemnet_initialize first
998 */
999 cs_queue_init (&instance->new_message_queue,
1001 sizeof (struct message_item), instance->threaded_mode_enabled);
1002
1003 cs_queue_init (&instance->new_message_queue_trans,
1005 sizeof (struct message_item), instance->threaded_mode_enabled);
1006
1008 &instance->token_recv_event_handle,
1010 0,
1011 token_event_stats_collector,
1012 instance);
1014 &instance->token_sent_event_handle,
1016 0,
1017 token_event_stats_collector,
1018 instance);
1019 *srp_context = instance;
1020 return (0);
1021
1022error_exit:
1023 return (-1);
1024}
1025
1027 void *srp_context)
1028{
1029 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1030
1031 memb_leave_message_send (instance);
1033 cs_queue_free (&instance->new_message_queue);
1034 cs_queue_free (&instance->new_message_queue_trans);
1035 cs_queue_free (&instance->retrans_message_queue);
1036 sq_free (&instance->regular_sort_queue);
1037 sq_free (&instance->recovery_sort_queue);
1038 free (instance);
1039}
1040
1042 void *srp_context,
1043 unsigned int nodeid,
1044 struct totem_node_status *node_status)
1045{
1046 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1047 int i;
1048
1050
1051 /* Fill in 'reachable' here as the lower level UDP[u] layers don't know */
1052 for (i = 0; i < instance->my_proc_list_entries; i++) {
1053 if (instance->my_proc_list[i].nodeid == nodeid) {
1054 node_status->reachable = 1;
1055 }
1056 }
1057
1058 return totemnet_nodestatus_get(instance->totemnet_context, nodeid, node_status);
1059}
1060
1061
1062/*
1063 * Return configured interfaces. interfaces is array of totem_ip addresses allocated by caller,
1064 * with interfaces_size number of items. iface_count is final number of interfaces filled by this
1065 * function.
1066 *
1067 * Function returns 0 on success, otherwise if interfaces array is not big enough, -2 is returned,
1068 * and if interface was not found, -1 is returned.
1069 */
1071 void *srp_context,
1072 unsigned int nodeid,
1073 unsigned int *interface_id,
1074 struct totem_ip_address *interfaces,
1075 unsigned int interfaces_size,
1076 char ***status,
1077 unsigned int *iface_count)
1078{
1079 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1080 struct totem_ip_address *iface_ptr = interfaces;
1081 int res = 0;
1082 int i,n;
1083 int num_ifs = 0;
1084
1085 memset(interfaces, 0, sizeof(struct totem_ip_address) * interfaces_size);
1086 *iface_count = INTERFACE_MAX;
1087
1088 for (i=0; i<INTERFACE_MAX; i++) {
1089 for (n=0; n < instance->totem_config->interfaces[i].member_count; n++) {
1090 if (instance->totem_config->interfaces[i].configured &&
1091 instance->totem_config->interfaces[i].member_list[n].nodeid == nodeid) {
1092 memcpy(iface_ptr, &instance->totem_config->interfaces[i].member_list[n], sizeof(struct totem_ip_address));
1093 interface_id[num_ifs] = i;
1094 iface_ptr++;
1095 if (++num_ifs > interfaces_size) {
1096 res = -2;
1097 break;
1098 }
1099 }
1100 }
1101 }
1102
1103 totemnet_ifaces_get(instance->totemnet_context, status, iface_count);
1104 *iface_count = num_ifs;
1105 return (res);
1106}
1107
1109 void *srp_context,
1110 const char *cipher_type,
1111 const char *hash_type)
1112{
1113 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1114 int res;
1115
1116 res = totemnet_crypto_set(instance->totemnet_context, cipher_type, hash_type);
1117
1118 return (res);
1119}
1120
1121
1123 void *srp_context)
1124{
1125 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1126 unsigned int res;
1127
1128 res = instance->my_id.nodeid;
1129
1130 return (res);
1131}
1132
1134 void *srp_context)
1135{
1136 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1137 int res;
1138
1139 res = instance->totem_config->interfaces[instance->lowest_active_if].boundto.family;
1140
1141 return (res);
1142}
1143
1144
1145/*
1146 * Set operations for use by the membership algorithm
1147 */
1148static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b)
1149{
1150 if (a->nodeid == b->nodeid) {
1151 return 1;
1152 }
1153 return 0;
1154}
1155
1156static void srp_addr_to_nodeid (
1157 struct totemsrp_instance *instance,
1158 unsigned int *nodeid_out,
1159 struct srp_addr *srp_addr_in,
1160 unsigned int entries)
1161{
1162 unsigned int i;
1163
1164 for (i = 0; i < entries; i++) {
1165 nodeid_out[i] = srp_addr_in[i].nodeid;
1166 }
1167}
1168
1169static struct srp_addr srp_addr_endian_convert (struct srp_addr in)
1170{
1171 struct srp_addr res;
1172
1173 res.nodeid = swab32 (in.nodeid);
1174
1175 return (res);
1176}
1177
1178static void memb_consensus_reset (struct totemsrp_instance *instance)
1179{
1180 instance->consensus_list_entries = 0;
1181}
1182
1183static void memb_set_subtract (
1184 struct srp_addr *out_list, int *out_list_entries,
1185 struct srp_addr *one_list, int one_list_entries,
1186 struct srp_addr *two_list, int two_list_entries)
1187{
1188 int found = 0;
1189 int i;
1190 int j;
1191
1192 *out_list_entries = 0;
1193
1194 for (i = 0; i < one_list_entries; i++) {
1195 for (j = 0; j < two_list_entries; j++) {
1196 if (srp_addr_equal (&one_list[i], &two_list[j])) {
1197 found = 1;
1198 break;
1199 }
1200 }
1201 if (found == 0) {
1202 out_list[*out_list_entries] = one_list[i];
1203 *out_list_entries = *out_list_entries + 1;
1204 }
1205 found = 0;
1206 }
1207}
1208
1209/*
1210 * Set consensus for a specific processor
1211 */
1212static void memb_consensus_set (
1213 struct totemsrp_instance *instance,
1214 const struct srp_addr *addr)
1215{
1216 int found = 0;
1217 int i;
1218
1219 for (i = 0; i < instance->consensus_list_entries; i++) {
1220 if (srp_addr_equal(addr, &instance->consensus_list[i].addr)) {
1221 found = 1;
1222 break; /* found entry */
1223 }
1224 }
1225 instance->consensus_list[i].addr = *addr;
1226 instance->consensus_list[i].set = 1;
1227 if (found == 0) {
1228 instance->consensus_list_entries++;
1229 }
1230 return;
1231}
1232
1233/*
1234 * Is consensus set for a specific processor
1235 */
1236static int memb_consensus_isset (
1237 struct totemsrp_instance *instance,
1238 const struct srp_addr *addr)
1239{
1240 int i;
1241
1242 for (i = 0; i < instance->consensus_list_entries; i++) {
1243 if (srp_addr_equal (addr, &instance->consensus_list[i].addr)) {
1244 return (instance->consensus_list[i].set);
1245 }
1246 }
1247 return (0);
1248}
1249
1250/*
1251 * Is consensus agreed upon based upon consensus database
1252 */
1253static int memb_consensus_agreed (
1254 struct totemsrp_instance *instance)
1255{
1256 struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
1257 int token_memb_entries = 0;
1258 int agreed = 1;
1259 int i;
1260
1261 memb_set_subtract (token_memb, &token_memb_entries,
1262 instance->my_proc_list, instance->my_proc_list_entries,
1263 instance->my_failed_list, instance->my_failed_list_entries);
1264
1265 for (i = 0; i < token_memb_entries; i++) {
1266 if (memb_consensus_isset (instance, &token_memb[i]) == 0) {
1267 agreed = 0;
1268 break;
1269 }
1270 }
1271
1272 if (agreed && instance->failed_to_recv == 1) {
1273 /*
1274 * Both nodes agreed on our failure. We don't care how many proc list items left because we
1275 * will create single ring anyway.
1276 */
1277
1278 return (agreed);
1279 }
1280
1281 assert (token_memb_entries >= 1);
1282
1283 return (agreed);
1284}
1285
1286static void memb_consensus_notset (
1287 struct totemsrp_instance *instance,
1288 struct srp_addr *no_consensus_list,
1289 int *no_consensus_list_entries,
1290 struct srp_addr *comparison_list,
1291 int comparison_list_entries)
1292{
1293 int i;
1294
1295 *no_consensus_list_entries = 0;
1296
1297 for (i = 0; i < instance->my_proc_list_entries; i++) {
1298 if (memb_consensus_isset (instance, &instance->my_proc_list[i]) == 0) {
1299 no_consensus_list[*no_consensus_list_entries] = instance->my_proc_list[i];
1300 *no_consensus_list_entries = *no_consensus_list_entries + 1;
1301 }
1302 }
1303}
1304
1305/*
1306 * Is set1 equal to set2 Entries can be in different orders
1307 */
1308static int memb_set_equal (
1309 struct srp_addr *set1, int set1_entries,
1310 struct srp_addr *set2, int set2_entries)
1311{
1312 int i;
1313 int j;
1314
1315 int found = 0;
1316
1317 if (set1_entries != set2_entries) {
1318 return (0);
1319 }
1320 for (i = 0; i < set2_entries; i++) {
1321 for (j = 0; j < set1_entries; j++) {
1322 if (srp_addr_equal (&set1[j], &set2[i])) {
1323 found = 1;
1324 break;
1325 }
1326 }
1327 if (found == 0) {
1328 return (0);
1329 }
1330 found = 0;
1331 }
1332 return (1);
1333}
1334
1335/*
1336 * Is subset fully contained in fullset
1337 */
1338static int memb_set_subset (
1339 const struct srp_addr *subset, int subset_entries,
1340 const struct srp_addr *fullset, int fullset_entries)
1341{
1342 int i;
1343 int j;
1344 int found = 0;
1345
1346 if (subset_entries > fullset_entries) {
1347 return (0);
1348 }
1349 for (i = 0; i < subset_entries; i++) {
1350 for (j = 0; j < fullset_entries; j++) {
1351 if (srp_addr_equal (&subset[i], &fullset[j])) {
1352 found = 1;
1353 }
1354 }
1355 if (found == 0) {
1356 return (0);
1357 }
1358 found = 0;
1359 }
1360 return (1);
1361}
1362/*
1363 * merge subset into fullset taking care not to add duplicates
1364 */
1365static void memb_set_merge (
1366 const struct srp_addr *subset, int subset_entries,
1367 struct srp_addr *fullset, int *fullset_entries)
1368{
1369 int found = 0;
1370 int i;
1371 int j;
1372
1373 for (i = 0; i < subset_entries; i++) {
1374 for (j = 0; j < *fullset_entries; j++) {
1375 if (srp_addr_equal (&fullset[j], &subset[i])) {
1376 found = 1;
1377 break;
1378 }
1379 }
1380 if (found == 0) {
1381 fullset[*fullset_entries] = subset[i];
1382 *fullset_entries = *fullset_entries + 1;
1383 }
1384 found = 0;
1385 }
1386 return;
1387}
1388
1389static void memb_set_and_with_ring_id (
1390 struct srp_addr *set1,
1391 struct memb_ring_id *set1_ring_ids,
1392 int set1_entries,
1393 struct srp_addr *set2,
1394 int set2_entries,
1395 struct memb_ring_id *old_ring_id,
1396 struct srp_addr *and,
1397 int *and_entries)
1398{
1399 int i;
1400 int j;
1401 int found = 0;
1402
1403 *and_entries = 0;
1404
1405 for (i = 0; i < set2_entries; i++) {
1406 for (j = 0; j < set1_entries; j++) {
1407 if (srp_addr_equal (&set1[j], &set2[i])) {
1408 if (memcmp (&set1_ring_ids[j], old_ring_id, sizeof (struct memb_ring_id)) == 0) {
1409 found = 1;
1410 }
1411 break;
1412 }
1413 }
1414 if (found) {
1415 and[*and_entries] = set1[j];
1416 *and_entries = *and_entries + 1;
1417 }
1418 found = 0;
1419 }
1420 return;
1421}
1422
1423static void memb_set_log(
1424 struct totemsrp_instance *instance,
1425 int level,
1426 const char *string,
1427 struct srp_addr *list,
1428 int list_entries)
1429{
1430 char int_buf[32];
1431 char list_str[512];
1432 int i;
1433
1434 memset(list_str, 0, sizeof(list_str));
1435
1436 for (i = 0; i < list_entries; i++) {
1437 if (i == 0) {
1438 snprintf(int_buf, sizeof(int_buf), CS_PRI_NODE_ID, list[i].nodeid);
1439 } else {
1440 snprintf(int_buf, sizeof(int_buf), "," CS_PRI_NODE_ID, list[i].nodeid);
1441 }
1442
1443 if (strlen(list_str) + strlen(int_buf) >= sizeof(list_str)) {
1444 break ;
1445 }
1446 strcat(list_str, int_buf);
1447 }
1448
1449 log_printf(level, "List '%s' contains %d entries: %s", string, list_entries, list_str);
1450}
1451
1452static void my_leave_memb_clear(
1453 struct totemsrp_instance *instance)
1454{
1455 memset(instance->my_leave_memb_list, 0, sizeof(instance->my_leave_memb_list));
1456 instance->my_leave_memb_entries = 0;
1457}
1458
1459static unsigned int my_leave_memb_match(
1460 struct totemsrp_instance *instance,
1461 unsigned int nodeid)
1462{
1463 int i;
1464 unsigned int ret = 0;
1465
1466 for (i = 0; i < instance->my_leave_memb_entries; i++){
1467 if (instance->my_leave_memb_list[i] == nodeid){
1468 ret = nodeid;
1469 break;
1470 }
1471 }
1472 return ret;
1473}
1474
1475static void my_leave_memb_set(
1476 struct totemsrp_instance *instance,
1477 unsigned int nodeid)
1478{
1479 int i, found = 0;
1480 for (i = 0; i < instance->my_leave_memb_entries; i++){
1481 if (instance->my_leave_memb_list[i] == nodeid){
1482 found = 1;
1483 break;
1484 }
1485 }
1486 if (found == 1) {
1487 return;
1488 }
1489 if (instance->my_leave_memb_entries < (PROCESSOR_COUNT_MAX - 1)) {
1490 instance->my_leave_memb_list[instance->my_leave_memb_entries] = nodeid;
1491 instance->my_leave_memb_entries++;
1492 } else {
1494 "Cannot set LEAVE nodeid=" CS_PRI_NODE_ID, nodeid);
1495 }
1496}
1497
1498
1499static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance)
1500{
1501 assert (instance != NULL);
1502 return totemnet_buffer_alloc (instance->totemnet_context);
1503}
1504
1505static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr)
1506{
1507 assert (instance != NULL);
1509}
1510
1511static void reset_token_retransmit_timeout (struct totemsrp_instance *instance)
1512{
1513 int32_t res;
1514
1515 qb_loop_timer_del (instance->totemsrp_poll_handle,
1517 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1518 QB_LOOP_MED,
1519 instance->totem_config->token_retransmit_timeout*QB_TIME_NS_IN_MSEC,
1520 (void *)instance,
1521 timer_function_token_retransmit_timeout,
1522 &instance->timer_orf_token_retransmit_timeout);
1523 if (res != 0) {
1524 log_printf(instance->totemsrp_log_level_error, "reset_token_retransmit_timeout - qb_loop_timer_add error : %d", res);
1525 }
1526
1527}
1528
1529static void start_merge_detect_timeout (struct totemsrp_instance *instance)
1530{
1531 int32_t res;
1532
1533 if (instance->my_merge_detect_timeout_outstanding == 0) {
1534 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1535 QB_LOOP_MED,
1536 instance->totem_config->merge_timeout*QB_TIME_NS_IN_MSEC,
1537 (void *)instance,
1538 timer_function_merge_detect_timeout,
1539 &instance->timer_merge_detect_timeout);
1540 if (res != 0) {
1541 log_printf(instance->totemsrp_log_level_error, "start_merge_detect_timeout - qb_loop_timer_add error : %d", res);
1542 }
1543
1545 }
1546}
1547
1548static void cancel_merge_detect_timeout (struct totemsrp_instance *instance)
1549{
1550 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_merge_detect_timeout);
1552}
1553
1554/*
1555 * ring_state_* is used to save and restore the sort queue
1556 * state when a recovery operation fails (and enters gather)
1557 */
1558static void old_ring_state_save (struct totemsrp_instance *instance)
1559{
1560 if (instance->old_ring_state_saved == 0) {
1561 instance->old_ring_state_saved = 1;
1562 memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
1563 sizeof (struct memb_ring_id));
1564 instance->old_ring_state_aru = instance->my_aru;
1567 "Saving state aru %x high seq received %x",
1568 instance->my_aru, instance->my_high_seq_received);
1569 }
1570}
1571
1572static void old_ring_state_restore (struct totemsrp_instance *instance)
1573{
1574 instance->my_aru = instance->old_ring_state_aru;
1577 "Restoring instance->my_aru %x my high seq received %x",
1578 instance->my_aru, instance->my_high_seq_received);
1579}
1580
1581static void old_ring_state_reset (struct totemsrp_instance *instance)
1582{
1584 "Resetting old ring state");
1585 instance->old_ring_state_saved = 0;
1586}
1587
1588static void reset_pause_timeout (struct totemsrp_instance *instance)
1589{
1590 int32_t res;
1591
1592 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_pause_timeout);
1593 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1594 QB_LOOP_MED,
1595 instance->totem_config->token_timeout * QB_TIME_NS_IN_MSEC / 5,
1596 (void *)instance,
1597 timer_function_pause_timeout,
1598 &instance->timer_pause_timeout);
1599 if (res != 0) {
1600 log_printf(instance->totemsrp_log_level_error, "reset_pause_timeout - qb_loop_timer_add error : %d", res);
1601 }
1602}
1603
1604static void reset_token_warning (struct totemsrp_instance *instance) {
1605 int32_t res;
1606
1607 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_warning);
1608 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1609 QB_LOOP_MED,
1610 instance->totem_config->token_warning * instance->totem_config->token_timeout / 100 * QB_TIME_NS_IN_MSEC,
1611 (void *)instance,
1612 timer_function_orf_token_warning,
1613 &instance->timer_orf_token_warning);
1614 if (res != 0) {
1615 log_printf(instance->totemsrp_log_level_error, "reset_token_warning - qb_loop_timer_add error : %d", res);
1616 }
1617}
1618
1619static void reset_token_timeout (struct totemsrp_instance *instance) {
1620 int32_t res;
1621
1622 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout);
1623 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1624 QB_LOOP_MED,
1625 instance->totem_config->token_timeout*QB_TIME_NS_IN_MSEC,
1626 (void *)instance,
1627 timer_function_orf_token_timeout,
1628 &instance->timer_orf_token_timeout);
1629 if (res != 0) {
1630 log_printf(instance->totemsrp_log_level_error, "reset_token_timeout - qb_loop_timer_add error : %d", res);
1631 }
1632
1633 if (instance->totem_config->token_warning)
1634 reset_token_warning(instance);
1635}
1636
1637static void reset_heartbeat_timeout (struct totemsrp_instance *instance) {
1638 int32_t res;
1639
1640 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_heartbeat_timeout);
1641 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1642 QB_LOOP_MED,
1643 instance->heartbeat_timeout*QB_TIME_NS_IN_MSEC,
1644 (void *)instance,
1645 timer_function_heartbeat_timeout,
1646 &instance->timer_heartbeat_timeout);
1647 if (res != 0) {
1648 log_printf(instance->totemsrp_log_level_error, "reset_heartbeat_timeout - qb_loop_timer_add error : %d", res);
1649 }
1650}
1651
1652
1653static void cancel_token_warning (struct totemsrp_instance *instance) {
1654 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_warning);
1655}
1656
1657static void cancel_token_timeout (struct totemsrp_instance *instance) {
1658 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout);
1659
1660 if (instance->totem_config->token_warning)
1661 cancel_token_warning(instance);
1662}
1663
1664static void cancel_heartbeat_timeout (struct totemsrp_instance *instance) {
1665 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_heartbeat_timeout);
1666}
1667
1668static void cancel_token_retransmit_timeout (struct totemsrp_instance *instance)
1669{
1670 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_retransmit_timeout);
1671}
1672
1673static void start_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
1674{
1675 int32_t res;
1676
1677 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1678 QB_LOOP_MED,
1679 instance->totem_config->token_hold_timeout*QB_TIME_NS_IN_MSEC,
1680 (void *)instance,
1681 timer_function_token_hold_retransmit_timeout,
1682 &instance->timer_orf_token_hold_retransmit_timeout);
1683 if (res != 0) {
1684 log_printf(instance->totemsrp_log_level_error, "start_token_hold_retransmit_timeout - qb_loop_timer_add error : %d", res);
1685 }
1686}
1687
1688static void cancel_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
1689{
1690 qb_loop_timer_del (instance->totemsrp_poll_handle,
1692}
1693
1694static void memb_state_consensus_timeout_expired (
1695 struct totemsrp_instance *instance)
1696{
1697 struct srp_addr no_consensus_list[PROCESSOR_COUNT_MAX];
1698 int no_consensus_list_entries;
1699
1700 instance->stats.consensus_timeouts++;
1701 if (memb_consensus_agreed (instance)) {
1702 memb_consensus_reset (instance);
1703
1704 memb_consensus_set (instance, &instance->my_id);
1705
1706 reset_token_timeout (instance); // REVIEWED
1707 } else {
1708 memb_consensus_notset (
1709 instance,
1710 no_consensus_list,
1711 &no_consensus_list_entries,
1712 instance->my_proc_list,
1713 instance->my_proc_list_entries);
1714
1715 memb_set_merge (no_consensus_list, no_consensus_list_entries,
1716 instance->my_failed_list, &instance->my_failed_list_entries);
1717 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT);
1718 }
1719}
1720
1721static void memb_join_message_send (struct totemsrp_instance *instance);
1722
1723static void memb_merge_detect_transmit (struct totemsrp_instance *instance);
1724
1725/*
1726 * Timers used for various states of the membership algorithm
1727 */
1728static void timer_function_pause_timeout (void *data)
1729{
1730 struct totemsrp_instance *instance = data;
1731
1732 instance->pause_timestamp = qb_util_nano_current_get ();
1733 reset_pause_timeout (instance);
1734}
1735
1736static void memb_recovery_state_token_loss (struct totemsrp_instance *instance)
1737{
1738 old_ring_state_restore (instance);
1739 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE);
1740 instance->stats.recovery_token_lost++;
1741}
1742
1743static void timer_function_orf_token_warning (void *data)
1744{
1745 struct totemsrp_instance *instance = data;
1746 uint64_t tv_diff;
1747
1748 /* need to protect against the case where token_warning is set to 0 dynamically */
1749 if (instance->totem_config->token_warning) {
1750 tv_diff = qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC -
1751 instance->stats.token[instance->stats.latest_token].rx;
1753 "Token has not been received in %"PRIu64" ms", tv_diff);
1754 reset_token_warning(instance);
1755 } else {
1756 cancel_token_warning(instance);
1757 }
1758}
1759
/*
 * Token loss timer callback; data is the struct totemsrp_instance.
 *
 * Dispatches on instance->memb_state. Every visible branch ends up
 * entering the GATHER state (directly, or through
 * memb_recovery_state_token_loss) and increments the matching
 * stats.*_token_lost counter.
 *
 * NOTE(review): the embedded numbering skips 1765-1766, 1768, 1773, 1779,
 * 1788 and 1794-1795, so some lines are absent from this listing -
 * presumably two case labels, the heads of the logging calls whose format
 * strings remain below, and one further statement before the first
 * memb_state_gather_enter; confirm against the upstream file.
 */
1760 static void timer_function_orf_token_timeout (void *data)
1761 {
1762 struct totemsrp_instance *instance = data;
1763
1764 switch (instance->memb_state) {
/* Token lost while OPERATIONAL (per the message text): go to GATHER */
1767 "The token was lost in the OPERATIONAL state.");
1769 "A processor failed, forming new configuration:"
1770 " token timed out (%ums), waiting %ums for consensus.",
1771 instance->totem_config->token_timeout,
1772 instance->totem_config->consensus_timeout);
1774 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE);
1775 instance->stats.operational_token_lost++;
1776 break;
1777
/* Already gathering: run the consensus-timeout handling, then re-enter GATHER */
1778 case MEMB_STATE_GATHER:
1780 "The consensus timeout expired (%ums).",
1781 instance->totem_config->consensus_timeout);
1782 memb_state_consensus_timeout_expired (instance);
1783 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED);
1784 instance->stats.gather_token_lost++;
1785 break;
1786
1787 case MEMB_STATE_COMMIT:
1789 "The token was lost in the COMMIT state.");
1790 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE);
1791 instance->stats.commit_token_lost++;
1792 break;
1793
/* Token lost during RECOVERY (per the message text); orf_token_discard is set afterwards */
1796 "The token was lost in the RECOVERY state.");
1797 memb_recovery_state_token_loss (instance);
1798 instance->orf_token_discard = 1;
1799 break;
1800 }
1801 }
1802
1803static void timer_function_heartbeat_timeout (void *data)
1804{
1805 struct totemsrp_instance *instance = data;
1807 "HeartBeat Timer expired Invoking token loss mechanism in state %d ", instance->memb_state);
1808 timer_function_orf_token_timeout(data);
1809}
1810
1811static void memb_timer_function_state_gather (void *data)
1812{
1813 struct totemsrp_instance *instance = data;
1814 int32_t res;
1815
1816 switch (instance->memb_state) {
1819 assert (0); /* this should never happen */
1820 break;
1821 case MEMB_STATE_GATHER:
1822 case MEMB_STATE_COMMIT:
1823 memb_join_message_send (instance);
1824
1825 /*
1826 * Restart the join timeout
1827 `*/
1828 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
1829
1830 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1831 QB_LOOP_MED,
1832 instance->totem_config->join_timeout*QB_TIME_NS_IN_MSEC,
1833 (void *)instance,
1834 memb_timer_function_state_gather,
1835 &instance->memb_timer_state_gather_join_timeout);
1836
1837 if (res != 0) {
1838 log_printf(instance->totemsrp_log_level_error, "memb_timer_function_state_gather - qb_loop_timer_add error : %d", res);
1839 }
1840 break;
1841 }
1842}
1843
/*
 * Consensus timer callback; data is the struct totemsrp_instance and is
 * passed straight to memb_state_consensus_timeout_expired().
 */
static void memb_timer_function_gather_consensus_timeout (void *data)
{
	memb_state_consensus_timeout_expired ((struct totemsrp_instance *)data);
}
1849
/*
 * Walk the recovery sort queue from SEQNO_START_MSG + 1 through
 * instance->my_aru and add the old-ring messages found there to the
 * regular sort queue.
 *
 * Each visible candidate is unwrapped by skipping one struct mcast
 * header; it is added only when its ring_id equals
 * instance->my_old_ring_id and its sequence number is not already in use
 * in the regular queue.
 *
 * NOTE(review): the embedded numbering skips 1860, 1880, 1897, 1914 and
 * 1918, so a few lines are absent from this listing - presumably logging
 * call heads, the condition that opens the encapsulation branch, and the
 * body of the innermost `if`; confirm against the upstream file.
 */
1850 static void deliver_messages_from_recovery_to_regular (struct totemsrp_instance *instance)
1851 {
1852 unsigned int i;
1853 struct sort_queue_item *recovery_message_item;
1854 struct sort_queue_item regular_message_item;
1855 unsigned int range = 0;
1856 int res;
1857 void *ptr;
1858 struct mcast *mcast;
1859
1861 "recovery to regular %x-%x", SEQNO_START_MSG + 1, instance->my_aru);
1862
/* Number of sequence numbers between SEQNO_START_MSG and my_aru */
1863 range = instance->my_aru - SEQNO_START_MSG;
1864 /*
1865 * Move messages from recovery to regular sort queue
1866 */
1867// todo should i be initialized to 0 or 1 ?
1868 for (i = 1; i <= range; i++) {
1869 res = sq_item_get (&instance->recovery_sort_queue,
1870 i + SEQNO_START_MSG, &ptr);
/* A non-zero result skips this sequence number (presumably: no item stored) */
1871 if (res != 0) {
1872 continue;
1873 }
1874 recovery_message_item = ptr;
1875
1876 /*
1877 * Convert recovery message into regular message
1878 */
1879 mcast = recovery_message_item->mcast;
1881 /*
1882 * Message is a recovery message encapsulated
1883 * in a new ring message
1884 */
/* The inner message starts right after the outer struct mcast header */
1885 regular_message_item.mcast =
1886 (struct mcast *)(((char *)recovery_message_item->mcast) + sizeof (struct mcast));
1887 regular_message_item.msg_len =
1888 recovery_message_item->msg_len - sizeof (struct mcast);
1889 mcast = regular_message_item.mcast;
1890 } else {
1891 /*
1892 * TODO this case shouldn't happen
1893 */
1894 continue;
1895 }
1896
1898 "comparing if ring id is for this processors old ring seqno " CS_PRI_RING_ID_SEQ,
1899 (uint64_t)mcast->seq);
1900
1901 /*
1902 * Only add this message to the regular sort
1903 * queue if it was originated with the same ring
1904 * id as the previous ring
1905 */
1906 if (memcmp (&instance->my_old_ring_id, &mcast->ring_id,
1907 sizeof (struct memb_ring_id)) == 0) {
1908
/* Add only when sq_item_inuse reports 0 for this sequence number */
1909 res = sq_item_inuse (&instance->regular_sort_queue, mcast->seq);
1910 if (res == 0) {
1911 sq_item_add (&instance->regular_sort_queue,
1912 &regular_message_item, mcast->seq);
1913 if (sq_lt_compare (instance->old_ring_state_high_seq_received, mcast->seq)) {
1915 }
1916 }
1917 } else {
1919 "-not adding msg with seq no " CS_PRI_RING_ID_SEQ, (uint64_t)mcast->seq);
1920 }
1921 }
1922 }
1923
1924/*
1925 * Change states in the state machine of the membership algorithm
1926 */
/*
 * Enter the OPERATIONAL membership state.
 *
 * Visible steps, in order: reset consensus and old-ring bookkeeping, move
 * recovered messages into the regular sort queue, call
 * messages_deliver_to_app with my_aru temporarily set to
 * old_ring_state_aru, compute the left and joined lists against
 * my_trans_memb_list, install my_new_memb_list as my_memb_list and
 * my_proc_list, copy the recovery sort queue over the regular one, free
 * and release queue entries up to my_high_delivered, build the
 * joined/left/failed text fragments, update statistics, re-arm the pause
 * timer and record my_ring_id as my_old_ring_id.
 *
 * NOTE(review): the embedded numbering skips 1950, 1987, 2010, 2021,
 * 2130, 2132, 2140 and 2145, so some lines are absent from this listing -
 * presumably the heads of logging calls and of the calls that receive the
 * configuration-change argument lists below; confirm against the upstream
 * file.
 */
1927 static void memb_state_operational_enter (struct totemsrp_instance *instance)
1928 {
1929 struct srp_addr joined_list[PROCESSOR_COUNT_MAX];
1930 int joined_list_entries = 0;
1931 unsigned int aru_save;
1932 unsigned int joined_list_totemip[PROCESSOR_COUNT_MAX];
1933 unsigned int trans_memb_list_totemip[PROCESSOR_COUNT_MAX];
1934 unsigned int new_memb_list_totemip[PROCESSOR_COUNT_MAX];
1935 unsigned int left_list[PROCESSOR_COUNT_MAX];
1936 unsigned int i;
1937 unsigned int res;
1938 char left_node_msg[1024];
1939 char joined_node_msg[1024];
1940 char failed_node_msg[1024];
1941
1942 instance->originated_orf_token = 0;
1943
1944 memb_consensus_reset (instance);
1945
1946 old_ring_state_reset (instance);
1947
1948 deliver_messages_from_recovery_to_regular (instance);
1949
1951 "Delivering to app %x to %x",
1952 instance->my_high_delivered + 1, instance->old_ring_state_high_seq_received);
1953
/* my_aru is swapped to the old ring's value here and put back from aru_save further down */
1954 aru_save = instance->my_aru;
1955 instance->my_aru = instance->old_ring_state_aru;
1956
1957 messages_deliver_to_app (instance, 0, instance->old_ring_state_high_seq_received);
1958
1959 /*
1960 * Calculate joined and left list
1961 */
1962 memb_set_subtract (instance->my_left_memb_list,
1963 &instance->my_left_memb_entries,
1964 instance->my_memb_list, instance->my_memb_entries,
1965 instance->my_trans_memb_list, instance->my_trans_memb_entries);
1966
1967 memb_set_subtract (joined_list, &joined_list_entries,
1968 instance->my_new_memb_list, instance->my_new_memb_entries,
1969 instance->my_trans_memb_list, instance->my_trans_memb_entries);
1970
1971 /*
1972 * Install new membership
1973 */
1974 instance->my_memb_entries = instance->my_new_memb_entries;
1975 memcpy (&instance->my_memb_list, instance->my_new_memb_list,
1976 sizeof (struct srp_addr) * instance->my_memb_entries);
1977 instance->last_released = 0;
1978 instance->my_set_retrans_flg = 0;
1979
1980 /*
1981 * Deliver transitional configuration to application
1982 */
1983 srp_addr_to_nodeid (instance, left_list, instance->my_left_memb_list,
1984 instance->my_left_memb_entries);
1985 srp_addr_to_nodeid (instance, trans_memb_list_totemip,
1986 instance->my_trans_memb_list, instance->my_trans_memb_entries);
1988 trans_memb_list_totemip, instance->my_trans_memb_entries,
1989 left_list, instance->my_left_memb_entries,
1990 0, 0, &instance->my_ring_id);
1991 /*
1992 * Switch new totemsrp messages queue. Messages sent from now on are stored
1993 * in different queue so synchronization messages are delivered first. Totempg
1994 * buffers will be switched later.
1995 */
1996 instance->waiting_trans_ack = 1;
1997
1998// TODO we need to filter to ensure we only deliver those
1999// messages which are part of instance->my_deliver_memb
2000 messages_deliver_to_app (instance, 1, instance->old_ring_state_high_seq_received);
2001
2002 /*
2003 * Switch totempg buffers. This used to be right after
2004 * instance->waiting_trans_ack = 1;
2005 * line. This was causing problem, because there may be not yet
2006 * processed parts of messages in totempg buffers.
2007 * So when buffers were switched and recovered messages
2008 * got delivered it was not possible to assemble them.
2009 */
2011
2012 instance->my_aru = aru_save;
2013
2014 /*
2015 * Deliver regular configuration to application
2016 */
2017 srp_addr_to_nodeid (instance, new_memb_list_totemip,
2018 instance->my_new_memb_list, instance->my_new_memb_entries);
2019 srp_addr_to_nodeid (instance, joined_list_totemip, joined_list,
2020 joined_list_entries);
2022 new_memb_list_totemip, instance->my_new_memb_entries,
2023 0, 0,
2024 joined_list_totemip, joined_list_entries, &instance->my_ring_id);
2025
2026 /*
2027 * The recovery sort queue now becomes the regular
2028 * sort queue. It is necessary to copy the state
2029 * into the regular sort queue.
2030 */
2031 sq_copy (&instance->regular_sort_queue, &instance->recovery_sort_queue);
2032 instance->my_last_aru = SEQNO_START_MSG;
2033
2034 /* When making my_proc_list smaller, ensure that the
2035 * now non-used entries are zero-ed out. There are some suspect
2036 * assert's that assume that there is always 2 entries in the list.
2037 * These fail when my_proc_list is reduced to 1 entry (and the
2038 * valid [0] entry is the same as the 'unused' [1] entry).
2039 */
2040 memset(instance->my_proc_list, 0,
2041 sizeof (struct srp_addr) * instance->my_proc_list_entries);
2042
/* my_memb_entries was set to my_new_memb_entries above, so both counts used here agree */
2043 instance->my_proc_list_entries = instance->my_new_memb_entries;
2044 memcpy (instance->my_proc_list, instance->my_new_memb_list,
2045 sizeof (struct srp_addr) * instance->my_memb_entries);
2046
2047 instance->my_failed_list_entries = 0;
2048 /*
2049 * TODO Not exactly to spec
2050 *
2051 * At the entry to this function all messages without a gap are
2052 * delivered.
2053 *
2054 * This code throw away messages from the last gap in the sort queue
2055 * to my_high_seq_received
2056 *
2057 * What should really happen is we should deliver all messages up to
2058 * a gap, then deliver the transitional configuration, then deliver
2059 * the messages between the first gap and my_high_seq_received, then
2060 * deliver a regular configuration, then deliver the regular
2061 * configuration
2062 *
2063 * Unfortunately totempg doesn't appear to like this operating mode
2064 * which needs more inspection
2065 */
/* Step i down from my_high_seq_received until sq_item_get returns 0 or i reaches 0 */
2066 i = instance->my_high_seq_received + 1;
2067 do {
2068 void *ptr;
2069
2070 i -= 1;
2071 res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
2072 if (i == 0) {
2073 break;
2074 }
2075 } while (res);
2076
2077 instance->my_high_delivered = i;
2078
/* Free the mcast buffer of every item found at or below my_high_delivered, then release those slots */
2079 for (i = 0; i <= instance->my_high_delivered; i++) {
2080 void *ptr;
2081
2082 res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
2083 if (res == 0) {
2084 struct sort_queue_item *regular_message;
2085
2086 regular_message = ptr;
2087 free (regular_message->mcast);
2088 }
2089 }
2090 sq_items_release (&instance->regular_sort_queue, instance->my_high_delivered);
2091 instance->last_released = instance->my_high_delivered;
2092
/*
 * NOTE(review): snprintf returns the length it would have written, so
 * sptr (and sptr2 below) can grow past the 1024-byte buffers if enough
 * node ids are listed; the following size argument, sizeof(...)-sptr,
 * would then wrap to a huge value. Confirm the lists can never need more
 * than 1024 bytes, or clamp the offsets.
 */
2093 if (joined_list_entries) {
2094 int sptr = 0;
2095 sptr += snprintf(joined_node_msg, sizeof(joined_node_msg)-sptr, " joined:");
2096 for (i=0; i< joined_list_entries; i++) {
2097 sptr += snprintf(joined_node_msg+sptr, sizeof(joined_node_msg)-sptr, " " CS_PRI_NODE_ID, joined_list_totemip[i]);
2098 }
2099 }
2100 else {
2101 joined_node_msg[0] = '\0';
2102 }
2103
2104 if (instance->my_left_memb_entries) {
2105 int sptr = 0;
2106 int sptr2 = 0;
2107 sptr += snprintf(left_node_msg, sizeof(left_node_msg)-sptr, " left:");
2108 for (i=0; i< instance->my_left_memb_entries; i++) {
2109 sptr += snprintf(left_node_msg+sptr, sizeof(left_node_msg)-sptr, " " CS_PRI_NODE_ID, left_list[i]);
2110 }
/* Left nodes with no recorded LEAVE (my_leave_memb_match returns 0) are listed as failed */
2111 for (i=0; i< instance->my_left_memb_entries; i++) {
2112 if (my_leave_memb_match(instance, left_list[i]) == 0) {
2113 if (sptr2 == 0) {
2114 sptr2 += snprintf(failed_node_msg, sizeof(failed_node_msg)-sptr2, " failed:");
2115 }
/* The size below is taken from left_node_msg; both buffers are declared with 1024 bytes */
2116 sptr2 += snprintf(failed_node_msg+sptr2, sizeof(left_node_msg)-sptr2, " " CS_PRI_NODE_ID, left_list[i]);
2117 }
2118 }
2119 if (sptr2 == 0) {
2120 failed_node_msg[0] = '\0';
2121 }
2122 }
2123 else {
2124 left_node_msg[0] = '\0';
2125 failed_node_msg[0] = '\0';
2126 }
2127
2128 my_leave_memb_clear(instance);
2129
2131 "entering OPERATIONAL state.");
2133 "A new membership (" CS_PRI_RING_ID ") was formed. Members%s%s",
2134 instance->my_ring_id.rep,
2135 (uint64_t)instance->my_ring_id.seq,
2136 joined_node_msg,
2137 left_node_msg);
2138
2139 if (strlen(failed_node_msg)) {
2141 "Failed to receive the leave message.%s",
2142 failed_node_msg);
2143 }
2144
2146
2147 instance->stats.operational_entered++;
2148 instance->stats.continuous_gather = 0;
2149
2150 instance->my_received_flg = 1;
2151
2152 reset_pause_timeout (instance);
2153
2154 /*
2155 * Save ring id information from this configuration to determine
2156 * which processors are transitioning from old regular configuration
2157 * in to new regular configuration on the next configuration change
2158 */
2159 memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
2160 sizeof (struct memb_ring_id));
2161
2162 return;
2163 }
2164
/*
 * Enter the GATHER membership state.
 *
 * Visible steps: set orf_token_discard, clear originated_orf_token, merge
 * my_id into my_proc_list, send a join message, re-arm the join and
 * consensus timers, cancel the token retransmit, token loss and
 * merge-detect timers, restart consensus with only my_id set, then record
 * MEMB_STATE_GATHER and increment stats.gather_entered.
 *
 * In the lines visible here gather_from appears only in the logging
 * arguments.
 *
 * NOTE(review): the embedded numbering skips 2200, 2223 and 2230, so
 * three lines are absent from this listing - presumably the handle
 * argument closing the second qb_loop_timer_del, a logging call head, and
 * the condition guarding the continuous_gather increment; confirm against
 * the upstream file.
 */
2165 static void memb_state_gather_enter (
2166 struct totemsrp_instance *instance,
2167 enum gather_state_from gather_from)
2168 {
2169 int32_t res;
2170
2171 instance->orf_token_discard = 1;
2172
2173 instance->originated_orf_token = 0;
2174
/* Make sure this processor itself is in my_proc_list (memb_set_merge adds no duplicates) */
2175 memb_set_merge (
2176 &instance->my_id, 1,
2177 instance->my_proc_list, &instance->my_proc_list_entries);
2178
2179 memb_join_message_send (instance);
2180
2181 /*
2182 * Restart the join timeout
2183 */
2184 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
2185
2186 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
2187 QB_LOOP_MED,
2188 instance->totem_config->join_timeout*QB_TIME_NS_IN_MSEC,
2189 (void *)instance,
2190 memb_timer_function_state_gather,
2191 &instance->memb_timer_state_gather_join_timeout);
2192 if (res != 0) {
2193 log_printf(instance->totemsrp_log_level_error, "memb_state_gather_enter - qb_loop_timer_add error(1) : %d", res);
2194 }
2195
2196 /*
2197 * Restart the consensus timeout
2198 */
2199 qb_loop_timer_del (instance->totemsrp_poll_handle,
2201
2202 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
2203 QB_LOOP_MED,
2204 instance->totem_config->consensus_timeout*QB_TIME_NS_IN_MSEC,
2205 (void *)instance,
2206 memb_timer_function_gather_consensus_timeout,
2207 &instance->memb_timer_state_gather_consensus_timeout);
2208 if (res != 0) {
2209 log_printf(instance->totemsrp_log_level_error, "memb_state_gather_enter - qb_loop_timer_add error(2) : %d", res);
2210 }
2211
2212 /*
2213 * Cancel the token loss and token retransmission timeouts
2214 */
2215 cancel_token_retransmit_timeout (instance); // REVIEWED
2216 cancel_token_timeout (instance); // REVIEWED
2217 cancel_merge_detect_timeout (instance);
2218
/* Start a fresh consensus round in which only this processor is marked */
2219 memb_consensus_reset (instance);
2220
2221 memb_consensus_set (instance, &instance->my_id);
2222
2224 "entering GATHER state from %d(%s).",
2225 gather_from, gsfrom_to_msg(gather_from));
2226
2227 instance->memb_state = MEMB_STATE_GATHER;
2228 instance->stats.gather_entered++;
2229
2231 /*
2232 * State 3 means gather, so we are continuously gathering.
2233 */
2234 instance->stats.continuous_gather++;
2235 }
2236
2237 return;
2238 }
2239
2240static void timer_function_token_retransmit_timeout (void *data);
2241
/*
 * Callback taking the struct totemsrp_instance as context; it sends the
 * commit token via memb_state_commit_token_send().
 */
static void target_set_completed (
	void *context)
{
	struct totemsrp_instance *srp_instance = context;

	memb_state_commit_token_send (srp_instance);
}
2250
/*
 * Enter the COMMIT membership state.
 *
 * Visible steps: save the old ring state, update the commit token and set
 * its target, delete the join and consensus timers, adopt and store the
 * commit token's ring id, record MEMB_STATE_COMMIT, re-arm the token
 * retransmit and token loss timers, update statistics and zero the flow
 * control counters my_trc, my_pbl and my_cbl.
 *
 * NOTE(review): the embedded numbering skips 2262, 2266 and 2274, so
 * three lines are absent from this listing - presumably one statement
 * after each qb_loop_timer_del and the head of the logging call; confirm
 * against the upstream file.
 */
2251 static void memb_state_commit_enter (
2252 struct totemsrp_instance *instance)
2253 {
2254 old_ring_state_save (instance);
2255
2256 memb_state_commit_token_update (instance);
2257
2258 memb_state_commit_token_target_set (instance);
2259
/* The join and consensus timers are both deleted on entering COMMIT */
2260 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
2261
2263
2264 qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_consensus_timeout);
2265
2267
/* Adopt the ring id carried by the commit token, then hand it to the memb_ring_id_store callback */
2268 memb_ring_id_set (instance, &instance->commit_token->ring_id);
2269
2270 instance->memb_ring_id_store (&instance->my_ring_id, instance->my_id.nodeid);
2271
2272 instance->token_ring_id_seq = instance->my_ring_id.seq;
2273
2275 "entering COMMIT state.");
2276
2277 instance->memb_state = MEMB_STATE_COMMIT;
2278 reset_token_retransmit_timeout (instance); // REVIEWED
2279 reset_token_timeout (instance); // REVIEWED
2280
2281 instance->stats.commit_entered++;
2282 instance->stats.continuous_gather = 0;
2283
2284 /*
2285 * reset all flow control variables since we are starting a new ring
2286 */
2287 instance->my_trc = 0;
2288 instance->my_pbl = 0;
2289 instance->my_cbl = 0;
2290 /*
2291 * commit token sent after callback that token target has been set
2292 */
2293 }
2294
2295static void memb_state_recovery_enter (
2296 struct totemsrp_instance *instance,
2298{
2299 int i;
2300 int local_received_flg = 1;
2301 unsigned int low_ring_aru;
2302 unsigned int range = 0;
2303 unsigned int messages_originated = 0;
2304 const struct srp_addr *addr;
2305 struct memb_commit_token_memb_entry *memb_list;
2306 struct memb_ring_id my_new_memb_ring_id_list[PROCESSOR_COUNT_MAX];
2307
2308 addr = (const struct srp_addr *)commit_token->end_of_commit_token;
2309 memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
2310
2312 "entering RECOVERY state.");
2313
2314 instance->orf_token_discard = 0;
2315
2316 instance->my_high_ring_delivered = 0;
2317
2318 sq_reinit (&instance->recovery_sort_queue, SEQNO_START_MSG);
2319 cs_queue_reinit (&instance->retrans_message_queue);
2320
2321 low_ring_aru = instance->old_ring_state_high_seq_received;
2322
2323 memb_state_commit_token_send_recovery (instance, commit_token);
2324
2325 instance->my_token_seq = SEQNO_START_TOKEN - 1;
2326
2327 /*
2328 * Build regular configuration
2329 */
2331 instance->totemnet_context,
2332 commit_token->addr_entries);
2333
2334 /*
2335 * Build transitional configuration
2336 */
2337 for (i = 0; i < instance->my_new_memb_entries; i++) {
2338 memcpy (&my_new_memb_ring_id_list[i],
2339 &memb_list[i].ring_id,
2340 sizeof (struct memb_ring_id));
2341 }
2342 memb_set_and_with_ring_id (
2343 instance->my_new_memb_list,
2344 my_new_memb_ring_id_list,
2345 instance->my_new_memb_entries,
2346 instance->my_memb_list,
2347 instance->my_memb_entries,
2348 &instance->my_old_ring_id,
2349 instance->my_trans_memb_list,
2350 &instance->my_trans_memb_entries);
2351
2352 for (i = 0; i < instance->my_trans_memb_entries; i++) {
2354 "TRANS [%d] member " CS_PRI_NODE_ID ":", i, instance->my_trans_memb_list[i].nodeid);
2355 }
2356 for (i = 0; i < instance->my_new_memb_entries; i++) {
2358 "position [%d] member " CS_PRI_NODE_ID ":", i, addr[i].nodeid);
2360 "previous ringid (" CS_PRI_RING_ID ")",
2361 memb_list[i].ring_id.rep, (uint64_t)memb_list[i].ring_id.seq);
2362
2364 "aru %x high delivered %x received flag %d",
2365 memb_list[i].aru,
2366 memb_list[i].high_delivered,
2367 memb_list[i].received_flg);
2368
2369 // assert (totemip_print (&memb_list[i].ring_id.rep) != 0);
2370 }
2371 /*
2372 * Determine if any received flag is false
2373 */
2374 for (i = 0; i < commit_token->addr_entries; i++) {
2375 if (memb_set_subset (&instance->my_new_memb_list[i], 1,
2376 instance->my_trans_memb_list, instance->my_trans_memb_entries) &&
2377
2378 memb_list[i].received_flg == 0) {
2379 instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
2380 memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
2381 sizeof (struct srp_addr) * instance->my_trans_memb_entries);
2382 local_received_flg = 0;
2383 break;
2384 }
2385 }
2386 if (local_received_flg == 1) {
2387 goto no_originate;
2388 } /* Else originate messages if we should */
2389
2390 /*
2391 * Calculate my_low_ring_aru, instance->my_high_ring_delivered for the transitional membership
2392 */
2393 for (i = 0; i < commit_token->addr_entries; i++) {
2394 if (memb_set_subset (&instance->my_new_memb_list[i], 1,
2395 instance->my_deliver_memb_list,
2396 instance->my_deliver_memb_entries) &&
2397
2398 memcmp (&instance->my_old_ring_id,
2399 &memb_list[i].ring_id,
2400 sizeof (struct memb_ring_id)) == 0) {
2401
2402 if (sq_lt_compare (memb_list[i].aru, low_ring_aru)) {
2403
2404 low_ring_aru = memb_list[i].aru;
2405 }
2406 if (sq_lt_compare (instance->my_high_ring_delivered, memb_list[i].high_delivered)) {
2407 instance->my_high_ring_delivered = memb_list[i].high_delivered;
2408 }
2409 }
2410 }
2411
2412 /*
2413 * Copy all old ring messages to instance->retrans_message_queue
2414 */
2415 range = instance->old_ring_state_high_seq_received - low_ring_aru;
2416 if (range == 0) {
2417 /*
2418 * No messages to copy
2419 */
2420 goto no_originate;
2421 }
2422 assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2423
2425 "copying all old ring messages from %x-%x.",
2426 low_ring_aru + 1, instance->old_ring_state_high_seq_received);
2427
2428 for (i = 1; i <= range; i++) {
2431 void *ptr;
2432 int res;
2433
2434 res = sq_item_get (&instance->regular_sort_queue,
2435 low_ring_aru + i, &ptr);
2436 if (res != 0) {
2437 continue;
2438 }
2439 sort_queue_item = ptr;
2440 messages_originated++;
2441 memset (&message_item, 0, sizeof (struct message_item));
2442 // TODO LEAK
2443 message_item.mcast = totemsrp_buffer_alloc (instance);
2444 assert (message_item.mcast);
2445 memset(message_item.mcast, 0, sizeof (struct mcast));
2449 message_item.mcast->system_from = instance->my_id;
2451
2453 assert (message_item.mcast->header.nodeid);
2454 memcpy (&message_item.mcast->ring_id, &instance->my_ring_id,
2455 sizeof (struct memb_ring_id));
2456 message_item.msg_len = sort_queue_item->msg_len + sizeof (struct mcast);
2457 memcpy (((char *)message_item.mcast) + sizeof (struct mcast),
2460 cs_queue_item_add (&instance->retrans_message_queue, &message_item);
2461 }
2463 "Originated %d messages in RECOVERY.", messages_originated);
2464 goto originated;
2465
2466no_originate:
2468 "Did not need to originate any messages in recovery.");
2469
2470originated:
2471 instance->my_aru = SEQNO_START_MSG;
2472 instance->my_aru_count = 0;
2473 instance->my_seq_unchanged = 0;
2475 instance->my_install_seq = SEQNO_START_MSG;
2476 instance->last_released = SEQNO_START_MSG;
2477
2478 reset_token_timeout (instance); // REVIEWED
2479 reset_token_retransmit_timeout (instance); // REVIEWED
2480
2481 instance->memb_state = MEMB_STATE_RECOVERY;
2482 instance->stats.recovery_entered++;
2483 instance->stats.continuous_gather = 0;
2484
2485 return;
2486}
2487
2488void totemsrp_event_signal (void *srp_context, enum totem_event_type type, int value)
2489{
2490 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2491
2492 token_hold_cancel_send (instance);
2493
2494 return;
2495}
2496
2498 void *srp_context,
2499 struct iovec *iovec,
2500 unsigned int iov_len,
2501 int guarantee)
2502{
2503 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2504 int i;
2506 char *addr;
2507 unsigned int addr_idx;
2508 struct cs_queue *queue_use;
2509
2510 if (instance->waiting_trans_ack) {
2511 queue_use = &instance->new_message_queue_trans;
2512 } else {
2513 queue_use = &instance->new_message_queue;
2514 }
2515
2516 if (cs_queue_is_full (queue_use)) {
2517 log_printf (instance->totemsrp_log_level_debug, "queue full");
2518 return (-1);
2519 }
2520
2521 memset (&message_item, 0, sizeof (struct message_item));
2522
2523 /*
2524 * Allocate pending item
2525 */
2526 message_item.mcast = totemsrp_buffer_alloc (instance);
2527 if (message_item.mcast == 0) {
2528 goto error_mcast;
2529 }
2530
2531 /*
2532 * Set mcast header
2533 */
2534 memset(message_item.mcast, 0, sizeof (struct mcast));
2539
2541 assert (message_item.mcast->header.nodeid);
2542
2544 message_item.mcast->system_from = instance->my_id;
2545
2546 addr = (char *)message_item.mcast;
2547 addr_idx = sizeof (struct mcast);
2548 for (i = 0; i < iov_len; i++) {
2549 memcpy (&addr[addr_idx], iovec[i].iov_base, iovec[i].iov_len);
2550 addr_idx += iovec[i].iov_len;
2551 }
2552
2553 message_item.msg_len = addr_idx;
2554
2555 log_printf (instance->totemsrp_log_level_trace, "mcasted message added to pending queue");
2556 instance->stats.mcast_tx++;
2557 cs_queue_item_add (queue_use, &message_item);
2558
2559 return (0);
2560
2561error_mcast:
2562 return (-1);
2563}
2564
2565/*
2566 * Determine if there is room to queue a new message
2567 */
2568int totemsrp_avail (void *srp_context)
2569{
2570 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2571 int avail;
2572 struct cs_queue *queue_use;
2573
2574 if (instance->waiting_trans_ack) {
2575 queue_use = &instance->new_message_queue_trans;
2576 } else {
2577 queue_use = &instance->new_message_queue;
2578 }
2579 cs_queue_avail (queue_use, &avail);
2580
2581 return (avail);
2582}
2583
2584/*
2585 * ORF Token Management
2586 */
2587/*
2588 * Recast message to mcast group if it is available
2589 */
2590static int orf_token_remcast (
2591 struct totemsrp_instance *instance,
2592 int seq)
2593{
2595 int res;
2596 void *ptr;
2597
2598 struct sq *sort_queue;
2599
2600 if (instance->memb_state == MEMB_STATE_RECOVERY) {
2601 sort_queue = &instance->recovery_sort_queue;
2602 } else {
2603 sort_queue = &instance->regular_sort_queue;
2604 }
2605
2606 res = sq_in_range (sort_queue, seq);
2607 if (res == 0) {
2608 log_printf (instance->totemsrp_log_level_debug, "sq not in range");
2609 return (-1);
2610 }
2611
2612 /*
2613 * Get RTR item at seq, if not available, return
2614 */
2615 res = sq_item_get (sort_queue, seq, &ptr);
2616 if (res != 0) {
2617 return -1;
2618 }
2619
2620 sort_queue_item = ptr;
2621
2623 instance->totemnet_context,
2626
2627 return (0);
2628}
2629
2630
2631/*
2632 * Free all freeable messages from ring
2633 */
2634static void messages_free (
2635 struct totemsrp_instance *instance,
2636 unsigned int token_aru)
2637{
2638 struct sort_queue_item *regular_message;
2639 unsigned int i;
2640 int res;
2641 int log_release = 0;
2642 unsigned int release_to;
2643 unsigned int range = 0;
2644
2645 release_to = token_aru;
2646 if (sq_lt_compare (instance->my_last_aru, release_to)) {
2647 release_to = instance->my_last_aru;
2648 }
2649 if (sq_lt_compare (instance->my_high_delivered, release_to)) {
2650 release_to = instance->my_high_delivered;
2651 }
2652
2653 /*
2654 * Ensure we dont try release before an already released point
2655 */
2656 if (sq_lt_compare (release_to, instance->last_released)) {
2657 return;
2658 }
2659
2660 range = release_to - instance->last_released;
2661 assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2662
2663 /*
2664 * Release retransmit list items if group aru indicates they are transmitted
2665 */
2666 for (i = 1; i <= range; i++) {
2667 void *ptr;
2668
2669 res = sq_item_get (&instance->regular_sort_queue,
2670 instance->last_released + i, &ptr);
2671 if (res == 0) {
2672 regular_message = ptr;
2673 totemsrp_buffer_release (instance, regular_message->mcast);
2674 }
2675 sq_items_release (&instance->regular_sort_queue,
2676 instance->last_released + i);
2677
2678 log_release = 1;
2679 }
2680 instance->last_released += range;
2681
2682 if (log_release) {
2684 "releasing messages up to and including %x", release_to);
2685 }
2686}
2687
2688static void update_aru (
2689 struct totemsrp_instance *instance)
2690{
2691 unsigned int i;
2692 int res;
2693 struct sq *sort_queue;
2694 unsigned int range;
2695 unsigned int my_aru_saved = 0;
2696
2697 if (instance->memb_state == MEMB_STATE_RECOVERY) {
2698 sort_queue = &instance->recovery_sort_queue;
2699 } else {
2700 sort_queue = &instance->regular_sort_queue;
2701 }
2702
2703 range = instance->my_high_seq_received - instance->my_aru;
2704
2705 my_aru_saved = instance->my_aru;
2706 for (i = 1; i <= range; i++) {
2707
2708 void *ptr;
2709
2710 res = sq_item_get (sort_queue, my_aru_saved + i, &ptr);
2711 /*
2712 * If hole, stop updating aru
2713 */
2714 if (res != 0) {
2715 break;
2716 }
2717 }
2718 instance->my_aru += i - 1;
2719}
2720
2721/*
2722 * Multicasts pending messages onto the ring (requires orf_token possession)
2723 */
2724static int orf_token_mcast (
2725 struct totemsrp_instance *instance,
2726 struct orf_token *token,
2727 int fcc_mcasts_allowed)
2728{
2729 struct message_item *message_item = 0;
2730 struct cs_queue *mcast_queue;
2731 struct sq *sort_queue;
2733 struct mcast *mcast;
2734 unsigned int fcc_mcast_current;
2735
2736 if (instance->memb_state == MEMB_STATE_RECOVERY) {
2737 mcast_queue = &instance->retrans_message_queue;
2738 sort_queue = &instance->recovery_sort_queue;
2739 reset_token_retransmit_timeout (instance); // REVIEWED
2740 } else {
2741 if (instance->waiting_trans_ack) {
2742 mcast_queue = &instance->new_message_queue_trans;
2743 } else {
2744 mcast_queue = &instance->new_message_queue;
2745 }
2746
2747 sort_queue = &instance->regular_sort_queue;
2748 }
2749
2750 for (fcc_mcast_current = 0; fcc_mcast_current < fcc_mcasts_allowed; fcc_mcast_current++) {
2751 if (cs_queue_is_empty (mcast_queue)) {
2752 break;
2753 }
2754 message_item = (struct message_item *)cs_queue_item_get (mcast_queue);
2755
2756 message_item->mcast->seq = ++token->seq;
2757 message_item->mcast->this_seqno = instance->global_seqno++;
2758
2759 /*
2760 * Build IO vector
2761 */
2762 memset (&sort_queue_item, 0, sizeof (struct sort_queue_item));
2765
2767
2768 memcpy (&mcast->ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
2769
2770 /*
2771 * Add message to retransmit queue
2772 */
2773 sq_item_add (sort_queue, &sort_queue_item, message_item->mcast->seq);
2774
2776 instance->totemnet_context,
2779
2780 /*
2781 * Delete item from pending queue
2782 */
2783 cs_queue_item_remove (mcast_queue);
2784
2785 /*
2786 * If messages mcasted, deliver any new messages to totempg
2787 */
2788 instance->my_high_seq_received = token->seq;
2789 }
2790
2791 update_aru (instance);
2792
2793 /*
2794 * Return 1 if more messages are available for single node clusters
2795 */
2796 return (fcc_mcast_current);
2797}
2798
2799/*
2800 * Remulticasts messages in orf_token's retransmit list (requires orf_token)
2801 * Modifies orf_token's rtr list to include retransmits required by this process
2802 */
2803static int orf_token_rtr (
2804 struct totemsrp_instance *instance,
2805 struct orf_token *orf_token,
2806 unsigned int *fcc_allowed)
2807{
2808 unsigned int res;
2809 unsigned int i, j;
2810 unsigned int found;
2811 struct sq *sort_queue;
2812 struct rtr_item *rtr_list;
2813 unsigned int range = 0;
2814 char retransmit_msg[1024];
2815 char value[64];
2816
2817 if (instance->memb_state == MEMB_STATE_RECOVERY) {
2818 sort_queue = &instance->recovery_sort_queue;
2819 } else {
2820 sort_queue = &instance->regular_sort_queue;
2821 }
2822
2824
2825 strcpy (retransmit_msg, "Retransmit List: ");
2828 "Retransmit List %d", orf_token->rtr_list_entries);
2829 for (i = 0; i < orf_token->rtr_list_entries; i++) {
2830 sprintf (value, "%x ", rtr_list[i].seq);
2831 strcat (retransmit_msg, value);
2832 }
2833 strcat (retransmit_msg, "");
2835 "%s", retransmit_msg);
2836 }
2837
2838 /*
2839 * Retransmit messages on orf_token's RTR list from RTR queue
2840 */
2841 for (instance->fcc_remcast_current = 0, i = 0;
2842 instance->fcc_remcast_current < *fcc_allowed && i < orf_token->rtr_list_entries;) {
2843
2844 /*
2845 * If this retransmit request isn't from this configuration,
2846 * try next rtr entry
2847 */
2848 if (memcmp (&rtr_list[i].ring_id, &instance->my_ring_id,
2849 sizeof (struct memb_ring_id)) != 0) {
2850
2851 i += 1;
2852 continue;
2853 }
2854
2855 res = orf_token_remcast (instance, rtr_list[i].seq);
2856 if (res == 0) {
2857 /*
2858 * Multicasted message, so no need to copy to new retransmit list
2859 */
2861 assert (orf_token->rtr_list_entries >= 0);
2862 memmove (&rtr_list[i], &rtr_list[i + 1],
2863 sizeof (struct rtr_item) * (orf_token->rtr_list_entries - i));
2864
2865 instance->stats.mcast_retx++;
2866 instance->fcc_remcast_current++;
2867 } else {
2868 i += 1;
2869 }
2870 }
2871 *fcc_allowed = *fcc_allowed - instance->fcc_remcast_current;
2872
2873 /*
2874 * Add messages to retransmit to RTR list
2875 * but only retry if there is room in the retransmit list
2876 */
2877
2878 range = orf_token->seq - instance->my_aru;
2879 assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2880
2882 (i <= range); i++) {
2883
2884 /*
2885 * Ensure message is within the sort queue range
2886 */
2887 res = sq_in_range (sort_queue, instance->my_aru + i);
2888 if (res == 0) {
2889 break;
2890 }
2891
2892 /*
2893 * Find if a message is missing from this processor
2894 */
2895 res = sq_item_inuse (sort_queue, instance->my_aru + i);
2896 if (res == 0) {
2897 /*
2898 * Determine how many times we have missed receiving
2899 * this sequence number. sq_item_miss_count increments
2900 * a counter for the sequence number. The miss count
2901 * will be returned and compared. This allows time for
2902 * delayed multicast messages to be received before
2903 * declaring the message is missing and requesting a
2904 * retransmit.
2905 */
2906 res = sq_item_miss_count (sort_queue, instance->my_aru + i);
2907 if (res < instance->totem_config->miss_count_const) {
2908 continue;
2909 }
2910
2911 /*
2912 * Determine if missing message is already in retransmit list
2913 */
2914 found = 0;
2915 for (j = 0; j < orf_token->rtr_list_entries; j++) {
2916 if (instance->my_aru + i == rtr_list[j].seq) {
2917 found = 1;
2918 }
2919 }
2920 if (found == 0) {
2921 /*
2922 * Missing message not found in current retransmit list so add it
2923 */
2924 memcpy (&rtr_list[orf_token->rtr_list_entries].ring_id,
2925 &instance->my_ring_id, sizeof (struct memb_ring_id));
2926 rtr_list[orf_token->rtr_list_entries].seq = instance->my_aru + i;
2928 }
2929 }
2930 }
2931 return (instance->fcc_remcast_current);
2932}
2933
2934static void token_retransmit (struct totemsrp_instance *instance)
2935{
2936 instance->stats.orf_token_tx++;
2938 instance->orf_token_retransmit,
2939 instance->orf_token_retransmit_size);
2940}
2941
2942/*
2943 * Retransmit the regular token to the next processor if no
2944 * mcast or token has been received within the token
2945 * retransmit period
2946 */
2947static void timer_function_token_retransmit_timeout (void *data)
2948{
2949 struct totemsrp_instance *instance = data;
2950
2951 switch (instance->memb_state) {
2952 case MEMB_STATE_GATHER:
2953 break;
2954 case MEMB_STATE_COMMIT:
2957 token_retransmit (instance);
2958 reset_token_retransmit_timeout (instance); // REVIEWED
2959 break;
2960 }
2961}
2962
2963static void timer_function_token_hold_retransmit_timeout (void *data)
2964{
2965 struct totemsrp_instance *instance = data;
2966
2967 switch (instance->memb_state) {
2968 case MEMB_STATE_GATHER:
2969 break;
2970 case MEMB_STATE_COMMIT:
2971 break;
2974 token_retransmit (instance);
2975 break;
2976 }
2977}
2978
2979static void timer_function_merge_detect_timeout(void *data)
2980{
2981 struct totemsrp_instance *instance = data;
2982
2984
2985 switch (instance->memb_state) {
2987 if (instance->my_ring_id.rep == instance->my_id.nodeid) {
2988 memb_merge_detect_transmit (instance);
2989 }
2990 break;
2991 case MEMB_STATE_GATHER:
2992 case MEMB_STATE_COMMIT:
2994 break;
2995 }
2996}
2997
2998/*
2999 * Send orf_token to next member (requires orf_token)
3000 */
3001static int token_send (
3002 struct totemsrp_instance *instance,
3003 struct orf_token *orf_token,
3004 int forward_token)
3005{
3006 int res = 0;
3007 unsigned int orf_token_size;
3008
3009 orf_token_size = sizeof (struct orf_token) +
3010 (orf_token->rtr_list_entries * sizeof (struct rtr_item));
3011
3012 orf_token->header.nodeid = instance->my_id.nodeid;
3013 memcpy (instance->orf_token_retransmit, orf_token, orf_token_size);
3014 instance->orf_token_retransmit_size = orf_token_size;
3015 assert (orf_token->header.nodeid);
3016
3017 if (forward_token == 0) {
3018 return (0);
3019 }
3020
3021 instance->stats.orf_token_tx++;
3023 orf_token,
3024 orf_token_size);
3025
3026 return (res);
3027}
3028
3029static int token_hold_cancel_send (struct totemsrp_instance *instance)
3030{
3032
3033 /*
3034 * Only cancel if the token is currently held
3035 */
3036 if (instance->my_token_held == 0) {
3037 return (0);
3038 }
3039 instance->my_token_held = 0;
3040
3041 /*
3042 * Build message
3043 */
3049 memcpy (&token_hold_cancel.ring_id, &instance->my_ring_id,
3050 sizeof (struct memb_ring_id));
3052
3053 instance->stats.token_hold_cancel_tx++;
3054
3056 sizeof (struct token_hold_cancel));
3057
3058 return (0);
3059}
3060
3061static int orf_token_send_initial (struct totemsrp_instance *instance)
3062{
3063 struct orf_token orf_token;
3064 int res;
3065
3070 orf_token.header.nodeid = instance->my_id.nodeid;
3071 assert (orf_token.header.nodeid);
3075 instance->my_set_retrans_flg = 1;
3076
3077 if (cs_queue_is_empty (&instance->retrans_message_queue) == 1) {
3079 instance->my_set_retrans_flg = 0;
3080 } else {
3082 instance->my_set_retrans_flg = 1;
3083 }
3084
3085 orf_token.aru = 0;
3087 orf_token.aru_addr = instance->my_id.nodeid;
3088
3089 memcpy (&orf_token.ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
3090 orf_token.fcc = 0;
3091 orf_token.backlog = 0;
3092
3094
3095 res = token_send (instance, &orf_token, 1);
3096
3097 return (res);
3098}
3099
3100static void memb_state_commit_token_update (
3101 struct totemsrp_instance *instance)
3102{
3103 struct srp_addr *addr;
3104 struct memb_commit_token_memb_entry *memb_list;
3105 unsigned int high_aru;
3106 unsigned int i;
3107
3108 addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3109 memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
3110
3111 memcpy (instance->my_new_memb_list, addr,
3112 sizeof (struct srp_addr) * instance->commit_token->addr_entries);
3113
3114 instance->my_new_memb_entries = instance->commit_token->addr_entries;
3115
3116 memcpy (&memb_list[instance->commit_token->memb_index].ring_id,
3117 &instance->my_old_ring_id, sizeof (struct memb_ring_id));
3118
3119 memb_list[instance->commit_token->memb_index].aru = instance->old_ring_state_aru;
3120 /*
3121 * TODO high delivered is really instance->my_aru, but with safe this
3122 * could change?
3123 */
3124 instance->my_received_flg =
3125 (instance->my_aru == instance->my_high_seq_received);
3126
3127 memb_list[instance->commit_token->memb_index].received_flg = instance->my_received_flg;
3128
3129 memb_list[instance->commit_token->memb_index].high_delivered = instance->my_high_delivered;
3130 /*
3131 * find high aru up to current memb_index for all matching ring ids
3132 * if any ring id matching memb_index has aru less then high aru set
3133 * received flag for that entry to false
3134 */
3135 high_aru = memb_list[instance->commit_token->memb_index].aru;
3136 for (i = 0; i <= instance->commit_token->memb_index; i++) {
3137 if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
3138 &memb_list[i].ring_id,
3139 sizeof (struct memb_ring_id)) == 0) {
3140
3141 if (sq_lt_compare (high_aru, memb_list[i].aru)) {
3142 high_aru = memb_list[i].aru;
3143 }
3144 }
3145 }
3146
3147 for (i = 0; i <= instance->commit_token->memb_index; i++) {
3148 if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
3149 &memb_list[i].ring_id,
3150 sizeof (struct memb_ring_id)) == 0) {
3151
3152 if (sq_lt_compare (memb_list[i].aru, high_aru)) {
3153 memb_list[i].received_flg = 0;
3154 if (i == instance->commit_token->memb_index) {
3155 instance->my_received_flg = 0;
3156 }
3157 }
3158 }
3159 }
3160
3161 instance->commit_token->header.nodeid = instance->my_id.nodeid;
3162 instance->commit_token->memb_index += 1;
3163 assert (instance->commit_token->memb_index <= instance->commit_token->addr_entries);
3164 assert (instance->commit_token->header.nodeid);
3165}
3166
3167static void memb_state_commit_token_target_set (
3168 struct totemsrp_instance *instance)
3169{
3170 struct srp_addr *addr;
3171
3172 addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3173
3174 /* Totemnet just looks at the node id */
3176 instance->totemnet_context,
3177 addr[instance->commit_token->memb_index %
3178 instance->commit_token->addr_entries].nodeid);
3179}
3180
3181static int memb_state_commit_token_send_recovery (
3182 struct totemsrp_instance *instance,
3183 struct memb_commit_token *commit_token)
3184{
3185 unsigned int commit_token_size;
3186
3187 commit_token->token_seq++;
3188 commit_token->header.nodeid = instance->my_id.nodeid;
3189 commit_token_size = sizeof (struct memb_commit_token) +
3190 ((sizeof (struct srp_addr) +
3191 sizeof (struct memb_commit_token_memb_entry)) * commit_token->addr_entries);
3192 /*
3193 * Make a copy for retransmission if necessary
3194 */
3195 memcpy (instance->orf_token_retransmit, commit_token, commit_token_size);
3196 instance->orf_token_retransmit_size = commit_token_size;
3197
3198 instance->stats.memb_commit_token_tx++;
3199
3201 commit_token,
3202 commit_token_size);
3203
3204 /*
3205 * Request retransmission of the commit token in case it is lost
3206 */
3207 reset_token_retransmit_timeout (instance);
3208 return (0);
3209}
3210
3211static int memb_state_commit_token_send (
3212 struct totemsrp_instance *instance)
3213{
3214 unsigned int commit_token_size;
3215
3216 instance->commit_token->token_seq++;
3217 instance->commit_token->header.nodeid = instance->my_id.nodeid;
3218 commit_token_size = sizeof (struct memb_commit_token) +
3219 ((sizeof (struct srp_addr) +
3220 sizeof (struct memb_commit_token_memb_entry)) * instance->commit_token->addr_entries);
3221 /*
3222 * Make a copy for retransmission if necessary
3223 */
3224 memcpy (instance->orf_token_retransmit, instance->commit_token, commit_token_size);
3225 instance->orf_token_retransmit_size = commit_token_size;
3226
3227 instance->stats.memb_commit_token_tx++;
3228
3230 instance->commit_token,
3231 commit_token_size);
3232
3233 /*
3234 * Request retransmission of the commit token in case it is lost
3235 */
3236 reset_token_retransmit_timeout (instance);
3237 return (0);
3238}
3239
3240
3241static int memb_lowest_in_config (struct totemsrp_instance *instance)
3242{
3243 struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3244 int token_memb_entries = 0;
3245 int i;
3246 unsigned int lowest_nodeid;
3247
3248 memb_set_subtract (token_memb, &token_memb_entries,
3249 instance->my_proc_list, instance->my_proc_list_entries,
3250 instance->my_failed_list, instance->my_failed_list_entries);
3251
3252 /*
3253 * find representative by searching for smallest identifier
3254 */
3255 assert(token_memb_entries > 0);
3256
3257 lowest_nodeid = token_memb[0].nodeid;
3258 for (i = 1; i < token_memb_entries; i++) {
3259 if (lowest_nodeid > token_memb[i].nodeid) {
3260 lowest_nodeid = token_memb[i].nodeid;
3261 }
3262 }
3263 return (lowest_nodeid == instance->my_id.nodeid);
3264}
3265
3266static int srp_addr_compare (const void *a, const void *b)
3267{
3268 const struct srp_addr *srp_a = (const struct srp_addr *)a;
3269 const struct srp_addr *srp_b = (const struct srp_addr *)b;
3270
3271 if (srp_a->nodeid < srp_b->nodeid) {
3272 return -1;
3273 } else if (srp_a->nodeid > srp_b->nodeid) {
3274 return 1;
3275 } else {
3276 return 0;
3277 }
3278}
3279
3280static void memb_state_commit_token_create (
3281 struct totemsrp_instance *instance)
3282{
3283 struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3284 struct srp_addr *addr;
3285 struct memb_commit_token_memb_entry *memb_list;
3286 int token_memb_entries = 0;
3287
3289 "Creating commit token because I am the rep.");
3290
3291 memb_set_subtract (token_memb, &token_memb_entries,
3292 instance->my_proc_list, instance->my_proc_list_entries,
3293 instance->my_failed_list, instance->my_failed_list_entries);
3294
3295 memset (instance->commit_token, 0, sizeof (struct memb_commit_token));
3299 instance->commit_token->header.encapsulated = 0;
3300 instance->commit_token->header.nodeid = instance->my_id.nodeid;
3301 assert (instance->commit_token->header.nodeid);
3302
3303 instance->commit_token->ring_id.rep = instance->my_id.nodeid;
3304 instance->commit_token->ring_id.seq = instance->token_ring_id_seq + 4;
3305
3306 /*
3307 * This qsort is necessary to ensure the commit token traverses
3308 * the ring in the proper order
3309 */
3310 qsort (token_memb, token_memb_entries, sizeof (struct srp_addr),
3311 srp_addr_compare);
3312
3313 instance->commit_token->memb_index = 0;
3314 instance->commit_token->addr_entries = token_memb_entries;
3315
3316 addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3317 memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
3318
3319 memcpy (addr, token_memb,
3320 token_memb_entries * sizeof (struct srp_addr));
3321 memset (memb_list, 0,
3322 sizeof (struct memb_commit_token_memb_entry) * token_memb_entries);
3323}
3324
3325static void memb_join_message_send (struct totemsrp_instance *instance)
3326{
3327 char memb_join_data[40000];
3328 struct memb_join *memb_join = (struct memb_join *)memb_join_data;
3329 char *addr;
3330 unsigned int addr_idx;
3331 size_t msg_len;
3332
3337 memb_join->header.nodeid = instance->my_id.nodeid;
3338 assert (memb_join->header.nodeid);
3339
3340 msg_len = sizeof(struct memb_join) +
3341 ((instance->my_proc_list_entries + instance->my_failed_list_entries) * sizeof(struct srp_addr));
3342
3343 if (msg_len > sizeof(memb_join_data)) {
3345 "memb_join_message too long. Ignoring message.");
3346
3347 return ;
3348 }
3349
3350 memb_join->ring_seq = instance->my_ring_id.seq;
3353 memb_join->system_from = instance->my_id;
3354
3355 /*
3356 * This mess adds the joined and failed processor lists into the join
3357 * message
3358 */
3359 addr = (char *)memb_join;
3360 addr_idx = sizeof (struct memb_join);
3361 memcpy (&addr[addr_idx],
3362 instance->my_proc_list,
3363 instance->my_proc_list_entries *
3364 sizeof (struct srp_addr));
3365 addr_idx +=
3366 instance->my_proc_list_entries *
3367 sizeof (struct srp_addr);
3368 memcpy (&addr[addr_idx],
3369 instance->my_failed_list,
3370 instance->my_failed_list_entries *
3371 sizeof (struct srp_addr));
3372 addr_idx +=
3373 instance->my_failed_list_entries *
3374 sizeof (struct srp_addr);
3375
3376 if (instance->totem_config->send_join_timeout) {
3377 usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3378 }
3379
3380 instance->stats.memb_join_tx++;
3381
3383 instance->totemnet_context,
3384 memb_join,
3385 addr_idx);
3386}
3387
3388static void memb_leave_message_send (struct totemsrp_instance *instance)
3389{
3390 char memb_join_data[40000];
3391 struct memb_join *memb_join = (struct memb_join *)memb_join_data;
3392 char *addr;
3393 unsigned int addr_idx;
3394 int active_memb_entries;
3395 struct srp_addr active_memb[PROCESSOR_COUNT_MAX];
3396 size_t msg_len;
3397
3399 "sending join/leave message");
3400
3401 /*
3402 * add us to the failed list, and remove us from
3403 * the members list
3404 */
3405 memb_set_merge(
3406 &instance->my_id, 1,
3407 instance->my_failed_list, &instance->my_failed_list_entries);
3408
3409 memb_set_subtract (active_memb, &active_memb_entries,
3410 instance->my_proc_list, instance->my_proc_list_entries,
3411 &instance->my_id, 1);
3412
3413 msg_len = sizeof(struct memb_join) +
3414 ((active_memb_entries + instance->my_failed_list_entries) * sizeof(struct srp_addr));
3415
3416 if (msg_len > sizeof(memb_join_data)) {
3418 "memb_leave message too long. Ignoring message.");
3419
3420 return ;
3421 }
3422
3428
3429 memb_join->ring_seq = instance->my_ring_id.seq;
3430 memb_join->proc_list_entries = active_memb_entries;
3432 memb_join->system_from = instance->my_id;
3433
3434 // TODO: CC Maybe use the actual join send routine.
3435 /*
3436 * This mess adds the joined and failed processor lists into the join
3437 * message
3438 */
3439 addr = (char *)memb_join;
3440 addr_idx = sizeof (struct memb_join);
3441 memcpy (&addr[addr_idx],
3442 active_memb,
3443 active_memb_entries *
3444 sizeof (struct srp_addr));
3445 addr_idx +=
3446 active_memb_entries *
3447 sizeof (struct srp_addr);
3448 memcpy (&addr[addr_idx],
3449 instance->my_failed_list,
3450 instance->my_failed_list_entries *
3451 sizeof (struct srp_addr));
3452 addr_idx +=
3453 instance->my_failed_list_entries *
3454 sizeof (struct srp_addr);
3455
3456
3457 if (instance->totem_config->send_join_timeout) {
3458 usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3459 }
3460 instance->stats.memb_join_tx++;
3461
3463 instance->totemnet_context,
3464 memb_join,
3465 addr_idx);
3466}
3467
3468static void memb_merge_detect_transmit (struct totemsrp_instance *instance)
3469{
3471
3478 memcpy (&memb_merge_detect.ring_id, &instance->my_ring_id,
3479 sizeof (struct memb_ring_id));
3481
3482 instance->stats.memb_merge_detect_tx++;
3485 sizeof (struct memb_merge_detect));
3486}
3487
3488static void memb_ring_id_set (
3489 struct totemsrp_instance *instance,
3490 const struct memb_ring_id *ring_id)
3491{
3492
3493 memcpy (&instance->my_ring_id, ring_id, sizeof (struct memb_ring_id));
3494}
3495
3497 void *srp_context,
3498 void **handle_out,
3500 int delete,
3501 int (*callback_fn) (enum totem_callback_token_type type, const void *),
3502 const void *data)
3503{
3504 struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
3505 struct token_callback_instance *callback_handle;
3506
3507 token_hold_cancel_send (instance);
3508
3509 callback_handle = malloc (sizeof (struct token_callback_instance));
3510 if (callback_handle == 0) {
3511 return (-1);
3512 }
3513 *handle_out = (void *)callback_handle;
3514 qb_list_init (&callback_handle->list);
3515 callback_handle->callback_fn = callback_fn;
3516 callback_handle->data = (void *) data;
3517 callback_handle->callback_type = type;
3518 callback_handle->delete = delete;
3519 switch (type) {
3521 qb_list_add (&callback_handle->list, &instance->token_callback_received_listhead);
3522 break;
3524 qb_list_add (&callback_handle->list, &instance->token_callback_sent_listhead);
3525 break;
3526 }
3527
3528 return (0);
3529}
3530
3531void totemsrp_callback_token_destroy (void *srp_context, void **handle_out)
3532{
3533 struct token_callback_instance *h;
3534
3535 if (*handle_out) {
3536 h = (struct token_callback_instance *)*handle_out;
3537 qb_list_del (&h->list);
3538 free (h);
3539 h = NULL;
3540 *handle_out = 0;
3541 }
3542}
3543
/*
 * Walk one of the instance's token callback lists.
 *
 * The switch picks token_callback_received_listhead or
 * token_callback_sent_listhead and asserts on anything else. Each list
 * entry is then visited with the removal-safe iterator: an entry whose
 * del flag is 1 is unlinked up front, and is linked back onto the list
 * when res is -1 so that it is retried on a later token.
 *
 * NOTE(review): this listing appears to have lost several source lines
 * (the second parameter, the token_callback_instance declaration, the
 * case labels, and the statements that assign del and res and that
 * follow "else if (del)"). Compare with the upstream file before
 * relying on the details.
 */
3544static void token_callbacks_execute (
3545 struct totemsrp_instance *instance,
3547{
3548 struct qb_list_head *list, *tmp_iter;
3549 struct qb_list_head *callback_listhead = 0;
3551 int res;
3552 int del;
3553
	/* Select the list that matches the requested callback type */
3554 switch (type) {
3556 callback_listhead = &instance->token_callback_received_listhead;
3557 break;
3559 callback_listhead = &instance->token_callback_sent_listhead;
3560 break;
3561 default:
3562 assert (0);
3563 }
3564
	/* Safe iteration: entries may be unlinked while the list is walked */
3565 qb_list_for_each_safe(list, tmp_iter, callback_listhead) {
3566 token_callback_instance = qb_list_entry (list, struct token_callback_instance, list);
	/* Unlink before use when the entry is flagged for deletion */
3568 if (del == 1) {
3569 qb_list_del (list);
3570 }
3571
3575 /*
3576 * This callback failed to execute, try it again on the next token
3577 */
3578 if (res == -1 && del == 1) {
3579 qb_list_add (list, callback_listhead);
3580 } else if (del) {
3582 }
3583 }
3584}
3585
3586/*
3587 * Flow control functions
3588 */
3589static unsigned int backlog_get (struct totemsrp_instance *instance)
3590{
3591 unsigned int backlog = 0;
3592 struct cs_queue *queue_use = NULL;
3593
3594 if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
3595 if (instance->waiting_trans_ack) {
3596 queue_use = &instance->new_message_queue_trans;
3597 } else {
3598 queue_use = &instance->new_message_queue;
3599 }
3600 } else
3601 if (instance->memb_state == MEMB_STATE_RECOVERY) {
3602 queue_use = &instance->retrans_message_queue;
3603 }
3604
3605 if (queue_use != NULL) {
3606 backlog = cs_queue_used (queue_use);
3607 }
3608
3609 instance->stats.token[instance->stats.latest_token].backlog_calc = backlog;
3610 return (backlog);
3611}
3612
3613static int fcc_calculate (
3614 struct totemsrp_instance *instance,
3615 struct orf_token *token)
3616{
3617 unsigned int transmits_allowed;
3618 unsigned int backlog_calc;
3619
3620 transmits_allowed = instance->totem_config->max_messages;
3621
3622 if (transmits_allowed > instance->totem_config->window_size - token->fcc) {
3623 transmits_allowed = instance->totem_config->window_size - token->fcc;
3624 }
3625
3626 instance->my_cbl = backlog_get (instance);
3627
3628 /*
3629 * Only do backlog calculation if there is a backlog otherwise
3630 * we would result in div by zero
3631 */
3632 if (token->backlog + instance->my_cbl - instance->my_pbl) {
3633 backlog_calc = (instance->totem_config->window_size * instance->my_pbl) /
3634 (token->backlog + instance->my_cbl - instance->my_pbl);
3635 if (backlog_calc > 0 && transmits_allowed > backlog_calc) {
3636 transmits_allowed = backlog_calc;
3637 }
3638 }
3639
3640 return (transmits_allowed);
3641}
3642
3643/*
3644 * don't overflow the RTR sort queue
3645 */
3646static void fcc_rtr_limit (
3647 struct totemsrp_instance *instance,
3648 struct orf_token *token,
3649 unsigned int *transmits_allowed)
3650{
3651 int check = QUEUE_RTR_ITEMS_SIZE_MAX;
3652 check -= (*transmits_allowed + instance->totem_config->window_size);
3653 assert (check >= 0);
3654 if (sq_lt_compare (instance->last_released +
3655 QUEUE_RTR_ITEMS_SIZE_MAX - *transmits_allowed -
3656 instance->totem_config->window_size,
3657
3658 token->seq)) {
3659
3660 *transmits_allowed = 0;
3661 }
3662}
3663
3664static void fcc_token_update (
3665 struct totemsrp_instance *instance,
3666 struct orf_token *token,
3667 unsigned int msgs_transmitted)
3668{
3669 token->fcc += msgs_transmitted - instance->my_trc;
3670 token->backlog += instance->my_cbl - instance->my_pbl;
3671 instance->my_trc = msgs_transmitted;
3672 instance->my_pbl = instance->my_cbl;
3673}
3674
3675/*
3676 * Sanity checkers
3677 */
/*
 * Validate the length fields of a received orf_token before it is used.
 *
 * msg/msg_len describe the received message, max_msg_len is the largest
 * length the caller can accept, and endian_conversion_needed selects
 * whether rtr_list_entries is byte swapped before it is checked.
 *
 * Returns 0 when the message passes every check and -1 when it is
 * longer than max_msg_len, shorter than struct orf_token, carries more
 * than RETRANSMIT_ENTRIES_MAX retransmit entries, or is shorter than
 * the header plus the advertised retransmit entries.
 *
 * NOTE(review): the statements that open the log calls appear to be
 * missing from this listing; only their message strings remain.
 */
3678static int check_orf_token_sanity(
3679 const struct totemsrp_instance *instance,
3680 const void *msg,
3681 size_t msg_len,
3682 size_t max_msg_len,
3683 int endian_conversion_needed)
3684{
3685 int rtr_entries;
3686 const struct orf_token *token = (const struct orf_token *)msg;
3687 size_t required_len;
3688
3689 if (msg_len > max_msg_len) {
3691 "Received orf_token message is too long... ignoring.");
3692
3693 return (-1);
3694 }
3695
	/* The fixed header must be complete before any field is read */
3696 if (msg_len < sizeof(struct orf_token)) {
3698 "Received orf_token message is too short... ignoring.");
3699
3700 return (-1);
3701 }
3702
3703 if (endian_conversion_needed) {
3704 rtr_entries = swab32(token->rtr_list_entries);
3705 } else {
3706 rtr_entries = token->rtr_list_entries;
3707 }
3708
	/*
	 * NOTE(review): rtr_entries is a signed int and only the upper
	 * bound is tested here - confirm a negative count is harmless
	 * for the required_len computation and for later users.
	 */
3709 if (rtr_entries > RETRANSMIT_ENTRIES_MAX) {
3711 "Received orf_token message rtr_entries is corrupted... ignoring.");
3712
3713 return (-1);
3714 }
3715
	/* Header plus the advertised retransmit list must fit in the message */
3716 required_len = sizeof(struct orf_token) + rtr_entries * sizeof(struct rtr_item);
3717 if (msg_len < required_len) {
3719 "Received orf_token message is too short... ignoring.");
3720
3721 return (-1);
3722 }
3723
3724 return (0);
3725}
3726
/*
 * Validate the length of a received mcast message.
 *
 * Returns -1 when msg_len is smaller than struct mcast, 0 otherwise.
 * msg and endian_conversion_needed are not read by the visible code.
 *
 * NOTE(review): the statement that opens the log call appears to be
 * missing from this listing; only its message string remains.
 */
3727static int check_mcast_sanity(
3728 struct totemsrp_instance *instance,
3729 const void *msg,
3730 size_t msg_len,
3731 int endian_conversion_needed)
3732{
3733
3734 if (msg_len < sizeof(struct mcast)) {
3736 "Received mcast message is too short... ignoring.");
3737
3738 return (-1);
3739 }
3740
3741 return (0);
3742}
3743
/*
 * Validate the length of a received memb_merge_detect message.
 *
 * Returns -1 when msg_len is smaller than struct memb_merge_detect,
 * 0 otherwise. msg and endian_conversion_needed are not read by the
 * visible code.
 *
 * NOTE(review): the statement that opens the log call appears to be
 * missing from this listing; only its message string remains.
 */
3744static int check_memb_merge_detect_sanity(
3745 struct totemsrp_instance *instance,
3746 const void *msg,
3747 size_t msg_len,
3748 int endian_conversion_needed)
3749{
3750
3751 if (msg_len < sizeof(struct memb_merge_detect)) {
3753 "Received memb_merge_detect message is too short... ignoring.");
3754
3755 return (-1);
3756 }
3757
3758 return (0);
3759}
3760
/*
 * Validate the length fields of a received memb_join message.
 *
 * Returns -1 when the message is shorter than struct memb_join, when
 * the list entry counts are rejected as too large (per the remaining
 * log text), or when the message is shorter than the header plus
 * proc_list_entries + failed_list_entries srp_addr records. Returns 0
 * otherwise.
 *
 * NOTE(review): this listing appears to have lost several source lines
 * (the log call openers, the statements that load and byte swap the two
 * entry counts, and the condition guarding the "exceeds the maximum"
 * branch). Compare with the upstream file before relying on the
 * details.
 */
3761static int check_memb_join_sanity(
3762 struct totemsrp_instance *instance,
3763 const void *msg,
3764 size_t msg_len,
3765 int endian_conversion_needed)
3766{
3767 const struct memb_join *mj_msg = (const struct memb_join *)msg;
3768 unsigned int proc_list_entries;
3769 unsigned int failed_list_entries;
3770 size_t required_len;
3771
	/* The fixed header must be complete before any field is read */
3772 if (msg_len < sizeof(struct memb_join)) {
3774 "Received memb_join message is too short... ignoring.");
3775
3776 return (-1);
3777 }
3778
3781
3782 if (endian_conversion_needed) {
3785 }
3786
3790 "Received memb_join message list_entries exceeds the maximum "
3791 "allowed value... ignoring.");
3792
3793 return (-1);
3794 }
3795
	/* Counts are widened to size_t before being added and multiplied */
3796 required_len = sizeof(struct memb_join) +
3797 (((size_t)proc_list_entries + (size_t)failed_list_entries) * sizeof(struct srp_addr));
3798 if (msg_len < required_len) {
3800 "Received memb_join message is too short... ignoring.");
3801
3802 return (-1);
3803 }
3804
3805 return (0);
3806}
3807
/*
 * Validate the length fields of a received memb_commit_token message.
 *
 * Returns -1 when the message is shorter than struct memb_commit_token
 * or shorter than the header plus addr_entries pairs of
 * (srp_addr, memb_commit_token_memb_entry). Returns 0 otherwise.
 *
 * NOTE(review): the log call openers and the body of the endian
 * conversion branch appear to be missing from this listing.
 * NOTE(review): no upper bound on addr_entries is visible here -
 * confirm whether one is enforced elsewhere.
 */
3808static int check_memb_commit_token_sanity(
3809 struct totemsrp_instance *instance,
3810 const void *msg,
3811 size_t msg_len,
3812 int endian_conversion_needed)
3813{
3814 const struct memb_commit_token *mct_msg = (const struct memb_commit_token *)msg;
3815 unsigned int addr_entries;
3816 size_t required_len;
3817
	/* The fixed header must be complete before any field is read */
3818 if (msg_len < sizeof(struct memb_commit_token)) {
3820 "Received memb_commit_token message is too short... ignoring.");
3821
3822 return (-1);
3823 }
3824
3825 addr_entries = mct_msg->addr_entries;
3826 if (endian_conversion_needed) {
3828 }
3829
	/* Each address entry is followed later by one memb entry */
3830 required_len = sizeof(struct memb_commit_token) +
3831 (addr_entries * (sizeof(struct srp_addr) + sizeof(struct memb_commit_token_memb_entry)));
3832 if (msg_len < required_len) {
3834 "Received memb_commit_token message is too short... ignoring.");
3835
3836 return (-1);
3837 }
3838
3839 return (0);
3840}
3841
/*
 * Validate the length of a received token_hold_cancel message.
 *
 * Returns -1 when msg_len is smaller than struct token_hold_cancel,
 * 0 otherwise. msg and endian_conversion_needed are not read by the
 * visible code.
 *
 * NOTE(review): the statement that opens the log call appears to be
 * missing from this listing; only its message string remains.
 */
3842static int check_token_hold_cancel_sanity(
3843 struct totemsrp_instance *instance,
3844 const void *msg,
3845 size_t msg_len,
3846 int endian_conversion_needed)
3847{
3848
3849 if (msg_len < sizeof(struct token_hold_cancel)) {
3851 "Received token_hold_cancel message is too short... ignoring.");
3852
3853 return (-1);
3854 }
3855
3856 return (0);
3857}
3858
3859/*
3860 * Message Handlers
3861 */
3862
/*
 * Only built with GIVEINFO: the qb_util_nano_current_get() value
 * recorded at the previous timing sample in message_handler_orf_token,
 * used there to compute elapsed time between samples.
 */
3863#ifdef GIVEINFO
3864uint64_t tv_old;
3865#endif
3866/*
3867 * message handler called when TOKEN message type received
3868 */
/*
 * Process one received ORF token.
 *
 * instance is the protocol instance, msg/msg_len the received message,
 * and endian_conversion_needed tells whether the sender used the other
 * byte order. Every return visible in this function returns 0, both
 * when the token is discarded and when it is processed.
 *
 * Outline of the visible flow: sanity check, optional byte swap, copy
 * into local storage, merge-detect bookkeeping, token hold decision,
 * then a switch on the membership state. The final case body discards
 * tokens from another ring and retransmitted tokens, runs the
 * TOKEN_RECEIVED callbacks, performs retransmission and multicast under
 * flow control, updates aru tracking, handles the recovery-state
 * counters, sends the token on, delivers messages in operational state,
 * re-arms the timers and runs the TOKEN_SENT callbacks.
 *
 * NOTE(review): this listing appears to have lost several source lines
 * (log call openers, two case labels, the statement between the two
 * instance->flushing assignments, the first line of one condition and
 * the statement just before token_send). Compare with the upstream file
 * before relying on the details.
 */
3869static int message_handler_orf_token (
3870 struct totemsrp_instance *instance,
3871 const void *msg,
3872 size_t msg_len,
3873 int endian_conversion_needed)
3874{
3875 char token_storage[1500];
3876 char token_convert[1500];
3877 struct orf_token *token = NULL;
3878 int forward_token;
3879 unsigned int transmits_allowed;
3880 unsigned int mcasted_retransmit;
3881 unsigned int mcasted_regular;
3882 unsigned int last_aru;
3883
3884#ifdef GIVEINFO
3885 uint64_t tv_current;
3886 uint64_t tv_diff;
3887
3888 tv_current = qb_util_nano_current_get ();
3889 tv_diff = tv_current - tv_old;
3890 tv_old = tv_current;
3891
3893 "Time since last token %0.4f ms", tv_diff / (float)QB_TIME_NS_IN_MSEC);
3894#endif
3895
	/* Malformed tokens are dropped; the local storage size is the length cap */
3896 if (check_orf_token_sanity(instance, msg, msg_len, sizeof(token_storage),
3897 endian_conversion_needed) == -1) {
3898 return (0);
3899 }
3900
3901 if (instance->orf_token_discard) {
3902 return (0);
3903 }
3904#ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE
3905 if (random()%100 < TEST_DROP_ORF_TOKEN_PERCENTAGE) {
3906 return (0);
3907 }
3908#endif
3909
	/* From here on msg refers to the byte-swapped copy when one was needed */
3910 if (endian_conversion_needed) {
3911 orf_token_endian_convert ((struct orf_token *)msg,
3912 (struct orf_token *)token_convert);
3913 msg = (struct orf_token *)token_convert;
3914 }
3915
3916 /*
3917 * Make copy of token and retransmit list in case we have
3918 * to flush incoming messages from the kernel queue
3919 */
	/*
	 * NOTE(review): the second memcpy always copies
	 * RETRANSMIT_ENTRIES_MAX items, independent of rtr_list_entries
	 * and msg_len - confirm that the source buffer and
	 * token_storage are always large enough for that.
	 */
3920 token = (struct orf_token *)token_storage;
3921 memcpy (token, msg, sizeof (struct orf_token));
3922 memcpy (&token->rtr_list[0], (char *)msg + sizeof (struct orf_token),
3923 sizeof (struct rtr_item) * RETRANSMIT_ENTRIES_MAX);
3924
3925
3926 /*
3927 * Handle merge detection timeout
3928 */
	/* my_seq_unchanged counts consecutive tokens carrying the same seq */
3929 if (token->seq == instance->my_last_seq) {
3930 start_merge_detect_timeout (instance);
3931 instance->my_seq_unchanged += 1;
3932 } else {
3933 cancel_merge_detect_timeout (instance);
3934 cancel_token_hold_retransmit_timeout (instance);
3935 instance->my_seq_unchanged = 0;
3936 }
3937
3938 instance->my_last_seq = token->seq;
3939
3940#ifdef TEST_RECOVERY_MSG_COUNT
3941 if (instance->memb_state == MEMB_STATE_OPERATIONAL && token->seq > TEST_RECOVERY_MSG_COUNT) {
3942 return (0);
3943 }
3944#endif
3945 instance->flushing = 1;
3947 instance->flushing = 0;
3948
3949 /*
3950 * Determine if we should hold (in reality drop) the token
3951 */
	/*
	 * The ring representative holds once my_seq_unchanged exceeds
	 * seqno_unchanged_const; other processors mark the token held
	 * once the counter reaches it.
	 */
3952 instance->my_token_held = 0;
3953 if (instance->my_ring_id.rep == instance->my_id.nodeid &&
3954 instance->my_seq_unchanged > instance->totem_config->seqno_unchanged_const) {
3955 instance->my_token_held = 1;
3956 } else {
3957 if (instance->my_ring_id.rep != instance->my_id.nodeid &&
3958 instance->my_seq_unchanged >= instance->totem_config->seqno_unchanged_const) {
3959 instance->my_token_held = 1;
3960 }
3961 }
3962
3963 /*
3964 * Hold onto token when there is no activity on ring and
3965 * this processor is the ring rep
3966 */
3967 forward_token = 1;
3968 if (instance->my_ring_id.rep == instance->my_id.nodeid) {
3969 if (instance->my_token_held) {
3970 forward_token = 0;
3971 }
3972 }
3973
3974 switch (instance->memb_state) {
3975 case MEMB_STATE_COMMIT:
3976 /* Discard token */
3977 break;
3978
3980 messages_free (instance, token->aru);
3981 /*
3982 * Do NOT add break, this case should also execute code in gather case.
3983 */
3984
3985 case MEMB_STATE_GATHER:
3986 /*
3987 * DO NOT add break, we use different free mechanism in recovery state
3988 */
3989
3991 /*
3992 * Discard tokens from another configuration
3993 */
3994 if (memcmp (&token->ring_id, &instance->my_ring_id,
3995 sizeof (struct memb_ring_id)) != 0) {
3996
3997 if ((forward_token)
3998 && instance->use_heartbeat) {
3999 reset_heartbeat_timeout(instance);
4000 }
4001 else {
4002 cancel_heartbeat_timeout(instance);
4003 }
4004
4005 return (0); /* discard token */
4006 }
4007
4008 /*
4009 * Discard retransmitted tokens
4010 */
4011 if (sq_lte_compare (token->token_seq, instance->my_token_seq)) {
4012 return (0); /* discard token */
4013 }
4014
4015 /*
4016 * Token is valid so trigger callbacks
4017 */
4018 token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_RECEIVED);
4019
	/* Keep the previous aru so an unchanged aru can be detected below */
4020 last_aru = instance->my_last_aru;
4021 instance->my_last_aru = token->aru;
4022
	/* Retransmissions are served first and draw on the same allowance */
4023 transmits_allowed = fcc_calculate (instance, token);
4024 mcasted_retransmit = orf_token_rtr (instance, token, &transmits_allowed);
4025
	/* Outstanding or just-served retransmit requests cancel the hold */
4027 instance->my_token_held == 1 &&
4028 (token->rtr_list_entries > 0 || mcasted_retransmit > 0)) {
4029 instance->my_token_held = 0;
4030 forward_token = 1;
4031 }
4032
4033 fcc_rtr_limit (instance, token, &transmits_allowed);
4034 mcasted_regular = orf_token_mcast (instance, token, transmits_allowed);
4035/*
4036if (mcasted_regular) {
4037printf ("mcasted regular %d\n", mcasted_regular);
4038printf ("token seq %d\n", token->seq);
4039}
4040*/
4041 fcc_token_update (instance, token, mcasted_retransmit +
4042 mcasted_regular);
4043
	/*
	 * Take over the token's aru when ours is lower, when we set it
	 * last time, or when nobody owns it; aru_addr 0 marks "no owner"
	 * and is written when aru has caught up with seq.
	 */
4044 if (sq_lt_compare (instance->my_aru, token->aru) ||
4045 instance->my_id.nodeid == token->aru_addr ||
4046 token->aru_addr == 0) {
4047
4048 token->aru = instance->my_aru;
4049 if (token->aru == token->seq) {
4050 token->aru_addr = 0;
4051 } else {
4052 token->aru_addr = instance->my_id.nodeid;
4053 }
4054 }
	/* Count consecutive visits on which an owned aru did not move */
4055 if (token->aru == last_aru && token->aru_addr != 0) {
4056 instance->my_aru_count += 1;
4057 } else {
4058 instance->my_aru_count = 0;
4059 }
4060
4061 /*
4062 * We really don't follow specification there. In specification, OTHER nodes
4063 * detect failure of one node (based on aru_count) and my_id IS NEVER added
4064 * to failed list (so node never mark itself as failed)
4065 */
4066 if (instance->my_aru_count > instance->totem_config->fail_to_recv_const &&
4067 token->aru_addr == instance->my_id.nodeid) {
4068
4070 "FAILED TO RECEIVE");
4071
4072 instance->failed_to_recv = 1;
4073
	/* This processor adds itself to its own failed list and regathers */
4074 memb_set_merge (&instance->my_id, 1,
4075 instance->my_failed_list,
4076 &instance->my_failed_list_entries);
4077
4078 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FAILED_TO_RECEIVE);
4079 } else {
	/* Remember the accepted token_seq, then advance it for the next hop */
4080 instance->my_token_seq = token->token_seq;
4081 token->token_seq += 1;
4082
4083 if (instance->memb_state == MEMB_STATE_RECOVERY) {
4084 /*
4085 * instance->my_aru == instance->my_high_seq_received means this processor
4086 * has recovered all messages it can recover
4087 * (ie: its retrans queue is empty)
4088 */
4089 if (cs_queue_is_empty (&instance->retrans_message_queue) == 0) {
4090
4091 if (token->retrans_flg == 0) {
4092 token->retrans_flg = 1;
4093 instance->my_set_retrans_flg = 1;
4094 }
4095 } else
4096 if (token->retrans_flg == 1 && instance->my_set_retrans_flg) {
4097 token->retrans_flg = 0;
4098 instance->my_set_retrans_flg = 0;
4099 }
4101 "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x",
4102 token->retrans_flg, instance->my_set_retrans_flg,
4103 cs_queue_is_empty (&instance->retrans_message_queue),
4104 instance->my_retrans_flg_count, token->aru);
	/* Count consecutive visits on which the token's retrans flag was clear */
4105 if (token->retrans_flg == 0) {
4106 instance->my_retrans_flg_count += 1;
4107 } else {
4108 instance->my_retrans_flg_count = 0;
4109 }
4110 if (instance->my_retrans_flg_count == 2) {
4111 instance->my_install_seq = token->seq;
4112 }
4114 "install seq %x aru %x high seq received %x",
4115 instance->my_install_seq, instance->my_aru, instance->my_high_seq_received);
	/* Once our aru reaches the install seq, latch the delivery membership */
4116 if (instance->my_retrans_flg_count >= 2 &&
4117 instance->my_received_flg == 0 &&
4118 sq_lte_compare (instance->my_install_seq, instance->my_aru)) {
4119 instance->my_received_flg = 1;
4120 instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
4121 memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
4122 sizeof (struct totem_ip_address) * instance->my_trans_memb_entries);
4123 }
4124 if (instance->my_retrans_flg_count >= 3 &&
4125 sq_lte_compare (instance->my_install_seq, token->aru)) {
4126 instance->my_rotation_counter += 1;
4127 } else {
4128 instance->my_rotation_counter = 0;
4129 }
	/* Two such consecutive visits complete recovery */
4130 if (instance->my_rotation_counter == 2) {
4132 "retrans flag count %x token aru %x install seq %x aru %x %x",
4133 instance->my_retrans_flg_count, token->aru, instance->my_install_seq,
4134 instance->my_aru, token->seq);
4135
4136 memb_state_operational_enter (instance);
4137 instance->my_rotation_counter = 0;
4138 instance->my_retrans_flg_count = 0;
4139 }
4140 }
4141
4143 token_send (instance, token, forward_token);
4144
4145#ifdef GIVEINFO
4146 tv_current = qb_util_nano_current_get ();
4147 tv_diff = tv_current - tv_old;
4148 tv_old = tv_current;
4150 "I held %0.4f ms",
4151 tv_diff / (float)QB_TIME_NS_IN_MSEC);
4152#endif
4153 if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
4154 messages_deliver_to_app (instance, 0,
4155 instance->my_high_seq_received);
4156 }
4157
4158 /*
4159 * Deliver messages after token has been transmitted
4160 * to improve performance
4161 */
4162 reset_token_timeout (instance); // REVIEWED
4163 reset_token_retransmit_timeout (instance); // REVIEWED
4164 if (instance->my_id.nodeid == instance->my_ring_id.rep &&
4165 instance->my_token_held == 1) {
4166
4167 start_token_hold_retransmit_timeout (instance);
4168 }
4169
4170 token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_SENT);
4171 }
4172 break;
4173 }
4174
	/* Heartbeat timer is re-armed only when forwarding with heartbeat enabled */
4175 if ((forward_token)
4176 && instance->use_heartbeat) {
4177 reset_heartbeat_timeout(instance);
4178 }
4179 else {
4180 cancel_heartbeat_timeout(instance);
4181 }
4182
4183 return (0);
4184}
4185
/*
 * Hand messages from the regular sort queue to the delivery callback,
 * in sequence order, starting just after my_high_delivered and going
 * no further than end_point.
 *
 * skip == 0: delivery stops at the first sequence number that has no
 *            item in the sort queue.
 * skip != 0: missing items are stepped over, and so are messages whose
 *            system_from is not in my_deliver_memb_list.
 *
 * my_high_delivered is advanced for every sequence number that is
 * delivered or stepped over.
 *
 * NOTE(review): the statements that open the two log calls appear to
 * be missing from this listing; only their arguments remain.
 */
4186static void messages_deliver_to_app (
4187 struct totemsrp_instance *instance,
4188 int skip,
4189 unsigned int end_point)
4190{
4191 struct sort_queue_item *sort_queue_item_p;
4192 unsigned int i;
4193 int res;
4194 struct mcast *mcast_in;
4195 struct mcast mcast_header;
4196 unsigned int range = 0;
4197 int endian_conversion_required;
4198 unsigned int my_high_delivered_stored = 0;
4199 struct srp_addr aligned_system_from;
4200
	/* Unsigned subtraction: number of sequence numbers still to consider */
4201 range = end_point - instance->my_high_delivered;
4202
4203 if (range) {
4205 "Delivering %x to %x", instance->my_high_delivered,
4206 end_point);
4207 }
4208 assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
	/* Loop offsets are taken from this snapshot; the live field moves as we go */
4209 my_high_delivered_stored = instance->my_high_delivered;
4210
4211 /*
4212 * Deliver messages in order from rtr queue to pending delivery queue
4213 */
4214 for (i = 1; i <= range; i++) {
4215
4216 void *ptr = 0;
4217
4218 /*
4219 * If out of range of sort queue, stop assembly
4220 */
4221 res = sq_in_range (&instance->regular_sort_queue,
4222 my_high_delivered_stored + i);
4223 if (res == 0) {
4224 break;
4225 }
4226
4227 res = sq_item_get (&instance->regular_sort_queue,
4228 my_high_delivered_stored + i, &ptr);
4229 /*
4230 * If hole, stop assembly
4231 */
4232 if (res != 0 && skip == 0) {
4233 break;
4234 }
4235
4236 instance->my_high_delivered = my_high_delivered_stored + i;
4237
	/* Only reachable with skip set: step over the missing item */
4238 if (res != 0) {
4239 continue;
4240
4241 }
4242
4243 sort_queue_item_p = ptr;
4244
4245 mcast_in = sort_queue_item_p->mcast;
4246 assert (mcast_in != (struct mcast *)0xdeadbeef);
4247
	/* A header whose magic does not match was stored in the other byte order */
4248 endian_conversion_required = 0;
4249 if (mcast_in->header.magic != TOTEM_MH_MAGIC) {
4250 endian_conversion_required = 1;
4251 mcast_endian_convert (mcast_in, &mcast_header);
4252 } else {
4253 memcpy (&mcast_header, mcast_in, sizeof (struct mcast));
4254 }
4255
4256 aligned_system_from = mcast_header.system_from;
4257
4258 /*
4259 * Skip messages not originated in instance->my_deliver_memb
4260 */
4261 if (skip &&
4262 memb_set_subset (&aligned_system_from,
4263 1,
4264 instance->my_deliver_memb_list,
4265 instance->my_deliver_memb_entries) == 0) {
4266
4267 instance->my_high_delivered = my_high_delivered_stored + i;
4268
4269 continue;
4270 }
4271
4272 /*
4273 * Message found
4274 */
4276 "Delivering MCAST message with seq %x to pending delivery queue",
4277 mcast_header.seq);
4278
4279 /*
4280 * Message is locally originated multicast
4281 */
	/* The callback receives the payload that follows the mcast header */
4282 instance->totemsrp_deliver_fn (
4283 mcast_header.header.nodeid,
4284 ((char *)sort_queue_item_p->mcast) + sizeof (struct mcast),
4285 sort_queue_item_p->msg_len - sizeof (struct mcast),
4286 endian_conversion_required);
4287 }
4288}
4289
4290/*
4291 * recv message handler called when MCAST message type received
4292 */
/*
 * Process one received MCAST message.
 *
 * Returns -1 only when no buffer could be allocated for storing the
 * message, and 0 in every other visible path (including discards).
 *
 * Outline of the visible flow: length check, header copy (byte swapped
 * when needed), choice of the recovery or regular sort queue based on
 * the encapsulated flag, handling of messages from a different ring,
 * storing the message in the sort queue when its sequence number is in
 * range and not already present, then aru update and, in operational
 * state, delivery.
 *
 * NOTE(review): this listing appears to have lost several source lines
 * (the sort_queue_item declaration, two case labels and a log call
 * opener). Compare with the upstream file before relying on the
 * details.
 */
4293static int message_handler_mcast (
4294 struct totemsrp_instance *instance,
4295 const void *msg,
4296 size_t msg_len,
4297 int endian_conversion_needed)
4298{
4300 struct sq *sort_queue;
4301 struct mcast mcast_header;
4302 struct srp_addr aligned_system_from;
4303
4304 if (check_mcast_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4305 return (0);
4306 }
4307
	/* Work on a local copy of the header, in host byte order */
4308 if (endian_conversion_needed) {
4309 mcast_endian_convert (msg, &mcast_header);
4310 } else {
4311 memcpy (&mcast_header, msg, sizeof (struct mcast));
4312 }
4313
4314 if (mcast_header.header.encapsulated == MESSAGE_ENCAPSULATED) {
4315 sort_queue = &instance->recovery_sort_queue;
4316 } else {
4317 sort_queue = &instance->regular_sort_queue;
4318 }
4319
	/*
	 * NOTE(review): msg_len comes from the caller - confirm it is
	 * always capped at FRAME_SIZE_MAX before reaching this assert.
	 */
4320 assert (msg_len <= FRAME_SIZE_MAX);
4321
4322#ifdef TEST_DROP_MCAST_PERCENTAGE
4323 if (random()%100 < TEST_DROP_MCAST_PERCENTAGE) {
4324 return (0);
4325 }
4326#endif
4327
4328 /*
4329 * If the message is foreign execute the switch below
4330 */
4331 if (memcmp (&instance->my_ring_id, &mcast_header.ring_id,
4332 sizeof (struct memb_ring_id)) != 0) {
4333
4334 aligned_system_from = mcast_header.system_from;
4335
4336 switch (instance->memb_state) {
4338 memb_set_merge (
4339 &aligned_system_from, 1,
4340 instance->my_proc_list, &instance->my_proc_list_entries);
4341 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE);
4342 break;
4343
4344 case MEMB_STATE_GATHER:
	/* Only a sender not yet in my_proc_list restarts gather */
4345 if (!memb_set_subset (
4346 &aligned_system_from,
4347 1,
4348 instance->my_proc_list,
4349 instance->my_proc_list_entries)) {
4350
4351 memb_set_merge (&aligned_system_from, 1,
4352 instance->my_proc_list, &instance->my_proc_list_entries);
4353 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE);
4354 return (0);
4355 }
4356 break;
4357
4358 case MEMB_STATE_COMMIT:
4359 /* discard message */
4360 instance->stats.rx_msg_dropped++;
4361 break;
4362
4364 /* discard message */
4365 instance->stats.rx_msg_dropped++;
4366 break;
4367 }
	/* A foreign message is never stored or delivered */
4368 return (0);
4369 }
4370
4372 "Received ringid (" CS_PRI_RING_ID ") seq %x",
4373 mcast_header.ring_id.rep,
4374 (uint64_t)mcast_header.ring_id.seq,
4375 mcast_header.seq);
4376
4377 /*
4378 * Add mcast message to rtr queue if not already in rtr queue
4379 * otherwise free io vectors
4380 */
4381 if (msg_len > 0 && msg_len <= FRAME_SIZE_MAX &&
4382 sq_in_range (sort_queue, mcast_header.seq) &&
4383 sq_item_inuse (sort_queue, mcast_header.seq) == 0) {
4384
4385 /*
4386 * Allocate new multicast memory block
4387 */
4388// TODO LEAK
4389 sort_queue_item.mcast = totemsrp_buffer_alloc (instance);
4390 if (sort_queue_item.mcast == NULL) {
4391 return (-1); /* error here is corrected by the algorithm */
4392 }
	/* The whole message as received is stored, not the converted header */
4393 memcpy (sort_queue_item.mcast, msg, msg_len);
4394 sort_queue_item.msg_len = msg_len;
4395
4396 if (sq_lt_compare (instance->my_high_seq_received,
4397 mcast_header.seq)) {
4398 instance->my_high_seq_received = mcast_header.seq;
4399 }
4400
4401 sq_item_add (sort_queue, &sort_queue_item, mcast_header.seq);
4402 }
4403
4404 update_aru (instance);
4405 if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
4406 messages_deliver_to_app (instance, 0, instance->my_high_seq_received);
4407 }
4408
4409/* TODO remove from retrans message queue for old ring in recovery state */
4410 return (0);
4411}
4412
/*
 * Process one received MEMB_MERGE_DETECT message.
 *
 * Every visible return path returns 0. A message that fails the length
 * check or that carries this processor's own ring id is ignored.
 * Otherwise the sender is merged into my_proc_list and gather state is
 * entered, depending on the current membership state; the commit case
 * and the last case do nothing.
 *
 * NOTE(review): this listing appears to have lost several source lines
 * (the memb_merge_detect declaration and two case labels). Compare
 * with the upstream file before relying on the details.
 */
4413static int message_handler_memb_merge_detect (
4414 struct totemsrp_instance *instance,
4415 const void *msg,
4416 size_t msg_len,
4417 int endian_conversion_needed)
4418{
4420 struct srp_addr aligned_system_from;
4421
4422 if (check_memb_merge_detect_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4423 return (0);
4424 }
4425
	/* Work on a local copy of the message, in host byte order */
4426 if (endian_conversion_needed) {
4427 memb_merge_detect_endian_convert (msg, &memb_merge_detect);
4428 } else {
4429 memcpy (&memb_merge_detect, msg,
4430 sizeof (struct memb_merge_detect));
4431 }
4432
4433 /*
4434 * do nothing if this is a merge detect from this configuration
4435 */
4436 if (memcmp (&instance->my_ring_id, &memb_merge_detect.ring_id,
4437 sizeof (struct memb_ring_id)) == 0) {
4438
4439 return (0);
4440 }
4441
4442 aligned_system_from = memb_merge_detect.system_from;
4443
4444 /*
4445 * Execute merge operation
4446 */
4447 switch (instance->memb_state) {
4449 memb_set_merge (&aligned_system_from, 1,
4450 instance->my_proc_list, &instance->my_proc_list_entries);
4451 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE);
4452 break;
4453
4454 case MEMB_STATE_GATHER:
	/* Only a sender not yet in my_proc_list restarts gather */
4455 if (!memb_set_subset (
4456 &aligned_system_from,
4457 1,
4458 instance->my_proc_list,
4459 instance->my_proc_list_entries)) {
4460
4461 memb_set_merge (&aligned_system_from, 1,
4462 instance->my_proc_list, &instance->my_proc_list_entries);
4463 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE);
4464 return (0);
4465 }
4466 break;
4467
4468 case MEMB_STATE_COMMIT:
4469 /* do nothing in commit */
4470 break;
4471
4473 /* do nothing in recovery */
4474 break;
4475 }
4476 return (0);
4477}
4478
/*
 * Apply a received (already host-order) memb_join message to this
 * processor's membership state.
 *
 * The processor list starts at memb_join->end_of_memb_join and the
 * failed list follows it after proc_list_entries entries.
 *
 * Visible decision chain:
 *  - while instance->flushing is set the message is discarded (after
 *    my_leave_memb_set for a leave message) and the function returns;
 *  - when both received lists equal this processor's lists, the
 *    sender's consensus is recorded; on agreement a commit token is
 *    created and commit state is entered, either as a single-member
 *    ring after a failed-to-receive condition or when this processor
 *    is lowest in the configuration;
 *  - when both received lists are subsets of this processor's lists,
 *    or the sender is already in my_failed_list, nothing is merged;
 *  - otherwise the lists are merged and gather state is entered.
 * At "out", a processor still in operational state that has not
 * entered gather in this call does so.
 *
 * NOTE(review): this listing appears to have lost many source lines in
 * this function (conditions around the flushing/leave handling, log
 * call openers, and one argument in several memb_set_* calls). Compare
 * with the upstream file before relying on the details.
 */
4479static void memb_join_process (
4480 struct totemsrp_instance *instance,
4481 const struct memb_join *memb_join)
4482{
4483 struct srp_addr *proc_list;
4484 struct srp_addr *failed_list;
4485 int gather_entered = 0;
4486 int fail_minus_memb_entries = 0;
4487 struct srp_addr fail_minus_memb[PROCESSOR_COUNT_MAX];
4488 struct srp_addr aligned_system_from;
4489
	/* Both lists trail the fixed header, processor list first */
4490 proc_list = (struct srp_addr *)memb_join->end_of_memb_join;
4491 failed_list = proc_list + memb_join->proc_list_entries;
4492 aligned_system_from = memb_join->system_from;
4493
4494 log_printf(instance->totemsrp_log_level_trace, "memb_join_process");
4495 memb_set_log(instance, instance->totemsrp_log_level_trace,
4496 "proclist", proc_list, memb_join->proc_list_entries);
4497 memb_set_log(instance, instance->totemsrp_log_level_trace,
4498 "faillist", failed_list, memb_join->failed_list_entries);
4499 memb_set_log(instance, instance->totemsrp_log_level_trace,
4500 "my_proclist", instance->my_proc_list, instance->my_proc_list_entries);
4501 memb_set_log(instance, instance->totemsrp_log_level_trace,
4502 "my_faillist", instance->my_failed_list, instance->my_failed_list_entries);
4503
4505 if (instance->flushing) {
4508 "Discarding LEAVE message during flush, nodeid=" CS_PRI_NODE_ID,
	/* The leaving node's id is read from the last failed list entry */
4510 if (memb_join->failed_list_entries > 0) {
4511 my_leave_memb_set(instance, failed_list[memb_join->failed_list_entries - 1 ].nodeid);
4512 }
4513 } else {
4515 "Discarding JOIN message during flush, nodeid=" CS_PRI_NODE_ID, memb_join->header.nodeid);
4516 }
4517 return;
4518 } else {
4521 "Received LEAVE message from " CS_PRI_NODE_ID, memb_join->failed_list_entries > 0 ? failed_list[memb_join->failed_list_entries - 1 ].nodeid : LEAVE_DUMMY_NODEID);
4522 if (memb_join->failed_list_entries > 0) {
4523 my_leave_memb_set(instance, failed_list[memb_join->failed_list_entries - 1 ].nodeid);
4524 }
4525 }
4526 }
4527
4528 }
4529
	/* Case 1: the sender's view matches ours exactly */
4530 if (memb_set_equal (proc_list,
4532 instance->my_proc_list,
4533 instance->my_proc_list_entries) &&
4534
4535 memb_set_equal (failed_list,
4537 instance->my_failed_list,
4538 instance->my_failed_list_entries)) {
4539
4541 memb_consensus_set (instance, &aligned_system_from);
4542 }
4543
	/* After a failed-to-receive condition, form a ring containing only this processor */
4544 if (memb_consensus_agreed (instance) && instance->failed_to_recv == 1) {
4545 instance->failed_to_recv = 0;
4546 instance->my_proc_list[0] = instance->my_id;
4547 instance->my_proc_list_entries = 1;
4548 instance->my_failed_list_entries = 0;
4549
4550 memb_state_commit_token_create (instance);
4551
4552 memb_state_commit_enter (instance);
4553 return;
4554 }
4555 if (memb_consensus_agreed (instance) &&
4556 memb_lowest_in_config (instance)) {
4557
4558 memb_state_commit_token_create (instance);
4559
4560 memb_state_commit_enter (instance);
4561 } else {
4562 goto out;
4563 }
4564 } else
	/* Case 2: the sender's lists contain nothing we do not already have */
4565 if (memb_set_subset (proc_list,
4567 instance->my_proc_list,
4568 instance->my_proc_list_entries) &&
4569
4570 memb_set_subset (failed_list,
4572 instance->my_failed_list,
4573 instance->my_failed_list_entries)) {
4574
4575 goto out;
4576 } else
	/* Case 3: the sender is already in our failed list */
4577 if (memb_set_subset (&aligned_system_from, 1,
4578 instance->my_failed_list, instance->my_failed_list_entries)) {
4579
4580 goto out;
4581 } else {
	/* Case 4: merge the new information and restart gather */
4582 memb_set_merge (proc_list,
4584 instance->my_proc_list, &instance->my_proc_list_entries);
4585
	/* A sender that lists this processor as failed is added to our failed list */
4586 if (memb_set_subset (
4587 &instance->my_id, 1,
4588 failed_list, memb_join->failed_list_entries)) {
4589
4590 memb_set_merge (
4591 &aligned_system_from, 1,
4592 instance->my_failed_list, &instance->my_failed_list_entries);
4593 } else {
4594 if (memb_set_subset (
4595 &aligned_system_from, 1,
4596 instance->my_memb_list,
4597 instance->my_memb_entries)) {
4598
4599 if (memb_set_subset (
4600 &aligned_system_from, 1,
4601 instance->my_failed_list,
4602 instance->my_failed_list_entries) == 0) {
4603
4604 memb_set_merge (failed_list,
4606 instance->my_failed_list, &instance->my_failed_list_entries);
4607 } else {
4608 memb_set_subtract (fail_minus_memb,
4609 &fail_minus_memb_entries,
4610 failed_list,
4612 instance->my_memb_list,
4613 instance->my_memb_entries);
4614
4615 memb_set_merge (fail_minus_memb,
4616 fail_minus_memb_entries,
4617 instance->my_failed_list,
4618 &instance->my_failed_list_entries);
4619 }
4620 }
4621 }
4622 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_JOIN);
4623 gather_entered = 1;
4624 }
4625
4626out:
	/* A join seen in operational state always ends in gather */
4627 if (gather_entered == 0 &&
4628 instance->memb_state == MEMB_STATE_OPERATIONAL) {
4629
4630 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE);
4631 }
4632}
4633
/*
 * Byte swap a memb_join message from "in" into "out".
 *
 * The visible statements copy header.type unchanged, swap
 * header.nodeid and ring_seq, convert system_from, and convert every
 * entry of the processor list and of the failed list that trail the
 * fixed header. "out" must have room for both trailing lists.
 *
 * NOTE(review): this listing appears to have lost several source lines
 * (the remaining header fields and the statements that set
 * out->proc_list_entries and out->failed_list_entries, which the list
 * pointers and loops below depend on). Compare with the upstream file
 * before relying on the details.
 */
4634static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out)
4635{
4636 int i;
4637 struct srp_addr *in_proc_list;
4638 struct srp_addr *in_failed_list;
4639 struct srp_addr *out_proc_list;
4640 struct srp_addr *out_failed_list;
4641
4644 out->header.type = in->header.type;
4645 out->header.nodeid = swab32 (in->header.nodeid);
4646 out->system_from = srp_addr_endian_convert(in->system_from);
4649 out->ring_seq = swab64 (in->ring_seq);
4650
	/* List positions are computed from the converted count in "out" */
4651 in_proc_list = (struct srp_addr *)in->end_of_memb_join;
4652 in_failed_list = in_proc_list + out->proc_list_entries;
4653 out_proc_list = (struct srp_addr *)out->end_of_memb_join;
4654 out_failed_list = out_proc_list + out->proc_list_entries;
4655
4656 for (i = 0; i < out->proc_list_entries; i++) {
4657 out_proc_list[i] = srp_addr_endian_convert (in_proc_list[i]);
4658 }
4659 for (i = 0; i < out->failed_list_entries; i++) {
4660 out_failed_list[i] = srp_addr_endian_convert (in_failed_list[i]);
4661 }
4662}
4663
4664static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out)
4665{
4666 int i;
4667 struct srp_addr *in_addr = (struct srp_addr *)in->end_of_commit_token;
4668 struct srp_addr *out_addr = (struct srp_addr *)out->end_of_commit_token;
4669 struct memb_commit_token_memb_entry *in_memb_list;
4670 struct memb_commit_token_memb_entry *out_memb_list;
4671
4672 out->header.magic = TOTEM_MH_MAGIC;
4673 out->header.version = TOTEM_MH_VERSION;
4674 out->header.type = in->header.type;
4675 out->header.nodeid = swab32 (in->header.nodeid);
4676 out->token_seq = swab32 (in->token_seq);
4677 out->ring_id.rep = swab32(in->ring_id.rep);
4678 out->ring_id.seq = swab64 (in->ring_id.seq);
4679 out->retrans_flg = swab32 (in->retrans_flg);
4680 out->memb_index = swab32 (in->memb_index);
4681 out->addr_entries = swab32 (in->addr_entries);
4682
4683 in_memb_list = (struct memb_commit_token_memb_entry *)(in_addr + out->addr_entries);
4684 out_memb_list = (struct memb_commit_token_memb_entry *)(out_addr + out->addr_entries);
4685 for (i = 0; i < out->addr_entries; i++) {
4686 out_addr[i] = srp_addr_endian_convert (in_addr[i]);
4687
4688 /*
4689 * Only convert the memb entry if it has been set
4690 */
4691 if (in_memb_list[i].ring_id.rep != 0) {
4692 out_memb_list[i].ring_id.rep = swab32(in_memb_list[i].ring_id.rep);
4693
4694 out_memb_list[i].ring_id.seq =
4695 swab64 (in_memb_list[i].ring_id.seq);
4696 out_memb_list[i].aru = swab32 (in_memb_list[i].aru);
4697 out_memb_list[i].high_delivered = swab32 (in_memb_list[i].high_delivered);
4698 out_memb_list[i].received_flg = swab32 (in_memb_list[i].received_flg);
4699 }
4700 }
4701}
4702
/*
 * Byte swap an orf_token from "in" into "out".
 *
 * The visible statements copy header.type unchanged, swap the other
 * listed fields, and swap ring_id and seq of each of the first
 * out->rtr_list_entries retransmit list items.
 *
 * NOTE(review): this listing appears to have lost several source lines
 * (the remaining header fields and the statement that sets
 * out->rtr_list_entries, which bounds the loop below). Compare with the
 * upstream file before relying on the details.
 */
4703static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out)
4704{
4705 int i;
4706
4709 out->header.type = in->header.type;
4710 out->header.nodeid = swab32 (in->header.nodeid);
4711 out->seq = swab32 (in->seq);
4712 out->token_seq = swab32 (in->token_seq);
4713 out->aru = swab32 (in->aru);
4714 out->ring_id.rep = swab32(in->ring_id.rep);
4715 out->aru_addr = swab32(in->aru_addr);
4716 out->ring_id.seq = swab64 (in->ring_id.seq);
4717 out->fcc = swab32 (in->fcc);
4718 out->backlog = swab32 (in->backlog);
4719 out->retrans_flg = swab32 (in->retrans_flg);
	/* Retransmit list items are converted one by one */
4721 for (i = 0; i < out->rtr_list_entries; i++) {
4722 out->rtr_list[i].ring_id.rep = swab32(in->rtr_list[i].ring_id.rep);
4723 out->rtr_list[i].ring_id.seq = swab64 (in->rtr_list[i].ring_id.seq);
4724 out->rtr_list[i].seq = swab32 (in->rtr_list[i].seq);
4725 }
4726}
4727
/*
 * Convert a struct mcast header to the opposite byte order, reading from
 * 'in' and writing to 'out'.
 *
 * header.type is copied unchanged; header.nodeid, seq, this_seqno,
 * ring_id.rep, node_id and guarantee go through swab32, ring_id.seq through
 * swab64 and system_from through srp_addr_endian_convert.
 *
 * NOTE(review): embedded source lines 4730-4731 and 4734 are absent from
 * this listing - confirm the remaining assignments against the full file.
 */
4728static void mcast_endian_convert (const struct mcast *in, struct mcast *out)
4729{
4732 out->header.type = in->header.type;
4733 out->header.nodeid = swab32 (in->header.nodeid);
4735
4736 out->seq = swab32 (in->seq);
4737 out->this_seqno = swab32 (in->this_seqno);
4738 out->ring_id.rep = swab32(in->ring_id.rep);
4739 out->ring_id.seq = swab64 (in->ring_id.seq);
4740 out->node_id = swab32 (in->node_id);
4741 out->guarantee = swab32 (in->guarantee);
4742 out->system_from = srp_addr_endian_convert(in->system_from);
4743}
4744
/*
 * Convert a memb_merge_detect message to the opposite byte order, reading
 * from 'in' and writing to 'out'.
 *
 * header.type is copied unchanged; header.nodeid and ring_id.rep are
 * swapped with swab32, ring_id.seq with swab64, and system_from is
 * converted by srp_addr_endian_convert.
 *
 * NOTE(review): embedded source lines 4749-4750 are absent from this
 * listing - confirm the remaining assignments against the full file.
 */
4745static void memb_merge_detect_endian_convert (
4746 const struct memb_merge_detect *in,
4747 struct memb_merge_detect *out)
4748{
4751 out->header.type = in->header.type;
4752 out->header.nodeid = swab32 (in->header.nodeid);
4753 out->ring_id.rep = swab32(in->ring_id.rep);
4754 out->ring_id.seq = swab64 (in->ring_id.seq);
4755 out->system_from = srp_addr_endian_convert (in->system_from);
4756}
4757
/*
 * Decide whether a join message should be ignored.
 *
 * Returns 1 (ignore) when either
 *  - this node's own id (instance->my_id) appears in the failed list
 *    carried by the message, or
 *  - the sender (memb_join->system_from) is in instance->my_memb_list and
 *    ring_seq is lower than instance->my_ring_id.seq.
 * Returns 0 otherwise.
 *
 * NOTE(review): embedded source line 4769 is absent from this listing, so
 * the assignment of the local ring_seq is not visible here - confirm
 * against the full file.
 */
4758static int ignore_join_under_operational (
4759 struct totemsrp_instance *instance,
4760 const struct memb_join *memb_join)
4761{
4762 struct srp_addr *proc_list;
4763 struct srp_addr *failed_list;
4764 unsigned long long ring_seq;
4765 struct srp_addr aligned_system_from;
4766
/*
 * The trailing data of the message holds proc_list_entries addresses
 * followed immediately by the failed list.
 */
4767 proc_list = (struct srp_addr *)memb_join->end_of_memb_join;
4768 failed_list = proc_list + memb_join->proc_list_entries;
/* Work on a local copy of the sender address. */
4770 aligned_system_from = memb_join->system_from;
4771
4772 if (memb_set_subset (&instance->my_id, 1,
4773 failed_list, memb_join->failed_list_entries)) {
4774 return (1);
4775 }
4776
4777 /*
4778 * In operational state, my_proc_list is exactly the same as
4779 * my_memb_list.
4780 */
4781 if ((memb_set_subset (&aligned_system_from, 1,
4782 instance->my_memb_list, instance->my_memb_entries)) &&
4783 (ring_seq < instance->my_ring_id.seq)) {
4784 return (1);
4785 }
4786
4787 return (0);
4788}
4789
/*
 * Handle a received memb_join message.
 *
 * instance                 - protocol instance receiving the message
 * msg, msg_len             - raw message and its length in bytes
 * endian_conversion_needed - when non-zero the message is first converted
 *                            into a stack (alloca) copy of msg_len bytes
 *
 * The message is dropped when check_memb_join_sanity returns -1 or when
 * pause_flush returns non-zero. Otherwise it is dispatched on
 * instance->memb_state. Always returns 0.
 *
 * NOTE(review): embedded source lines 4824, 4827 and 4850 are absent from
 * this listing (the body of the token_ring_id_seq check and two case
 * labels) - confirm them against the full file.
 */
4790static int message_handler_memb_join (
4791 struct totemsrp_instance *instance,
4792 const void *msg,
4793 size_t msg_len,
4794 int endian_conversion_needed)
4795{
4796 const struct memb_join *memb_join;
/* The conversion buffer is reserved before the sanity check runs. */
4797 struct memb_join *memb_join_convert = alloca (msg_len);
4798 struct srp_addr aligned_system_from;
4799
4800 if (check_memb_join_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4801 return (0);
4802 }
4803
4804 if (endian_conversion_needed) {
4805 memb_join = memb_join_convert;
4806 memb_join_endian_convert (msg, memb_join_convert);
4807
4808 } else {
4809 memb_join = msg;
4810 }
4811
/* Local copy of the sender address, used for the subset checks below. */
4812 aligned_system_from = memb_join->system_from;
4813
4814 /*
4815 * If the process paused because it wasn't scheduled in a timely
4816 * fashion, flush the join messages because they may be queued
4817 * entries
4818 */
4819 if (pause_flush (instance)) {
4820 return (0);
4821 }
4822
4823 if (instance->token_ring_id_seq < memb_join->ring_seq) {
4825 }
4826 switch (instance->memb_state) {
/*
 * NOTE(review): the case label for this branch (source line 4827) is
 * missing from the listing; presumably the operational state, given the
 * helper called here - confirm.
 */
4828 if (!ignore_join_under_operational (instance, memb_join)) {
4829 memb_join_process (instance, memb_join);
4830 }
4831 break;
4832
4833 case MEMB_STATE_GATHER:
4834 memb_join_process (instance, memb_join);
4835 break;
4836
4837 case MEMB_STATE_COMMIT:
/*
 * Only act when the sender is in my_new_memb_list and its ring sequence
 * is not older than ours; then process the join and re-enter gather.
 */
4838 if (memb_set_subset (&aligned_system_from,
4839 1,
4840 instance->my_new_memb_list,
4841 instance->my_new_memb_entries) &&
4842
4843 memb_join->ring_seq >= instance->my_ring_id.seq) {
4844
4845 memb_join_process (instance, memb_join);
4846 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE);
4847 }
4848 break;
4849
/*
 * NOTE(review): the case label for this branch (source line 4850) is
 * missing from the listing; presumably the recovery state, given the
 * calls below - confirm. Same condition as the commit branch, with an
 * additional memb_recovery_state_token_loss call before entering gather.
 */
4851 if (memb_set_subset (&aligned_system_from,
4852 1,
4853 instance->my_new_memb_list,
4854 instance->my_new_memb_entries) &&
4855
4856 memb_join->ring_seq >= instance->my_ring_id.seq) {
4857
4858 memb_join_process (instance, memb_join);
4859 memb_recovery_state_token_loss (instance);
4860 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY);
4861 }
4862 break;
4863 }
4864 return (0);
4865}
4866
/*
 * Handle a received membership commit token.
 *
 * instance                 - protocol instance receiving the token
 * msg, msg_len             - raw message and its length in bytes
 * endian_conversion_needed - when non-zero the token is byte-swapped into
 *                            the local copy, otherwise it is memcpy'd
 *
 * The token is dropped when check_memb_commit_token_sanity returns -1.
 * Otherwise a stack (alloca) copy of msg_len bytes is made and the token
 * is handled according to instance->memb_state. Always returns 0.
 *
 * NOTE(review): this listing is missing several embedded source lines
 * (4874, 4880, 4893, 4901, 4911, 4915, 4930, 4935, 4945), including case
 * labels, the assignment of 'addr' and parts of two conditions. The notes
 * below describe only what is visible - confirm against the full file.
 */
4867static int message_handler_memb_commit_token (
4868 struct totemsrp_instance *instance,
4869 const void *msg,
4870 size_t msg_len,
4871 int endian_conversion_needed)
4872{
4873 struct memb_commit_token *memb_commit_token_convert = alloca (msg_len);
4875 struct srp_addr sub[PROCESSOR_COUNT_MAX];
4876 int sub_entries;
4877
4878 struct srp_addr *addr;
4879
4881 "got commit token");
4882
4883 if (check_memb_commit_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4884 return (0);
4885 }
4886
/* Either way the token ends up in the local buffer. */
4887 if (endian_conversion_needed) {
4888 memb_commit_token_endian_convert (msg, memb_commit_token_convert);
4889 } else {
4890 memcpy (memb_commit_token_convert, msg, msg_len);
4891 }
4892 memb_commit_token = memb_commit_token_convert;
4894
4895#ifdef TEST_DROP_COMMIT_TOKEN_PERCENTAGE
4896 if (random()%100 < TEST_DROP_COMMIT_TOKEN_PERCENTAGE) {
4897 return (0);
4898 }
4899#endif
4900 switch (instance->memb_state) {
4902 /* discard token */
4903 break;
4904
4905 case MEMB_STATE_GATHER:
/*
 * Builds 'sub' from my_proc_list and my_failed_list (presumably the set
 * difference, going by the helper's name - confirm), then compares it
 * with the address list taken from the token.
 */
4906 memb_set_subtract (sub, &sub_entries,
4907 instance->my_proc_list, instance->my_proc_list_entries,
4908 instance->my_failed_list, instance->my_failed_list_entries);
4909
4910 if (memb_set_equal (addr,
4912 sub,
4913 sub_entries) &&
4914
/* On a match the token is stored in the instance and commit state is entered. */
4916 memcpy (instance->commit_token, memb_commit_token, msg_len);
4917 memb_state_commit_enter (instance);
4918 }
4919 break;
4920
4921 case MEMB_STATE_COMMIT:
4922 /*
4923 * If retransmitted commit tokens are sent on this ring
4924 * filter them out and only enter recovery once the
4925 * commit token has traversed the array. This is
4926 * determined by :
4927 * memb_commit_token->memb_index == memb_commit_token->addr_entries) {
4928 */
4929 if (memb_commit_token->ring_id.seq == instance->my_ring_id.seq &&
4931 memb_state_recovery_enter (instance, memb_commit_token);
4932 }
4933 break;
4934
/*
 * NOTE(review): the case label for this branch (source line 4935) is
 * missing from the listing. Only the node whose id equals the ring
 * representative sends the initial ORF token, and only once per
 * originated_orf_token flag.
 */
4936 if (instance->my_id.nodeid == instance->my_ring_id.rep) {
4937
4938 /* Filter out duplicated tokens */
4939 if (instance->originated_orf_token) {
4940 break;
4941 }
4942
4943 instance->originated_orf_token = 1;
4944
4946 "Sending initial ORF token");
4947
4948 // TODO convert instead of initiate
4949 orf_token_send_initial (instance);
4950 reset_token_timeout (instance); // REVIEWED
4951 reset_token_retransmit_timeout (instance); // REVIEWED
4952 }
4953 break;
4954 }
4955 return (0);
4956}
4957
/*
 * Handle a received token_hold_cancel message.
 *
 * instance                 - protocol instance receiving the message
 * msg, msg_len             - raw message and its length in bytes
 * endian_conversion_needed - only forwarded to the sanity check; the
 *                            message itself is not byte-swapped here
 *
 * The message is dropped when check_token_hold_cancel_sanity returns -1.
 * If its ring_id is byte-for-byte equal to instance->my_ring_id,
 * my_seq_unchanged is reset to 0 and, when this node is the ring
 * representative, timer_function_token_retransmit_timeout is called
 * directly. Always returns 0.
 */
4958static int message_handler_token_hold_cancel (
4959 struct totemsrp_instance *instance,
4960 const void *msg,
4961 size_t msg_len,
4962 int endian_conversion_needed)
4963{
4964 const struct token_hold_cancel *token_hold_cancel = msg;
4965
4966 if (check_token_hold_cancel_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4967 return (0);
4968 }
4969
/* Raw memory comparison of the whole ring id structure. */
4970 if (memcmp (&token_hold_cancel->ring_id, &instance->my_ring_id,
4971 sizeof (struct memb_ring_id)) == 0) {
4972
4973 instance->my_seq_unchanged = 0;
/* Only the ring representative runs the retransmit timer function here. */
4974 if (instance->my_ring_id.rep == instance->my_id.nodeid) {
4975 timer_function_token_retransmit_timeout (instance);
4976 }
4977 }
4978 return (0);
4979}
4980
/*
 * Validate the totem message header at the start of a received packet.
 *
 * context     - the totemsrp instance (passed as void *)
 * msg         - received packet
 * msg_len     - packet length in bytes
 * system_from - sender socket address, used only for the log text
 *
 * Returns -1 when the packet is shorter than struct totem_message_header,
 * when the magic is neither TOTEM_MH_MAGIC nor its 16-bit byte-swapped
 * form, or when the version is not TOTEM_MH_VERSION. Returns 0 otherwise.
 * For a bad magic the code guesses the likely sender to build a more
 * helpful message.
 *
 * NOTE(review): embedded source lines 4993, 5042 and 5051 (the first line
 * of each logging call) are absent from this listing.
 */
4981static int check_message_header_validity(
4982 void *context,
4983 const void *msg,
4984 unsigned int msg_len,
4985 const struct sockaddr_storage *system_from)
4986{
4987 struct totemsrp_instance *instance = context;
4988 const struct totem_message_header *message_header = msg;
4989 const char *guessed_str;
4990 const char *msg_byte = msg;
4991
4992 if (msg_len < sizeof (struct totem_message_header)) {
4994 "Message received from %s is too short... Ignoring %u.",
4995 totemip_sa_print((struct sockaddr *)system_from), (unsigned int)msg_len);
4996 return (-1);
4997 }
4998
4999 if (message_header->magic != TOTEM_MH_MAGIC &&
5000 message_header->magic != swab16(TOTEM_MH_MAGIC)) {
5001 /*
5002 * We've received either Knet, old version of Corosync,
5003 * or something else. Do some guessing to display (hopefully)
5004 * helpful message
5005 */
5006 guessed_str = NULL;
5007
5008 if (message_header->magic == 0xFFFF) {
5009 /*
5010 * Corosync 2.2 used header with two UINT8_MAX
5011 */
5012 guessed_str = "Corosync 2.2";
5013 } else if (message_header->magic == 0xFEFE) {
5014 /*
5015 * Corosync 2.3+ used header with two UINT8_MAX - 1
5016 */
5017 guessed_str = "Corosync 2.3+";
5018 } else if (msg_byte[0] == 0x01) {
5019 /*
5020 * Knet has stable1 with first byte of message == 1
5021 */
5022 guessed_str = "unencrypted Kronosnet";
5023 } else if (msg_byte[0] >= 0 && msg_byte[0] <= 5) {
5024 /*
5025 * Unencrypted Corosync 1.x/OpenAIS has first byte
5026 * 0-5. Collision with Knet (but still worth the try)
5027 */
5028 guessed_str = "unencrypted Corosync 2.0/2.1/1.x/OpenAIS";
5029 } else {
5030 /*
5031 * Encrypted Kronosnet packet has a hash at the end of
5032 * the packet and nothing specific at the beginning of the
5033 * packet (just encrypted data).
5034 * Encrypted Corosync 1.x/OpenAIS is quite similar but hash_digest
5035 * is in the beginning of the packet.
5036 *
5037 * So it's not possible to reliably detect either of them.
5038 */
5039 guessed_str = "encrypted Kronosnet/Corosync 2.0/2.1/1.x/OpenAIS or unknown";
5040 }
5041
5043 "Message received from %s has bad magic number (probably sent by %s).. Ignoring",
5044 totemip_sa_print((struct sockaddr *)system_from),
5045 guessed_str);
5046
5047 return (-1);
5048 }
5049
/* The version field is compared as-is, without byte swapping. */
5050 if (message_header->version != TOTEM_MH_VERSION) {
5052 "Message received from %s has unsupported version %u... Ignoring",
5053 totemip_sa_print((struct sockaddr *)system_from),
5054 message_header->version);
5055
5056 return (-1);
5057 }
5058
5059 return (0);
5060}
5061
5062
/*
 * Receive entry point: validates the message header, bumps the per-type
 * receive counter and dispatches to the matching entry of
 * totemsrp_message_handlers.handler_functions.
 *
 * Returns -1 when check_message_header_validity fails, 0 when the message
 * type is not recognised (rx_msg_dropped is incremented), otherwise the
 * handler's return value. The handler is told that endian conversion is
 * needed whenever the header magic differs from TOTEM_MH_MAGIC.
 *
 * NOTE(review): the line carrying the return type and function name
 * (embedded source line 5063) and several case labels (5077, 5083, 5086,
 * 5089, 5092) plus the logging call start (5096) are absent from this
 * listing - confirm against the full file.
 */
5064 void *context,
5065 const void *msg,
5066 unsigned int msg_len,
5067 const struct sockaddr_storage *system_from)
5068{
5069 struct totemsrp_instance *instance = context;
5070 const struct totem_message_header *message_header = msg;
5071
5072 if (check_message_header_validity(context, msg, msg_len, system_from) == -1) {
5073 return -1;
5074 }
5075
5076 switch (message_header->type) {
5078 instance->stats.orf_token_rx++;
5079 break;
5080 case MESSAGE_TYPE_MCAST:
5081 instance->stats.mcast_rx++;
5082 break;
5084 instance->stats.memb_merge_detect_rx++;
5085 break;
5087 instance->stats.memb_join_rx++;
5088 break;
5090 instance->stats.memb_commit_token_rx++;
5091 break;
5093 instance->stats.token_hold_cancel_rx++;
5094 break;
5095 default:
5097 "Message received from %s has wrong type... ignoring %d.\n",
5098 totemip_sa_print((struct sockaddr *)system_from),
5099 (int)message_header->type);
5100
5101 instance->stats.rx_msg_dropped++;
5102 return 0;
5103 }
5104 /*
5105 * Handle incoming message
5106 */
5107 return totemsrp_message_handlers.handler_functions[(int)message_header->type] (
5108 instance,
5109 msg,
5110 msg_len,
5111 message_header->magic != TOTEM_MH_MAGIC);
5112}
5113
/*
 * Record interface_addr in instance->my_addrs[iface_no] and forward the
 * address, port and interface number to totemnet_iface_set. Returns
 * whatever totemnet_iface_set returns.
 *
 * NOTE(review): the line carrying the return type and function name
 * (embedded source line 5114) is absent from this listing. iface_no is
 * used as an array index without a range check in the visible code.
 */
5115 void *context,
5116 const struct totem_ip_address *interface_addr,
5117 unsigned short ip_port,
5118 unsigned int iface_no)
5119{
5120 struct totemsrp_instance *instance = context;
5121 int res;
5122
5123 totemip_copy(&instance->my_addrs[iface_no], interface_addr);
5124
5125 res = totemnet_iface_set (
5126 instance->totemnet_context,
5127 interface_addr,
5128 ip_port,
5129 iface_no);
5130
5131 return (res);
5132}
5133
5134/* Contrary to its name, this only gets called when the interface is enabled */
/*
 * Interface-change callback.
 *
 * context    - the totemsrp instance (passed as void *)
 * iface_addr - address of the interface; its nodeid seeds my_id.nodeid
 *              when that is still zero
 * iface_no   - index into instance->my_addrs
 *
 * On the first call (iface_changes was 0) the ring id is created or
 * loaded, its sequence number is incremented, token_ring_id_seq is set to
 * it and the service-ready callback, if registered, is invoked. Once the
 * number of calls reaches the number of configured interfaces, a zeroed
 * orig_interfaces array is allocated, gather state is entered and the
 * array is freed again. The visible code always returns 0.
 *
 * NOTE(review): the line carrying the return type and function name
 * (embedded source line 5135) and source line 5186 are absent from this
 * listing - confirm against the full file.
 */
5136 void *context,
5137 const struct totem_ip_address *iface_addr,
5138 unsigned int iface_no)
5139{
5140 struct totemsrp_instance *instance = context;
5141 int num_interfaces;
5142 int i;
5143 int res = 0;
5144
5145 if (!instance->my_id.nodeid) {
5146 instance->my_id.nodeid = iface_addr->nodeid;
5147 }
5148 totemip_copy (&instance->my_addrs[iface_no], iface_addr);
5149
/* Post-increment: the block runs only for the very first call. */
5150 if (instance->iface_changes++ == 0) {
5151 instance->memb_ring_id_create_or_load (&instance->my_ring_id, instance->my_id.nodeid);
5152 /*
5153 * Increase the ring_id sequence number. This doesn't follow specification.
5154 * Solves problem with restarted leader node (node with lowest nodeid) before
5155 * rest of the cluster forms new membership and guarantees unique ring_id for
5156 * new singleton configuration.
5157 */
5158 instance->my_ring_id.seq++;
5159
5160 instance->token_ring_id_seq = instance->my_ring_id.seq;
5161 log_printf (
5162 instance->totemsrp_log_level_debug,
5163 "Created or loaded sequence id " CS_PRI_RING_ID " for this ring.",
5164 instance->my_ring_id.rep,
5165 (uint64_t)instance->my_ring_id.seq);
5166
5167 if (instance->totemsrp_service_ready_fn) {
5168 instance->totemsrp_service_ready_fn ();
5169 }
5170
5171 }
5172
/* Count the interfaces marked as configured. */
5173 num_interfaces = 0;
5174 for (i = 0; i < INTERFACE_MAX; i++) {
5175 if (instance->totem_config->interfaces[i].configured) {
5176 num_interfaces++;
5177 }
5178 }
5179
/* This condition stays true on every later call as well. */
5180 if (instance->iface_changes >= num_interfaces) {
5181 /* We need to clear orig_interfaces so that 'commit' diffs against nothing */
5182 instance->totem_config->orig_interfaces = malloc (sizeof (struct totem_interface) * INTERFACE_MAX);
5183 assert(instance->totem_config->orig_interfaces != NULL);
5184 memset(instance->totem_config->orig_interfaces, 0, sizeof (struct totem_interface) * INTERFACE_MAX);
5185
5187
5188 memb_state_gather_enter (instance, TOTEMSRP_GSFROM_INTERFACE_CHANGE);
/* NOTE(review): orig_interfaces is not reset to NULL after this free(). */
5189 free(instance->totem_config->orig_interfaces);
5190 }
5191 return res;
5192}
5193
/*
 * Reduce totem_config->net_mtu by twice the size of struct mcast.
 *
 * NOTE(review): the function header (embedded source line 5194) is absent
 * from this listing - confirm name and signature against the full file.
 */
5195 totem_config->net_mtu -= 2 * sizeof (struct mcast);
5196}
5197
/*
 * Store totem_service_ready in instance->totemsrp_service_ready_fn.
 * The interface-change callback invokes this pointer, when it is set,
 * on its first call.
 *
 * NOTE(review): the line carrying the return type and function name
 * (embedded source line 5198) is absent from this listing.
 */
5199 void *context,
5200 void (*totem_service_ready) (void))
5201{
5202 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5203
5204 instance->totemsrp_service_ready_fn = totem_service_ready;
5205}
5206
/*
 * Forward a member addition to totemnet_member_add, passing this node's
 * own address for the interface (instance->my_addrs[iface_no]) together
 * with the member address. Returns totemnet_member_add's result.
 *
 * NOTE(review): the line carrying the return type and function name
 * (embedded source line 5207) is absent from this listing. iface_no is
 * used as an array index without a range check in the visible code.
 */
5208 void *context,
5209 const struct totem_ip_address *member,
5210 int iface_no)
5211{
5212 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5213 int res;
5214
5215 res = totemnet_member_add (instance->totemnet_context, &instance->my_addrs[iface_no], member, iface_no);
5216
5217 return (res);
5218}
5219
/*
 * Forward a member removal to totemnet_member_remove and return its
 * result.
 *
 * NOTE(review): the line carrying the return type and function name
 * (embedded source line 5220) is absent from this listing.
 */
5221 void *context,
5222 const struct totem_ip_address *member,
5223 int iface_no)
5224{
5225 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5226 int res;
5227
5228 res = totemnet_member_remove (instance->totemnet_context, member, iface_no);
5229
5230 return (res);
5231}
5232
/*
 * Set instance->threaded_mode_enabled to 1 for the instance passed as
 * 'context'.
 *
 * NOTE(review): the function header (embedded source line 5233) is absent
 * from this listing.
 */
5234{
5235 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5236
5237 instance->threaded_mode_enabled = 1;
5238}
5239
/*
 * Clear instance->waiting_trans_ack for the instance passed as 'context'.
 *
 * NOTE(review): embedded source line 5245 is absent from this listing, so
 * any further action taken after clearing the flag is not visible here.
 */
5240void totemsrp_trans_ack (void *context)
5241{
5242 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5243
5244 instance->waiting_trans_ack = 0;
5246}
5247
5248
/*
 * Returns the value of the local 'res'.
 *
 * NOTE(review): the function header (embedded source line 5249) and the
 * statement that assigns 'res' (source line 5254) are absent from this
 * listing - confirm name, signature and the call against the full file.
 */
5250{
5251 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5252 int res;
5253
5255 return (res);
5256}
5257
/*
 * Returns the value of the local 'res'.
 *
 * NOTE(review): the function header (embedded source line 5258) and the
 * statement that assigns 'res' (source line 5263) are absent from this
 * listing - confirm name, signature and the call against the full file.
 */
5259{
5260 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5261 int res;
5262
5264 return (res);
5265}
5266
/*
 * Zero the whole instance->stats structure for the instance passed as
 * 'context'.
 *
 * NOTE(review): embedded source lines 5272-5273 are absent from this
 * listing; the closing brace at 5274 belongs to a block opened there, so
 * how 'flags' is used is not visible here - confirm against the full file.
 */
5267void totemsrp_stats_clear (void *context, int flags)
5268{
5269 struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5270
5271 memset(&instance->stats, 0, sizeof(totemsrp_stats_t));
5274 }
5275}
5276
/*
 * Invoke the ORF token timeout handler directly for 'context', as if the
 * token timer had expired.
 */
5277void totemsrp_force_gather (void *context)
5278{
5279 timer_function_orf_token_timeout(context);
5280}
unsigned long long seq
Definition coroapi.h:1
totem_configuration_type
The totem_configuration_type enum.
Definition coroapi.h:132
@ TOTEM_CONFIGURATION_REGULAR
Definition coroapi.h:133
@ TOTEM_CONFIGURATION_TRANSITIONAL
Definition coroapi.h:134
#define INTERFACE_MAX
Definition coroapi.h:88
#define MESSAGE_QUEUE_MAX
Definition coroapi.h:98
unsigned int nodeid
Definition coroapi.h:0
unsigned char addr[TOTEMIP_ADDRLEN]
Definition coroapi.h:2
totem_callback_token_type
The totem_callback_token_type enum.
Definition coroapi.h:142
@ TOTEM_CALLBACK_TOKEN_SENT
Definition coroapi.h:144
@ TOTEM_CALLBACK_TOKEN_RECEIVED
Definition coroapi.h:143
#define PROCESSOR_COUNT_MAX
Definition coroapi.h:96
#define CS_PRI_RING_ID_SEQ
Definition corotypes.h:61
#define CS_PRI_NODE_ID
Definition corotypes.h:59
#define CS_PRI_RING_ID
Definition corotypes.h:62
uint32_t flags
uint32_t value
icmap_map_t icmap_get_global_map(void)
Return global icmap.
Definition icmap.c:268
#define LOGSYS_LEVEL_DEBUG
Definition logsys.h:76
struct srp_addr addr
Definition totemsrp.c:164
int guarantee
Definition totemsrp.c:190
unsigned int node_id
Definition totemsrp.c:189
struct memb_ring_id ring_id
Definition totemsrp.c:188
struct totem_message_header header
Definition totemsrp.c:184
unsigned int seq
Definition totemsrp.c:186
int this_seqno
Definition totemsrp.c:187
struct srp_addr system_from
Definition totemsrp.c:185
Definition totemsrp.c:243
unsigned int aru
Definition totemsrp.c:245
unsigned int received_flg
Definition totemsrp.c:247
struct memb_ring_id ring_id
Definition totemsrp.c:244
unsigned int high_delivered
Definition totemsrp.c:246
unsigned int retrans_flg
Definition totemsrp.c:255
struct totem_message_header header
Definition totemsrp.c:252
unsigned char end_of_commit_token[0]
Definition totemsrp.c:258
unsigned int token_seq
Definition totemsrp.c:253
struct memb_ring_id ring_id
Definition totemsrp.c:254
struct srp_addr system_from
Definition totemsrp.c:217
struct totem_message_header header
Definition totemsrp.c:216
unsigned char end_of_memb_join[0]
Definition totemsrp.c:221
unsigned long long ring_seq
Definition totemsrp.c:220
unsigned int failed_list_entries
Definition totemsrp.c:219
unsigned int proc_list_entries
Definition totemsrp.c:218
struct totem_message_header header
Definition totemsrp.c:231
struct memb_ring_id ring_id
Definition totemsrp.c:233
struct srp_addr system_from
Definition totemsrp.c:232
The memb_ring_id struct.
Definition coroapi.h:122
unsigned long long seq
Definition coroapi.h:124
unsigned int rep
Definition totem.h:150
int(* handler_functions[6])(struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed)
Definition totemsrp.c:535
unsigned int msg_len
Definition totemsrp.c:269
struct mcast * mcast
Definition totemsrp.c:268
unsigned int backlog
Definition totemsrp.c:207
unsigned int token_seq
Definition totemsrp.c:203
unsigned int aru_addr
Definition totemsrp.c:205
unsigned int fcc
Definition totemsrp.c:208
unsigned int aru
Definition totemsrp.c:204
int rtr_list_entries
Definition totemsrp.c:210
struct rtr_item rtr_list[0]
Definition totemsrp.c:211
int retrans_flg
Definition totemsrp.c:209
unsigned int seq
Definition totemsrp.c:202
struct totem_message_header header
Definition totemsrp.c:201
struct memb_ring_id ring_id
Definition totemsrp.c:206
struct memb_ring_id ring_id
Definition totemsrp.c:195
unsigned int seq
Definition totemsrp.c:196
unsigned int msg_len
Definition totemsrp.c:274
struct mcast * mcast
Definition totemsrp.c:273
The sq struct.
Definition sq.h:43
unsigned int nodeid
Definition totemsrp.c:108
struct qb_list_head list
Definition totemsrp.c:170
int(* callback_fn)(enum totem_callback_token_type type, const void *)
Definition totemsrp.c:171
enum totem_callback_token_type callback_type
Definition totemsrp.c:172
struct totem_message_header header
Definition totemsrp.c:238
struct memb_ring_id ring_id
Definition totemsrp.c:239
unsigned int max_messages
Definition totem.h:220
unsigned int heartbeat_failures_allowed
Definition totem.h:214
unsigned int token_timeout
Definition totem.h:182
unsigned int window_size
Definition totem.h:218
struct totem_logging_configuration totem_logging_configuration
Definition totem.h:208
unsigned int downcheck_timeout
Definition totem.h:200
unsigned int miss_count_const
Definition totem.h:242
struct totem_interface * interfaces
Definition totem.h:165
unsigned int cancel_token_hold_on_retransmit
Definition totem.h:248
unsigned int fail_to_recv_const
Definition totem.h:202
unsigned int merge_timeout
Definition totem.h:198
struct totem_interface * orig_interfaces
Definition totem.h:166
unsigned int net_mtu
Definition totem.h:210
void(* totem_memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition totem.h:250
unsigned int token_retransmits_before_loss_const
Definition totem.h:190
unsigned int max_network_delay
Definition totem.h:216
unsigned int seqno_unchanged_const
Definition totem.h:204
unsigned int consensus_timeout
Definition totem.h:196
unsigned int threads
Definition totem.h:212
unsigned int send_join_timeout
Definition totem.h:194
void(* totem_memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition totem.h:254
unsigned int token_retransmit_timeout
Definition totem.h:186
unsigned int token_warning
Definition totem.h:184
unsigned int join_timeout
Definition totem.h:192
unsigned int token_hold_timeout
Definition totem.h:188
struct totem_ip_address boundto
Definition totem.h:84
uint8_t configured
Definition totem.h:89
int member_count
Definition totem.h:90
struct totem_ip_address member_list[PROCESSOR_COUNT_MAX]
Definition totem.h:97
struct totem_ip_address mcast_addr
Definition totem.h:85
The totem_ip_address struct.
Definition coroapi.h:111
unsigned int nodeid
Definition coroapi.h:112
unsigned short family
Definition coroapi.h:113
void(* log_printf)(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format,...) __attribute__((format(printf
Definition totem.h:101
unsigned int nodeid
Definition totem.h:131
unsigned short magic
Definition totem.h:127
uint8_t reachable
Definition totem.h:268
uint32_t version
Definition totem.h:266
struct totem_ip_address mcast_address
Definition totemsrp.c:452
totemsrp_stats_t stats
Definition totemsrp.c:516
struct srp_addr my_left_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:320
int consensus_list_entries
Definition totemsrp.c:300
int my_merge_detect_timeout_outstanding
Definition totemsrp.c:346
unsigned int my_last_seq
Definition totemsrp.c:496
qb_loop_timer_handle timer_heartbeat_timeout
Definition totemsrp.c:419
unsigned int my_token_seq
Definition totemsrp.c:396
qb_loop_timer_handle memb_timer_state_gather_join_timeout
Definition totemsrp.c:413
struct consensus_list_item consensus_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:298
qb_loop_timer_handle memb_timer_state_gather_consensus_timeout
Definition totemsrp.c:415
uint64_t pause_timestamp
Definition totemsrp.c:512
uint32_t threaded_mode_enabled
Definition totemsrp.c:522
struct srp_addr my_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:316
void * totemnet_context
Definition totemsrp.c:500
int my_leave_memb_entries
Definition totemsrp.c:338
struct srp_addr my_proc_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:308
struct srp_addr my_new_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:312
int my_failed_list_entries
Definition totemsrp.c:326
struct srp_addr my_failed_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:310
unsigned int use_heartbeat
Definition totemsrp.c:504
void(* memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition totemsrp.c:472
qb_loop_timer_handle memb_timer_state_commit_timeout
Definition totemsrp.c:417
struct cs_queue new_message_queue
Definition totemsrp.c:371
int orf_token_retransmit_size
Definition totemsrp.c:394
unsigned int my_high_seq_received
Definition totemsrp.c:354
void(* totemsrp_deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required)
Definition totemsrp.c:454
uint32_t orf_token_discard
Definition totemsrp.c:518
struct qb_list_head token_callback_sent_listhead
Definition totemsrp.c:390
unsigned int last_released
Definition totemsrp.c:486
unsigned int set_aru
Definition totemsrp.c:488
int totemsrp_log_level_notice
Definition totemsrp.c:430
struct cs_queue new_message_queue_trans
Definition totemsrp.c:373
int totemsrp_log_level_trace
Definition totemsrp.c:434
char orf_token_retransmit[TOKEN_SIZE_MAX]
Definition totemsrp.c:392
unsigned int my_trc
Definition totemsrp.c:506
struct cs_queue retrans_message_queue
Definition totemsrp.c:375
struct memb_ring_id my_ring_id
Definition totemsrp.c:340
int totemsrp_log_level_error
Definition totemsrp.c:426
void(* totemsrp_waiting_trans_ack_cb_fn)(int waiting_trans_ack)
Definition totemsrp.c:469
unsigned int old_ring_state_high_seq_received
Definition totemsrp.c:494
qb_loop_timer_handle timer_orf_token_hold_retransmit_timeout
Definition totemsrp.c:409
qb_loop_timer_handle timer_pause_timeout
Definition totemsrp.c:401
unsigned int my_high_ring_delivered
Definition totemsrp.c:364
qb_loop_timer_handle timer_orf_token_retransmit_timeout
Definition totemsrp.c:407
struct totem_config * totem_config
Definition totemsrp.c:502
int my_deliver_memb_entries
Definition totemsrp.c:334
void(* totemsrp_service_ready_fn)(void)
Definition totemsrp.c:467
int my_trans_memb_entries
Definition totemsrp.c:330
uint32_t originated_orf_token
Definition totemsrp.c:520
void * token_recv_event_handle
Definition totemsrp.c:528
struct sq recovery_sort_queue
Definition totemsrp.c:379
qb_loop_timer_handle timer_orf_token_timeout
Definition totemsrp.c:403
qb_loop_timer_handle timer_merge_detect_timeout
Definition totemsrp.c:411
unsigned int my_leave_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:322
void * token_sent_event_handle
Definition totemsrp.c:529
unsigned int my_high_delivered
Definition totemsrp.c:386
void enum memb_state memb_state
Definition totemsrp.c:446
int totemsrp_log_level_security
Definition totemsrp.c:424
int totemsrp_log_level_warning
Definition totemsrp.c:428
struct memb_commit_token * commit_token
Definition totemsrp.c:514
char commit_token_storage[40000]
Definition totemsrp.c:530
struct memb_ring_id my_old_ring_id
Definition totemsrp.c:342
struct timeval tv_old
Definition totemsrp.c:498
void(* memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition totemsrp.c:476
qb_loop_t * totemsrp_poll_handle
Definition totemsrp.c:450
unsigned int my_install_seq
Definition totemsrp.c:356
qb_loop_timer_handle timer_orf_token_warning
Definition totemsrp.c:405
struct srp_addr my_trans_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:314
struct srp_addr my_id
Definition totemsrp.c:304
unsigned int my_cbl
Definition totemsrp.c:510
struct qb_list_head token_callback_received_listhead
Definition totemsrp.c:388
unsigned int my_last_aru
Definition totemsrp.c:348
unsigned int my_aru
Definition totemsrp.c:384
uint32_t waiting_trans_ack
Definition totemsrp.c:524
void(* totemsrp_log_printf)(int level, int subsys, const char *function, const char *file, int line, const char *format,...) __attribute__((format(printf
Definition totemsrp.c:438
void(* totemsrp_confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)
Definition totemsrp.c:460
struct sq regular_sort_queue
Definition totemsrp.c:377
unsigned long long token_ring_id_seq
Definition totemsrp.c:484
struct srp_addr my_deliver_memb_list[PROCESSOR_COUNT_MAX]
Definition totemsrp.c:318
int totemsrp_log_level_debug
Definition totemsrp.c:432
unsigned int my_pbl
Definition totemsrp.c:508
struct totem_ip_address my_addrs[INTERFACE_MAX]
Definition totemsrp.c:306
uint64_t memb_join_tx
Definition totemstats.h:59
uint32_t continuous_gather
Definition totemstats.h:78
uint64_t recovery_entered
Definition totemstats.h:74
uint64_t rx_msg_dropped
Definition totemstats.h:77
uint64_t gather_entered
Definition totemstats.h:70
uint64_t memb_commit_token_rx
Definition totemstats.h:65
uint64_t mcast_retx
Definition totemstats.h:62
uint64_t mcast_tx
Definition totemstats.h:61
uint64_t memb_commit_token_tx
Definition totemstats.h:64
uint64_t operational_token_lost
Definition totemstats.h:69
uint64_t operational_entered
Definition totemstats.h:68
uint64_t gather_token_lost
Definition totemstats.h:71
uint64_t commit_token_lost
Definition totemstats.h:73
uint64_t token_hold_cancel_tx
Definition totemstats.h:66
uint64_t orf_token_rx
Definition totemstats.h:56
totemsrp_token_stats_t token[TOTEM_TOKEN_STATS_MAX]
Definition totemstats.h:90
uint64_t recovery_token_lost
Definition totemstats.h:75
uint64_t commit_entered
Definition totemstats.h:72
uint64_t memb_merge_detect_rx
Definition totemstats.h:58
uint64_t memb_join_rx
Definition totemstats.h:60
uint64_t orf_token_tx
Definition totemstats.h:55
uint64_t memb_merge_detect_tx
Definition totemstats.h:57
uint64_t mcast_rx
Definition totemstats.h:63
uint64_t token_hold_cancel_rx
Definition totemstats.h:67
uint64_t consensus_timeouts
Definition totemstats.h:76
#define swab64(x)
The swab64 macro.
Definition swab.h:65
#define swab16(x)
The swab16 macro.
Definition swab.h:39
#define swab32(x)
The swab32 macro.
Definition swab.h:51
totem_event_type
Definition totem.h:290
#define TOTEM_MH_VERSION
Definition totem.h:124
#define FRAME_SIZE_MAX
Definition totem.h:52
cfg_message_crypto_reconfig_phase_t
Definition totem.h:154
#define TOTEM_NODE_STATUS_STRUCTURE_VERSION
Definition totem.h:264
#define TOTEM_MH_MAGIC
Definition totem.h:123
char type
Definition totem.h:2
int totemconfig_commit_new_params(struct totem_config *totem_config, icmap_map_t map)
const char * totemip_sa_print(const struct sockaddr *sa)
Definition totemip.c:234
void totemip_copy(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
Definition totemip.c:123
int totemnet_initialize(qb_loop_t *loop_pt, void **net_context, struct totem_config *totem_config, totemsrp_stats_t *stats, void *context, int(*deliver_fn)(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from), int(*iface_change_fn)(void *context, const struct totem_ip_address *iface_address, unsigned int ring_no), void(*mtu_changed)(void *context, int net_mtu), void(*target_set_completed)(void *context))
Create an instance.
Definition totemnet.c:317
int totemnet_iface_set(void *net_context, const struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no)
Definition totemnet.c:471
int totemnet_member_remove(void *net_context, const struct totem_ip_address *member, int ring_no)
Definition totemnet.c:553
void * totemnet_buffer_alloc(void *net_context)
Definition totemnet.c:367
int totemnet_token_send(void *net_context, const void *msg, unsigned int msg_len)
Definition totemnet.c:414
int totemnet_send_flush(void *net_context)
Definition totemnet.c:404
void totemnet_buffer_release(void *net_context, void *ptr)
Definition totemnet.c:375
int totemnet_mcast_flush_send(void *net_context, const void *msg, unsigned int msg_len)
Definition totemnet.c:426
int totemnet_member_add(void *net_context, const struct totem_ip_address *local, const struct totem_ip_address *member, int ring_no)
Definition totemnet.c:533
int totemnet_finalize(void *net_context)
Definition totemnet.c:306
int totemnet_crypto_set(void *net_context, const char *cipher_type, const char *hash_type)
Definition totemnet.c:292
int totemnet_ifaces_get(void *net_context, char ***status, unsigned int *iface_count)
Definition totemnet.c:497
int totemnet_processor_count_set(void *net_context, int processor_count)
Definition totemnet.c:383
int totemnet_token_target_set(void *net_context, unsigned int nodeid)
Definition totemnet.c:510
int totemnet_recv_flush(void *net_context)
Definition totemnet.c:394
int totemnet_iface_check(void *net_context)
Definition totemnet.c:452
int totemnet_mcast_noflush_send(void *net_context, const void *msg, unsigned int msg_len)
Definition totemnet.c:439
int totemnet_recv_mcast_empty(void *net_context)
Definition totemnet.c:522
int totemnet_nodestatus_get(void *net_context, unsigned int nodeid, struct totem_node_status *node_status)
Definition totemnet.c:484
void totemnet_stats_clear(void *net_context)
Definition totemnet.c:619
int totemnet_reconfigure(void *net_context, struct totem_config *totem_config)
Definition totemnet.c:589
int totemnet_crypto_reconfigure_phase(void *net_context, struct totem_config *totem_config, cfg_message_crypto_reconfig_phase_t phase)
Definition totemnet.c:603
Totem Network interface - also does encryption/decryption.
int totemsrp_my_family_get(void *srp_context)
Definition totemsrp.c:1133
#define SEQNO_START_TOKEN
Definition totemsrp.c:122
int main_iface_change_fn(void *context, const struct totem_ip_address *iface_address, unsigned int iface_no)
Definition totemsrp.c:5135
unsigned long long ring_seq
Definition totemsrp.c:4
#define RETRANSMIT_ENTRIES_MAX
Definition totemsrp.c:100
#define log_printf(level, format, args...)
Definition totemsrp.c:690
void totemsrp_force_gather(void *context)
Definition totemsrp.c:5277
int rtr_list_entries
Definition totemsrp.c:9
void totemsrp_service_ready_register(void *context, void(*totem_service_ready)(void))
Definition totemsrp.c:5198
int totemsrp_initialize(qb_loop_t *poll_handle, void **srp_context, struct totem_config *totem_config, totempg_stats_t *stats, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id), void(*waiting_trans_ack_cb_fn)(int waiting_trans_ack))
Create a protocol instance.
Definition totemsrp.c:818
int totemsrp_callback_token_create(void *srp_context, void **handle_out, enum totem_callback_token_type type, int delete, int(*callback_fn)(enum totem_callback_token_type type, const void *), const void *data)
Definition totemsrp.c:3496
#define RETRANS_MESSAGE_QUEUE_SIZE_MAX
Definition totemsrp.c:97
void totemsrp_threaded_mode_enable(void *context)
Definition totemsrp.c:5233
struct rtr_item rtr_list[0]
Definition totemsrp.c:10
message_type
Definition totemsrp.c:146
@ MESSAGE_TYPE_MEMB_COMMIT_TOKEN
Definition totemsrp.c:151
@ MESSAGE_TYPE_TOKEN_HOLD_CANCEL
Definition totemsrp.c:152
@ MESSAGE_TYPE_ORF_TOKEN
Definition totemsrp.c:147
@ MESSAGE_TYPE_MEMB_JOIN
Definition totemsrp.c:150
@ MESSAGE_TYPE_MEMB_MERGE_DETECT
Definition totemsrp.c:149
@ MESSAGE_TYPE_MCAST
Definition totemsrp.c:148
void totemsrp_net_mtu_adjust(struct totem_config *totem_config)
Definition totemsrp.c:5194
#define TOKEN_SIZE_MAX
Definition totemsrp.c:101
encapsulation_type
Definition totemsrp.c:155
@ MESSAGE_NOT_ENCAPSULATED
Definition totemsrp.c:157
@ MESSAGE_ENCAPSULATED
Definition totemsrp.c:156
unsigned int failed_list_entries
Definition totemsrp.c:3
struct message_handlers totemsrp_message_handlers
Definition totemsrp.c:678
int totemsrp_nodestatus_get(void *srp_context, unsigned int nodeid, struct totem_node_status *node_status)
Definition totemsrp.c:1041
#define LEAVE_DUMMY_NODEID
Definition totemsrp.c:102
#define QUEUE_RTR_ITEMS_SIZE_MAX
Definition totemsrp.c:96
int guarantee
Definition totemsrp.c:6
unsigned int aru
Definition totemsrp.c:3
gather_state_from
Definition totemsrp.c:542
@ TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED
Definition totemsrp.c:546
@ TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE
Definition totemsrp.c:551
@ TOTEMSRP_GSFROM_FAILED_TO_RECEIVE
Definition totemsrp.c:549
@ TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT
Definition totemsrp.c:543
@ TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE
Definition totemsrp.c:552
@ TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE
Definition totemsrp.c:545
@ TOTEMSRP_GSFROM_MERGE_DURING_JOIN
Definition totemsrp.c:554
@ TOTEMSRP_GSFROM_INTERFACE_CHANGE
Definition totemsrp.c:558
@ TOTEMSRP_GSFROM_GATHER_MISSING1
Definition totemsrp.c:544
@ TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE
Definition totemsrp.c:547
@ TOTEMSRP_GSFROM_MAX
Definition totemsrp.c:559
@ TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE
Definition totemsrp.c:556
@ TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE
Definition totemsrp.c:548
@ TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE
Definition totemsrp.c:550
@ TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE
Definition totemsrp.c:555
@ TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE
Definition totemsrp.c:553
@ TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY
Definition totemsrp.c:557
int totemsrp_crypto_set(void *srp_context, const char *cipher_type, const char *hash_type)
Definition totemsrp.c:1108
int totemsrp_avail(void *srp_context)
Return number of available messages that can be queued.
Definition totemsrp.c:2568
void totemsrp_stats_clear(void *context, int flags)
Definition totemsrp.c:5267
int totemsrp_iface_set(void *context, const struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no)
Definition totemsrp.c:5114
void totemsrp_finalize(void *srp_context)
Definition totemsrp.c:1026
struct memb_ring_id ring_id
Definition totemsrp.c:4
void totemsrp_trans_ack(void *context)
Definition totemsrp.c:5240
int totemsrp_crypto_reconfigure_phase(void *context, struct totem_config *totem_config, cfg_message_crypto_reconfig_phase_t phase)
Definition totemsrp.c:5258
unsigned int totemsrp_my_nodeid_get(void *srp_context)
Definition totemsrp.c:1122
int addr_entries
Definition totemsrp.c:5
unsigned int backlog
Definition totemsrp.c:6
#define SEQNO_START_MSG
Definition totemsrp.c:121
void totemsrp_event_signal(void *srp_context, enum totem_event_type type, int value)
Definition totemsrp.c:2488
void totemsrp_callback_token_destroy(void *srp_context, void **handle_out)
Definition totemsrp.c:3531
unsigned int received_flg
Definition totemsrp.c:3
struct message_item __attribute__
int main_deliver_fn(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from)
Definition totemsrp.c:5063
int totemsrp_member_add(void *context, const struct totem_ip_address *member, int iface_no)
Definition totemsrp.c:5207
int totemsrp_ifaces_get(void *srp_context, unsigned int nodeid, unsigned int *interface_id, struct totem_ip_address *interfaces, unsigned int interfaces_size, char ***status, unsigned int *iface_count)
Definition totemsrp.c:1070
int totemsrp_reconfigure(void *context, struct totem_config *totem_config)
Definition totemsrp.c:5249
unsigned int high_delivered
Definition totemsrp.c:2
struct srp_addr system_from
Definition totemsrp.c:1
unsigned int proc_list_entries
Definition totemsrp.c:2
const char * gather_state_from_desc[]
Definition totemsrp.c:562
int totemsrp_member_remove(void *context, const struct totem_ip_address *member, int iface_no)
Definition totemsrp.c:5220
int totemsrp_mcast(void *srp_context, struct iovec *iovec, unsigned int iov_len, int guarantee)
Multicast a message.
Definition totemsrp.c:2497
memb_state
Definition totemsrp.c:277
@ MEMB_STATE_GATHER
Definition totemsrp.c:279
@ MEMB_STATE_RECOVERY
Definition totemsrp.c:281
@ MEMB_STATE_COMMIT
Definition totemsrp.c:280
@ MEMB_STATE_OPERATIONAL
Definition totemsrp.c:278
Totem Single Ring Protocol.
#define TOTEMPG_STATS_CLEAR_TRANSPORT
Definition totemstats.h:116
#define TOTEM_TOKEN_STATS_MAX
Definition totemstats.h:89