Skip to content

Full listing of all docstrings by module

execute_simulation(inputs)

Run the selected simulation tool with the provided inputs. Returns output data based on the chosen simulation mode.

Source code in lyopronto/high_level.py
def execute_simulation(inputs):
    """
    Run the selected simulation tool with the provided inputs.
    Returns output data based on the chosen simulation mode.
    """
    sim_type = inputs["sim"]["tool"]
    output_data = None

    if sim_type == "Freezing Calculator":
        output_data = freezing.freeze(
            inputs["vial"],
            inputs["product"],
            inputs["h_freezing"],
            inputs["Tshelf"],
            inputs["dt"],
        )

    elif sim_type == "Primary Drying Calculator":
        if inputs["sim"]["Kv_known"] and inputs["sim"]["Rp_known"]:
            output_data = calc_knownRp.dry(
                inputs["vial"],
                inputs["product"],
                inputs["ht"],
                inputs["Pchamber"],
                inputs["Tshelf"],
                inputs["dt"],
            )
        elif not inputs["sim"]["Kv_known"] and inputs["sim"]["Rp_known"]:
            output_data = _optimize_kv_parameter(inputs)
        elif inputs["sim"]["Kv_known"] and not inputs["sim"]["Rp_known"]:
            output_data = _optimize_rp_parameter(inputs)
        else:
            raise ValueError(
                "With the current implementation, either Kv or Rp must be specified."
            )

    elif sim_type == "Design Space Generator":
        output_data = design_space.dry(
            inputs["vial"],
            inputs["product"],
            inputs["ht"],
            inputs["Pchamber"],
            inputs["Tshelf"],
            inputs["dt"],
            inputs["eq_cap"],
            inputs["nVial"],
        )

    elif sim_type == "Optimizer":
        output_data = _run_optimizer(inputs)

    else:
        raise ValueError(
            f"Invalid simulation tool {sim_type} selected. "
            "Valid options are: 'Freezing Calculator', 'Primary Drying Calculator', "
            "'Design Space Generator', 'Optimizer'."
        )

    return output_data

generate_visualizations(output_data, inputs, timestamp)

Create and save publication-quality plots based on simulation results.

Source code in lyopronto/high_level.py
def generate_visualizations(output_data, inputs, timestamp):
    """
    Create and save publication-quality plots based on simulation results.
    """

    # TODO: move these to kwargs for the function
    figure_props = {
        "figwidth": 30,
        "figheight": 20,
        "linewidth": 5,
        "marker_size": 20,
    }
    tool = inputs["sim"]["tool"]
    matplotlibrc("text.latex", preamble=r"\usepackage{color}")
    matplotlibrc("text", usetex=False)
    plt.rcParams["font.family"] = "Arial"

    if tool == "Freezing Calculator":
        _plot_freezing_results(output_data, figure_props, timestamp)
    elif tool in ["Primary Drying Calculator", "Optimizer"]:
        if tool == "Primary Drying Calculator" and not inputs["sim"]["Rp_known"]:
            _plot_rp_results(output_data, figure_props, timestamp)
            data = output_data[0]  # There are extra returns for Rp fitting
        else:
            data = output_data  # for all but unknown Rp, output_data is the only return
        _plot_drying_results(data, figure_props, timestamp)
    elif tool == "Design Space Generator":
        _plot_design_space(output_data, inputs, figure_props, timestamp)

read_inputs(filename)

Read inputs from a YAML file.

Source code in lyopronto/high_level.py
def read_inputs(filename):
    "Read inputs from a YAML file."
    with open(filename, "r") as yamlfile:
        inputs = yaml.load(yamlfile)
        if "product_temp_filename" in inputs:
            print(
                "Note: input specifies a product temperature data file. "
                + "This data should be loaded separately and added to the inputs "
                + "dictionary as `time_data` and `temp_data`."
            )
        return inputs

save_csv(output_data, inputs, timestamp)

Export simulation results to CSV file with appropriate formatting.

Source code in lyopronto/high_level.py
def save_csv(output_data, inputs, timestamp):
    """
    Export simulation results to CSV file with appropriate formatting.
    """
    filename = f"lyopronto_output_{timestamp}.csv"

    sim = inputs["sim"]

    if sim["tool"] == "Freezing Calculator":
        assert output_data.shape[1] == 3
        header = "Time [hr], Shelf Temp [°C], Product Temp [°C]"
        np.savetxt(filename, output_data, delimiter=", ", header=header)
    elif sim["tool"] == "Design Space Generator":
        _write_design_space_csv(output_data, inputs, filename)
    else:
        if sim["tool"] == "Primary Drying Calculator" and not sim["Rp_known"]:
            assert len(output_data) == 3  # output, product_res, params
            header = ",".join(
                [
                    "Time [hr]",
                    "Cake Length [cm]",
                    "Product Resistance [cm^2-hr-Torr/g]",
                ]
            )
            rpfile = f"lyo_Rp_data_{timestamp}.csv"
            np.savetxt(rpfile, output_data[1], delimiter=", ", header=header)
            data = output_data[0]
        else:
            data = output_data  # for all but unknown Rp, output_data is the only return

        header = ",".join(
            [
                "Time [hr]",
                "Sublimation Front Temp [°C]",
                "Vial Bottom Temperature [°C]",
                "Shelf Temp [°C]",
                "Chamber Pressure [mTorr]",
                "Sublimation Flux [kg/hr/m²]",
                "Percent Dried",
            ]
        )
        np.savetxt(filename, data, delimiter=", ", header=header)

save_inputs(inputs, timestamp)

Save inputs to a YAML file with timestamped filename.

Source code in lyopronto/high_level.py
def save_inputs(inputs, timestamp):
    "Save inputs to a YAML file with timestamped filename."
    copied = inputs.copy()
    # If the inputs include large arrays of data, strip those out
    copied.pop("time_data", None)
    copied.pop("temp_data", None)
    # Then save
    with open(f"lyopronto_input_{timestamp}.yaml", "w") as yamlfile:
        yaml.dump(copied, yamlfile)

save_inputs_legacy(inputs, timestamp)

Save inputs to a CSV file with timestamped filename.

Source code in lyopronto/high_level.py
def save_inputs_legacy(inputs, timestamp):
    """
    Save inputs to a CSV file with timestamped filename.
    """
    filename = f"lyopronto_input_{timestamp}.csv"
    sim = inputs["sim"]
    vial = inputs["vial"]
    product = inputs["product"]

    with open(filename, "w", newline="") as csvfile:
        writer = csv.writer(csvfile)

        writer.writerow(["Simulation Tool", sim["tool"]])
        writer.writerow(["Kv Known", sim["Kv_known"]])
        writer.writerow(["Rp Known", sim["Rp_known"]])
        writer.writerow(["Variable Chamber Pressure", sim["Variable_Pch"]])
        writer.writerow(["Variable Shelf Temperature", sim["Variable_Tsh"]])
        writer.writerow([])

        writer.writerow(["Vial Cross-Section [cm²]", vial["Av"]])
        writer.writerow(["Product Area [cm²]", vial["Ap"]])
        writer.writerow(["Fill Volume [mL]", vial["Vfill"]])
        writer.writerow([])

        writer.writerow(["Fractional solute concentration:", product["cSolid"]])
        if sim["tool"] == "Freezing Calculator":
            writer.writerow(["Intial product temperature [C]:", product["Tpr0"]])
            writer.writerow(["Freezing temperature [C]:", product["Tf"]])
            writer.writerow(["Nucleation temperature [C]:", product["Tn"]])
        elif not (sim["tool"] == "Primary Drying Calculator" and not sim["Rp_known"]):
            writer.writerow(["R0 [cm^2-hr-Torr/g]:", product["R0"]])
            writer.writerow(["A1 [cm-hr-Torr/g]:", product["A1"]])
            writer.writerow(["A2 [1/cm]:", product["A2"]])
        if not (
            sim["tool"] == "Freezing Calculator"
            or sim["tool"] == "Primary Drying Calculator"
        ):
            writer.writerow(["Critical product temperature [C]:", product["T_pr_crit"]])

        if sim["tool"] == "Freezing Calculator":
            writer.writerow(["h_freezing [W/m^2/K]:", inputs["h_freezing"]])
        elif sim["Kv_known"]:
            writer.writerow(["KC [cal/s/K/cm^2]:", inputs["ht"]["KC"]])
            writer.writerow(["KP [cal/s/K/cm^2/Torr]:", inputs["ht"]["KP"]])
            writer.writerow(["KD [1/Torr]:", inputs["ht"]["KD"]])
        elif not sim["Kv_known"]:
            writer.writerow(["Kv range [cal/s/K/cm^2]:", inputs["Kv_range"][:]])
            writer.writerow(["Experimental drying time [hr]:", inputs["t_dry_exp"]])

        if sim["tool"] == "Freezing Calculator":
            0
        elif sim["tool"] == "Design Space Generator":
            writer.writerow(
                ["Chamber pressure set points [Torr]:", inputs["Pchamber"]["setpt"][:]]
            )
        elif not (sim["tool"] == "Optimizer" and sim["Variable_Pch"]):
            for i in range(len(inputs["Pchamber"]["setpt"])):
                writer.writerow(
                    [
                        "Chamber pressure setpoint [Torr]:",
                        inputs["Pchamber"]["setpt"][i],
                        "Duration [min]:",
                        inputs["Pchamber"]["dt_setpt"][i],
                    ]
                )
            writer.writerow(
                [
                    "Chamber pressure ramping rate [Torr/min]:",
                    inputs["Pchamber"]["ramp_rate"],
                ]
            )
        else:
            writer.writerow(
                ["Minimum chamber pressure [Torr]:", inputs["Pchamber"]["min"]]
            )
            writer.writerow(
                ["Maximum chamber pressure [Torr]:", inputs["Pchamber"]["max"]]
            )
        writer.writerow([""])

        if sim["tool"] == "Design Space Generator":
            writer.writerow(["Intial shelf temperature [C]:", inputs["Tshelf"]["init"]])
            writer.writerow(
                ["Shelf temperature set points [C]:", inputs["Tshelf"]["setpt"][:]]
            )
            writer.writerow(
                [
                    "Shelf temperature ramping rate [C/min]:",
                    inputs["Tshelf"]["ramp_rate"],
                ]
            )
        elif not (sim["tool"] == "Optimizer" and sim["Variable_Tsh"]):
            for i in range(len(inputs["Tshelf"]["setpt"])):
                writer.writerow(
                    [
                        "Shelf temperature setpoint [C]:",
                        inputs["Tshelf"]["setpt"][i],
                        "Duration [min]:",
                        inputs["Tshelf"]["dt_setpt"][i],
                    ]
                )
            writer.writerow(
                [
                    "Shelf temperature ramping rate [C/min]:",
                    inputs["Tshelf"]["ramp_rate"],
                ]
            )
        else:
            writer.writerow(["Minimum shelf temperature [C]:", inputs["Tshelf"]["min"]])
            writer.writerow(["Maximum shelf temperature [C]:", inputs["Tshelf"]["max"]])

        writer.writerow(["Time Step [hr]", inputs["dt"]])
        writer.writerow(["Equipment Parameter a [kg/hr]", inputs["eq_cap"]["a"]])
        writer.writerow(["Equipment Parameter b [kg/hr/Torr]", inputs["eq_cap"]["b"]])
        writer.writerow(["Number of Vials", inputs["nVial"]])

