NetworkX, CVXOPT, FEniCS & Python Serialization

Advanced scientific computing libraries for graph theory, convex optimization, finite element analysis, and data serialization in Python.


NetworkX - Graph Theory and Network Analysis

What is NetworkX?

NetworkX is a Python package for creating, manipulating, and studying complex networks and graphs. It provides data structures and algorithms for working with graphs, digraphs, and multigraphs.

Key Features: - Graph data structures for various graph types - Algorithms for centrality, clustering, shortest paths - Graph generators for common network models - Visualization capabilities - Integration with NumPy and SciPy

Basic NetworkX Usage

import networkx as nx
import matplotlib.pyplot as plt

# Create an empty graph
G = nx.Graph()

# Add nodes
G.add_node(1)
G.add_nodes_from([2, 3, 4])

# Add edges
G.add_edge(1, 2)
G.add_edges_from([(2, 3), (3, 4), (1, 4)])

# Basic properties
print(f"Number of nodes: {G.number_of_nodes()}")
print(f"Number of edges: {G.number_of_edges()}")
print(f"Nodes: {list(G.nodes())}")
print(f"Edges: {list(G.edges())}")

# Degree of nodes
degrees = dict(G.degree())
print(f"Node degrees: {degrees}")

# Visualize the graph
nx.draw(G, with_labels=True)
plt.show()

Graph Algorithms

Centrality Measures

# Create a more complex graph
G = nx.erdos_renyi_graph(20, 0.1, seed=42)

# Degree centrality
degree_centrality = nx.degree_centrality(G)

# Betweenness centrality
betweenness_centrality = nx.betweenness_centrality(G)

# Closeness centrality
closeness_centrality = nx.closeness_centrality(G)

# Eigenvector centrality
eigenvector_centrality = nx.eigenvector_centrality(G, max_iter=1000)

# Print top 5 nodes by eigenvector centrality
sorted_nodes = sorted(eigenvector_centrality.items(), key=lambda x: x[1], reverse=True)
for node, centrality in sorted_nodes[:5]:
    print(f"Node {node}: eigenvector centrality = {centrality:.3f}")

Shortest Paths and Connectivity

# Shortest path between nodes
shortest_path = nx.shortest_path(G, source=0, target=10)
print(f"Shortest path from 0 to 10: {shortest_path}")

# Shortest path lengths
shortest_path_lengths = nx.shortest_path_length(G)
print(f"Average shortest path length: {nx.average_shortest_path_length(G):.2f}")

# Connected components
components = list(nx.connected_components(G))
print(f"Number of connected components: {len(components)}")

# Clustering coefficient
clustering = nx.clustering(G)
print(f"Average clustering coefficient: {nx.average_clustering(G):.3f}")

Advanced Graph Analysis

Community Detection

# Louvain community detection
try:
    from community import community_louvain
    import matplotlib.cm as cm

    # Convert to undirected graph if needed
    if isinstance(G, nx.DiGraph):
        G_undirected = G.to_undirected()
    else:
        G_undirected = G

    # Detect communities
    partition = community_louvain.best_partition(G_undirected)

    # Visualize communities
    pos = nx.spring_layout(G)
    cmap = cm.get_cmap('viridis', max(partition.values()) + 1)
    nx.draw_networkx_nodes(G, pos, partition.keys(),
                          node_color=list(partition.values()),
                          cmap=cmap)

    nx.draw_networkx_edges(G, pos, alpha=0.5)
    plt.show()

except ImportError:
    print("python-louvain package not installed")

Network Metrics and Analysis

# Network properties
print("Network Statistics:")
print(f"Density: {nx.density(G):.3f}")
print(f"Diameter: {nx.diameter(G)}")
print(f"Radius: {nx.radius(G)}")
print(f"Average degree: {sum(dict(G.degree()).values()) / G.number_of_nodes():.2f}")

# Transitivity
print(f"Transitivity: {nx.transitivity(G):.3f}")

# Small world properties
if nx.is_connected(G):
    sigma = nx.sigma(G)  # Small-world coefficient
    omega = nx.omega(G)  # Small-world coefficient alternative
    print(f"Small-world coefficient (sigma): {sigma:.3f}")
    print(f"Small-world coefficient (omega): {omega:.3f}")

CVXOPT - Convex Optimization

