Tutorial - Generating Streams

You may have noticed that the previous example camera_sampler was only called once. In reality, we’d like to have an indefinite continuous stream of data, not just call it once! In many cyberphysical systems and IoT applications, periodic sensing is inherent. Fortunately, we have a TTPython construct just for that.

Previously, we mentioned how to take a Python function camera_sampler and add it to our dataflow graph with the decorator @SQify. However, @SQify is the most basic function and requires tokens to come in to trigger its computation, i.e., it needs a caller. We can simulate this caller periodically by using a different decorator, @STREAMify, which creates a special type of SQ. @STREAMify will cause a function to produce outputs periodically, creating a stream of data.

@STREAMify
def camera_sampler(trigger):
    ...

A STREAMified node in TTPython has a unique firing rule. Once a token has been received, it will generate tokens until the stop time as specified by the programmer. One can think of this as a token entering the node and the node generating tokens until the time specified. The function will retrigger itself periodically with the TT parameters specified in the STREAMify section.

Here, the initial caller of camera_sampler needs to provide more information for a STREAMified function to work. As it periodically executes by itself, the single call to start this operation needs to specify by which clock it will use to retrigger itself, the period of time between iterations, and the phase of the clock to synchronize to. Futhermore, as the function will periodically generate new data, this data needs to be timestamped correspondingly. The caller of this function needs to provide this information through the keywords TTClock, TTPeriod, TTPhase and TTDataIntervalWidth.

...
cam_sample = camera_sampler(sample_window, TTClock=root_clock, TTPeriod=750000, TTPhase=0, TTDataIntervalWidth=250000)
...

In TTPython, we have defaulted our root clock to use the system time to synchronize with NIST’s clock. The default root clock runs on the microsecond precision, so if we were to specify a period of 0.750 seconds, we would assign TTPeriod=750000.

These clocks assume that there is an internal microsecond clock ticking, where its counter resets after 750,000 ticks. The programmer can specify when the SQ will trigger during the counting. A TTPhase=0 specifies that the camera_sampler will trigger when the counter reads 0.

When running periodic computation, the function also needs to define the time context of the data it generates. We use the keyword TTDataIntervalWidth to do so. This keyword informs the runtime how to generate a new time-interval for this sample in the stream. When the runtime initiates an instance of the SQ, the runtime will take the start and stop time for executing this SQ and take the average. The new interval is this timestamp average, plus-minus the TTDataIntervalWidth divided by 2. The width of the resulting interval is then TTDataIntervalWidth.

For our car position tracking example, we’ll have STREAMified SQ use the root clock. We have rewritten the camera application to this periodic nature, which now takes a single frame with every iteration and outputs the coodinates in a real-time fashion.

@STREAMify #streamify is meant for generating sampled data streams
    def camera_sampler(trigger):
            import camera_recognition
            global sq_state

            # Setup the class if we have not done so already
            if sq_state.get('camera', None) == None:
                    # Setup our various camera settings
                    camera_specifications = camera_recognition.Settings()
                    camera_specifications.useCamera = False
                    camera_specifications.inputFilename = '/content/yolofiles/cav1/live_test_output.avi'
                    camera_specifications.camTimeFile = '/content/yolofiles/cav1/cam_output.txt'
                    camera_specifications.cameraHeight = .2
                    camera_specifications.cameraAdjustmentAngle = 0.0
                    camera_specifications.fps = 60
                    camera_specifications.width = 1280
                    camera_specifications.height = 720
                    camera_specifications.flip = 2
                    sq_state['camera'] = camera_recognition.Camera(camera_specifications)

            # Take the camera frame from either a camera or a prerecorded video
            frame_read, camera_timestamp = sq_state['camera'].takeCameraFrame()

return [frame_read, camera_timestamp]

@SQify
    def process_camera(cam_sample):
            import camera_recognition
            global sq_state

            camera_frame = cam_sample[0]
            camera_timestamp = cam_sample[1]

            # Setup the class if we have not done so already
            if sq_state.get('camera_recognition', None) == None:
                    # Setup our various camera settings
                    camera_specifications = camera_recognition.Settings()
                    camera_specifications.darknetPath = '/content/darknet/'
                    camera_specifications.cameraHeight = .2
                    camera_specifications.cameraAdjustmentAngle = 0.0
                    camera_specifications.fps = 60
                    camera_specifications.width = 1280
                    camera_specifications.height = 720
                    camera_specifications.flip = 2
                    sq_state['camera_recognition'] = camera_recognition.ProcessCamera(camera_specifications)

            # Process the camera frame that we have recieved
            coordinates, processed_timestamp = sq_state['camera_recognition'].processCameraFrame(camera_frame, camera_timestamp)

            return [coordinates, processed_timestamp]

    @GRAPHify
    def example_1_test(trigger):
            A_1 = 1
            with TTClock.root() as root_clock:
                    # This is for setting the start-tick of the STREAMify's periodic firing rule
                    start_time = READ_TTCLOCK(trigger, TTClock=root_clock)
                    # Set the number of iterations that this will run for
                    N = 50
                    # Setup the stop-tick of the STREAMify's firing rule
                    stop_time = start_time + (1000000 * N) # sample for N seconds

                    # create a sampling interval by copying the start and stop tick from token values to the token time interval
                    sampling_time = VALUES_TO_TTTIME(start_time, stop_time)

                    # copy the sampling interval to the input values to the STREAMify node; these input values will be treated as sticky tokens, and define the duration over which STREAMify'd nodes must run
                    sample_window = COPY_TTTIME(A_1, sampling_time)

                    # Call the camera sampler periodically @ 750ms interval
                    cam_sample = camera_sampler(sample_window, TTClock=root_clock, TTPeriod=750000, TTPhase=0, TTDataIntervalWidth=250000)

                    # Process the camera frame returned by streamify
                    processed_camera = process_camera(cam_sample)

                    # Write our output to a file for later overvation
                    write = write_to_file(processed_camera)

The input token’s time interval intersection between all arguments of the function dictates the period of time over which the stream will be generated. You can see that we modify the input token’s time interval with the two functions VALUES_TO_TTIME and COPY_TTIME. We’ve provided a slew of functions to the programmer to interface with the TTPython architecture. An in depth look at these internal functions can be found here. In the above example, by setting N=50, we are creating a sampling interval of 50 seconds.

We reserve keywords that start with TT as special keywords that are TTPython facing; that is, camera_sampler does not have access to these keywords. These help the TTPython runtime to correctly set up the clock synchronization of the ensemble that hosts that particular SQ.

Now that we have the necessary steps to run a periodic application, check out and run Steps 4-8 in the Jupyter Notebook. We will not be covering the camera and LIDAR sensor fusion section of the application in this tutorial as the TTPython concepts needed to understand it are equivalent in the camera intersection data pipeline.