calc_knownRp

dry(vial, product, ht, Pchamber, Tshelf, dt)

Simulate the primary drying process for known condiditions and parameters.

Parameters:

Name Type Description Default
vial dict

see master simulation inputs

required
product dict

see master simulation inputs

required
ht dict

see master simulation inputs

required
Pchamber dict

see master simulation inputs

required
Tshelf dict

see master simulation inputs

required
dt float

Fixed time step for output [hours]

required

Returns:

Name Type Description
output_table ndarray

Simulation output table with columns for: 0. Time [hr], 1. Sublimation front temperature [°C], 2. Vial bottom temperature [°C], 3. Shelf temperature [°C], 4. Chamber pressure [mTorr], 5. Sublimation flux [kg/hr/m²], 6. Drying percent [%]

Source code in lyopronto/calc_knownRp.py
def dry(vial,product,ht,Pchamber,Tshelf,dt):
    """Simulate the primary drying process for known condiditions and parameters.

    Args:
        vial (dict): see master simulation inputs
        product (dict): see master simulation inputs
        ht (dict): see master simulation inputs
        Pchamber (dict): see master simulation inputs
        Tshelf (dict): see master simulation inputs
        dt (float): Fixed time step for output [hours]

    Returns:
        output_table (ndarray): Simulation output table with columns for:
            0. Time [hr],
            1. Sublimation front temperature [°C],
            2. Vial bottom temperature [°C],
            3. Shelf temperature [°C],
            4. Chamber pressure [mTorr],
            5. Sublimation flux [kg/hr/m²],
            6. Drying percent [%]
    """

    ##################  Initialization ################

    # Initial fill height
    Lpr0 = functions.Lpr0_FUN(vial['Vfill'],vial['Ap'],product['cSolid'])   # [cm]

    # Time-dependent functions for Pchamber and Tshelf, take time in hours
    Pch_t = functions.RampInterpolator(Pchamber)
    Tsh_t = functions.RampInterpolator(Tshelf)

    # Get maximum simulation time based on shelf and chamber setpoints
    # This may not really be necessary, but is part of legacy behavior
    # Could remove in a future release
    max_t = min(Pch_t.max_time(), Tsh_t.max_time())   # [hr], add buffer

    if Pch_t.max_setpt() > functions.Vapor_pressure(Tsh_t.max_setpt()):
        warn("Chamber pressure setpoint exceeds vapor pressure at shelf temperature " +\
        "setpoint(s). Drying cannot proceed.")
        return np.array([[0.0, Tsh_t(0), Tsh_t(0), Tsh_t(0), Pch_t(0) * constant.Torr_to_mTorr, 0.0, 0.0]])

    inputs = (vial, product, ht, Pch_t, Tsh_t, dt, Lpr0)

    Lck0 = [0.0]
    T0 = Tsh_t(0)

    ################ Set up dynamic equation ######################
    # This function is defined here because it uses local variables, rather than
    # taking them as arguments.
    def calc_dLdt(t, u):
        # Time in hours
        Lck = u[0] # [cm]
        Tsh = Tsh_t(t)
        Pch = Pch_t(t)
        Kv = functions.Kv_FUN(ht['KC'],ht['KP'],ht['KD'],Pch)  # Vial heat transfer coefficient [cal/s/K/cm^2]
        Rp = functions.Rp_FUN(Lck,product['R0'],product['A1'],product['A2'])  # Product resistance [cm^2-hr-Torr/g]
        Tsub = fsolve(functions.T_sub_solver_FUN, T0, args = (Pch,vial['Av'],vial['Ap'],Kv,Lpr0,Lck,Rp,Tsh))[0] # Sublimation front temperature [degC]
        dmdt = functions.sub_rate(vial['Ap'],Rp,Tsub,Pch)   # Total sublimation rate [kg/hr]
        if dmdt<0:
            dmdt = 0.0
            dLdt = 0
            return [dLdt]
        # Tbot = functions.T_bot_FUN(Tsub,Lpr0,Lck,Pch,Rp)    # Vial bottom temperature array in degC

        dLdt = (dmdt*constant.kg_To_g)/(1-product['cSolid']*constant.rho_solution/constant.rho_solute)/(vial['Ap']*constant.rho_ice)*(1-product['cSolid']*(constant.rho_solution-constant.rho_ice)/constant.rho_solute) # [cm/hr]
        return [dLdt]

    ### ------ Condition for ending simulation: completed drying
    def finish(t, L):
        return Lpr0 - L[0]
    finish.terminal = True
    timestops = np.unique(np.sort(np.concatenate((Pch_t.times, Tsh_t.times)))) # These will be consumed
    def corner(t, L0):
        nearest = np.argmin(np.abs(timestops - t))
        return t - timestops[nearest]
    corner.terminal = True
    corner.direction = 1


    # ------- Solve the equations
    sols = []
    Lck = Lck0
    for i in range(0, len(timestops)-1):
        tspan = (timestops[0], timestops[1])
        timestops = timestops[1:] # Remove the first time point,
        # so the next iteration doesn't hit that same corner event immediately
        sol = solve_ivp(calc_dLdt, tspan, Lck, events=(finish, corner),
                        vectorized=False, dense_output=True, method="BDF")
        sols.append(sol)
        Lck[0] = sol.y[0, -1]
        if len(sol.t_events[0]) > 0: # Hit finish event
            break
        elif sol.t[-1] == max_t: # and Lpr0 > sol.y[0, -1]:
            warn("Maximum simulation time (specified by Pchamber and Tshelf) reached before drying completion.")
            break
        elif len(sol.t_events[1]) > 0: # Hit corner event
            continue



    output = functions.fill_output(sols, inputs)

    return output    

design_space

dry(vial, product, ht, Pchamber, Tshelf, dt, eq_cap, nVial)

Compute quantities necessary for constructing a graphical design space.

Parameters:

Name Type Description Default
vial dict

description

required
product dict

description

required
ht dict

description

required
Pchamber dict

description

required
Tshelf dict

description

required
dt float

description

required
eq_cap dict

description

required
nVial int

description

required

Returns:

Name Type Description
ndarray

table of results for shelf isotherms

ndarray

table of results for product isotherms

ndarray

table of results for equipment capability curve

The first two returns have 5 rows corresponding to
  • Maximum product temperature [degC]
  • Primary drying time [hr]
  • Average sublimation flux [kg/hr/m^2]
  • Maximum/minimum sublimation flux [kg/hr/m^2]
  • Sublimation flux at the end of primary drying [kg/hr/m^2]

The third return has 3 rows corresponding to the first three of that list.

With nT setpoints in Tshelf['setpt'] and nP setpoints in Pchamber['setpt'], the returned arrays have the following shapes: - Shelf isotherms: (5, nT, nP) array - Product isotherms: (5, 2) array (for the lowest and highest Pchamber setpoints) - Equipment capability curve: (3, nP) array

