"""
GSC Software: symbolic encoding tools
Author: Nick Becker and Pyeong Whan Cho
Department of Cognitive Science, Johns Hopkins University
Summer 2015
NOTE (7/19/15): These functions are not extensively tested!
"""
import sys
import numpy as np
from numbers import Number
# Operator tokens recognised in symbolic equations (consumed by evaluate()).
TP_SYMBOL = '@'        # tensor (outer) product of the two neighbouring symbols
ADD_SYMBOL = '+'       # sum of the two neighbouring symbols
SCALE_SYMBOL = '*'     # scalar multiple: numeric literal on the left, symbol on the right
COMPRESS_SYMBOL = '$'  # contraction prefix: followed by axis digits and a parenthesised expression
def encode_symbols(nsymbols, reptype='local', dp=0, ndim=None):
    """
    Encode symbols as vectors.

    Parameters
    ----------
    nsymbols : number of symbols to encode.
    reptype : 'local' yields the nsymbols-by-nsymbols identity matrix
        (one-hot codes). Any other value requests vectors with prescribed
        pairwise dot products.
    dp : used only when reptype is not 'local'. A single number is passed to
        dot_products() as the dot product shared by every pair of distinct
        symbols; anything else is passed to dot_products2() as a full matrix
        of pairwise dot products.
    ndim : dimensionality of the vectors when reptype is not 'local';
        defaults to nsymbols.

    Returns
    -------
    The matrix of symbol vectors produced by the selected encoder.
    """
    if reptype == 'local':
        return np.eye(nsymbols)

    # Identity test rather than `== None`, which would compare element-wise
    # (and misbehave) if an array-like were ever passed.
    if ndim is None:
        ndim = nsymbols

    if isinstance(dp, Number):
        return dot_products(nsymbols, ndim, dp)
    return dot_products2(nsymbols, ndim, dp)
def dot_products(nsymbols, ndim, s):
    """
    Build symbol vectors whose distinct pairs all share the dot product s.

    The scalar s is expanded into an nsymbols-by-nsymbols target matrix
    (s everywhere, plus (1 - s) added on the main diagonal) and the actual
    search is delegated to dot_products2().
    """
    shape = (nsymbols, nsymbols)
    uniform_part = s * np.ones(shape)
    diagonal_part = (1 - s) * np.eye(*shape)
    return dot_products2(nsymbols, ndim, uniform_part + diagonal_part)
def dot_products2(nsymbols, ndim, dp_mat):
    """
    Find nsymbols vectors of dimension ndim whose pairwise dot products
    match dp_mat.

    Parameters
    ----------
    nsymbols : number of vectors to find.
    ndim : dimensionality of each vector.
    dp_mat : square, symmetric array-like (ndarray or nested lists) of target
        dot products with ones on the main diagonal.

    Returns
    -------
    sym_mat : ndim-by-nsymbols array; each column is one vector.

    The process exits via sys.exit() if dp_mat is not symmetric or does not
    have ones on its main diagonal. If the search does not reach the
    tolerance within the iteration limit, a message is printed and the last
    iterate is returned anyway.

    Algorithm: find a matrix M such that M'*M = dp_mat by gradient descent on
    the squared Frobenius norm of (M'*M - dp_mat), starting from a random
    uniform matrix.

    NOTE: It has trouble finding more than about 16 vectors, possibly for
    dumb numerical reasons (like stepsize and tolerance), which might be
    fixable if necessary.
    """
    # Accept nested lists as well as arrays; the checks below need .ndim/.T.
    dp_mat = np.asarray(dp_mat, dtype=float)

    # Tolerance for the input checks: targets built arithmetically (e.g.
    # s + (1 - s)) can miss exact symmetry or an exact 1.0 by round-off.
    check_tol = 1e-12
    is_square = dp_mat.ndim == 2 and dp_mat.shape[0] == dp_mat.shape[1]
    if not (is_square and np.allclose(dp_mat, dp_mat.T, rtol=0, atol=check_tol)):
        sys.exit('dot_products2: dp_mat must be symmetric')
    if not np.allclose(np.diag(dp_mat), 1.0, rtol=0, atol=check_tol):
        sys.exit('dot_products2: dp_mat must have all ones on the main diagonal')

    sym_mat = np.random.uniform(size=ndim*nsymbols).reshape(ndim, nsymbols, order='F')
    max_step = .1       # upper bound on the step size
    tol = 1e-6          # largest allowed entry-wise error in M'*M
    max_iter = 100000

    for _ in range(max_iter):
        # Descent direction: M * (M'*M - dp_mat).
        inc = sym_mat.dot(sym_mat.T.dot(sym_mat) - dp_mat)
        # Cap the step so that no entry moves by more than 0.01 per update.
        peak = abs(inc).max()
        step = max_step if peak == 0 else min(max_step, .01 / peak)
        sym_mat = sym_mat - step * inc
        if abs(sym_mat.T.dot(sym_mat) - dp_mat).max() <= tol:
            break
    else:
        # Loop ran to completion without meeting the tolerance.
        print("Didn't converge after %d iterations" % max_iter)

    return sym_mat
