Tutorial - Stream Synchronization
=================================

.. _tutorial-streamsync:

The program now needs to calculate the car speed from the two inductances produced by the sensors. We will call the function ``calculate_car_speed`` to do so, and we will revisit this function later in greater depth. Let's expand our program:

.. code-block:: python

    @STREAMify
    def sample_magnetometer(A, f, phi):
        from math import cos, pi
        # Simulated sensor reading that oscillates with the current time
        return A - (A * cos(get_time() * 2 * f / pi + phi))

    def get_time(): ...

    @SQify
    def calculate_car_speed(inductance1, inductance2): ...  # revisited later

    @GRAPHify
    def main(trigger):
        A1 = 1
        A2 = 2
        f = 0.25
        phi = 0
        with TTClock.root() as root_clock:
            inductance1 = sample_magnetometer(A1, f, phi, TTClock=root_clock,
                                              TTPeriod=500000, TTPhase=0)
            inductance2 = sample_magnetometer(A2, f, phi, TTClock=root_clock,
                                              TTPeriod=500000, TTPhase=0)
            # Both streams feed the speed calculation
            return calculate_car_speed(inductance1, inductance2)

The GRAPHified function has become much more interesting: it now utilizes two SQs and combines their values. Combining two streams, whether through an SQ call such as ``calculate_car_speed`` or through a basic Python arithmetic or boolean operator such as ``+``, is semantically different from the same operation in plain Python. Sensed values in IoT and CPS have a notion of time associated with them. It doesn't make sense to add two values together if they are "far" away from each other in time. For example, if ``inductance1`` was sampled at 1:00 PM and ``inductance2`` at 2:00 PM, we would probably not consider it valid to add them together. Values sampled at 1:00 PM and 1:01 PM, however, could plausibly be added together. We represent this leeway in data validity with the keyword argument ``TTDataIntervalWidth``. The next version of the program sets it on both streams and, to keep things simple, returns their sum with ``+``:

.. code-block:: python

    @STREAMify
    def sample_magnetometer(A, f, phi):
        from math import sin, pi
        return A * sin(get_time() * 2*f/pi + phi)

    def get_time(): ...

    @SQify
    def calculate_car_speed(inductance1, inductance2): ...

    @GRAPHify
    def main(trigger):
        A1 = 1
        A2 = 2
        f = 0.25
        phi = 0
        with TTClock.root() as root_clock:
            inductance1 = sample_magnetometer(A1, f, phi, TTClock=root_clock,
                                              TTPeriod=500000, TTPhase=0,
                                              TTDataIntervalWidth=100000)
            inductance2 = sample_magnetometer(A2, f, phi, TTClock=root_clock,
                                              TTPeriod=500000, TTPhase=0,
                                              TTDataIntervalWidth=100000)
            # Stream-level addition: operands must overlap in time
            return inductance1 + inductance2

The kwarg ``TTDataIntervalWidth=100000`` gives each sensed value a validity interval that is 100 milliseconds wide. If a value was sensed at 1:00 PM, its interval spans 12:59:59.950 to 1:00:00.050, and other sensed data can be fused with it only if their own validity intervals overlap that span. Thus, the kwarg ``TTDataIntervalWidth`` directly affects how loose or strict timing is in our applications.

WIP - below is preliminary

On the surface, synchronizing streams is quite easy: just perform some operation on multiple streams, such as addition or a function call! In reality, it's a bit more semantically complex. Stream synchronization is performed based on time intervals tagged onto tokens. An overlap between these intervals implies a notion of concurrency, which, for sampled data, implies that the data was gathered in a similar context, making it reasonable to compare or combine the corresponding values.

**Things to mention**

* Specifying the function is easy, but understanding what may happen under the hood requires a bit more knowledge. Without that knowledge, synchronization may fail unexpectedly.
* Synchronizing input values means looking for a set of tokens with intersecting time intervals.
* When synchronization completes, all tokens are evicted from intermediate storage, unless they came from SQs that are not downstream of some streaming source (in which case they are sticky and will persist indefinitely).
* Small intervals are much more likely to fail synchronization. These intervals can be specified during stream generation, but will nominally shrink as more stream fusion occurs. This is expected and necessary if synchronizing concurrently gathered data is our primary concern. The user can override this if they work directly on the TTTokens at runtime.
* If there are multiple overlapping intervals between waiting tokens, we use the largest overlap to determine which tokens to execute on. Streams with different periodicities are not currently handled particularly well.
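
The interval rules listed above can be made concrete with a small plain-Python sketch. It does not use the TTPython runtime: ``Token``, ``make_token``, ``intersect``, ``synchronize`` and ``fuse_add`` are hypothetical names invented for this illustration. It also assumes that timestamps are integer microseconds and that the validity interval is centered on the sample time, as the 1:00 PM example suggests. Eviction and sticky tokens are not modeled.

```python
from dataclasses import dataclass
from itertools import product
from typing import Optional, Sequence, Tuple


@dataclass(frozen=True)
class Token:
    """Hypothetical stand-in for a time-tagged token (not the TTPython class)."""
    value: float
    start: int  # start of the validity interval, assumed microseconds
    stop: int   # end of the validity interval, assumed microseconds


def make_token(value: float, sample_time_us: int, width_us: int) -> Token:
    """Center a validity interval of the given width on the sample time."""
    half = width_us // 2
    return Token(value, sample_time_us - half, sample_time_us + half)


def intersect(tokens: Sequence[Token]) -> Optional[Tuple[int, int]]:
    """Return the interval shared by all tokens, or None if there is none."""
    start = max(t.start for t in tokens)
    stop = min(t.stop for t in tokens)
    return (start, stop) if start < stop else None


def synchronize(queues: Sequence[Sequence[Token]]):
    """Pick one waiting token per stream, preferring the largest overlap."""
    best = None
    for combo in product(*queues):
        overlap = intersect(combo)
        if overlap is None:
            continue  # these tokens are too far apart in time
        width = overlap[1] - overlap[0]
        if best is None or width > best[0]:
            best = (width, combo, overlap)
    return None if best is None else (best[1], best[2])


def fuse_add(queues: Sequence[Sequence[Token]]) -> Optional[Token]:
    """Add synchronized values; the result carries the (smaller) shared interval."""
    result = synchronize(queues)
    if result is None:
        return None  # synchronization failed
    combo, (start, stop) = result
    return Token(sum(t.value for t in combo), start, stop)


# Two samples taken 20 ms apart, each with a 100 ms validity interval
a = make_token(1.0, 1_000_000, 100_000)
b = make_token(2.0, 1_020_000, 100_000)
# A sample taken 5 ms after `a`, and one taken 500 ms after `a`
c = make_token(4.0, 1_005_000, 100_000)
late = make_token(5.0, 1_500_000, 100_000)

fused = fuse_add([[a], [b, late]])   # `a` pairs with `b`; `late` is ignored
missed = fuse_add([[a], [late]])     # no overlap, so nothing is produced
closest = synchronize([[a], [b, c]])  # `c` overlaps `a` more than `b` does
```

Here ``fused`` carries an 80 ms interval even though both inputs started with 100 ms, which illustrates why intervals shrink as more fusion occurs and why narrow intervals are more likely to fail synchronization.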