Source code in lyopronto/design_space.py
def dry(vial,product,ht,Pchamber,Tshelf,dt,eq_cap,nVial):
    """Compute quantities necessary for constructing a graphical design space. 

    Args:
        vial (dict): _description_
        product (dict): _description_
        ht (dict): _description_
        Pchamber (dict): _description_
        Tshelf (dict): _description_
        dt (float): _description_
        eq_cap (dict): _description_
        nVial (int): _description_

    Returns:
        ndarray: table of results for shelf isotherms
        ndarray: table of results for product isotherms
        ndarray: table of results for equipment capability curve

    The first two returns have 5 rows corresponding to:
        - Maximum product temperature [degC]
        - Primary drying time [hr]
        - Average sublimation flux [kg/hr/m^2]
        - Maximum/minimum sublimation flux [kg/hr/m^2]
        - Sublimation flux at the end of primary drying [kg/hr/m^2]
    The third return has 3 rows corresponding to the first three of that list.

    With nT setpoints in Tshelf['setpt'] and nP setpoints in Pchamber['setpt'], the returned arrays have the following shapes:
        - Shelf isotherms: (5, nT, nP) array
        - Product isotherms: (5, 2) array (for the lowest and highest Pchamber setpoints)
        - Equipment capability curve: (3, nP) array

    """

    T_max = np.zeros([np.size(Tshelf['setpt']),np.size(Pchamber['setpt'])])
    drying_time = np.zeros([np.size(Tshelf['setpt']),np.size(Pchamber['setpt'])])
    sub_flux_avg = np.zeros([np.size(Tshelf['setpt']),np.size(Pchamber['setpt'])])
    sub_flux_max = np.zeros([np.size(Tshelf['setpt']),np.size(Pchamber['setpt'])])
    sub_flux_end = np.zeros([np.size(Tshelf['setpt']),np.size(Pchamber['setpt'])])

    # Initial fill height
    Lpr0 = functions.Lpr0_FUN(vial['Vfill'],vial['Ap'],product['cSolid'])   # [cm]

    ############  Shelf temperature isotherms ##########

    for i_Tsh,Tsh_setpt in enumerate(Tshelf['setpt']):

        for i_Pch,Pch in enumerate(Pchamber['setpt']):

            # Check for feasibility
            if functions.Vapor_pressure(Tsh_setpt) < Pch:
                # TODO: decide about how to gracefully exit
                # For now, just set outputs to NaN and continue
                warn(f"At Tshelf={Tsh_setpt} and Pch={Pch}, sublimation is not feasible (vapor pressure < chamber pressure).")
                T_max[i_Tsh,i_Pch] = np.nan
                drying_time[i_Tsh,i_Pch] = np.nan
                sub_flux_avg[i_Tsh,i_Pch] = np.nan
                sub_flux_max[i_Tsh,i_Pch] = np.nan
                sub_flux_end[i_Tsh,i_Pch] = np.nan
                continue

            ##################  Initialization ################

            # Initialization of time
            iStep = 0      # Time iteration number
            t = 0.0    # Time [hr]

            # Initialization of cake length
            Lck = 0.0    # Cake length [cm]

            # Initial shelf temperature
            Tsh = Tshelf['init']        # [degC]
            # Time at which shelf temperature reaches set point [hr]
            t_setpt = abs(Tsh_setpt-Tshelf['init'])/Tshelf['ramp_rate']/constant.hr_To_min

            # Intial product temperature
            T0=Tsh   # [degC]

            # Vial heat transfer coefficient [cal/s/K/cm^2]
            Kv = functions.Kv_FUN(ht['KC'],ht['KP'],ht['KD'],Pch) 

            ######################################################

            ################ Primary drying ######################

            while(Lck<=Lpr0): # Dry the entire frozen product

                Rp = functions.Rp_FUN(Lck,product['R0'],product['A1'],product['A2'])  # Product resistance [cm^2-hr-Torr/g]

                Tsub = fsolve(functions.T_sub_solver_FUN, T0, args = (Pch,vial['Av'],vial['Ap'],Kv,Lpr0,Lck,Rp,Tsh))[0] # Sublimation front temperature [degC]
                dmdt = functions.sub_rate(vial['Ap'],Rp,Tsub,Pch)   # Total sublimation rate [kg/hr]
                if dmdt<0:
                    warn(f"At t={t}hr, shelf temperature Tsh={Tsh} is too low for sublimation.")
                    dmdt = 0.0
                Tbot = functions.T_bot_FUN(Tsub,Lpr0,Lck,Pch,Rp)    # Vial bottom temperature [degC]

                # Sublimated ice length
                dL = (dmdt*constant.kg_To_g)*dt/(1-product['cSolid']*constant.rho_solution/constant.rho_solute)/(vial['Ap']*constant.rho_ice)*(1-product['cSolid']*(constant.rho_solution-constant.rho_ice)/constant.rho_solute) # [cm]

            # Update record as functions of the cycle time
                if (iStep==0):
                    output_saved = np.array([[t, float(Tbot), dmdt/(vial['Ap']*constant.cm_To_m**2)]])
                else:
                    output_saved = np.append(output_saved, [[t, float(Tbot), dmdt/(vial['Ap']*constant.cm_To_m**2)]],axis=0)

                # Advance counters
                Lck_prev = Lck # Previous cake length [cm]
                Lck = Lck + dL # Cake length [cm]
                if (Lck_prev < Lpr0) and (Lck > Lpr0):
                    Lck = Lpr0    # Final cake length [cm]
                    dL = Lck - Lck_prev   # Cake length dried [cm]
                    t = iStep*dt + dL/((dmdt*constant.kg_To_g)/(1-product['cSolid']*constant.rho_solution/constant.rho_solute)/(vial['Ap']*constant.rho_ice)*(1-product['cSolid']*(constant.rho_solution-constant.rho_ice)/constant.rho_solute)) # [hr]
                else:
                    t = (iStep+1) * dt # Time [hr]

                # Shelf temperature
                if t<t_setpt:
                    # Ramp till set point is reached
                    if Tshelf['init'] < Tsh_setpt:
                        Tsh = Tsh + Tshelf['ramp_rate']*constant.hr_To_min*dt
                    else:
                        Tsh = Tsh - Tshelf['ramp_rate']*constant.hr_To_min*dt
                else:
                    Tsh = Tsh_setpt    # Maintain at set point
                iStep = iStep + 1 # Time iteration number

            ######################################################

            T_max[i_Tsh,i_Pch] = np.max(output_saved[:,1])    # Maximum product temperature [degC]
            drying_time[i_Tsh,i_Pch] = t    # Total drying time [hr]
            # TODO: consider whether to make this error rather than return NaN
            if output_saved.shape[0] <= 2:
                warn(f"At Tsh={Tsh} and Pch={Pch}, drying completed in single timestep: check inputs.")
                sub_flux_avg[i_Tsh,i_Pch] = np.nan
                sub_flux_max[i_Tsh,i_Pch] = np.nan
                sub_flux_end[i_Tsh,i_Pch] = np.nan
                continue
            del_t = output_saved[1:,0]-output_saved[:-1,0]
            del_t = np.append(del_t,del_t[-1])
            sub_flux_avg[i_Tsh,i_Pch] = np.sum(output_saved[:,2]*del_t)/np.sum(del_t)    # Average sublimation flux [kg/hr/m^2]
            sub_flux_max[i_Tsh,i_Pch] = np.max(output_saved[:,2])    # Maximum sublimation flux [kg/hr/m^2]
            sub_flux_end[i_Tsh,i_Pch] = output_saved[-1,2]    # Sublimation flux at end of primary drying [kg/hr/m^2]

    ###########################################################################

    drying_time_pr = np.zeros([2])
    sub_flux_avg_pr = np.zeros([2])
    sub_flux_min_pr = np.zeros([2])
    sub_flux_end_pr = np.zeros([2])

    ############  Product temperature isotherms ##########

    for j,Pch in enumerate([Pchamber['setpt'][0],Pchamber['setpt'][-1]]):

        ##################  Initialization ################

        # Initialization of time
        iStep = 0      # Time iteration number
        t = 0.0    # Time [hr]

        # Initialization of cake length
        Lck = 0.0    # Cake length [cm]

        # Vial heat transfer coefficient [cal/s/K/cm^2]
        Kv = functions.Kv_FUN(ht['KC'],ht['KP'],ht['KD'],Pch) 

        ######################################################        

        ################ Primary drying ######################

        while(Lck<=Lpr0): # Dry the entire frozen product

            Rp = functions.Rp_FUN(Lck,product['R0'],product['A1'],product['A2'])  # Product resistance [cm^2-hr-Torr/g]

            Tsub = fsolve(functions.T_sub_fromTpr, product['T_pr_crit'], args = (product['T_pr_crit'],Lpr0,Lck,Pch,Rp))[0] # Sublimation front temperature [degC]
            dmdt = functions.sub_rate(vial['Ap'],Rp,Tsub,Pch)   # Total sublimation rate [kg/hr]

            # Sublimated ice length
            dL = (dmdt*constant.kg_To_g)*dt/(1-product['cSolid']*constant.rho_solution/constant.rho_solute)/(vial['Ap']*constant.rho_ice)*(1-product['cSolid']*(constant.rho_solution-constant.rho_ice)/constant.rho_solute) # [cm]

            # Update record as functions of the cycle time
            if (iStep==0):
                output_saved = np.array([[t, dmdt/(vial['Ap']*constant.cm_To_m**2)]])
            else:
                output_saved = np.append(output_saved, [[t, dmdt/(vial['Ap']*constant.cm_To_m**2)]],axis=0)

            # Advance counters
            Lck_prev = Lck # Previous cake length [cm]
            Lck = Lck + dL # Cake length [cm]
            if (Lck_prev < Lpr0) and (Lck > Lpr0):
                Lck = Lpr0    # Final cake length [cm]
                dL = Lck - Lck_prev   # Cake length dried [cm]
                t = iStep*dt + dL/((dmdt*constant.kg_To_g)/(1-product['cSolid']*constant.rho_solution/constant.rho_solute)/(vial['Ap']*constant.rho_ice)*(1-product['cSolid']*(constant.rho_solution-constant.rho_ice)/constant.rho_solute)) # [hr]
            else:
                t = (iStep+1) * dt # Time [hr]
            iStep = iStep + 1 # Time iteration number

        ######################################################

        drying_time_pr[j] = t    # Total drying time [hr]
        # TODO: consider whether this should error rather than return NaN
        if output_saved.shape[0] <= 2:
            warn(f"At Pch={Pch} and critical temp {product['T_pr_crit']}, drying completed in single timestep: check inputs.")
            sub_flux_avg_pr[j] = np.nan
            sub_flux_min_pr[j] = np.nan
            sub_flux_end_pr[j] = np.nan
            continue
        del_t = output_saved[1:,0]-output_saved[:-1,0]
        del_t = np.append(del_t,del_t[-1])
        sub_flux_avg_pr[j] = np.sum(output_saved[:,1]*del_t)/np.sum(del_t)    # Average sublimation flux [kg/hr/m^2]
        sub_flux_min_pr[j] = np.min(output_saved[:,1])    # Minimum sublimation flux [kg/hr/m^2]
        sub_flux_end_pr[j] = output_saved[-1,1]    # Sublimation flux at end of primary drying [kg/hr/m^2]

    ###########################################################################

    T_max_eq_cap = np.zeros([np.size(Pchamber['setpt'])])

    ############  Equipment Capability ##########

    dmdt_eq_cap = eq_cap['a'] + eq_cap['b']*np.array(Pchamber['setpt'])    # Sublimation rate [kg/hr]
    if np.any(dmdt_eq_cap < 0):
        warn("Equipment capability sublimation rate is negative for some chamber pressures; setting to nan.")
        # dmdt_eq_cap = np.maximum(dmdt_eq_cap, 0.0)
        dmdt_eq_cap[dmdt_eq_cap <=0.0] = np.nan
    sub_flux_eq_cap = dmdt_eq_cap/nVial/(vial['Ap']*constant.cm_To_m**2)    # Sublimation flux [kg/hr/m^2]

    drying_time_eq_cap = Lpr0/((dmdt_eq_cap/nVial*constant.kg_To_g)/(1-product['cSolid']*constant.rho_solution/constant.rho_solute)/(vial['Ap']*constant.rho_ice)*(1-product['cSolid']*(constant.rho_solution-constant.rho_ice)/constant.rho_solute))    # Drying time [hr]

    Lck = np.linspace(0,Lpr0,100)    # Cake length [cm]
    Rp = functions.Rp_FUN(Lck,product['R0'],product['A1'],product['A2'])    # Product resistance [cm^2-hr-Torr/g]
    for k,Pch in enumerate(Pchamber['setpt']):
        T_max_eq_cap[k] = functions.Tbot_max_eq_cap(Pch,dmdt_eq_cap[k],Lpr0,Lck,Rp,vial['Ap'])        # Maximum product temperature [degC]

    #####################################################

    return np.array([T_max,drying_time,sub_flux_avg,sub_flux_max,sub_flux_end]), \
        np.array([np.array([product['T_pr_crit'],product['T_pr_crit']]),drying_time_pr,sub_flux_avg_pr,sub_flux_min_pr,sub_flux_end_pr]), \
        np.array([T_max_eq_cap,drying_time_eq_cap,sub_flux_eq_cap])