def add_symbols(names, colvecs, symdict):
    """
    Register named symbol vectors in a dictionary.

    The i-th entry of names is mapped to the i-th column of colvecs.
    symdict is updated in place and also returned.
    """
    for column, name in enumerate(names):
        symdict[name] = colvecs[:, column]
    return symdict
def interpretEqn(eqn, symdict, c34=False):
    """
    Tokenise a symbolic equation and evaluate it with the recursive function
    evaluate().

    The equation is split on whitespace, then every '(' , ')' and
    COMPRESS_SYMBOL is separated from the text it is attached to, so that
    e.g. '$12($34(a' becomes '$', '12', '(', '$', '34', '(', 'a'. Operators
    such as '@', '+' and '*' must be surrounded by whitespace.

    Parameters
    ----------
    eqn : string containing the equation.
    symdict : dictionary mapping symbol names to arrays. A shallow copy is
        handed to evaluate(), so intermediate results are not added to the
        caller's dictionary.
    c34 : accepted for backward compatibility; currently ignored.

    Returns
    -------
    Whatever evaluate() returns for the tokenised equation.
    """
    separators = ('(', ')', COMPRESS_SYMBOL)
    tokens = []
    for word in eqn.split():
        pieces = [word]
        for sep in separators:
            resplit = []
            for piece in pieces:
                fragments = piece.split(sep)
                # Put the separator back *between* consecutive fragments so
                # the original left-to-right order is preserved even when
                # the separator occurs several times in one word.
                resplit.append(fragments[0])
                for fragment in fragments[1:]:
                    resplit.append(sep)
                    resplit.append(fragment)
            pieces = resplit
        # Splitting leaves empty strings next to adjacent separators.
        tokens.extend(piece for piece in pieces if piece != '')
    return evaluate(tokens, symdict.copy())
def _parse_scalar(token):
    """Convert a scale-factor token to int when possible, otherwise to float."""
    try:
        return int(token)
    except ValueError:
        return float(token)


def _reduce_operator(symEqn, symdict, operator, prefix, combine, idNo):
    """Collapse every `left operator right` triple, left to right; return the next unused id."""
    while operator in symEqn:
        index = symEqn.index(operator)
        # An operator at either end of the list is missing one operand.
        if index == 0 or index + 1 >= len(symEqn):
            sys.exit('Error: missing operand')
        newName = prefix + str(idNo)
        idNo += 1
        symdict[newName] = combine(symEqn[index - 1], symEqn[index + 1])
        # Replace the three tokens by the name of the intermediate result.
        symEqn[index - 1:index + 2] = [newName]
    return idNo


