| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203 |
- import socket
- import time
- import os
- import pandas as pd
- from datetime import datetime
- # === Configuration ===
- SENSOR_IP = '158.193.241.163'
- SENSOR_PORT = 10001
- LOG_INTERVAL = 10 # Save particle data every 10 seconds
- CYCLE_MINUTES = 6 # Trigger Telegram 8 cycle every 6 minutes
- # === Paths ===
- BASE_DIR = '/home/ranny/Particle_Data'
- PARTICLE_DIR = os.path.join(BASE_DIR, 'ParticleData')
- TELEGRAM8_DIR = os.path.join(BASE_DIR, 'Telegram8Data')
- ERROR_LOG_FILE = os.path.join(BASE_DIR, 'ErrorLog', 'error_log.txt')
- # === Command Constants ===
- CMD_KY1 = '00KY00001\r\n'
- CMD_KY0 = '00KY00000\r\n'
- CMD_TM3 = '00TM00003\r\n'
- CMD_TR8 = '00TR00008\r\n'
- ACK_KY1 = '!00KY00001'
- ACK_KY0 = '!00KY00000'
- # === Data Columns ===
- particle_columns = ['Timestamp', 'Amplitude', 'Duration', 'ITS',
- 'Diameter1 [mm]', 'Velocity [m/s]',
- 'Diameter Hamburger [mm]', 'Velocity Hamburger [m/s]', 'Temperature [C]']
- telegram8_columns = ["Timestamp", "ID", "Serial", "Intensity", "Date", "Time", "Code1", "Code2", "METAR1",
- "Value1", "Code3", "Code4", "METAR2", "Value2",
- "Diameter", "Velocity", "Amount_mm",
- "Visibility_m", "Reflectivity_dBZ", "Quality_pct",
- "Hail_mm", "End"]
- particle_df = pd.DataFrame(columns=particle_columns)
- # === Ensure directories exist ===
- for folder in [PARTICLE_DIR, TELEGRAM8_DIR, os.path.dirname(ERROR_LOG_FILE)]:
- os.makedirs(folder, exist_ok=True)
- def log_error(message):
- timestamp = datetime.now().strftime("[%Y-%m-%d %H:%M:%S]")
- with open(ERROR_LOG_FILE, 'a') as f:
- f.write(f"{timestamp} {message}\n")
- def communicate(command, expect_reply=True, expected_ack=None, retries=3):
- for attempt in range(retries):
- try:
- print(f">>> {command.strip()}")
- with socket.create_connection((SENSOR_IP, SENSOR_PORT), timeout=5) as s:
- s.sendall(command.encode())
- if expect_reply:
- response = s.recv(1024).decode(errors='ignore').strip()
- print(f"<<< {response}")
- if expected_ack is None or expected_ack in response:
- return response
- except Exception as e:
- msg = f"Attempt {attempt + 1} failed communicating command '{command.strip()}': {e}"
- print(f"[{datetime.now()}] {msg}")
- log_error(msg)
- time.sleep(1)
- return None
- def save_df_to_csv(df, path):
- file_exists = os.path.exists(path)
- df.to_csv(path, mode='a', index=False, header=not file_exists)
- def collect_telegram8():
- time.sleep(1)
- try:
- with socket.create_connection((SENSOR_IP, SENSOR_PORT), timeout=5) as s:
- s.sendall(CMD_TR8.encode())
- time.sleep(1)
- response = s.recv(2048).decode(errors='ignore').strip()
- print("<<< FULL RESPONSE (Telegram 8):")
- print(response)
- except Exception as e:
- log_error(f"TR8 communication error: {e}")
- return
- telegram_line = None
- if response:
- start = response.find("00;")
- if start != -1:
- cleaned = response[start:].replace('\x03', '').replace('\r', '').replace('\n', '')
- if cleaned.count(';') >= 20:
- telegram_line = cleaned
- if telegram_line:
- now = datetime.now()
- iso_timestamp = now.strftime('%Y-%m-%d %H:%M:%S')
- row = [iso_timestamp] + telegram_line.split(';')[:len(telegram8_columns) - 1]
- df = pd.DataFrame([row], columns=telegram8_columns)
- filename = os.path.join(TELEGRAM8_DIR, f"telegram8_{now.strftime('%Y-%m-%d')}.csv")
- save_df_to_csv(df, filename)
- print(f"Saved Telegram 8 to {filename}")
- else:
- print("No valid Telegram 8 found")
- if communicate(CMD_KY1, expected_ack=ACK_KY1):
- # DO NOT CHECK ACK_TM3, sensor returns just "!"
- communicate(CMD_TM3, expect_reply=True) # accept any !
- else:
- log_error("Failed to re-enter config mode after Telegram 8")
- # === MAIN LOOP ===
- while True:
- try:
- with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
- s.settimeout(30)
- s.connect((SENSOR_IP, SENSOR_PORT))
- buffer = ""
- print("Connected. Listening for particle data...")
- last_write_time = time.time()
- last_cycle_time = time.time()
- skip_first_line = True
- first_line_after_tr8 = False
- while True:
- try:
- data = s.recv(1024).decode('utf-8', errors='replace')
- if not data:
- break
- buffer += data
- while "\r\n" in buffer:
- line, buffer = buffer.split("\r\n", 1)
- if skip_first_line:
- print("Skipping first corrupted line.")
- skip_first_line = False
- continue
- if first_line_after_tr8:
- idx = line.find("!00KY00000")
- if idx != -1:
- line = line[idx + len("!00KY00000"):].strip()
- first_line_after_tr8 = False
- now = datetime.now()
- iso_timestamp = now.strftime('%Y-%m-%d %H:%M:%S')
- fields = line.strip().split(';')
- if len(fields) == 8:
- try:
- parsed = [int(fields[0]), int(fields[1]), int(fields[2])] + \
- [float(f) for f in fields[3:7]] + [int(fields[7])]
- except:
- parsed = fields
- row = [iso_timestamp] + parsed
- particle_df.loc[len(particle_df)] = row
- print(f"[{iso_timestamp}] {fields}")
- except socket.timeout:
- print("Timeout.")
- except Exception as e:
- log_error(f"Socket error: {e}")
- break
- if time.time() - last_write_time >= LOG_INTERVAL:
- if not particle_df.empty:
- filename = os.path.join(PARTICLE_DIR, f"particle_data_{datetime.now().strftime('%Y-%m-%d')}.csv")
- save_df_to_csv(particle_df, filename)
- print(f"Saved {len(particle_df)} new rows to {filename}")
- particle_df = pd.DataFrame(columns=particle_columns)
- last_write_time = time.time()
- if time.time() - last_cycle_time >= CYCLE_MINUTES * 60:
- print(f"\n[{datetime.now()}] Triggering Telegram 8 cycle...")
- if not particle_df.empty:
- filename = os.path.join(PARTICLE_DIR, f"particle_data_{datetime.now().strftime('%Y-%m-%d')}.csv")
- save_df_to_csv(particle_df, filename)
- particle_df = pd.DataFrame(columns=particle_columns)
- s.close()
- print("Closed particle socket. Waiting 2 seconds before switching to TR8 mode...")
- time.sleep(2)
- for attempt in range(5):
- try:
- first_line_after_tr8 = True
- skip_first_line = True
- collect_telegram8()
- break
- except Exception as e:
- log_error(f"Retry {attempt+1} Telegram 8 failed: {e}")
- time.sleep(2 + attempt)
- else:
- log_error("Failed to collect Telegram 8 after multiple retries.")
- last_cycle_time = time.time()
- break # reconnect to sensor
- except Exception as e:
- log_error(f"Top-level error: {e}")
- time.sleep(5)
|