functions

File with a bunch of functions in it.

RampInterpolator

Class to handle ramped setpoint interpolation.

Source code in lyopronto/functions.py
class RampInterpolator:
    """Class to handle ramped setpoint interpolation."""

    def __init__(self, rampspec, count_ramp_against_dt=True):
        self.ramp_sep = count_ramp_against_dt
        self.dt_setpt = np.array(rampspec["dt_setpt"])
        self.ramp_rate = rampspec["ramp_rate"]
        if "init" in rampspec:
            self.setpt = np.concatenate(([rampspec["init"]], rampspec["setpt"]))
            self.values = np.concatenate(([self.setpt[0]], np.repeat(self.setpt[1:], 2)))
            times = np.array([0.0])
        else:
            self.setpt = np.array(rampspec["setpt"])
            self.values = np.repeat(self.setpt, 2)
            times = np.array([0.0, self.dt_setpt[0] / constant.hr_To_min])
        # Older logic: setpoint_dt includes the ramp time.
        # Kept for backward compatibility, but add a check if insufficient time allowed for ramp
        if count_ramp_against_dt:
            # If there is no "init" value, dt_setpt[0] was already consumed.
            has_init = "init" in rampspec
            for i in range(1, len(self.setpt)):
                # If fewer dt_setpt than setpt provided, repeat the last dt
                dt_idx = i - 1 if has_init else i
                totaltime = self.dt_setpt[min(len(self.dt_setpt) - 1, dt_idx)] / constant.hr_To_min
                ramptime = abs((self.setpt[i] - self.setpt[i-1]) / self.ramp_rate) / constant.hr_To_min
                holdtime = totaltime - ramptime
                if holdtime < 0:
                    warn(f"Ramp time ({ramptime * constant.hr_To_min:.1f} min) from "
                         f"{self.setpt[i-1]:.2e} to {self.setpt[i]:.2e} exceeds "
                         f"total stage time ({totaltime * constant.hr_To_min:.1f} min). "
                         f"Clamping hold time to 0.")
                    holdtime = 0.0
                times = np.append(times, [ramptime, holdtime])
        else:
        # Newer logic: setpoint_dt applies *after* the ramp is complete.
            for i,v in enumerate(self.setpt[1:], start=1):
                ramptime = abs((v - self.setpt[i-1]) / self.ramp_rate) / constant.hr_To_min
                # If less dt_setpt than setpt provided, repeat the last dt
                holdtime = self.dt_setpt[min(len(self.dt_setpt)-1, i-1)] / constant.hr_To_min
                times = np.append(times, [ramptime, holdtime])
        self.times = np.cumsum(times)

    def __call__(self, t):
        return np.interp(t, self.times, self.values)

    def max_time(self):
        return self.times[-1]

    def max_setpt(self):
        return np.max(self.values)

Eq_Constraints(Pch, dmdt, Tbot, Tsh, Psub, Tsub, Kv, Lpr0, Lck, Av, Ap, Rp)

Defines the equality constraints for lyophilization. Inputs are chamber pressure [Torr], sublimation rate [kg/hr], vial bottom temperature [degC], shelf temperature [degC], sublimation front pressure [Torr], sublimation front temperature [degC], vial heat transfer coefficient [cal/s/cm^2/K], initial product length [cm], cake length [cm], vial area [cm^2], product area [cm^2], and product resistance [cm^2-Torr-hr/g]

Source code in lyopronto/functions.py
def Eq_Constraints(Pch,dmdt,Tbot,Tsh,Psub,Tsub,Kv,Lpr0,Lck,Av,Ap,Rp):
    """
    Defines the equality constraints for lyophilization. Inputs are chamber pressure [Torr],
    sublimation rate [kg/hr], vial bottom temperature [degC], shelf temperature [degC],
    sublimation front pressure [Torr], sublimation front temperature [degC], vial heat
    transfer coefficient [cal/s/cm^2/K], initial product length [cm], cake length [cm],
    vial area [cm^2], product area [cm^2], and product resistance [cm^2-Torr-hr/g]
    """

    C1 = Psub - 2.698e10*np.exp(-6144.96/(273.15+Tsub))   # Vapor pressure at the sublimation temperature [Torr]

    C2 = dmdt - Ap/Rp/constant.kg_To_g*(Psub-Pch)  # Sublimation rate [kg/hr]

    C3 = (Tsh-Tbot)*Av*Kv*(Lpr0-Lck) - Ap*(Tbot-Tsub)*constant.k_ice  # Vial heat transfer balance

    C4 = Tsh - dmdt*constant.kg_To_g/constant.hr_To_s*constant.dHs/Av/Kv - Tbot  # Shelf temperature [degC]

    return C1,C2,C3,C4

Ineq_Constraints(Pch, dm_dt, Tcrit, Tbot, a, b, nVial)

Defines the inequality constraints for lyophilization optimization within safe operation region inside the desgin space. Inputs are chamber pressure [Torr], sublimation rate [kg/hr], critical product temperature [degC], vial bottom temperature [degC], equipment capability parameters a [kg/hr] and b [kg/hr/Torr], and number of vials

Source code in lyopronto/functions.py
def Ineq_Constraints(Pch,dm_dt,Tcrit,Tbot,a,b,nVial):
    """
    Defines the inequality constraints for lyophilization optimization within
    safe operation region inside the desgin space. Inputs are chamber pressure [Torr],
    sublimation rate [kg/hr], critical product temperature [degC], vial bottom
    temperature [degC], equipment capability parameters a [kg/hr] and b [kg/hr/Torr],
    and number of vials
    """

    C1 = a + b*Pch - nVial*dm_dt	# Equipment capability limit

    C2 = Tcrit - Tbot		# Maximum product temperature limit

    return C1,C2

Kv_FUN(KC, KP, KD, Pch)

Calculates the vial heat transfer coefficient.

Parameters:

Name Type Description Default
KC float

Vial heat transfer parameter [cal/s/K/cm^2].

required
KP float

Vial heat transfer parameter [cal/s/K/cm^2/Torr].

required
KD float

Vial heat transfer parameter [1/Torr].

required
Pch float

Chamber pressure [Torr].

required

Returns:

Type Description
float