What is CVXOPT?

CVXOPT is a Python library for convex optimization that provides solvers for linear programming, quadratic programming, second-order cone programming, and semidefinite programming.

Convex Optimization Basics

Convex optimization problems can be written as:

minimize    f₀(x)
subject to  fᵢ(x) ≤ 0, i = 1,...,m
            Ax = b

Where f₀, f₁,... are convex functions.

Linear Programming Example

from cvxopt import matrix, solvers

# Linear programming example
# maximize: 2*x + y
# subject to: x + y ≤ 1, x ≥ 0, y ≥ 0

# Convert to standard form: minimize -2*x - y
c = matrix([-2.0, -1.0])  # Coefficients for objective function
G = matrix([[1.0, -1.0, 0.0],
            [1.0, 0.0, -1.0]])  # Inequality constraints matrix
h = matrix([1.0, 0.0, 0.0])    # Inequality constraints vector

# Solve
sol = solvers.lp(c, G, h)
print(f"Optimal solution: x={sol['x'][0]:.3f}, y={sol['x'][1]:.3f}")
print(f"Optimal value: {sol['primal objective']:.3f}")

Quadratic Programming

# Portfolio optimization example
import numpy as np

# Generate sample returns and covariance
np.random.seed(42)
n_assets = 5
returns = np.random.randn(n_assets) * 0.1 + 0.05  # Expected returns
cov_matrix = np.random.rand(n_assets, n_assets)
cov_matrix = cov_matrix.T @ cov_matrix  # Make positive semidefinite

# Minimize portfolio variance: minimize (1/2) x^T P x
# Subject to: returns^T x >= target_return, sum(x) = 1, x >= 0

target_return = 0.08
P = matrix(2 * cov_matrix)  # P in quadratic form
q = matrix(np.zeros(n_assets))  # Linear term
G = matrix(np.vstack([-returns, -np.eye(n_assets)]))
h = matrix(np.concatenate([-np.array([target_return]), np.zeros(n_assets)]))
A = matrix(np.ones((1, n_assets)))  # Equality constraints
b = matrix([1.0])  # Right-hand side

# Solve QP
sol = solvers.qp(P, q, G, h, A, b)
weights = np.array(sol['x']).flatten()
portfolio_return = returns @ weights
portfolio_risk = np.sqrt(weights.T @ cov_matrix @ weights)

print(f"Optimal weights: {weights}")
print(f"Portfolio return: {portfolio_return:.3f}")
print(f"Portfolio risk: {portfolio_risk:.3f}")

Advanced Optimization Problems

Support Vector Machine Dual

# SVM dual problem for binary classification
def svm_dual(X, y, C=1.0):
    """
    Solve SVM dual problem using CVXOPT
    """
    m, n = X.shape

    # P = X @ X.T * y_i * y_j
    P = matrix(np.outer(y, y) * (X @ X.T))
    q = matrix(-np.ones(m))
    G = matrix(np.vstack([-np.eye(m), np.eye(m)]))
    h = matrix(np.concatenate([np.zeros(m), C * np.ones(m)]))
    A = matrix(y, (1, m))
    b = matrix(0.0)

    # Solve QP
    sol = solvers.qp(P, q, G, h, A, b)

    # Extract results
    alphas = np.array(sol['x']).flatten()

    # Find support vectors
    support_vectors = alphas > 1e-5
    alpha_sv = alphas[support_vectors]

    # Calculate weights
    w = np.sum(alpha_sv[:, np.newaxis] * y[support_vectors, np.newaxis] * X[support_vectors], axis=0)

    # Calculate bias
    b = np.mean(y[support_vectors] - X[support_vectors] @ w)

    return w, b, alphas

# Example usage
X = np.array([[1, 2], [2, 3], [3, 3], [2, 1]])
y = np.array([1, 1, -1, -1])
w, b, alphas = svm_dual(X, y)
print(f"Weights: {w}, Bias: {b}")

FEniCS - Finite Element Methods

What is FEniCS?

FEniCS is an open-source computing platform for solving partial differential equations (PDEs) using the finite element method. It provides automated code generation for finite element computations.

Basic FEniCS Workflow

Poisson Equation Example

