# DNA ↔ Binary Converter — Streamlit app
# (Removed non-code residue from the hosting page header: "Spaces: / Running / Running".)
# Standard library
import base64
import io
import re

# Third-party
import matplotlib.colors as mcolors
import matplotlib.pyplot as plt
import numpy as np
import openpyxl  # imported explicitly: pandas uses it as the .xlsx engine
import pandas as pd
import streamlit as st
from PIL import Image
from scipy.stats import gaussian_kde
# =========================
# Streamlit App Setup
# =========================
st.set_page_config(page_title="DNA ↔ Binary Converter", layout="wide")
st.title("DNA ↔ Binary Converter")

# =========================
# Encoding Schemes
# =========================
# Display names of the supported text ↔ binary encodings (UI labels).
ENCODING_OPTIONS = ["Voyager 6-bit", "Base64 (6-bit)", "ASCII (7-bit)", "UTF-8 (8-bit)"]
# Bits produced per encoded unit (character, Base64 symbol, or byte).
BITS_PER_UNIT = {
    "Voyager 6-bit": 6,
    "Base64 (6-bit)": 6,
    "ASCII (7-bit)": 7,
    "UTF-8 (8-bit)": 8,
}
# =========================
# Voyager ASCII 6-bit Table
# =========================
# 56-symbol character set: 6-bit code value → character.
# Code values 56–63 are unused (decoder maps them to '?').
voyager_table = dict(enumerate([
    ' ', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I',
    'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S',
    'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '0', '1', '2',
    '3', '4', '5', '6', '7', '8', '9', '.', ',', '(',
    ')', '+', '-', '*', '/', '=', '$', '!', ':', '%',
    '"', '#', '@', "'", '?', '&'
]))
# character → 6-bit code value, used when encoding.
reverse_voyager_table = {v: k for k, v in voyager_table.items()}

# Standard Base64 symbol alphabet (RFC 4648), indexed by 6-bit value.
B64_ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
| # ========================= | |
| # Encoding Functions | |
| # ========================= | |
| def encode_to_binary(text: str, scheme: str) -> tuple[list[int], list[str]]: | |
| """ | |
| Returns (flat_bits, display_units). | |
| display_units is a list of labels for each chunk (character, byte, or Base64 symbol). | |
| """ | |
| if scheme == "Voyager 6-bit": | |
| bits = [] | |
| for char in text: | |
| val = reverse_voyager_table.get(char.upper(), 0) | |
| bits.extend([(val >> b) & 1 for b in range(5, -1, -1)]) | |
| return bits, list(text.upper()) | |
| elif scheme == "ASCII (7-bit)": | |
| bits = [] | |
| for c in text: | |
| val = ord(c) & 0x7F | |
| bits.extend([(val >> b) & 1 for b in range(6, -1, -1)]) | |
| return bits, list(text) | |
| elif scheme == "UTF-8 (8-bit)": | |
| raw = text.encode("utf-8") | |
| bits = [] | |
| for byte in raw: | |
| bits.extend([(byte >> b) & 1 for b in range(7, -1, -1)]) | |
| # For display: show hex byte value and the character it belongs to | |
| labels = [f"0x{b:02X}" for b in raw] | |
| return bits, labels | |
| elif scheme == "Base64 (6-bit)": | |
| b64_str = base64.b64encode(text.encode("utf-8")).decode("ascii") | |
| bits = [] | |
| clean = b64_str.rstrip("=") | |
| for c in clean: | |
| val = B64_ALPHABET.index(c) | |
| bits.extend([(val >> b) & 1 for b in range(5, -1, -1)]) | |
| return bits, list(clean) | |
| return [], [] | |
# =========================
# Decoding Functions
# =========================
def decode_from_binary(bits: list[int], scheme: str) -> str:
    """Decode a flat bit list back to text under the given *scheme*.

    A trailing incomplete chunk is zero-padded on the right. Unrecognized
    schemes return the empty string.
    """
    def chunk_values(width: int) -> list[int]:
        # Split `bits` into MSB-first integers of *width* bits each,
        # zero-padding the final partial chunk.
        values = []
        for i in range(0, len(bits), width):
            chunk = bits[i:i + width]
            chunk += [0] * (width - len(chunk))
            values.append(sum(b << (width - 1 - j) for j, b in enumerate(chunk)))
        return values

    if scheme == "Voyager 6-bit":
        # Unused code points (56–63) render as '?'.
        return ''.join(voyager_table.get(v, '?') for v in chunk_values(6))
    elif scheme == "ASCII (7-bit)":
        # Only printable ASCII survives; control/DEL codes become '?'.
        return ''.join(chr(v) if 32 <= v < 127 else '?' for v in chunk_values(7))
    elif scheme == "UTF-8 (8-bit)":
        return bytes(chunk_values(8)).decode("utf-8", errors="replace")
    elif scheme == "Base64 (6-bit)":
        b64_str = ''.join(B64_ALPHABET[v] for v in chunk_values(6))
        # Restore '=' padding (stripped during encoding) so the standard
        # decoder accepts the string.
        b64_str += '=' * (-len(b64_str) % 4)
        try:
            return base64.b64decode(b64_str).decode("utf-8", errors="replace")
        except Exception:
            return "[Base64 decode error]"
    return ""