Vial heat transfer coefficient [cal/s/K/cm^2].

Source code in lyopronto/functions.py
def Kv_FUN(KC,KP,KD,Pch):
    """Calculates the vial heat transfer coefficient.

    Args:
        KC (float): Vial heat transfer parameter [cal/s/K/cm^2].
        KP (float): Vial heat transfer parameter [cal/s/K/cm^2/Torr].
        KD (float): Vial heat transfer parameter [1/Torr].
        Pch (float): Chamber pressure [Torr].

    Returns:
        (float): Vial heat transfer coefficient [cal/s/K/cm^2].
    """

    return KC + KP*Pch/(1.0+KD*Pch) # Kv [cal/s/K/cm^2]

Lpr0_FUN(Vfill, Ap, cSolid)

Calculates the intial fill height of the frozen product [cm].

Parameters:

Name Type Description Default
Vfill float

fill volume [mL]

required
Ap float

product area [cm^2]

required
cSolid float

concentration of the solute in solution [g/mL]

required

Returns:

Type Description
float

initial fill height of the frozen product [cm].

Source code in lyopronto/functions.py
def Lpr0_FUN(Vfill,Ap,cSolid):
    """Calculates the intial fill height of the frozen product [cm].

    Args:
        Vfill (float): fill volume [mL]
        Ap (float): product area [cm^2]
        cSolid (float): concentration of the solute in solution [g/mL]

    Returns:
        (float): initial fill height of the frozen product [cm].
    """

    dens_fac = (constant.rho_solution-cSolid*(constant.rho_solution-constant.rho_ice)/constant.rho_solute)
    return Vfill/(Ap*constant.rho_ice)*dens_fac  # Fill height [cm]

Rp_FUN(L, R0, A1, A2)

Calculates product resistance [cm^2-hr-Torr/g].

Parameters:

Name Type Description Default
L float

cake length [cm]

required
R0 float

base product resistance [cm^2-hr-Torr/g]

required
A1 float

product resistance parameter [cm-hr-Torr/g]

required
A2 float

product resistance parameter [1/cm]

required

Returns:

Type Description
float

product resistance [cm^2-hr-Torr/g]

Source code in lyopronto/functions.py
def Rp_FUN(L,R0,A1,A2):
    """Calculates product resistance [cm^2-hr-Torr/g].

    Args:
        L (float): cake length [cm]
        R0 (float): base product resistance [cm^2-hr-Torr/g]
        A1 (float): product resistance parameter [cm-hr-Torr/g]
        A2 (float): product resistance parameter [1/cm]

    Returns:
        (float): product resistance [cm^2-hr-Torr/g]
    """

    return R0 + A1*L/(1+A2*L) # Product resistance [cm^2-hr-Torr/g]

Rp_finder(T_sub, Lpr0, Lck, Pch, Tbot)

Calculates product resistance [cm^2-hr-Torr/g]. Inputs are sublimation temperature [degC], initial product length [cm], cake length [cm], chamber pressure [Torr], and vial bottom temperature [degC]

Source code in lyopronto/functions.py
def Rp_finder(T_sub,Lpr0,Lck,Pch,Tbot):
    """
    Calculates product resistance [cm^2-hr-Torr/g]. Inputs are sublimation
    temperature [degC], initial product length [cm], cake length [cm],
    chamber pressure [Torr], and vial bottom temperature [degC]
    """

    P_sub = Vapor_pressure(T_sub)   # Vapor pressure at the sublimation temperature [Torr]

    Rp = (Lpr0-Lck)*(P_sub-Pch)*constant.dHs/(Tbot-T_sub)/constant.hr_To_s/constant.k_ice	# Product resistance [cm^2-Torr-hr/g]

    return Rp

T_bot_FUN(T_sub, Lpr0, Lck, Pch, Rp)

Calculates the temperature at the bottom of the vial [degC]. Inputs are sublimation front temperature [degC], initial product length [cm], cake length [cm], chamber pressure [Torr], and product resistance [cm^2-Torr-hr/g]

Source code in lyopronto/functions.py
def T_bot_FUN(T_sub,Lpr0,Lck,Pch,Rp):
    """
    Calculates the temperature at the bottom of the vial [degC]. Inputs are
    sublimation front temperature [degC], initial product length [cm], cake
    length [cm], chamber pressure [Torr], and product resistance
    [cm^2-Torr-hr/g]
    """

    P_sub = Vapor_pressure(T_sub)   # Vapor pressure at the sublimation temperature [Torr]

    Tbot = T_sub + (Lpr0-Lck)*(P_sub-Pch)*constant.dHs/Rp/constant.hr_To_s/constant.k_ice # Vial bottom temperature [degC]

    return Tbot

T_sub_Rp_finder(T_sub, *data)

Tsub is found from solving for T_unknown. Determines the function to calculate the sublimation temperature represented as T_unknown. Other inputs are chamber pressure [Torr], vial area [cm^2], product area [cm^2], vial heat transfer coefficient [cal/s/K/cm^2], initial product length [cm], cake length [cm], vial bottom temperature [degC], and shelf temperature [degC]

Source code in lyopronto/functions.py
def T_sub_Rp_finder(T_sub, *data):
    """
    Tsub is found from solving for T_unknown.
    Determines the function to calculate the sublimation temperature
    represented as T_unknown. Other inputs are chamber pressure [Torr], vial
    area [cm^2], product area [cm^2], vial heat transfer coefficient
    [cal/s/K/cm^2], initial product length [cm], cake length [cm], vial bottom
    temperature [degC], and shelf temperature [degC]
    """

    Av, Ap, Kv, Lpr0, Lck, Tbot, Tsh = data

    LHS = (Tsh - Tbot)*Av*Kv
    RHS = (Tbot - T_sub)*Ap*constant.k_ice/(Lpr0-Lck)

    return LHS-RHS

T_sub_fromTpr(T_unknown, *data)

Tsub is found from solving for T_unknown. # Determines the function to calculate the sublimation temperature represented as T_unknown. Other inputs are vial bottom temperature [degC], initial product length [cm], cake length [cm], chamber pressure [Torr], and product resistance [cm^2-Torr-hr/g]

Source code in lyopronto/functions.py
def T_sub_fromTpr(T_unknown, *data):
    """
    Tsub is found from solving for T_unknown.  # Determines the function to calculate the sublimation temperature
    represented as T_unknown. Other inputs are vial bottom temperature [degC],
    initial product length [cm], cake length [cm], chamber pressure [Torr],
    and product resistance [cm^2-Torr-hr/g]
    """

    Tbot, Lpr0, Lck, Pch, Rp = data

    P_sub = Vapor_pressure(T_unknown)   # Vapor pressure at the sublimation temperature [Torr]

    F = T_unknown - Tbot + (P_sub-Pch)*(Lpr0-Lck)*constant.dHs/Rp/constant.hr_To_s/constant.k_ice  # Heat and mass transfer energy balance function - Should be zero

    return F

T_sub_solver_FUN(T_sub_guess, *data)

Tsub is found from solving for T_unknown. Determines the function to calculate the sublimation temperature represented as T_unknown. Other inputs are chamber pressure [Torr], vial area [cm^2], product area [cm^2], vial heat transfer coefficient [cal/s/K/cm^2], initial product length [cm], cake length [cm], product resistance [cm^2-Torr-hr/g], and shelf temperature [degC]

Parameters:

Name Type Description Default
T_sub_guess float

Initial guess for the sublimation temperature [degC].

required
data tuple

Pch, Av, Ap, Kv, Lpr0, Lck, Rp, Tsh

()

Returns:

Type Description
float

residual for the pseudosteady heat balance

Source code in lyopronto/functions.py
def T_sub_solver_FUN(T_sub_guess, *data):
    """Tsub is found from solving for T_unknown.
    Determines the function to calculate the sublimation temperature
    represented as T_unknown. Other inputs are chamber pressure [Torr], vial
    area [cm^2], product area [cm^2], vial heat transfer coefficient
    [cal/s/K/cm^2], initial product length [cm], cake length [cm], product
    resistance [cm^2-Torr-hr/g], and shelf temperature [degC]

    Args:
        T_sub_guess (float): Initial guess for the sublimation temperature [degC].
        data (tuple): Pch, Av, Ap, Kv, Lpr0, Lck, Rp, Tsh

    Returns:
        (float): residual for the pseudosteady heat balance
    """

    Pch, Av, Ap, Kv, Lpr0, Lck, Rp, Tsh = data

    P_sub = Vapor_pressure(T_sub_guess)   # Vapor pressure at the sublimation temperature [Torr]
    Qsub = constant.dHs*(P_sub-Pch)*Ap/Rp /constant.hr_To_s # Sublimation heat
    T_b = T_sub_guess + Qsub/Ap/constant.k_ice*(Lpr0-Lck) # Corresponding bottom temperature
    Qsh = Kv*Av*(Tsh - T_b) # Heat transfer from shelf
    return Qsub-Qsh

Tbot_max_eq_cap(Pch, dm_dt, Lpr0, Lck, Rp, Ap)

Calculates the maximum product temperature (occurs at vial bottom) [degC]. Inputs are chamber pressure [Torr], sublimation rate based on equipment capability [kg/hr], initial product length [cm], cake length [cm], product resistance [cm^2-hr-Torr/g], and product area [cm^2]

