Example: Create a JSON topic
The following examples create a JSON topic and receive a stream of values from the topic.
JavaScript
diffusion.connect({
    host : 'diffusion.example.com',
    port : 443,
    secure : true,
    principal : 'control',
    credentials : 'password'
}).then(function(session) {
    var topicPath = 'topic/json';

    // 1. The Diffusion namespace exposes the data types at its top level.
    //    Keeping the one we need in a local variable makes it easier to reuse.
    var jsonType = diffusion.datatypes.json();

    // 2. Data types are currently available for the JSON and Binary topic types.
    session.topics.add(topicPath, diffusion.topics.TopicType.JSON);

    // 3. A value can be built directly from the data type...
    var initialValue = jsonType.from({ "foo" : "bar" });

    // ...and is then published through the standard update mechanism.
    session.topics.update(topicPath, initialValue);

    // Subscribing works exactly as it does for any other topic.
    session.subscribe(topicPath);

    // 4. A stream can be specialised so that it delivers values of a given data type.
    //
    // Value handlers of a JSON or Binary stream receive both the new and the old value.
    // oldValue is undefined when this is the first value received for the topic.
    // value#get returns a JavaScript object for JSON topics and a Buffer instance
    // for Binary topics.
    function onValue(topic, specification, newValue, oldValue) {
        console.log("Update for " + topic, newValue.get());
    }

    var topicStream = session.stream(topicPath);
    var jsonStream = topicStream.asType(jsonType);
    jsonStream.on('value', onValue);

    // 5. Raw values of a suitable type are accepted for JSON and Binary topics too:
    //    here a plain JavaScript object is used to update the JSON topic.
    session.topics.update(topicPath, {
        "foo" : "baz",
        "numbers" : [1, 2, 3]
    });
});
Java
and Android
package com.pushtechnology.diffusion.examples; import static java.util.Objects.requireNonNull; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.Map; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.cbor.CBORFactory; import com.fasterxml.jackson.dataformat.cbor.CBORGenerator; import com.pushtechnology.diffusion.client.Diffusion; import com.pushtechnology.diffusion.client.callbacks.Registration; import com.pushtechnology.diffusion.client.callbacks.TopicTreeHandler; import com.pushtechnology.diffusion.client.features.control.topics.TopicControl; import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.AddContextCallback; import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.RemovalContextCallback; import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl; import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl.UpdateSource; import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl.Updater; import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl.Updater.UpdateContextCallback; import com.pushtechnology.diffusion.client.session.Session; import com.pushtechnology.diffusion.client.session.SessionClosedException; import com.pushtechnology.diffusion.client.topics.details.TopicType; import com.pushtechnology.diffusion.datatype.json.JSON; import com.pushtechnology.diffusion.datatype.json.JSONDataType; /** * This example shows a control client creating a JSON topic and sending updates * to it. * <P> * There will be a topic for each currency for which rates are provided. The * topic will be created under the FX topic - so, for example FX/GBP will * contain a map of all rate conversions from the base GBP currency. The rates * are represented as string decimal values (e.g. "12.457"). 
* <P> * The {@code addRates} method shows how to create a new rates topic, specifying * its initial map of values. * <P> * The {@code changeRates} method which takes a map shows how to completely * replace the set of rates for a currency with a new map of rates. * <P> * The {@code changeRates} method which takes a string shows an alternative * mechanism where the new rates are simply supplied as a JSON string. * <P> * Either of the changeRates methods could be used and after the first usage for * any topic the values is cached, and so subsequent set calls can compare with * the last value and send only the differences to the server. * * @author Push Technology Limited * @since 5.7 * @see ClientConsumingJSONTopics */ public final class ControlClientUpdatingJSONTopics { private static final String ROOT_TOPIC = "FX"; private final Session session; private final TopicControl topicControl; private volatile TopicUpdateControl.ValueUpdater<JSON> valueUpdater; private volatile Registration updateSourceRegistration; private final CBORFactory cborFactory = new CBORFactory(); private final JSONDataType jsonDataType = Diffusion.dataTypes().json(); /** * Constructor. 
* * @param serverUrl for example "ws://diffusion.example.com:80" */ public ControlClientUpdatingJSONTopics(String serverUrl) { cborFactory.setCodec(new ObjectMapper()); session = Diffusion.sessions().principal("control").password("password") .open(serverUrl); topicControl = session.feature(TopicControl.class); // Register as an updater for all topics under the root and request // that all topics created are removed when the session closes session.feature(TopicUpdateControl.class).registerUpdateSource( ROOT_TOPIC, new UpdateSource.Default() { @Override public void onRegistered( String topicPath, Registration registration) { updateSourceRegistration = registration; } @Override public void onActive(String topicPath, Updater updater) { topicControl.removeTopicsWithSession( ROOT_TOPIC, new TopicTreeHandler.Default()); valueUpdater = updater.valueUpdater(JSON.class); } @Override public void onClose(String topicPath) { session.close(); } }); } /** * Add a new rates topic. * * @param currency the base currency * @param values the full map of initial rates values * @param callback reports outcome * @throws IOException if unable to convert rates map */ public void addRates( String currency, Map<String, String> values, AddContextCallback<String> callback) throws IOException { topicControl.addTopic( rateTopicName(currency), TopicType.JSON, mapToJSON(values), currency, callback); } /** * Update an existing rates topic, replacing the rates mappings with a new * set of mappings. 
* * @param currency the base currency * @param values the new rates values * @param callback reports outcome * @throws IOException if unable to convert rates map */ public void changeRates( String currency, Map<String, String> values, UpdateContextCallback<String> callback) throws IOException { if (valueUpdater == null) { throw new IllegalStateException("Not registered as updater"); } valueUpdater.update( rateTopicName(currency), mapToJSON(values), currency, callback); } /** * Update an existing rates topic, replacing the rates mappings with a new * set of mappings specified as a JSON string, for example * {"USD":"123.45","HKD":"456.3"}. * * @param currency the base currency * @param jsonString a JSON string specifying the map of currency rates * @param callback reports the outcome * @throws IOException if unable to convert string */ public void changeRates( String currency, String jsonString, UpdateContextCallback<String> callback) throws SessionClosedException, IllegalArgumentException, IOException { if (valueUpdater == null) { throw new IllegalStateException("Not registered as updater"); } valueUpdater.update( rateTopicName(currency), jsonDataType.fromJsonString(jsonString), currency, callback); } /** * Convert a given map to a JSON object. */ private JSON mapToJSON(Map<String, String> values) throws IOException { // Use the third-party Jackson library to write out the values map as a // CBOR-format binary. final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final CBORGenerator generator = cborFactory.createGenerator(baos); generator.writeObject(values); return jsonDataType.readValue(baos.toByteArray()); } /** * Remove a rates entry (removes its topic) and clear cached value for the * topic. 
* * @param currency the currency * * @param callback reports the outcome */ public void removeRates( String currency, RemovalContextCallback<String> callback) { final String topicName = rateTopicName(currency); if (valueUpdater != null) { valueUpdater.removeCachedValues(topicName); } topicControl.remove(topicName, currency, callback); } /** * Close the session. */ public void close() { updateSourceRegistration.close(); } /** * Generate a hierarchical topic name for a rates topic. * <P> * e.g. for currency=GBP would return "FX/GBP". * * @param currency the currency * @return the topic name */ private static String rateTopicName(String currency) { return String.format("%s/%s", ROOT_TOPIC, requireNonNull(currency)); } }
.NET
using System;
using System.Threading;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features.Control.Topics;
using PushTechnology.ClientInterface.Data.JSON;
using PushTechnology.ClientInterface.Examples.Runner;

namespace PushTechnology.ClientInterface.Examples.Control {
    /// <summary>
    /// Control Client implementation that Adds and Updates a JSON topic.
    /// </summary>
    public sealed class UpdatingJSONTopics : IExample {
        /// <summary>
        /// Runs the JSON topic Control Client example.
        /// </summary>
        /// <param name="cancellationToken">A token used to end the client example.</param>
        /// <param name="args">A single string should be used for the server url.</param>
        public void Run( CancellationToken cancellationToken, string[] args ) {
            var serverUrl = args[ 0 ];
            var session = Diffusion.Sessions.Principal( "control" ).Password( "password" ).Open( serverUrl );

            // The session is closed in the finally block, so it is released on every exit path,
            // including early returns and exceptions.
            try {
                var topicControl = session.GetTopicControlFeature();
                var updateControl = session.GetTopicUpdateControlFeature();

                // Create a JSON topic 'random/JSON'
                var topic = "random/JSON";
                var addCallback = new AddCallback();
                topicControl.AddTopicFromValue(
                    topic,
                    Diffusion.DataTypes.JSON.FromJSONString( "{\"date\":\"To be updated\",\"time\":\"To be updated\"}" ),
                    addCallback );

                // Wait for the OnTopicAdded callback, or a failure
                if ( !addCallback.Wait( TimeSpan.FromSeconds( 5 ) ) ) {
                    Console.WriteLine( "Callback not received within timeout." );
                    return;
                } else if ( addCallback.Error != null ) {
                    Console.WriteLine( "Error : {0}", addCallback.Error.ToString() );
                    return;
                }

                // Update topic every 300 ms until user requests cancellation of example
                var updateCallback = new UpdateCallback( topic );

                // The value updater does not change between iterations, so obtain it once
                var valueUpdater = updateControl.Updater.ValueUpdater<IJSON>();

                while ( !cancellationToken.IsCancellationRequested ) {
                    var newValue = Diffusion.DataTypes.JSON.FromJSONString(
                        "{\"date\":\"" + DateTime.Today.Date.ToString( "D" ) + "\"," +
                        "\"time\":\"" + DateTime.Now.TimeOfDay.ToString( "g" ) + "\"}" );
                    valueUpdater.Update( topic, newValue, updateCallback );

                    Thread.Sleep( 300 );
                }

                // Remove the JSON topic 'random/JSON'
                var removeCallback = new RemoveCallback( topic );
                topicControl.Remove( topic, removeCallback );
                if ( !removeCallback.Wait( TimeSpan.FromSeconds( 5 ) ) ) {
                    Console.WriteLine( "Callback not received within timeout." );
                } else if ( removeCallback.Error != null ) {
                    Console.WriteLine( "Error : {0}", removeCallback.Error.ToString() );
                }
            } finally {
                // Close the session
                session.Close();
            }
        }
    }

    /// <summary>
    /// Basic implementation of the ITopicControlAddCallback.
    /// </summary>
    internal sealed class AddCallback : ITopicControlAddCallback {
        private readonly AutoResetEvent resetEvent = new AutoResetEvent( false );

        /// <summary>
        /// Any error from this AddCallback will be stored here.
        /// </summary>
        public Exception Error {
            get;
            private set;
        }

        /// <summary>
        /// Constructor.
        /// </summary>
        public AddCallback() {
            Error = null;
        }

        /// <summary>
        /// This is called to notify that a call context was closed prematurely, typically due to a timeout or the
        /// session being closed.
        /// </summary>
        /// <remarks>
        /// No further calls will be made for the context.
        /// </remarks>
        public void OnDiscard() {
            Error = new Exception( "This context was closed prematurely." );
            resetEvent.Set();
        }

        /// <summary>
        /// This is called to notify that the topic has been added.
        /// </summary>
        /// <param name="topicPath">The full path of the topic that was added.</param>
        public void OnTopicAdded( string topicPath ) {
            Console.WriteLine( "Topic {0} added.", topicPath );
            resetEvent.Set();
        }

        /// <summary>
        /// This is called to notify that an attempt to add a topic has failed.
        /// </summary>
        /// <param name="topicPath">The topic path as supplied to the add request.</param>
        /// <param name="reason">The reason for failure.</param>
        public void OnTopicAddFailed( string topicPath, TopicAddFailReason reason ) {
            Error = new Exception( string.Format( "Failed to add topic {0} : {1}", topicPath, reason ) );
            resetEvent.Set();
        }

        /// <summary>
        /// Wait for one of the callbacks for a given time.
        /// </summary>
        /// <param name="timeout">Time to wait for the callback.</param>
        /// <returns><c>true</c> if any of the callbacks has been triggered. Otherwise <c>false</c>.</returns>
        public bool Wait( TimeSpan timeout ) {
            return resetEvent.WaitOne( timeout );
        }
    }

    /// <summary>
    /// A simple ITopicUpdaterUpdateCallback implementation that prints confirmation of the actions completed.
    /// </summary>
    internal sealed class UpdateCallback : ITopicUpdaterUpdateCallback {
        private readonly string topicPath;

        /// <summary>
        /// Constructor.
        /// </summary>
        /// <param name="topicPath">The topic path.</param>
        public UpdateCallback( string topicPath ) {
            this.topicPath = topicPath;
        }

        /// <summary>
        /// Notification of a contextual error related to this callback.
        /// </summary>
        /// <remarks>
        /// Situations in which <c>OnError</c> is called include the session being closed, a communication
        /// timeout, or a problem with the provided parameters. No further calls will be made to this callback.
        /// </remarks>
        /// <param name="errorReason">A value representing the error.</param>
        public void OnError( ErrorReason errorReason ) {
            Console.WriteLine( "Topic {0} could not be updated : {1}", topicPath, errorReason );
        }

        /// <summary>
        /// Indicates a successful update.
        /// </summary>
        public void OnSuccess() {
            Console.WriteLine( "Topic {0} updated successfully.", topicPath );
        }
    }

    /// <summary>
    /// Basic implementation of the ITopicControlRemovalCallback.
    /// </summary>
    internal sealed class RemoveCallback : ITopicControlRemovalCallback {
        private readonly AutoResetEvent resetEvent = new AutoResetEvent( false );
        private readonly string topicPath;

        /// <summary>
        /// Any error from this RemoveCallback will be stored here.
        /// </summary>
        public Exception Error {
            get;
            private set;
        }

        /// <summary>
        /// Constructor.
        /// </summary>
        /// <param name="topicPath">The topic path.</param>
        public RemoveCallback( string topicPath ) {
            this.topicPath = topicPath;
            Error = null;
        }

        /// <summary>
        /// Notification that a call context was closed prematurely, typically due to a timeout or
        /// the session being closed.
        /// </summary>
        /// <param name="errorReason">The error reason.</param>
        /// <remarks>
        /// No further calls will be made for the context.
        /// </remarks>
        public void OnError( ErrorReason errorReason ) {
            Error = new Exception( "This context was closed prematurely. Reason=" + errorReason.ToString() );
            resetEvent.Set();
        }

        /// <summary>
        /// Topic(s) have been removed.
        /// </summary>
        public void OnTopicsRemoved() {
            Console.WriteLine( "Topic {0} removed", topicPath );
            resetEvent.Set();
        }

        /// <summary>
        /// Wait for one of the callbacks for a given time.
        /// </summary>
        /// <param name="timeout">Time to wait for the callback.</param>
        /// <returns><c>true</c> if either of the callbacks has been triggered, and <c>false</c> otherwise.</returns>
        public bool Wait( TimeSpan timeout ) {
            return resetEvent.WaitOne( timeout );
        }
    }
}
Change the URL from the one provided in the example (in the JavaScript example, the host and port) to the URL of your Diffusion™ server.