"""Log particle data and periodic Telegram 8 summaries from a networked sensor.

The script keeps a TCP connection open to the sensor, parses the
semicolon-separated particle lines it streams, and appends them to a daily
CSV file. Every ``CYCLE_MINUTES`` minutes it drops the streaming connection,
requests a Telegram 8 record over a fresh connection, stores it in a separate
daily CSV file, and then reconnects to resume streaming.
"""

import os
import socket
import time
from datetime import datetime

import pandas as pd

# === Configuration ===
SENSOR_IP = '158.193.241.163'
SENSOR_PORT = 10001
LOG_INTERVAL = 10   # Save particle data every 10 seconds
CYCLE_MINUTES = 6   # Trigger Telegram 8 cycle every 6 minutes

# === Paths ===
BASE_DIR = '/home/ranny/Particle_Data'
PARTICLE_DIR = os.path.join(BASE_DIR, 'ParticleData')
TELEGRAM8_DIR = os.path.join(BASE_DIR, 'Telegram8Data')
ERROR_LOG_FILE = os.path.join(BASE_DIR, 'ErrorLog', 'error_log.txt')

# === Command Constants ===
CMD_KY1 = '00KY00001\r\n'
CMD_KY0 = '00KY00000\r\n'
CMD_TM3 = '00TM00003\r\n'
CMD_TR8 = '00TR00008\r\n'
ACK_KY1 = '!00KY00001'
ACK_KY0 = '!00KY00000'

# === Data Columns ===
particle_columns = [
    'Timestamp', 'Amplitude', 'Duration', 'ITS', 'Diameter1 [mm]',
    'Velocity [m/s]', 'Diameter Hamburger [mm]', 'Velocity Hamburger [m/s]',
    'Temperature [C]',
]
telegram8_columns = [
    "Timestamp", "ID", "Serial", "Intensity", "Date", "Time",
    "Code1", "Code2", "METAR1", "Value1",
    "Code3", "Code4", "METAR2", "Value2",
    "Diameter", "Velocity", "Amount_mm", "Visibility_m",
    "Reflectivity_dBZ", "Quality_pct", "Hail_mm", "End",
]


def ensure_directories():
    """Create the particle, Telegram 8 and error-log directories if missing."""
    for folder in [PARTICLE_DIR, TELEGRAM8_DIR, os.path.dirname(ERROR_LOG_FILE)]:
        os.makedirs(folder, exist_ok=True)


def log_error(message):
    """Append ``message`` with a ``[YYYY-MM-DD HH:MM:SS]`` prefix to the error log."""
    timestamp = datetime.now().strftime("[%Y-%m-%d %H:%M:%S]")
    with open(ERROR_LOG_FILE, 'a', encoding='utf-8') as f:
        f.write(f"{timestamp} {message}\n")


def communicate(command, expect_reply=True, expected_ack=None, retries=3):
    """Send one command to the sensor over a short-lived TCP connection.

    Args:
        command: Command string, including its line terminator.
        expect_reply: When true, read up to 1024 bytes of reply after sending.
        expected_ack: Substring that must occur in the reply for the attempt
            to count as successful; ``None`` accepts any reply.
        retries: Maximum number of attempts.

    Returns:
        The stripped reply on success, ``""`` when the command was sent and
        no reply was requested, or ``None`` when every attempt failed.
        Compare against ``None`` to detect failure, because a successful
        result may be an empty string.
    """
    for attempt in range(retries):
        try:
            print(f">>> {command.strip()}")
            with socket.create_connection((SENSOR_IP, SENSOR_PORT), timeout=5) as s:
                s.sendall(command.encode())
                if not expect_reply:
                    # The send succeeded and nothing more is expected. Without
                    # this return the loop would re-send the command on every
                    # remaining attempt and finally report failure.
                    return ""
                response = s.recv(1024).decode(errors='ignore').strip()
                print(f"<<< {response}")
                if expected_ack is None or expected_ack in response:
                    return response
                # Unexpected reply: fall through and try again.
        except Exception as e:
            msg = (
                f"Attempt {attempt + 1} failed communicating command "
                f"'{command.strip()}': {e}"
            )
            print(f"[{datetime.now()}] {msg}")
            log_error(msg)
            time.sleep(1)
    return None


def save_df_to_csv(df, path):
    """Append ``df`` to the CSV at ``path``, writing the header only for a new file."""
    file_exists = os.path.exists(path)
    df.to_csv(path, mode='a', index=False, header=not file_exists)


def extract_telegram8_line(response):
    """Return the Telegram 8 record contained in ``response``, or ``None``.

    The record is taken to start at the first ``"00;"``. ETX (``\\x03``),
    CR and LF characters are removed, and the result is accepted only if it
    contains at least 20 semicolons.
    """
    if not response:
        return None
    start = response.find("00;")
    if start == -1:
        return None
    cleaned = (
        response[start:]
        .replace('\x03', '')
        .replace('\r', '')
        .replace('\n', '')
    )
    return cleaned if cleaned.count(';') >= 20 else None