Source code in lyopronto/functions.py
def Tbot_max_eq_cap(Pch,dm_dt,Lpr0,Lck,Rp,Ap):
    """
    Calculates the maximum product temperature (occurs at vial bottom)
    [degC]. Inputs are chamber pressure [Torr], sublimation rate based on
    equipment capability [kg/hr], initial product length [cm], cake length
    [cm], product resistance [cm^2-hr-Torr/g], and product area [cm^2]
    """

    P_sub = dm_dt/Ap*Rp + Pch     # Sublimation front pressure [Torr]
    T_sub = -6144.96/np.log(P_sub/2.698e10) - 273.15    # Sublimation front temperature [degC]
    Tbot = T_sub + (Lpr0-Lck)*(P_sub-Pch)*constant.dHs/Rp/constant.hr_To_s/constant.k_ice	# Vial bottom temperature [degC]
    Tbot_max = np.max(Tbot)   # Maximum vial bottom temperature [degC]

    return Tbot_max

Vapor_pressure(T_sub)

Calculates the vapor pressure [Torr]. Input is sublimation front temperature [degC]

Source code in lyopronto/functions.py
def Vapor_pressure(T_sub):
    """
    Calculates the vapor pressure [Torr]. Input is sublimation front
    temperature [degC]
    """

    p = 2.698e10*np.exp(-6144.96/(273.15+T_sub))   # Vapor pressure at the sublimation temperature [Torr]

    return p

calc_step(t, Lck, inputs)

Calculate the full set of system states at a given time step from ODE solution states.

Parameters:

Name Type Description Default
t float

The current time in hours.

required
Lck float

The cake thickness [cm].

required
inputs tuple

A tuple containing the inputs parameters.

required

Returns:

Type Description
ndarray

The full set of system states at the given time step: 0. Time [hr], 1. Sublimation front temperature [°C], 2. Vial bottom temperature [°C], 3. Shelf temperature [°C], 4. Chamber pressure [mTorr], 5. Sublimation flux [kg/hr/m²], 6. Drying percent [%]

Source code in lyopronto/functions.py
def calc_step(t, Lck, inputs):
    """Calculate the full set of system states at a given time step from ODE solution states.

    Args:
        t (float): The current time in hours.
        Lck (float): The cake thickness [cm].
        inputs (tuple): A tuple containing the inputs parameters.

    Returns:
        (np.ndarray): The full set of system states at the given time step:
            0. Time [hr],
            1. Sublimation front temperature [°C],
            2. Vial bottom temperature [°C],
            3. Shelf temperature [°C],
            4. Chamber pressure [mTorr],
            5. Sublimation flux [kg/hr/m²],
            6. Drying percent [%]
    """
    vial, product, ht, Pch_t, Tsh_t, dt, Lpr0 = inputs
    Tsh = Tsh_t(t)
    Pch = Pch_t(t)
    Kv = Kv_FUN(ht['KC'],ht['KP'],ht['KD'],Pch)  # Vial heat transfer coefficient [cal/s/K/cm^2]
    Rp = Rp_FUN(Lck,product['R0'],product['A1'],product['A2'])  # Product resistance [cm^2-hr-Torr/g]
    Tsub = fsolve(T_sub_solver_FUN, 250, args = (Pch,vial['Av'],vial['Ap'],Kv,Lpr0,Lck,Rp,Tsh))[0] # Sublimation front temperature [degC]
    dmdt = sub_rate(vial['Ap'],Rp,Tsub,Pch)   # Total sublimation rate [kg/hr]
    if dmdt<0:
        dmdt = 0.0
        Tsub = Tsh  # No sublimation, Tsub equals shelf temp
        Tbot = Tsh
    else:
        Tbot = T_bot_FUN(Tsub,Lpr0,Lck,Pch,Rp)    # Vial bottom temperature [degC]
    dry_percent = (Lck/Lpr0)*100

    col = np.array([t, Tsub, Tbot, Tsh, Pch*constant.Torr_to_mTorr, dmdt/(vial['Ap']*constant.cm_To_m**2), dry_percent])
    return col

crystallization_time_FUN(V, h, Av, Tf, Tn, Tsh_func, t0)

Calculates the crystallization time [hr]. Inputs are fill volume [mL], heat transfer coefficient [W/m^2/K], vial area [cm^2], freezing temperature [degC], nucleation temperature [degC], shelf temperature [degC]

Source code in lyopronto/functions.py
def crystallization_time_FUN(V,h,Av,Tf,Tn,Tsh_func,t0):
    """
    Calculates the crystallization time [hr]. Inputs are fill volume [mL], heat transfer coefficient [W/m^2/K], vial area [cm^2], freezing temperature [degC], nucleation temperature [degC], shelf temperature [degC]
    """

    # t = constant.rho_solution*V*(constant.dHf*constant.cal_To_J-constant.Cp_solution/constant.kg_To_g*(Tf-Tn))/h/constant.hr_To_s/Av/constant.cm_To_m**2/(Tf-Tsh)
    rhoV = constant.rho_solution*V  # Mass of the solution [g]
    Hf = constant.dHf*constant.cal_To_J # Fusion enthalpy [J/g]
    Cp = constant.Cp_solution/constant.kg_To_g # Specific heat capacity [J/g/K]
    hA = h*constant.hr_To_s * Av*constant.cm_To_m**2 # Heat transfer coefficient [J/K/hr]
    # t = rhoV*(Hf-Cp*(Tf-Tn))/hA/(Tf-Tsh) # time: g*(J/g- J/g/K*K)/(J/m^2/K/hr*m^2*K) = hr
    lhs = rhoV*(Hf-Cp*(Tf-Tn))/hA
    def integrand(dt):
        return Tf - Tsh_func(t0+dt)
    def resid(delta_t):
        integral, _ = quad(integrand, 0, delta_t)
        return integral - lhs
    delta_t = brentq(resid, 0, 100.0)
    return delta_t

fill_output(sols, inputs)

Fill the output array with the results from the ODE solver.

Parameters:

Name Type Description Default
sols list

A list of solution objects returned by the ODE solver.

required
inputs tuple

A tuple containing the input parameters.

required

Returns:

Type Description
ndarray

The output array filled with the results from the ODE solver.

Each call to calc_step requires a nonlinear solve for Tsub, so doing this for thousands of points is impractical. Instead, we calculate at the the ODE solver points, and interpolate elsewhere.

Source code in lyopronto/functions.py
def fill_output(sols, inputs):
    """Fill the output array with the results from the ODE solver.

    Args:
        sols (list): A list of solution objects returned by the ODE solver.
        inputs (tuple): A tuple containing the input parameters.

    Returns:
        (np.ndarray): The output array filled with the results from the ODE solver.

    Each call to calc_step requires a nonlinear solve for Tsub, so doing this for thousands 
    of points is impractical. Instead, we calculate at the the ODE solver points, and 
    interpolate elsewhere.
    """
    dt = inputs[5]

    total_len = np.sum([len(sol.t) for sol in sols])
    interp_points = np.zeros((total_len, 7))
    i = 0
    for sol in sols:
        for t, y in zip(sol.t, sol.y[0]):
            interp_points[i,:] = calc_step(t, y, inputs)
            i += 1
    sols_t, unique_inds = np.unique(np.concatenate([sol.t for sol in sols]), return_index=True)
    interp_uniques = interp_points[unique_inds, :]
    if dt is None:
        return interp_uniques

    out_t = np.arange(0, sol.t[-1], dt)   
    fullout = np.zeros((len(out_t), 7))
    interp_func = make_interp_spline(sols_t, interp_uniques, k=1)
    for i, t in enumerate(out_t):
        if np.any(sols_t == t):
            fullout[i,:] = interp_uniques[sols_t == t, :]
        else:
            fullout[i,:] = interp_func(t)

    return fullout

lumped_cap_Tpr_abstract(t, Tpr0, V, h, Av, Tsh, Tsh0, Tsh_ramp, rho, Cpi)

Calculates the product temperature [degC]. Inputs are time [hr], initial product temperature [degC], product density [g/mL], constant pressure specific heat of the product [J/kg/K], product volume [mL], heat transfer coefficient [W/m^2/K], vial area [cm^2], current shelf temperature [degC], initial shelf temperature [degC], shelf temperature ramping rate [degC/min]

Source code in lyopronto/functions.py
def lumped_cap_Tpr_abstract(t,Tpr0,V,h,Av,Tsh,Tsh0,Tsh_ramp,rho,Cpi):
    """
    Calculates the product temperature [degC]. Inputs are time [hr], initial product temperature [degC], product density [g/mL], constant pressure specific heat of the product [J/kg/K], product volume [mL], heat transfer coefficient [W/m^2/K], vial area [cm^2], current shelf temperature [degC], initial shelf temperature [degC], shelf temperature ramping rate [degC/min]
    """

    rr = Tsh_ramp/constant.min_To_s # Ramp rate [K/s]
    rhoV = rho*V  # Mass of solution [g]
    Cp = Cpi/constant.kg_To_g # Specific heat capacity [J/g/K]
    hA = h*Av*constant.cm_To_m**2  # Heat transfer coefficient times area [W/K]
    ts = t*constant.hr_To_s # Time [s]

    tau = rhoV*Cp/hA  # Time constant [s]
    asymp_T = (Tpr0 - Tsh0 + rr*rhoV*Cp/hA) # Prefactor in solution [degC]

    return asymp_T*np.exp(-ts/tau) - rr*tau + Tsh

sub_rate(Ap, Rp, T_sub, Pch)

Calculates the sublimation rate from each vial [kg/hr]. Inputs are product area [cm^2], product resistance [cm^2-Torr-hr/g], sublimation front temperature [degC], and chamber pressure [Torr]

