Table of Contents
Just a second...

Example: Create a JSON topic

The following examples create a JSON topic and receive a stream of values from the topic.

JavaScript
diffusion.connect({
    host        : 'diffusion.example.com',
    port        : 443,
    secure      : true,
    principal   : 'control',
    credentials : 'password'
}).then(function(session) {

    // Every operation below targets this one topic path.
    var TOPIC_PATH = 'topic/json';

    // 1. Data types hang off the top-level Diffusion namespace; keeping the
    // JSON data type in a local variable makes it convenient to reuse.
    var json = diffusion.datatypes.json();

    // 2. Data types are currently provided for the JSON and Binary topic types.
    session.topics.add(TOPIC_PATH, diffusion.topics.TopicType.JSON);

    // 3. A value can be built directly from the data type and then sent
    // through the standard update mechanism.
    var initialValue = json.from({ "foo" : "bar" });
    session.topics.update(TOPIC_PATH, initialValue);

    // Subscribing works the same way as for any other topic.
    session.subscribe(TOPIC_PATH);

    // 4. A stream can be specialised so that it delivers values of a
    // specific data type.
    var jsonStream = session.stream(TOPIC_PATH).asType(json);
    jsonStream.on('value', logUpdate);

    // 5. Raw values of an appropriate type can also be used for JSON and
    // Binary topics; here a plain object updates the JSON topic.
    var rawValue = {
        "foo"     : "baz",
        "numbers" : [1, 2, 3]
    };
    session.topics.update(TOPIC_PATH, rawValue);

    // Value handler for the typed stream. When a JSON or Binary topic is
    // updated, the handler receives both the new value and the old value;
    // oldValue is undefined for the first value received for a topic.
    //
    // For JSON topics, value#get returns a JavaScript object.
    // For Binary topics, value#get returns a Buffer instance.
    function logUpdate(topic, specification, newValue, oldValue) {
        console.log("Update for " + topic, newValue.get());
    }
});
Java and Android
package com.pushtechnology.diffusion.examples;

import static java.util.Objects.requireNonNull;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.cbor.CBORFactory;
import com.fasterxml.jackson.dataformat.cbor.CBORGenerator;
import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.callbacks.Registration;
import com.pushtechnology.diffusion.client.callbacks.TopicTreeHandler;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.AddContextCallback;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.RemovalContextCallback;
import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl;
import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl.UpdateSource;
import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl.Updater;
import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl.Updater.UpdateContextCallback;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.session.SessionClosedException;
import com.pushtechnology.diffusion.client.topics.details.TopicType;
import com.pushtechnology.diffusion.datatype.json.JSON;
import com.pushtechnology.diffusion.datatype.json.JSONDataType;

/**
 * This example shows a control client creating a JSON topic and sending updates
 * to it.
 * <P>
 * There will be a topic for each currency for which rates are provided. The
 * topic will be created under the FX topic - so, for example FX/GBP will
 * contain a map of all rate conversions from the base GBP currency. The rates
 * are represented as string decimal values (e.g. "12.457").
 * <P>
 * The {@code addRates} method shows how to create a new rates topic, specifying
 * its initial map of values.
 * <P>
 * The {@code changeRates} method which takes a map shows how to completely
 * replace the set of rates for a currency with a new map of rates.
 * <P>
 * The {@code changeRates} method which takes a string shows an alternative
 * mechanism where the new rates are simply supplied as a JSON string.
 * <P>
 * Either of the changeRates methods could be used and after the first usage for
 * any topic the value is cached, and so subsequent set calls can compare with
 * the last value and send only the differences to the server.
 *
 * @author Push Technology Limited
 * @since 5.7
 * @see ClientConsumingJSONTopics
 */
public final class ControlClientUpdatingJSONTopics {

    private static final String ROOT_TOPIC = "FX";

    private final Session session;
    private final TopicControl topicControl;
    // Both fields below are assigned from update source callbacks and read
    // from caller threads, hence volatile. They stay null until the
    // corresponding callback has been invoked.
    private volatile TopicUpdateControl.ValueUpdater<JSON> valueUpdater;
    private volatile Registration updateSourceRegistration;
    private final CBORFactory cborFactory = new CBORFactory();
    private final JSONDataType jsonDataType = Diffusion.dataTypes().json();