try:
    import fenics as fe
    import matplotlib.pyplot as plt

    # Create mesh
    mesh = fe.UnitSquareMesh(32, 32)  # 32x32 elements

    # Define function space (linear Lagrange elements)
    V = fe.FunctionSpace(mesh, 'P', 1)

    # Define boundary conditions
    def boundary(x, on_boundary):
        return on_boundary

    bc = fe.DirichletBC(V, fe.Constant(0.0), boundary)

    # Define variational problem
    u = fe.TrialFunction(V)
    v = fe.TestFunction(V)

    # Right-hand side (source term)
    f = fe.Expression('10*exp(-(pow(x[0] - 0.5, 2) + pow(x[1] - 0.5, 2)) / (2 * pow(0.2, 2)))',
                     degree=2)

    # Bilinear form (LHS)
    a = fe.dot(fe.grad(u), fe.grad(v)) * fe.dx

    # Linear form (RHS)
    L = f * v * fe.dx

    # Solve
    u = fe.Function(V)
    fe.solve(a == L, u, bc)

    # Plot solution
    p = fe.plot(u)
    plt.colorbar(p)
    plt.title('Poisson Equation Solution')
    plt.show()

    # Compute L2 norm
    error_L2 = fe.errornorm(fe.Constant(0), u, 'L2')
    print(f"L2 error: {error_L2:.6f}")

except ImportError:
    print("FEniCS not installed. Install with: conda install -c conda-forge fenics")
# Steady Navier-Stokes example (simplified)
def solve_navier_stokes():
    # Mesh and function spaces
    mesh = fe.RectangleMesh(fe.Point(0, 0), fe.Point(1, 1), 32, 32)

    # Velocity function space (Taylor-Hood elements)
    V = fe.VectorFunctionSpace(mesh, 'P', 2)
    Q = fe.FunctionSpace(mesh, 'P', 1)
    W = V * Q  # Mixed function space

    # Boundary conditions
    inflow_profile = fe.Expression(('-sin(pi*x[1])', '0'), degree=2)

    def inflow_boundary(x, on_boundary):
        return on_boundary and fe.near(x[0], 0)

    def outflow_boundary(x, on_boundary):
        return on_boundary and fe.near(x[0], 1)

    def walls_boundary(x, on_boundary):
        return on_boundary and (fe.near(x[1], 0) or fe.near(x[1], 1))

    # Define boundaries
    bc_in = fe.DirichletBC(W.sub(0), inflow_profile, inflow_boundary)
    bc_out = fe.DirichletBC(W.sub(0).sub(1), fe.Constant(0), outflow_boundary)
    bc_walls = fe.DirichletBC(W.sub(0), fe.Constant((0, 0)), walls_boundary)
    bc_pressure = fe.DirichletBC(W.sub(1), fe.Constant(0), outflow_boundary)

    bcs = [bc_in, bc_out, bc_walls, bc_pressure]

    # Define variational problem
    (u, p) = fe.TrialFunctions(W)
    (v, q) = fe.TestFunctions(W)

    # Parameters
    Re = fe.Constant(1000)  # Reynolds number

    # Weak formulation
    F = (fe.inner(fe.grad(u), fe.grad(v)) / Re * fe.dx +
         fe.inner(fe.grad(u)*u, v) * fe.dx -
         fe.div(v) * p * fe.dx -
         q * fe.div(u) * fe.dx)

    # Solve nonlinear problem
    w = fe.Function(W)
    fe.solve(F == 0, w, bcs,
             solver_parameters={"newton_solver": {"linear_solver": "mumps"}})

    (velocity, pressure) = w.split()

    return velocity, pressure, mesh

Complex PDE Systems

Heat Transfer with Convection

