Table of Contents
Just a second...

Specifying a reconnection strategy

Reconnection behavior can be configured using custom reconnection strategies.

The reconnection behavior of a client session can be configured using reconnection strategies. A reconnection strategy is applied when the session enters the RECOVERING_RECONNECT state, enabling the session to attempt to reconnect and recover its previous state.

Reconnection can only succeed if the client session is still available on the Diffusion™ server. The maximum time that the Diffusion server keeps client sessions in the DISCONNECTED state before closing them can be configured using the Connectors.xml configuration file. For more information, see Configuring connectors.

Individual client sessions can request a shorter reconnection timeout for their sessions or request to disable reconnection when they first connect to the Diffusion server.

Examples

JavaScript
// When establishing a session, it is possible to specify whether reconnection
// should be attempted in the event of an unexpected disconnection. This allows
// the session to recover its previous state.

// Set the maximum amount of time we'll try and reconnect for to 10 minutes
var maximumTimeoutDuration = 1000 * 60 * 10;

// Set the maximum interval between reconnect attempts to 60 seconds
var maximumAttemptInterval = 1000 * 60;

// Set an upper limit to the number of times we'll try to reconnect for
var maximumAttempts = 25;

// Count the number of reconnection attempts we've made
var attempts = 0;

/**
 * Reconnection strategy that applies an exponential back-off.
 *
 * The strategy is called with two functions: `start` begins a reconnection
 * attempt, and `abort` gives up on reconnection (which will close the session).
 *
 * The delay before each attempt is 100ms doubled for every attempt already
 * made, capped at maximumAttemptInterval. Once maximumAttempts attempts have
 * been started, the next invocation aborts instead of starting another one.
 *
 * @param {Function} start - starts a reconnection attempt
 * @param {Function} abort - aborts reconnection
 */
var reconnectionStrategy = function(start, abort) {
    // `attempts` counts the attempts already started, so `>=` is required to
    // stop at exactly maximumAttempts; `>` would allow one attempt too many.
    if (attempts >= maximumAttempts) {
        abort();
        return;
    }

    var wait = Math.min(Math.pow(2, attempts++) * 100, maximumAttemptInterval);

    // Wait the specified time period, and then start the reconnection attempt
    setTimeout(start, wait);
};

// Connect to the server, supplying the reconnection timeout and the custom
// reconnection strategy as part of the session options.
diffusion.connect({
    host : 'diffusion.example.com',
    port : 443,
    secure : true,
    principal : 'control',
    credentials : 'password',
    reconnect : {
        timeout : maximumTimeoutDuration,
        strategy : reconnectionStrategy
    }
}).then(function(session) {

    session.on('disconnect', function() {
        // This will be called when we lose connection. Because we've specified the
        // reconnection strategy, it will be called automatically when this event
        // is dispatched
    });

    session.on('reconnect', function() {
        // If the session is able to reconnect within the reconnect timeout, this
        // event will be dispatched to notify that normal operations may resume.
        // Reset the counter so the next disconnection starts a fresh back-off.
        attempts = 0;
    });

    session.on('close', function() {
        // If the session is closed normally, or the session is unable to reconnect,
        // this event will be dispatched to notify that the session is no longer
        // operational.
    });
}, function(error) {
    // The initial connection could not be established. No session exists at
    // this point, so the reconnection strategy is never consulted; report the
    // failure rather than dropping it silently.
    console.error('Unable to connect to the Diffusion server:', error);
});
Apple
@import Diffusion;

/**
 * Reconnection strategy that delays each reconnection attempt, doubling the
 * delay on every attempt up to a fixed maximum interval.
 *
 * Adopts PTDiffusionSessionReconnectionStrategy so that an instance can be
 * assigned to a session configuration's reconnectionStrategy property.
 */
@interface ExponentialBackoffReconnectionStrategy : NSObject <PTDiffusionSessionReconnectionStrategy>
@end

@implementation CustomReconnectionStrategyExample {
    // Strong reference that keeps the connected session alive.
    PTDiffusionSession *_session;
}

/**
 * Opens a session to the given URL using a ten minute reconnection timeout and
 * an exponential back-off reconnection strategy.
 *
 * The call returns immediately; the outcome is logged from the completion
 * handler, and a successfully opened session is retained in an ivar.
 *
 * @param url The URL of the server to connect to.
 */