    /**
     * Constructor.
     * <P>
     * Opens a session as the "control" principal and registers an update
     * source for the root topic. The value updater only becomes available
     * once the {@code onActive} callback has been received.
     *
     * @param serverUrl for example "ws://diffusion.example.com:80"
     */
    public ControlClientUpdatingJSONTopics(String serverUrl) {

        cborFactory.setCodec(new ObjectMapper());

        session =
            Diffusion.sessions().principal("control").password("password")
                .open(serverUrl);

        topicControl = session.feature(TopicControl.class);

        // Register as an updater for all topics under the root and request
        // that all topics created are removed when the session closes
        session.feature(TopicUpdateControl.class).registerUpdateSource(
            ROOT_TOPIC,
            new UpdateSource.Default() {
                @Override
                public void onRegistered(
                    String topicPath,
                    Registration registration) {
                    updateSourceRegistration = registration;
                }

                @Override
                public void onActive(String topicPath, Updater updater) {
                    topicControl.removeTopicsWithSession(
                        ROOT_TOPIC,
                        new TopicTreeHandler.Default());
                    valueUpdater = updater.valueUpdater(JSON.class);
                }

                @Override
                public void onClose(String topicPath) {
                    session.close();
                }
            });

    }

    /**
     * Add a new rates topic.
     *
     * @param currency the base currency
     * @param values the full map of initial rates values
     * @param callback reports outcome
     * @throws IOException if unable to convert rates map
     */
    public void addRates(
        String currency,
        Map<String, String> values,
        AddContextCallback<String> callback) throws IOException {

        topicControl.addTopic(
            rateTopicName(currency),
            TopicType.JSON,
            mapToJSON(values),
            currency,
            callback);
    }

    /**
     * Update an existing rates topic, replacing the rates mappings with a new
     * set of mappings.
     *
     * @param currency the base currency
     * @param values the new rates values
     * @param callback reports outcome
     * @throws IOException if unable to convert rates map
     * @throws IllegalStateException if the value updater is not yet available
     */
    public void changeRates(
        String currency,
        Map<String, String> values,
        UpdateContextCallback<String> callback) throws IOException {

        final TopicUpdateControl.ValueUpdater<JSON> updater = requireUpdater();

        updater.update(
            rateTopicName(currency),
            mapToJSON(values),
            currency,
            callback);
    }

    /**
     * Update an existing rates topic, replacing the rates mappings with a new
     * set of mappings specified as a JSON string, for example
     * {"USD":"123.45","HKD":"456.3"}.
     *
     * @param currency the base currency
     * @param jsonString a JSON string specifying the map of currency rates
     * @param callback reports the outcome
     * @throws IOException if unable to convert string
     * @throws IllegalStateException if the value updater is not yet available
     */
    public void changeRates(
        String currency,
        String jsonString,
        UpdateContextCallback<String> callback) throws SessionClosedException,
        IllegalArgumentException, IOException {

        final TopicUpdateControl.ValueUpdater<JSON> updater = requireUpdater();

        updater.update(
            rateTopicName(currency),
            jsonDataType.fromJsonString(jsonString),
            currency,
            callback);

    }

    /**
     * Return the value updater, failing if it has not been supplied yet.
     * <P>
     * The volatile field is read exactly once so that the null check and the
     * subsequent use by the caller always see the same reference.
     */
    private TopicUpdateControl.ValueUpdater<JSON> requireUpdater() {
        final TopicUpdateControl.ValueUpdater<JSON> updater = valueUpdater;
        if (updater == null) {
            throw new IllegalStateException("Not registered as updater");
        }
        return updater;
    }

    /**
     * Convert a given map to a JSON object.
     *
     * @param values the map to convert
     * @return the JSON value read back from the CBOR encoding of the map
     * @throws IOException if the map cannot be written as CBOR
     */
    private JSON mapToJSON(Map<String, String> values) throws IOException {
        // Use the third-party Jackson library to write out the values map as a
        // CBOR-format binary.
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        // Closing the generator releases it and guarantees that everything it
        // buffered has been flushed to the byte array before it is read.
        try (CBORGenerator generator = cborFactory.createGenerator(baos)) {
            generator.writeObject(values);
        }
        return jsonDataType.readValue(baos.toByteArray());
    }

    /**
     * Remove a rates entry (removes its topic) and clear cached value for the
     * topic.
     *
     * @param currency the currency
     *
     * @param callback reports the outcome
     */
    public void removeRates(
        String currency,
        RemovalContextCallback<String> callback) {

        final String topicName = rateTopicName(currency);

        // Single read of the volatile field; the updater may not exist yet.
        final TopicUpdateControl.ValueUpdater<JSON> updater = valueUpdater;
        if (updater != null) {
            updater.removeCachedValues(topicName);
        }

        topicControl.remove(topicName, currency, callback);
    }