def collect_telegram8():
    """Request one Telegram 8 record, store it, then send KY1 followed by TM3.

    A communication failure while requesting the telegram is written to the
    error log and the function returns early without sending KY1/TM3.
    Failures while building or saving the CSV row propagate to the caller.
    """
    time.sleep(1)
    try:
        with socket.create_connection((SENSOR_IP, SENSOR_PORT), timeout=5) as s:
            s.sendall(CMD_TR8.encode())
            time.sleep(1)  # wait before reading the reply
            response = s.recv(2048).decode(errors='ignore').strip()
            print("<<< FULL RESPONSE (Telegram 8):")
            print(response)
    except Exception as e:
        log_error(f"TR8 communication error: {e}")
        return

    telegram_line = extract_telegram8_line(response)
    if telegram_line:
        now = datetime.now()
        iso_timestamp = now.strftime('%Y-%m-%d %H:%M:%S')
        # Keep exactly as many fields as there are non-timestamp columns.
        row = [iso_timestamp] + telegram_line.split(';')[:len(telegram8_columns) - 1]
        df = pd.DataFrame([row], columns=telegram8_columns)
        filename = os.path.join(
            TELEGRAM8_DIR, f"telegram8_{now.strftime('%Y-%m-%d')}.csv"
        )
        save_df_to_csv(df, filename)
        print(f"Saved Telegram 8 to {filename}")
    else:
        print("No valid Telegram 8 found")

    if communicate(CMD_KY1, expected_ack=ACK_KY1):
        # DO NOT CHECK ACK_TM3, sensor returns just "!"
        communicate(CMD_TM3, expect_reply=True)  # accept any !
    else:
        log_error("Failed to re-enter config mode after Telegram 8")


def parse_particle_fields(fields):
    """Convert the eight text fields of a particle line to numbers.

    Fields 0-2 and 7 become ``int``, fields 3-6 become ``float``. If any
    conversion fails, the original strings are returned unchanged so the
    line is still recorded.
    """
    try:
        return (
            [int(fields[0]), int(fields[1]), int(fields[2])]
            + [float(f) for f in fields[3:7]]
            + [int(fields[7])]
        )
    except ValueError:
        return fields


def flush_particle_rows(rows):
    """Append buffered particle rows to today's CSV file.

    Returns:
        The CSV path that was written, or ``None`` when ``rows`` is empty.
    """
    if not rows:
        return None
    filename = os.path.join(
        PARTICLE_DIR,
        f"particle_data_{datetime.now().strftime('%Y-%m-%d')}.csv",
    )
    save_df_to_csv(pd.DataFrame(rows, columns=particle_columns), filename)
    return filename


def main():
    """Stream particle data forever, interleaved with periodic Telegram 8 cycles."""
    ensure_directories()

    pending_rows = []  # parsed particle rows not yet written to disk

    # The timers and the post-TR8 flag live outside the reconnect loop so an
    # unexpected reconnect neither postpones the Telegram 8 cycle nor discards
    # the request to strip the KY0 acknowledgement.
    last_write_time = time.time()
    last_cycle_time = time.time()
    first_line_after_tr8 = False

    # === MAIN LOOP ===
    while True:
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.settimeout(30)
                s.connect((SENSOR_IP, SENSOR_PORT))
                buffer = ""
                print("Connected. Listening for particle data...")
                skip_first_line = True

                while True:
                    try:
                        data = s.recv(1024).decode('utf-8', errors='replace')
                        if not data:
                            break  # connection closed by the peer; reconnect
                        buffer += data
                        while "\r\n" in buffer:
                            line, buffer = buffer.split("\r\n", 1)
                            if skip_first_line:
                                print("Skipping first corrupted line.")
                                skip_first_line = False
                                continue
                            if first_line_after_tr8:
                                # Drop anything up to and including a KY0
                                # acknowledgement glued to the front of the
                                # first processed line after a TR8 cycle.
                                idx = line.find(ACK_KY0)
                                if idx != -1:
                                    line = line[idx + len(ACK_KY0):].strip()
                                first_line_after_tr8 = False

                            iso_timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                            fields = line.strip().split(';')
                            if len(fields) == 8:
                                pending_rows.append(
                                    [iso_timestamp] + parse_particle_fields(fields)
                                )
                                print(f"[{iso_timestamp}] {fields}")
                    except socket.timeout:
                        # No data within 30 s; still run the periodic checks.
                        print("Timeout.")
                    except Exception as e:
                        log_error(f"Socket error: {e}")
                        break

                    if time.time() - last_write_time >= LOG_INTERVAL:
                        filename = flush_particle_rows(pending_rows)
                        if filename is not None:
                            print(f"Saved {len(pending_rows)} new rows to {filename}")
                            pending_rows = []
                        last_write_time = time.time()

                    if time.time() - last_cycle_time >= CYCLE_MINUTES * 60:
                        print(f"\n[{datetime.now()}] Triggering Telegram 8 cycle...")
                        if flush_particle_rows(pending_rows) is not None:
                            pending_rows = []

                        # Close the streaming socket before the command
                        # connections are opened.
                        s.close()
                        print("Closed particle socket. Waiting 2 seconds before switching to TR8 mode...")
                        time.sleep(2)

                        first_line_after_tr8 = True
                        for attempt in range(5):
                            try:
                                collect_telegram8()
                                break
                            except Exception as e:
                                log_error(f"Retry {attempt+1} Telegram 8 failed: {e}")
                                time.sleep(2 + attempt)
                        else:
                            log_error("Failed to collect Telegram 8 after multiple retries.")

                        last_cycle_time = time.time()
                        break  # reconnect to sensor
        except Exception as e:
            log_error(f"Top-level error: {e}")
            time.sleep(5)


if __name__ == "__main__":
    main()