def heat_transfer_convection():
    # Problem: ∂T/∂t + u·∇T - α∇²T = 0 (steady-state convection-diffusion)

    mesh = fe.UnitSquareMesh(20, 20)
    V = fe.FunctionSpace(mesh, 'P', 1)

    # Boundary conditions
    def hot_wall(x, on_boundary):
        return on_boundary and x[0] < fe.DOLFIN_EPS

    def cold_wall(x, on_boundary):
        return on_boundary and x[0] > 1 - fe.DOLFIN_EPS

    bc_hot = fe.DirichletBC(V, fe.Constant(1.0), hot_wall)
    bc_cold = fe.DirichletBC(V, fe.Constant(0.0), cold_wall)

    # Define velocity field (circular convection)
    velocity = fe.Expression(('0.5 - x[1]', 'x[0] - 0.5'), degree=1)

    # Parameters
    Pe = fe.Constant(100)  # Peclet number

    # Variational formulation
    u = fe.TrialFunction(V)
    v = fe.TestFunction(V)

    # Steady convection-diffusion equation
    a = (fe.dot(velocity, fe.grad(u)) * v +
         (1/Pe) * fe.dot(fe.grad(u), fe.grad(v))) * fe.dx

    L = fe.Constant(0) * v * fe.dx

    # Solve
    T = fe.Function(V)
    problem = fe.LinearVariationalProblem(a, L, T, [bc_hot, bc_cold])
    solver = fe.LinearVariationalSolver(problem)
    solver.solve()

    return T

Python Serialization Techniques

Standard Library Serialization

Pickle - General Python Object Serialization

import pickle
import numpy as np

# Serialize Python objects
data = {
    'array': np.random.randn(100, 100),
    'metadata': {'version': '1.0', 'date': '2025-01-15'},
    'function': lambda x: x**2,  # Warning: not recommended
    'custom_class': {'attribute': 'value'}
}

# Save to file
with open('data.pkl', 'wb') as f:
    pickle.dump(data, f)

# Load from file
with open('data.pkl', 'rb') as f:
    loaded_data = pickle.load(f)

print(f"Original array shape: {data['array'].shape}")
print(f"Loaded array shape: {loaded_data['array'].shape}")

JSON - JavaScript Object Notation

import json
import datetime

# Custom JSON encoder for datetime objects
class DateTimeEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, (datetime.date, datetime.datetime)):
            return obj.isoformat()
        elif isinstance(obj, np.ndarray):
            return {'__numpy_array__': obj.tolist()}
        return super().default(obj)

# Custom JSON decoder
def datetime_decoder(dct):
    if '__numpy_array__' in dct:
        return np.array(dct['__numpy_array__'])
    for key, value in dct.items():
        if isinstance(value, str):
            # Try to parse as datetime
            try:
                dct[key] = datetime.datetime.fromisoformat(value)
                continue
            except:
                pass
    return dct

# Serialize with custom types
data = {
    'timestamp': datetime.datetime.now(),
    'array': np.array([1, 2, 3, 4, 5]),
    'config': {'learning_rate': 0.001, 'epochs': 100}
}

# Save with custom encoder
with open('data.json', 'w') as f:
    json.dump(data, f, cls=DateTimeEncoder, indent=2)

# Load with custom decoder
with open('data.json', 'r') as f:
    loaded_data = json.load(f, object_hook=datetime_decoder)

print(f"Loaded timestamp: {loaded_data['timestamp']}")
print(f"Loaded array: {loaded_data['array']}")

Advanced Serialization Libraries

Dill - Enhanced Pickle

import dill

# Dill can serialize more Python objects than pickle
def complex_function(x):
    return sum([i**2 for i in range(x)]) if x > 0 else 0

data = {
    'lambda': lambda x: x**3,
    'nested_function': complex_function,
    'generator': (x**2 for x in range(10)),
    'partial_func': lambda: None  # Will be bound properly
}

# Serialize complex objects
with open('complex_data.dill', 'wb') as f:
    dill.dump(data, f)

with open('complex_data.dill', 'rb') as f:
    loaded = dill.load(f)

print("Dill successfully serialized complex Python objects")

MessagePack - Efficient Binary Serialization

try:
    import msgpack
    import msgpack_numpy as m  # For NumPy array support

    # Enable NumPy serialization
    m.patch()

    # Efficient binary serialization
    scientific_data = {
        'experiment_name': 'quantum_simulation',
        'parameters': np.array([1.23, 4.56, 7.89]),
        'results': np.random.randn(1000, 1000),
        'metadata': {
            'temperature_K': 4.2,
            'pressure_Pa': 1e-9,
            'measurement_time': '2025-01-15T10:30:00'
        }
    }

    # Serialize to MessagePack
    packed = msgpack.packb(scientific_data, use_bin_type=True)

    # Deserialize
    unpacked = msgpack.unpackb(packed, raw=False, use_list=False)

    print(f"MessagePack size: {len(packed)} bytes")
    print(f"Array shape preserved: {unpacked['results'].shape}")