    /**
     * Close the session.
     * <P>
     * When the update source registration is available it is closed, and the
     * session is then closed by the {@code onClose} callback installed in the
     * constructor (NOTE(review): assumes closing the registration results in
     * {@code onClose} being called - confirm against the Diffusion API). If
     * the registration has not been received yet, the session is closed
     * directly instead of failing with a NullPointerException.
     */
    public void close() {
        final Registration registration = updateSourceRegistration;
        if (registration != null) {
            registration.close();
        }
        else {
            session.close();
        }
    }

    /**
     * Generate a hierarchical topic name for a rates topic.
     * <P>
     * e.g. for currency=GBP would return "FX/GBP".
     *
     * @param currency the currency
     * @return the topic name
     */
    private static String rateTopicName(String currency) {
        return String.format("%s/%s", ROOT_TOPIC, requireNonNull(currency));
    }

}
.NET
using System;
using System.Threading;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features.Control.Topics;
using PushTechnology.ClientInterface.Data.JSON;
using PushTechnology.ClientInterface.Examples.Runner;

namespace PushTechnology.ClientInterface.Examples.Control {

    /// <summary>
    /// Control Client implementation that Adds and Updates a JSON topic.
    /// </summary>
    public sealed class UpdatingJSONTopics : IExample {
        /// <summary>
        /// Runs the JSON topic Control Client example.
        /// </summary>
        /// <remarks>
        /// The session is closed on every exit path, including when an exception is thrown after it was opened.
        /// </remarks>
        /// <param name="cancellationToken">A token used to end the client example.</param>
        /// <param name="args">A single string should be used for the server url.</param>
        public void Run( CancellationToken cancellationToken, string[] args ) {
            var serverUrl = args[ 0 ];
            var session = Diffusion.Sessions.Principal( "control" ).Password( "password" ).Open( serverUrl );

            try {
                var topicControl = session.GetTopicControlFeature();
                var updateControl = session.GetTopicUpdateControlFeature();

                // Create a JSON topic 'random/JSON'
                var topic = "random/JSON";
                var addCallback = new AddCallback();
                topicControl.AddTopicFromValue(
                    topic,
                    Diffusion.DataTypes.JSON.FromJSONString( "{\"date\":\"To be updated\",\"time\":\"To be updated\"}" ),
                    addCallback );

                // Wait for the OnTopicAdded callback, or a failure
                if ( !addCallback.Wait( TimeSpan.FromSeconds( 5 ) ) ) {
                    Console.WriteLine( "Callback not received within timeout." );
                    return;
                }

                if ( addCallback.Error != null ) {
                    Console.WriteLine( "Error : {0}", addCallback.Error.ToString() );
                    return;
                }

                // Obtain the JSON value updater once and reuse it, rather than fetching it on every iteration.
                var valueUpdater = updateControl.Updater.ValueUpdater<IJSON>();

                // Update topic every 300 ms until user requests cancellation of example
                var updateCallback = new UpdateCallback( topic );
                while ( !cancellationToken.IsCancellationRequested ) {
                    var newValue = Diffusion.DataTypes.JSON.FromJSONString(
                        "{\"date\":\"" + DateTime.Today.Date.ToString( "D" ) + "\"," +
                        "\"time\":\"" + DateTime.Now.TimeOfDay.ToString( "g" ) + "\"}" );
                    valueUpdater.Update( topic, newValue, updateCallback );

                    Thread.Sleep( 300 );
                }

                // Remove the JSON topic 'random/JSON'
                var removeCallback = new RemoveCallback( topic );
                topicControl.Remove( topic, removeCallback );
                if ( !removeCallback.Wait( TimeSpan.FromSeconds( 5 ) ) ) {
                    Console.WriteLine( "Callback not received within timeout." );
                } else if ( removeCallback.Error != null ) {
                    Console.WriteLine( "Error : {0}", removeCallback.Error.ToString() );
                }
            } finally {
                // Close the session exactly once, whichever way the example ends
                session.Close();
            }
        }
    }

    /// <summary>
    /// Minimal ITopicControlAddCallback that records a failure, if any, and lets a caller block until the
    /// outcome of the add request is known.
    /// </summary>
    internal sealed class AddCallback : ITopicControlAddCallback {
        private readonly AutoResetEvent resetEvent = new AutoResetEvent( false );

        /// <summary>
        /// The failure reported to this callback, or <c>null</c> if none has been reported.
        /// </summary>
        public Exception Error {
            get;
            private set;
        }

        /// <summary>
        /// Creates a callback with no recorded error.
        /// </summary>
        public AddCallback() {
            Error = null;
        }

        /// <summary>
        /// Called when the call context was closed prematurely, typically due to a timeout or the session being
        /// closed.
        /// </summary>
        /// <remarks>
        /// No further calls will be made for the context.
        /// </remarks>
        public void OnDiscard() {
            Fail( new Exception( "This context was closed prematurely." ) );
        }

