# BitConverter — src/app.py
# Streamlit app: DNA ↔ Binary converter (text/image encoding, decoding,
# image preview, data analytics, and pipetting-command generation).
import streamlit as st
import pandas as pd
import io
import re
import numpy as np
import openpyxl
import base64
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
from scipy.stats import gaussian_kde
from PIL import Image
# =========================
# Streamlit App Setup
# =========================
st.set_page_config(page_title="DNA ↔ Binary Converter", layout="wide")
st.title("DNA ↔ Binary Converter")

# =========================
# Encoding Schemes
# =========================
# Scheme names shown in the UI selectors; must match the keys of BITS_PER_UNIT.
ENCODING_OPTIONS = ["Voyager 6-bit", "Base64 (6-bit)", "ASCII (7-bit)", "UTF-8 (8-bit)"]
# Number of bits one encoded unit (character / byte / Base64 symbol) occupies.
BITS_PER_UNIT = {
    "Voyager 6-bit": 6,
    "Base64 (6-bit)": 6,
    "ASCII (7-bit)": 7,
    "UTF-8 (8-bit)": 8,
}

# =========================
# Voyager ASCII 6-bit Table
# =========================
# 6-bit code -> character for the 56-symbol Voyager-style alphabet
# (space, A-Z, 0-9, punctuation). Codes 56-63 are unused.
voyager_table = {
    i: ch for i, ch in enumerate([
        ' ', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I',
        'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S',
        'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '0', '1', '2',
        '3', '4', '5', '6', '7', '8', '9', '.', ',', '(',
        ')', '+', '-', '*', '/', '=', '$', '!', ':', '%',
        '"', '#', '@', "'", '?', '&'
    ])
}
# Character -> 6-bit code (inverse mapping used when encoding).
reverse_voyager_table = {v: k for k, v in voyager_table.items()}
# Standard Base64 alphabet; the index of a symbol equals its 6-bit value.
B64_ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
# =========================
# Encoding Functions
# =========================
def encode_to_binary(text: str, scheme: str) -> tuple[list[int], list[str]]:
    """Encode *text* under *scheme* into a flat MSB-first bit stream.

    Returns (flat_bits, display_units): flat_bits is a flat list of 0/1
    ints; display_units labels each encoded chunk (a character, a hex byte
    label for UTF-8, or a Base64 symbol). Unknown schemes yield ([], []).
    """
    def to_bits(value: int, width: int) -> list[int]:
        # Expand one code value into *width* bits, most-significant first.
        return [(value >> shift) & 1 for shift in range(width - 1, -1, -1)]

    if scheme == "Voyager 6-bit":
        # Table is uppercase-only; unsupported characters fall back to code 0 (space).
        upper = text.upper()
        out: list[int] = []
        for ch in upper:
            out.extend(to_bits(reverse_voyager_table.get(ch, 0), 6))
        return out, list(upper)
    if scheme == "ASCII (7-bit)":
        # Mask to 7 bits; code points above 127 are truncated.
        out = []
        for ch in text:
            out.extend(to_bits(ord(ch) & 0x7F, 7))
        return out, list(text)
    if scheme == "UTF-8 (8-bit)":
        raw = text.encode("utf-8")
        out = []
        for byte in raw:
            out.extend(to_bits(byte, 8))
        # Display label per byte: hex value (a character may span several bytes).
        return out, [f"0x{b:02X}" for b in raw]
    if scheme == "Base64 (6-bit)":
        # '=' padding carries no data, so it is stripped before bit expansion.
        symbols = base64.b64encode(text.encode("utf-8")).decode("ascii").rstrip("=")
        out = []
        for sym in symbols:
            out.extend(to_bits(B64_ALPHABET.index(sym), 6))
        return out, list(symbols)
    return [], []
# =========================
# Decoding Functions
# =========================
def decode_from_binary(bits: list[int], scheme: str) -> str:
    """Decode a flat MSB-first 0/1 list back into text using *scheme*.

    A trailing partial chunk is zero-padded on the right. Unmapped or
    non-printable codes render as '?'. Unknown schemes return "".
    """
    def chunk_values(width: int):
        # Yield the integer value of each consecutive *width*-bit chunk.
        for start in range(0, len(bits), width):
            piece = bits[start:start + width]
            piece = piece + [0] * (width - len(piece))
            yield sum(bit << (width - 1 - pos) for pos, bit in enumerate(piece))

    if scheme == "Voyager 6-bit":
        return ''.join(voyager_table.get(v, '?') for v in chunk_values(6))
    if scheme == "ASCII (7-bit)":
        # Only printable ASCII (32..126) is rendered; the rest become '?'.
        return ''.join(chr(v) if 32 <= v < 127 else '?' for v in chunk_values(7))
    if scheme == "UTF-8 (8-bit)":
        return bytes(chunk_values(8)).decode("utf-8", errors="replace")
    if scheme == "Base64 (6-bit)":
        b64_str = ''.join(B64_ALPHABET[v] for v in chunk_values(6))
        # Restore '=' padding so the length is a multiple of 4.
        b64_str += '=' * (-len(b64_str) % 4)
        try:
            return base64.b64decode(b64_str).decode("utf-8", errors="replace")
        except Exception:
            return "[Base64 decode error]"
    return ""
# =========================
# Tabs
# =========================
tab1, tab2, tab3, tab4, tab5 = st.tabs(["Encoding", "Decoding", "Image Preview", "Data Analytics", "Writing"])