def evaluate(symEqn, symdict, idNo=0):
    """
    Recursive function to perform symbolic computation.

    Parameters
    ----------
    symEqn : list of tokens (symbol names, operators, parentheses). The list
        is reduced in place.
    symdict : dictionary mapping symbol names to arrays. Intermediate results
        are stored in it under generated names ('scaled<n>', 'prod<n>',
        'sum<n>', 'paren<n>', 'compress<n>').
    idNo : counter used to build the generated names. Increments made inside
        a nested call are not passed back to the caller, so a name may be
        reused once the value it held has been consumed.

    Returns
    -------
    The array that the whole expression evaluates to.

    Evaluation order: parenthesised groups first (leftmost first); within a
    parenthesis-free expression, all SCALE_SYMBOL operations, then all
    TP_SYMBOL operations, then all ADD_SYMBOL operations, each left to right.

    A group written as COMPRESS_SYMBOL, a token of axis digits, then
    '( ... )' is evaluated and the listed axes (1-based, single digits) are
    contracted together with numpy.einsum.

    The process exits via sys.exit() on a missing operand, unbalanced
    parentheses, or a compress axis outside 1..ndim.
    """
    if '(' not in symEqn:
        idNo = _reduce_operator(
            symEqn, symdict, SCALE_SYMBOL, 'scaled',
            lambda left, right: _parse_scalar(left) * symdict[right], idNo)
        idNo = _reduce_operator(
            symEqn, symdict, TP_SYMBOL, 'prod',
            lambda left, right: np.tensordot(symdict[left], symdict[right], axes=0),
            idNo)
        idNo = _reduce_operator(
            symEqn, symdict, ADD_SYMBOL, 'sum',
            lambda left, right: symdict[left] + symdict[right], idNo)
        return symdict[symEqn[0]]

    start = symEqn.index('(')
    # `end` is one past the matching ')'.
    end = getClosingParenIndex(symEqn, start)
    if symEqn[end - 1] != ')':
        sys.exit('Error: unbalanced parentheses')
    inner = symEqn[start + 1:end - 1]

    # Only treat this group as compressed when the compress symbol and its
    # axis token sit directly in front of *this* opening parenthesis.
    if start >= 2 and symEqn[start - 2] == COMPRESS_SYMBOL:
        axes = [int(digit) for digit in symEqn[start - 1]]
        toCompress = evaluate(inner, symdict, idNo)
        nDim = len(toCompress.shape)
        if any(axis < 1 or axis > nDim for axis in axes):
            sys.exit('Error: compress index out of range')
        # One subscript letter per axis; the compressed axes all receive the
        # letter of the first listed axis, so einsum sums over them jointly.
        subscripts = [chr(ord('a') + i) for i in range(nDim)]
        shared = subscripts[axes[0] - 1]
        for axis in axes:
            subscripts[axis - 1] = shared
        newName = 'compress' + str(idNo)
        idNo += 1
        symdict[newName] = np.einsum(''.join(subscripts), toCompress)
        symEqn[start - 2:end] = [newName]
    else:
        newName = 'paren' + str(idNo)
        idNo += 1
        symdict[newName] = evaluate(inner, symdict, idNo)
        symEqn[start:end] = [newName]

    return evaluate(symEqn, symdict, idNo)
def getClosingParenIndex(stringExpr, openParenIndex):
    """
    Helper function to locate the ')' that closes the '(' at openParenIndex.

    Parameters
    ----------
    stringExpr : sequence of tokens (or a string) containing the parentheses.
    openParenIndex : index of the opening parenthesis to match.

    Returns
    -------
    The index ONE PAST the matching ')', so that
    stringExpr[openParenIndex:returned] spans the whole parenthesised group.

    The process exits via sys.exit() when no matching ')' exists.
    """
    depth = 1
    for position in range(openParenIndex + 1, len(stringExpr)):
        token = stringExpr[position]
        if token == '(':
            depth += 1
        elif token == ')':
            depth -= 1
            if depth == 0:
                return position + 1
    # Ran off the end with the group still open.
    sys.exit('Error: unbalanced parentheses')