# =========================
# Tabs
# =========================
tab1, tab2, tab3, tab4, tab5 = st.tabs(["Encoding", "Decoding", "Image Preview", "Data Analytics", "Writing"])
# --------------------------------------------------
# TAB 1: Text/Image → Binary
# --------------------------------------------------
with tab1:
    st.markdown("""
    Convert text or an image into binary labels.
    Choose an input mode, encoding scheme, and control grouping.
    """)
    input_mode = st.selectbox("Input mode:", ["Text", "Image"], key="input_mode")
    if input_mode == "Text":
        st.subheader("Step 1 – Choose Encoding & Input Text")
        encoding_scheme = st.selectbox(
            "Encoding scheme:",
            ENCODING_OPTIONS,
            index=0,
            key="enc_scheme",
            help=(
                "**Voyager 6-bit** – Custom 56-character table (A-Z, 0-9, punctuation). 6 bits/char.\n\n"
                "**Base64 (6-bit)** – Standard Base64 encoding of UTF-8 bytes. 6 bits/symbol.\n\n"
                "**ASCII (7-bit)** – Standard 7-bit ASCII. 7 bits/char.\n\n"
                "**UTF-8 (8-bit)** – Full UTF-8 byte encoding. 8 bits/byte. Supports all Unicode."
            )
        )
        bits_per = BITS_PER_UNIT[encoding_scheme]
        if encoding_scheme == "Voyager 6-bit":
            supported = ''.join(voyager_table[i] for i in range(len(voyager_table)))
            st.caption(f"Supported characters ({len(voyager_table)}): `{supported}`")
        user_input = st.text_input("Enter your text:", value="DNA", key="input_text")
        col1, col2 = st.columns([2, 1])
        with col1:
            group_size = st.slider("Select number of target positions:", min_value=12, max_value=128, value=25)
        with col2:
            custom_cols = st.number_input("Or enter custom number:", min_value=1, max_value=512, value=group_size)
        # The free-form number overrides the slider whenever they differ.
        if custom_cols != group_size:
            group_size = custom_cols
        if user_input:
            binary_labels, display_units = encode_to_binary(user_input, encoding_scheme)
            binary_concat = ''.join(map(str, binary_labels))
            unit_label = "Byte" if encoding_scheme == "UTF-8 (8-bit)" else "Character"
            st.markdown(f"### Output 1 – Binary Labels per {unit_label}")
            st.caption(f"Encoding: **{encoding_scheme}** — {bits_per} bits per {unit_label.lower()}")
            # One sublist of `bits_per` bits per encoded unit.
            grouped_bits = [binary_labels[i:i + bits_per] for i in range(0, len(binary_labels), bits_per)]
            # Scrollable per-unit listing rendered as raw HTML.
            scroll_html = (
                "<div style='max-height:300px; overflow-y:auto; font-family:monospace; "
                "padding:6px; border:1px solid #ccc;'>"
            )
            for i, bits in enumerate(grouped_bits):
                label = display_units[i] if i < len(display_units) else "?"
                scroll_html += f"<div>'{label}' → {bits}</div>"
            scroll_html += "</div>"
            st.markdown(scroll_html, unsafe_allow_html=True)
            # Plain-text version of the same listing for download.
            per_char_lines = []
            for i, bits in enumerate(grouped_bits):
                label = display_units[i] if i < len(display_units) else "?"
                per_char_lines.append(f"'{label}' → {''.join(map(str, bits))}")
            st.download_button(
                f"⬇️ Download Binary per {unit_label} (.txt)",
                data='\n'.join(per_char_lines),
                file_name="binary_per_unit.txt",
                mime="text/plain",
                key="download_per_unit"
            )
            st.download_button(
                "⬇️ Download Concatenated Binary String",
                data=binary_concat,
                file_name="binary_full.txt",
                mime="text/plain",
                key="download_binary_txt"
            )
            st.markdown("### Output 2 – Binary matrix split into reactions grouped by target position")
            # Re-chunk the flat bit stream into rows of `group_size`,
            # zero-padding the final row.
            groups = []
            for i in range(0, len(binary_labels), group_size):
                group = binary_labels[i:i + group_size]
                if len(group) < group_size:
                    group += [0] * (group_size - len(group))
                groups.append(group)
            columns = [f"Position {i+1}" for i in range(group_size)]
            df = pd.DataFrame(groups, columns=columns)
            df.insert(0, "Sample", range(1, len(df) + 1))
            st.dataframe(df, width="stretch")
            st.download_button(
                "⬇️ Download as CSV",
                df.to_csv(index=False),
                file_name=f"binary_labels_{group_size}_positions.csv",
                mime="text/csv",
                key="download_binary_csv"
            )
        else:
            st.info("👆 Enter text above to see binary labels.")
    # =====================================================
    # IMAGE INPUT MODE
    # =====================================================
    else:
        st.subheader("Step 1 – Upload Image & Set Resolution")
        uploaded_img = st.file_uploader(
            "Upload an image (PNG, JPG, BMP, etc.):",
            type=["png", "jpg", "jpeg", "bmp", "gif", "tiff", "webp"],
            key="img_uploader"
        )
        if uploaded_img is not None:
            img = Image.open(uploaded_img).convert("L")  # grayscale
            orig_w, orig_h = img.size
            aspect = orig_h / orig_w
            st.image(img, caption=f"Original (grayscale) — {orig_w}×{orig_h} px", use_container_width=True)
            st.markdown("#### ⚙️ Resolution & Threshold")
            target_width = st.slider(
                "Output width (pixels):",
                min_value=8, max_value=min(orig_w, 256), value=min(64, orig_w), step=1,
                help="Height is auto-calculated from aspect ratio. Each pixel = 1 bit."
            )
            target_height = max(1, int(round(target_width * aspect)))
            total_bits = target_width * target_height
            st.caption(f"Output size: **{target_width} × {target_height}** = **{total_bits:,}** bits (pixels)")
            threshold = st.slider(
                "Black/white threshold:",
                min_value=0, max_value=255, value=128,
                help="Pixels darker than this → 1 (black). Brighter → 0 (white)."
            )
            # Resize & threshold
            img_resized = img.resize((target_width, target_height), Image.LANCZOS)
            img_array = np.array(img_resized)
            binary_matrix = (img_array < threshold).astype(int)  # dark = 1, light = 0
            # Show preview
            st.markdown("### Preview — Black & White Output")
            col_prev1, col_prev2 = st.columns(2)
            with col_prev1:
                st.image(img_resized, caption=f"Resized grayscale ({target_width}×{target_height})", use_container_width=True)
            with col_prev2:
                # Invert so 1 (black) renders as pixel value 0 and 0 as 255.
                bw_display = Image.fromarray(((1 - binary_matrix) * 255).astype(np.uint8))
                st.image(bw_display, caption=f"Binary B&W ({target_width}×{target_height})", use_container_width=True)
            # Flatten row-major to a single bit stream.
            binary_labels = binary_matrix.flatten().tolist()
            binary_concat = ''.join(map(str, binary_labels))
            st.markdown("### Output 1 – Image Info")
            st.markdown(
                f"- **Dimensions:** {target_width} × {target_height} \n"
                f"- **Total bits:** {total_bits:,} \n"
                f"- **Black pixels (1s):** {sum(binary_labels):,} \n"
                f"- **White pixels (0s):** {total_bits - sum(binary_labels):,}"
            )
            st.download_button(
                "⬇️ Download Concatenated Binary String",
                data=binary_concat,
                file_name="image_binary_full.txt",
                mime="text/plain",
                key="download_img_binary_txt"
            )
            # Output as matrix with width = target_width
            st.markdown("### Output 2 – Binary Matrix (rows = pixel rows)")
            columns = [f"Position {i+1}" for i in range(target_width)]
            df_img = pd.DataFrame(binary_matrix, columns=columns)
            df_img.insert(0, "Sample", range(1, len(df_img) + 1))
            st.dataframe(df_img, width="stretch")
            st.download_button(
                "⬇️ Download as CSV",
                df_img.to_csv(index=False),
                file_name=f"image_binary_{target_width}x{target_height}.csv",
                mime="text/csv",
                key="download_img_csv"
            )
            # Also offer custom grouping (same as text mode)
            st.markdown("### Output 3 – Custom Grouped Matrix")
            col1, col2 = st.columns([2, 1])
            with col1:
                img_group_size = st.slider(
                    "Select number of target positions:",
                    # FIX: clamp the default into [12, 128]; target_width can be
                    # 8–256, and a default outside the slider bounds raises a
                    # StreamlitAPIException.
                    min_value=12, max_value=128, value=min(max(target_width, 12), 128),
                    key="img_group_slider"
                )
            with col2:
                img_custom_cols = st.number_input(
                    "Or enter custom number:",
                    min_value=1, max_value=512, value=img_group_size, key="img_custom_cols"
                )
            if img_custom_cols != img_group_size:
                img_group_size = img_custom_cols
            # Re-chunk into rows of `img_group_size`, zero-padding the last row.
            groups = []
            for i in range(0, len(binary_labels), img_group_size):
                group = binary_labels[i:i + img_group_size]
                if len(group) < img_group_size:
                    group += [0] * (img_group_size - len(group))
                groups.append(group)
            columns_g = [f"Position {i+1}" for i in range(img_group_size)]
            df_grouped = pd.DataFrame(groups, columns=columns_g)
            df_grouped.insert(0, "Sample", range(1, len(df_grouped) + 1))
            st.dataframe(df_grouped, width="stretch")
            st.download_button(
                "⬇️ Download Grouped CSV",
                df_grouped.to_csv(index=False),
                file_name=f"image_binary_grouped_{img_group_size}_positions.csv",
                mime="text/csv",
                key="download_img_grouped_csv"
            )
        else:
            st.info("👆 Upload an image to encode it as binary.")