- (void)startWithURL:(NSURL *)url {
    NSLog(@"Connecting...");

    // Maximum amount of time we'll keep trying to reconnect for: 10 minutes.
    const NSTimeInterval reconnectionTimeoutInSeconds = 10.0 * 60.0;

    PTDiffusionMutableSessionConfiguration *const configuration =
        [[PTDiffusionMutableSessionConfiguration alloc] init];
    configuration.reconnectionTimeout = @(reconnectionTimeoutInSeconds);

    // Use the custom strategy to decide when each reconnection attempt starts.
    configuration.reconnectionStrategy =
        [[ExponentialBackoffReconnectionStrategy alloc] init];

    // Connection is established asynchronously.
    [PTDiffusionSession openWithURL:url
                      configuration:configuration
                  completionHandler:^(PTDiffusionSession *session, NSError *error)
    {
        if (session) {
            // At this point we now have a connected session.
            NSLog(@"Connected.");

            // Hold on to the session so that it is not deallocated.
            self->_session = session;
        } else {
            NSLog(@"Failed to open session: %@", error);
        }
    }];
}

@end

@implementation ExponentialBackoffReconnectionStrategy {
    // Number of reconnection attempts this strategy has scheduled so far.
    NSUInteger _attemptCount;
}

/**
 * Schedules the given reconnection attempt to start on the main queue after an
 * exponentially increasing delay.
 *
 * The delay is 0.1 seconds doubled for every attempt already scheduled, capped
 * at 60 seconds.
 *
 * @param session The session wishing to reconnect.
 * @param attempt The attempt to start once the delay has elapsed.
 */
-(void)         diffusionSession:(PTDiffusionSession *const)session
    wishesToReconnectWithAttempt:(PTDiffusionSessionReconnectionAttempt *const)attempt {
    // Never wait longer than 60 seconds between reconnection attempts.
    const NSTimeInterval delayCeiling = 60.0;

    // Take the current attempt number, then advance the counter.
    const NSUInteger attemptIndex = _attemptCount;
    _attemptCount += 1;

    // Exponential back-off, limited to the ceiling.
    const NSTimeInterval uncappedDelay = pow(2.0, attemptIndex) * 0.1;
    const NSTimeInterval delay = (uncappedDelay < delayCeiling) ? uncappedDelay : delayCeiling;

    NSLog(@"Reconnection attempt scheduled for %.2fs", delay);

    // Start the attempt on the main queue once the delay has elapsed.
    const int64_t delayInNanoseconds = (int64_t)(delay * NSEC_PER_SEC);
    const dispatch_time_t fireTime = dispatch_time(DISPATCH_TIME_NOW, delayInNanoseconds);
    dispatch_after(fireTime, dispatch_get_main_queue(), ^{
        NSLog(@"Attempting reconnection.");
        [attempt start];
    });
}

@end
                    
Java and Android
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.session.Session.Listener;
import com.pushtechnology.diffusion.client.session.Session.State;
import com.pushtechnology.diffusion.client.session.reconnect.ReconnectionStrategy;


/**
 * This example class demonstrates the ability to set a custom {@link ReconnectionStrategy}
 * when creating sessions.
 * <p>
 * The strategy waits for an exponentially increasing period (100ms, 200ms, 400ms, ...) before
 * starting each reconnection attempt, never waiting longer than 60 seconds.
 *
 * @author Push Technology Limited
 * @since 5.5
 */
public class ClientWithReconnectionStrategy {

    /**
     * Number of reconnection attempts scheduled since the session was last active. Incremented by
     * the reconnection strategy and reset by the session listener; volatile because the two are
     * separate callbacks.
     */
    private volatile int retries = 0;