except ImportError:
    print("MessagePack with NumPy support not installed")
    print("Install with: pip install msgpack msgpack-numpy")

HDF5 for Large Dataset Serialization

Efficient Large Array Storage

import h5py

# For very large datasets beyond memory
def serialize_large_scientific_data(filename, data_dict):
    """Serialize large scientific datasets to HDF5"""
    with h5py.File(filename, 'w') as f:
        # Store metadata
        metadata_grp = f.create_group('metadata')
        for key, value in data_dict['metadata'].items():
            if isinstance(value, str):
                metadata_grp.attrs[key] = value
            else:
                metadata_grp.create_dataset(key, data=value)

        # Store large arrays with compression
        arrays_grp = f.create_group('arrays')
        for name, array in data_dict['arrays'].items():
            # Choose optimal compression based on data size
            if array.nbytes > 100 * 1024 * 1024:  # > 100MB
                compression = 'gzip'
                compression_opts = 6
            else:
                compression = 'lzf'

            arrays_grp.create_dataset(name,
                                    data=array,
                                    compression=compression,
                                    compression_opts=compression_opts if compression == 'gzip' else None,
                                    chunks=True,
                                    shuffle=True)

        # Store complex data structures via serialization
        if 'models' in data_dict:
            models_grp = f.create_group('models')
            pickled_models = {}
            for model_name, model_obj in data_dict['models'].items():
                # Pickle the model object
                pickled_data = pickle.dumps(model_obj)
                models_grp.create_dataset(model_name,
                                        data=np.frombuffer(pickled_data, dtype='uint8'),
                                        compression='gzip')

def deserialize_large_scientific_data(filename):
    """Deserialize data from HDF5"""
    data_dict = {'metadata': {}, 'arrays': {}, 'models': {}}

    with h5py.File(filename, 'r') as f:
        # Load metadata
        for key, value in f['metadata'].attrs.items():
            data_dict['metadata'][key] = value

        for key in f['metadata'].keys():
            if key not in f['metadata'].attrs:
                data_dict['metadata'][key] = f[f'metadata/{key}'][:]

        # Load arrays
        for name in f['arrays'].keys():
            data_dict['arrays'][name] = f[f'arrays/{name}'][:]

        # Load models
        for name in f['models'].keys():
            pickled_data = f[f'models/{name}'][:].tobytes()
            data_dict['models'][name] = pickle.loads(pickled_data)

    return data_dict

Serialization Performance Comparison

import time
import pickle
try:
    import cPickle
    pickle_module = cPickle
except ImportError:
    pickle_module = pickle

# Generate test data
test_data = {
    'arrays': [np.random.randn(1000, 1000) for _ in range(10)],
    'metadata': {'experiment': 'performance_test', 'size': '10GB'},
    'scalars': list(range(10000))
}

# Performance comparison
methods = [('pickle', pickle), ('json', json), ('msgpack', locals().get('msgpack'))]

for method_name, module in methods:
    if module is None:
        print(f"{method_name}: Not available")
        continue

    try:
        # Serialize
        start = time.time()
        if method_name == 'json':
            # JSON doesn't handle NumPy arrays natively
            continue
        elif method_name == 'msgpack':
            serialized = module.packb(test_data)
        else:
            serialized = module.dumps(test_data)
        serialize_time = time.time() - start

        # Deserialize
        start = time.time()
        if method_name == 'msgpack':
            deserialized = module.unpackb(serialized)
        else:
            deserialized = module.loads(serialized)
        deserialize_time = time.time() - start

        print(f"{method_name}:")
        print(f"  Serialize: {serialize_time:.3f}s")
        print(f"  Deserialize: {deserialize_time:.3f}s")
        print(f"  Size: {len(serialized)} bytes")

    except Exception as e:
        print(f"{method_name}: Error - {e}")

These advanced Python libraries provide powerful capabilities for complex scientific computing tasks ranging from network analysis and optimization to finite element simulations and data persistence. Each library addresses specific needs in modern computational science and engineering.

This guide covers advanced scientific computing libraries. Each library has extensive documentation for specific use cases and applications.

Updated: January 15, 2025
Author: Danial Pahlavan
Category: Scientific Computing