# --------------------------------------------------
# TAB 2: Binary → Text
# --------------------------------------------------
with tab2:
    st.markdown("""
    Convert binary data back into readable text.
    Upload either:
    - `.csv` file with 0/1 values (any number of columns/rows)
    - `.xlsx` Excel file
    - `.txt` file containing a concatenated binary string (e.g. `010101...`)
    """)
    decode_scheme = st.selectbox(
        "Decoding scheme (must match the encoding used):",
        ENCODING_OPTIONS,
        index=0,
        key="dec_scheme",
        help="Select the same encoding scheme that was used to produce the binary data."
    )
    uploaded_decode = st.file_uploader(
        "Upload your file (.csv, .xlsx, or .txt):",
        type=["csv", "xlsx", "txt"],
        key="decode_uploader"
    )
    if uploaded_decode is not None:
        try:
            if uploaded_decode.name.endswith(".csv"):
                df = pd.read_csv(uploaded_decode)
                # FIX: drop the "Sample" index column written by the Encoding
                # tab's CSV export so it is not misread as bit data (the
                # Image Preview tab already does this).
                df = df.drop(columns=[c for c in df.columns if str(c).lower() == "sample"])
                bits = df.values.flatten().astype(int).tolist()
            elif uploaded_decode.name.endswith(".xlsx"):
                df = pd.read_excel(uploaded_decode)
                df = df.drop(columns=[c for c in df.columns if str(c).lower() == "sample"])
                bits = df.values.flatten().astype(int).tolist()
            elif uploaded_decode.name.endswith(".txt"):
                # Keep only '0'/'1' characters; whitespace and separators are ignored.
                content = uploaded_decode.read().decode().strip()
                bits = [int(b) for b in content if b in ['0', '1']]
            else:
                bits = []
            if not bits:
                st.warning("No binary data detected.")
            else:
                recovered_text = decode_from_binary(bits, decode_scheme)
                st.success(f"✅ Conversion complete using **{decode_scheme}**!")
                st.markdown("**Recovered text:**")
                st.text_area("Output", recovered_text, height=150)
                st.download_button(
                    "⬇️ Download Recovered Text (.txt)",
                    data=recovered_text,
                    file_name="recovered_text.txt",
                    mime="text/plain",
                    key="download_recovered"
                )
        except Exception as e:
            st.error(f"Error reading or converting file: {e}")
    else:
        st.info("👆 Upload a file to start the reverse conversion.")