    /**
     * Constructor. Opens a session configured with the custom reconnection strategy and registers
     * a listener that resets the retry counter whenever the session becomes active.
     */
    public ClientWithReconnectionStrategy() {

        // Set the maximum amount of time we'll try and reconnect for to 10 minutes.
        final int maximumTimeoutDuration = 1000 * 60 * 10;

        // Set the maximum interval between reconnect attempts to 60 seconds.
        final long maximumAttemptInterval = 1000 * 60;

        // Create a new reconnection strategy that applies an exponential backoff
        final ReconnectionStrategy reconnectionStrategy = new ReconnectionStrategy() {
            // NOTE(review): this example never shuts the scheduler down; a real application
            // should do so once the session has been closed.
            private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

            @Override
            public void performReconnection(final ReconnectionAttempt reconnection) {
                // Apply the cap in floating point BEFORE converting to long. Converting the
                // power to long first and then multiplying by 100 overflows for large retry
                // counts, which can yield a zero or negative (i.e. immediate) delay.
                final long exponentialWaitTime =
                    (long) Math.min(Math.pow(2, retries++) * 100.0, (double) maximumAttemptInterval);

                scheduler.schedule(new Runnable() {
                    @Override
                    public void run() {
                        reconnection.start();
                    }
                }, exponentialWaitTime, TimeUnit.MILLISECONDS);
            }
        };

        final Session session = Diffusion.sessions().reconnectionTimeout(maximumTimeoutDuration)
                                                    .reconnectionStrategy(reconnectionStrategy)
                                                    .open("ws://diffusion.example.com:80");
        session.addListener(new Listener() {
            @Override
            public void onSessionStateChanged(Session session, State oldState, State newState) {

                if (newState == State.RECOVERING_RECONNECT) {
                    // The session has been disconnected, and has entered recovery state. It is during this state that
                    // the reconnect strategy will be called
                }

                if (newState == State.CONNECTED_ACTIVE) {
                    // The session has connected for the first time, or it has been reconnected.
                    // Start the next back-off sequence from the shortest delay again.
                    retries = 0;
                }

                if (oldState == State.RECOVERING_RECONNECT) {
                    // The session has left recovery state. It may either be attempting to reconnect, or the attempt has
                    // been aborted; this will be reflected in the newState.
                }
            }
        });
    }
}
.NET
using System;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Session.Reconnection;

namespace Examples {
    /// <summary>
    /// Demonstrates how to configure and enable the reconnection feature of the API.
    /// Each method is a separate, self-contained example.
    /// </summary>
    public class ClientReconnection {
        /// <summary>
        /// Number of reconnection attempts made since the session was last connected. Shared by the
        /// custom reconnection strategy and the session state handler below.
        /// </summary>
        private static int counter = 0;

        /// <summary>
        /// Sets the reconnection timeout, i.e. how long the client keeps trying to reconnect to the server.
        /// If the client has not reconnected when the timeout expires, it closes the session.
        /// </summary>
        public void SetReconnectionTimeout() {
            // The value is given in milliseconds. Choose one that is large enough to cover the time
            // an actual reconnection takes.
            var timeoutFactory = Diffusion.Sessions.ReconnectionTimeout( 60000 );
        }

        /// <summary>
        /// Switches the reconnection feature off.
        /// </summary>
        public void DisableReconnection() {
            // With reconnection disabled the session never enters the RECOVERING_RECONNECT state;
            // it switches straight to CLOSED_BY_SERVER instead.
            var noReconnectionFactory = Diffusion.Sessions.NoReconnection();

            // A timeout of zero has exactly the same effect as the statement above.
            var zeroTimeoutFactory = Diffusion.Sessions.ReconnectionTimeout( 0 );
        }

        /// <summary>
        /// A custom reconnection strategy that tries to reconnect to the server up to 3 times
        /// and then aborts.
        /// </summary>
        public class MyReconnectionStrategy : IReconnectionStrategy {
            /// <summary>
            /// Contains the actual reconnection logic. The method is declared async although it
            /// awaits nothing, so the returned task needs no explicit construction.
            /// </summary>
            /// <param name="reconnectionAttempt">The reconnection attempt, supplied by the session.</param>
            public async Task PerformReconnection( IReconnectionAttempt reconnectionAttempt ) {
                counter += 1;

                if ( counter > 3 ) {
                    // Out of attempts: reset the counter, abort any further reconnection attempt and
                    // let the session switch to CLOSED_BY_SERVER.
                    counter = 0;
                    reconnectionAttempt.Abort();
                    return;
                }

                // Still within the limit: start the next reconnection attempt.
                reconnectionAttempt.Start();
            }
        }

