|
|
@@ -0,0 +1,203 @@
|
|
|
+import socket
|
|
|
+import time
|
|
|
+import os
|
|
|
+import pandas as pd
|
|
|
+from datetime import datetime
|
|
|
+
|
|
|
+# === Configuration ===
|
|
|
+SENSOR_IP = '158.193.241.163'
|
|
|
+SENSOR_PORT = 10001
|
|
|
+LOG_INTERVAL = 10 # Save particle data every 10 seconds
|
|
|
+CYCLE_MINUTES = 6 # Trigger Telegram 8 cycle every 6 minutes
|
|
|
+
|
|
|
+# === Paths ===
|
|
|
+BASE_DIR = '/home/ranny/Particle_Data'
|
|
|
+PARTICLE_DIR = os.path.join(BASE_DIR, 'ParticleData')
|
|
|
+TELEGRAM8_DIR = os.path.join(BASE_DIR, 'Telegram8Data')
|
|
|
+ERROR_LOG_FILE = os.path.join(BASE_DIR, 'ErrorLog', 'error_log.txt')
|
|
|
+
|
|
|
+# === Command Constants ===
|
|
|
+CMD_KY1 = '00KY00001\r\n'
|
|
|
+CMD_KY0 = '00KY00000\r\n'
|
|
|
+CMD_TM3 = '00TM00003\r\n'
|
|
|
+CMD_TR8 = '00TR00008\r\n'
|
|
|
+ACK_KY1 = '!00KY00001'
|
|
|
+ACK_KY0 = '!00KY00000'
|
|
|
+
|
|
|
+# === Data Columns ===
|
|
|
+particle_columns = ['Timestamp', 'Amplitude', 'Duration', 'ITS',
|
|
|
+ 'Diameter1 [mm]', 'Velocity [m/s]',
|
|
|
+ 'Diameter Hamburger [mm]', 'Velocity Hamburger [m/s]', 'Temperature [C]']
|
|
|
+
|
|
|
+telegram8_columns = ["Timestamp", "ID", "Serial", "Intensity", "Date", "Time", "Code1", "Code2", "METAR1",
|
|
|
+ "Value1", "Code3", "Code4", "METAR2", "Value2",
|
|
|
+ "Diameter", "Velocity", "Amount_mm",
|
|
|
+ "Visibility_m", "Reflectivity_dBZ", "Quality_pct",
|
|
|
+ "Hail_mm", "End"]
|
|
|
+
|
|
|
+particle_df = pd.DataFrame(columns=particle_columns)
|
|
|
+
|
|
|
+# === Ensure directories exist ===
|
|
|
+for folder in [PARTICLE_DIR, TELEGRAM8_DIR, os.path.dirname(ERROR_LOG_FILE)]:
|
|
|
+ os.makedirs(folder, exist_ok=True)
|
|
|
+
|
|
|
+def log_error(message):
|
|
|
+ timestamp = datetime.now().strftime("[%Y-%m-%d %H:%M:%S]")
|
|
|
+ with open(ERROR_LOG_FILE, 'a') as f:
|
|
|
+ f.write(f"{timestamp} {message}\n")
|
|
|
+
|
|
|
+def communicate(command, expect_reply=True, expected_ack=None, retries=3):
|
|
|
+ for attempt in range(retries):
|
|
|
+ try:
|
|
|
+ print(f">>> {command.strip()}")
|
|
|
+ with socket.create_connection((SENSOR_IP, SENSOR_PORT), timeout=5) as s:
|
|
|
+ s.sendall(command.encode())
|
|
|
+ if expect_reply:
|
|
|
+ response = s.recv(1024).decode(errors='ignore').strip()
|
|
|
+ print(f"<<< {response}")
|
|
|
+ if expected_ack is None or expected_ack in response:
|
|
|
+ return response
|
|
|
+ except Exception as e:
|
|
|
+ msg = f"Attempt {attempt + 1} failed communicating command '{command.strip()}': {e}"
|
|
|
+ print(f"[{datetime.now()}] {msg}")
|
|
|
+ log_error(msg)
|
|
|
+ time.sleep(1)
|
|
|
+ return None
|
|
|
+
|
|
|
+def save_df_to_csv(df, path):
|
|
|
+ file_exists = os.path.exists(path)
|
|
|
+ df.to_csv(path, mode='a', index=False, header=not file_exists)
|
|
|
+
|
|
|
+def collect_telegram8():
|
|
|
+ time.sleep(1)
|
|
|
+ try:
|
|
|
+ with socket.create_connection((SENSOR_IP, SENSOR_PORT), timeout=5) as s:
|
|
|
+ s.sendall(CMD_TR8.encode())
|
|
|
+ time.sleep(1)
|
|
|
+ response = s.recv(2048).decode(errors='ignore').strip()
|
|
|
+ print("<<< FULL RESPONSE (Telegram 8):")
|
|
|
+ print(response)
|
|
|
+ except Exception as e:
|
|
|
+ log_error(f"TR8 communication error: {e}")
|
|
|
+ return
|
|
|
+
|
|
|
+ telegram_line = None
|
|
|
+ if response:
|
|
|
+ start = response.find("00;")
|
|
|
+ if start != -1:
|
|
|
+ cleaned = response[start:].replace('\x03', '').replace('\r', '').replace('\n', '')
|
|
|
+ if cleaned.count(';') >= 20:
|
|
|
+ telegram_line = cleaned
|
|
|
+
|
|
|
+ if telegram_line:
|
|
|
+ now = datetime.now()
|
|
|
+ iso_timestamp = now.strftime('%Y-%m-%d %H:%M:%S')
|
|
|
+ row = [iso_timestamp] + telegram_line.split(';')[:len(telegram8_columns) - 1]
|
|
|
+ df = pd.DataFrame([row], columns=telegram8_columns)
|
|
|
+ filename = os.path.join(TELEGRAM8_DIR, f"telegram8_{now.strftime('%Y-%m-%d')}.csv")
|
|
|
+ save_df_to_csv(df, filename)
|
|
|
+ print(f"Saved Telegram 8 to {filename}")
|
|
|
+ else:
|
|
|
+ print("No valid Telegram 8 found")
|
|
|
+
|
|
|
+ if communicate(CMD_KY1, expected_ack=ACK_KY1):
|
|
|
+ # DO NOT CHECK ACK_TM3, sensor returns just "!"
|
|
|
+ communicate(CMD_TM3, expect_reply=True) # accept any !
|
|
|
+ else:
|
|
|
+ log_error("Failed to re-enter config mode after Telegram 8")
|
|
|
+
|
|
|
+# === MAIN LOOP ===
|
|
|
+while True:
|
|
|
+ try:
|
|
|
+ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
|
|
+ s.settimeout(30)
|
|
|
+ s.connect((SENSOR_IP, SENSOR_PORT))
|
|
|
+ buffer = ""
|
|
|
+ print("Connected. Listening for particle data...")
|
|
|
+
|
|
|
+ last_write_time = time.time()
|
|
|
+ last_cycle_time = time.time()
|
|
|
+ skip_first_line = True
|
|
|
+ first_line_after_tr8 = False
|
|
|
+
|
|
|
+ while True:
|
|
|
+ try:
|
|
|
+ data = s.recv(1024).decode('utf-8', errors='replace')
|
|
|
+ if not data:
|
|
|
+ break
|
|
|
+
|
|
|
+ buffer += data
|
|
|
+
|
|
|
+ while "\r\n" in buffer:
|
|
|
+ line, buffer = buffer.split("\r\n", 1)
|
|
|
+
|
|
|
+ if skip_first_line:
|
|
|
+ print("Skipping first corrupted line.")
|
|
|
+ skip_first_line = False
|
|
|
+ continue
|
|
|
+
|
|
|
+ if first_line_after_tr8:
|
|
|
+ idx = line.find("!00KY00000")
|
|
|
+ if idx != -1:
|
|
|
+ line = line[idx + len("!00KY00000"):].strip()
|
|
|
+ first_line_after_tr8 = False
|
|
|
+
|
|
|
+ now = datetime.now()
|
|
|
+ iso_timestamp = now.strftime('%Y-%m-%d %H:%M:%S')
|
|
|
+ fields = line.strip().split(';')
|
|
|
+
|
|
|
+ if len(fields) == 8:
|
|
|
+ try:
|
|
|
+ parsed = [int(fields[0]), int(fields[1]), int(fields[2])] + \
|
|
|
+ [float(f) for f in fields[3:7]] + [int(fields[7])]
|
|
|
+ except:
|
|
|
+ parsed = fields
|
|
|
+
|
|
|
+ row = [iso_timestamp] + parsed
|
|
|
+ particle_df.loc[len(particle_df)] = row
|
|
|
+ print(f"[{iso_timestamp}] {fields}")
|
|
|
+
|
|
|
+ except socket.timeout:
|
|
|
+ print("Timeout.")
|
|
|
+ except Exception as e:
|
|
|
+ log_error(f"Socket error: {e}")
|
|
|
+ break
|
|
|
+
|
|
|
+ if time.time() - last_write_time >= LOG_INTERVAL:
|
|
|
+ if not particle_df.empty:
|
|
|
+ filename = os.path.join(PARTICLE_DIR, f"particle_data_{datetime.now().strftime('%Y-%m-%d')}.csv")
|
|
|
+ save_df_to_csv(particle_df, filename)
|
|
|
+ print(f"Saved {len(particle_df)} new rows to {filename}")
|
|
|
+ particle_df = pd.DataFrame(columns=particle_columns)
|
|
|
+ last_write_time = time.time()
|
|
|
+
|
|
|
+ if time.time() - last_cycle_time >= CYCLE_MINUTES * 60:
|
|
|
+ print(f"\n[{datetime.now()}] Triggering Telegram 8 cycle...")
|
|
|
+
|
|
|
+ if not particle_df.empty:
|
|
|
+ filename = os.path.join(PARTICLE_DIR, f"particle_data_{datetime.now().strftime('%Y-%m-%d')}.csv")
|
|
|
+ save_df_to_csv(particle_df, filename)
|
|
|
+ particle_df = pd.DataFrame(columns=particle_columns)
|
|
|
+
|
|
|
+ s.close()
|
|
|
+ print("Closed particle socket. Waiting 2 seconds before switching to TR8 mode...")
|
|
|
+ time.sleep(2)
|
|
|
+
|
|
|
+ for attempt in range(5):
|
|
|
+ try:
|
|
|
+ first_line_after_tr8 = True
|
|
|
+ skip_first_line = True
|
|
|
+ collect_telegram8()
|
|
|
+ break
|
|
|
+ except Exception as e:
|
|
|
+ log_error(f"Retry {attempt+1} Telegram 8 failed: {e}")
|
|
|
+ time.sleep(2 + attempt)
|
|
|
+ else:
|
|
|
+ log_error("Failed to collect Telegram 8 after multiple retries.")
|
|
|
+
|
|
|
+ last_cycle_time = time.time()
|
|
|
+ break # reconnect to sensor
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ log_error(f"Top-level error: {e}")
|
|
|
+ time.sleep(5)
|