In the last couple of articles, I reviewed our tools for automating KStars and Ekos, which provide a complete suite of tools for operating a telescope and taking pictures. Using D-Bus, we can control these programs as if we’re at the keyboard, allowing us to take advantage of these programs' capabilities rather than reinvent the wheel. Now we can tie all the disparate parts together into a Master Control Program to run the observatory in a completely autonomous way. Each component needs to be working before we get to this point - if the telescope isn’t focusing with auto-focus, trying to run Ekos schedules that include auto-focusing is pointless.
To do this, I implemented a service on our Linux-based telescope controller (in my case, a Beelink MiniPC running the Stellarmate X OS). The service is a Python program that loops continuously, checking the weather, opening and closing the roof, and invoking the Ekos scheduler.
I’ve simplified the code and removed comments, logging, etc. As the program evolves, you can see it here in my GitHub repo for EKOS Processing Scripts.
After we set up D-Bus and KStars/Ekos as in the previous articles, the main loop looks like this:
while True:
    logger.info('Main loop starting...')
    # If it is raining or daytime, shut down and wait 5 mins
    if getRain() or checkSun():
        ekosScheduler.stop()
        obsyState = "Closed"
        obsyClose()
        time.sleep(300)
        continue
    else:
        logger.info('No rain or not daytime.')
    # Bad weather: stay closed or move to Close Pending if Open
    if mlCloudDetect() or getWeather():
        if obsyState == "Closed":
            time.sleep(60)
            continue
        # If Open give it PENDING minutes to change
        if obsyState == "Open":
            obsyState = "Close Pending"
            pendingCount = 1
        if obsyState == "Close Pending":
            pendingCount += 1
            if pendingCount == maxPending:
                obsyState = "Closed"
                obsyClose()
                pendingCount = 0
    else:
        # Good weather so set to Open Pending or Open.
        # Only start the countdown once, otherwise the counter
        # is reset on every pass and never reaches maxPending.
        if obsyState not in ("Open", "Open Pending"):
            obsyState = "Open Pending"
            pendingCount = 1
        if obsyState == "Open Pending":
            pendingCount += 1
            if pendingCount == maxPending:
                obsyState = "Open"
                obsyOpen()
                ekosScheduler.loadScheduler('daily.esl')
                ekosScheduler.start()
                pendingCount = 0
    time.sleep(60)
# Only reached if the loop above is ever broken out of
ekos.stop()
Since it’s not worth doing anything when the Sun is up, this routine calculates whether it is. It treats the Sun as up until it is more than 6 degrees below the horizon:
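import logging
import astropy.coordinates as coord
import astropy.units as u
from astropy.time import Time
def checkSun():
    # Observatory location from the configured longitude and latitude
    loc = coord.EarthLocation.from_geodetic(lon=LONGITUDE * u.deg, lat=LATITUDE * u.deg)
    now = Time.now()
    # Sun position in the local horizon (alt/az) frame
    altaz = coord.AltAz(location=loc, obstime=now)
    sun = coord.get_sun(now).transform_to(altaz)
    # Treat anything brighter than the end of civil twilight as daytime
    if sun.alt.degree > -6.0:
        logging.info("Sun is up")
        return True
    else:
        logging.info("Sun is down")
        return False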
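The -6.0 degree cutoff above corresponds to the end of civil twilight. As a small sketch of how that threshold could be made configurable, the helper below takes a Sun altitude in degrees and a twilight name. The names `TWILIGHT_LIMITS_DEG` and `sun_is_up` are hypothetical and not part of the original script; the altitude would come from `sun.alt.degree` in `checkSun()`.

```python
# Conventional Sun-altitude limits for each twilight phase, in degrees.
TWILIGHT_LIMITS_DEG = {
    "civil": -6.0,
    "nautical": -12.0,
    "astronomical": -18.0,
}

def sun_is_up(sun_alt_deg, twilight="civil"):
    """Return True while the Sun is above the chosen twilight limit."""
    return sun_alt_deg > TWILIGHT_LIMITS_DEG[twilight]

# With the Sun 10 degrees below the horizon, civil twilight has ended
# but the sky is not yet astronomically dark.
print(sun_is_up(-10.0))                  # civil limit
print(sun_is_up(-10.0, "astronomical"))  # stricter limit
```

Choosing "astronomical" would delay opening until the sky is fully dark, at the cost of a shorter observing window.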
The main loop essentially implements 4 states for the observatory: Closed, Close Pending (we’re open, but some clouds are rolling in), Open Pending (it’s clearing up!) and Open (execute the daily Ekos Schedule).
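To make the four states easier to reason about, here is a minimal sketch of the same idea as a small state machine that can be exercised without any hardware. It is not the code from the repo: the `Obsy` class and `step` function are hypothetical names, and the two in-between transitions (Close Pending back to Open when the sky clears, Open Pending back to Closed when clouds return) are my assumptions about sensible behaviour.

```python
from dataclasses import dataclass

CLOSED, CLOSE_PENDING = "Closed", "Close Pending"
OPEN_PENDING, OPEN = "Open Pending", "Open"

@dataclass
class Obsy:
    state: str = CLOSED
    pending: int = 0  # loop passes spent in the current pending state

def step(obsy, unsafe, bad_weather, max_pending):
    """Advance one loop pass; return 'open', 'close' or None."""
    if unsafe:  # rain or daylight: close straight away
        obsy.state, obsy.pending = CLOSED, 0
        return "close"
    if bad_weather:
        if obsy.state == OPEN_PENDING:   # assumption: clearing did not last
            obsy.state, obsy.pending = CLOSED, 0
        elif obsy.state == OPEN:         # start the close countdown
            obsy.state, obsy.pending = CLOSE_PENDING, 0
        if obsy.state == CLOSE_PENDING:
            obsy.pending += 1
            if obsy.pending >= max_pending:
                obsy.state, obsy.pending = CLOSED, 0
                return "close"
        return None
    if obsy.state == CLOSE_PENDING:      # assumption: clouds passed, roof still open
        obsy.state, obsy.pending = OPEN, 0
    elif obsy.state == CLOSED:           # start the open countdown
        obsy.state, obsy.pending = OPEN_PENDING, 0
    if obsy.state == OPEN_PENDING:
        obsy.pending += 1
        if obsy.pending >= max_pending:
            obsy.state, obsy.pending = OPEN, 0
            return "open"
    return None
```

Keeping the transitions in a pure function like this means the main loop only has to act on the returned action, and the logic can be checked with simulated weather before it is trusted with a real roof.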
If the RG-11-based rain detector (checked with getRain()) returns true, or our cloud detection software finds clouds in the AllSkyCam images, we close the roof and cancel any currently executing schedules. The code for these functions is below; details are in the articles linked above. Since I don’t want to load the cloud detection model on every loop, initialization is done elsewhere.
import logging
import serial
def getRain():
    try:
        ser = serial.Serial(getConfig(RAINPORT),2400,timeout=1)
        ser.flush()
        packet=ser.readline()
    except Exception as msg:
        logging.error("getRain error: "+str(msg))
        # Assumption: treat an unreadable sensor as rain so the roof stays closed
        return True
    if (packet != b"safe#"):
        logging.info("Rain detected by Hydreon RG-11!")
        return True
    else:
        logging.info("Rain not detected by Hydreon RG-11.")
        return False
import numpy as np
from PIL import Image, ImageOps
def mlCloudDetect():
    # model and class_names are loaded once, elsewhere
    # Create a data array holding a single 224x224 RGB image
    data = np.ndarray(shape=(1, 224, 224, 3), dtype=np.float32)
    # Open the latest image from the allsky cam
    image = Image.open("mlCloudDetect/latest.jpg").convert("RGB")
    # resize the image and crop
    size = (224, 224)
    image = ImageOps.fit(image, size, Image.Resampling.LANCZOS)
    # turn the image into a numpy array
    image_array = np.asarray(image)
    # Normalize pixel values from 0..255 to -1..1
    norm_image_array = (image_array.astype(np.float32) / 127.5) - 1
    # Load the image into the array
    data[0] = norm_image_array
    # Run the Keras model
    prediction = model.predict(data)
    index = np.argmax(prediction)
    class_name = class_names[index]
    # Anything other than 'Clear' counts as cloudy
    return (class_name != 'Clear')
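The two numeric steps in the cloud detector, scaling pixels to the range -1 to 1 and picking the highest-scoring class, can be illustrated without NumPy or Keras. This is only a plain-Python sketch of the arithmetic; the function names and the label list are hypothetical, and the real labels come from the trained model.

```python
def normalise(pixel):
    """Map an 8-bit channel value (0-255) onto the range -1.0 to 1.0."""
    return pixel / 127.5 - 1.0

def pick_class(scores, class_names):
    """Return the label with the highest score (a plain-Python argmax)."""
    best = max(range(len(scores)), key=scores.__getitem__)
    return class_names[best]

labels = ["Clear", "Cloudy"]  # hypothetical label list
scores = [0.2, 0.8]           # made-up model output for one image
is_cloudy = pick_class(scores, labels) != "Clear"
print(normalise(0), normalise(255), is_cloudy)
```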
I also pull weather information in the form of wind speed from an Argent ADS-WS1 weather station. Thanks to Evan Vander Stoep for most of this code [1].
import codecs
def getWeather():
    # Returns True when the wind is too strong for the roof to be open,
    # which is how the main loop uses the result
    try:
        ser = serial.Serial(getConfig(WEATHERPORT),2400,timeout=1)
        ser.flush()
        packet=ser.readline()
    except Exception as msg:
        logging.error("getWeather error: "+str(msg))
        return False
    header = packet[0:2]
    eom = packet[50:55]
    if header == b"!!" and eom == b"\r\n":
        # Wind Speed Calculations (hex field, scaled by 10, converted to mph)
        wind_speed = int(codecs.decode(packet[2:6], 'UTF-8'), 16)
        wind_speed = (wind_speed / 10)
        wind_speed = (wind_speed / 1.609344)
        wind_speed = round(wind_speed , 1)
        wx_wind_speed = wind_speed
        # Average Wind Speed Calculations
        average_wind_speed = int(codecs.decode(packet[46:50], 'UTF-8'), 16)
        average_wind_speed = (average_wind_speed / 10)
        average_wind_speed = (average_wind_speed / 1.609344)
        average_wind_speed = round(average_wind_speed , 1)
        wx_average_wind_speed = average_wind_speed
        # Flag bad weather if either reading exceeds its limit
        if (wx_average_wind_speed > MAXAVWIND) or (wx_wind_speed > MAXWIND):
            return True
        else:
            return False
    else:
        logging.error("Unable to get weather data, returning False")
        return False
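The packet handling can be separated from the serial port so it can be tried out on a canned packet. The sketch below follows the byte offsets used in the listing above (header at 0:2, wind speed at 2:6, average wind speed at 46:50, line ending at 50:52) and assumes, as that listing implies, that each field is a hex number in tenths of km/h. The function names and the fail-safe choice in `too_windy` are my own assumptions.

```python
KM_PER_MILE = 1.609344

def hex_field_to_mph(field):
    """Convert a 4-digit hex field (tenths of km/h) to mph, rounded to 0.1."""
    kmh = int(field.decode("ascii"), 16) / 10
    return round(kmh / KM_PER_MILE, 1)

def parse_wind(packet):
    """Return (wind_mph, average_wind_mph), or None for a malformed packet."""
    if packet[0:2] != b"!!" or packet[50:52] != b"\r\n":
        return None
    return hex_field_to_mph(packet[2:6]), hex_field_to_mph(packet[46:50])

def too_windy(packet, max_wind, max_average):
    """True when either reading exceeds its limit."""
    wind = parse_wind(packet)
    if wind is None:
        return True  # assumption: an unreadable packet is treated as unsafe
    speed, average = wind
    return speed > max_wind or average > max_average

# Synthetic packet: 10.0 km/h current wind, 20.0 km/h average wind
sample = b"!!" + b"0064" + b"0" * 40 + b"00C8" + b"\r\n"
print(parse_wind(sample))
```

Splitting the parser out this way also makes it easy to log the actual wind figures rather than only the open/close decision.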
Because (as you can see below) my telescope needs to be parked before I can close the roof, the obsyClose() function parks the scope before the roof is closed. The function obsyOpen() also checks for parking before opening the roof.
For INDI, we use PyIndi to communicate with the INDI server on the telescope computer. Since this is a network connection, we can run our code on any computer in the network and access remote devices. Similarly, since my observatory roof controller is on the network, I can control it from the telescope computer. INDI makes it very flexible as to where devices can reside on the network.
To connect to our INDI server, we do the following:
import logging
import sys
import PyIndi
indiclient=IndiClient()
indiclient.setServer(INDISERVER,INDIPORT)
logging.info('Connecting to INDI server')
if (not(indiclient.connectServer())):
    logging.error("No indiserver running on "+indiclient.getHost()+":"+str(indiclient.getPort()))
    sys.exit(1)
The IndiClient class below extends PyIndi.BaseClient so I can implement logging and other logic inside the class.
class IndiClient(PyIndi.BaseClient):
    device=None
    imgIdx=0
    def __init__(self):
        super(IndiClient, self).__init__()
        self.logger = logging.getLogger('PyQtIndi.IndiClient')
        self.logger.info('creating an instance of PyQtIndi.IndiClient')
    def newDevice(self, d):
        self.logger.info("new device " + d.getDeviceName())
        if d.getDeviceName() == "CCD Simulator":
            self.logger.info("Set new device CCD Simulator!")
            # save reference to the device in member variable
            self.device = d
    def newProperty(self, p):
        self.logger.info("new property "+ p.getName() +
                         " for device "+ p.getDeviceName())
        if (self.device is not None
                and p.getName() == "CONNECTION"
                and p.getDeviceName() == self.device.getDeviceName()):
            self.logger.info("Got property CONNECTION for CCD Simulator!")
            # connect to device
            self.connectDevice(self.device.getDeviceName())
            # set BLOB mode to BLOB_ALSO
            self.setBLOBMode(1, self.device.getDeviceName(), None)
        if p.getName() == "CCD_EXPOSURE":
            # take first exposure (takeExposure() is not shown in this trimmed listing)
            self.takeExposure()
    def removeProperty(self, p):
        self.logger.info("remove property "+ p.getName() +
                         " for device "+ p.getDeviceName())
    def newSwitch(self, svp):
        self.logger.info ("new Switch "+ svp.name.decode() +
                          " for device "+ svp.device.decode())
    def newNumber(self, nvp):
        self.logger.info("new Number "+ nvp.name +
                         " for device "+ nvp.device)
    def newText(self, tvp):
        self.logger.info("new Text "+ tvp.name.decode() +
                         " for device "+ tvp.device.decode())
    def newLight(self, lvp):
        self.logger.info("new Light "+ lvp.name.decode() +
                         " for device "+ lvp.device.decode())
    def newMessage(self, d, m):
        self.logger.info("new Message "+ d.messageQueue(m))
    def serverConnected(self):
        print("Server connected ("+self.getHost()+":"+str(self.getPort())+")")
    def serverDisconnected(self, code):
        self.logger.info("Server disconnected (exit code = "+str(code)
                         +","+str(self.getHost())+":"+str(self.getPort())+")")
To park the telescope, we first need to connect to it. Normally, each of these calls would check that it succeeded and retry the command with a timeout, but for brevity, only the important code is included.
# connect the scope
telescope='Telescope Simulator'
# get the telescope device
device_telescope=indiclient.getDevice(telescope)
# get the CONNECTION property defined for the telescope
telescope_connect=device_telescope.getSwitch("CONNECTION")
# if the telescope device is not connected, connect it
if not(device_telescope.isConnected()):
    telescope_connect[0].s=PyIndi.ISS_ON   # the "CONNECT" switch
    telescope_connect[1].s=PyIndi.ISS_OFF  # the "DISCONNECT" switch
    indiclient.sendNewSwitch(telescope_connect)
else:
    logging.info('Telescope connected')
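The checking and retrying that was left out for brevity usually takes the form of a polling helper. The following is a generic sketch, not the code from the repo: `wait_for` is a hypothetical name, and the `sleep` and `clock` parameters exist only so the helper can be tested without real delays.

```python
import time

def wait_for(check, timeout=30.0, interval=0.5,
             sleep=time.sleep, clock=time.monotonic):
    """Call check() until it returns something truthy or timeout expires.

    Returns the truthy result, or None if the deadline passes first.
    """
    deadline = clock() + timeout
    while True:
        result = check()
        if result:
            return result
        if clock() >= deadline:
            return None
        sleep(interval)

# Example with a stand-in check that succeeds on the third attempt
attempts = iter([None, None, "ready"])
print(wait_for(lambda: next(attempts), timeout=5, interval=0))
```

With an INDI client, the check could be something like `lambda: device_telescope.getSwitch("CONNECTION")`, so the script waits for the property to be defined instead of assuming it already is.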
Finally, we park the telescope and roll back the roof. This is a typical INDI command: get the value of a Switch in a list, set the list members to the correct value for a particular attribute (in this case, TELESCOPE_PARK), and send it back to initiate the action. Once the command is sent, monitor the state until it’s no longer showing busy, recheck the status to ensure it’s parked, and close the roof. Since my INDI-based observatory controller is “in the shop” getting upgraded, I use a simple network relay controller to toggle the roof switch on and off to activate the Aleko sliding gate opener that controls the roof.
import time
import requests
# Tell INDI to Park the scope
telescope_parkstatus=device_telescope.getSwitch("TELESCOPE_PARK")
telescope_parkstatus[0].s=PyIndi.ISS_ON   # the "PARK" switch
telescope_parkstatus[1].s=PyIndi.ISS_OFF  # the "UNPARKED" switch
indiclient.sendNewSwitch(telescope_parkstatus) # send new value
# Wait until the scope has finished moving
telescope_parkstatus=device_telescope.getSwitch("TELESCOPE_PARK")
while (telescope_parkstatus.getState()==PyIndi.IPS_BUSY):
    time.sleep(2)
# Double check parked status
telescope_parkstatus=device_telescope.getSwitch("TELESCOPE_PARK")
while not(telescope_parkstatus):
    time.sleep(0.5)
    telescope_parkstatus=device_telescope.getSwitch("TELESCOPE_PARK")
# Temporary use of network relay to open and close roof
# Turn on the first relay
url = 'http://10.0.0.101/30000/01'
response = requests.get(url)
# Wait
time.sleep(1)
# Turn it off again
url = 'http://10.0.0.101/30000/00'
response = requests.get(url)
The obsyClose() function is pretty much identical. Once I get my controller back in place, the INDI code will replace the temporary code to activate the roof.
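Since opening and closing both come down to the same momentary button press on the gate opener, the relay pulse can be wrapped in one small function. This is a sketch under assumptions: `pulse_relay` is a hypothetical name, the "/01" and "/00" suffixes are taken from the URLs in the listing above, and the HTTP call is passed in as a parameter so the function can be tried without a relay on the network.

```python
import time

def pulse_relay(http_get, base_url, hold_seconds=1.0, sleep=time.sleep):
    """Switch the relay on, hold briefly, then switch it off again.

    http_get is any callable that fetches a URL, e.g. requests.get.
    Returns the two responses so the caller can check them.
    """
    on_response = http_get(base_url + "/01")   # energise the relay
    sleep(hold_seconds)                        # hold the 'button' down
    off_response = http_get(base_url + "/00")  # release it
    return on_response, off_response

# Dry run with a stand-in for requests.get that records the URLs
requested = []
pulse_relay(requested.append, "http://10.0.0.101/30000", hold_seconds=0)
print(requested)
```

In the real script the call would pass `requests.get` as `http_get`, and checking the returned responses would catch a relay that is unreachable before the roof state is updated.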
[1] https://evan.kj7bre.com/blog/weather-station/