        /// <summary>
        /// Applies the custom reconnection strategy.
        /// </summary>
        public void SetCustomReconnectionStrategy() {
            // There is no need to keep our own reference to the strategy instance.
            var customStrategyFactory = Diffusion.Sessions.ReconnectionStrategy( new MyReconnectionStrategy() );
        }

        /// <summary>
        /// Reconnection can be observed through session state changes reported to the
        /// SessionStateChangedHandler.
        /// </summary>
        public void ObserveReconnection() {
            var observingFactory = Diffusion.Sessions.SessionStateChangedHandler( ( sender, args ) => {
                var state = args.NewState;

                if ( state.Equals( SessionState.RECOVERING_RECONNECT ) ) {
                    // Set when the connection is lost; indicates that reconnection is being attempted.
                    // With reconnection disabled the session never switches to this state.
                    Console.WriteLine( "We are in the process of reconnecting." );
                    return;
                }

                if ( state.Equals( SessionState.CONNECTION_ATTEMPT_FAILED ) ) {
                    // If a reconnection attempt fails because the server session timed out, the
                    // session can no longer be recovered and switches to this state.
                    Console.WriteLine( "We couldn't connect." );
                    return;
                }

                if ( state.Equals( SessionState.CLOSED_BY_SERVER ) ) {
                    // Reached once the reconnection timeout has expired. With reconnection disabled
                    // the session switches directly to this state on a connection loss.
                    Console.WriteLine( "We lost connection." );
                    return;
                }

                if ( state.Equals( SessionState.CONNECTED_ACTIVE ) ) {
                    // The state after the first connection, and also the state the session returns
                    // to after a successful reconnection attempt.
                    Console.WriteLine( "We are connected." );
                    counter = 0;
                }
            } );
        }
    }
}
C
/*
 * This example shows how to make a synchronous connection to
 * Diffusion, with user-provided reconnection logic.
 */
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

#include <apr_time.h>

#include "diffusion.h"
#include "args.h"

/*
 * Command-line options understood by this example: the server URL, the
 * principal and credentials used to connect, and how long to stay connected
 * before closing the session. The table is passed to parse_cmdline() and
 * show_usage() in main().
 *
 * NOTE(review): each entry looks like { short flag, long name, description,
 * optional/required, takes-a-value, default value } - confirm against args.h.
 */