# --------------------------------------------------
# TAB 3: Image Preview
# --------------------------------------------------
with tab3:
    st.header("🖼️ Image Preview")
    st.markdown("""
    Render binary data (0/1) as a **black & white image**.
    Upload a binary matrix CSV (rows × positions) or a concatenated binary `.txt` string.
    """)
    img_preview_file = st.file_uploader(
        "📤 Upload binary data file (.csv, .xlsx, or .txt):",
        type=["csv", "xlsx", "txt"],
        key="img_preview_uploader"
    )
    if img_preview_file is not None:
        try:
            # --- Load binary data ---
            if img_preview_file.name.endswith(".csv"):
                idf = pd.read_csv(img_preview_file)
                # Drop Sample column if present
                if "Sample" in idf.columns or "sample" in idf.columns:
                    idf = idf.drop(columns=[c for c in idf.columns if c.lower() == "sample"])
                bits_matrix = idf.values.flatten().astype(int)
                # Column count (after dropping Sample) gives the image width.
                detected_width = len(idf.columns)
            elif img_preview_file.name.endswith(".xlsx"):
                idf = pd.read_excel(img_preview_file)
                if "Sample" in idf.columns or "sample" in idf.columns:
                    idf = idf.drop(columns=[c for c in idf.columns if c.lower() == "sample"])
                bits_matrix = idf.values.flatten().astype(int)
                detected_width = len(idf.columns)
            elif img_preview_file.name.endswith(".txt"):
                content = img_preview_file.read().decode().strip()
                bits_matrix = np.array([int(b) for b in content if b in ['0', '1']])
                detected_width = None  # flat stream: width must be chosen below
            else:
                bits_matrix = np.array([])
                detected_width = None
            if len(bits_matrix) == 0:
                st.warning("No binary data detected.")
            else:
                total_bits = len(bits_matrix)
                st.success(f"✅ Loaded **{total_bits:,}** bits.")
                # --- Width control ---
                st.markdown("#### ⚙️ Image Dimensions")
                if detected_width and detected_width > 1:
                    default_w = detected_width
                    st.caption(f"Auto-detected width from columns: **{detected_width}**")
                else:
                    # Guess a square-ish default
                    default_w = max(1, int(np.sqrt(total_bits)))
                img_width = st.number_input(
                    "Image width (pixels / positions per row):",
                    min_value=1, max_value=total_bits, value=default_w, step=1,
                    key="img_preview_width"
                )
                img_height = int(np.ceil(total_bits / img_width))
                st.caption(f"Image size: **{img_width} × {img_height}** = **{img_width * img_height:,}** pixels "
                           f"({total_bits:,} bits, {img_width * img_height - total_bits} padded)")
                # Pad with zeros (white) to fill the last row.
                padded = np.zeros(img_width * img_height, dtype=int)
                padded[:total_bits] = bits_matrix[:total_bits]
                img_data = padded.reshape((img_height, img_width))
                # Render: 1 = black (0), 0 = white (255)
                img_render = ((1 - img_data) * 255).astype(np.uint8)
                pil_img = Image.fromarray(img_render, mode="L")
                st.markdown("### 🖼️ Rendered Image")
                # Use nearest-neighbor scaling for crisp pixels
                display_scale = max(1, 256 // img_width)
                display_w = img_width * display_scale
                display_h = img_height * display_scale
                pil_display = pil_img.resize((display_w, display_h), Image.NEAREST)
                st.image(pil_display, caption=f"Binary image — {img_width}×{img_height} (1=black, 0=white)")
                # Stats
                ones = int(bits_matrix.sum())
                st.markdown(
                    f"- **Black pixels (1):** {ones:,} ({100*ones/total_bits:.1f}%) \n"
                    f"- **White pixels (0):** {total_bits - ones:,} ({100*(total_bits-ones)/total_bits:.1f}%)"
                )
                # Download rendered image as PNG (1:1 pixel scale)
                buf = io.BytesIO()
                pil_img.save(buf, format="PNG")
                st.download_button(
                    "⬇️ Download as PNG",
                    data=buf.getvalue(),
                    file_name=f"binary_image_{img_width}x{img_height}.png",
                    mime="image/png",
                    key="download_preview_png"
                )
                # Also offer a high-res version
                buf_hr = io.BytesIO()
                pil_display.save(buf_hr, format="PNG")
                st.download_button(
                    "⬇️ Download Scaled PNG (for viewing)",
                    data=buf_hr.getvalue(),
                    file_name=f"binary_image_{display_w}x{display_h}_scaled.png",
                    mime="image/png",
                    key="download_preview_png_scaled"
                )
        except Exception as e:
            st.error(f"❌ Error processing file: {e}")
            import traceback
            st.code(traceback.format_exc())
    else:
        st.info("👆 Upload a binary data file (CSV or TXT) to render as an image.")
# --------------------------------------------------
# TAB 4: Data Analytics
# --------------------------------------------------
with tab4:
    st.header("📊 Data Analytics")
    st.markdown("""
    Upload your sample data file (Excel or CSV) for a quick exploratory assessment.
    The file should contain samples as rows and position columns with editing values.
    This tab provides visualizations **before** any binary labelling.
    """)
    analytics_uploaded = st.file_uploader(
        "📤 Upload data file",
        type=["xlsx", "csv"],
        key="analytics_uploader"
    )
    if analytics_uploaded is not None:
        try:
            # --- Load ---
            if analytics_uploaded.name.endswith(".xlsx"):
                adf = pd.read_excel(analytics_uploaded)
            else:
                adf = pd.read_csv(analytics_uploaded)
            st.success(f"✅ Loaded file with {len(adf)} rows and {len(adf.columns)} columns")
            adf.columns = [str(c).strip() for c in adf.columns]
            # --- Detect position columns ---
            # Metadata columns excluded from analysis ("descritpion" is a
            # deliberately-kept common misspelling).
            non_pos_keywords = {"sample", "description", "descritpion", "total edited",
                                'volume per "1"', "volume per 1", "id", "name"}
            position_cols = [c for c in adf.columns
                             if c.lower() not in non_pos_keywords
                             and pd.to_numeric(adf[c], errors="coerce").notna().any()]

            def pos_sort_key(col_name: str):
                # Sort by the first number in the column name; non-numeric
                # names sort last.
                m = re.search(r"(\d+)", col_name)
                return int(m.group(1)) if m else 10**9

            position_cols = sorted(position_cols, key=pos_sort_key)
            if not position_cols:
                # FIX: the original called st.stop() here, inside this try
                # block. Streamlit's StopException subclasses Exception, so the
                # broad handler below reported it as a processing error with a
                # traceback. Branch instead of stopping.
                st.error("No numeric position columns detected.")
            else:
                st.info(f"Detected **{len(position_cols)}** position columns and **{len(adf)}** samples.")
                # Convert position data to numeric
                pos_data = adf[position_cols].apply(pd.to_numeric, errors="coerce").fillna(0.0)
                # Compute Total edited (sum across positions per sample).
                # NOTE(review): total_edited is not used in the plots below —
                # presumably consumed by a later section; verify before removing.
                if "Total edited" in adf.columns:
                    total_edited = pd.to_numeric(adf["Total edited"], errors="coerce").fillna(0.0)
                else:
                    total_edited = pos_data.sum(axis=1)
                # =====================================================
                # Shared controls for raw data plots
                # =====================================================
                st.markdown("### 1️⃣ Raw Data Distribution")
                st.caption("Visualize editing values across all positions and samples — before any binary labelling.")
                transform_option = st.selectbox(
                    "Value transformation:",
                    ["Raw (linear)", "log1p", "log1p → log1p", "log1p → pos. norm."],
                    index=0,
                    key="transform_select",
                    help=(
                        "**Raw** — No transformation.\n\n"
                        "**log1p** — `log(1 + x)`. Compresses high values, spreads low range.\n\n"
                        "**log1p → log1p** — Double log1p. Even stronger compression.\n\n"
                        "**log1p → pos. norm.** — log1p then robust per-position normalization "
                        "(median / IQR scaling per position column)."
                    )
                )

                # --- Apply transforms ---
                def robust_pos_normalize_log1p(data: pd.DataFrame) -> pd.DataFrame:
                    """log1p then robust per-position normalization (median + IQR)."""
                    logged = np.log1p(data)
                    result = logged.copy()
                    for col in result.columns:
                        med = result[col].median()
                        q75, q25 = result[col].quantile(0.75), result[col].quantile(0.25)
                        iqr = q75 - q25
                        if iqr > 0:
                            result[col] = (result[col] - med) / iqr
                        else:
                            # Degenerate spread: center only, skip scaling.
                            result[col] = result[col] - med
                    return result

                if transform_option == "log1p":
                    transformed = np.log1p(pos_data)
                    value_label = "Editing Value (log1p)"
                    transform_tag = "log1p"
                elif transform_option == "log1p → log1p":
                    transformed = np.log1p(np.log1p(pos_data))
                    value_label = "Editing Value (log1p → log1p)"
                    transform_tag = "log1p_log1p"
                elif transform_option == "log1p → pos. norm.":
                    transformed = robust_pos_normalize_log1p(pos_data)
                    value_label = "Editing Value (log1p → pos. norm.)"
                    transform_tag = "log1p_posnorm"
                else:
                    transformed = pos_data
                    value_label = "Editing Value"
                    transform_tag = "raw"
                # Melt data to long format: (sample, position_index, value)
                melted = transformed.melt(var_name="Position", value_name="Value")
                melted["Position_idx"] = melted["Position"].apply(
                    lambda x: int(re.search(r"(\d+)", str(x)).group(1)) if re.search(r"(\d+)", str(x)) else 0
                )
                # =====================================================
                # PLOT 2: Histogram — all values
                # =====================================================
                st.markdown("#### 📊 Histogram — All Values")
                n_bins = st.number_input("Number of bins:", min_value=10, max_value=300, value=80, step=10, key="hist_bins")
                fig2, ax2 = plt.subplots(figsize=(10, 4))
                ax2.hist(melted["Value"].values, bins=n_bins, color="#4F46E5", edgecolor="white", linewidth=0.3)
                ax2.set_xlabel(value_label)
                ax2.set_ylabel("Count")
                ax2.set_title(f"Raw Values Distribution ({transform_tag})")
                # Fine x-axis ticks adapted to transform range
                val_min = melted["Value"].min()
                val_max = melted["Value"].max()
                val_range = val_max - val_min
                if val_range <= 2:
                    tick_step = 0.1
                elif val_range <= 6:
                    tick_step = 0.2
                elif val_range <= 20:
                    tick_step = 1
                else:
                    tick_step = 5
                ax2.set_xticks(np.arange(np.floor(val_min / tick_step) * tick_step,
                                         val_max + tick_step, tick_step))
                ax2.tick_params(axis='x', labelsize=8, rotation=45)
                ax2.grid(axis='y', alpha=0.3)
                fig2.tight_layout()
                st.pyplot(fig2)
                # =====================================================
                # PLOT 3: FACS-style density scatter
                # =====================================================
                st.markdown("#### 2️⃣ Density Scatter Plot (FACS-style)")
                st.caption("Each dot = one measurement (sample × position). Color = local point density.")
                x_vals = melted["Position_idx"].values.astype(float)
                y_vals = melted["Value"].values.astype(float)
                # Add small, reproducible jitter to x for visual separation.
                x_jittered = x_vals + np.random.default_rng(42).uniform(-0.3, 0.3, size=len(x_vals))
                # Compute density
                with st.spinner("Computing point density..."):
                    try:
                        xy = np.vstack([x_jittered, y_vals])
                        density = gaussian_kde(xy)(xy)
                    except (np.linalg.LinAlgError, ValueError):
                        # KDE fails on degenerate/too-small data — fall back to
                        # uniform density so the plot still renders.
                        density = np.ones(len(x_vals))
                # Sort by density so dense points render on top
                sort_idx = density.argsort()
                x_plot = x_jittered[sort_idx]
                y_plot = y_vals[sort_idx]
                d_plot = density[sort_idx]
                fig3, ax3 = plt.subplots(figsize=(12, 6))
                scatter = ax3.scatter(x_plot, y_plot, c=d_plot, cmap="jet", s=8, alpha=0.7, edgecolors="none")
                fig3.colorbar(scatter, ax=ax3, label="Density")
                ax3.set_xlabel("Position")
                ax3.set_ylabel(value_label)
                ax3.set_title(f"Density Scatter — Position vs. {value_label}")
                ax3.set_xticks(sorted(melted["Position_idx"].unique()))
                ax3.grid(alpha=0.2)
                fig3.tight_layout()
                st.pyplot(fig3)
                # =====================================================
                # PLOT 4: 2D Density Heatmap
                # =====================================================
                st.markdown("#### 3️⃣ 2D Density Heatmap")
                st.caption("Binned heatmap of editing values by position — similar to a FACS density plot.")
                y_bins = st.slider("Vertical bins:", min_value=20, max_value=150, value=60, key="heatmap_ybins")
                positions_unique = sorted(melted["Position_idx"].unique())
                n_positions = len(positions_unique)
                fig4, ax4 = plt.subplots(figsize=(12, 6))
                h = ax4.hist2d(
                    x_vals, y_vals,
                    bins=[n_positions, y_bins],
                    cmap="jet",
                    norm=mcolors.LogNorm() if melted["Value"].max() > 0 else None,
                )
                fig4.colorbar(h[3], ax=ax4, label="Count (log scale)")
                ax4.set_xlabel("Position")
                ax4.set_ylabel(value_label)
                ax4.set_title(f"2D Density Heatmap — Position vs. {value_label}")
                ax4.set_xticks(positions_unique)
                ax4.grid(alpha=0.15)
                fig4.tight_layout()
                st.pyplot(fig4)
        except Exception as e:
            st.error(f"❌ Error processing file: {e}")
            import traceback
            st.code(traceback.format_exc())
    else:
        st.info("👆 Upload a data file (CSV or Excel) to start exploring.")
| # -------------------------------------------------- | |
| # TAB 5: Pipetting Command Generator | |
| # -------------------------------------------------- | |
| with tab5: | |
| from math import ceil | |
| st.header("🧪 Pipetting Command Generator for Eppendorf epMotion liquid handler") | |
| st.markdown(""" | |
| Upload your sample file (Excel, CSV, or TXT) containing binary mutation data. | |
| The app will: | |
| - Auto-detect or create `Sample`, `Position#`, `Total edited`, and `Volume per "1"` columns | |
| - Let you set the **Maximum volume per input well (µL)** used to compute `Volume per "1"` | |
| - Calculate total demand per input and suggest a **uniform layout** (same # consecutive wells per input) | |
| - **Preview** the layout on a plate map (with tooltips) | |
| - After confirmation, generate pipetting commands and a source volume summary | |
| """) | |
| uploaded_writing = st.file_uploader( | |
| "📤 Upload data file", | |
| type=["xlsx", "csv", "txt"], | |
| key="writing_uploader" | |
| ) | |
| max_per_well_ul = st.number_input( | |
| "Maximum volume per source well (µL)", | |
| min_value=10.0, max_value=2000.0, value=160.0, step=10.0 | |
| ) | |
| # ---------- Helpers (plate geometry, parsing, viz) ---------- | |
| ROWS_96 = ["A", "B", "C", "D", "E", "F", "G", "H"] | |
| COLS_96 = list(range(1, 13)) | |
| def well_name(row_letter, col_number): | |
| return f"{row_letter}{col_number}" | |
| def enumerate_plate_wells(): | |
| for r in ROWS_96: | |
| for c in COLS_96: | |
| yield f"{r}{c}" | |
| def parse_well_name(well: str): | |
| m = re.match(r"([A-Ha-h])\s*([0-9]+)", str(well).strip()) | |
| if not m: | |
| return ("A", 0) | |
| return (m.group(1).upper(), int(m.group(2))) | |
| def sample_index_to_plate_and_well(sample_idx: int): | |
| plate_num = ((sample_idx - 1) // 96) + 1 | |
| within_plate = (sample_idx - 1) % 96 | |
| row_idx = within_plate // 12 | |
| col_idx = within_plate % 12 | |
| return plate_num, well_name(ROWS_96[row_idx], COLS_96[col_idx]) | |
| def build_global_wells_list(n_plates: int): | |
| out = [] | |
| for p in range(1, n_plates + 1): | |
| for w in enumerate_plate_wells(): | |
| out.append((p, w)) | |
| return out | |
| def pick_tool(volume_ul: float) -> str: | |
| return "TS_10" if volume_ul <= 10.0 else "TS_50" | |
| PALETTE = [ | |
| "#4F46E5", "#22C55E", "#F59E0B", "#EF4444", "#06B6D4", "#A855F7", "#84CC16", "#F97316", | |
| "#0EA5E9", "#E11D48", "#10B981", "#7C3AED", "#15803D", "#EA580C", "#2563EB", "#DC2626" | |
| ] | |
def render_plate_map_html(plates_used, well_to_input, max_wells_per_source, inputs_count):
    """Build the HTML for the per-plate 96-well maps plus a colour legend.

    well_to_input maps (plate, well) -> (input index, 1-based well index within
    that input's block); occupied cells are coloured by input and carry a hover
    tooltip with the assignment details.
    """
    # Legend: one coloured swatch + label per input, colours cycling through PALETTE.
    legend_items = "".join(
        f"<span style='display:inline-block;margin-right:12px'>"
        f"<span style='display:inline-block;width:12px;height:12px;background:{PALETTE[(idx - 1) % len(PALETTE)]};border:1px solid #333;margin-right:6px;vertical-align:middle'></span>"
        f"Input {idx}</span>"
        for idx in range(1, inputs_count + 1)
    )
    legend_html = "<div style='margin:8px 0 16px 0'>" + legend_items + "</div>"
    css = """
<style>
.plate { margin: 10px 0 24px 0; }
.plate-title { font-weight: 600; margin: 4px 0 8px 0; }
.grid { display: grid; grid-template-columns: 32px repeat(12, 38px); grid-auto-rows: 32px; gap: 4px; }
.cell { width: 38px; height: 32px; border: 1px solid #DDD; display:flex; align-items:center; justify-content:center; font-size:12px; background:#FAFAFA; position:relative; }
.head { font-weight:600; background:#F3F4F6; }
.cell[data-color] { color:#111; }
.cell .tip { visibility:hidden; opacity:0; transition:opacity 0.15s ease; position:absolute; bottom:100%; transform:translateY(-6px); left:50%; transform:translate(-50%, -6px); background:#111; color:#fff; padding:4px 6px; font-size:11px; border-radius:4px; white-space:nowrap; pointer-events:none; }
.cell:hover .tip { visibility:visible; opacity:0.95; }
</style>
"""
    parts = [css, legend_html]
    for plate in range(1, plates_used + 1):
        parts.append(f"<div class='plate'><div class='plate-title'>Plate {plate}</div>")
        parts.append("<div class='grid'>")
        # Header row: empty corner cell, then the twelve column numbers.
        parts.append("<div class='cell head'></div>")
        parts.extend(f"<div class='cell head'>{c}</div>" for c in COLS_96)
        for r in ROWS_96:
            # Row header, then one cell per column.
            parts.append(f"<div class='cell head'>{r}</div>")
            for c in COLS_96:
                well = f"{r}{c}"
                assignment = well_to_input.get((plate, well))
                if assignment is None:
                    parts.append("<div class='cell'></div>")
                    continue
                input_idx, within_idx = assignment
                color = PALETTE[(input_idx - 1) % len(PALETTE)]
                tip = f"Input {input_idx} • P{plate}:{well} • Block well {within_idx}/{max_wells_per_source}"
                parts.append(
                    f"<div class='cell' data-color style='background:{color};border-color:#555' title='{tip}'>"
                    f"<span class='tip'>{tip}</span>"
                    "</div>"
                )
        # Close the grid div and the plate div.
        parts.append("</div></div>")
    return "".join(parts)
# ---------- Main flow ----------
# Reads the uploaded "writing" table, infers/validates its columns, computes a
# uniform per-input source-well allocation, previews it (table + plate map), and
# on button press emits the pipetting command list plus a per-source-well
# volume summary, both downloadable as CSV.
if uploaded_writing is not None:
    try:
        # Pick a parser from the file extension; anything else is tried as TSV
        # first, then plain CSV as a fallback.
        if uploaded_writing.name.endswith(".xlsx"):
            df = pd.read_excel(uploaded_writing)
        elif uploaded_writing.name.endswith(".csv"):
            df = pd.read_csv(uploaded_writing)
        else:
            try:
                df = pd.read_csv(uploaded_writing, sep="\t")
            except Exception:
                df = pd.read_csv(uploaded_writing)
        st.success(f"✅ Loaded file with {len(df)} rows and {len(df.columns)} columns")
        df.columns = [str(c).strip() for c in df.columns]
        # Ensure a Sample id column exists (auto-generate 1..N if absent).
        # NOTE(review): this presence check is case-insensitive, but rows are
        # later read via row["Sample"] — a lower-case "sample" column would pass
        # here and then raise KeyError in the command loop; confirm input casing.
        if not any(c.lower() == "sample" for c in df.columns):
            df.insert(0, "Sample", np.arange(1, len(df) + 1))
            st.info("`Sample` column missing — automatically generated 1..N.")
        # Position columns: explicitly named "Position N" (case-insensitive);
        # otherwise every column that is not a known metadata column.
        position_cols = [c for c in df.columns if re.match(r"(?i)^position\s*\d+", c)]
        if not position_cols:
            non_pos_cols = {"sample", "total edited", 'volume per "1"', "volume per 1"}
            candidate_cols = [c for c in df.columns if c.lower() not in non_pos_cols]
            position_cols = candidate_cols
            st.info(f"Position columns inferred automatically: {len(position_cols)} detected.")
        def pos_key(col_name: str):
            # Sort position columns by their embedded number; columns without a
            # number sort last (sentinel 10**9).
            m = re.search(r"(\d+)", col_name)
            return int(m.group(1)) if m else 10**9
        position_cols = sorted(position_cols, key=pos_key)
        # Coerce the 0/1 edit matrix to ints; non-numeric cells become 0.
        df[position_cols] = df[position_cols].apply(pd.to_numeric, errors="coerce").fillna(0).astype(int)
        if "Total edited" not in df.columns:
            df["Total edited"] = df[position_cols].sum(axis=1).astype(int)
            st.info("`Total edited` column missing — calculated automatically as sum of 1s per row.")
        st.markdown("#### ⚙️ Volume Calculation Settings")
        default_total_vol = st.number_input(
            "Maximum volume per input well (µL)",
            min_value=1.0, max_value=10000.0, value=64.0, step=1.0,
            help="Used to compute Volume per '1' as (Maximum volume per input well / Total edited) when not provided."
        )
        # Volume per "1": use the first provided "volume per ..." column if
        # present, otherwise derive it from the per-well maximum.
        vol_candidates = [c for c in df.columns if "volume per" in c.lower()]
        if not vol_candidates:
            # Rows with Total edited == 0 would divide by zero; route them
            # through NaN and map to 0 µL.
            df['Volume per "1"'] = default_total_vol / df["Total edited"].replace(0, np.nan)
            df['Volume per "1"'] = df['Volume per "1"'].fillna(0)
            st.info(f'`Volume per "1"` column missing — calculated automatically as {default_total_vol:.0f} µL (max per input well) / Total edited.')
            volume_col = 'Volume per "1"'
        else:
            volume_col = vol_candidates[0]
        # A single transfer can never exceed the per-well cap, or first-fit
        # allocation below could never place it.
        # NOTE(review): max_per_well_ul is defined earlier in the file (not
        # visible here) — confirm it is always set before this point.
        if df[volume_col].max() > max_per_well_ul:
            st.error(
                f"❌ At least one row has `Volume per \"1\"` greater than the per-well cap ({max_per_well_ul} µL). "
                "Increase the cap or reduce per-transfer volume."
            )
            st.stop()
        # Per-input demand: sum of per-"1" volumes over rows where that
        # position's flag == 1.
        vol_per_one_series = pd.to_numeric(df[volume_col], errors="coerce").fillna(0.0)
        total_volume_per_input = [float(vol_per_one_series[df[pos] == 1].sum()) for pos in position_cols]
        # NOTE(review): `ceil` is presumably imported from `math` earlier in the
        # file — confirm.
        wells_needed_per_input = [int(ceil(tv / max_per_well_ul)) if tv > 0 else 0 for tv in total_volume_per_input]
        num_inputs = len(position_cols)
        # Uniform layout: every input is allocated the same (maximum) number of
        # consecutive source wells, even if some inputs need fewer.
        max_wells_per_source = max(wells_needed_per_input) if wells_needed_per_input else 0
        st.markdown("### 👀 Preview: Suggested Uniform Layout")
        if max_wells_per_source == 0:
            st.info("No edits detected — nothing to allocate.")
            st.stop()
        st.write(
            f"💡 Suggested layout: **{max_wells_per_source} consecutive wells per input** "
            f"(cap {max_per_well_ul:.0f} µL/well)."
        )
        total_wells_needed_uniform = num_inputs * max_wells_per_source
        plates_needed = int(ceil(total_wells_needed_uniform / 96)) or 1
        # Order wells plate-by-plate, row-major (A1..A12, B1.., ...), then take
        # exactly as many as the uniform layout requires.
        global_wells = sorted(
            build_global_wells_list(plates_needed),
            key=lambda x: (
                x[0],
                ROWS_96.index(parse_well_name(x[1])[0]),
                parse_well_name(x[1])[1]
            )
        )
        global_wells = global_wells[:total_wells_needed_uniform]
        # assigned_wells_map: input -> ordered [(plate, well)] block.
        # well_to_input: (plate, well) -> (input, 1-based index within block).
        assigned_wells_map, well_to_input, preview_rows = {}, {}, []
        for i in range(1, num_inputs + 1):
            start, end = (i - 1) * max_wells_per_source, i * max_wells_per_source
            block = global_wells[start:end]
            assigned_wells_map[i] = block
            for j, (p, w) in enumerate(block, start=1):
                well_to_input[(p, w)] = (i, j)
            block_str = ", ".join([f"P{p}:{w}" for (p, w) in block])
            preview_rows.append({
                "Input (Position #)": i,
                "Total demand (µL)": round(total_volume_per_input[i-1], 2),
                "Wells needed (actual)": wells_needed_per_input[i-1],
                "Allocated (uniform)": max_wells_per_source,
                "Assigned wells": block_str
            })
        preview_df = pd.DataFrame(preview_rows)
        st.dataframe(preview_df, width="stretch", height=300)
        st.markdown("#### Plate Map (hover cells for details)")
        plate_html = render_plate_map_html(plates_needed, well_to_input, max_wells_per_source, num_inputs)
        st.markdown(plate_html, unsafe_allow_html=True)
        st.markdown("### ✅ Generate Pipetting Commands")
        if st.button("Generate using this layout"):
            # Running fill level (µL) of each allocated source well, per input.
            per_input_well_cum = {i: [0.0] * max_wells_per_source for i in range(1, num_inputs + 1)}
            commands, source_volume_totals = [], {}
            for _, row in df.iterrows():
                sample_id = int(row["Sample"])
                vol_per_one = float(row[volume_col])
                if vol_per_one <= 0:
                    continue  # nothing to pipette for this sample
                dest_plate, dest_well = sample_index_to_plate_and_well(sample_id)
                tool = pick_tool(vol_per_one)
                for pos_idx, col in enumerate(position_cols, start=1):
                    if int(row[col]) != 1:
                        continue
                    # First-fit: draw from the first allocated well of this
                    # input that still has room under the per-well cap.
                    wells_for_input = assigned_wells_map[pos_idx]
                    cum_list = per_input_well_cum[pos_idx]
                    chosen = None
                    for j, ((src_plate, src_well), current_vol) in enumerate(zip(wells_for_input, cum_list)):
                        if current_vol + vol_per_one <= max_per_well_ul:
                            chosen = (j, src_plate, src_well)
                            break
                    if chosen is None:
                        # All allocated wells for this input are full; the
                        # uniform layout under-provisioned. Abort the run.
                        st.error(
                            f"Allocation exhausted for Input {pos_idx} while creating commands. "
                            "Increase the max volume per well or review per-transfer volume."
                        )
                        st.stop()
                    j, src_plate, src_well = chosen
                    cum_list[j] += vol_per_one
                    # NOTE(review): cum_list already aliases the dict entry, so
                    # this reassignment is a no-op kept for clarity.
                    per_input_well_cum[pos_idx] = cum_list
                    source_volume_totals[(src_plate, src_well)] = source_volume_totals.get((src_plate, src_well), 0.0) + vol_per_one
                    commands.append({
                        "Input #": pos_idx,
                        "Source plate": src_plate,
                        "Source well": src_well,
                        "Destination plate": dest_plate,
                        "Destination well": dest_well,
                        "Volume": round(vol_per_one, 2),
                        "Tool": tool
                    })
            commands_df = pd.DataFrame(commands)
            # Helpers producing sortable (row index, column number) keys so
            # wells order in plate order rather than lexicographically.
            def row_idx_from_well(w): return ROWS_96.index(parse_well_name(w)[0])
            def col_num_from_well(w): return parse_well_name(w)[1]
            commands_df["Src_row_idx"] = commands_df["Source well"].apply(row_idx_from_well)
            commands_df["Src_col_num"] = commands_df["Source well"].apply(col_num_from_well)
            commands_df["Dst_row_idx"] = commands_df["Destination well"].apply(row_idx_from_well)
            commands_df["Dst_col_num"] = commands_df["Destination well"].apply(col_num_from_well)
            commands_df = commands_df.sort_values(
                by=["Input #", "Source plate", "Src_row_idx", "Src_col_num",
                    "Destination plate", "Dst_row_idx", "Dst_col_num"],
                kind="stable"
            )
            # Drop the temporary sort-key columns from the final output.
            commands_df = commands_df[[
                "Input #", "Source plate", "Source well",
                "Destination plate", "Destination well", "Volume", "Tool"
            ]]
            st.success(f"✅ Generated {len(commands_df)} commands across {num_inputs} inputs.")
            # Per-source-well usage summary: actual µL drawn vs allocated cap.
            summary_rows = []
            for i in range(1, num_inputs + 1):
                for (p, w), used in zip(assigned_wells_map[i], per_input_well_cum[i]):
                    total = source_volume_totals.get((p, w), 0.0)
                    summary_rows.append({
                        "Source": i, "Source plate": p, "Source well": w,
                        "Total volume taken (µL)": round(total, 2),
                        "Allocated capacity (µL)": round(max_per_well_ul, 2)
                    })
            summary_df = pd.DataFrame(summary_rows)
            summary_df["Src_row_idx"] = summary_df["Source well"].apply(row_idx_from_well)
            summary_df["Src_col_num"] = summary_df["Source well"].apply(col_num_from_well)
            summary_df = summary_df.sort_values(
                by=["Source", "Source plate", "Src_row_idx", "Src_col_num"],
                kind="stable"
            )[
                ["Source", "Source plate", "Source well", "Total volume taken (µL)", "Allocated capacity (µL)"]
            ]
            st.markdown("### 💧 Pipetting Commands")
            st.dataframe(commands_df, width="stretch", height=400)
            st.download_button("⬇️ Download Commands CSV", commands_df.to_csv(index=False), "pipetting_commands.csv", mime="text/csv")
            st.markdown("### 📊 Source Volume Summary")
            st.dataframe(summary_df, width="stretch", height=400)
            st.download_button("⬇️ Download Source Summary CSV", summary_df.to_csv(index=False), "source_volume_summary.csv", mime="text/csv")
    except Exception as e:
        # NOTE(review): broad catch keeps the app alive on any failure but hides
        # tracebacks; consider st.exception(e) during development.
        st.error(f"❌ Error processing file: {e}")
else:
    st.info("👆 Upload an Excel/CSV/TXT file to start.")