NetworkX, CVXOPT, FEniCS & Python Serialization
Advanced scientific computing libraries for graph theory, convex optimization, finite element analysis, and data serialization in Python.
NetworkX - Graph Theory and Network Analysis
What is NetworkX?
NetworkX is a Python package for creating, manipulating, and studying complex networks and graphs. It provides data structures and algorithms for working with graphs, digraphs, and multigraphs.
Key Features:
- Graph data structures for undirected, directed, and multigraph types (see the sketch below)
- Algorithms for centrality, clustering, and shortest paths
- Graph generators for common network models
- Visualization capabilities
- Integration with NumPy and SciPy
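Beyond the plain undirected Graph used in the examples below, here is a minimal sketch of the other graph types and the NumPy integration (standard NetworkX calls, our own illustration):
import networkx as nx
import numpy as np
# Directed graph: edges have a direction
D = nx.DiGraph()
D.add_edges_from([(1, 2), (2, 3)])
print(D.has_edge(1, 2), D.has_edge(2, 1))  # True False
# Multigraph: parallel edges between the same pair of nodes
M = nx.MultiGraph()
M.add_edge('a', 'b')
M.add_edge('a', 'b')  # second, parallel edge
print(M.number_of_edges())  # 2
# NumPy integration: export the adjacency matrix
A = nx.to_numpy_array(D)
print(A)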
Basic NetworkX Usage
import networkx as nx
import matplotlib.pyplot as plt
# Create an empty graph
G = nx.Graph()
# Add nodes
G.add_node(1)
G.add_nodes_from([2, 3, 4])
# Add edges
G.add_edge(1, 2)
G.add_edges_from([(2, 3), (3, 4), (1, 4)])
# Basic properties
print(f"Number of nodes: {G.number_of_nodes()}")
print(f"Number of edges: {G.number_of_edges()}")
print(f"Nodes: {list(G.nodes())}")
print(f"Edges: {list(G.edges())}")
# Degree of nodes
degrees = dict(G.degree())
print(f"Node degrees: {degrees}")
# Visualize the graph
nx.draw(G, with_labels=True)
plt.show()
Graph Algorithms
Centrality Measures
# Create a more complex graph
G = nx.erdos_renyi_graph(20, 0.1, seed=42)
# Degree centrality
degree_centrality = nx.degree_centrality(G)
# Betweenness centrality
betweenness_centrality = nx.betweenness_centrality(G)
# Closeness centrality
closeness_centrality = nx.closeness_centrality(G)
# Eigenvector centrality
eigenvector_centrality = nx.eigenvector_centrality(G, max_iter=1000)
# Print top 5 nodes by eigenvector centrality
sorted_nodes = sorted(eigenvector_centrality.items(), key=lambda x: x[1], reverse=True)
for node, centrality in sorted_nodes[:5]:
    print(f"Node {node}: eigenvector centrality = {centrality:.3f}")
Shortest Paths and Connectivity
# Shortest path between nodes (guard against disconnected node pairs)
if nx.has_path(G, 0, 10):
    shortest_path = nx.shortest_path(G, source=0, target=10)
    print(f"Shortest path from 0 to 10: {shortest_path}")
# All-pairs shortest path lengths (iterator of (source, {target: length}) pairs)
shortest_path_lengths = dict(nx.shortest_path_length(G))
# Average shortest path length is only defined for connected graphs
if nx.is_connected(G):
    print(f"Average shortest path length: {nx.average_shortest_path_length(G):.2f}")
# Connected components
components = list(nx.connected_components(G))
print(f"Number of connected components: {len(components)}")
# Clustering coefficient
clustering = nx.clustering(G)
print(f"Average clustering coefficient: {nx.average_clustering(G):.3f}")
Advanced Graph Analysis
Community Detection
# Louvain community detection (requires the python-louvain package)
try:
    from community import community_louvain

    # Convert to an undirected graph if needed
    if isinstance(G, nx.DiGraph):
        G_undirected = G.to_undirected()
    else:
        G_undirected = G

    # Detect communities
    partition = community_louvain.best_partition(G_undirected)

    # Visualize communities, coloring nodes by community label
    pos = nx.spring_layout(G)
    nx.draw_networkx_nodes(G, pos, nodelist=list(partition.keys()),
                           node_color=list(partition.values()),
                           cmap=plt.cm.viridis)
    nx.draw_networkx_edges(G, pos, alpha=0.5)
    plt.show()
except ImportError:
    print("python-louvain package not installed")
Network Metrics and Analysis
# Network properties
print("Network Statistics:")
print(f"Density: {nx.density(G):.3f}")
print(f"Average degree: {sum(dict(G.degree()).values()) / G.number_of_nodes():.2f}")
# Transitivity (global clustering)
print(f"Transitivity: {nx.transitivity(G):.3f}")
# Diameter, radius, and small-world measures are only defined for connected graphs
if nx.is_connected(G):
    print(f"Diameter: {nx.diameter(G)}")
    print(f"Radius: {nx.radius(G)}")
    # Note: sigma and omega are expensive (they generate many random reference graphs)
    sigma = nx.sigma(G)  # Small-world coefficient
    omega = nx.omega(G)  # Small-world coefficient alternative
    print(f"Small-world coefficient (sigma): {sigma:.3f}")
    print(f"Small-world coefficient (omega): {omega:.3f}")
CVXOPT - Convex Optimization
What is CVXOPT?
CVXOPT is a Python library for convex optimization that provides solvers for linear programming, quadratic programming, second-order cone programming, and semidefinite programming.
Convex Optimization Basics
Convex optimization problems can be written as:
minimize f₀(x)
subject to fᵢ(x) ≤ 0, i = 1,...,m
Ax = b
where f₀, f₁, …, fₘ are convex functions and Ax = b collects the affine equality constraints.
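The linear and quadratic programming examples below cover the first two problem classes; second-order cone programs go through solvers.socp. A minimal sketch (the problem and matrix layout are our own illustration, not from the original): minimize t subject to ‖(x₀, x₁)‖ ≤ t and x₀ + x₁ = 1, whose optimum is x₀ = x₁ = 0.5 with t = √0.5 ≈ 0.707.
from cvxopt import matrix, solvers
# minimize t  subject to  ||(x0, x1)|| <= t,  x0 + x1 = 1
# Variables ordered as (x0, x1, t)
c = matrix([0.0, 0.0, 1.0])
# Second-order cone constraint: hq - Gq*x = (t, x0, x1) must lie in the cone
# (CVXOPT matrices are column-major: each inner list is one column)
Gq = [matrix([[0.0, -1.0, 0.0],    # column for x0
              [0.0, 0.0, -1.0],    # column for x1
              [-1.0, 0.0, 0.0]])]  # column for t
hq = [matrix([0.0, 0.0, 0.0])]
# Equality constraint: x0 + x1 = 1
A = matrix([[1.0], [1.0], [0.0]])  # 1x3, column-major
b = matrix([1.0])
sol = solvers.socp(c, Gq=Gq, hq=hq, A=A, b=b)
print(f"x0={sol['x'][0]:.3f}, x1={sol['x'][1]:.3f}, t={sol['x'][2]:.3f}")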
Linear Programming Example
from cvxopt import matrix, solvers
# Linear programming example
# maximize: 2*x + y
# subject to: x + y ≤ 1, x ≥ 0, y ≥ 0
# Convert to standard form: minimize -2*x - y
c = matrix([-2.0, -1.0]) # Coefficients for objective function
G = matrix([[1.0, -1.0, 0.0],
            [1.0, 0.0, -1.0]])  # Inequality constraints matrix (column-major: one column per variable)
h = matrix([1.0, 0.0, 0.0])  # Inequality constraints vector
# Solve
sol = solvers.lp(c, G, h)
print(f"Optimal solution: x={sol['x'][0]:.3f}, y={sol['x'][1]:.3f}")
print(f"Optimal value: {sol['primal objective']:.3f}")
Quadratic Programming
# Portfolio optimization example
import numpy as np
# Generate sample returns and covariance
np.random.seed(42)
n_assets = 5
returns = np.random.randn(n_assets) * 0.1 + 0.05 # Expected returns
cov_matrix = np.random.rand(n_assets, n_assets)
cov_matrix = cov_matrix.T @ cov_matrix # Make positive semidefinite
# Minimize portfolio variance: minimize (1/2) x^T P x
# Subject to: returns^T x >= target_return, sum(x) = 1, x >= 0
target_return = 0.08
P = matrix(2 * cov_matrix) # P in quadratic form
q = matrix(np.zeros(n_assets)) # Linear term
G = matrix(np.vstack([-returns, -np.eye(n_assets)]))
h = matrix(np.concatenate([-np.array([target_return]), np.zeros(n_assets)]))
A = matrix(np.ones((1, n_assets))) # Equality constraints
b = matrix([1.0]) # Right-hand side
# Solve QP
sol = solvers.qp(P, q, G, h, A, b)
weights = np.array(sol['x']).flatten()
portfolio_return = returns @ weights
portfolio_risk = np.sqrt(weights.T @ cov_matrix @ weights)
print(f"Optimal weights: {weights}")
print(f"Portfolio return: {portfolio_return:.3f}")
print(f"Portfolio risk: {portfolio_risk:.3f}")
Advanced Optimization Problems
Support Vector Machine Dual
# SVM dual problem for binary classification
def svm_dual(X, y, C=1.0):
    """
    Solve the soft-margin SVM dual problem using CVXOPT
    """
    # CVXOPT solvers require double-precision matrices, so cast the inputs
    X = X.astype(float)
    y = y.astype(float)
    m, n = X.shape
    # P[i, j] = y_i * y_j * <x_i, x_j>
    P = matrix(np.outer(y, y) * (X @ X.T))
    q = matrix(-np.ones(m))
    # Box constraints: 0 <= alpha_i <= C
    G = matrix(np.vstack([-np.eye(m), np.eye(m)]))
    h = matrix(np.concatenate([np.zeros(m), C * np.ones(m)]))
    # Equality constraint: sum_i alpha_i * y_i = 0
    A = matrix(y.reshape(1, -1))
    b = matrix(0.0)
    # Solve QP
    sol = solvers.qp(P, q, G, h, A, b)
    # Extract results
    alphas = np.array(sol['x']).flatten()
    # Support vectors have non-negligible alpha
    support_vectors = alphas > 1e-5
    alpha_sv = alphas[support_vectors]
    # Primal weights: w = sum_i alpha_i * y_i * x_i
    w = np.sum(alpha_sv[:, np.newaxis] * y[support_vectors, np.newaxis] * X[support_vectors], axis=0)
    # Bias averaged over the support vectors
    b = np.mean(y[support_vectors] - X[support_vectors] @ w)
    return w, b, alphas

# Example usage
X = np.array([[1, 2], [2, 3], [3, 3], [2, 1]])
y = np.array([1, 1, -1, -1])
w, b, alphas = svm_dual(X, y)
print(f"Weights: {w}, Bias: {b}")
FEniCS - Finite Element Methods
What is FEniCS?
FEniCS is an open-source computing platform for solving partial differential equations (PDEs) using the finite element method. It provides automated code generation for finite element computations.
Basic FEniCS Workflow
Poisson Equation Example
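The model problem below is the Poisson equation −∇²u = f on the unit square with u = 0 on the boundary. Multiplying by a test function v and integrating by parts gives the weak form ∫∇u·∇v dx = ∫f v dx, which maps directly onto the bilinear form a and the linear form L defined in the code.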
try:
    import fenics as fe
    import matplotlib.pyplot as plt

    # Create mesh
    mesh = fe.UnitSquareMesh(32, 32)  # 32x32 elements

    # Define function space (linear Lagrange elements)
    V = fe.FunctionSpace(mesh, 'P', 1)

    # Define boundary conditions (u = 0 on the whole boundary)
    def boundary(x, on_boundary):
        return on_boundary

    bc = fe.DirichletBC(V, fe.Constant(0.0), boundary)

    # Define variational problem
    u = fe.TrialFunction(V)
    v = fe.TestFunction(V)

    # Right-hand side (Gaussian source term)
    f = fe.Expression('10*exp(-(pow(x[0] - 0.5, 2) + pow(x[1] - 0.5, 2)) / (2 * pow(0.2, 2)))',
                      degree=2)

    # Bilinear form (LHS)
    a = fe.dot(fe.grad(u), fe.grad(v)) * fe.dx
    # Linear form (RHS)
    L = f * v * fe.dx

    # Solve
    u = fe.Function(V)
    fe.solve(a == L, u, bc)

    # Plot solution
    p = fe.plot(u)
    plt.colorbar(p)
    plt.title('Poisson Equation Solution')
    plt.show()

    # Compute the L2 norm of the solution
    norm_L2 = fe.norm(u, 'L2')
    print(f"L2 norm: {norm_L2:.6f}")
except ImportError:
    print("FEniCS not installed. Install with: conda install -c conda-forge fenics")
Navier-Stokes Equations
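The problem below is the steady, incompressible Navier-Stokes system in non-dimensional form: (u·∇)u + ∇p − (1/Re)∇²u = 0 together with the incompressibility constraint ∇·u = 0. In the mixed weak formulation, the momentum equation is tested against a velocity test function v and the constraint against a pressure test function q, giving the four-term form F assembled in the code.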
# Steady Navier-Stokes example (simplified)
def solve_navier_stokes():
    # Mesh
    mesh = fe.RectangleMesh(fe.Point(0, 0), fe.Point(1, 1), 32, 32)

    # Taylor-Hood mixed element: quadratic velocity, linear pressure
    P2 = fe.VectorElement('P', mesh.ufl_cell(), 2)
    P1 = fe.FiniteElement('P', mesh.ufl_cell(), 1)
    W = fe.FunctionSpace(mesh, fe.MixedElement([P2, P1]))

    # Boundary conditions
    inflow_profile = fe.Expression(('-sin(pi*x[1])', '0'), degree=2)

    def inflow_boundary(x, on_boundary):
        return on_boundary and fe.near(x[0], 0)

    def outflow_boundary(x, on_boundary):
        return on_boundary and fe.near(x[0], 1)

    def walls_boundary(x, on_boundary):
        return on_boundary and (fe.near(x[1], 0) or fe.near(x[1], 1))

    bc_in = fe.DirichletBC(W.sub(0), inflow_profile, inflow_boundary)
    bc_out = fe.DirichletBC(W.sub(0).sub(1), fe.Constant(0), outflow_boundary)
    bc_walls = fe.DirichletBC(W.sub(0), fe.Constant((0, 0)), walls_boundary)
    bc_pressure = fe.DirichletBC(W.sub(1), fe.Constant(0), outflow_boundary)
    bcs = [bc_in, bc_out, bc_walls, bc_pressure]

    # Nonlinear problem: the unknowns come from a Function, not TrialFunctions
    w = fe.Function(W)
    (u, p) = fe.split(w)
    (v, q) = fe.TestFunctions(W)

    # Parameters
    Re = fe.Constant(1000)  # Reynolds number

    # Weak formulation
    F = (fe.inner(fe.grad(u), fe.grad(v)) / Re * fe.dx +
         fe.inner(fe.grad(u) * u, v) * fe.dx -
         fe.div(v) * p * fe.dx -
         q * fe.div(u) * fe.dx)

    # Solve the nonlinear problem with Newton's method
    fe.solve(F == 0, w, bcs,
             solver_parameters={"newton_solver": {"linear_solver": "mumps"}})

    (velocity, pressure) = w.split()
    return velocity, pressure, mesh
Complex PDE Systems
Heat Transfer with Convection
def heat_transfer_convection():
    # Problem: u·∇T = (1/Pe)∇²T  (steady convection-diffusion)
    mesh = fe.UnitSquareMesh(20, 20)
    V = fe.FunctionSpace(mesh, 'P', 1)

    # Boundary conditions: hot left wall, cold right wall
    def hot_wall(x, on_boundary):
        return on_boundary and x[0] < fe.DOLFIN_EPS

    def cold_wall(x, on_boundary):
        return on_boundary and x[0] > 1 - fe.DOLFIN_EPS

    bc_hot = fe.DirichletBC(V, fe.Constant(1.0), hot_wall)
    bc_cold = fe.DirichletBC(V, fe.Constant(0.0), cold_wall)

    # Velocity field (rigid-body rotation about the domain center)
    velocity = fe.Expression(('0.5 - x[1]', 'x[0] - 0.5'), degree=1)

    # Parameters
    Pe = fe.Constant(100)  # Peclet number

    # Variational formulation
    u = fe.TrialFunction(V)
    v = fe.TestFunction(V)

    # Steady convection-diffusion equation
    a = (fe.dot(velocity, fe.grad(u)) * v +
         (1 / Pe) * fe.dot(fe.grad(u), fe.grad(v))) * fe.dx
    L = fe.Constant(0) * v * fe.dx

    # Solve
    T = fe.Function(V)
    problem = fe.LinearVariationalProblem(a, L, T, [bc_hot, bc_cold])
    solver = fe.LinearVariationalSolver(problem)
    solver.solve()
    return T
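A minimal usage sketch (our own addition, reusing the fe and plt imports from the Poisson example above):
# Run the simulation and plot the resulting temperature field
T = heat_transfer_convection()
p = fe.plot(T)
plt.colorbar(p)
plt.title('Temperature with Circular Convection')
plt.show()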
Python Serialization Techniques
Standard Library Serialization
Pickle - General Python Object Serialization
import pickle
import numpy as np
# Serialize Python objects
# Note: pickle cannot serialize lambdas or locally defined functions;
# dill (covered below) handles those cases
data = {
    'array': np.random.randn(100, 100),
    'metadata': {'version': '1.0', 'date': '2025-01-15'},
    'custom_class': {'attribute': 'value'}
}

# Save to file
with open('data.pkl', 'wb') as f:
    pickle.dump(data, f)

# Load from file
with open('data.pkl', 'rb') as f:
    loaded_data = pickle.load(f)

print(f"Original array shape: {data['array'].shape}")
print(f"Loaded array shape: {loaded_data['array'].shape}")
JSON - JavaScript Object Notation
import json
import datetime
# Custom JSON encoder for datetime objects and NumPy arrays
class DateTimeEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, (datetime.date, datetime.datetime)):
            return obj.isoformat()
        elif isinstance(obj, np.ndarray):
            return {'__numpy_array__': obj.tolist()}
        return super().default(obj)

# Custom JSON decoder
def datetime_decoder(dct):
    if '__numpy_array__' in dct:
        return np.array(dct['__numpy_array__'])
    for key, value in dct.items():
        if isinstance(value, str):
            # Try to parse ISO-formatted strings back into datetimes
            try:
                dct[key] = datetime.datetime.fromisoformat(value)
            except ValueError:
                pass
    return dct

# Serialize with custom types
data = {
    'timestamp': datetime.datetime.now(),
    'array': np.array([1, 2, 3, 4, 5]),
    'config': {'learning_rate': 0.001, 'epochs': 100}
}

# Save with custom encoder
with open('data.json', 'w') as f:
    json.dump(data, f, cls=DateTimeEncoder, indent=2)

# Load with custom decoder
with open('data.json', 'r') as f:
    loaded_data = json.load(f, object_hook=datetime_decoder)

print(f"Loaded timestamp: {loaded_data['timestamp']}")
print(f"Loaded array: {loaded_data['array']}")
Advanced Serialization Libraries
Dill - Enhanced Pickle
import dill

# Dill can serialize more Python objects than pickle,
# including lambdas, closures, and interactively defined functions
# (generators, however, are not picklable even with dill)
def complex_function(x):
    return sum(i**2 for i in range(x)) if x > 0 else 0

def make_multiplier(factor):
    # Returns a closure that captures `factor`
    return lambda x: x * factor

data = {
    'lambda': lambda x: x**3,
    'nested_function': complex_function,
    'closure': make_multiplier(10)
}

# Serialize objects that plain pickle would reject
with open('complex_data.dill', 'wb') as f:
    dill.dump(data, f)

with open('complex_data.dill', 'rb') as f:
    loaded = dill.load(f)

print(f"Loaded lambda(2) = {loaded['lambda'](2)}")    # 8
print(f"Loaded closure(5) = {loaded['closure'](5)}")  # 50
MessagePack - Efficient Binary Serialization
try:
    import msgpack
    import msgpack_numpy as m  # For NumPy array support

    # Enable transparent NumPy serialization
    m.patch()

    # Efficient binary serialization
    scientific_data = {
        'experiment_name': 'quantum_simulation',
        'parameters': np.array([1.23, 4.56, 7.89]),
        'results': np.random.randn(1000, 1000),
        'metadata': {
            'temperature_K': 4.2,
            'pressure_Pa': 1e-9,
            'measurement_time': '2025-01-15T10:30:00'
        }
    }

    # Serialize to MessagePack
    packed = msgpack.packb(scientific_data, use_bin_type=True)

    # Deserialize
    unpacked = msgpack.unpackb(packed, raw=False, use_list=False)

    print(f"MessagePack size: {len(packed)} bytes")
    print(f"Array shape preserved: {unpacked['results'].shape}")
except ImportError:
    print("MessagePack with NumPy support not installed")
    print("Install with: pip install msgpack msgpack-numpy")
HDF5 for Large Dataset Serialization
Efficient Large Array Storage
import h5py
# For very large datasets beyond memory
def serialize_large_scientific_data(filename, data_dict):
    """Serialize large scientific datasets to HDF5"""
    with h5py.File(filename, 'w') as f:
        # Store metadata: strings as attributes, everything else as datasets
        metadata_grp = f.create_group('metadata')
        for key, value in data_dict['metadata'].items():
            if isinstance(value, str):
                metadata_grp.attrs[key] = value
            else:
                metadata_grp.create_dataset(key, data=value)

        # Store large arrays with compression
        arrays_grp = f.create_group('arrays')
        for name, array in data_dict['arrays'].items():
            # Choose compression by data size: gzip compresses better, lzf is faster
            if array.nbytes > 100 * 1024 * 1024:  # > 100MB
                compression = 'gzip'
                compression_opts = 6
            else:
                compression = 'lzf'
                compression_opts = None  # lzf takes no options
            arrays_grp.create_dataset(name,
                                      data=array,
                                      compression=compression,
                                      compression_opts=compression_opts,
                                      chunks=True,
                                      shuffle=True)

        # Store complex data structures by pickling them into byte datasets
        if 'models' in data_dict:
            models_grp = f.create_group('models')
            for model_name, model_obj in data_dict['models'].items():
                pickled_data = pickle.dumps(model_obj)
                models_grp.create_dataset(model_name,
                                          data=np.frombuffer(pickled_data, dtype='uint8'),
                                          compression='gzip')
def deserialize_large_scientific_data(filename):
    """Deserialize data from HDF5"""
    data_dict = {'metadata': {}, 'arrays': {}, 'models': {}}
    with h5py.File(filename, 'r') as f:
        # Load metadata (string attributes and array datasets)
        for key, value in f['metadata'].attrs.items():
            data_dict['metadata'][key] = value
        for key in f['metadata'].keys():
            data_dict['metadata'][key] = f[f'metadata/{key}'][:]

        # Load arrays
        for name in f['arrays'].keys():
            data_dict['arrays'][name] = f[f'arrays/{name}'][:]

        # Load models (unpickle the byte datasets), if any were stored
        if 'models' in f:
            for name in f['models'].keys():
                pickled_data = f[f'models/{name}'][:].tobytes()
                data_dict['models'][name] = pickle.loads(pickled_data)
    return data_dict
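A quick round-trip sketch (the file name and sample payload here are illustrative, not from the original):
# Build a small sample payload and verify the HDF5 round trip
sample = {
    'metadata': {'experiment': 'demo', 'timestep': np.array([0.1])},
    'arrays': {'field': np.random.randn(256, 256)},
    'models': {'params': {'alpha': 0.5, 'beta': 2.0}}
}
serialize_large_scientific_data('experiment.h5', sample)
restored = deserialize_large_scientific_data('experiment.h5')
assert np.allclose(sample['arrays']['field'], restored['arrays']['field'])
print(f"Restored model params: {restored['models']['params']}")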
Serialization Performance Comparison
import time
import pickle

# On Python 3, pickle already uses the fast C implementation
# (the old Python 2 cPickle module no longer exists)
try:
    import msgpack
except ImportError:
    msgpack = None

# Generate test data (~80 MB of arrays)
test_data = {
    'arrays': [np.random.randn(1000, 1000) for _ in range(10)],
    'metadata': {'experiment': 'performance_test'},
    'scalars': list(range(10000))
}

# Performance comparison
# Note: json is omitted because it cannot serialize NumPy arrays natively,
# and msgpack relies on the msgpack_numpy patch (applied in the earlier section)
methods = [('pickle', pickle), ('msgpack', msgpack)]

for method_name, module in methods:
    if module is None:
        print(f"{method_name}: Not available")
        continue
    try:
        # Serialize
        start = time.time()
        if method_name == 'msgpack':
            serialized = module.packb(test_data)
        else:
            serialized = module.dumps(test_data)
        serialize_time = time.time() - start

        # Deserialize
        start = time.time()
        if method_name == 'msgpack':
            deserialized = module.unpackb(serialized)
        else:
            deserialized = module.loads(serialized)
        deserialize_time = time.time() - start

        print(f"{method_name}:")
        print(f"  Serialize: {serialize_time:.3f}s")
        print(f"  Deserialize: {deserialize_time:.3f}s")
        print(f"  Size: {len(serialized)} bytes")
    except Exception as e:
        print(f"{method_name}: Error - {e}")
These advanced Python libraries provide powerful capabilities for complex scientific computing tasks, from network analysis and convex optimization to finite element simulation and data persistence. Each addresses a specific need in modern computational science and engineering, and each has extensive documentation covering more specialized use cases and applications.
Updated: January 15, 2025
Author: Danial Pahlavan
Category: Scientific Computing