25 #define _diffusion_h_ 1
31 #define DIFFUSION_SUCCESS 0
32 #define DIFFUSION_ERROR -1
33 #define DIFFUSION_INVALID_PROTOCOL_VERSION -2
35 #define DIFFUSION_MD 0x00
36 #define DIFFUSION_RD 0x01
37 #define DIFFUSION_FD 0x02
39 #define DIFFUSION_MSG_TOPIC_LOAD 0x14
40 #define DIFFUSION_MSG_DELTA 0x15
41 #define DIFFUSION_MSG_SUBSCRIBE 0x16
42 #define DIFFUSION_MSG_UNSUBSCRIBE 0x17
43 #define DIFFUSION_MSG_PING_SERVER 0x18
44 #define DIFFUSION_MSG_PING_CLIENT 0x19
45 #define DIFFUSION_MSG_CREDENTIALS 0x1A
46 #define DIFFUSION_MSG_CREDENTIALS_REJECTED 0x1B
47 #define DIFFUSION_ABORT_NOTIFICATION 0x1C
48 #define DIFFUSION_CLOSE_REQUEST 0x1D
49 #define DIFFUSION_MSG_TOPIC_LOAD_ACK 0x1E
50 #define DIFFUSION_MSG_DELTA_ACK 0x1F
51 #define DIFFUSION_MSG_ACK 0x20
52 #define DIFFUSION_MSG_FETCH 0x21
53 #define DIFFUSION_MSG_FETCH_REPLY 0x22
54 #define DIFFUSION_MSG_TOPIC_STATUS_NOTIFICATION 0x23
55 #define DIFFUSION_MSG_COMMAND 0x24
56 #define DIFFUSION_MSG_COMMAND_TOPIC_LOAD 0x28
57 #define DIFFUSION_MSG_COMMAND_TOPIC_NOTIFICATION 0x29
59 #define DIFFUSION_FLAG_RECONNECT 0x01
60 #define DIFFUSION_FLAG_LOAD_BALANCE 0x02
62 #define DIFFUSION_COMMAND_ID_SERVICE_TOPIC "0"
63 #define DIFFUSION_COMMAND_ID_PAGED_TOPIC "1"
65 #define DIFFUSION_PAGINATED_DATA_TYPE_STRING "PS"
66 #define DIFFUSION_PAGINATED_DATA_TYPE_RECORD "PR"
68 #define DIFFUSION_DEBUG(...) if(_diffusion_debug) { printf(__VA_ARGS__); }
70 static const char *DIFFUSION_VERSION __attribute__ ((unused))=
"${product.short.version} ${product.build}#${product.build.changelist}";
78 unsigned char protocol_byte;
79 unsigned char protocol_version;
80 unsigned char response;
81 unsigned char message_length_size;
86 int current_page_number;
88 int total_number_of_lines;
103 int connection_attempt_count;
108 unsigned char protocol_byte;
109 unsigned char protocol_version;
110 unsigned char response;
111 unsigned char message_length_size;
114 int free_server_details;
115 long last_interaction_ms;
116 LLIST *topic_alias_list;
117 LLIST *topic_listener_list;
118 LLIST *service_listener_list;
119 pthread_mutex_t _mutex_write;
120 pthread_mutex_t _mutex_read;
124 unsigned char message_type;
125 unsigned char encoding;
130 unsigned long data_length;
141 void (*on_disconnect)(void);
147 #define DIFF_CB_ZERO(cb) memset(&cb, 0, sizeof(cb))
149 extern int _diffusion_debug;
int diff_page_close(DIFFUSION_CONNECTION *connection, const char *topic)
Send a "page close" command message.
int diff_page_first(DIFFUSION_CONNECTION *connection, const char *topic)
Send a "page first" command message.
void diff_msg_add_header(DIFFUSION_MESSAGE *msg, const char *data)
Add a new header to an existing message.
DIFFUSION_MESSAGE * diff_decode_message(const int len, const char *buf, DIFFUSION_CONNECTION *connection)
Parses a raw message read from a socket into a DIFFUSION_MESSAGE structure.
int diff_reconnect(DIFFUSION_CONNECTION *connection)
Attempt to reconnect to a server which has been previously connected.
DIFFUSION_PAGE_NOTIFICATION * diff_create_page_notification(DIFFUSION_MESSAGE *msg)
Create and populate a DIFFUSION_PAGE_NOTIFICATION structure from a DIFFUSION_MESSAGE which contains a...
int diff_page_open(DIFFUSION_CONNECTION *connection, const char *topic, const int page_size, const int start_page)
Send a "page open" command message.
int diff_page_last(DIFFUSION_CONNECTION *connection, const char *topic)
Send a "page last" command message.
int diff_ping(DIFFUSION_CONNECTION *connection)
Send a server ping request to a Diffusion server.
int diff_connect_request(DIFFUSION_CONNECTION *connection)
Send a connection request message to the Diffusion server, connecting as a C client.
void diff_remove_service_listener(DIFFUSION_CONNECTION *connection, const char *ref)
Remove a registered service listener.
void diff_free_connection(DIFFUSION_CONNECTION *connection)
Frees the memory associated with a Diffusion connection structure.
int diff_send_data_length(DIFFUSION_CONNECTION *connection, const char *topic, const char *data, const long length)
Convenience function for sending arbitrary length data to Diffusion.
int diff_fetch(DIFFUSION_CONNECTION *connection, const char *topic_set)
Fetch data from one or more topics.
void diff_debug_message(DIFFUSION_MESSAGE *msg)
Utility function for displaying a DIFFUSION_MESSAGE to the console.
int diff_page_prior(DIFFUSION_CONNECTION *connection, const char *topic)
Send a "page prior" command message.
char * diff_add_topic_listener(DIFFUSION_CONNECTION *connection, const char *topic_pattern, int(*listener)(DIFFUSION_MESSAGE *))
Add a listener function for a topic or regular expression The listener should return 1 if the message...
int diff_ack_response(DIFFUSION_CONNECTION *connection, DIFFUSION_MESSAGE *message)
Send an ACK response to a message.
int diff_send_credentials(DIFFUSION_CONNECTION *connection, SECURITY_CREDENTIALS *credentials)
Send new credentials to Diffusion, or replace existing credentials.
void diff_main(DIFFUSION_CONNECTION *connection, LLIST *server_list, DIFFUSION_CALLBACKS *callbacks, int flags)
This function will begin a loop, receiving messages from Diffusion and invoking the appropriate callb...
int diff_ping_response(DIFFUSION_CONNECTION *connection, DIFFUSION_MESSAGE *message)
Send a client ping back to Diffusion in response to a client ping.
int diff_send_command(DIFFUSION_CONNECTION *connection, DIFFUSION_MESSAGE *message, const char *command, const char *correlation_id)
Send a command topic message.
void diff_remove_topic_listener(DIFFUSION_CONNECTION *connection, const char *ref)
Remove a registered topic listener.
char * diff_add_service_listener(DIFFUSION_CONNECTION *connection, const char *topic_pattern, int(*listener)(DIFFUSION_MESSAGE *))
Add a service topic listener.
DIFFUSION_MESSAGE * diff_dup_message(const DIFFUSION_MESSAGE *msg)
Copy an existing DIFFUSION_MESSAGE.
void diff_msg_request_ack(DIFFUSION_MESSAGE *msg)
Set the "ACK requested" flag on a DIFFUSION_MESSAGE.
int diff_unsubscribe(DIFFUSION_CONNECTION *connection, const char *topic_set)
Send a request to unsubscribe from one or more topics.
int diff_subscribe(DIFFUSION_CONNECTION *connection, const char *topic_set)
Send a subscription request to a connected Diffusion server.
DIFFUSION_MESSAGE * diff_create_message_length(const LLIST *header_list, const char *data, const long length)
Create a new DIFFUSION_MESSAGE with headers and data.
DIFFUSION_MESSAGE * diff_create_message(const LLIST *header_list, const char *data)
Create a new DIFFUSION_MESSAGE with headers and data.
int diff_fetch_correlated(DIFFUSION_CONNECTION *connection, const char *topic_set, LLIST *header_list)
Fetch data from Diffusion, with headers that are reflected back in the Initial Topic Load response(s)...
void diff_free_page_notification(DIFFUSION_PAGE_NOTIFICATION *notification)
Free memory associated with a DIFFUSION_PAGE_NOTIFICATION strucure.
DIFFUSION_CONNECTION * diff_connect_cascade(LLIST *server_list, int flags, const char *client_id)
Iterate through a list of DIFFUSION_CONNECTION pointers until a connection is successfully made...
void diff_free_message(DIFFUSION_MESSAGE *msg)
Free all memory associated with a DIFFUSION_MESSAGE.
int diff_disconnect(DIFFUSION_CONNECTION *connection)
Send a disconnection request to the Diffusion server and close the connection.
DIFFUSION_MESSAGE * diff_read_message(DIFFUSION_CONNECTION *connection)
Read a message from Diffusion connection.
int diff_send_data(DIFFUSION_CONNECTION *connection, const char *topic, const char *data)
Convenience function for sending NULL-terminated data to Diffusion.
int diff_send_message(DIFFUSION_CONNECTION *connection, DIFFUSION_MESSAGE *message)
Send a message to Diffusion.
void diff_loop(DIFFUSION_CONNECTION *connection, DIFFUSION_CALLBACKS *callbacks)
Calling this function will initiate a processing loop, where messages from Diffusion are read and the...
DIFFUSION_MESSAGE * diff_wait_for_message(DIFFUSION_CONNECTION *connection, long timeout)
Wait for a message from the Diffusion server and return it.
int diff_page_next(DIFFUSION_CONNECTION *connection, const char *topic)
Send a "page next" command message.
DIFFUSION_CONNECTION * diff_reconnect_server(DIFFUSION_SERVER_DETAILS *details, const char *client_id)
If a client_id is supplied, attempt to reconnect to a Diffusion server, restoring state and allowing ...
DIFFUSION_CONNECTION * diff_connect(const char *hostname, const int port, const SECURITY_CREDENTIALS *credentials)
Connect to a Diffusion server.
void diff_dispatch(DIFFUSION_MESSAGE *msg, DIFFUSION_CALLBACKS *callbacks, DIFFUSION_CONNECTION *connection)
Call the relevant callback function for the given message.
DIFFUSION_CONNECTION * diff_connect_server(DIFFUSION_SERVER_DETAILS *details)
Connect to a Diffusion server as an External Client.
int diff_page_number(DIFFUSION_CONNECTION *connection, const char *topic, const int page_number)
Send a "page number" command message.
int diff_page_refresh(DIFFUSION_CONNECTION *connection, const char *topic)
Send a "page refresh" command message.