Tutorial - Generating Streams
You may have noticed that in the previous example, camera_sampler was only
called once. In reality, we'd like an indefinite, continuous stream of data,
not a single call! In many cyber-physical systems and IoT applications,
periodic sensing is inherent. Fortunately, TTPython has a construct just for that.
Previously, we showed how to take a Python function camera_sampler and add it
to our dataflow graph with the decorator @SQify. However, @SQify is the most
basic construct and requires incoming tokens to trigger its computation, i.e.,
it needs a caller. We can simulate a periodic caller with a different decorator,
@STREAMify, which creates a special type of SQ. @STREAMify causes a function to
produce outputs periodically, creating a stream of data.
@STREAMify
def camera_sampler(trigger):
    ...
A STREAMified node in TTPython has a unique firing rule. Once a token has been
received, the node generates tokens periodically until the stop time specified
by the programmer. One can think of this as a token entering the node and the
node emitting tokens until that time. The function retriggers itself
periodically according to the TT parameters specified at the call site.
Here, the initial caller of camera_sampler needs to provide additional
information for a STREAMified function to work. Since the function executes
periodically on its own, the single call that starts this operation must
specify which clock it uses to retrigger itself, the period between
iterations, and the phase of the clock to synchronize to. Furthermore, as the
function periodically generates new data, that data needs to be timestamped
accordingly. The caller provides this information through the keywords
TTClock, TTPeriod, TTPhase, and TTDataIntervalWidth.
...
cam_sample = camera_sampler(sample_window, TTClock=root_clock, TTPeriod=750000, TTPhase=0, TTDataIntervalWidth=250000)
...
In TTPython, the root clock defaults to the system time, synchronized with
NIST's clock. The default root clock runs at microsecond precision, so to
specify a period of 0.750 seconds we would assign TTPeriod=750000.
These clocks assume an internal microsecond counter that resets after 750,000
ticks. The programmer can specify at which count the SQ triggers. TTPhase=0
specifies that camera_sampler will trigger when the counter reads 0.
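As a rough illustration of this period/phase arithmetic (a simplified model, not TTPython's actual implementation; trigger_ticks is a hypothetical name), a node on a microsecond clock fires at every tick whose counter value matches the phase:

```python
def trigger_ticks(start_us, stop_us, period_us, phase_us):
    """Return the microsecond ticks at which a periodic SQ would fire.

    The node fires whenever (tick % period_us) == phase_us, i.e. at
    phase_us, phase_us + period_us, phase_us + 2 * period_us, ...
    within the half-open window [start_us, stop_us).
    """
    # First tick at or after start_us that matches the phase
    first = start_us + (phase_us - start_us) % period_us
    return list(range(first, stop_us, period_us))

# A 0.750 s period on a microsecond clock is 750,000 ticks;
# within the first 3 seconds this fires at 0 s, 0.75 s, 1.5 s, 2.25 s
ticks = trigger_ticks(0, 3_000_000, 750_000, 0)
# ticks == [0, 750000, 1500000, 2250000]
```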
When running a periodic computation, the function also needs to define the
time context of the data it generates. We use the keyword
TTDataIntervalWidth to do so. This keyword tells the runtime how to generate
a new time interval for each sample in the stream. When the runtime initiates
an instance of the SQ, it takes the average of the start and stop times of
that SQ's execution. The new interval is this average timestamp, plus or
minus TTDataIntervalWidth divided by 2; the width of the resulting interval
is therefore TTDataIntervalWidth.
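The interval construction just described can be sketched as follows (an illustrative model, not TTPython's internal code; make_data_interval is a hypothetical name):

```python
def make_data_interval(exec_start_us, exec_stop_us, width_us):
    """Center a time interval of the given width on the midpoint of
    the SQ's execution window, as described above."""
    midpoint = (exec_start_us + exec_stop_us) // 2
    return (midpoint - width_us // 2, midpoint + width_us // 2)

# Suppose the SQ executed from t=1,000,000 to t=1,100,000 microseconds
# with TTDataIntervalWidth=250000: the midpoint is 1,050,000, so the
# sample's interval is (925000, 1175000), which is 250,000 wide.
interval = make_data_interval(1_000_000, 1_100_000, 250_000)
```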
For our car position tracking example, we'll have the STREAMified SQ use the root clock. We have rewritten the camera application for this periodic operation: it now takes a single frame on every iteration and outputs the coordinates in real time.
@STREAMify  # STREAMify is meant for generating sampled data streams
def camera_sampler(trigger):
    import camera_recognition
    global sq_state
    # Set up the class if we have not done so already
    if sq_state.get('camera', None) is None:
        # Set up our various camera settings
        camera_specifications = camera_recognition.Settings()
        camera_specifications.useCamera = False
        camera_specifications.inputFilename = '/content/yolofiles/cav1/live_test_output.avi'
        camera_specifications.camTimeFile = '/content/yolofiles/cav1/cam_output.txt'
        camera_specifications.cameraHeight = .2
        camera_specifications.cameraAdjustmentAngle = 0.0
        camera_specifications.fps = 60
        camera_specifications.width = 1280
        camera_specifications.height = 720
        camera_specifications.flip = 2
        sq_state['camera'] = camera_recognition.Camera(camera_specifications)
    # Take the camera frame from either a camera or a prerecorded video
    frame_read, camera_timestamp = sq_state['camera'].takeCameraFrame()
    return [frame_read, camera_timestamp]
@SQify
def process_camera(cam_sample):
    import camera_recognition
    global sq_state
    camera_frame = cam_sample[0]
    camera_timestamp = cam_sample[1]
    # Set up the class if we have not done so already
    if sq_state.get('camera_recognition', None) is None:
        # Set up our various camera settings
        camera_specifications = camera_recognition.Settings()
        camera_specifications.darknetPath = '/content/darknet/'
        camera_specifications.cameraHeight = .2
        camera_specifications.cameraAdjustmentAngle = 0.0
        camera_specifications.fps = 60
        camera_specifications.width = 1280
        camera_specifications.height = 720
        camera_specifications.flip = 2
        sq_state['camera_recognition'] = camera_recognition.ProcessCamera(camera_specifications)
    # Process the camera frame that we have received
    coordinates, processed_timestamp = sq_state['camera_recognition'].processCameraFrame(camera_frame, camera_timestamp)
    return [coordinates, processed_timestamp]
@GRAPHify
def example_1_test(trigger):
    A_1 = 1
    with TTClock.root() as root_clock:
        # This sets the start tick of the STREAMify's periodic firing rule
        start_time = READ_TTCLOCK(trigger, TTClock=root_clock)
        # Set the number of iterations that this will run for
        N = 50
        # Set up the stop tick of the STREAMify's firing rule
        stop_time = start_time + (1000000 * N)  # sample for N seconds
        # Create a sampling interval by copying the start and stop tick from token values to the token time interval
        sampling_time = VALUES_TO_TTTIME(start_time, stop_time)
        # Copy the sampling interval to the input values of the STREAMify node; these input values will be treated as sticky tokens, and define the duration over which STREAMify'd nodes must run
        sample_window = COPY_TTTIME(A_1, sampling_time)
        # Call the camera sampler periodically @ 750 ms interval
        cam_sample = camera_sampler(sample_window, TTClock=root_clock, TTPeriod=750000, TTPhase=0, TTDataIntervalWidth=250000)
        # Process the camera frame returned by STREAMify
        processed_camera = process_camera(cam_sample)
        # Write our output to a file for later observation
        write = write_to_file(processed_camera)
The intersection of the time intervals of all the function's input tokens
dictates the period of time over which the stream is generated. You can see
that we modify the input token's time interval with the two functions
VALUES_TO_TTTIME and COPY_TTTIME. We've provided a slew of functions for the
programmer to interface with the TTPython architecture. An in-depth look at
these internal functions can be found here. In the above example, by setting
N=50, we create a sampling interval of 50 seconds.
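As a sanity check on these numbers (an illustrative calculation, not TTPython code), a 50-second window sampled every 750 ms yields 67 firings, assuming the first firing lands at the very start of the window:

```python
window_us = 50 * 1_000_000   # N = 50 seconds, in microsecond ticks
period_us = 750_000          # TTPeriod = 750 ms

# Firings at ticks 0, 750000, 1500000, ... strictly below window_us
num_samples = len(range(0, window_us, period_us))
# num_samples == 67
```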
We reserve keywords that start with TT as special, TTPython-facing keywords;
that is, camera_sampler does not have access to them. They help the TTPython
runtime correctly set up the clock synchronization of the ensemble that hosts
that particular SQ.
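Conceptually (a hypothetical sketch, not the actual runtime code; split_tt_keywords and the exposure keyword are made up for illustration), the runtime can be thought of as splitting the call's keyword arguments into TT-prefixed ones it consumes itself and ordinary ones forwarded to the function:

```python
def split_tt_keywords(kwargs):
    """Separate TTPython-facing keywords (TT prefix) from the keywords
    that would actually be forwarded to the user's function."""
    tt = {k: v for k, v in kwargs.items() if k.startswith('TT')}
    user = {k: v for k, v in kwargs.items() if not k.startswith('TT')}
    return tt, user

tt, user = split_tt_keywords(
    {'TTClock': 'root', 'TTPeriod': 750000, 'TTPhase': 0, 'exposure': 8})
# tt   -> {'TTClock': 'root', 'TTPeriod': 750000, 'TTPhase': 0}
# user -> {'exposure': 8}
```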
Now that we have the necessary steps to run a periodic application, check out and run Steps 4-8 in the Jupyter Notebook. We will not cover the camera and LIDAR sensor-fusion section of the application in this tutorial, as the TTPython concepts needed to understand it are equivalent to those in the camera data pipeline.