# --------------------------------------------------
# TAB 1: Text/Image → Binary
# --------------------------------------------------
with tab1:
    st.markdown("""
Convert text or an image into binary labels.
Choose an input mode, encoding scheme, and control grouping.
""")
    input_mode = st.selectbox("Input mode:", ["Text", "Image"], key="input_mode")
    if input_mode == "Text":
        st.subheader("Step 1 – Choose Encoding & Input Text")
        encoding_scheme = st.selectbox(
            "Encoding scheme:",
            ENCODING_OPTIONS,
            index=0,
            key="enc_scheme",
            help=(
                "**Voyager 6-bit** – Custom 56-character table (A-Z, 0-9, punctuation). 6 bits/char.\n\n"
                "**Base64 (6-bit)** – Standard Base64 encoding of UTF-8 bytes. 6 bits/symbol.\n\n"
                "**ASCII (7-bit)** – Standard 7-bit ASCII. 7 bits/char.\n\n"
                "**UTF-8 (8-bit)** – Full UTF-8 byte encoding. 8 bits/byte. Supports all Unicode."
            )
        )
        bits_per = BITS_PER_UNIT[encoding_scheme]
        if encoding_scheme == "Voyager 6-bit":
            # Show the exact character set the custom table supports.
            supported = ''.join(voyager_table[i] for i in range(len(voyager_table)))
            st.caption(f"Supported characters ({len(voyager_table)}): `{supported}`")
        user_input = st.text_input("Enter your text:", value="DNA", key="input_text")
        col1, col2 = st.columns([2, 1])
        with col1:
            group_size = st.slider("Select number of target positions:", min_value=12, max_value=128, value=25)
        with col2:
            custom_cols = st.number_input("Or enter custom number:", min_value=1, max_value=512, value=group_size)
        # The free-form number overrides the slider whenever they differ.
        if custom_cols != group_size:
            group_size = custom_cols
        if user_input:
            binary_labels, display_units = encode_to_binary(user_input, encoding_scheme)
            binary_concat = ''.join(map(str, binary_labels))
            unit_label = "Byte" if encoding_scheme == "UTF-8 (8-bit)" else "Character"
            st.markdown(f"### Output 1 – Binary Labels per {unit_label}")
            st.caption(f"Encoding: **{encoding_scheme}** — {bits_per} bits per {unit_label.lower()}")
            # Split the flat bit list back into per-unit chunks for display.
            grouped_bits = [binary_labels[i:i + bits_per] for i in range(0, len(binary_labels), bits_per)]
            scroll_html = (
                "<div style='max-height:300px; overflow-y:auto; font-family:monospace; "
                "padding:6px; border:1px solid #ccc;'>"
            )
            for i, bits in enumerate(grouped_bits):
                label = display_units[i] if i < len(display_units) else "?"
                scroll_html += f"<div>'{label}' → {bits}</div>"
            scroll_html += "</div>"
            st.markdown(scroll_html, unsafe_allow_html=True)
            # Plain-text version of the per-unit listing for download.
            per_char_lines = []
            for i, bits in enumerate(grouped_bits):
                label = display_units[i] if i < len(display_units) else "?"
                per_char_lines.append(f"'{label}' → {''.join(map(str, bits))}")
            st.download_button(
                f"⬇️ Download Binary per {unit_label} (.txt)",
                data='\n'.join(per_char_lines),
                file_name="binary_per_unit.txt",
                mime="text/plain",
                key="download_per_unit"
            )
            st.download_button(
                "⬇️ Download Concatenated Binary String",
                data=binary_concat,
                file_name="binary_full.txt",
                mime="text/plain",
                key="download_binary_txt"
            )
            st.markdown("### Output 2 – Binary matrix split into reactions grouped by target position")
            # Pack the bit stream into fixed-width rows, zero-padding the last row.
            groups = []
            for i in range(0, len(binary_labels), group_size):
                group = binary_labels[i:i + group_size]
                if len(group) < group_size:
                    group += [0] * (group_size - len(group))
                groups.append(group)
            columns = [f"Position {i+1}" for i in range(group_size)]
            df = pd.DataFrame(groups, columns=columns)
            # 1-based sample index column for the exported matrix.
            df.insert(0, "Sample", range(1, len(df) + 1))
            st.dataframe(df, width="stretch")
            st.download_button(
                "⬇️ Download as CSV",
                df.to_csv(index=False),
                file_name=f"binary_labels_{group_size}_positions.csv",
                mime="text/csv",
                key="download_binary_csv"
            )
        else:
            st.info("👆 Enter text above to see binary labels.")
    # =====================================================
    # IMAGE INPUT MODE
    # =====================================================
    else:
        st.subheader("Step 1 – Upload Image & Set Resolution")
        uploaded_img = st.file_uploader(
            "Upload an image (PNG, JPG, BMP, etc.):",
            type=["png", "jpg", "jpeg", "bmp", "gif", "tiff", "webp"],
            key="img_uploader"
        )
        if uploaded_img is not None:
            img = Image.open(uploaded_img).convert("L")  # grayscale
            orig_w, orig_h = img.size
            # Height/width ratio, used to derive output height from chosen width.
            aspect = orig_h / orig_w
            st.image(img, caption=f"Original (grayscale) — {orig_w}×{orig_h} px", use_container_width=True)
            st.markdown("#### ⚙️ Resolution & Threshold")
            target_width = st.slider(
                "Output width (pixels):",
                min_value=8, max_value=min(orig_w, 256), value=min(64, orig_w), step=1,
                help="Height is auto-calculated from aspect ratio. Each pixel = 1 bit."
            )
            target_height = max(1, int(round(target_width * aspect)))
            total_bits = target_width * target_height
            st.caption(f"Output size: **{target_width} × {target_height}** = **{total_bits:,}** bits (pixels)")
            threshold = st.slider(
                "Black/white threshold:",
                min_value=0, max_value=255, value=128,
                help="Pixels darker than this → 1 (black). Brighter → 0 (white)."
            )
            # Resize & threshold
            img_resized = img.resize((target_width, target_height), Image.LANCZOS)
            img_array = np.array(img_resized)
            binary_matrix = (img_array < threshold).astype(int)  # dark = 1, light = 0
            # Show preview
            st.markdown("### Preview — Black & White Output")
            col_prev1, col_prev2 = st.columns(2)
            with col_prev1:
                st.image(img_resized, caption=f"Resized grayscale ({target_width}×{target_height})", use_container_width=True)
            with col_prev2:
                # Invert so bit value 1 (black) renders as pixel value 0.
                bw_display = Image.fromarray(((1 - binary_matrix) * 255).astype(np.uint8))
                st.image(bw_display, caption=f"Binary B&W ({target_width}×{target_height})", use_container_width=True)
            # Flatten to binary labels (row-major pixel order)
            binary_labels = binary_matrix.flatten().tolist()
            binary_concat = ''.join(map(str, binary_labels))
            st.markdown("### Output 1 – Image Info")
            st.markdown(
                f"- **Dimensions:** {target_width} × {target_height} \n"
                f"- **Total bits:** {total_bits:,} \n"
                f"- **Black pixels (1s):** {sum(binary_labels):,} \n"
                f"- **White pixels (0s):** {total_bits - sum(binary_labels):,}"
            )
            st.download_button(
                "⬇️ Download Concatenated Binary String",
                data=binary_concat,
                file_name="image_binary_full.txt",
                mime="text/plain",
                key="download_img_binary_txt"
            )
            # Output as matrix with width = target_width
            st.markdown("### Output 2 – Binary Matrix (rows = pixel rows)")
            columns = [f"Position {i+1}" for i in range(target_width)]
            df_img = pd.DataFrame(binary_matrix, columns=columns)
            df_img.insert(0, "Sample", range(1, len(df_img) + 1))
            st.dataframe(df_img, width="stretch")
            st.download_button(
                "⬇️ Download as CSV",
                df_img.to_csv(index=False),
                file_name=f"image_binary_{target_width}x{target_height}.csv",
                mime="text/csv",
                key="download_img_csv"
            )
            # Also offer custom grouping (same as text mode)
            st.markdown("### Output 3 – Custom Grouped Matrix")
            col1, col2 = st.columns([2, 1])
            with col1:
                img_group_size = st.slider(
                    "Select number of target positions:",
                    min_value=12, max_value=128, value=target_width, key="img_group_slider"
                )
            with col2:
                img_custom_cols = st.number_input(
                    "Or enter custom number:",
                    min_value=1, max_value=512, value=img_group_size, key="img_custom_cols"
                )
            # The free-form number overrides the slider whenever they differ.
            if img_custom_cols != img_group_size:
                img_group_size = img_custom_cols
            # Re-pack the flat bit stream into the custom row width, zero-padded.
            groups = []
            for i in range(0, len(binary_labels), img_group_size):
                group = binary_labels[i:i + img_group_size]
                if len(group) < img_group_size:
                    group += [0] * (img_group_size - len(group))
                groups.append(group)
            columns_g = [f"Position {i+1}" for i in range(img_group_size)]
            df_grouped = pd.DataFrame(groups, columns=columns_g)
            df_grouped.insert(0, "Sample", range(1, len(df_grouped) + 1))
            st.dataframe(df_grouped, width="stretch")
            st.download_button(
                "⬇️ Download Grouped CSV",
                df_grouped.to_csv(index=False),
                file_name=f"image_binary_grouped_{img_group_size}_positions.csv",
                mime="text/csv",
                key="download_img_grouped_csv"
            )
        else:
            st.info("👆 Upload an image to encode it as binary.")