ARG_OPTS_T arg_opts[] = {
        ARG_OPTS_HELP,
        {'u', "url", "Diffusion server URL", ARG_OPTIONAL, ARG_HAS_VALUE, "ws://localhost:8080"},
        {'p', "principal", "Principal (username) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL},
        {'c', "credentials", "Credentials (password) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL},
        {'s', "sleep", "Time to sleep before disconnecting (in seconds).", ARG_OPTIONAL, ARG_HAS_VALUE, "5" },
        END_OF_ARG_OPTS
};

/*
 * Session listener callback, invoked whenever the session changes state
 * (for example from "connecting" to "connected", or from "connected" to
 * "closed"). Prints the old and new state, each as a name followed by its
 * numeric value.
 */
static void
on_session_state_changed(SESSION_T *session,
        const SESSION_STATE_T old_state,
        const SESSION_STATE_T new_state)
{
        const char *old_name = session_state_as_string(old_state);
        const char *new_name = session_state_as_string(new_state);

        printf("Session state changed from %s (%d) to %s (%d)\n",
               old_name, old_state,
               new_name, new_state);
}

/*
 * State shared by the reconnection strategy callbacks in this example.
 */
typedef struct backoff_strategy_args_s {
        /* Time to wait before the next reconnection attempt, in milliseconds. */
        long current_wait;

        /* Largest value current_wait is allowed to keep, in milliseconds. */
        long max_wait;
} BACKOFF_STRATEGY_ARGS_T;

/*
 * User-supplied reconnection strategy: waits for the current backoff period
 * and then asks for a reconnection attempt to be started.
 *
 * args points to the BACKOFF_STRATEGY_ARGS_T registered with the strategy.
 * The wait is limited to max_wait before sleeping, so no single wait is
 * ever longer than the configured maximum.
 *
 * Always returns RECONNECTION_ATTEMPT_ACTION_START.
 */
static RECONNECTION_ATTEMPT_ACTION_T
backoff_reconnection_strategy(SESSION_T *session, void *args)
{
        BACKOFF_STRATEGY_ARGS_T *backoff_args = args;

        // Only ever wait up to some maximum time. This has to happen before
        // sleeping; clamping afterwards would let a wait overshoot max_wait.
        if(backoff_args->current_wait > backoff_args->max_wait) {
                backoff_args->current_wait = backoff_args->max_wait;
        }

        printf("Waiting for %ld ms\n", backoff_args->current_wait);

        // apr_sleep() takes microseconds; widen first so the product cannot
        // overflow a 32-bit long.
        apr_sleep((apr_interval_time_t)backoff_args->current_wait * 1000); // ms -> µs

        return RECONNECTION_ATTEMPT_ACTION_START;
}

/*
 * Called when a reconnection attempt succeeds. Reports the success and
 * clears the backoff delay so that the next disconnection starts again
 * from the shortest wait.
 */
static void
backoff_success(SESSION_T *session, void *args)
{
        BACKOFF_STRATEGY_ARGS_T *state = args;

        printf("Reconnection successful\n");

        state->current_wait = 0;
}

/*
 * Called when a reconnection attempt fails. Reports the failure together
 * with the current session state, then grows the backoff delay: a delay of
 * zero becomes 1, any other delay is doubled.
 */
static void
backoff_failure(SESSION_T *session, void *args)
{
        BACKOFF_STRATEGY_ARGS_T *state = args;

        printf("Reconnection failed (%s)\n", session_state_as_string(session->state));

        // Exponential backoff; doubling zero would never get off the ground.
        state->current_wait = (state->current_wait == 0)
                ? 1
                : state->current_wait * 2;
}

/*
 * Entry point for the example.
 */
int
main(int argc, char **argv)
{
        /*
         * Standard command-line parsing.
         */
        HASH_T *options = parse_cmdline(argc, argv, arg_opts);
        if(options == NULL || hash_get(options, "help") != NULL) {
                show_usage(argc, argv, arg_opts);
                return EXIT_FAILURE;
        }

        const char *url = hash_get(options, "url");
        const char *principal = hash_get(options, "principal");
        CREDENTIALS_T *credentials = NULL;
        const char *password = hash_get(options, "credentials");
        if(password != NULL) {
                credentials = credentials_create_password(password);
        }

        const unsigned int sleep_time = atol(hash_get(options, "sleep"));

        SESSION_T *session;
        DIFFUSION_ERROR_T error = { 0 };

        SESSION_LISTENER_T session_listener = { 0 };
        session_listener.on_state_changed = &on_session_state_changed;

        /*
         * Set the arguments to our exponential backoff strategy.
         */
        BACKOFF_STRATEGY_ARGS_T *backoff_args = calloc(1, sizeof(BACKOFF_STRATEGY_ARGS_T));
        backoff_args->current_wait = 0;
        backoff_args->max_wait = 5000;

        /*
         * Create the backoff strategy.
         */
        RECONNECTION_STRATEGY_T *reconnection_strategy =
                make_reconnection_strategy_user_function(backoff_reconnection_strategy,
                                                         backoff_args,
                                                         backoff_success,
                                                         backoff_failure,
                                                         NULL);

        /*
         * Only ever retry for 30 seconds.
         */
        reconnection_strategy_set_timeout(reconnection_strategy, 30 * 1000);

        /*
         * Create a session, synchronously.
         */
        session = session_create(url, principal, credentials, &session_listener, reconnection_strategy, &error);
        if(session != NULL) {
                char *sid_str = session_id_to_string(session->id);
                printf("Session created (state=%d, id=%s)\n", session_state_get(session), sid_str);
                free(sid_str);
        }
        else {
                printf("Failed to create session: %s\n", error.message);
                free(error.message);
        }

        // With the exception of backoff_args, the reconnection strategy is
        // copied withing session_create() and may be freed now.
        free(reconnection_strategy);

        /*
         * Sleep for a while.
         */
        sleep(sleep_time);

        /*
         * Close the session, and release resources and memory.
         */
        session_close(session, NULL);
        session_free(session);

        free(backoff_args);

        credentials_free(credentials);
        hash_free(options, NULL, free);

        return EXIT_SUCCESS;
}