TTSQSync

The TTSQSync encapsulates the synchronization mechanisms for SQs, which is one of the most essential and novel parts of the timed dataflow graph computation model that TTPython uses.

Traditionally, dataflow uses synchronization barriers to ensure the right values are present before executing the code on them. We expand this by adding time: we generate streams of data and label tokens with time-intervals, where an overlap in time between tokens establishes a notion of concurrency that we use to evaluate the firing rule in most cases. However, there are other types of firing rules and add-on mechanisms, such as deadlines, that increase complexity. The firing rule itself may require parameterization to set how to behave when a timeout occurs or when to rerun the SQ to generate a periodic stream of data. In this way, the synchronization portion of an SQ performs the bulk of the work at the control layer, in that it decides when, and on what data, an SQ should execute.
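As a rough illustration of this overlap-based synchronization (a sketch only; the Token class and fire_if_overlapping helper below are hypothetical and not part of the TTPython API), the core of the timed firing check amounts to intersecting the time intervals carried by the waiting tokens:

    # Conceptual sketch only: tokens labeled with time intervals fire together
    # when all of their intervals share a non-empty overlap. The names here are
    # illustrative, not TTPython API.
    from dataclasses import dataclass

    @dataclass
    class Token:
        value: object
        start_tick: int  # beginning of the token's validity interval
        stop_tick: int   # end of the token's validity interval

    def fire_if_overlapping(tokens):
        """Return the common overlap (start, stop) if every token's interval
        intersects; otherwise return None and keep waiting."""
        start = max(t.start_tick for t in tokens)
        stop = min(t.stop_tick for t in tokens)
        return (start, stop) if start <= stop else None

    # Two samples that overlap between ticks 15 and 20, so the SQ may fire
    print(fire_if_overlapping([Token("a", 10, 20), Token("b", 15, 25)]))  # (15, 20)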

The TTSQSync implements the firing rule and a ‘Waiting-Matching’ section that stores inputs until they are ready for use.

class ticktalkpython.SQSync.TTInputPort(port_type)

A TTInputPort holds tokens for a port, and is enumerated with a port type to better distinguish how to handle values as they are inserted/removed from the port.

Parameters:

port_type (TTPortType) – The type of input port to create

class ticktalkpython.SQSync.TTPortType(value)

The port type is necessary for streaming graphs and certain types of firing rules.

  • Vanilla ports are normal: a token arrives, synchronizes with tokens on the other ports, and is consumed on execution; it is removed from the port once it matches with a complete set of tokens.

  • Sticky ports will force the token to stick around for more than one iteration so that it can be reused. These are necessary for periodic sources (TimedRetrigger firing rule) and when a streaming SQ receives input from streaming and non-streaming sources.

  • Control ports are extra special, in that they receive tokens from arcs that are not within the compiled graph; they exist to control mechanisms that force sequentiality or release a token/execution context on a timed schedule, e.g. for a stateful SQ or a deadline, respectively. They may be used in several different ways, but are never returned as part of an execution context during synchronization (see the sketch after this list).
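A purely illustrative summary of these consumption semantics (the member names below are assumptions; the actual TTPortType enum may differ):

    # Illustrative-only enum mirroring the three port behaviors described above;
    # not the real TTPortType definition.
    from enum import Enum

    class PortKind(Enum):
        VANILLA = 1  # token is consumed once it matches a complete set
        STICKY = 2   # token persists across iterations so it can be reused
        CONTROL = 3  # fed from outside the compiled graph; never part of an execution context

    def consumed_after_firing(kind: "PortKind") -> bool:
        """Vanilla tokens are removed after firing; sticky tokens persist."""
        return kind is PortKind.VANILLA

    print(consumed_after_firing(PortKind.STICKY))  # False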

class ticktalkpython.SQSync.TTSQInputTokenStorage(n_ports, firing_rule, sq, is_streaming=False, is_singleton=False, use_deadline=False)

The Token Storage for an SQ; alternatively called a waiting-matching section, but only for this SQ. Tokens will be stored in ports and synchronized based on their time tags, which are intervals. We use the intervaltree library to search for intersections between stored tokens. This should be a sufficiently efficient data structure for these operations.

Parameters:
  • n_ports (int) – The number of input (data) ports

  • firing_rule (TTFiringRule) – The firing rule that will be checking this input token storage

  • sq (TTSQ) – A reference to the SQ; this will be used for determining port types by analyzing the arcs.

  • is_streaming (bool) – An indicator for whether this SQ operates on streams of data or not. Also used for determining port types while analyzing arcs. Defaults to False

  • use_deadline (bool) – An indicator for whether this SQ implements deadlines during synchronization. Defaults to False
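As noted above, each port's storage is an interval tree keyed by the token's time interval. A hedged sketch of that idea using the intervaltree library directly (the port and find_overlapping names are illustrative helpers, not the TTSQInputTokenStorage API):

    # Illustrative use of the intervaltree library as a per-port token store.
    from intervaltree import Interval, IntervalTree

    port = IntervalTree()

    # Store tokens under their (start_tick, stop_tick) intervals; intervaltree
    # treats the end point as exclusive.
    port.add(Interval(100, 200, "token-A"))
    port.add(Interval(250, 300, "token-B"))

    def find_overlapping(tree, start_tick, stop_tick):
        """Return tokens whose stored intervals intersect [start_tick, stop_tick)."""
        return [iv.data for iv in tree.overlap(start_tick, stop_tick)]

    print(sorted(find_overlapping(port, 150, 260)))  # ['token-A', 'token-B']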

check_token_clock(token)

All times labeled on tokens should use the same clock. If this check fails, we raise an exception.

Parameters:

token (TTToken) – An input token whose clock is checked against the clock the other tokens use

Returns:

None

insert_token(token, port_number)

Insert the token into the port at the designated number (determined by the token’s tag). The port’s storage is implemented as an interval tree, and the interval is simply the start and stop ticks of the token’s TTTime.

Parameters:
  • token (TTToken) – The token to add to the port

  • port_number (int) – The port number the token should be added to

Returns:

None

insert_token_control_port(token, port_number)

Insert the token into the control port at the designated number (determined by the token’s tag). The port’s storage is implemented as an interval tree, and the interval is simply the start and stop ticks of the token’s TTTime.

Control ports start at N within the token’s tag, where N is the number of input arcs that accept ordinary data tokens; e.g., control port 1 would be port number N+1.

Parameters:
  • token (TTToken) – The token to add to the port

  • port_number (int) – The port number the token should be added to

Returns:

None

match_token(token, port_number, ports=None)

Try to find a set of candidate tokens that all align in time with the input token

Parameters:
  • token (TTToken) – The token to match against the stored tokens

  • port_number (int) – The port number the token is destined for

  • ports (list(TTInputPort)) – The set of ports to match over. Defaults to the set of data ports

Returns:

An overlap time and a set of matched tokens (one per port, including the port the input token was destined for). If no match is found, returns None, None

Return type:

TTTime | None, list(TTToken) | None
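A hedged sketch of this matching idea over several ports, each stored as an interval tree (simplified: it always picks the earliest candidate per port and ignores port types and strictness, so it is not the actual match_token logic):

    # Illustrative cross-port matching: shrink the common overlap port by port.
    from intervaltree import Interval, IntervalTree

    def match_across_ports(ports, start_tick, stop_tick):
        """Return (overlap, tokens) with one token per port, or (None, None)."""
        overlap = (start_tick, stop_tick)
        chosen = []
        for tree in ports:
            candidates = sorted(tree.overlap(*overlap))
            if not candidates:
                return None, None  # some port has nothing aligned in time
            iv = candidates[0]     # simplest policy: take the earliest candidate
            overlap = (max(overlap[0], iv.begin), min(overlap[1], iv.end))
            chosen.append(iv.data)
        return overlap, chosen

    ports = [IntervalTree([Interval(0, 10, "x")]), IntervalTree([Interval(5, 15, "y")])]
    print(match_across_ports(ports, 8, 12))  # ((8, 10), ['x', 'y'])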

match_token_strict(token, port_number)

Match tokens strictly, meaning that the intervals must align exactly

Parameters:
  • token (TTToken) – The token to match against the stored tokens

  • port_number (int) – The port number the token is destined for

Returns:

An overlap time and a set of matched tokens (one per port, including the port the input token was destined for). If no match is found, returns None, None

Return type:

TTTime | None, list(TTToken) | None

remove_token(token, port_number)

Delete a token from the port. If the token does not exist, an error will be raised signaling the token is not present in the storage.

Parameters:
  • token (TTToken) – The token to remove from the port

  • port_number (int) – The port number the token should be removed from

Returns:

None

remove_token_control_port(token, port_number)

Delete a token from the control port. If the token does not exist, a ValueError will be raised signaling the token is not present in the storage.

Parameters:
  • token (TTToken) – The token to remove from the control port

  • port_number (int) – The port number the token should be removed from

Returns:

None

class ticktalkpython.SQSync.TTSQSync(firing_rule, n_input_ports, sq, is_streaming=False, is_singleton=False, use_deadline=False)

The part of an SQ engaged in token synchronization. It will accept new tokens tagged with this SQ’s name/ID. It maintains storage for tokens, and upon accepting new ones, will check against the TTFiringRule to see if there are sufficient tokens to fire. If so, it pulls the tokens from storage, wraps them into an executable context, and sends it onward to the scheduler. If it is not ready to fire, it stores the token until it can be used later.

NB: this will be added onto as necessary to account for more types of firing rules and, perhaps, more efficient structures for storing tokens for that type of rule. We may also have to modify this to support multiple application contexts (that, or duplicate SQs, which is harder to manage)

Parameters:
  • firing_rule (TTFiringRule) – A description of the firing rule, using an enumerated type

  • n_input_ports (int) – The number of input ports associated with this SQ

  • is_streaming (bool) – An indicator for whether this SQ will process a stream of tokens or not. This is used to determine whether the tokens that arrive on ports should be ‘Sticky’ or not (meaning they persist longer than one invocation). Defaults to False

  • use_deadline (bool) – An indicator whether or not this SQ uses a deadline. The exact semantics of deadlines are TBD. Defaults to False

calculate_next_trigger_time(start_from=None)

Determine the next sampling time for a streaming node; when it will need to retrigger itself next (or time out). Base this on the current time, rather than the last iteration, or we may get trapped in a perpetual state of lateness (or negative delays).

Parameters:

start_from (int) – calculate the next trigger time based on some starting time; it should be within one period of this start time. Defaults to None, in which case we read the clock associated with this SQ for the current time and calculate it from there.
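One plausible way to derive such a trigger time from a period and phase, computed from the current tick so that lateness does not accumulate (a sketch only; the actual calculation in TTPython may differ):

    # Hedged sketch: find the next tick strictly after 'now' on a period/phase grid.
    def next_trigger_tick(now, period, phase=0):
        elapsed = now - phase
        intervals = elapsed // period + 1  # next grid point after 'now'
        return phase + intervals * period

    # With a 1,000-tick period and a 500-tick phase, the next trigger after
    # tick 1,000,123 lands at tick 1,000,500.
    print(next_trigger_tick(now=1_000_123, period=1_000, phase=500))  # 1000500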

check_basic_firing_rule_on_input(token)

Check a standard Timed/Strict firing rule using all the data ports and the new token. If we find a match (i.e. an overlap) among the tokens, then create an execution context.

If we pass the firing rule check, the tokens used for execution will be removed from their ports unless the ports are designated ‘Sticky’

NB: if the firing rule is Strict, then all tokens’ time-labels must be identical.

Parameters:

token (TTToken) – The newest input token.

Returns:

an execution context to run tokens on, if a match is found. Else, None.

Return type:

TTExecutionContext | None

check_sequential_retrigger_firing_rule(input_token)

This firing rule enforces sequential (technically, chronological is more accurate) processing on input token streams by never allowing the SQ to synchronize on tokens older than the most recent iteration. This is mainly used in SQs that use internal state. A feedback token is used in the control port such that its time interval starts at the start tick of the previous iteration and its stop tick is an effectively infinite timestamp. Any old sets of tokens, even if they have a valid overlap, will fail to align with this control token, which is generated in the TTExecuteProcess after completing.

A valid execution context will not contain the control token(s)

Parameters:

input_token (TTToken) – The newest input token.

Returns:

an execution context to run tokens on, if a match is found. Else, None.

Return type:

TTExecutionContext | None
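A conceptual sketch of how such a guard interval rejects stale token sets (the helper and constant names are illustrative, not TTPython internals):

    # The control token's interval runs from the previous iteration's start tick
    # to an effectively infinite timestamp, so only token sets at least as new as
    # the last iteration can still overlap with it.
    EFFECTIVE_INFINITY = 2**63 - 1

    def passes_sequential_guard(overlap_start, overlap_stop, last_iteration_start):
        guard_start, guard_stop = last_iteration_start, EFFECTIVE_INFINITY
        start = max(overlap_start, guard_start)
        stop = min(overlap_stop, guard_stop)
        return start <= stop

    print(passes_sequential_guard(100, 200, 150))  # True: set is not older than the last start
    print(passes_sequential_guard(100, 140, 150))  # False: stale set, rejected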

check_timed_retrigger_firing_rule(input_token)

Check the TimedRetrigger firing rule for this new token. This requires a control token and special timing behavior, so it is more complex.

This firing rule executes by looking at the overlap between data ports, and using that to define over what period of time it should be producing values. Then, it will attempt to synchronize and produce new tokens according to the period, phase, and clock associated with the firing rule. A local feedback token will cause the SQ to be retriggered according to this period and phase until the current time (according to the clock associated with this function) is beyond the stop_tick of the time overlap between the input tokens (whose values will be kept and reused until then).

The first time it runs, this will not have anything for the control input, so it is ignored. The token is meant to best represent the time that a sample in the stream is produced, so the first retrigger token will be generated based on the current time. After that, it will produce a TTTimedEvent to establish the next iteration’s release time. A separate process will wait until that time has arrived, and release the control token back such that this function will run again.

Parameters:

input_token (TTToken) – The newest input token.

Returns:

an execution context to run tokens on, if a match is found. Else, None. Also returns a trigger token that is timed to be released when the next iteration should run

Return type:

TTExecutionContext, TTToken | None, None

clean_ports(readied_tokens, unadded_token, time_overlap, remove_sticky=False)

Remove tokens from the ports. Tokens that are ready to be executed on should be removed, but the newest token that triggered firing rule execution should not be, since it was not added to the ports in the first place. Some ports may be ‘sticky’ such that their values may persist for more than one iteration; these may be optionally removed, but are not by default. We also do some time-based garbage collection by looking at the token overlap here, and removing anything particularly old (by default, 30M ticks of the associated clock, which should be equivalent to at least 30 seconds).

Parameters:
  • readied_tokens (list(TTToken)) – The set of tokens to be executed on; these should be in a returned TTExecutionContext

  • unadded_token (TTToken) – The token whose arrival triggered the firing rule to be satisfied. This should not have been added to the ports in the waiting-matching section, so we should not try to remove it.

  • time_overlap (TTTime) – The time overlap between the set of readied tokens. Primarily used to clean out sufficiently old tokens.

  • remove_sticky (bool) – An indicator to determine whether sticky ports should be considered or not for cleaning/garbage collection. Defaults to False

Returns:

None

create_execution_context(tokens, time_overlap, est_runtime=0)

Create an execution context for the TTExecuteProcess to accept and execute this SQ on.

Parameters:
  • tokens (list(TTToken)) – The set of tokens to execute on

  • time_overlap (TTTime) – A time interval describing the overlap in Time tags on all the tokens

  • est_runtime (int) – An estimate of the runtime

static from_json(json_in)

Create from the ‘sync’ portion of the SQ’s specification in JSON (represented as a dictionary)

instantiate_at_runtime(clocks, initial_context='u1')

Instantiate the SQ at runtime, performing operations like registering the correct clock (for retriggering firing rules) or adding default tokens to ports that require them.

Parameters:
  • clocks (list(TTClock)) – A set of clocks to search for the one designated by this SQ for operations like timed retriggering

  • initial_context (string) – A context identifier for the context (u) portion of the token tag; it is assumed that all input tokens will have the same context tag.

receive_token(token)

Receive a token meant for this particular SQ. This causes us to check the firing rule.

It will be compared with the existing tokens across the ports, potentially returning an executable context and a control token (depending on the rule). An execution context will signal that we have found a set of viable tokens, and will attempt to execute on them in the TTExecuteProcess.

Parameters:

token (TTToken) – An input token to this SQ

Returns:

An executable context for the scheduler and a control token. Either of these may be None, depending on how the firing rule operates

Return type:

list(TTToken), TTToken

time_based_garbage_collection(recent_time_overlap, staleness_duration=30000000, remove_control=False, remove_sticky=False)

Remove old tokens based on how long they have been around, according to their time tags (specifically, the stop-tick)

Parameters:
  • recent_time_overlap (TTTime) – The overlap in time between a set of tokens that are ready to be executed on

  • staleness_duration (int) – A parameter to determine how old is too old for tokens to still be used. If they’re too old, they’re considered ‘stale’ and are removed. By default, this is equivalent to 30 seconds in the root domain (assuming microsecond granularity). This will be important to parameterize (perhaps using the firing rule and knowledge about timescales in the application) in certain types of systems/applications. Defaults to 30000000.

  • remove_control (bool) – An indicator for whether to consider control ports as well when removing tokens. Defaults to False

  • remove_sticky (bool) – An indicator for whether to consider sticky ports as well when removing tokens. Defaults to False
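A hedged sketch of this staleness check over a port implemented as an interval tree (measuring the cutoff from the overlap's start tick is an assumption; the helper names are illustrative):

    # Drop tokens whose stop tick trails the latest overlap by more than the
    # staleness duration; illustrative only, not the actual TTPython method.
    from intervaltree import Interval, IntervalTree

    def collect_stale(port, overlap_start_tick, staleness_duration=30_000_000):
        cutoff = overlap_start_tick - staleness_duration
        stale = [iv for iv in port if iv.end < cutoff]
        for iv in stale:
            port.remove(iv)
        return len(stale)

    port = IntervalTree([Interval(0, 10, "ancient"), Interval(40_000_000, 40_000_100, "fresh")])
    print(collect_stale(port, overlap_start_tick=60_000_000))  # 1 (the 'ancient' token)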

ticktalkpython.SQSync.interval_intersection(interval_1, interval_2)

Search for an intersection between two intervals

Parameters:
  • interval_1 (intervaltree.Interval) – The first interval

  • interval_2 (intervaltree.Interval) – The second interval

Returns:

An interval representing the intersection of the two Intervals, or None if no intersection is found. The data from the first interval is reused if an intersection is found

Return type:

intervaltree.Interval | None

ticktalkpython.SQSync.interval_union(interval_1, interval_2)

Search for a union between two intervals

Parameters:
  • interval_1 (intervaltree.Interval) – The first interval

  • interval_2 (intervaltree.Interval) – The second interval

Returns:

An interval representing the union of the two Intervals. The data from the first interval is reused

Return type:

intervaltree.Interval
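A plausible implementation of the two helpers above, written against intervaltree’s Interval type (a sketch consistent with the descriptions; not necessarily the exact ticktalkpython code):

    # Intersection keeps the first interval's data, as documented; union does the same.
    from intervaltree import Interval

    def interval_intersection(interval_1, interval_2):
        begin = max(interval_1.begin, interval_2.begin)
        end = min(interval_1.end, interval_2.end)
        if begin < end:
            return Interval(begin, end, interval_1.data)
        return None  # no overlap between the two intervals

    def interval_union(interval_1, interval_2):
        begin = min(interval_1.begin, interval_2.begin)
        end = max(interval_1.end, interval_2.end)
        return Interval(begin, end, interval_1.data)

    a, b = Interval(0, 10, "a"), Interval(5, 20, "b")
    print(interval_intersection(a, b))  # Interval(5, 10, 'a')
    print(interval_union(a, b))         # Interval(0, 20, 'a')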