Source code in lyopronto/functions.py
def sub_rate(Ap,Rp,T_sub,Pch):
    """
    Calculates the sublimation rate from each vial [kg/hr]. Inputs are
    product area [cm^2], product resistance [cm^2-Torr-hr/g], sublimation
    front temperature [degC], and chamber pressure [Torr]
    """

    P_sub = Vapor_pressure(T_sub)   # Vapor pressure at the sublimation temperature [Torr]

    dm_dt = Ap/Rp/constant.kg_To_g*(P_sub-Pch)  # Sublimation rate [kg/hr]

    return dm_dt

high_level

execute_simulation(inputs)

Run the selected simulation tool with the provided inputs. Returns output data based on the chosen simulation mode.

Source code in lyopronto/high_level.py
def execute_simulation(inputs):
    """
    Run the selected simulation tool with the provided inputs.
    Returns output data based on the chosen simulation mode.
    """
    sim_type = inputs["sim"]["tool"]
    output_data = None

    if sim_type == "Freezing Calculator":
        output_data = freezing.freeze(
            inputs["vial"],
            inputs["product"],
            inputs["h_freezing"],
            inputs["Tshelf"],
            inputs["dt"],
        )

    elif sim_type == "Primary Drying Calculator":
        if inputs["sim"]["Kv_known"] and inputs["sim"]["Rp_known"]:
            output_data = calc_knownRp.dry(
                inputs["vial"],
                inputs["product"],
                inputs["ht"],
                inputs["Pchamber"],
                inputs["Tshelf"],
                inputs["dt"],
            )
        elif not inputs["sim"]["Kv_known"] and inputs["sim"]["Rp_known"]:
            output_data = _optimize_kv_parameter(inputs)
        elif inputs["sim"]["Kv_known"] and not inputs["sim"]["Rp_known"]:
            output_data = _optimize_rp_parameter(inputs)
        else:
            raise ValueError(
                "With the current implementation, either Kv or Rp must be specified."
            )

    elif sim_type == "Design Space Generator":
        output_data = design_space.dry(
            inputs["vial"],
            inputs["product"],
            inputs["ht"],
            inputs["Pchamber"],
            inputs["Tshelf"],
            inputs["dt"],
            inputs["eq_cap"],
            inputs["nVial"],
        )

    elif sim_type == "Optimizer":
        output_data = _run_optimizer(inputs)

    else:
        raise ValueError(
            f"Invalid simulation tool {sim_type} selected. "
            "Valid options are: 'Freezing Calculator', 'Primary Drying Calculator', "
            "'Design Space Generator', 'Optimizer'."
        )

    return output_data

generate_visualizations(output_data, inputs, timestamp)

Create and save publication-quality plots based on simulation results.

Source code in lyopronto/high_level.py
def generate_visualizations(output_data, inputs, timestamp):
    """
    Create and save publication-quality plots based on simulation results.
    """

    # TODO: move these to kwargs for the function
    figure_props = {
        "figwidth": 30,
        "figheight": 20,
        "linewidth": 5,
        "marker_size": 20,
    }
    tool = inputs["sim"]["tool"]
    matplotlibrc("text.latex", preamble=r"\usepackage{color}")
    matplotlibrc("text", usetex=False)
    plt.rcParams["font.family"] = "Arial"

    if tool == "Freezing Calculator":
        _plot_freezing_results(output_data, figure_props, timestamp)
    elif tool in ["Primary Drying Calculator", "Optimizer"]:
        if tool == "Primary Drying Calculator" and not inputs["sim"]["Rp_known"]:
            _plot_rp_results(output_data, figure_props, timestamp)
            data = output_data[0]  # There are extra returns for Rp fitting
        else:
            data = output_data  # for all but unknown Rp, output_data is the only return
        _plot_drying_results(data, figure_props, timestamp)
    elif tool == "Design Space Generator":
        _plot_design_space(output_data, inputs, figure_props, timestamp)

read_inputs(filename)

Read inputs from a YAML file.

Source code in lyopronto/high_level.py
def read_inputs(filename):
    "Read inputs from a YAML file."
    with open(filename, "r") as yamlfile:
        inputs = yaml.load(yamlfile)
        if "product_temp_filename" in inputs:
            print(
                "Note: input specifies a product temperature data file. "
                + "This data should be loaded separately and added to the inputs "
                + "dictionary as `time_data` and `temp_data`."
            )
        return inputs

save_csv(output_data, inputs, timestamp)

Export simulation results to CSV file with appropriate formatting.

Source code in lyopronto/high_level.py
def save_csv(output_data, inputs, timestamp):
    """
    Export simulation results to CSV file with appropriate formatting.
    """
    filename = f"lyopronto_output_{timestamp}.csv"

    sim = inputs["sim"]

    if sim["tool"] == "Freezing Calculator":
        assert output_data.shape[1] == 3
        header = "Time [hr], Shelf Temp [°C], Product Temp [°C]"
        np.savetxt(filename, output_data, delimiter=", ", header=header)
    elif sim["tool"] == "Design Space Generator":
        _write_design_space_csv(output_data, inputs, filename)
    else:
        if sim["tool"] == "Primary Drying Calculator" and not sim["Rp_known"]:
            assert len(output_data) == 3  # output, product_res, params
            header = ",".join(
                [
                    "Time [hr]",
                    "Cake Length [cm]",
                    "Product Resistance [cm^2-hr-Torr/g]",
                ]
            )
            rpfile = f"lyo_Rp_data_{timestamp}.csv"
            np.savetxt(rpfile, output_data[1], delimiter=", ", header=header)
            data = output_data[0]
        else:
            data = output_data  # for all but unknown Rp, output_data is the only return

        header = ",".join(
            [
                "Time [hr]",
                "Sublimation Front Temp [°C]",
                "Vial Bottom Temperature [°C]",
                "Shelf Temp [°C]",
                "Chamber Pressure [mTorr]",
                "Sublimation Flux [kg/hr/m²]",
                "Percent Dried",
            ]
        )
        np.savetxt(filename, data, delimiter=", ", header=header)

save_inputs(inputs, timestamp)

Save inputs to a YAML file with timestamped filename.

Source code in lyopronto/high_level.py
def save_inputs(inputs, timestamp):
    "Save inputs to a YAML file with timestamped filename."
    copied = inputs.copy()
    # If the inputs include large arrays of data, strip those out
    copied.pop("time_data", None)
    copied.pop("temp_data", None)
    # Then save
    with open(f"lyopronto_input_{timestamp}.yaml", "w") as yamlfile:
        yaml.dump(copied, yamlfile)

save_inputs_legacy(inputs, timestamp)

Save inputs to a CSV file with timestamped filename.

Source code in lyopronto/high_level.py
def save_inputs_legacy(inputs, timestamp):
    """
    Save inputs to a CSV file with timestamped filename.
    """
    filename = f"lyopronto_input_{timestamp}.csv"
    sim = inputs["sim"]
    vial = inputs["vial"]
    product = inputs["product"]

    with open(filename, "w", newline="") as csvfile:
        writer = csv.writer(csvfile)

        writer.writerow(["Simulation Tool", sim["tool"]])
        writer.writerow(["Kv Known", sim["Kv_known"]])
        writer.writerow(["Rp Known", sim["Rp_known"]])
        writer.writerow(["Variable Chamber Pressure", sim["Variable_Pch"]])
        writer.writerow(["Variable Shelf Temperature", sim["Variable_Tsh"]])
        writer.writerow([])

        writer.writerow(["Vial Cross-Section [cm²]", vial["Av"]])
        writer.writerow(["Product Area [cm²]", vial["Ap"]])
        writer.writerow(["Fill Volume [mL]", vial["Vfill"]])
        writer.writerow([])

        writer.writerow(["Fractional solute concentration:", product["cSolid"]])
        if sim["tool"] == "Freezing Calculator":
            writer.writerow(["Intial product temperature [C]:", product["Tpr0"]])
            writer.writerow(["Freezing temperature [C]:", product["Tf"]])
            writer.writerow(["Nucleation temperature [C]:", product["Tn"]])
        elif not (sim["tool"] == "Primary Drying Calculator" and not sim["Rp_known"]):
            writer.writerow(["R0 [cm^2-hr-Torr/g]:", product["R0"]])
            writer.writerow(["A1 [cm-hr-Torr/g]:", product["A1"]])
            writer.writerow(["A2 [1/cm]:", product["A2"]])
        if not (
            sim["tool"] == "Freezing Calculator"
            or sim["tool"] == "Primary Drying Calculator"
        ):
            writer.writerow(["Critical product temperature [C]:", product["T_pr_crit"]])

        if sim["tool"] == "Freezing Calculator":
            writer.writerow(["h_freezing [W/m^2/K]:", inputs["h_freezing"]])
        elif sim["Kv_known"]:
            writer.writerow(["KC [cal/s/K/cm^2]:", inputs["ht"]["KC"]])
            writer.writerow(["KP [cal/s/K/cm^2/Torr]:", inputs["ht"]["KP"]])
            writer.writerow(["KD [1/Torr]:", inputs["ht"]["KD"]])
        elif not sim["Kv_known"]:
            writer.writerow(["Kv range [cal/s/K/cm^2]:", inputs["Kv_range"][:]])
            writer.writerow(["Experimental drying time [hr]:", inputs["t_dry_exp"]])

        if sim["tool"] == "Freezing Calculator":
            0
        elif sim["tool"] == "Design Space Generator":
            writer.writerow(
                ["Chamber pressure set points [Torr]:", inputs["Pchamber"]["setpt"][:]]
            )
        elif not (sim["tool"] == "Optimizer" and sim["Variable_Pch"]):
            for i in range(len(inputs["Pchamber"]["setpt"])):
                writer.writerow(
                    [
                        "Chamber pressure setpoint [Torr]:",
                        inputs["Pchamber"]["setpt"][i],
                        "Duration [min]:",
                        inputs["Pchamber"]["dt_setpt"][i],
                    ]
                )
            writer.writerow(
                [
                    "Chamber pressure ramping rate [Torr/min]:",
                    inputs["Pchamber"]["ramp_rate"],
                ]
            )
        else:
            writer.writerow(
                ["Minimum chamber pressure [Torr]:", inputs["Pchamber"]["min"]]
            )
            writer.writerow(
                ["Maximum chamber pressure [Torr]:", inputs["Pchamber"]["max"]]
            )
        writer.writerow([""])

        if sim["tool"] == "Design Space Generator":
            writer.writerow(["Intial shelf temperature [C]:", inputs["Tshelf"]["init"]])
            writer.writerow(
                ["Shelf temperature set points [C]:", inputs["Tshelf"]["setpt"][:]]
            )
            writer.writerow(
                [
                    "Shelf temperature ramping rate [C/min]:",
                    inputs["Tshelf"]["ramp_rate"],
                ]
            )
        elif not (sim["tool"] == "Optimizer" and sim["Variable_Tsh"]):
            for i in range(len(inputs["Tshelf"]["setpt"])):
                writer.writerow(
                    [
                        "Shelf temperature setpoint [C]:",
                        inputs["Tshelf"]["setpt"][i],
                        "Duration [min]:",
                        inputs["Tshelf"]["dt_setpt"][i],
                    ]
                )
            writer.writerow(
                [
                    "Shelf temperature ramping rate [C/min]:",
                    inputs["Tshelf"]["ramp_rate"],
                ]
            )
        else:
            writer.writerow(["Minimum shelf temperature [C]:", inputs["Tshelf"]["min"]])
            writer.writerow(["Maximum shelf temperature [C]:", inputs["Tshelf"]["max"]])

        writer.writerow(["Time Step [hr]", inputs["dt"]])
        writer.writerow(["Equipment Parameter a [kg/hr]", inputs["eq_cap"]["a"]])
        writer.writerow(["Equipment Parameter b [kg/hr/Torr]", inputs["eq_cap"]["b"]])
        writer.writerow(["Number of Vials", inputs["nVial"]])