        /// <summary>
        /// Called when the topic has been added.
        /// </summary>
        /// <param name="topicPath">The full path of the topic that was added.</param>
        public void OnTopicAdded( string topicPath ) {
            Console.WriteLine( "Topic {0} added.", topicPath );
            resetEvent.Set();
        }

        /// <summary>
        /// Called when an attempt to add a topic has failed.
        /// </summary>
        /// <param name="topicPath">The topic path as supplied to the add request.</param>
        /// <param name="reason">The reason for failure.</param>
        public void OnTopicAddFailed( string topicPath, TopicAddFailReason reason ) {
            var message = string.Format( "Failed to add topic {0} : {1}", topicPath, reason );
            Fail( new Exception( message ) );
        }

        /// <summary>
        /// Blocks until one of the callbacks has been invoked or the timeout elapses.
        /// </summary>
        /// <param name="timeout">Time to wait for the callback.</param>
        /// <returns><c>true</c> if a callback was triggered within the timeout. Otherwise <c>false</c>.</returns>
        public bool Wait( TimeSpan timeout ) {
            var signalled = resetEvent.WaitOne( timeout );
            return signalled;
        }

        /// <summary>
        /// Records the failure and then releases any waiter.
        /// </summary>
        /// <param name="failure">The failure to expose through <see cref="Error"/>.</param>
        private void Fail( Exception failure ) {
            Error = failure;
            resetEvent.Set();
        }
    }

    /// <summary>
    /// A simple ITopicUpdaterUpdateCallback implementation that prints confirmation of the actions completed.
    /// </summary>
    internal sealed class UpdateCallback : ITopicUpdaterUpdateCallback {
        private readonly string path;

        /// <summary>
        /// Creates a callback that reports outcomes for the given topic.
        /// </summary>
        /// <param name="topicPath">The topic path.</param>
        public UpdateCallback( string topicPath ) {
            path = topicPath;
        }

        /// <summary>
        /// Notification of a contextual error related to this callback.
        /// </summary>
        /// <remarks>
        /// Situations in which <code>OnError</code> is called include the session being closed, a communication
        /// timeout, or a problem with the provided parameters. No further calls will be made to this callback.
        /// </remarks>
        /// <param name="errorReason">A value representing the error.</param>
        public void OnError( ErrorReason errorReason ) {
            var line = string.Format( "Topic {0} could not be updated : {1}", path, errorReason );
            Console.WriteLine( line );
        }

        /// <summary>
        /// Reports that the update succeeded.
        /// </summary>
        public void OnSuccess() {
            var line = string.Format( "Topic {0} updated successfully.", path );
            Console.WriteLine( line );
        }
    }

    /// <summary>
    /// Minimal ITopicControlRemovalCallback that records a failure, if any, and lets a caller block until the
    /// outcome of the removal request is known.
    /// </summary>
    internal sealed class RemoveCallback : ITopicControlRemovalCallback {
        private readonly AutoResetEvent resetEvent = new AutoResetEvent( false );
        private readonly string removedPath;

        /// <summary>
        /// The failure reported to this RemoveCallback, or <c>null</c> if none has been reported.
        /// </summary>
        public Exception Error {
            get;
            private set;
        }

        /// <summary>
        /// Creates a callback for the given topic with no recorded error.
        /// </summary>
        /// <param name="topicPath">The topic path.</param>
        public RemoveCallback( string topicPath ) {
            Error = null;
            removedPath = topicPath;
        }

        /// <summary>
        /// Notification that a call context was closed prematurely, typically due to a timeout or
        /// the session being closed.
        /// </summary>
        /// <param name="errorReason">The error reason.</param>
        /// <remarks>
        /// No further calls will be made for the context.
        /// </remarks>
        public void OnError( ErrorReason errorReason ) {
            var message = "This context was closed prematurely. Reason=" + errorReason.ToString();
            Error = new Exception( message );
            resetEvent.Set();
        }

        /// <summary>
        /// Called when the topic(s) have been removed.
        /// </summary>
        public void OnTopicsRemoved() {
            Console.WriteLine( "Topic {0} removed", removedPath );
            resetEvent.Set();
        }

        /// <summary>
        /// Blocks until one of the callbacks has been invoked or the timeout elapses.
        /// </summary>
        /// <param name="timeout">Time to wait for the callback.</param>
        /// <returns><c>true</c> if either of the callbacks has been triggered, and <c>false</c> otherwise.</returns>
        public bool Wait( TimeSpan timeout ) {
            var signalled = resetEvent.WaitOne( timeout );
            return signalled;
        }
    }
}

Replace the URL used in the examples with the URL of your Diffusion™ server.