Source code for clockblocks.tempo_envelope

"""
Module defining the :class:`TempoEnvelope` class for describing a time-varying tempo, the :class:`TempoHistory` class,
which adds to that a tracking of the current beat and time, and the :class:`MetricPhaseTarget` class, which specifies a
goal arrival point within the beat (or meter) cycle.
"""

#  ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++  #
#  This file is part of SCAMP (Suite for Computer-Assisted Music in Python)                      #
#  Copyright © 2020 Marc Evanstein <marc@marcevanstein.com>.                                     #
#                                                                                                #
#  This program is free software: you can redistribute it and/or modify it under the terms of    #
#  the GNU General Public License as published by the Free Software Foundation, either version   #
#  3 of the License, or (at your option) any later version.                                      #
#                                                                                                #
#  This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;     #
#  without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.     #
#  See the GNU General Public License for more details.                                          #
#                                                                                                #
#  You should have received a copy of the GNU General Public License along with this program.    #
#  If not, see <http://www.gnu.org/licenses/>.                                                   #
#  ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++  #

from __future__ import annotations
from expenvelope import Envelope, EnvelopeSegment
from copy import deepcopy
from .utilities import snap_float_to_nice_decimal, current_clock
import logging
import math
from typing import Sequence


[docs]class TempoEnvelope(Envelope): r""" A subclass of :class:`~expenvelope.envelope.Envelope` that is specifically designed for representing changing tempo curves. The underlying envelope represents beat length as a function of the current beat, which means that the area under the curve represents how much time should pass from one beat to the next (beats * sec/beat = sec). Although the methods take a "units" argument, which can be "beatlength", "tempo", or "rate", these are always converted to beat length in the underlying representation :param levels: levels of the curve segments (i.e. tempo values) in the units specified by the `units` argument :param durations: durations of the curve segments in the units specified by the `duration_units` argument :param curve_shapes: see :func:`~expenvelope.envelope.Envelope.from_levels_and_durations` :param units: one of "tempo", "rate" or "beat length", determining how we interpret the levels given :param duration_units: either "beats" or "time", determining how we interpret the durations given """ def __init__(self, levels: Sequence = (60,), durations: Sequence[float] = (), curve_shapes: Sequence[float | str] = None, units: str = "tempo", duration_units: str = "beats"): units = units.lower().replace(" ", "") if units not in ("tempo", "rate", "beatlength"): raise ValueError("Units must be either \"tempo\" or \"rate\" or \"beatlength\".") if duration_units not in ("beats", "time"): raise ValueError("Duration units must be either \"beats\" or \"time\".") # Whatever units are given, convert them to the underlying beatlength curve when creating the TempoEnvelope super(TempoEnvelope, self).__init__( TempoEnvelope.convert_units(levels, units, "beatlength"), durations, curve_shapes, 0 ) if duration_units == "time": self.convert_durations_to_times() ################################################################################################################## # Class Methods ##################################################################################################################
[docs] @classmethod def from_levels_and_durations(cls, levels: Sequence = (0, 0), durations: Sequence[float] = (0,), curve_shapes: Sequence[float | str] = None, units: str = "tempo", duration_units: str = "beats") -> TempoEnvelope: """ Constructs a TempoEnvelope from the given levels, durations and curve shapes, using the specified units. :param levels: levels of the curve segments (i.e. tempo values) in the units specified by the `units` argument :param durations: durations of the curve segments in the units specified by the `duration_units` argument :param curve_shapes: see :func:`~expenvelope.envelope.Envelope.from_levels_and_durations` :param units: one of "tempo", "rate" or "beat length", determining how we interpret the levels given :param duration_units: either "beats" or "time", determining how we interpret the durations given :return: a TempoEnvelope, constructed accordingly """ return cls(levels, durations, curve_shapes, units=units, duration_units=duration_units)
[docs] @classmethod def from_levels(cls, levels: Sequence[float], length: float = 1.0, units: str = "tempo", duration_units: str = "beats") -> TempoEnvelope: """ Constructs a TempoEnvelope from the given levels and total length, using the specified units. :param levels: levels of the curve segments (i.e. tempo values) in the units specified by the `units` argument :param length: total length of the tempo curve, in the units specified by the `duration_units` argument :param units: one of "tempo", "rate" or "beat length", determining how we interpret the levels given :param duration_units: either "beats" or "time", determining how we interpret the durations given :return: a TempoEnvelope, constructed accordingly """ assert duration_units in ("beats", "time"), "Duration units must be either \"beats\" or \"time\"." return cls( *TempoEnvelope._levels_and_length_to_levels_durations_and_curves(levels, length), units=units, duration_units=duration_units )
[docs] @classmethod def from_list(cls, constructor_list: Sequence, units: str = "tempo", duration_units: str = "beats") -> TempoEnvelope: """ Construct a TempoEnvelope from a list that can take a number of formats :param constructor_list: see :func:`~expenvelope.envelope.Envelope.from_list` :param units: one of "tempo", "rate" or "beat length", determining how we interpret the levels given :param duration_units: either "beats" or "time", determining how we interpret the durations given :return: a TempoEnvelope, constructed accordingly """ assert hasattr(constructor_list, "__len__") if hasattr(constructor_list[0], "__len__"): # we were given levels and durations, and possibly curvature values if len(constructor_list) == 2: if hasattr(constructor_list[1], "__len__"): # given levels and durations return cls.from_levels_and_durations(constructor_list[0], constructor_list[1], units=units, duration_units=duration_units) else: # given levels and the total length return cls.from_levels(constructor_list[0], length=constructor_list[1], units=units, duration_units=duration_units) elif len(constructor_list) >= 3: # given levels, durations, and curvature values return cls.from_levels_and_durations(constructor_list[0], constructor_list[1], constructor_list[2], units=units, duration_units=duration_units) else: # just given levels return cls.from_levels(constructor_list, units=units, duration_units=duration_units)
[docs] @classmethod def from_points(cls, *points, units: str = "tempo", duration_units: str = "beats") -> TempoEnvelope: """ Construct an envelope from a list of (beat/time, tempo/rate/beat length) pairs. Units are defined by the `units` and `duration_units` parameters. :param points: list of points, each of which is of the form (time, value) or (time, value, curve_shape) :param units: one of "tempo", "rate" or "beat length", determining how we interpret the tempo values :param duration_units: either "beats" or "time", determining how we interpret the time values :return: a TempoEnvelope, constructed accordingly """ levels, durations, curve_shapes, offset = TempoEnvelope._unwrap_points(*points) if offset != 0: raise ValueError("TempoEnvelope must start from beat/time zero; when constructing from points, the " "first point must be of the form (0, [start tempo], [optional curve shape]).") return cls(levels, durations, curve_shapes, units=units, duration_units=duration_units)
[docs] @classmethod def from_function(cls, function, domain_start=0, domain_end=1, units: str = "tempo", duration_units: str = "beats", scanning_step_size: float = 0.05, key_point_resolution_multiple: int = 2, iterations: int = 6, min_key_point_distance: float = 1e-7) -> TempoEnvelope: """ Constructs a TempoEnvelope that approximates an arbitrary function. The domain of the function is in units defined by the `duration_units` parameter, and the range is in units defined by the `units` parameter. :param function: A function from beat/time to tempo/rate/beat length, as defined by the `duration_units` and `units` parameters. :param domain_start: see :func:`~expenvelope.envelope.Envelope.from_function` :param domain_end: see :func:`~expenvelope.envelope.Envelope.from_function` :param units: one of "tempo", "rate" or "beat length", determining how we interpret the function output :param duration_units: either "beats" or "time", determining how we interpret the function input :param scanning_step_size: when analyzing the function for discontinuities, maxima and minima, inflection points, etc., use this step size for the initial pass. :param key_point_resolution_multiple: factor by which we add extra key points between the extrema and inflection points to improve the curve fit. :param iterations: when a potential key point is found, we zoom in and scan again in the viscinity of the point. This determines how many iterations of zooming we do. :param min_key_point_distance: after scanning for key points, any that are closer than this distance are merged. :return: a TempoEnvelope, constructed accordingly """ assert duration_units in ("beats", "time"), "Duration units must be either \"beats\" or \"time\"." converted_function = (lambda x: TempoEnvelope.convert_units(function(x), units, "beatlength")) \ if units.lower().replace(" ", "") != "beatlength" else function out_envelope = super().from_function(converted_function, domain_start, domain_end, scanning_step_size=scanning_step_size, key_point_resolution_multiple=key_point_resolution_multiple, iterations=iterations, min_key_point_distance=min_key_point_distance) if duration_units == "time": return out_envelope.convert_durations_to_times() else: return out_envelope
################################################################################################################## # Basic Functionality ##################################################################################################################
[docs] def beat_length_at(self, beat: float, from_left: bool = False) -> float: """ Get the beat length at the given beat. If the beat length jumps at the given beat, the default is to return the beat length after the jump, though this can be overridden with the `from_left` argument. :param beat: the beat at which to get the beat length :param from_left: whether to evaluate from the right or left-hand side of the beat in question """ return self.value_at(beat, from_left)
[docs] def rate_at(self, beat: float, from_left: bool = False) -> float: """ Get the beat rate (in beats/second) at the given beat. If the rate jumps at the given beat, the default is to return the rate after the jump, though this can be overridden with the `from_left` argument. :param beat: the beat at which to get the rate :param from_left: whether to evaluate from the right or left-hand side of the beat in question """ return 1 / self.beat_length_at(beat, from_left)
[docs] def tempo_at(self, beat: float, from_left: bool = False) -> float: """ Get the tempo (in beats/minute) at the given beat. If the tempo jumps at the given beat, the default is to return the tempo after the jump, though this can be overridden with the `from_left` argument. :param beat: the beat at which to get the tempo :param from_left: whether to evaluate from the right or left-hand side of the beat in question """ return self.rate_at(beat, from_left) * 60
[docs] def extend_to(self, beat: float) -> TempoEnvelope: """ Extends the end of this TempoEnvelope to the given beat (if needed) by adding a constant segment at the end. """ if self.length() < beat: # no explicit segments have been made for a while, insert a constant segment to bring us up to date self.append_segment(self.end_level(), beat - self.length()) return self
[docs] def truncate_at(self, beat: float) -> TempoEnvelope: """ Removes all segments after the given beat and adds a constant segment if necessary to bring us up to that beat. :param beat: the beat that we are truncating the tempo envelope after :return: self, for chaining purposes """ self.remove_segments_after(beat) self.extend_to(beat) return self
[docs] def time_at_beat(self, b): return snap_float_to_nice_decimal(self.integrate_interval(0, b))
[docs] def beat_at_time(self, t): return self.get_upper_integration_bound(0, t, max_error=0.00000001)
################################################################################################################## # Conversion Utilities ##################################################################################################################
[docs] @staticmethod def convert_units(values: float | Sequence[float], input_units: str, output_units: str) -> float | Sequence[float]: """ Utility method to convert values between unites of tempo, rate and beat length. :param values: value or list of values in terms of the input_units :param input_units: current units of the given values (either "tempo", "rate", or "beat length") :param output_units: desired units to convert to (either "tempo", "rate", or "beat length") :return: the list of values, converted to output units """ in_units = input_units.lower().replace(" ", "") out_units = output_units.lower().replace(" ", "") assert in_units in ("tempo", "rate", "beatlength") and out_units in ("tempo", "rate", "beatlength"), \ "Invalid value of {} for units. Must be \"tempo\", \"rate\" or \"beat length\"".format(input_units) if in_units == out_units: return values else: convert_input_to_beat_length = (lambda x: 1 / x) if in_units == "rate" \ else (lambda x: 60 / x) if in_units == "tempo" else (lambda x: x) convert_beat_length_to_output = (lambda x: 1 / x) if out_units == "rate" \ else (lambda x: 60 / x) if out_units == "tempo" else (lambda x: x) if hasattr(values, "__len__"): return tuple(convert_beat_length_to_output(convert_input_to_beat_length(x)) for x in values) else: return convert_beat_length_to_output(convert_input_to_beat_length(values))
[docs] def convert_durations_to_times(self): """ Warps this tempo_curve so that all the locations of key points get re-interpreted as times instead of beat locations. For instance, a tempo curve where the rate hovers around 2 will see a segment of length 3 get stretched into a segment of length 6, since if it's supposed to take 3 seconds, it would take 6 beats. Pretty confusing, but when we want to construct a tempo curve specifying the *times* that changes occur rather than the beats, we can first construct it as though the durations were in beats, then call this function to warp it so that the durations are in time. :return: self, altered accordingly """ # this is a little confusing, like everything else about this function, but it represents the start beat of # the curve, which gets scaled inversely to the initial beat length, since a long initial beat length means # it won't take that many beats to get to the desired time. t = self.start_time() / self.start_level() for segment in self.segments: """ The following has been condensed into a single statement for efficiency: actual_time_duration = segment.integrate_segment(segment.start_time, segment.end_time) # the segment duration is currently in beats, but it also represents the duration we would like # the segment to have in time. so the scale factor is desired time duration / actual time duration scale_factor = segment.duration / actual_time_duration # here's we're scaling the duration to now last as long in time as it used to in beats modified_segment_length = scale_factor * segment.duration """ modified_segment_length = segment.duration ** 2 / segment.integrate_segment(segment.start_time, segment.end_time) segment.start_time = t t = segment.end_time = t + modified_segment_length return self
################################################################################################################## # Other Utilities ##################################################################################################################
[docs] def show_plot(self, title=None, resolution=25, show_segment_divisions=True, units="tempo", x_range=None, y_range=None): """ Shows a plot of this TempoEnvelope using matplotlib. :param title: A title to give the plot. :param resolution: number of points to use per envelope segment :param show_segment_divisions: Whether to place dots at the division points between envelope segments :param units: one of "tempo", "rate" or "beat length", determining the units of the y-axis :param x_range: min and max value shown on the x-axis :param y_range: min and max value shown on the y-axis """ # if we're past the end of the envelope, we want to plot that as a final constant segment # bring_up_to_date adds that segment, but we don't want to modify the original envelope, so we deepcopy if x_range is None: x_range = self.start_time(), self.end_time() env_to_plot = deepcopy(self).extend_to(x_range[1]) if self.end_time() < x_range[1] else self try: import matplotlib.pyplot as plt except ImportError: raise ImportError("Could not find matplotlib, which is needed for plotting.") fig, ax = plt.subplots() x_values, y_values = env_to_plot._get_graphable_point_pairs(resolution) ax.plot(x_values, TempoEnvelope.convert_units(y_values, "beat length", units)) if show_segment_divisions: ax.plot(env_to_plot.times, TempoEnvelope.convert_units(env_to_plot.levels, "beat length", units), 'o') plt.xlabel("Beat") plt.ylabel("Tempo" if units == "tempo" else "Rate" if units == "rate" else "Beat Length") plt.xlim(x_range) if y_range is not None: plt.ylim(y_range) ax.set_title('Graph of TempoEnvelope' if title is None else title) plt.show()
@classmethod def _from_dict(cls, json_dict): curve_shapes = None if 'curve_shapes' not in json_dict else json_dict['curve_shapes'] if 'length' in json_dict: return cls.from_levels(json_dict['levels'], json_dict['length'], units="beatlength") else: return cls.from_levels_and_durations(json_dict['levels'], json_dict['durations'], curve_shapes, units="beatlength") def __repr__(self): return "TempoEnvelope({}, {}, {})".format( TempoEnvelope.convert_units(self.levels, "beatlength", "tempo"), self.durations, self.curve_shapes)
[docs]class TempoHistory(TempoEnvelope): r""" Subclass of TempoEnvelope that keeps track of a current beat and time, and provides functionality for moving forward a certain number of beats or seconds, and/or setting tempo target(s) to reach in the future. :param levels: see :class:`TempoEnvelope` :param durations: see :class:`TempoEnvelope` :param curve_shapes: see :class:`TempoEnvelope` :param units: see :class:`TempoEnvelope` :param duration_units: see :class:`TempoEnvelope` :param beat: Where to set the current beat """ def __init__(self, levels: Sequence = (60,), durations: Sequence[float] = (), curve_shapes: Sequence[float | str] = None, units: str = "tempo", duration_units: str = "beats", beat: float = 0.0): super().__init__(levels, durations, curve_shapes, units, duration_units) self.go_to_beat(beat) ################################################################################################################## # Basic Properties ##################################################################################################################
[docs] def time(self): """ The current time. Time is found by integrating under the beat length curve: seconds/beat * beats = seconds. """ return self._t
[docs] def beat(self): """ The current beat. """ return self._beat
@property def beat_length(self): """ The current beat length. """ return self.beat_length_at(self._beat) @beat_length.setter def beat_length(self, beat_length): self.truncate() if len(self.segments) == 1 and self.length() == 0: # if this is an essentially empty tempo envelope, reset its starting beat_length to the given value self.segments[0].start_level = self.segments[0].end_level = beat_length else: self.append_segment(beat_length, 0) @property def rate(self): """ The current rate in beats/second """ return 1 / self.beat_length @rate.setter def rate(self, rate): self.beat_length = 1/rate @property def tempo(self): """ The current tempo in beats/minute """ return self.rate * 60 @tempo.setter def tempo(self, tempo): self.rate = tempo / 60 ################################################################################################################## # Tempo Changes ##################################################################################################################
[docs] def set_beat_length_target(self, beat_length_target: float, duration: float, curve_shape: float = 0, metric_phase_target: float | MetricPhaseTarget | tuple = None, duration_units: str = "beats", truncate: bool = True) -> None: """ Set a target beat length for this TempoEnvelope to reach in duration beats/seconds (with the unit defined by duration_units). :param beat_length_target: The beat length we want to reach :param duration: How long until we reach that beat length :param curve_shape: > 0 makes change happen later, < 0 makes change happen sooner :param metric_phase_target: This argument lets us align the arrival at the given beat length with a particular part of the parent beat (time), or, if we specified "time" as our duration units, it allows us to align the arrival at that specified time with a particular part of this clock's beat. This argument takes either a float in [0, 1), a MetricPhaseTarget object, or a tuple of arguments to the MetricPhaseTarget constructor :param duration_units: one of ("beats", "time"); defines whether the duration is in beats or in seconds. :param truncate: Whether or not to truncate this TempoEnvelope to the current beat before setting this target. """ if duration_units not in ("beats", "time"): raise ValueError("Argument duration_units must be either \"beat\" or \"time\".") if metric_phase_target is not None: metric_phase_target = MetricPhaseTarget.interpret(metric_phase_target) # truncate removes any segments that extend into the future if truncate: self.remove_segments_after(self.beat()) # add a flat segment up to the current beat if needed self.extend_to(self.beat()) self._add_segment(beat_length_target, duration, curve_shape, metric_phase_target, duration_units)
def _add_segment(self, beat_length_target: float, duration: float, curve_shape: float = 0, metric_phase_target: float | MetricPhaseTarget | tuple = None, duration_units: str = "beats") -> None: """ The guts of adding a new segment, minus argument checking and truncating/bringing up to date. """ if duration_units == "beats": extension_into_future = self.length() - self.beat() if duration < extension_into_future: raise ValueError("Duration to target must extend beyond the last existing target.") self.append_segment(beat_length_target, duration - extension_into_future, curve_shape) if metric_phase_target is not None: if not self._adjust_segment_end_time_to_metric_phase_target(self.segments[-1], metric_phase_target): logging.warning("Metric phase target {} was not reachable".format(metric_phase_target)) else: # units == "time", so we need to figure out how many beats are necessary time_extension_into_future = self.integrate_interval(self.beat(), self.length()) if duration < time_extension_into_future: raise ValueError("Duration to target must extend beyond the last existing target.") # normalized_time = how long the curve would take if it were one beat long normalized_time = EnvelopeSegment( 0, 1, self.value_at(self.length()), beat_length_target, curve_shape ).integrate_segment(0, 1) desired_curve_length = duration - time_extension_into_future self.append_segment(beat_length_target, desired_curve_length / normalized_time, curve_shape) if metric_phase_target is not None: if not self._adjust_segment_end_beat_to_metric_phase_target(self.segments[-1], metric_phase_target): logging.warning("Metric phase target {} was not reachable".format(metric_phase_target))
[docs] def set_beat_length_targets(self, beat_length_targets: Sequence[float], durations: Sequence[float], curve_shapes: Sequence[float] = None, metric_phase_targets: Sequence[float | MetricPhaseTarget | tuple] = None, duration_units: str = "beats", truncate: bool = True) -> None: """ Same as set_beat_length_target, except that you can set multiple targets at once by providing lists to each of the arguments. :param beat_length_targets: list of the target beat_lengths :param durations: list of segment durations (in beats or seconds, as defined by duration_units) :param curve_shapes: list of segment curve_shapes (or none to not set curve shape) :param metric_phase_targets: list of metric phase targets for each segment (or None to ignore metric phase) :param duration_units: one of ("beats", "time"); defines whether the duration is in beats or in seconds/parent beats. :param truncate: Whether or not to truncate this TempoEnvelope to the current beat before setting these targets. """ num_targets = len(beat_length_targets) if duration_units not in ("beats", "time"): raise ValueError("Argument duration_units must be either \"beat\" or \"time\".") curve_shapes = [0] * num_targets if curve_shapes is None else curve_shapes if len(durations) != num_targets: raise ValueError("Inconsistent number of targets and durations.") if len(curve_shapes) != num_targets: raise ValueError("Inconsistent number of targets and curve_shapes.") if metric_phase_targets is not None and len(metric_phase_targets) != num_targets: raise ValueError("Inconsistent number of metric phase targets and curve_shapes.") # truncate removes any segments that extend into the future if truncate: self.remove_segments_after(self.beat()) # add a flat segment up to the current beat if needed self.extend_to(self.beat()) if metric_phase_targets is None: # no segments have phase targets, so it's simple for beat_length_target, duration, curve_shape in zip(beat_length_targets, durations, curve_shapes): self._add_segment(beat_length_target, duration, curve_shape, None, duration_units) else: metric_phase_targets = [(MetricPhaseTarget.interpret(x) if x is not None else None) for x in metric_phase_targets] # This is used to adjust metric phase, if desired. We keep track of all the segments # we've added since we last adjusted the metric phase. segments_to_adjust = [] # We also keep track of the start and end beat/time of the current group of segments so that # we don't have to recalculate it all the time current_group_start_beat = current_group_end_beat = self.end_time() current_group_start_time = current_group_end_time = \ self.time() + self.integrate_interval(self.beat(), self.end_time()) for beat_length_target, duration, curve_shape, metric_phase_target in \ zip(beat_length_targets, durations, curve_shapes, metric_phase_targets): if metric_phase_target is None: # no metric phase target for this segment, but some segments do have metric phase targets # so we add it to our list of segments to adjust when we next have to adjust to a target self._add_segment(beat_length_target, duration, curve_shape, metric_phase_target, duration_units) added_segment = self.segments[-1] segments_to_adjust.append(added_segment) current_group_end_beat += added_segment.duration current_group_end_time += added_segment.integrate_segment(added_segment.start_time, added_segment.end_time) else: # if we're here then there is a metric phase target for the end of this segment if len(segments_to_adjust) == 0: # if we haven't built up any segments to adjust, then just add this one segment, # adjusting it in the process self._add_segment(beat_length_target, duration, curve_shape, metric_phase_target, duration_units) added_segment = self.segments[-1] current_group_end_beat += added_segment.duration current_group_end_time += added_segment.integrate_segment(added_segment.start_time, added_segment.end_time) current_group_start_beat = current_group_end_beat current_group_start_time = current_group_end_time continue # Otherwise, we add the segment without adjusting it in the process... self._add_segment(beat_length_target, duration, curve_shape, None, duration_units) added_segment = self.segments[-1] segments_to_adjust.append(added_segment) current_group_end_beat += added_segment.duration current_group_end_time += added_segment.integrate_segment(added_segment.start_time, added_segment.end_time) # ...and then we try to reach the target by adjusting all of the segments since the last adjustment success = False # did we successfully adjust? if duration_units == "beats": for goal_end_time in metric_phase_target.get_nearest_matching_times(current_group_end_time): # try both the nearest matching time before and after goal_time_duration = goal_end_time - current_group_start_time if self._adjust_segments_time_duration(segments_to_adjust, goal_time_duration): # if one of them works, declare success and break current_group_end_time = goal_end_time # reset the end time based on the adjustment success = True break elif duration_units == "time": for goal_end_beat in metric_phase_target.get_nearest_matching_beats(current_group_end_beat): # try both the nearest matching beat before and after # first, squeeze/stretch all the segments to take up an appropriate number of beats proportional_adjustment = (goal_end_beat - current_group_start_beat) / \ (current_group_end_beat - current_group_start_beat) b = current_group_start_beat for segment in segments_to_adjust: old_dur = segment.duration segment.start_time = b segment.end_time = b = b + proportional_adjustment * old_dur # then try to re-adjust to get back to the original end time if self._adjust_segments_time_duration(segments_to_adjust, current_group_end_time - current_group_start_time): # if it works, declare success and break current_group_end_beat = goal_end_beat # reset the end beat based on the adjustment success = True break if not success: logging.warning("Metric phase target {} was not reachable.".format(metric_phase_target)) else: # If it did succeed, clear the segments_to_adjust. We don't want to be adjusting any of the # segments that we just adjusted, since they would get messed up. segments_to_adjust.clear() # also reset the group start and end beat/time current_group_start_beat = current_group_end_beat current_group_start_time = current_group_end_time
[docs] def set_rate_target(self, rate_target: float, duration: float, curve_shape: float = 0, metric_phase_target: float | MetricPhaseTarget | tuple = None, duration_units: str = "beats", truncate: bool = True) -> None: """ Set a target beat rate for this TempoEnvelope to reach in duration beats/seconds (with the unit defined by duration_units). :param rate_target: The beat rate we want to reach :param duration: How long until we reach that beat rate :param curve_shape: > 0 makes change happen later, < 0 makes change happen sooner :param metric_phase_target: This argument lets us align the arrival at the given beat length with a particular part of the parent beat (time), or, if we specified "time" as our duration units, it allows us to align the arrival at that specified time with a particular part of this clock's beat. This argument takes either a float in [0, 1), a MetricPhaseTarget object, or a tuple of arguments to the MetricPhaseTarget constructor :param duration_units: one of ("beats", "time"); defines whether the duration is in beats or in seconds. :param truncate: Whether or not to truncate this TempoEnvelope to the current beat before setting this target. """ self.set_beat_length_target(1 / rate_target, duration, curve_shape, metric_phase_target, duration_units, truncate)
[docs] def set_rate_targets(self, rate_targets: Sequence[float], durations: Sequence[float], curve_shapes: Sequence[float] = None, metric_phase_targets: Sequence[float | MetricPhaseTarget | tuple] = None, duration_units: str = "beats", truncate: bool = True) -> None: """ Same as set_rate_target, except that you can set multiple targets at once by providing lists to each of the arguments. :param rate_targets: list of the target beat rates :param durations: list of segment durations (in beats or seconds, as defined by duration_units) :param curve_shapes: list of segment curve_shapes (or none to not set curve shape) :param metric_phase_targets: list of metric phase targets for each segment (or None to ignore metric phase) :param duration_units: one of ("beats", "time"); defines whether the duration is in beats or in seconds/parent beats. :param truncate: Whether or not to truncate this TempoEnvelope to the current beat before setting these targets. """ self.set_beat_length_targets([1 / x for x in rate_targets], durations, curve_shapes, metric_phase_targets, duration_units, truncate)
[docs] def set_tempo_target(self, tempo_target: float, duration: float, curve_shape: float = 0, metric_phase_target: float | MetricPhaseTarget | tuple = None, duration_units: str = "beats", truncate: bool = True) -> None: """ Set a target tempo for this TempoEnvelope to reach in duration beats/seconds (with the unit defined by duration_units). :param tempo_target: The tempo we want to reach :param duration: How long until we reach that tempo :param curve_shape: > 0 makes change happen later, < 0 makes change happen sooner :param metric_phase_target: This argument lets us align the arrival at the given beat length with a particular part of the parent beat (time), or, if we specified "time" as our duration units, it allows us to align the arrival at that specified time with a particular part of this clock's beat. This argument takes either a float in [0, 1), a MetricPhaseTarget object, or a tuple of arguments to the MetricPhaseTarget constructor :param duration_units: one of ("beats", "time"); defines whether the duration is in beats or in seconds. :param truncate: Whether or not to truncate this TempoEnvelope to the current beat before setting this target. """ self.set_beat_length_target(60 / tempo_target, duration, curve_shape, metric_phase_target, duration_units, truncate)
[docs] def set_tempo_targets(self, tempo_targets: Sequence[float], durations: Sequence[float], curve_shapes: Sequence[float] = None, metric_phase_targets: Sequence[float | MetricPhaseTarget | tuple] = None, duration_units: str = "beats", truncate: bool = True) -> None: """ Same as set_tempo_target, except that you can set multiple targets at once by providing lists to each of the arguments. :param tempo_targets: list of the target tempos :param durations: list of segment durations (in beats or seconds, as defined by duration_units) :param curve_shapes: list of segment curve_shapes (or none to not set curve shape) :param metric_phase_targets: list of metric phase targets for each segment (or None to ignore metric phase) :param duration_units: one of ("beats", "time"); defines whether the duration is in beats or in seconds/parent beats. :param truncate: Whether or not to truncate this TempoEnvelope to the current beat before setting these targets. """ self.set_beat_length_targets([60 / x for x in tempo_targets], durations, curve_shapes, metric_phase_targets, duration_units, truncate)
# ----------------------------------------- Metric Phase adjustments --------------------------------------------- # These two methods are for just adjusting a single segment's metric phase in beat or time. # They are used when adding single segments that we want to adjust the phase of def _adjust_segment_end_time_to_metric_phase_target(self, segment, metric_phase_target): # this is confusing; segment.start_time and segment.end_time are really the start and end *beats* segment_start_time = self.time() + self.integrate_interval(self.beat(), segment.start_time) segment_end_time = segment_start_time + segment.integrate_segment(segment.start_time, segment.end_time) for new_end_time in metric_phase_target.get_nearest_matching_times(segment_end_time): try: segment.set_curvature_to_desired_integral(new_end_time - segment_start_time) return True except ValueError: pass return False @staticmethod def _adjust_segment_end_beat_to_metric_phase_target(segment, metric_phase_target): # this is how long the segment currently takes; we want to end up with this being the same original_integral = segment.integrate_segment(segment.start_time, segment.end_time) for new_end_beat in metric_phase_target.get_nearest_matching_beats(segment.end_time): try: # shrink the segment segment.end_time = new_end_beat # and then try to change the curvature to return to the original time duration segment.set_curvature_to_desired_integral(original_integral) return True except ValueError: pass return False # These methods are used when we want to adjust the metric phase at the end of a group of segments.
[docs] def adjust_metric_phase_at_beat(self, beat: float, metric_phase_target: float | MetricPhaseTarget | tuple) -> bool: """ Sets the goal (time) metric phase at the given beat. So, for instance, if we called ``adjust_metric_phase_at_beat(5, 0.5)``, this would mean that we want to be at time 1.5, 2.5, 3.5 etc. at beat 5. If we called ``adjust_metric_phase_at_beat(7, 1.25, 3)``, this would mean that at beat 7, we would want to be at time 1.25, 4.25, 7.25, etc. :param beat: The beat at which to have the given phase in time :param metric_phase_target: either a :class:`MetricPhaseTarget`, or the argument to construct one :return: True, if the adjustment is possible, False if not """ if beat > self.length() or beat <= self.beat(): raise ValueError("Cannot adjust metric phase before current beat or beyond the end of the TempoEnvelope") metric_phase_target = MetricPhaseTarget.interpret(metric_phase_target) # what's the current time at the beat? time_at_beat = self.time() + self.integrate_interval(self.beat(), beat) # try to adjust that to one of the nearby target phases for good_phase_time in metric_phase_target.get_nearest_matching_times(time_at_beat): if self.adjust_time_at_beat(beat, good_phase_time): # the adjustment worked (returned true), so return True to say that we succeeded return True # if we get here, neither adjustment was possible, so we failed. Return false. return False
[docs] def adjust_time_at_beat(self, beat_to_adjust: float, desired_time: float) -> bool: """ Adjusts the curvature of segments from now until beat so that we reach it at desired_time, if possible. If not possible, leaves the TempoCurve unchanged and returns False :param beat_to_adjust: the beat at which we want to be at a particular time :param desired_time: the time we want to be at :return: True if the adjustment worked, False if it's impossible """ assert self.beat() < beat_to_adjust <= self.length() # make a copy of the original segments lists to fall back on in case we fail back_up = deepcopy(self.segments) self.insert_interpolated(self.beat()) self.insert_interpolated(beat_to_adjust) adjustable_segments = self.segments[self._get_index_of_segment_at(self.beat(), right_most=True): self._get_index_of_segment_at(beat_to_adjust, left_most=True) + 1] goal_total_time = desired_time - self.time() result = TempoHistory._adjust_segments_time_duration(adjustable_segments, goal_total_time) if result == "no change": # it worked, but we didn't have to change anything # no there's no need for the interpolations self.segments = back_up return True elif result: # it worked, return True return True else: # the adjustment failed, so return to the old segments before interpolation # and return False to signal the failure self.segments = back_up return False
@staticmethod def _adjust_segments_time_duration(which_segments: Sequence[EnvelopeSegment], goal_total_time: float): """ Adjusts the total time that the segments take without changing the total beats :param which_segments: which segments to adjust. :param goal_total_time: the total time we want them to take :return: True if it's possible, False if not, and "no change" in the off-chance that no change was needed """ # ranges of how long each segment could take by adjusting curvature segment_time_ranges = [segment.get_integral_range() for segment in which_segments] # range of how long the entire thing could take total_time_range = (sum(x[0] for x in segment_time_ranges), sum(x[1] for x in segment_time_ranges)) # check if it's even possible to get to the desired time by simply adjusting curvatures if not total_time_range[0] < goal_total_time < total_time_range[1]: # if not return False return False # how long each segment currently takes segment_times = [segment.integrate_segment(segment.start_time, segment.end_time) for segment in which_segments] # how long all the segments take total_time = sum(segment_times) # on the off-chance that it already works perfectly, return "no change" to indicate that it worked, # but that it was totally unnecessary if goal_total_time == total_time: return "no change" # if we've reached this point, we're ready to make the adjustments # delta_time is how much of an adjustment we need total delta_time = goal_total_time - total_time # we distribute this total adjustment between the segments based on how much room they have to move # in the direction we want them to move. Longer segments and segments with more room to wiggle do the # majority of the adjusting. if delta_time < 0: weightings = [segment_time - segment_time_range[0] for segment_time, segment_time_range in zip(segment_times, segment_time_ranges)] else: weightings = [segment_time_range[1] - segment_time for segment_time, segment_time_range in zip(segment_times, segment_time_ranges)] weightings_sum = sum(weightings) segment_adjustments = [weighting / weightings_sum * delta_time for weighting in weightings] for segment, segment_time, segment_adjustment in zip(which_segments, segment_times, segment_adjustments): segment.set_curvature_to_desired_integral(segment_time + segment_adjustment) return True
[docs] def adjust_metric_phase_at_time(self, target_time: float, metric_phase_target: float | MetricPhaseTarget | tuple) -> bool: """ Sets the goal (beat) metric phase at the given time. So, for instance, if we called ``adjust_metric_phase_at_time(5, 0.5)``, this would mean that at time 5 we want to be at beat 1.5, 2.5, 3.5 etc. If we called ``adjust_metric_phase_at_time(7, 1.25, 3)``, this would mean that at time 7, we would want to be at beat 1.25, 4.25, 7.25, etc. :param target_time: The time at which to have the given phase in beat :param metric_phase_target: either a MetricPhaseTarget, or the argument to construct one :return: True, if the adjustment is possible, False if not """ envelope_end_time = self.time() + self.integrate_interval(self.beat(), self.end_time()) if target_time > envelope_end_time or target_time <= self.time(): raise ValueError("Cannot adjust metric phase before current beat or beyond the end of the TempoEnvelope") metric_phase_target = MetricPhaseTarget.interpret(metric_phase_target) # what's the current beat at the time? beat_at_time = self.beat() + self.get_beat_wait_from_time_wait(target_time - self.time()) # try to adjust that to one of the nearby target phases for good_phase_beat in metric_phase_target.get_nearest_matching_beats(beat_at_time): if self.adjust_beat_at_time(target_time, good_phase_beat): # the adjustment worked (returned true), so return True to say that we succeeded return True # if we get here, neither adjustment was possible, so we failed. Return false. return False
[docs] def adjust_beat_at_time(self, time_to_adjust: float, desired_beat: float) -> bool: """ Adjusts the curvature of segments from now until the specified time so that we reach it at desired_beat, if possible. If not possible, leaves the TempoCurve unchanged and returns False. :param time_to_adjust: the time at which we want to be at a particular beat :param desired_beat: the beat we want to be at :return: True if the adjustment worked, False if it's impossible """ envelope_end_time = self.time() + self.integrate_interval(self.beat(), self.end_time()) assert self.time() < time_to_adjust <= envelope_end_time # make a copy of the original segments lists to fall back on in case we fail back_up = deepcopy(self.segments) current_beat_at_adjust_point = self.beat() + self.get_beat_wait_from_time_wait(time_to_adjust - self.time()) start_beat = self.insert_interpolated(self.beat()) # if the insertion does nothing because it's too close to an existing point, it will return the existing point current_beat_at_adjust_point = self.insert_interpolated(current_beat_at_adjust_point) adjustable_index_start = self._get_index_of_segment_at(self.beat(), right_most=True) adjustable_index_end = self._get_index_of_segment_at(current_beat_at_adjust_point, left_most=True) + 1 adjustable_segments = self.segments[adjustable_index_start: adjustable_index_end] # first we squeeze or stretch all the segments so that we reach the right beat at the end of the last one delta_beat = desired_beat - current_beat_at_adjust_point proportional_length_adjustment = (desired_beat - start_beat) / (current_beat_at_adjust_point - start_beat) b = adjustable_segments[0].start_time for segment in adjustable_segments: old_dur = segment.duration segment.start_time = b segment.end_time = b = b + proportional_length_adjustment * old_dur for segment in self.segments[adjustable_index_end:]: segment.start_time += delta_beat segment.end_time += delta_beat # now that we squeezed or stretched so as to be at the correct moment in the curve, on the correct beat # see if we can adjust the curvature of the segments so that the time at that moment is unchanged if self.adjust_time_at_beat(desired_beat, time_to_adjust): # if it works, return True return True else: # otherwise, revert and return false self.segments = back_up return False
################################################################################################################## # Advancing Time ##################################################################################################################
[docs] def get_wait_time(self, beats: float) -> float: """ Get the amount of time it would take to wait the specified number of beats starting at the current point in the TempoEnvelope. :param beats: how many beats to wait :return: how much time that will take """ return self.integrate_interval(self._beat, self._beat + beats)
[docs] def advance(self, beats: float) -> tuple[float, float]: """ Advance the current beat/time in the envelope by the given number of beats. :param beats: how many beats to advance by :return: tuple of delta beats, delta time """ wait_time = self.get_wait_time(beats) self._beat = snap_float_to_nice_decimal(self._beat + beats) self._t = snap_float_to_nice_decimal(self._t + wait_time) return beats, wait_time
[docs] def get_beat_wait_from_time_wait(self, seconds: float) -> float: """ Get the amount of beats we would have to wait in order to wait for the given number of seconds, starting at the current point in the TempoHistory. :param seconds: how many seconds to wait :return: how many beats that would correspond to """ beat_to_get_to = self.get_upper_integration_bound(self._beat, seconds, max_error=0.00000001) return beat_to_get_to - self._beat
[docs] def advance_time(self, seconds: float): """ Advance the current beat/time in the envelope by the given number of seconds. :param seconds: how many seconds to advance by :return: tuple of delta beats, delta time """ beats = self.get_beat_wait_from_time_wait(seconds) self.advance(beats) return beats, seconds
[docs] def go_to_beat(self, b: float) -> TempoEnvelope: """ Jump straight to the given beat in this TempoEnvelope :param b: the beat to jump to :return: self, for chaining purposes """ self._beat = snap_float_to_nice_decimal(b) self._t = snap_float_to_nice_decimal(self.integrate_interval(0, b)) return self
################################################################################################################## # Utilities ##################################################################################################################
[docs] def truncate(self) -> TempoEnvelope: """ Removes all segments after the current beat. :return: self, for chaining purposes """ return self.truncate_at(self._beat)
[docs] def show_plot(self, title=None, resolution=25, show_segment_divisions=True, units="tempo", x_range=None, y_range=None): title = "Graph of TempoHistory" if title is None else title super().show_plot(title, resolution, show_segment_divisions, units, (min(0, self.start_time()), max(self.end_time(), self.beat())) if x_range is None else x_range, y_range)
[docs] def as_tempo_envelope(self) -> TempoEnvelope: """ Converts this TempoHistory to a simpler TempoEnvelope (removing reference to current beat and time) """ return TempoEnvelope(self.levels, self.durations, self.curve_shapes, "beatlength")
def __repr__(self): return "TempoHistory({}, {}, {}{})".format( TempoEnvelope.convert_units(self.levels, "beatlength", "tempo"), self.durations, self.curve_shapes, ", beat={}".format(self._beat) if self._beat != 0 else "" )
[docs]class MetricPhaseTarget: """ Class representing a particular point in a (beat or measure) cycle. :param phase_or_phases: Where we are in the cycle. :param divisor: Length of the cycle (defaults to one beat, meaning that this specifies where we are in the beat) :param relative: Whether or not the start of the cycle is measured relative to the current beat/time or to the start of the clock. """ def __init__(self, phase_or_phases: float | Sequence[float], divisor: float = 1, relative: bool = False): self.phases = (phase_or_phases, ) if not hasattr(phase_or_phases, "__len__") else phase_or_phases if not all(0 <= x < divisor for x in self.phases): raise ValueError("One or more phases out of range for divisor.") self.divisor = divisor self.relative = relative
[docs] @classmethod def interpret(cls, value: float | Sequence) -> MetricPhaseTarget: """ Interpret a tuple or just a number as a MetricPhaseTarget. E.g. we want the user to be able to hand in a tuple like (0.5, 3) and have it get interpreted as a target of 0.5 with divisor 3. :param value: either a tuple (which becomes the constructor arguments), a MetricPhaseTarget (which is passed through unchanged), or a number which is treated as the phase with other args as defaults. :return: A MetricPhaseTarget, interpreted from the argument """ if isinstance(value, MetricPhaseTarget): return value elif hasattr(value, "__len__"): return MetricPhaseTarget(*value) else: return MetricPhaseTarget(value)
def _get_nearest_matches(self, t: float, offset: float = 0) -> tuple[float, float]: floored_value = math.floor(t / self.divisor) * self.divisor closest_below = None closest_above = None min_dist_below = float("inf") min_dist_above = float("inf") for base_multiple in (floored_value - self.divisor, floored_value, floored_value + self.divisor): for remainder in self.phases: remainder = (remainder + offset) % self.divisor this_value = base_multiple + remainder if this_value < t and t - this_value < min_dist_below: closest_below = this_value min_dist_below = t - this_value elif this_value >= t and this_value - t < min_dist_above: closest_above = this_value min_dist_above = this_value - t # return the closest above and below in order of closeness return (closest_below, closest_above) if min_dist_below <= min_dist_above else (closest_above, closest_below)
[docs] def get_nearest_matching_beats(self, beat: float) -> tuple[float, float]: """ Get the nearest beats below and above the given beat with the correct metric phase :param beat: the beat to search around :return: tuple of nearest beat below, nearest beat above """ if self.relative: return self._get_nearest_matches(beat, current_clock().beat()) else: return self._get_nearest_matches(beat)
[docs] def get_nearest_matching_times(self, time: float) -> tuple[float, float]: """ Get the nearest times below and above the given time with the correct metric phase :param time: the time to search around :return: tuple of nearest time below, nearest time above """ if self.relative: return self._get_nearest_matches(time, current_clock().time()) else: return self._get_nearest_matches(time)
def __repr__(self): return "MetricPhaseTarget({}{}{})".format( str(self.phases[0]) if hasattr(self.phases, "__len__") else self.phases, (", " + str(self.divisor)) if self.divisor != 1 else "", ", True" if self.relative else "", )