plot_styling

axis_style_designspace(ax, ylabel, **kwargs)

Function to set styling for axes, with pressure on x and sublimation flux on y. See axis_tick_styling for more usable kwargs.

Source code in lyopronto/plot_styling.py
def axis_style_designspace(ax, ylabel, **kwargs):  
    """ Function to set styling for axes, with pressure on x and sublimation flux on y.
    See axis_tick_styling for more usable kwargs.
    """
    gcafontSize = kwargs.get('gcafontSize',60)
    ax.set_xlabel("Chamber Pressure [mTorr]",fontsize=gcafontSize,**default_font_spec)
    ax.set_ylabel(ylabel,fontsize=gcafontSize,**default_font_spec)
    axis_tick_styling(ax, **kwargs)

axis_style_percdried(ax, **kwargs)

Function to set styling for axes, with time on x and percent dried on y. See axis_tick_styling for more usable kwargs.

Source code in lyopronto/plot_styling.py
def axis_style_percdried( ax, **kwargs):  
    """ Function to set styling for axes, with time on x and percent dried on y.
    See axis_tick_styling for more usable kwargs.
    """
    color = kwargs.get('color','k')
    gcafontSize = kwargs.get('gcafontSize',60)
    ax.set_xlabel("Time [hr]",fontsize=gcafontSize,**default_font_spec)
    ax.set_ylabel("Percent Dried",fontsize=gcafontSize,color=color,**default_font_spec)
    axis_tick_styling(ax, **kwargs)

axis_style_pressure(ax, **kwargs)

Function to set styling for axes, with time on x and pressure on y. See axis_tick_styling for more usable kwargs.

Source code in lyopronto/plot_styling.py
def axis_style_pressure(ax, **kwargs):
    """ Function to set styling for axes, with time on x and pressure on y.
    See axis_tick_styling for more usable kwargs.
    """
    color = kwargs.get('color','b')
    gcafontSize = kwargs.get('gcafontSize',60)
    ax.set_xlabel("Time [hr]",fontsize=gcafontSize,**default_font_spec)
    ax.set_ylabel("Chamber Pressure [mTorr]",fontsize=gcafontSize,color=color,**default_font_spec)
    axis_tick_styling(ax, **kwargs)

axis_style_rp(ax, **kwargs)

Function to set styling for axes, with dry layer height on x and product resistance on y. See axis_tick_styling for more usable kwargs.

Source code in lyopronto/plot_styling.py
def axis_style_rp(ax, **kwargs):
    """ Function to set styling for axes, with dry layer height on x and product resistance on y.
    See axis_tick_styling for more usable kwargs.
    """
    color = kwargs.get('color','k')
    gcafontSize = kwargs.get('gcafontSize',60)  
    ax.set_xlabel("Dry Layer Height [cm]",fontsize=gcafontSize,**default_font_spec)
    ax.set_ylabel("Product Resistance [cm$^2$ hr Torr/g]",fontsize=gcafontSize,color=color,**default_font_spec)
    axis_tick_styling(ax, **kwargs)

axis_style_subflux(ax, **kwargs)

Function to set styling for axes, with time on x and sublimation flux on y. See axis_tick_styling for more usable kwargs.

Source code in lyopronto/plot_styling.py
def axis_style_subflux(ax, **kwargs):  
    """ Function to set styling for axes, with time on x and sublimation flux on y.
    See axis_tick_styling for more usable kwargs.
    """
    color = kwargs.get('color',[0, 0.7, 0.3])
    gcafontSize = kwargs.get('gcafontSize',60)
    ax.set_xlabel("Time [hr]",fontsize=gcafontSize,**default_font_spec)
    ax.set_ylabel("Sublimation Flux [kg/hr/m$^2$]",fontsize=gcafontSize,color=color,**default_font_spec)
    axis_tick_styling(ax, **kwargs)

axis_style_temperature(ax, **kwargs)

Function to set styling for axes, with time on x and temperature on y. See axis_tick_styling for more usable kwargs.

Source code in lyopronto/plot_styling.py
def axis_style_temperature(ax, **kwargs):  
    """ Function to set styling for axes, with time on x and temperature on y.
    See axis_tick_styling for more usable kwargs.
    """
    color = kwargs.get('color','k')
    gcafontSize = kwargs.get('gcafontSize',60)  
    ax.set_xlabel("Time [hr]",fontsize=gcafontSize,**default_font_spec)
    ax.set_ylabel("Temperature [°C]",fontsize=gcafontSize,color=color,**default_font_spec)
    axis_tick_styling(ax, **kwargs)

axis_tick_styling(ax, color='k', gcafontSize=60, majorTickWidth=5, minorTickWidth=3, majorTickLength=30, minorTickLength=20, labelPad=30)

summary

Parameters:

Name Type Description Default
ax Axes

Axes object to style

required
color str

Axis and tick color. Defaults to 'k'.

'k'
gcafontSize int

Font size for tick labels (and axis labels). Defaults to 60.

60
majorTickWidth int

Width of major ticks. Defaults to 5.

5
minorTickWidth int

Width of minor ticks. Defaults to 3.

3
majorTickLength int

Length of major ticks. Defaults to 30.

30
minorTickLength int

Length of minor ticks. Defaults to 20.

20
labelPad int

padding between axes and axis labels. Defaults to 30.

30
Source code in lyopronto/plot_styling.py
def axis_tick_styling(
        ax,
        color = 'k',
        gcafontSize = 60,
        majorTickWidth = 5,
        minorTickWidth = 3,
        majorTickLength = 30,
        minorTickLength = 20,
        labelPad = 30,
    ):
    """_summary_

    Args:
        ax (matplotlib.Axes.Axes): Axes object to style
        color (str, optional): Axis and tick color. Defaults to 'k'.
        gcafontSize (int, optional): Font size for tick labels (and axis labels). Defaults to 60.
        majorTickWidth (int, optional): Width of major ticks. Defaults to 5.
        minorTickWidth (int, optional): Width of minor ticks. Defaults to 3.
        majorTickLength (int, optional): Length of major ticks. Defaults to 30.
        minorTickLength (int, optional): Length of minor ticks. Defaults to 20.
        labelPad (int, optional): padding between axes and axis labels. Defaults to 30.
    """    
    ax.minorticks_on()
    ax.tick_params(axis='both',direction='in',pad=labelPad,width=majorTickWidth,length=majorTickLength,bottom=1,top=0)
    ax.tick_params(axis='both',which='minor',direction='in',width=minorTickWidth,length=minorTickLength)
    ax.tick_params(axis='both',labelsize=gcafontSize,labelfontfamily=default_font_spec['fontname'])
    ax.tick_params(axis='y',which='both',color=color, labelcolor=color)
    for tick in [*ax.get_xticklabels(), *ax.get_yticklabels()]:
        tick.set_fontweight('bold')
    ax.xaxis.labelpad = labelPad
    ax.yaxis.labelpad = labelPad