# --------------------------------------------------
# TAB 2: Binary → Text
# --------------------------------------------------
with tab2:
    st.markdown("""
Convert binary data back into readable text.
Upload either:
- `.csv` file with 0/1 values (any number of columns/rows)
- `.xlsx` Excel file
- `.txt` file containing a concatenated binary string (e.g. `010101...`)
""")
    decode_scheme = st.selectbox(
        "Decoding scheme (must match the encoding used):",
        ENCODING_OPTIONS,
        index=0,
        key="dec_scheme",
        help="Select the same encoding scheme that was used to produce the binary data."
    )
    uploaded_decode = st.file_uploader(
        "Upload your file (.csv, .xlsx, or .txt):",
        type=["csv", "xlsx", "txt"],
        key="decode_uploader"
    )
    if uploaded_decode is not None:
        try:
            if uploaded_decode.name.endswith(".csv"):
                df = pd.read_csv(uploaded_decode)
                # Fix: drop an index-like "Sample" column (Tab 1's CSV export
                # writes one) so its values are not misread as bits — this
                # mirrors the handling already done in the Image Preview tab.
                df = df.drop(columns=[c for c in df.columns if str(c).lower() == "sample"])
                bits = df.values.flatten().astype(int).tolist()
            elif uploaded_decode.name.endswith(".xlsx"):
                df = pd.read_excel(uploaded_decode)
                # Same "Sample" column drop as for CSV input.
                df = df.drop(columns=[c for c in df.columns if str(c).lower() == "sample"])
                bits = df.values.flatten().astype(int).tolist()
            elif uploaded_decode.name.endswith(".txt"):
                content = uploaded_decode.read().decode().strip()
                # Keep only literal '0'/'1' characters; ignore any other noise.
                bits = [int(b) for b in content if b in ['0', '1']]
            else:
                bits = []
            if not bits:
                st.warning("No binary data detected.")
            else:
                recovered_text = decode_from_binary(bits, decode_scheme)
                st.success(f"✅ Conversion complete using **{decode_scheme}**!")
                st.markdown("**Recovered text:**")
                st.text_area("Output", recovered_text, height=150)
                st.download_button(
                    "⬇️ Download Recovered Text (.txt)",
                    data=recovered_text,
                    file_name="recovered_text.txt",
                    mime="text/plain",
                    key="download_recovered"
                )
        except Exception as e:
            st.error(f"Error reading or converting file: {e}")
    else:
        st.info("👆 Upload a file to start the reverse conversion.")
