DEPRECATED: Diffusion C Classic API  5.9.4
 All Data Structures Files Functions Pages
diffusion.h
Go to the documentation of this file.
1 
24 #ifndef _diffusion_h_
25 #define _diffusion_h_ 1
26 
27 #include <pthread.h>
28 #include <string.h>
29 #include "llist.h"
30 
31 #define DIFFUSION_SUCCESS 0
32 #define DIFFUSION_ERROR -1
33 #define DIFFUSION_INVALID_PROTOCOL_VERSION -2
34 
35 #define DIFFUSION_MD 0x00
36 #define DIFFUSION_RD 0x01
37 #define DIFFUSION_FD 0x02
38 
39 #define DIFFUSION_MSG_TOPIC_LOAD 0x14
40 #define DIFFUSION_MSG_DELTA 0x15
41 #define DIFFUSION_MSG_SUBSCRIBE 0x16
42 #define DIFFUSION_MSG_UNSUBSCRIBE 0x17
43 #define DIFFUSION_MSG_PING_SERVER 0x18
44 #define DIFFUSION_MSG_PING_CLIENT 0x19
45 #define DIFFUSION_MSG_CREDENTIALS 0x1A
46 #define DIFFUSION_MSG_CREDENTIALS_REJECTED 0x1B
47 #define DIFFUSION_ABORT_NOTIFICATION 0x1C
48 #define DIFFUSION_CLOSE_REQUEST 0x1D
49 #define DIFFUSION_MSG_TOPIC_LOAD_ACK 0x1E
50 #define DIFFUSION_MSG_DELTA_ACK 0x1F
51 #define DIFFUSION_MSG_ACK 0x20
52 #define DIFFUSION_MSG_FETCH 0x21
53 #define DIFFUSION_MSG_FETCH_REPLY 0x22
54 #define DIFFUSION_MSG_TOPIC_STATUS_NOTIFICATION 0x23
55 #define DIFFUSION_MSG_COMMAND 0x24
56 #define DIFFUSION_MSG_COMMAND_TOPIC_LOAD 0x28
57 #define DIFFUSION_MSG_COMMAND_TOPIC_NOTIFICATION 0x29
58 
59 #define DIFFUSION_FLAG_RECONNECT 0x01
60 #define DIFFUSION_FLAG_LOAD_BALANCE 0x02
61 
62 #define DIFFUSION_COMMAND_ID_SERVICE_TOPIC "0"
63 #define DIFFUSION_COMMAND_ID_PAGED_TOPIC "1"
64 
65 #define DIFFUSION_PAGINATED_DATA_TYPE_STRING "PS"
66 #define DIFFUSION_PAGINATED_DATA_TYPE_RECORD "PR"
67 
68 #define DIFFUSION_DEBUG(...) if(_diffusion_debug) { printf(__VA_ARGS__); }
69 
70 static const char *DIFFUSION_VERSION __attribute__ ((unused))= "${product.short.version} ${product.build}#${product.build.changelist}";
71 
72 typedef struct {
73  char *principal;
74  char *credentials;
76 
77 typedef struct {
78  unsigned char protocol_byte;
79  unsigned char protocol_version;
80  unsigned char response;
81  unsigned char message_length_size;
83 
84 typedef struct {
85  char type;
86  int current_page_number;
87  int last_page_number;
88  int total_number_of_lines;
89  union {
90  int dirty;
91  int index;
92  };
94 
95 typedef struct {
96  char *protocol;
97  char *hostname;
98  int port;
99  SECURITY_CREDENTIALS *security_credentials;
100  LLIST *topic_set;
101  int retry_count;
102  int retry_delay;
103  int connection_attempt_count;
105 
106 typedef struct {
107  int fd;
108  unsigned char protocol_byte;
109  unsigned char protocol_version;
110  unsigned char response;
111  unsigned char message_length_size;
112  char *client_id;
113  DIFFUSION_SERVER_DETAILS *server_details;
114  int free_server_details;
115  long last_interaction_ms;
116  LLIST *topic_alias_list;
117  LLIST *topic_listener_list;
118  LLIST *service_listener_list;
119  pthread_mutex_t _mutex_write;
120  pthread_mutex_t _mutex_read;
122 
123 typedef struct {
124  unsigned char message_type;
125  unsigned char encoding;
126  char *ack_id;
127  char *topic;
128  LLIST *header_list;
129  char *data;
130  unsigned long data_length;
132 
133 typedef struct {
134  void (*on_initial_load)(DIFFUSION_MESSAGE *);
135  void (*on_delta)(DIFFUSION_MESSAGE *);
136  void (*on_ping_server)(DIFFUSION_MESSAGE *);
137  void (*on_ping_client)(DIFFUSION_MESSAGE *);
138  void (*on_fetch_reply)(DIFFUSION_MESSAGE *);
139  void (*on_ack)(DIFFUSION_MESSAGE *);
140  void (*on_unhandled_message)(DIFFUSION_MESSAGE *);
141  void (*on_disconnect)(void);
142  void (*on_connect)(DIFFUSION_CONNECTION *);
143  void (*on_command_topic_load)(DIFFUSION_MESSAGE *);
144  void (*on_command_topic_notification)(DIFFUSION_MESSAGE *);
146 
147 #define DIFF_CB_ZERO(cb) memset(&cb, 0, sizeof(cb))
148 
149 extern int _diffusion_debug;
150 
164 DIFFUSION_CONNECTION *diff_connect(const char *hostname, const int port, const SECURITY_CREDENTIALS *credentials);
165 
173 
186 
200 DIFFUSION_CONNECTION *diff_connect_cascade(LLIST *server_list, int flags, const char *client_id);
201 
209 
216 int diff_disconnect(DIFFUSION_CONNECTION *connection);
217 
224 int diff_reconnect(DIFFUSION_CONNECTION *connection);
225 
232 
249 int diff_subscribe(DIFFUSION_CONNECTION *connection, const char *topic_set);
250 
262 int diff_unsubscribe(DIFFUSION_CONNECTION *connection, const char *topic_set);
263 
270 int diff_ping(DIFFUSION_CONNECTION *connection);
271 
280 
288 int diff_ack_response(DIFFUSION_CONNECTION *connection, DIFFUSION_MESSAGE *message);
289 
299 int diff_send_message(DIFFUSION_CONNECTION *connection, DIFFUSION_MESSAGE *message);
300 
309 int diff_send_data(DIFFUSION_CONNECTION *connection, const char *topic, const char *data);
310 
320 int diff_send_data_length(DIFFUSION_CONNECTION *connection, const char *topic, const char *data, const long length);
321 
329 int diff_fetch(DIFFUSION_CONNECTION *connection, const char *topic_set);
330 
340 int diff_fetch_correlated(DIFFUSION_CONNECTION *connection, const char *topic_set, LLIST *header_list);
341 
350 
360 int diff_send_command(DIFFUSION_CONNECTION *connection, DIFFUSION_MESSAGE *message, const char *command, const char *correlation_id);
361 
371 int diff_page_open(DIFFUSION_CONNECTION *connection, const char *topic, const int page_size, const int start_page);
372 
380 int diff_page_refresh(DIFFUSION_CONNECTION *connection, const char *topic);
381 
389 int diff_page_next(DIFFUSION_CONNECTION *connection, const char *topic);
390 
398 int diff_page_prior(DIFFUSION_CONNECTION *connection, const char *topic);
399 
407 int diff_page_first(DIFFUSION_CONNECTION *connection, const char *topic);
408 
416 int diff_page_last(DIFFUSION_CONNECTION *connection, const char *topic);
417 
428 int diff_page_number(DIFFUSION_CONNECTION *connection, const char *topic, const int page_number);
429 
437 int diff_page_close(DIFFUSION_CONNECTION *connection, const char *topic);
438 
454 
466 
481 DIFFUSION_MESSAGE *diff_decode_message(const int len, const char *buf, DIFFUSION_CONNECTION *connection);
482 
493 DIFFUSION_MESSAGE *diff_create_message(const LLIST *header_list, const char *data);
494 
506 DIFFUSION_MESSAGE *diff_create_message_length(const LLIST *header_list, const char *data, const long length);
507 
515 
522 
532 
539 
546 void diff_msg_add_header(DIFFUSION_MESSAGE *msg, const char *data);
547 
554 
561 
570 void diff_loop(DIFFUSION_CONNECTION *connection, DIFFUSION_CALLBACKS *callbacks);
571 
591 void diff_main(DIFFUSION_CONNECTION *connection, LLIST *server_list, DIFFUSION_CALLBACKS *callbacks, int flags);
592 
600 void diff_dispatch(DIFFUSION_MESSAGE *msg, DIFFUSION_CALLBACKS *callbacks, DIFFUSION_CONNECTION *connection);
601 
612 char *diff_add_topic_listener(DIFFUSION_CONNECTION *connection, const char *topic_pattern, int (*listener)(DIFFUSION_MESSAGE *));
613 
614 
621 void diff_remove_topic_listener(DIFFUSION_CONNECTION *connection, const char *ref);
622 
633 char *diff_add_service_listener(DIFFUSION_CONNECTION *connection, const char *topic_pattern, int (*listener)(DIFFUSION_MESSAGE *));
634 
641 void diff_remove_service_listener(DIFFUSION_CONNECTION *connection, const char *ref);
642 
643 #endif
int diff_page_close(DIFFUSION_CONNECTION *connection, const char *topic)
Send a &quot;page close&quot; command message.
int diff_page_first(DIFFUSION_CONNECTION *connection, const char *topic)
Send a &quot;page first&quot; command message.
void diff_msg_add_header(DIFFUSION_MESSAGE *msg, const char *data)
Add a new header to an existing message.
DIFFUSION_MESSAGE * diff_decode_message(const int len, const char *buf, DIFFUSION_CONNECTION *connection)
Parses a raw message read from a socket into a DIFFUSION_MESSAGE structure.
int diff_reconnect(DIFFUSION_CONNECTION *connection)
Attempt to reconnect to a server which has been previously connected.
DIFFUSION_PAGE_NOTIFICATION * diff_create_page_notification(DIFFUSION_MESSAGE *msg)
Create and populate a DIFFUSION_PAGE_NOTIFICATION structure from a DIFFUSION_MESSAGE which contains a...
int diff_page_open(DIFFUSION_CONNECTION *connection, const char *topic, const int page_size, const int start_page)
Send a &quot;page open&quot; command message.
int diff_page_last(DIFFUSION_CONNECTION *connection, const char *topic)
Send a &quot;page last&quot; command message.
int diff_ping(DIFFUSION_CONNECTION *connection)
Send a server ping request to a Diffusion server.
int diff_connect_request(DIFFUSION_CONNECTION *connection)
Send a connection request message to the Diffusion server, connecting as a C client.
void diff_remove_service_listener(DIFFUSION_CONNECTION *connection, const char *ref)
Remove a registered service listener.
void diff_free_connection(DIFFUSION_CONNECTION *connection)
Frees the memory associated with a Diffusion connection structure.
int diff_send_data_length(DIFFUSION_CONNECTION *connection, const char *topic, const char *data, const long length)
Convenience function for sending arbitrary length data to Diffusion.
int diff_fetch(DIFFUSION_CONNECTION *connection, const char *topic_set)
Fetch data from one or more topics.
void diff_debug_message(DIFFUSION_MESSAGE *msg)
Utility function for displaying a DIFFUSION_MESSAGE to the console.
int diff_page_prior(DIFFUSION_CONNECTION *connection, const char *topic)
Send a &quot;page prior&quot; command message.
char * diff_add_topic_listener(DIFFUSION_CONNECTION *connection, const char *topic_pattern, int(*listener)(DIFFUSION_MESSAGE *))
Add a listener function for a topic or regular expression The listener should return 1 if the message...
int diff_ack_response(DIFFUSION_CONNECTION *connection, DIFFUSION_MESSAGE *message)
Send an ACK response to a message.
int diff_send_credentials(DIFFUSION_CONNECTION *connection, SECURITY_CREDENTIALS *credentials)
Send new credentials to Diffusion, or replace existing credentials.
void diff_main(DIFFUSION_CONNECTION *connection, LLIST *server_list, DIFFUSION_CALLBACKS *callbacks, int flags)
This function will begin a loop, receiving messages from Diffusion and invoking the appropriate callb...
int diff_ping_response(DIFFUSION_CONNECTION *connection, DIFFUSION_MESSAGE *message)
Send a client ping back to Diffusion in response to a client ping.
int diff_send_command(DIFFUSION_CONNECTION *connection, DIFFUSION_MESSAGE *message, const char *command, const char *correlation_id)
Send a command topic message.
void diff_remove_topic_listener(DIFFUSION_CONNECTION *connection, const char *ref)
Remove a registered topic listener.
char * diff_add_service_listener(DIFFUSION_CONNECTION *connection, const char *topic_pattern, int(*listener)(DIFFUSION_MESSAGE *))
Add a service topic listener.
DIFFUSION_MESSAGE * diff_dup_message(const DIFFUSION_MESSAGE *msg)
Copy an existing DIFFUSION_MESSAGE.
void diff_msg_request_ack(DIFFUSION_MESSAGE *msg)
Set the &quot;ACK requested&quot; flag on a DIFFUSION_MESSAGE.
int diff_unsubscribe(DIFFUSION_CONNECTION *connection, const char *topic_set)
Send a request to unsubscribe from one or more topics.
int diff_subscribe(DIFFUSION_CONNECTION *connection, const char *topic_set)
Send a subscription request to a connected Diffusion server.
DIFFUSION_MESSAGE * diff_create_message_length(const LLIST *header_list, const char *data, const long length)
Create a new DIFFUSION_MESSAGE with headers and data.
DIFFUSION_MESSAGE * diff_create_message(const LLIST *header_list, const char *data)
Create a new DIFFUSION_MESSAGE with headers and data.
int diff_fetch_correlated(DIFFUSION_CONNECTION *connection, const char *topic_set, LLIST *header_list)
Fetch data from Diffusion, with headers that are reflected back in the Initial Topic Load response(s)...
void diff_free_page_notification(DIFFUSION_PAGE_NOTIFICATION *notification)
Free memory associated with a DIFFUSION_PAGE_NOTIFICATION strucure.
DIFFUSION_CONNECTION * diff_connect_cascade(LLIST *server_list, int flags, const char *client_id)
Iterate through a list of DIFFUSION_CONNECTION pointers until a connection is successfully made...
void diff_free_message(DIFFUSION_MESSAGE *msg)
Free all memory associated with a DIFFUSION_MESSAGE.
int diff_disconnect(DIFFUSION_CONNECTION *connection)
Send a disconnection request to the Diffusion server and close the connection.
DIFFUSION_MESSAGE * diff_read_message(DIFFUSION_CONNECTION *connection)
Read a message from Diffusion connection.
int diff_send_data(DIFFUSION_CONNECTION *connection, const char *topic, const char *data)
Convenience function for sending NULL-terminated data to Diffusion.
int diff_send_message(DIFFUSION_CONNECTION *connection, DIFFUSION_MESSAGE *message)
Send a message to Diffusion.
void diff_loop(DIFFUSION_CONNECTION *connection, DIFFUSION_CALLBACKS *callbacks)
Calling this function will initiate a processing loop, where messages from Diffusion are read and the...
DIFFUSION_MESSAGE * diff_wait_for_message(DIFFUSION_CONNECTION *connection, long timeout)
Wait for a message from the Diffusion server and return it.
int diff_page_next(DIFFUSION_CONNECTION *connection, const char *topic)
Send a &quot;page next&quot; command message.
DIFFUSION_CONNECTION * diff_reconnect_server(DIFFUSION_SERVER_DETAILS *details, const char *client_id)
If a client_id is supplied, attempt to reconnect to a Diffusion server, restoring state and allowing ...
DIFFUSION_CONNECTION * diff_connect(const char *hostname, const int port, const SECURITY_CREDENTIALS *credentials)
Connect to a Diffusion server.
void diff_dispatch(DIFFUSION_MESSAGE *msg, DIFFUSION_CALLBACKS *callbacks, DIFFUSION_CONNECTION *connection)
Call the relevant callback function for the given message.
DIFFUSION_CONNECTION * diff_connect_server(DIFFUSION_SERVER_DETAILS *details)
Connect to a Diffusion server as an External Client.
int diff_page_number(DIFFUSION_CONNECTION *connection, const char *topic, const int page_number)
Send a &quot;page number&quot; command message.
int diff_page_refresh(DIFFUSION_CONNECTION *connection, const char *topic)
Send a &quot;page refresh&quot; command message.
Definition: llist.h:24