Tutorial - Stream Synchronization
The program now needs to calculate the car speed from the two
inductances produced by the sensors. We will call the function
calculate_car_speed to do so, and we will revisit this function
in greater depth later.
Let’s expand our program:
    @STREAMify
    def sample_magnetometer(A, f, phi):
        from math import cos, pi
        return A - (A * cos(get_time() * 2 * f / pi + phi))

    def get_time():
        ...

    @SQify
    def calculate_car_speed(...):
        ...

    @GRAPHify
    def main(trigger):
        A1 = 1
        A2 = 2
        f = 0.25
        phi = 0
        # Both magnetometers are sampled against the same root clock.
        with TTClock.root() as root_clock:
            inductance1 = sample_magnetometer(A1, f, phi, TTClock=root_clock, TTPeriod=500000, TTPhase=0)
            inductance2 = sample_magnetometer(A2, f, phi, TTClock=root_clock, TTPeriod=500000, TTPhase=0)
            # The car speed is computed from the two sampled inductances.
            return calculate_car_speed(inductance1, inductance2)
We see that the GRAPHified function has become much more interesting: it
uses two streaming SQs and combines their values. The simplest way to
combine them is to add them together with +. This + (like other basic
Python arithmetic and boolean operations applied to streams) is
semantically different from an ordinary Python +. Sensed values in IoT and
CPS have a notion of time associated with them. It doesn’t make sense to
add two values together if they are “far” away from each other in time.
For example, if inductance1 was sampled at 1:00 PM and inductance2 was
sampled at 2:00 PM, we would probably not want to consider these values
valid to be added together. However, values sampled at 1:00 PM and
1:01 PM could plausibly be added together. We represent this leeway in
data validity with the following keyword argument:
    @STREAMify
    def sample_magnetometer(A, f, phi):
        from math import sin, pi
        return A * sin(get_time() * 2*f/pi + phi)

    def get_time():
        ...

    @SQify
    def calculate_car_speed():
        ...

    @GRAPHify
    def main(trigger):
        A1 = 1
        A2 = 2
        f = 0.25
        phi = 0
        with TTClock.root() as root_clock:
            # TTDataIntervalWidth sets how wide each sample's validity interval is.
            inductance1 = sample_magnetometer(A1, f, phi, TTClock=root_clock, TTPeriod=500000, TTPhase=0, TTDataIntervalWidth=100000)
            inductance2 = sample_magnetometer(A2, f, phi, TTClock=root_clock, TTPeriod=500000, TTPhase=0, TTDataIntervalWidth=100000)
            return inductance1 + inductance2
TTDataIntervalWidth=100000 gives each sensed value a validity interval
that is 100 milliseconds wide. This is the length of time over which the
sensed data is considered valid; so if a value was sampled at 1:00 PM, its
interval runs from 12:59:59.950 to 1:00:00.050, and any other sensed data
to be fused with it would need a data validity interval that overlaps
this one.
Thus, the kwarg TTDataIntervalWidth directly affects how loose or strict
timing is in our applications.
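The 1:00 PM example above can be checked with a few lines of plain Python. This is only a sketch of the arithmetic, not TTPython code: the helper names validity_interval and overlaps are hypothetical, timestamps are assumed to be microseconds since midnight (so that 100000 corresponds to 100 milliseconds), and the interval is assumed to be centered on the sample time, as the 12:59:59.950 to 1:00:00.050 example suggests.

```python
def validity_interval(sample_time_us, width_us):
    """Return (start, end) of an interval of the given width,
    centered on the sample time (an assumption of this sketch)."""
    half = width_us // 2
    return (sample_time_us - half, sample_time_us + half)


def overlaps(a, b):
    """True if the closed intervals a and b share at least one instant."""
    return a[0] <= b[1] and b[0] <= a[1]


WIDTH_US = 100_000                       # mirrors TTDataIntervalWidth=100000
ONE_PM = 13 * 3600 * 1_000_000           # 1:00:00 PM, microseconds since midnight

first = validity_interval(ONE_PM, WIDTH_US)
near = validity_interval(ONE_PM + 60_000, WIDTH_US)            # 60 ms later
far = validity_interval(ONE_PM + 3600 * 1_000_000, WIDTH_US)   # 2:00 PM

print(overlaps(first, near))   # True: the two intervals share 40 ms
print(overlaps(first, far))    # False: an hour apart
```

With a width of 100 ms, two samples can be fused only if they were taken at most 100 ms apart; widening the interval loosens that requirement and narrowing it tightens it.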
WIP - below is preliminary
It’s quite easy. Just perform some operation on multiple streams, such as addition or a function call!
In reality, it’s a bit more semantically complex. Stream synchronization is performed based on time intervals tagged onto tokens. An overlap between these intervals implies a notion of concurrency, which, for sampled data, implies that data was gathered in a similar context, making the corresponding data reasonable to compare or combine.
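To make the interval-based view concrete, here is a minimal sketch in plain Python. The Token class, common_interval, and fuse_sum are hypothetical names invented for illustration; they are not the TTPython runtime's token type or API. The sketch assumes that a fused result carries the intersection of its inputs' intervals, which is one natural reading of "overlap implies concurrency".

```python
from dataclasses import dataclass
from typing import Optional, Sequence, Tuple


@dataclass(frozen=True)
class Token:
    """A value tagged with a closed validity interval [start, end]."""
    value: float
    start: int
    end: int


def common_interval(tokens: Sequence[Token]) -> Optional[Tuple[int, int]]:
    """Intersection of all token intervals, or None if they do not all overlap."""
    start = max(t.start for t in tokens)
    end = min(t.end for t in tokens)
    return (start, end) if start <= end else None


def fuse_sum(tokens: Sequence[Token]) -> Optional[Token]:
    """Add the values only if the tokens are concurrent; otherwise return None."""
    window = common_interval(tokens)
    if window is None:
        return None
    return Token(sum(t.value for t in tokens), window[0], window[1])


a = Token(1.0, 0, 100)
b = Token(2.0, 60, 160)
c = Token(5.0, 150, 250)

print(fuse_sum([a, b]))   # Token(value=3.0, start=60, end=100)
print(fuse_sum([a, c]))   # None: the intervals do not intersect
```

Note that the fused token's interval (60 to 100) is narrower than either input interval, so each further fusion step has less room to find an overlap.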
Things to mention
* Specifying the function is easy, but understanding what may happen under the hood requires a bit more knowledge. Otherwise, synchronization may fail unexpectedly.
* Synchronizing input values looks for a set of tokens with intersecting time intervals.
* When synchronization completes, all tokens are evicted from intermediate storage, unless they came from SQs that are not downstream of some streaming source (in which case they are sticky and will persist indefinitely).
* Small intervals are much more likely to fail synchronization. These intervals can be specified during stream generation, but will nominally shrink as more stream fusion occurs. This is expected and necessary if synchronizing concurrently gathered data is our primary concern. The user can override this if they work directly on the TTTokens at runtime.
* If there are multiple overlapping intervals between waiting tokens, we use the largest overlap to determine which tokens to execute on. Streams with different periods are not currently handled particularly well.
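The largest-overlap rule in the last bullet can be sketched as follows. This is an illustration under assumptions, not the runtime's actual matching algorithm: waiting tokens are reduced to bare (start, end) tuples, only two input streams are considered, the helper names are hypothetical, and sticky tokens are not modeled.

```python
from itertools import product


def overlap_len(a, b):
    """Length of the intersection of closed intervals a and b, or -1 if disjoint."""
    start, end = max(a[0], b[0]), min(a[1], b[1])
    return end - start if start <= end else -1


def pick_best_pair(waiting_a, waiting_b):
    """Return indices (i, j) of the pair with the largest overlap, or None."""
    best, best_len = None, -1
    for (i, a), (j, b) in product(enumerate(waiting_a), enumerate(waiting_b)):
        length = overlap_len(a, b)
        if length > best_len:
            best, best_len = (i, j), length
    return best


# Intervals of tokens waiting on each input of a two-input SQ.
waiting_a = [(0, 100), (500, 600)]
waiting_b = [(80, 180), (520, 620)]

pair = pick_best_pair(waiting_a, waiting_b)
print(pair)   # (1, 1): overlap of 80 beats the overlap of 20 for (0, 0)

if pair is not None:
    i, j = pair
    # Evict the matched tokens from intermediate storage once they are used.
    used = (waiting_a.pop(i), waiting_b.pop(j))
    print(used)   # ((500, 600), (520, 620))
```

Because the choice is driven purely by overlap size, the older pair at indices (0, 0) is passed over here even though it also overlaps, which hints at why streams with mismatched periods can behave unintuitively.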