# --------------------------------------------------
# TAB 3: Image Preview
# --------------------------------------------------
with tab3:
    st.header("🖼️ Image Preview")
    st.markdown("""
Render binary data (0/1) as a **black & white image**.
Upload a binary matrix CSV (rows × positions) or a concatenated binary `.txt` string.
""")
    img_preview_file = st.file_uploader(
        "📤 Upload binary data file (.csv, .xlsx, or .txt):",
        type=["csv", "xlsx", "txt"],
        key="img_preview_uploader"
    )
    if img_preview_file is not None:
        try:
            # --- Load binary data ---
            if img_preview_file.name.endswith(".csv"):
                idf = pd.read_csv(img_preview_file)
                # Drop Sample column if present
                if "Sample" in idf.columns or "sample" in idf.columns:
                    idf = idf.drop(columns=[c for c in idf.columns if c.lower() == "sample"])
                bits_matrix = idf.values.flatten().astype(int)
                # Column count doubles as the image width hint.
                detected_width = len(idf.columns)
            elif img_preview_file.name.endswith(".xlsx"):
                idf = pd.read_excel(img_preview_file)
                if "Sample" in idf.columns or "sample" in idf.columns:
                    idf = idf.drop(columns=[c for c in idf.columns if c.lower() == "sample"])
                bits_matrix = idf.values.flatten().astype(int)
                detected_width = len(idf.columns)
            elif img_preview_file.name.endswith(".txt"):
                content = img_preview_file.read().decode().strip()
                # Keep only literal '0'/'1' characters; no width hint for flat strings.
                bits_matrix = np.array([int(b) for b in content if b in ['0', '1']])
                detected_width = None
            else:
                bits_matrix = np.array([])
                detected_width = None
            if len(bits_matrix) == 0:
                st.warning("No binary data detected.")
            else:
                total_bits = len(bits_matrix)
                st.success(f"✅ Loaded **{total_bits:,}** bits.")
                # --- Width control ---
                st.markdown("#### ⚙️ Image Dimensions")
                if detected_width and detected_width > 1:
                    default_w = detected_width
                    st.caption(f"Auto-detected width from columns: **{detected_width}**")
                else:
                    # Guess a square-ish default
                    default_w = max(1, int(np.sqrt(total_bits)))
                img_width = st.number_input(
                    "Image width (pixels / positions per row):",
                    min_value=1, max_value=total_bits, value=default_w, step=1,
                    key="img_preview_width"
                )
                img_height = int(np.ceil(total_bits / img_width))
                st.caption(f"Image size: **{img_width} × {img_height}** = **{img_width * img_height:,}** pixels "
                           f"({total_bits:,} bits, {img_width * img_height - total_bits} padded)")
                # Pad to fill the last row
                padded = np.zeros(img_width * img_height, dtype=int)
                padded[:total_bits] = bits_matrix[:total_bits]
                img_data = padded.reshape((img_height, img_width))
                # Render: 1 = black (0), 0 = white (255)
                img_render = ((1 - img_data) * 255).astype(np.uint8)
                pil_img = Image.fromarray(img_render, mode="L")
                st.markdown("### 🖼️ Rendered Image")
                # Use nearest-neighbor scaling for crisp pixels
                display_scale = max(1, 256 // img_width)
                display_w = img_width * display_scale
                display_h = img_height * display_scale
                pil_display = pil_img.resize((display_w, display_h), Image.NEAREST)
                st.image(pil_display, caption=f"Binary image — {img_width}×{img_height} (1=black, 0=white)")
                # Stats
                ones = int(bits_matrix.sum())
                st.markdown(
                    f"- **Black pixels (1):** {ones:,} ({100*ones/total_bits:.1f}%) \n"
                    f"- **White pixels (0):** {total_bits - ones:,} ({100*(total_bits-ones)/total_bits:.1f}%)"
                )
                # Download rendered image as PNG (1:1 pixel scale)
                buf = io.BytesIO()
                pil_img.save(buf, format="PNG")
                st.download_button(
                    "⬇️ Download as PNG",
                    data=buf.getvalue(),
                    file_name=f"binary_image_{img_width}x{img_height}.png",
                    mime="image/png",
                    key="download_preview_png"
                )
                # Also offer a high-res version
                buf_hr = io.BytesIO()
                pil_display.save(buf_hr, format="PNG")
                st.download_button(
                    "⬇️ Download Scaled PNG (for viewing)",
                    data=buf_hr.getvalue(),
                    file_name=f"binary_image_{display_w}x{display_h}_scaled.png",
                    mime="image/png",
                    key="download_preview_png_scaled"
                )
        except Exception as e:
            st.error(f"❌ Error processing file: {e}")
            import traceback
            st.code(traceback.format_exc())
    else:
        st.info("👆 Upload a binary data file (CSV or TXT) to render as an image.")
# --------------------------------------------------
# TAB 4: Data Analytics
# --------------------------------------------------
with tab4:
    st.header("📊 Data Analytics")
    st.markdown("""
Upload your sample data file (Excel or CSV) for a quick exploratory assessment.
The file should contain samples as rows and position columns with editing values.
This tab provides visualizations **before** any binary labelling.
""")
    analytics_uploaded = st.file_uploader(
        "📤 Upload data file",
        type=["xlsx", "csv"],
        key="analytics_uploader"
    )
    if analytics_uploaded is not None:
        try:
            # --- Load ---
            if analytics_uploaded.name.endswith(".xlsx"):
                adf = pd.read_excel(analytics_uploaded)
            else:
                adf = pd.read_csv(analytics_uploaded)
            st.success(f"✅ Loaded file with {len(adf)} rows and {len(adf.columns)} columns")
            adf.columns = [str(c).strip() for c in adf.columns]
            # --- Detect position columns ---
            # Known metadata column names to exclude (includes a common misspelling).
            non_pos_keywords = {"sample", "description", "descritpion", "total edited",
                                'volume per "1"', "volume per 1", "id", "name"}
            # A position column is any non-metadata column with at least one numeric value.
            position_cols = [c for c in adf.columns
                             if c.lower() not in non_pos_keywords
                             and pd.to_numeric(adf[c], errors="coerce").notna().any()]

            def pos_sort_key(col_name: str):
                # Sort by the first number in the column name; names without one go last.
                m = re.search(r"(\d+)", col_name)
                return int(m.group(1)) if m else 10**9

            position_cols = sorted(position_cols, key=pos_sort_key)
            if not position_cols:
                st.error("No numeric position columns detected.")
                st.stop()
            st.info(f"Detected **{len(position_cols)}** position columns and **{len(adf)}** samples.")
            # Convert position data to numeric (non-numeric cells become 0.0)
            pos_data = adf[position_cols].apply(pd.to_numeric, errors="coerce").fillna(0.0)
            # Compute Total edited (sum across positions per sample)
            if "Total edited" in adf.columns:
                total_edited = pd.to_numeric(adf["Total edited"], errors="coerce").fillna(0.0)
            else:
                total_edited = pos_data.sum(axis=1)
            # =====================================================
            # Shared controls for raw data plots
            # =====================================================
            st.markdown("### 1️⃣ Raw Data Distribution")
            st.caption("Visualize editing values across all positions and samples — before any binary labelling.")
            transform_option = st.selectbox(
                "Value transformation:",
                ["Raw (linear)", "log1p", "log1p → log1p", "log1p → pos. norm."],
                index=0,
                key="transform_select",
                help=(
                    "**Raw** — No transformation.\n\n"
                    "**log1p** — `log(1 + x)`. Compresses high values, spreads low range.\n\n"
                    "**log1p → log1p** — Double log1p. Even stronger compression.\n\n"
                    "**log1p → pos. norm.** — log1p then robust per-position normalization "
                    "(median / IQR scaling per position column)."
                )
            )

            # --- Apply transforms ---
            def robust_pos_normalize_log1p(data: pd.DataFrame) -> pd.DataFrame:
                """log1p then robust per-position normalization (median + IQR)."""
                logged = np.log1p(data)
                result = logged.copy()
                for col in result.columns:
                    med = result[col].median()
                    q75, q25 = result[col].quantile(0.75), result[col].quantile(0.25)
                    iqr = q75 - q25
                    if iqr > 0:
                        result[col] = (result[col] - med) / iqr
                    else:
                        # Zero IQR (near-constant column): only center on the median.
                        result[col] = result[col] - med
                return result

            if transform_option == "log1p":
                transformed = np.log1p(pos_data)
                value_label = "Editing Value (log1p)"
                transform_tag = "log1p"
            elif transform_option == "log1p → log1p":
                transformed = np.log1p(np.log1p(pos_data))
                value_label = "Editing Value (log1p → log1p)"
                transform_tag = "log1p_log1p"
            elif transform_option == "log1p → pos. norm.":
                transformed = robust_pos_normalize_log1p(pos_data)
                value_label = "Editing Value (log1p → pos. norm.)"
                transform_tag = "log1p_posnorm"
            else:
                transformed = pos_data
                value_label = "Editing Value"
                transform_tag = "raw"
            # Melt data to long format: (sample, position_index, value)
            melted = transformed.melt(var_name="Position", value_name="Value")
            melted["Position_idx"] = melted["Position"].apply(
                lambda x: int(re.search(r"(\d+)", str(x)).group(1)) if re.search(r"(\d+)", str(x)) else 0
            )
            # =====================================================
            # PLOT 2: Histogram — all values
            # =====================================================
            st.markdown("#### 📊 Histogram — All Values")
            n_bins = st.number_input("Number of bins:", min_value=10, max_value=300, value=80, step=10, key="hist_bins")
            fig2, ax2 = plt.subplots(figsize=(10, 4))
            ax2.hist(melted["Value"].values, bins=n_bins, color="#4F46E5", edgecolor="white", linewidth=0.3)
            ax2.set_xlabel(value_label)
            ax2.set_ylabel("Count")
            ax2.set_title(f"Raw Values Distribution ({transform_tag})")
            # Fine x-axis ticks adapted to transform range
            val_min = melted["Value"].min()
            val_max = melted["Value"].max()
            val_range = val_max - val_min
            if val_range <= 2:
                tick_step = 0.1
            elif val_range <= 6:
                tick_step = 0.2
            elif val_range <= 20:
                tick_step = 1
            else:
                tick_step = 5
            ax2.set_xticks(np.arange(np.floor(val_min / tick_step) * tick_step,
                                     val_max + tick_step, tick_step))
            ax2.tick_params(axis='x', labelsize=8, rotation=45)
            ax2.grid(axis='y', alpha=0.3)
            fig2.tight_layout()
            st.pyplot(fig2)
            # =====================================================
            # PLOT 3: FACS-style density scatter
            # =====================================================
            st.markdown("#### 2️⃣ Density Scatter Plot (FACS-style)")
            st.caption("Each dot = one measurement (sample × position). Color = local point density.")
            x_vals = melted["Position_idx"].values.astype(float)
            y_vals = melted["Value"].values.astype(float)
            # Add small jitter to x for visual separation (fixed seed → reproducible)
            x_jittered = x_vals + np.random.default_rng(42).uniform(-0.3, 0.3, size=len(x_vals))
            # Compute density
            with st.spinner("Computing point density..."):
                try:
                    xy = np.vstack([x_jittered, y_vals])
                    density = gaussian_kde(xy)(xy)
                except np.linalg.LinAlgError:
                    # KDE fails on degenerate (singular-covariance) data; use uniform color.
                    density = np.ones(len(x_vals))
            # Sort by density so dense points render on top
            sort_idx = density.argsort()
            x_plot = x_jittered[sort_idx]
            y_plot = y_vals[sort_idx]
            d_plot = density[sort_idx]
            fig3, ax3 = plt.subplots(figsize=(12, 6))
            scatter = ax3.scatter(x_plot, y_plot, c=d_plot, cmap="jet", s=8, alpha=0.7, edgecolors="none")
            cbar = fig3.colorbar(scatter, ax=ax3, label="Density")
            ax3.set_xlabel("Position")
            ax3.set_ylabel(value_label)
            ax3.set_title(f"Density Scatter — Position vs. {value_label}")
            ax3.set_xticks(sorted(melted["Position_idx"].unique()))
            ax3.grid(alpha=0.2)
            fig3.tight_layout()
            st.pyplot(fig3)
            # =====================================================
            # PLOT 4: 2D Density Heatmap
            # =====================================================
            st.markdown("#### 3️⃣ 2D Density Heatmap")
            st.caption("Binned heatmap of editing values by position — similar to a FACS density plot.")
            y_bins = st.slider("Vertical bins:", min_value=20, max_value=150, value=60, key="heatmap_ybins")
            positions_unique = sorted(melted["Position_idx"].unique())
            n_positions = len(positions_unique)
            fig4, ax4 = plt.subplots(figsize=(12, 6))
            h = ax4.hist2d(
                x_vals, y_vals,
                bins=[n_positions, y_bins],
                cmap="jet",
                norm=mcolors.LogNorm() if melted["Value"].max() > 0 else None,
            )
            fig4.colorbar(h[3], ax=ax4, label="Count (log scale)")
            ax4.set_xlabel("Position")
            ax4.set_ylabel(value_label)
            ax4.set_title(f"2D Density Heatmap — Position vs. {value_label}")
            ax4.set_xticks(positions_unique)
            ax4.grid(alpha=0.15)
            fig4.tight_layout()
            st.pyplot(fig4)
        except Exception as e:
            st.error(f"❌ Error processing file: {e}")
            import traceback
            st.code(traceback.format_exc())
    else:
        st.info("👆 Upload a data file (CSV or Excel) to start exploring.")
# --------------------------------------------------
# TAB 5: Pipetting Command Generator
# --------------------------------------------------
with tab5:
from math import ceil
st.header("🧪 Pipetting Command Generator for Eppendorf epMotion liquid handler")
st.markdown("""
Upload your sample file (Excel, CSV, or TXT) containing binary mutation data.
The app will:
- Auto-detect or create `Sample`, `Position#`, `Total edited`, and `Volume per "1"` columns
- Let you set the **Maximum volume per input well (µL)** used to compute `Volume per "1"`
- Calculate total demand per input and suggest a **uniform layout** (same # consecutive wells per input)
- **Preview** the layout on a plate map (with tooltips)
- After confirmation, generate pipetting commands and a source volume summary
""")
uploaded_writing = st.file_uploader(
"📤 Upload data file",
type=["xlsx", "csv", "txt"],
key="writing_uploader"
)
max_per_well_ul = st.number_input(
"Maximum volume per source well (µL)",
min_value=10.0, max_value=2000.0, value=160.0, step=10.0
)
# ---------- Helpers (plate geometry, parsing, viz) ----------
ROWS_96 = ["A", "B", "C", "D", "E", "F", "G", "H"]
COLS_96 = list(range(1, 13))

def well_name(row_letter, col_number):
    """Compose a well label such as 'A1' from a row letter and column number."""
    return f"{row_letter}{col_number}"

def enumerate_plate_wells():
    """Yield all 96 well names in row-major order: A1..A12, B1.., ..., H12."""
    for row in ROWS_96:
        for col in COLS_96:
            yield well_name(row, col)

def parse_well_name(well: str):
    """Parse 'B11' (case- and space-tolerant) into ('B', 11); fallback ('A', 0)."""
    match = re.match(r"([A-Ha-h])\s*([0-9]+)", str(well).strip())
    if match is None:
        return ("A", 0)
    return (match.group(1).upper(), int(match.group(2)))

def sample_index_to_plate_and_well(sample_idx: int):
    """Map a 1-based sample index to (plate_number, well_name), 96 wells/plate."""
    plate_offset, within_plate = divmod(sample_idx - 1, 96)
    row_idx, col_idx = divmod(within_plate, 12)
    return plate_offset + 1, well_name(ROWS_96[row_idx], COLS_96[col_idx])

def build_global_wells_list(n_plates: int):
    """List every (plate_number, well_name) pair across *n_plates* plates."""
    return [(plate, well)
            for plate in range(1, n_plates + 1)
            for well in enumerate_plate_wells()]

def pick_tool(volume_ul: float) -> str:
    """Choose the pipetting tool: TS_10 for up to 10 µL, TS_50 above that."""
    return "TS_50" if volume_ul > 10.0 else "TS_10"

# Distinct colours for up to 16 inputs; cycled beyond that.
PALETTE = [
    "#4F46E5", "#22C55E", "#F59E0B", "#EF4444", "#06B6D4", "#A855F7", "#84CC16", "#F97316",
    "#0EA5E9", "#E11D48", "#10B981", "#7C3AED", "#15803D", "#EA580C", "#2563EB", "#DC2626"
]
def render_plate_map_html(plates_used, well_to_input, max_wells_per_source, inputs_count):
    """Build the HTML for 96-well plate maps with a colour legend and tooltips.

    plates_used: number of plates to draw.
    well_to_input: {(plate, well): (input_idx, within_idx)} for allocated wells.
    max_wells_per_source: wells per input block (shown in the tooltip text).
    inputs_count: number of inputs, one legend entry each.
    Returns a single HTML string (CSS + legend + one grid per plate).
    """
    # One legend swatch per input, coloured from PALETTE (cycled past 16 inputs).
    legend_spans = []
    for i in range(1, inputs_count + 1):
        color = PALETTE[(i-1) % len(PALETTE)]
        legend_spans.append(
            f"<span style='display:inline-block;margin-right:12px'>"
            f"<span style='display:inline-block;width:12px;height:12px;background:{color};border:1px solid #333;margin-right:6px;vertical-align:middle'></span>"
            f"Input {i}</span>"
        )
    legend_html = "<div style='margin:8px 0 16px 0'>" + "".join(legend_spans) + "</div>"
    # CSS grid: 1 header column + 12 well columns; `.tip` spans appear on hover.
    css = """
<style>
.plate { margin: 10px 0 24px 0; }
.plate-title { font-weight: 600; margin: 4px 0 8px 0; }
.grid { display: grid; grid-template-columns: 32px repeat(12, 38px); grid-auto-rows: 32px; gap: 4px; }
.cell { width: 38px; height: 32px; border: 1px solid #DDD; display:flex; align-items:center; justify-content:center; font-size:12px; background:#FAFAFA; position:relative; }
.head { font-weight:600; background:#F3F4F6; }
.cell[data-color] { color:#111; }
.cell .tip { visibility:hidden; opacity:0; transition:opacity 0.15s ease; position:absolute; bottom:100%; transform:translateY(-6px); left:50%; transform:translate(-50%, -6px); background:#111; color:#fff; padding:4px 6px; font-size:11px; border-radius:4px; white-space:nowrap; pointer-events:none; }
.cell:hover .tip { visibility:visible; opacity:0.95; }
</style>
"""
    body = [css, legend_html]
    for p in range(1, plates_used + 1):
        body.append(f"<div class='plate'><div class='plate-title'>Plate {p}</div>")
        body.append("<div class='grid'>")
        # Top-left corner spacer, then the 1-12 column header row.
        body.append("<div class='cell head'></div>")
        for c in COLS_96:
            body.append(f"<div class='cell head'>{c}</div>")
        for r in ROWS_96:
            # Row header (A-H), then the 12 wells of that row.
            body.append(f"<div class='cell head'>{r}</div>")
            for c in COLS_96:
                well = f"{r}{c}"
                key = (p, well)
                if key in well_to_input:
                    # Allocated well: coloured by its input, with hover tooltip.
                    input_idx, within_idx = well_to_input[key]
                    color = PALETTE[(input_idx-1) % len(PALETTE)]
                    tip = f"Input {input_idx} • P{p}:{well} • Block well {within_idx}/{max_wells_per_source}"
                    cell_html = (
                        f"<div class='cell' data-color style='background:{color};border-color:#555' title='{tip}'>"
                        f"<span class='tip'>{tip}</span>"
                        "</div>"
                    )
                else:
                    # Unallocated well: plain empty cell.
                    cell_html = "<div class='cell'></div>"
                body.append(cell_html)
        body.append("</div></div>")
    return "".join(body)
# ---------- Main flow ----------
if uploaded_writing is not None:
    try:
        # ---- Load the uploaded table (Excel, CSV, or tab/comma-delimited text) ----
        if uploaded_writing.name.endswith(".xlsx"):
            df = pd.read_excel(uploaded_writing)
        elif uploaded_writing.name.endswith(".csv"):
            df = pd.read_csv(uploaded_writing)
        else:
            # Unknown extension: try TSV first, fall back to CSV.
            try:
                df = pd.read_csv(uploaded_writing, sep="\t")
            except Exception:
                df = pd.read_csv(uploaded_writing)
        st.success(f"✅ Loaded file with {len(df)} rows and {len(df.columns)} columns")

        # Normalize headers (strip stray whitespace from column labels).
        df.columns = [str(c).strip() for c in df.columns]

        # Resolve the sample-ID column case-insensitively and remember its real
        # name. BUGFIX: the original detected "sample" case-insensitively here
        # but later read row["Sample"] with exact case, raising KeyError for a
        # lowercase (or otherwise cased) header.
        sample_col = next((c for c in df.columns if c.lower() == "sample"), None)
        if sample_col is None:
            sample_col = "Sample"
            df.insert(0, "Sample", np.arange(1, len(df) + 1))
            st.info("`Sample` column missing — automatically generated 1..N.")

        # ---- Identify edit-position columns ("Position 1", "Position 2", ...) ----
        position_cols = [c for c in df.columns if re.match(r"(?i)^position\s*\d+", c)]
        if not position_cols:
            # Fallback: every column that is not a known metadata column is
            # treated as a position column.
            non_pos_cols = {"sample", "total edited", 'volume per "1"', "volume per 1"}
            candidate_cols = [c for c in df.columns if c.lower() not in non_pos_cols]
            position_cols = candidate_cols
            st.info(f"Position columns inferred automatically: {len(position_cols)} detected.")

        def pos_key(col_name: str):
            """Sort key: first integer embedded in the name (unnumbered sort last)."""
            m = re.search(r"(\d+)", col_name)
            return int(m.group(1)) if m else 10**9

        position_cols = sorted(position_cols, key=pos_key)
        # Coerce position flags to clean 0/1 integers (blanks/garbage -> 0).
        df[position_cols] = df[position_cols].apply(pd.to_numeric, errors="coerce").fillna(0).astype(int)
        if "Total edited" not in df.columns:
            df["Total edited"] = df[position_cols].sum(axis=1).astype(int)
            st.info("`Total edited` column missing — calculated automatically as sum of 1s per row.")

        st.markdown("#### ⚙️ Volume Calculation Settings")
        default_total_vol = st.number_input(
            "Maximum volume per input well (µL)",
            min_value=1.0, max_value=10000.0, value=64.0, step=1.0,
            help="Used to compute Volume per '1' as (Maximum volume per input well / Total edited) when not provided."
        )
        vol_candidates = [c for c in df.columns if "volume per" in c.lower()]
        if not vol_candidates:
            # Derive the per-transfer volume; rows with zero edits get 0 µL
            # (replace(0, NaN) avoids division by zero, fillna maps back to 0).
            df['Volume per "1"'] = default_total_vol / df["Total edited"].replace(0, np.nan)
            df['Volume per "1"'] = df['Volume per "1"'].fillna(0)
            st.info(f'`Volume per "1"` column missing — calculated automatically as {default_total_vol:.0f} µL (max per input well) / Total edited.')
            volume_col = 'Volume per "1"'
        else:
            volume_col = vol_candidates[0]

        # One consistent, numeric view of the per-'1' volume column.
        # BUGFIX: demand estimation previously used a coerced copy while the
        # cap check and command generation read the raw column, so a
        # text-typed volume column crashed instead of being coerced.
        vol_per_one_series = pd.to_numeric(df[volume_col], errors="coerce").fillna(0.0)
        if vol_candidates and vol_per_one_series.max() > max_per_well_ul:
            st.error(
                f"❌ At least one row has `Volume per \"1\"` greater than the per-well cap ({max_per_well_ul} µL). "
                "Increase the cap or reduce per-transfer volume."
            )
            st.stop()

        # Total µL demanded from each input = sum of per-'1' volumes over rows
        # whose flag for that position is 1.
        total_volume_per_input = [float(vol_per_one_series[df[pos] == 1].sum()) for pos in position_cols]
        wells_needed_per_input = [int(ceil(tv / max_per_well_ul)) if tv > 0 else 0 for tv in total_volume_per_input]
        num_inputs = len(position_cols)
        max_wells_per_source = max(wells_needed_per_input) if wells_needed_per_input else 0

        st.markdown("### 👀 Preview: Suggested Uniform Layout")
        if max_wells_per_source == 0:
            st.info("No edits detected — nothing to allocate.")
            st.stop()
        st.write(
            f"💡 Suggested layout: **{max_wells_per_source} consecutive wells per input** "
            f"(cap {max_per_well_ul:.0f} µL/well)."
        )

        # Uniform allocation: every input receives the same number of
        # consecutive wells, laid out row-major (A1..A12, B1.., plate by plate).
        total_wells_needed_uniform = num_inputs * max_wells_per_source
        plates_needed = int(ceil(total_wells_needed_uniform / 96)) or 1
        global_wells = sorted(
            build_global_wells_list(plates_needed),
            key=lambda x: (
                x[0],
                ROWS_96.index(parse_well_name(x[1])[0]),
                parse_well_name(x[1])[1]
            )
        )
        global_wells = global_wells[:total_wells_needed_uniform]

        assigned_wells_map, well_to_input, preview_rows = {}, {}, []
        for i in range(1, num_inputs + 1):
            start, end = (i - 1) * max_wells_per_source, i * max_wells_per_source
            block = global_wells[start:end]
            assigned_wells_map[i] = block
            for j, (p, w) in enumerate(block, start=1):
                well_to_input[(p, w)] = (i, j)
            block_str = ", ".join([f"P{p}:{w}" for (p, w) in block])
            preview_rows.append({
                "Input (Position #)": i,
                "Total demand (µL)": round(total_volume_per_input[i-1], 2),
                "Wells needed (actual)": wells_needed_per_input[i-1],
                "Allocated (uniform)": max_wells_per_source,
                "Assigned wells": block_str
            })
        preview_df = pd.DataFrame(preview_rows)
        st.dataframe(preview_df, width="stretch", height=300)

        st.markdown("#### Plate Map (hover cells for details)")
        plate_html = render_plate_map_html(plates_needed, well_to_input, max_wells_per_source, num_inputs)
        st.markdown(plate_html, unsafe_allow_html=True)

        st.markdown("### ✅ Generate Pipetting Commands")
        if st.button("Generate using this layout"):
            # Cumulative µL already drawn from each allocated well, per input.
            per_input_well_cum = {i: [0.0] * max_wells_per_source for i in range(1, num_inputs + 1)}
            commands, source_volume_totals = [], {}
            for row_idx, row in df.iterrows():
                sample_id = int(row[sample_col])
                vol_per_one = float(vol_per_one_series[row_idx])
                if vol_per_one <= 0:
                    continue
                dest_plate, dest_well = sample_index_to_plate_and_well(sample_id)
                tool = pick_tool(vol_per_one)
                for pos_idx, col in enumerate(position_cols, start=1):
                    if int(row[col]) != 1:
                        continue
                    wells_for_input = assigned_wells_map[pos_idx]
                    cum_list = per_input_well_cum[pos_idx]
                    # First-fit: draw from the first allocated well with room.
                    chosen = None
                    for j, ((src_plate, src_well), current_vol) in enumerate(zip(wells_for_input, cum_list)):
                        if current_vol + vol_per_one <= max_per_well_ul:
                            chosen = (j, src_plate, src_well)
                            break
                    if chosen is None:
                        st.error(
                            f"Allocation exhausted for Input {pos_idx} while creating commands. "
                            "Increase the max volume per well or review per-transfer volume."
                        )
                        st.stop()
                    j, src_plate, src_well = chosen
                    cum_list[j] += vol_per_one
                    source_volume_totals[(src_plate, src_well)] = source_volume_totals.get((src_plate, src_well), 0.0) + vol_per_one
                    commands.append({
                        "Input #": pos_idx,
                        "Source plate": src_plate,
                        "Source well": src_well,
                        "Destination plate": dest_plate,
                        "Destination well": dest_well,
                        "Volume": round(vol_per_one, 2),
                        "Tool": tool
                    })
            commands_df = pd.DataFrame(commands)

            def row_idx_from_well(w): return ROWS_96.index(parse_well_name(w)[0])
            def col_num_from_well(w): return parse_well_name(w)[1]

            # Sort commands in pipetting order: by input, then source plate /
            # row / column, then destination plate / row / column.
            commands_df["Src_row_idx"] = commands_df["Source well"].apply(row_idx_from_well)
            commands_df["Src_col_num"] = commands_df["Source well"].apply(col_num_from_well)
            commands_df["Dst_row_idx"] = commands_df["Destination well"].apply(row_idx_from_well)
            commands_df["Dst_col_num"] = commands_df["Destination well"].apply(col_num_from_well)
            commands_df = commands_df.sort_values(
                by=["Input #", "Source plate", "Src_row_idx", "Src_col_num",
                    "Destination plate", "Dst_row_idx", "Dst_col_num"],
                kind="stable"
            )
            commands_df = commands_df[[
                "Input #", "Source plate", "Source well",
                "Destination plate", "Destination well", "Volume", "Tool"
            ]]
            st.success(f"✅ Generated {len(commands_df)} commands across {num_inputs} inputs.")

            # Per-source-well usage summary vs. allocated capacity.
            summary_rows = []
            for i in range(1, num_inputs + 1):
                for (p, w) in assigned_wells_map[i]:
                    total = source_volume_totals.get((p, w), 0.0)
                    summary_rows.append({
                        "Source": i, "Source plate": p, "Source well": w,
                        "Total volume taken (µL)": round(total, 2),
                        "Allocated capacity (µL)": round(max_per_well_ul, 2)
                    })
            summary_df = pd.DataFrame(summary_rows)
            summary_df["Src_row_idx"] = summary_df["Source well"].apply(row_idx_from_well)
            summary_df["Src_col_num"] = summary_df["Source well"].apply(col_num_from_well)
            summary_df = summary_df.sort_values(
                by=["Source", "Source plate", "Src_row_idx", "Src_col_num"],
                kind="stable"
            )[
                ["Source", "Source plate", "Source well", "Total volume taken (µL)", "Allocated capacity (µL)"]
            ]

            st.markdown("### 💧 Pipetting Commands")
            st.dataframe(commands_df, width="stretch", height=400)
            st.download_button("⬇️ Download Commands CSV", commands_df.to_csv(index=False), "pipetting_commands.csv", mime="text/csv")
            st.markdown("### 📊 Source Volume Summary")
            st.dataframe(summary_df, width="stretch", height=400)
            st.download_button("⬇️ Download Source Summary CSV", summary_df.to_csv(index=False), "source_volume_summary.csv", mime="text/csv")
    except Exception as e:
        # Streamlit implements st.stop()/st.rerun() via exceptions that
        # subclass Exception; re-raise them so this broad handler does not
        # swallow script-control flow. BUGFIX: st.stop() inside the try was
        # previously reported as "Error processing file". Name-based check
        # avoids importing Streamlit internals — confirm against the installed
        # Streamlit version.
        if type(e).__name__ in ("StopException", "RerunException"):
            raise
        st.error(f"❌ Error processing file: {e}")
else:
    st.info("👆 Upload an Excel/CSV/TXT file to start.")